diff --git a/kafka-connect-s3/pom.xml b/kafka-connect-s3/pom.xml index 85e31a9a8..275b33dbd 100644 --- a/kafka-connect-s3/pom.xml +++ b/kafka-connect-s3/pom.xml @@ -33,6 +33,7 @@ + 2.20.133 1.12.650 0.2.5 0.11.1 @@ -83,6 +84,11 @@ httpclient ${httpclient.version} + + software.amazon.awssdk + sts + ${aws.sdk.version} + org.xerial.snappy diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java index a537e73f4..cc1aad5bf 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java @@ -24,7 +24,7 @@ import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.SSEAlgorithm; -import io.confluent.connect.storage.common.util.StringUtils; +import io.confluent.connect.s3.auth.iamassume.AwsIamAssumeRoleChaining; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; @@ -62,6 +62,7 @@ import io.confluent.connect.s3.format.parquet.ParquetFormat; import io.confluent.connect.s3.storage.CompressionType; import io.confluent.connect.s3.storage.S3Storage; +import io.confluent.connect.storage.common.util.StringUtils; import io.confluent.connect.storage.StorageSinkConnectorConfig; import io.confluent.connect.storage.common.ComposableConfig; import io.confluent.connect.storage.common.GenericRecommender; @@ -78,7 +79,6 @@ import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; public class S3SinkConnectorConfig extends StorageSinkConnectorConfig { - // S3 Group public static final String S3_BUCKET_CONFIG = "s3.bucket.name"; @@ -118,12 +118,21 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig { CREDENTIALS_PROVIDER_CLASS_CONFIG.lastIndexOf(".") + 1 ); + public static final String AUTH_METHOD = "authentication.method"; + public static final String AWS_AUTH_DEFAULT = "Access key and secret"; public static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access.key.id"; public static final String AWS_ACCESS_KEY_ID_DEFAULT = ""; public static final String AWS_SECRET_ACCESS_KEY_CONFIG = "aws.secret.access.key"; public static final Password AWS_SECRET_ACCESS_KEY_DEFAULT = new Password(null); + public static final String CUSTOMER_ROLE_ARN_CONFIG = "aws.iam.assume.role"; + public static final String CUSTOMER_ROLE_ARN_DEFAULT = ""; + public static final String CUSTOMER_ROLE_EXTERNAL_ID_CONFIG = "aws.iam.external.id"; + public static final Password CUSTOMER_ROLE_EXTERNAL_ID_DEFAULT = new Password(null); + public static final String MIDDLEWARE_ROLE_ARN_CONFIG = "middleware.aws.iam.assume.role"; + public static final String MIDDLEWARE_ROLE_ARN_DEFAULT = ""; + public static final String REGION_CONFIG = "s3.region"; public static final String REGION_DEFAULT = Regions.DEFAULT_REGION.getName(); @@ -272,6 +281,79 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig { Arrays.stream(AffixType.names()).collect(Collectors.toList())); } + public static void addAuthenticationConfigDef( + ConfigDef configDef, + final String group, + int orderInGroup) { + configDef.define( + AWS_ACCESS_KEY_ID_CONFIG, + Type.STRING, + AWS_ACCESS_KEY_ID_DEFAULT, + Importance.HIGH, + "The AWS access key ID used to authenticate personal AWS credentials such as IAM " + + "credentials. Use only if you do not wish to authenticate by using a credentials " + + "provider class via ``" + + CREDENTIALS_PROVIDER_CLASS_CONFIG + + "``", + group, + ++orderInGroup, + Width.LONG, + "AWS Access Key ID" + ); + + configDef.define( + AWS_SECRET_ACCESS_KEY_CONFIG, + Type.PASSWORD, + AWS_SECRET_ACCESS_KEY_DEFAULT, + Importance.HIGH, + "The secret access key used to authenticate personal AWS credentials such as IAM " + + "credentials. Use only if you do not wish to authenticate by using a credentials " + + "provider class via ``" + + CREDENTIALS_PROVIDER_CLASS_CONFIG + + "``", + group, + ++orderInGroup, + Width.LONG, + "AWS Secret Access Key" + ); + + configDef.define( + CUSTOMER_ROLE_ARN_CONFIG, + Type.STRING, + CUSTOMER_ROLE_ARN_DEFAULT, + Importance.HIGH, + "Role ARN", + group, + ++orderInGroup, + Width.LONG, + "AWS Role ARN" + ); + + configDef.define( + CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, + Type.PASSWORD, + CUSTOMER_ROLE_EXTERNAL_ID_DEFAULT, + Importance.HIGH, + "External ID", + group, + ++orderInGroup, + Width.LONG, + "AWS External ID" + ); + + configDef.define( + MIDDLEWARE_ROLE_ARN_CONFIG, + Type.STRING, + MIDDLEWARE_ROLE_ARN_DEFAULT, + Importance.HIGH, + "Role ARN", + group, + ++orderInGroup, + Width.LONG, + "AWS Role ARN" + ); + } + public static ConfigDef newConfigDef() { ConfigDef configDef = StorageSinkConnectorConfig.newConfigDef( FORMAT_CLASS_RECOMMENDER, @@ -375,6 +457,21 @@ public static ConfigDef newConfigDef() { "AWS Credentials Provider Class" ); + configDef.define( + AUTH_METHOD, + Type.STRING, + AWS_AUTH_DEFAULT, + Importance.HIGH, + "Authentication method used for S3 Sink connector", + group, + ++orderInGroup, + Width.LONG, + "Authentication method" + ); + + // Define Config use for Authentication + addAuthenticationConfigDef(configDef, group, orderInGroup); + configDef.define( AWS_ACCESS_KEY_ID_CONFIG, Type.STRING, @@ -878,15 +975,33 @@ public Password awsSecretKeyId() { return getPassword(AWS_SECRET_ACCESS_KEY_CONFIG); } + public String awsCustomerRoleARN() { + return getString(CUSTOMER_ROLE_ARN_CONFIG); + } + + public String awsMiddlewareRoleARN() { + return getString(MIDDLEWARE_ROLE_ARN_CONFIG); + } + + public Password awsExternalId() { + return getPassword(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG); + } + public int getPartSize() { return getInt(PART_SIZE_CONFIG); } + public String getAuthenticationMethod() { + return getString(AUTH_METHOD); + } + @SuppressWarnings("unchecked") public AWSCredentialsProvider getCredentialsProvider() { try { AWSCredentialsProvider provider = ((Class) getClass(S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG)).newInstance(); + + String authMethod = getAuthenticationMethod(); if (provider instanceof Configurable) { Map configs = originalsWithPrefix(CREDENTIALS_PROVIDER_CONFIG_PREFIX); @@ -896,22 +1011,33 @@ public AWSCredentialsProvider getCredentialsProvider() { configs.put(AWS_ACCESS_KEY_ID_CONFIG, awsAccessKeyId()); configs.put(AWS_SECRET_ACCESS_KEY_CONFIG, awsSecretKeyId().value()); - ((Configurable) provider).configure(configs); } else { - final String accessKeyId = awsAccessKeyId(); - final String secretKey = awsSecretKeyId().value(); - if (StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretKey)) { - BasicAWSCredentials basicCredentials = new BasicAWSCredentials(accessKeyId, secretKey); - provider = new AWSStaticCredentialsProvider(basicCredentials); + authMethod = getAuthenticationMethod(); + + if (authMethod.equals("IAM Assume Role")) { + Map configs = new HashMap(); + configs.put(CUSTOMER_ROLE_ARN_CONFIG, awsCustomerRoleARN()); + configs.put(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, awsExternalId().value()); + configs.put(MIDDLEWARE_ROLE_ARN_CONFIG, awsMiddlewareRoleARN()); + + provider = new AwsIamAssumeRoleChaining(); + ((AwsIamAssumeRoleChaining) provider).configure(configs); + } else { + final String accessKeyId = awsAccessKeyId(); + final String secretKey = awsSecretKeyId().value(); + if (StringUtils.isNotBlank(accessKeyId) && StringUtils.isNotBlank(secretKey)) { + BasicAWSCredentials basicCredentials = new BasicAWSCredentials(accessKeyId, secretKey); + provider = new AWSStaticCredentialsProvider(basicCredentials); + } } } return provider; } catch (IllegalAccessException | InstantiationException e) { throw new ConnectException( - "Invalid class for: " + S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, - e + "Invalid class for: " + S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + e ); } } diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java index 880a1045b..6908d1873 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/AwsAssumeRoleCredentialsProvider.java @@ -114,4 +114,8 @@ public void refresh() { } } + // Visible for test + public STSAssumeRoleSessionCredentialsProvider getProvider() { + return stsCredentialProvider; + } } diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java new file mode 100644 index 000000000..1924564b5 --- /dev/null +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/auth/iamassume/AwsIamAssumeRoleChaining.java @@ -0,0 +1,132 @@ +/* + * Copyright 2024 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.connect.s3.auth.iamassume; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; + +import static io.confluent.connect.s3.S3SinkConnectorConfig.CUSTOMER_ROLE_ARN_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.CUSTOMER_ROLE_EXTERNAL_ID_CONFIG; +import static io.confluent.connect.s3.S3SinkConnectorConfig.MIDDLEWARE_ROLE_ARN_CONFIG; +import java.util.Map; + +public class AwsIamAssumeRoleChaining implements AWSCredentialsProvider { + private static final ConfigDef STS_CONFIG_DEF = new ConfigDef() + .define( + CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, + ConfigDef.Type.STRING, + ConfigDef.Importance.HIGH, + "The role external ID used when retrieving session credentials under an assumed role." + ).define( + CUSTOMER_ROLE_ARN_CONFIG, + ConfigDef.Type.STRING, + ConfigDef.Importance.HIGH, + "Role ARN to use when starting a session." + ).define( + MIDDLEWARE_ROLE_ARN_CONFIG, + ConfigDef.Type.STRING, + ConfigDef.Importance.HIGH, + "Role ARN to use when starting a session." + ); + + private String customerRoleArn; + private String customerRoleExternalId; + private String middlewareRoleArn; + private STSAssumeRoleSessionCredentialsProvider stsCredentialProvider; + private STSAssumeRoleSessionCredentialsProvider initialProvider; + + // Method to initiate role chaining + public void configure(Map configs) { + // Assume the initial role + AbstractConfig config = new AbstractConfig(STS_CONFIG_DEF, configs); + customerRoleArn = config.getString(CUSTOMER_ROLE_ARN_CONFIG); + customerRoleExternalId = config.getString(CUSTOMER_ROLE_EXTERNAL_ID_CONFIG); + middlewareRoleArn = config.getString(MIDDLEWARE_ROLE_ARN_CONFIG); + + initialProvider = buildProvider( + middlewareRoleArn, + "middlewareSession", + "", + null + ); + + // Use the credentials from the initial role to assume the subsequent role + stsCredentialProvider = buildProvider( + customerRoleArn, + "customerSession", + customerRoleExternalId, + initialProvider + ); + } + + // Updated buildProvider to optionally accept an existing AwsCredentialsProvider + private STSAssumeRoleSessionCredentialsProvider buildProvider( + final String roleArn, + final String roleSessionName, + final String roleExternalId, + STSAssumeRoleSessionCredentialsProvider existingProvider) { + + STSAssumeRoleSessionCredentialsProvider credentialsProvider; + // If an existing credentials provider is provided, use it for creating the STS client + if (existingProvider != null) { + AWSCredentials basicCredentials = existingProvider.getCredentials(); + credentialsProvider = new STSAssumeRoleSessionCredentialsProvider + .Builder(roleArn, roleSessionName) + .withStsClient(AWSSecurityTokenServiceClientBuilder + .standard() + .withCredentials(new AWSStaticCredentialsProvider(basicCredentials)).build() + ) + .withExternalId(roleExternalId) + .build(); + } else { + credentialsProvider = new STSAssumeRoleSessionCredentialsProvider + .Builder(roleArn, roleSessionName) + .withStsClient(AWSSecurityTokenServiceClientBuilder.defaultClient()) + .build(); + } + return credentialsProvider; + } + + @Override + public AWSCredentials getCredentials() { + return stsCredentialProvider.getCredentials(); + } + + @Override + public void refresh() { + if (initialProvider != null) { + initialProvider.refresh(); + stsCredentialProvider = buildProvider( + customerRoleArn, + "customerSession", + customerRoleExternalId, + initialProvider + ); + } + } + + // Visible for test + public STSAssumeRoleSessionCredentialsProvider getProvider() { + return stsCredentialProvider; + } +} diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java index acf1b8dbe..d44a7cb3c 100644 --- a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/S3SinkConnectorConfigTest.java @@ -37,6 +37,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import io.confluent.connect.s3.S3SinkConnectorConfig.AffixType; import io.confluent.connect.s3.auth.AwsAssumeRoleCredentialsProvider; import io.confluent.connect.s3.format.avro.AvroFormat; import io.confluent.connect.s3.format.json.JsonFormat; @@ -236,6 +237,33 @@ public void testConfigurableCredentialProvider() { assertEquals(SECRET_KEY_VALUE, credentialsProvider.getCredentials().getAWSSecretKey()); } + @Test + public void testConfigurableRoleChainingCredentialProvider() { + final String CUSTOMER_ROLE_ARN = "givenARN"; + final String CUSTOMER_EXTERNAL_ID = "WhoIsJohnGalt?"; + //final String MIDDLEWARE_ROLE_ARN = "exampleARN"; + + properties.put( + S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CLASS_CONFIG, + DummyAssertiveCredentialsProvider.class.getName() + ); + String configPrefix = S3SinkConnectorConfig.CREDENTIALS_PROVIDER_CONFIG_PREFIX; + properties.put( + configPrefix.concat(DummyAssertiveCredentialsProvider.ACCESS_KEY_NAME), + CUSTOMER_ROLE_ARN + ); + properties.put( + configPrefix.concat(DummyAssertiveCredentialsProvider.SECRET_KEY_NAME), + CUSTOMER_EXTERNAL_ID + ); + connectorConfig = new S3SinkConnectorConfig(properties); + + AWSCredentialsProvider credentialsProvider = connectorConfig.getCredentialsProvider(); + + assertEquals(CUSTOMER_ROLE_ARN, credentialsProvider.getCredentials().getAWSAccessKeyId()); + assertEquals(CUSTOMER_EXTERNAL_ID, credentialsProvider.getCredentials().getAWSSecretKey()); + } + @Test public void testConfigurableAwsAssumeRoleCredentialsProvider() { properties.put( diff --git a/kafka-connect-s3/src/test/java/io/confluent/connect/s3/auth/AwsIamRoleChainingTest.java b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/auth/AwsIamRoleChainingTest.java new file mode 100644 index 000000000..a406a3fc1 --- /dev/null +++ b/kafka-connect-s3/src/test/java/io/confluent/connect/s3/auth/AwsIamRoleChainingTest.java @@ -0,0 +1,91 @@ +/* + * Copyright 2024 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.confluent.connect.s3.auth; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSSessionCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import io.confluent.connect.s3.S3SinkConnectorTestBase; +import io.confluent.connect.s3.auth.iamassume.AwsIamAssumeRoleChaining; +import io.confluent.connect.s3.S3SinkConnectorConfig; + +import org.junit.After; +import org.junit.Before; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.mock; + +import java.util.HashMap; +import java.util.Map; + +class AwsIamAssumeRoleChainingTest extends S3SinkConnectorTestBase { + + protected Map configs = new HashMap<>(); + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + final String CUSTOMER_ROLE_ARN = "arn:aws:iam::012345678901:role/my-restricted-role"; + final String CUSTOMER_EXTERNAL_ID = "exampleExternaID"; + final String MIDDLEWARE_ROLE_ARN = "arn:aws:iam::109876543210:role/my-example-role"; + + configs.put( + S3SinkConnectorConfig.AUTH_METHOD, + "IAM Assume Role" + ); + configs.put( + S3SinkConnectorConfig.CUSTOMER_ROLE_ARN_CONFIG, + CUSTOMER_ROLE_ARN + ); + configs.put( + S3SinkConnectorConfig.CUSTOMER_ROLE_EXTERNAL_ID_CONFIG, + CUSTOMER_EXTERNAL_ID + ); + configs.put( + S3SinkConnectorConfig.MIDDLEWARE_ROLE_ARN_CONFIG, + MIDDLEWARE_ROLE_ARN + ); + } + + @After + @Override + public void tearDown() throws Exception { + super.tearDown(); + configs.clear(); + } + + @Test + public void testConfigure() { + AwsIamAssumeRoleChaining roleChaining = mock(AwsIamAssumeRoleChaining.class); + roleChaining.configure(configs); + + STSAssumeRoleSessionCredentialsProvider mockFinalProvider = Mockito.mock(STSAssumeRoleSessionCredentialsProvider.class); + Mockito.doReturn(mockFinalProvider).when(roleChaining).getProvider(); + + AWSCredentials expectedCredentials = new BasicAWSCredentials("accessKey", "secretKey"); + Mockito.when(mockFinalProvider.getCredentials()).thenReturn((AWSSessionCredentials) expectedCredentials); + + AWSCredentials actualCredentials = (AWSCredentials) roleChaining.getCredentials(); + + assertEquals(expectedCredentials, actualCredentials); + } +}