Hello,

We are encountering a connection issue with our Kafka sink when using AWS
MSK IAM authentication.  While our Kafka source connects successfully, the
sink fails to establish a connection.
Here's how we're building the sink:

```java
KafkaSinkBuilder<T> builder = KafkaSink.<T>builder()

.setBootstrapServers(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG))
        .setRecordSerializer(new
FlinkKafkaRecordSerializationSchema<>(outputConfig))
        .setKafkaProducerConfig(producerProps)
        .setDeliveryGuarantee(deliveryGuarantee)
        .setTransactionalIdPrefix(getTransactionalIdPrefix(outputConfig,
jobName));
```

The `producerProps` include the following configuration for IAM
authentication:

```
{
  security.protocol=SASL_SSL,
  sasl.mechanism=AWS_MSK_IAM,
  sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;,

sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
}
```

The sink connection fails with the following error:

```
2025-02-20 13:57:35,299 WARN  org.apache.kafka.clients.NetworkClient ... -
[AdminClient clientId=pdfOrchestrator-enumerator-admin-client] Error
connecting to node kafka-central-1.amazonaws.com:9098 (id: -1 rack: null)
java.io.IOException: Channel could not be created for socket
java.nio.channels.SocketChannel[closed]
...
Caused by: org.apache.kafka.common.KafkaException:
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to
configure SaslClientAuthenticator
...
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException:
Failed to create SaslClient with mechanism AWS_MSK_IAM
```

This error suggests a problem with the SASL/IAM authentication
configuration for the sink.  We have already verified the following:

* **Endpoint:** We are using port 9098, which is correct for MSK with IAM
authentication.  We have also confirmed that the source connection uses the
same port.
* **Dependencies:** The `aws-msk-iam-auth` dependency is included in our
project.

We suspect the issue might be related to IAM role configuration or missing
AWS region information. Could you please provide guidance on how to further
troubleshoot this?  Specifically, we would appreciate information on:

* Best practices for configuring IAM roles for Flink jobs connecting to MSK
with IAM authentication.
* How to explicitly set the AWS region for the Kafka client in this context.
* Any other common pitfalls related to MSK IAM authentication with Flink.

Thank you for your assistance.

Reply via email to