Hi, I have a PyFlink job that needs to read from a Kafka topic, and communication with the Kafka broker requires SSL. I have connected to the Kafka cluster from plain Python (confluent_kafka) with something like this:
from confluent_kafka import Consumer, KafkaException, KafkaError


def get_config(bootstrap_servers, ca_file, cert_file, key_file):
    config = {
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SSL',
        'ssl.ca.location': ca_file,
        'ssl.certificate.location': cert_file,
        'ssl.key.location': key_file,
        'ssl.endpoint.identification.algorithm': 'none',
        'enable.ssl.certificate.verification': 'false',
        'group.id': 'my_group_id'
    }
    return config

With this config I can read messages from the Kafka topic. I am trying to set up the equivalent with Flink SQL:

t_env.execute_sql(f"""
    CREATE TABLE logs (
        `user` ROW(`user_id` BIGINT),
        `timestamp` ROW(`secs` BIGINT)
    ) WITH (
        'connector' = '{CONNECTOR_TYPE}',
        'topic' = '{KAFKA_TOPIC}',
        'properties.bootstrap.servers' = '{BOOTSTRAP_SERVERS}',
        'properties.group.id' = '{CONSUMER_GROUP}',
        'scan.startup.mode' = '{STARTUP_MODE_LATEST}',
        'format' = '{MESSAGE_FORMAT}',
        'properties.security.protocol' = 'SSL',
        'properties.ssl.ca.location' = '{ca_file}',
        'properties.ssl.certificate.location' = '{cert_file}',
        'properties.ssl.key.location' = '{key_file}',
        'properties.ssl.endpoint.identification.algorithm' = ''
    )
""")

But when this runs I get this error:

Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: kafka_action_logs[1] -> Calc[2]' (operator cbc357ccb763df2852fee8c4fc7d55f2).
    ...
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
    at ...
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [logs].
    at ...
    ... 3 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
    ... 10 more
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    ...
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:373)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:293)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:481)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1413)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1344)
    at java.lang.Thread.run(Thread.java:750)
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:456)
    at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:323)
    at sun.security.validator.Validator.validate(Validator.java:271)
    at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:315)
    at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:278)
    at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:632)
    ... 19 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:148)
    at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:129)
    at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:451)

Any idea how this is usually configured in PyFlink? I am running this on EMR, if that matters.

Thanks,

Kind regards,
Phil
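A likely cause, offered as a hedged sketch rather than a confirmed fix: Flink's Kafka connector uses the Java Kafka client (shaded under `org.apache.flink.kafka.shaded`, as the trace shows), not librdkafka. The librdkafka-only keys `ssl.ca.location`, `ssl.certificate.location` and `ssl.key.location` are therefore not recognized. The JVM then checks the broker certificate against its default truststore, which would produce exactly this PKIX error. The confluent_kafka client probably only succeeded because `enable.ssl.certificate.verification` was `false`. The sketch below builds the same DDL with the Java client's PEM options (`ssl.truststore.*`, `ssl.keystore.*`). It rests on several assumptions: the connector on EMR bundles kafka-clients 2.7 or newer (PEM support), `key_file` holds an unencrypted PKCS#8 key, and `'json'` / `'latest-offset'` stand in for your real format and startup mode. The helpers `build_kafka_ssl_ddl`, `load_pem_files` and `sql_literal` are hypothetical names.

```python
from pathlib import Path


def sql_literal(value: str) -> str:
    """Quote a value as a Flink SQL string literal (embedded ' is doubled)."""
    return "'" + value.replace("'", "''") + "'"


def load_pem_files(ca_file, cert_file, key_file):
    """Read the same three PEM files the confluent_kafka config points at."""
    return tuple(Path(p).read_text() for p in (ca_file, cert_file, key_file))


def build_kafka_ssl_ddl(table, topic, bootstrap_servers, group_id,
                        ca_pem, cert_pem, key_pem,
                        value_format="json", startup_mode="latest-offset"):
    """Return a CREATE TABLE statement using Java-client SSL property names.

    Assumption: kafka-clients >= 2.7 (PEM truststore/keystore support) and an
    unencrypted PKCS#8 private key, so no ssl.key.password is set.
    """
    options = {
        "connector": "kafka",
        "topic": topic,
        "properties.bootstrap.servers": bootstrap_servers,
        "properties.group.id": group_id,
        "scan.startup.mode": startup_mode,
        "format": value_format,
        "properties.security.protocol": "SSL",
        # Java-client counterpart of librdkafka's ssl.ca.location.
        "properties.ssl.truststore.type": "PEM",
        "properties.ssl.truststore.certificates": ca_pem,
        # Java-client counterpart of ssl.certificate.location / ssl.key.location.
        "properties.ssl.keystore.type": "PEM",
        "properties.ssl.keystore.certificate.chain": cert_pem,
        "properties.ssl.keystore.key": key_pem,
        # Empty string disables broker hostname verification, as in the original DDL.
        "properties.ssl.endpoint.identification.algorithm": "",
    }
    with_clause = ",\n".join(
        f"  {sql_literal(k)} = {sql_literal(v)}" for k, v in options.items()
    )
    return (
        f"CREATE TABLE {table} (\n"
        "  `user` ROW(`user_id` BIGINT),\n"
        "  `timestamp` ROW(`secs` BIGINT)\n"
        f") WITH (\n{with_clause}\n)"
    )
```

Usage would be something like `t_env.execute_sql(build_kafka_ssl_ddl('logs', KAFKA_TOPIC, BOOTSTRAP_SERVERS, CONSUMER_GROUP, *load_pem_files(ca_file, cert_file, key_file)))`. Passing the PEM contents inline rather than file paths avoids placing the files on the JobManager, where the failing split enumerator runs, and on every TaskManager. The trade-off is that the private key becomes part of the job's configuration.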