Hello, We are encountering a connection issue with our Kafka sink when using AWS MSK IAM authentication. While our Kafka source connects successfully, the sink fails to establish a connection.
Here's how we're building the sink: ```java KafkaSinkBuilder<T> builder = KafkaSink.<T>builder() .setBootstrapServers(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)) .setRecordSerializer(new FlinkKafkaRecordSerializationSchema<>(outputConfig)) .setKafkaProducerConfig(producerProps) .setDeliveryGuarantee(deliveryGuarantee) .setTransactionalIdPrefix(getTransactionalIdPrefix(outputConfig, jobName)); ``` The `producerProps` include the following configuration for IAM authentication: ``` { security.protocol=SASL_SSL, sasl.mechanism=AWS_MSK_IAM, sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;, sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler } ``` The sink connection fails with the following error: ``` 2025-02-20 13:57:35,299 WARN org.apache.kafka.clients.NetworkClient ... - [AdminClient clientId=pdfOrchestrator-enumerator-admin-client] Error connecting to node kafka-central-1.amazonaws.com:9098 (id: -1 rack: null) java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed] ... Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator ... Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to create SaslClient with mechanism AWS_MSK_IAM ``` This error suggests a problem with the SASL/IAM authentication configuration for the sink. We have already verified the following: * **Endpoint:** We are using port 9098, which is correct for MSK with IAM authentication. We have also confirmed that the source connection uses the same port. * **Dependencies:** The `aws-msk-iam-auth` dependency is included in our project. We suspect the issue might be related to IAM role configuration or missing AWS region information. Could you please provide guidance on how to further troubleshoot this? Specifically, we would appreciate information on: * Best practices for configuring IAM roles for Flink jobs connecting to MSK with IAM authentication. * How to explicitly set the AWS region for the Kafka client in this context. * Any other common pitfalls related to MSK IAM authentication with Flink. Thank you for your assistance.