From d34c8d3e81a9baae02848984fbb84773556af4b5 Mon Sep 17 00:00:00 2001 From: Khyati Soneji Date: Wed, 24 Apr 2024 09:27:20 +0530 Subject: [PATCH 1/6] [WIP] IAM Assume Role chaining Credentials Provider --- kafka-connect-s3/pom.xml | 7 +- .../connect/s3/S3SinkConnectorConfig.java | 101 ++++++++++++++++-- .../iamAssume/AwsIAMAssumeRoleChaining.java | 99 +++++++++++++++++ 3 files changed, 197 insertions(+), 10 deletions(-) create mode 100644 kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamAssume/AwsIAMAssumeRoleChaining.java diff --git a/kafka-connect-s3/pom.xml b/kafka-connect-s3/pom.xml index 85e31a9a8..5e85409fc 100644 --- a/kafka-connect-s3/pom.xml +++ b/kafka-connect-s3/pom.xml @@ -33,6 +33,7 @@ + 2.20.133 1.12.650 0.2.5 0.11.1 @@ -83,6 +84,11 @@ httpclient ${httpclient.version} + + software.amazon.awssdk + sts + ${aws.sdk.version} + org.xerial.snappy @@ -180,7 +186,6 @@ test - diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java index a537e73f4..c8f63e9af 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java @@ -17,14 +17,13 @@ import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.regions.RegionUtils; import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.SSEAlgorithm; -import io.confluent.connect.storage.common.util.StringUtils; +import io.confluent.connect.s3.auth.iamAssume.AwsIAMAssumeRoleChaining; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -62,6 +61,7 @@ import io.confluent.connect.s3.format.parquet.ParquetFormat; import io.confluent.connect.s3.storage.CompressionType; import io.confluent.connect.s3.storage.S3Storage; +import io.confluent.connect.storage.common.util.StringUtils; import io.confluent.connect.storage.StorageSinkConnectorConfig; import io.confluent.connect.storage.common.ComposableConfig; import io.confluent.connect.storage.common.GenericRecommender; @@ -118,12 +118,21 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig { CREDENTIALS_PROVIDER_CLASS_CONFIG.lastIndexOf(".") + 1 ); + public static final String AUTH_METHOD = "authentication.method"; + public static final String AWS_AUTH_DEFAULT = "Access key and secret"; public static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access.key.id"; public static final String AWS_ACCESS_KEY_ID_DEFAULT = ""; public static final String AWS_SECRET_ACCESS_KEY_CONFIG = "aws.secret.access.key"; public static final Password AWS_SECRET_ACCESS_KEY_DEFAULT = new Password(null); + public static final String CUSTOMER_ROLE_ARN_CONFIG = "aws.iam.assume.role"; + public static final String CUSTOMER_ROLE_ARN_DEFAULT = ""; + public static final String CUSTOMER_ROLE_EXTERNAL_ID_CONFIG = "aws.iam.external.id"; + public static final Password CUSTOMER_ROLE_EXTERNAL_ID_DEFAULT = new Password(null); + public static final String MIDDLEWARE_ROLE_ARN_CONFIG = "middleware.aws.iam.assume.role"; + public static final String MIDDLEWARE_ROLE_ARN_DEFAULT = ""; + public static final String REGION_CONFIG = "s3.region"; public static final String REGION_DEFAULT = Regions.DEFAULT_REGION.getName(); @@ -375,6 +384,54 @@ public static ConfigDef newConfigDef() { "AWS Credentials Provider Class" ); + configDef.define( + AUTH_METHOD, + Type.STRING, + AWS_AUTH_DEFAULT, + Importance.HIGH, + "Authentication method used for S3 Sink connector", + group, + ++orderInGroup, + Width.LONG, + "Authentication method" + ); + + configDef.define( + CUSTOMER_ROLE_ARN_CONFIG, + Type.STRING, + CUSTOMER_ROLE_ARN_DEFAULT, + Importance.HIGH, + "Role ARN", + group, + ++orderInGroup, + Width.LONG, + "AWS Role ARN" + ); + + configDef.define( + CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, + Type.PASSWORD, + CUSTOMER_ROLE_EXTERNAL_ID_DEFAULT, + Importance.HIGH, + "External ID", + group, + ++orderInGroup, + Width.LONG, + "AWS External ID" + ); + + configDef.define( + MIDDLEWARE_ROLE_ARN_CONFIG, + Type.STRING, + MIDDLEWARE_ROLE_ARN_DEFAULT, + Importance.HIGH, + "Role ARN", + group, + ++orderInGroup, + Width.LONG, + "AWS Role ARN" + ); + configDef.define( AWS_ACCESS_KEY_ID_CONFIG, Type.STRING, @@ -878,26 +935,52 @@ public Password awsSecretKeyId() { return getPassword(AWS_SECRET_ACCESS_KEY_CONFIG); } + public String awsCustomerRoleARN() { + return getString(CUSTOMER_ROLE_ARN_CONFIG); + } + + public String awsMiddlewareRoleARN() { + return getString(MIDDLEWARE_ROLE_ARN_CONFIG); + } + + public String awsExternalId() { + return getString(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG); + } + public int getPartSize() { return getInt(PART_SIZE_CONFIG); } + public String getAuthenticationMethod() { + return getString(AUTH_METHOD); + } + @SuppressWarnings("unchecked") public AWSCredentialsProvider getCredentialsProvider() { try { AWSCredentialsProvider provider = ((Class) - getClass(S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)).newInstance(); + getClass(S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)).newInstance(); if (provider instanceof Configurable) { Map configs = originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX); configs.remove(CREDENTIALS_PROVIDER_CLASS_CONFIG.substring( - CREDENTIALS_PROVIDER_CONFIG_PREFIX.length() + CREDENTIALS_PROVIDER_CONFIG_PREFIX.length() )); - configs.put(AWS_ACCESS_KEY_ID_CONFIG, awsAccessKeyId()); - configs.put(AWS_SECRET_ACCESS_KEY_CONFIG, awsSecretKeyId().value()); + String authMethod = getAuthenticationMethod(); + if (authMethod == "IAM Assume Role") { + configs.put(CUSTOMER_ROLE_ARN_CONFIG, awsCustomerRoleARN()); + configs.put(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, awsExternalId()); + configs.put(MIDDLEWARE_ROLE_ARN_CONFIG, awsMiddlewareRoleARN()); - ((Configurable) provider).configure(configs); + provider = new AwsIAMAssumeRoleChaining(); + ((AwsIAMAssumeRoleChaining) provider).configure(configs); + } + else { + configs.put(AWS_ACCESS_KEY_ID_CONFIG, awsAccessKeyId()); + configs.put(AWS_SECRET_ACCESS_KEY_CONFIG, awsSecretKeyId().value()); + ((Configurable) provider).configure(configs); + } } else { final String accessKeyId = awsAccessKeyId(); final String secretKey = awsSecretKeyId().value(); @@ -910,8 +993,8 @@ public AWSCredentialsProvider getCredentialsProvider() { return provider; } catch (IllegalAccessException | InstantiationException e) { throw new ConnectException( - "Invalid class for: " + S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, - e + "Invalid class for: " + S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + e ); } } diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamAssume/AwsIAMAssumeRoleChaining.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamAssume/AwsIAMAssumeRoleChaining.java new file mode 100644 index 000000000..ecc17f05a --- /dev/null +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamAssume/AwsIAMAssumeRoleChaining.java @@ -0,0 +1,99 @@ +package io.confluent.connect.s3.auth.iamAssume; + +import com.amazonaws.auth.*; +import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StsAssumeRoleCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; +import software.amazon.awssdk.services.sts.model.Tag; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; + +import static io.confluent.connect.s3.S3SinkConnectorConfig.CUSTOMER_ROLE_ARN_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.CUSTOMER_ROLE_EXTERNAL_ID_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.MIDDLEWARE_ROLE_ARN_CONFIG; +import java.util.Map; + +public class AwsIAMAssumeRoleChaining implements AWSCredentialsProvider { + + private static final ConfigDef STS_CONFIG_DEF = new ConfigDef() + .define( + CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, + ConfigDef.Type.STRING, + ConfigDef.Importance.MEDIUM, + "The role external ID used when retrieving session credentials under an assumed role." + ).define( + CUSTOMER_ROLE_ARN_CONFIG, + ConfigDef.Type.STRING, + ConfigDef.Importance.HIGH, + "Role ARN to use when starting a session." + ).define( + MIDDLEWARE_ROLE_ARN_CONFIG, + ConfigDef.Type.STRING, + ConfigDef.Importance.HIGH, + "Role ARN to use when starting a session." + ); + + private String customerRoleArn; + private String customerRoleExternalId; + private String middlewareRoleArn; + private STSAssumeRoleSessionCredentialsProvider stsCredentialProvider; + + // Method to initiate role chaining + public void configure(Map configs) { + // Assume the initial role + AbstractConfig config = new AbstractConfig(STS_CONFIG_DEF, configs); + customerRoleArn = config.getString(CUSTOMER_ROLE_ARN_CONFIG); + customerRoleExternalId = config.getString(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG); + middlewareRoleArn = config.getString(MIDDLEWARE_ROLE_ARN_CONFIG); + + STSAssumeRoleSessionCredentialsProvider initialProvider = buildProvider(middlewareRoleArn, "middlewareSession", "", null); + + // Use the credentials from the initial role to assume the subsequent role + stsCredentialProvider = buildProvider(customerRoleArn, "customerSession", customerRoleExternalId, initialProvider); + } + + // Updated buildProvider to optionally accept an existing AwsCredentialsProvider + private STSAssumeRoleSessionCredentialsProvider buildProvider(final String roleArn, final String roleSessionName, final String roleExternalId, STSAssumeRoleSessionCredentialsProvider existingProvider) { + STSAssumeRoleSessionCredentialsProvider credentialsProvider; + // If an existing credentials provider is provided, use it for creating the STS client + if (existingProvider != null) { + AWSCredentials basicCredentials = existingProvider.getCredentials(); + credentialsProvider = new STSAssumeRoleSessionCredentialsProvider + .Builder(roleArn, roleSessionName) + .withStsClient(AWSSecurityTokenServiceClientBuilder + .standard() + .withCredentials(new AWSStaticCredentialsProvider(basicCredentials)).build() + ) + .withExternalId(roleExternalId) + .build(); + } else { + credentialsProvider = new STSAssumeRoleSessionCredentialsProvider + .Builder(roleArn, roleSessionName) + .withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient()) + .withExternalId(roleExternalId) + .build(); + } + return credentialsProvider; + } + + // Helper method to build the AssumeRoleRequest + private AssumeRoleRequest buildRequest(String roleExternalId, String roleArn) { + return AssumeRoleRequest.builder() + .roleArn(roleArn) + .externalId(roleExternalId) + .build(); + } + + @Override + public AWSCredentials getCredentials() { + return stsCredentialProvider.getCredentials(); + } + + @Override + public void refresh() { + stsCredentialProvider.refresh(); + } +} From 4a8bd5bd6f5d06a70854c001db42e88307e2557c Mon Sep 17 00:00:00 2001 From: Khyati Soneji Date: Wed, 24 Apr 2024 09:31:05 +0530 Subject: [PATCH 2/6] Fix lint errors --- kafka-connect-s3/pom.xml | 1 + .../connect/s3/S3SinkConnectorConfig.java | 89 ++++++------- .../iamAssume/AwsIAMAssumeRoleChaining.java | 99 --------------- .../iamassume/AwsIamAssumeRoleChaining.java | 119 ++++++++++++++++++ .../connect/s3/S3SinkConnectorConfigTest.java | 28 +++++ 5 files changed, 195 insertions(+), 141 deletions(-) delete mode 100644 kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamAssume/AwsIAMAssumeRoleChaining.java create mode 100644 kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java diff --git a/kafka-connect-s3/pom.xml b/kafka-connect-s3/pom.xml index 5e85409fc..275b33dbd 100644 --- a/kafka-connect-s3/pom.xml +++ b/kafka-connect-s3/pom.xml @@ -186,6 +186,7 @@ test + diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java index c8f63e9af..a4288834a 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java @@ -17,13 +17,14 @@ import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.regions.RegionUtils; import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.SSEAlgorithm; -import io.confluent.connect.s3.auth.iamAssume.AwsIAMAssumeRoleChaining; +import io.confluent.connect.s3.auth.iamassume.AwsIamAssumeRoleChaining; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -281,6 +282,44 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig { Arrays.stream(AffixType.names()).collect(Collectors.toList())); } + public static void addIamConfigDef(ConfigDef configDef, final String group, int orderInGroup) { + configDef.define( + CUSTOMER_ROLE_ARN_CONFIG, + Type.STRING, + CUSTOMER_ROLE_ARN_DEFAULT, + Importance.HIGH, + "Role ARN", + group, + ++orderInGroup, + Width.LONG, + "AWS Role ARN" + ); + + configDef.define( + CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, + Type.PASSWORD, + CUSTOMER_ROLE_EXTERNAL_ID_DEFAULT, + Importance.HIGH, + "External ID", + group, + ++orderInGroup, + Width.LONG, + "AWS External ID" + ); + + configDef.define( + MIDDLEWARE_ROLE_ARN_CONFIG, + Type.STRING, + MIDDLEWARE_ROLE_ARN_DEFAULT, + Importance.HIGH, + "Role ARN", + group, + ++orderInGroup, + Width.LONG, + "AWS Role ARN" + ); + } + public static ConfigDef newConfigDef() { ConfigDef configDef = StorageSinkConnectorConfig.newConfigDef( FORMAT_CLASS_RECOMMENDER, @@ -396,41 +435,8 @@ public static ConfigDef newConfigDef() { "Authentication method" ); - configDef.define( - CUSTOMER_ROLE_ARN_CONFIG, - Type.STRING, - CUSTOMER_ROLE_ARN_DEFAULT, - Importance.HIGH, - "Role ARN", - group, - ++orderInGroup, - Width.LONG, - "AWS Role ARN" - ); - - configDef.define( - CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, - Type.PASSWORD, - CUSTOMER_ROLE_EXTERNAL_ID_DEFAULT, - Importance.HIGH, - "External ID", - group, - ++orderInGroup, - Width.LONG, - "AWS External ID" - ); - - configDef.define( - MIDDLEWARE_ROLE_ARN_CONFIG, - Type.STRING, - MIDDLEWARE_ROLE_ARN_DEFAULT, - Importance.HIGH, - "Role ARN", - group, - ++orderInGroup, - Width.LONG, - "AWS Role ARN" - ); + // Define IAM Role ARN ConfigDef + addIamConfigDef(configDef, group, orderInGroup); configDef.define( AWS_ACCESS_KEY_ID_CONFIG, @@ -959,12 +965,12 @@ public String getAuthenticationMethod() { public AWSCredentialsProvider getCredentialsProvider() { try { AWSCredentialsProvider provider = ((Class) - getClass(S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)).newInstance(); + getClass(S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)).newInstance(); if (provider instanceof Configurable) { Map configs = originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX); configs.remove(CREDENTIALS_PROVIDER_CLASS_CONFIG.substring( - CREDENTIALS_PROVIDER_CONFIG_PREFIX.length() + CREDENTIALS_PROVIDER_CONFIG_PREFIX.length() )); String authMethod = getAuthenticationMethod(); @@ -973,10 +979,9 @@ public AWSCredentialsProvider getCredentialsProvider() { configs.put(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, awsExternalId()); configs.put(MIDDLEWARE_ROLE_ARN_CONFIG, awsMiddlewareRoleARN()); - provider = new AwsIAMAssumeRoleChaining(); - ((AwsIAMAssumeRoleChaining) provider).configure(configs); - } - else { + provider = new AwsIamAssumeRoleChaining(); + ((AwsIamAssumeRoleChaining) provider).configure(configs); + } else { configs.put(AWS_ACCESS_KEY_ID_CONFIG, awsAccessKeyId()); configs.put(AWS_SECRET_ACCESS_KEY_CONFIG, awsSecretKeyId().value()); ((Configurable) provider).configure(configs); diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamAssume/AwsIAMAssumeRoleChaining.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamAssume/AwsIAMAssumeRoleChaining.java deleted file mode 100644 index ecc17f05a..000000000 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamAssume/AwsIAMAssumeRoleChaining.java +++ /dev/null @@ -1,99 +0,0 @@ -package io.confluent.connect.s3.auth.iamAssume; - -import com.amazonaws.auth.*; -import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.StsAssumeRoleCredentialsProvider; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.sts.StsClient; -import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; -import software.amazon.awssdk.services.sts.model.Tag; -import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigDef; - -import static io.confluent.connect.s3.S3SinkConnectorConfig.CUSTOMER_ROLE_ARN_CONFIG; -import static io.confluent.connect.s3.S3SinkConnectorConfig.CUSTOMER_ROLE_EXTERNAL_ID_CONFIG; -import static io.confluent.connect.s3.S3SinkConnectorConfig.MIDDLEWARE_ROLE_ARN_CONFIG; -import java.util.Map; - -public class AwsIAMAssumeRoleChaining implements AWSCredentialsProvider { - - private static final ConfigDef STS_CONFIG_DEF = new ConfigDef() - .define( - CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, - ConfigDef.Type.STRING, - ConfigDef.Importance.MEDIUM, - "The role external ID used when retrieving session credentials under an assumed role." - ).define( - CUSTOMER_ROLE_ARN_CONFIG, - ConfigDef.Type.STRING, - ConfigDef.Importance.HIGH, - "Role ARN to use when starting a session." - ).define( - MIDDLEWARE_ROLE_ARN_CONFIG, - ConfigDef.Type.STRING, - ConfigDef.Importance.HIGH, - "Role ARN to use when starting a session." - ); - - private String customerRoleArn; - private String customerRoleExternalId; - private String middlewareRoleArn; - private STSAssumeRoleSessionCredentialsProvider stsCredentialProvider; - - // Method to initiate role chaining - public void configure(Map configs) { - // Assume the initial role - AbstractConfig config = new AbstractConfig(STS_CONFIG_DEF, configs); - customerRoleArn = config.getString(CUSTOMER_ROLE_ARN_CONFIG); - customerRoleExternalId = config.getString(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG); - middlewareRoleArn = config.getString(MIDDLEWARE_ROLE_ARN_CONFIG); - - STSAssumeRoleSessionCredentialsProvider initialProvider = buildProvider(middlewareRoleArn, "middlewareSession", "", null); - - // Use the credentials from the initial role to assume the subsequent role - stsCredentialProvider = buildProvider(customerRoleArn, "customerSession", customerRoleExternalId, initialProvider); - } - - // Updated buildProvider to optionally accept an existing AwsCredentialsProvider - private STSAssumeRoleSessionCredentialsProvider buildProvider(final String roleArn, final String roleSessionName, final String roleExternalId, STSAssumeRoleSessionCredentialsProvider existingProvider) { - STSAssumeRoleSessionCredentialsProvider credentialsProvider; - // If an existing credentials provider is provided, use it for creating the STS client - if (existingProvider != null) { - AWSCredentials basicCredentials = existingProvider.getCredentials(); - credentialsProvider = new STSAssumeRoleSessionCredentialsProvider - .Builder(roleArn, roleSessionName) - .withStsClient(AWSSecurityTokenServiceClientBuilder - .standard() - .withCredentials(new AWSStaticCredentialsProvider(basicCredentials)).build() - ) - .withExternalId(roleExternalId) - .build(); - } else { - credentialsProvider = new STSAssumeRoleSessionCredentialsProvider - .Builder(roleArn, roleSessionName) - .withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient()) - .withExternalId(roleExternalId) - .build(); - } - return credentialsProvider; - } - - // Helper method to build the AssumeRoleRequest - private AssumeRoleRequest buildRequest(String roleExternalId, String roleArn) { - return AssumeRoleRequest.builder() - .roleArn(roleArn) - .externalId(roleExternalId) - .build(); - } - - @Override - public AWSCredentials getCredentials() { - return stsCredentialProvider.getCredentials(); - } - - @Override - public void refresh() { - stsCredentialProvider.refresh(); - } -} diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java new file mode 100644 index 000000000..1b8b525b0 --- /dev/null +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java @@ -0,0 +1,119 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.connect.s3.auth.iamassume; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; + +import static io.confluent.connect.s3.S3SinkConnectorConfig.CUSTOMER_ROLE_ARN_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.CUSTOMER_ROLE_EXTERNAL_ID_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.MIDDLEWARE_ROLE_ARN_CONFIG; +import java.util.Map; + +public class AwsIamAssumeRoleChaining implements AWSCredentialsProvider { + + private static final ConfigDef STS_CONFIG_DEF = new ConfigDef() + .define( + CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, + ConfigDef.Type.STRING, + ConfigDef.Importance.MEDIUM, + "The role external ID used when retrieving session credentials under an assumed role." + ).define( + CUSTOMER_ROLE_ARN_CONFIG, + ConfigDef.Type.STRING, + ConfigDef.Importance.HIGH, + "Role ARN to use when starting a session." + ).define( + MIDDLEWARE_ROLE_ARN_CONFIG, + ConfigDef.Type.STRING, + ConfigDef.Importance.HIGH, + "Role ARN to use when starting a session." + ); + + private String customerRoleArn; + private String customerRoleExternalId; + private String middlewareRoleArn; + private STSAssumeRoleSessionCredentialsProvider stsCredentialProvider; + + // Method to initiate role chaining + public void configure(Map configs) { + // Assume the initial role + AbstractConfig config = new AbstractConfig(STS_CONFIG_DEF, configs); + customerRoleArn = config.getString(CUSTOMER_ROLE_ARN_CONFIG); + customerRoleExternalId = config.getString(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG); + middlewareRoleArn = config.getString(MIDDLEWARE_ROLE_ARN_CONFIG); + + STSAssumeRoleSessionCredentialsProvider initialProvider = buildProvider( + middlewareRoleArn, + "middlewareSession", + "", + null + ); + + // Use the credentials from the initial role to assume the subsequent role + stsCredentialProvider = buildProvider( + customerRoleArn, + "customerSession", + customerRoleExternalId, + initialProvider + ); + } + + // Updated buildProvider to optionally accept an existing AwsCredentialsProvider + private STSAssumeRoleSessionCredentialsProvider buildProvider( + final String roleArn, + final String roleSessionName, + final String roleExternalId, + STSAssumeRoleSessionCredentialsProvider existingProvider) { + + STSAssumeRoleSessionCredentialsProvider credentialsProvider; + // If an existing credentials provider is provided, use it for creating the STS client + if (existingProvider != null) { + AWSCredentials basicCredentials = existingProvider.getCredentials(); + credentialsProvider = new STSAssumeRoleSessionCredentialsProvider + .Builder(roleArn, roleSessionName) + .withStsClient(AWSSecurityTokenServiceClientBuilder + .standard() + .withCredentials(new AWSStaticCredentialsProvider(basicCredentials)).build() + ) + .withExternalId(roleExternalId) + .build(); + } else { + credentialsProvider = new STSAssumeRoleSessionCredentialsProvider + .Builder(roleArn, roleSessionName) + .withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient()) + .withExternalId(roleExternalId) + .build(); + } + return credentialsProvider; + } + + @Override + public AWSCredentials getCredentials() { + return stsCredentialProvider.getCredentials(); + } + + @Override + public void refresh() { + stsCredentialProvider.refresh(); + } +} diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java index acf1b8dbe..d44a7cb3c 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java @@ -37,6 +37,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import io.confluent.connect.s3.S3SinkConnectorConfig.AffixType; import io.confluent.connect.s3.auth.AwsAssumeRoleCredentialsProvider; import io.confluent.connect.s3.format.avro.AvroFormat; import io.confluent.connect.s3.format.json.JsonFormat; @@ -236,6 +237,33 @@ public void testConfigurableCredentialProvider() { assertEquals(SECRET_KEY_VALUE, credentialsProvider.getCredentials().getAWSSecretKey()); } + @Test + public void testConfigurableRoleChainingCredentialProvider() { + final String CUSTOMER_ROLE_ARN = "givenARN"; + final String CUSTOMER_EXTERNAL_ID = "WhoIsJohnGalt?"; + //final String MIDDLEWARE_ROLE_ARN = "exampleARN"; + + properties.put( + S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + DummyAssertiveCredentialsProvider.class.getName() + ); + String configPrefix = S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX; + properties.put( + configPrefix.concat(DummyAssertiveCredentialsProvider.ACCESS_KEY_NAME), + CUSTOMER_ROLE_ARN + ); + properties.put( + configPrefix.concat(DummyAssertiveCredentialsProvider.SECRET_KEY_NAME), + CUSTOMER_EXTERNAL_ID + ); + connectorConfig = new S3SinkConnectorConfig(properties); + + AWSCredentialsProvider credentialsProvider = connectorConfig.getCredentialsProvider(); + + assertEquals(CUSTOMER_ROLE_ARN, credentialsProvider.getCredentials().getAWSAccessKeyId()); + assertEquals(CUSTOMER_EXTERNAL_ID, credentialsProvider.getCredentials().getAWSSecretKey()); + } + @Test public void testConfigurableAwsAssumeRoleCredentialsProvider() { properties.put( From e1b52454225fb6fe9615b36a5c6f61521038f49f Mon Sep 17 00:00:00 2001 From: Khyati Soneji Date: Thu, 25 Apr 2024 08:27:22 +0530 Subject: [PATCH 3/6] Add testing changes remove validations --- .../connect/s3/S3SinkConnectorConfig.java | 38 +++++++++++-------- .../iamassume/AwsIamAssumeRoleChaining.java | 29 ++++++++++---- 2 files changed, 45 insertions(+), 22 deletions(-) diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java index a4288834a..49149b529 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java @@ -75,11 +75,14 @@ import io.confluent.connect.storage.partitioner.HourlyPartitioner; import io.confluent.connect.storage.partitioner.PartitionerConfig; import io.confluent.connect.storage.partitioner.TimeBasedPartitioner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; public class S3SinkConnectorConfig extends StorageSinkConnectorConfig { + private static final Logger log = LoggerFactory.getLogger(S3SinkConnectorConfig.class); // S3 Group public static final String S3_BUCKET_CONFIG = "s3.bucket.name"; @@ -949,8 +952,8 @@ public String awsMiddlewareRoleARN() { return getString(MIDDLEWARE_ROLE_ARN_CONFIG); } - public String awsExternalId() { - return getString(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG); + public Password awsExternalId() { + return getPassword(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG); } public int getPartSize() { @@ -966,6 +969,9 @@ public AWSCredentialsProvider getCredentialsProvider() { try { AWSCredentialsProvider provider = ((Class) getClass(S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)).newInstance(); + + String authMethod = getAuthenticationMethod(); + log.info("Authentication method: {}", authMethod); if (provider instanceof Configurable) { Map configs = originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX); @@ -973,25 +979,27 @@ public AWSCredentialsProvider getCredentialsProvider() { CREDENTIALS_PROVIDER_CONFIG_PREFIX.length() )); - String authMethod = getAuthenticationMethod(); - if (authMethod == "IAM Assume Role") { + configs.put(AWS_ACCESS_KEY_ID_CONFIG, awsAccessKeyId()); + configs.put(AWS_SECRET_ACCESS_KEY_CONFIG, awsSecretKeyId().value()); + ((Configurable) provider).configure(configs); + } else { + authMethod = getAuthenticationMethod(); + + if (authMethod.equals("IAM Assume Role")) { + Map configs = new HashMap(); configs.put(CUSTOMER_ROLE_ARN_CONFIG, awsCustomerRoleARN()); - configs.put(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, awsExternalId()); + configs.put(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, awsExternalId().value()); configs.put(MIDDLEWARE_ROLE_ARN_CONFIG, awsMiddlewareRoleARN()); provider = new AwsIamAssumeRoleChaining(); ((AwsIamAssumeRoleChaining) provider).configure(configs); } else { - configs.put(AWS_ACCESS_KEY_ID_CONFIG, awsAccessKeyId()); - configs.put(AWS_SECRET_ACCESS_KEY_CONFIG, awsSecretKeyId().value()); - ((Configurable) provider).configure(configs); - } - } else { - final String accessKeyId = awsAccessKeyId(); - final String secretKey = awsSecretKeyId().value(); - if (StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretKey)) { - BasicAWSCredentials basicCredentials = new BasicAWSCredentials(accessKeyId, secretKey); - provider = new AWSStaticCredentialsProvider(basicCredentials); + final String accessKeyId = awsAccessKeyId(); + final String secretKey = awsSecretKeyId().value(); + if (StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretKey)) { + BasicAWSCredentials basicCredentials = new BasicAWSCredentials(accessKeyId, secretKey); + provider = new AWSStaticCredentialsProvider(basicCredentials); + } } } diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java index 1b8b525b0..dbd29de4f 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java @@ -21,8 +21,11 @@ import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; + import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static io.confluent.connect.s3.S3SinkConnectorConfig.CUSTOMER_ROLE_ARN_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.CUSTOMER_ROLE_EXTERNAL_ID_CONFIG; @@ -31,11 +34,12 @@ public class AwsIamAssumeRoleChaining implements AWSCredentialsProvider { + private static final Logger log = LoggerFactory.getLogger(AwsIamAssumeRoleChaining.class); private static final ConfigDef STS_CONFIG_DEF = new ConfigDef() .define( CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, ConfigDef.Type.STRING, - ConfigDef.Importance.MEDIUM, + ConfigDef.Importance.HIGH, "The role external ID used when retrieving session credentials under an assumed role." ).define( CUSTOMER_ROLE_ARN_CONFIG, @@ -53,6 +57,7 @@ public class AwsIamAssumeRoleChaining implements AWSCredentialsProvider { private String customerRoleExternalId; private String middlewareRoleArn; private STSAssumeRoleSessionCredentialsProvider stsCredentialProvider; + private STSAssumeRoleSessionCredentialsProvider initialProvider; // Method to initiate role chaining public void configure(Map configs) { @@ -62,12 +67,13 @@ public void configure(Map configs) { customerRoleExternalId = config.getString(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG); middlewareRoleArn = config.getString(MIDDLEWARE_ROLE_ARN_CONFIG); - STSAssumeRoleSessionCredentialsProvider initialProvider = buildProvider( + initialProvider = buildProvider( middlewareRoleArn, "middlewareSession", "", null ); + log.info("Got initial provider"); // Use the credentials from the initial role to assume the subsequent role stsCredentialProvider = buildProvider( @@ -76,6 +82,7 @@ public void configure(Map configs) { customerRoleExternalId, initialProvider ); + log.info("Got final credentials"); } // Updated buildProvider to optionally accept an existing AwsCredentialsProvider @@ -99,10 +106,9 @@ private STSAssumeRoleSessionCredentialsProvider buildProvider( .build(); } else { credentialsProvider = new STSAssumeRoleSessionCredentialsProvider - .Builder(roleArn, roleSessionName) - .withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient()) - .withExternalId(roleExternalId) - .build(); + .Builder(roleArn, roleSessionName) + .withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient()) + .build(); } return credentialsProvider; } @@ -114,6 +120,15 @@ public AWSCredentials getCredentials() { @Override public void refresh() { - stsCredentialProvider.refresh(); + if (initialProvider != null) { + initialProvider.refresh(); + stsCredentialProvider = buildProvider( + customerRoleArn, + "customerSession", + customerRoleExternalId, + initialProvider + ); + } + //stsCredentialProvider.refresh(); } } From d7d990839296422c680ec669e71b07ca38e85ec7 Mon Sep 17 00:00:00 2001 From: Khyati Soneji Date: Tue, 7 May 2024 03:35:57 +0530 Subject: [PATCH 4/6] Add tests --- .../connect/s3/S3SinkConnectorConfig.java | 5 - .../AwsAssumeRoleCredentialsProvider.java | 4 + .../iamassume/AwsIamAssumeRoleChaining.java | 14 ++- .../s3/auth/AwsIamRoleChainingTest.java | 91 +++++++++++++++++++ 4 files changed, 101 insertions(+), 13 deletions(-) create mode 100644 kafka-connect-s3/src/test/java/io/confluent/connect/s3/auth/AwsIamRoleChainingTest.java diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java index 49149b529..944d2e57c 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java @@ -75,14 +75,10 @@ import io.confluent.connect.storage.partitioner.HourlyPartitioner; import io.confluent.connect.storage.partitioner.PartitionerConfig; import io.confluent.connect.storage.partitioner.TimeBasedPartitioner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; public class S3SinkConnectorConfig extends StorageSinkConnectorConfig { - - private static final Logger log = LoggerFactory.getLogger(S3SinkConnectorConfig.class); // S3 Group public static final String S3_BUCKET_CONFIG = "s3.bucket.name"; @@ -971,7 +967,6 @@ public AWSCredentialsProvider getCredentialsProvider() { getClass(S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)).newInstance(); String authMethod = getAuthenticationMethod(); - log.info("Authentication method: {}", authMethod); if (provider instanceof Configurable) { Map configs = originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX); diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java index 880a1045b..6908d1873 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java @@ -114,4 +114,8 @@ public void refresh() { } } + // Visible for test + public STSAssumeRoleSessionCredentialsProvider getProvider() { + return stsCredentialProvider; + } } diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java index dbd29de4f..1924564b5 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 Confluent Inc. + * Copyright 2024 Confluent Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,8 +24,6 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import static io.confluent.connect.s3.S3SinkConnectorConfig.CUSTOMER_ROLE_ARN_CONFIG; import static io.confluent.connect.s3.S3SinkConnectorConfig.CUSTOMER_ROLE_EXTERNAL_ID_CONFIG; @@ -33,8 +31,6 @@ import java.util.Map; public class AwsIamAssumeRoleChaining implements AWSCredentialsProvider { - - private static final Logger log = LoggerFactory.getLogger(AwsIamAssumeRoleChaining.class); private static final ConfigDef STS_CONFIG_DEF = new ConfigDef() .define( CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, @@ -73,7 +69,6 @@ public void configure(Map configs) { "", null ); - log.info("Got initial provider"); // Use the credentials from the initial role to assume the subsequent role stsCredentialProvider = buildProvider( @@ -82,7 +77,6 @@ public void configure(Map configs) { customerRoleExternalId, initialProvider ); - log.info("Got final credentials"); } // Updated buildProvider to optionally accept an existing AwsCredentialsProvider @@ -129,6 +123,10 @@ public void refresh() { initialProvider ); } - //stsCredentialProvider.refresh(); + } + + // Visible for test + public STSAssumeRoleSessionCredentialsProvider getProvider() { + return stsCredentialProvider; } } diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/auth/AwsIamRoleChainingTest.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/auth/AwsIamRoleChainingTest.java new file mode 100644 index 000000000..a406a3fc1 --- /dev/null +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/auth/AwsIamRoleChainingTest.java @@ -0,0 +1,91 @@ +/* + * Copyright 2024 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.connect.s3.auth; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSSessionCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import io.confluent.connect.s3.S3SinkConnectorTestBase; +import io.confluent.connect.s3.auth.iamassume.AwsIamAssumeRoleChaining; +import io.confluent.connect.s3.S3SinkConnectorConfig; + +import org.junit.After; +import org.junit.Before; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.mock; + +import java.util.HashMap; +import java.util.Map; + +class AwsIamAssumeRoleChainingTest extends S3SinkConnectorTestBase { + + protected Map configs = new HashMap<>(); + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + final String CUSTOMER_ROLE_ARN = "arn:aws:iam::012345678901:role/my-restricted-role"; + final String CUSTOMER_EXTERNAL_ID = "exampleExternaID"; + final String MIDDLEWARE_ROLE_ARN = "arn:aws:iam::109876543210:role/my-example-role"; + + configs.put( + S3SinkConnectorConfig.AUTH_METHOD, + "IAM Assume Role" + ); + configs.put( + S3SinkConnectorConfig.CUSTOMER_ROLE_ARN_CONFIG, + CUSTOMER_ROLE_ARN + ); + configs.put( + S3SinkConnectorConfig.CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, + CUSTOMER_EXTERNAL_ID + ); + configs.put( + S3SinkConnectorConfig.MIDDLEWARE_ROLE_ARN_CONFIG, + MIDDLEWARE_ROLE_ARN + ); + } + + @After + @Override + public void tearDown() throws Exception { + super.tearDown(); + configs.clear(); + } + + @Test + public void testConfigure() { + AwsIamAssumeRoleChaining roleChaining = mock(AwsIamAssumeRoleChaining.class); + roleChaining.configure(configs); + + STSAssumeRoleSessionCredentialsProvider mockFinalProvider = Mockito.mock(STSAssumeRoleSessionCredentialsProvider.class); + Mockito.doReturn(mockFinalProvider).when(roleChaining).getProvider(); + + AWSCredentials expectedCredentials = new BasicAWSCredentials("accessKey", "secretKey"); + Mockito.when(mockFinalProvider.getCredentials()).thenReturn((AWSSessionCredentials) expectedCredentials); + + AWSCredentials actualCredentials = (AWSCredentials) roleChaining.getCredentials(); + + assertEquals(expectedCredentials, actualCredentials); + } +} From 3d13e718f22e8049f412569634fa31515bc1e106 Mon Sep 17 00:00:00 2001 From: Khyati Soneji Date: Mon, 13 May 2024 08:51:26 +0530 Subject: [PATCH 5/6] Fix checkstyle error --- .../connect/s3/S3SinkConnectorConfig.java | 38 +++++++++++++++++-- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java index 944d2e57c..6950f2423 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java @@ -281,7 +281,39 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig { Arrays.stream(AffixType.names()).collect(Collectors.toList())); } - public static void addIamConfigDef(ConfigDef configDef, final String group, int orderInGroup) { + public static void addAuthenticationConfigDef(ConfigDef configDef, final String group, int orderInGroup) { + configDef.define( + AWS_ACCESS_KEY_ID_CONFIG, + Type.STRING, + AWS_ACCESS_KEY_ID_DEFAULT, + Importance.HIGH, + "The AWS access key ID used to authenticate personal AWS credentials such as IAM " + + "credentials. Use only if you do not wish to authenticate by using a credentials " + + "provider class via ``" + + CREDENTIALS_PROVIDER_CLASS_CONFIG + + "``", + group, + ++orderInGroup, + Width.LONG, + "AWS Access Key ID" + ); + + configDef.define( + AWS_SECRET_ACCESS_KEY_CONFIG, + Type.PASSWORD, + AWS_SECRET_ACCESS_KEY_DEFAULT, + Importance.HIGH, + "The secret access key used to authenticate personal AWS credentials such as IAM " + + "credentials. Use only if you do not wish to authenticate by using a credentials " + + "provider class via ``" + + CREDENTIALS_PROVIDER_CLASS_CONFIG + + "``", + group, + ++orderInGroup, + Width.LONG, + "AWS Secret Access Key" + ); + configDef.define( CUSTOMER_ROLE_ARN_CONFIG, Type.STRING, @@ -434,8 +466,8 @@ public static ConfigDef newConfigDef() { "Authentication method" ); - // Define IAM Role ARN ConfigDef - addIamConfigDef(configDef, group, orderInGroup); + // Define Config use for Authentication + addAuthenticationConfigDef(configDef, group, orderInGroup); configDef.define( AWS_ACCESS_KEY_ID_CONFIG, From e0f664b098c69ece474c14a0fea9da703c067202 Mon Sep 17 00:00:00 2001 From: Khyati Soneji Date: Mon, 13 May 2024 09:24:52 +0530 Subject: [PATCH 6/6] Fix checkstyle errors --- .../java/io/confluent/connect/s3/S3SinkConnectorConfig.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java index 6950f2423..cc1aad5bf 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java @@ -281,7 +281,10 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig { Arrays.stream(AffixType.names()).collect(Collectors.toList())); } - public static void addAuthenticationConfigDef(ConfigDef configDef, final String group, int orderInGroup) { + public static void addAuthenticationConfigDef( + ConfigDef configDef, + final String group, + int orderInGroup) { configDef.define( AWS_ACCESS_KEY_ID_CONFIG, Type.STRING,