Hi Phil,

You need to specify a keystore with the CA location [1].
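To illustrate the point above, here is a minimal sketch that assumes a JKS truststore has already been built from the CA certificate. The Flink Kafka connector strips the `properties.` prefix and hands the remaining keys to the Java Kafka client, which expects `ssl.truststore.*` and `ssl.keystore.*` keys rather than the librdkafka-style `ssl.ca.location`. The helper names, paths, passwords, and the 'json' / 'latest-offset' values below are hypothetical placeholders, not taken from the original job.

```python
def kafka_ssl_options(truststore_path, truststore_password,
                      keystore_path=None, keystore_password=None):
    """Build Java-client style SSL options for the Flink Kafka connector.

    The keystore pair is only needed when the broker requires client
    authentication (mutual TLS); that requirement is an assumption here.
    """
    options = {
        "properties.security.protocol": "SSL",
        "properties.ssl.truststore.location": truststore_path,
        "properties.ssl.truststore.password": truststore_password,
    }
    if keystore_path is not None:
        options["properties.ssl.keystore.location"] = keystore_path
        options["properties.ssl.keystore.password"] = keystore_password
    return options


def render_with_clause(options):
    """Render a dict as the body of a Flink SQL WITH (...) clause."""
    lines = []
    for key, value in options.items():
        # Single quotes inside SQL string literals are escaped by doubling.
        escaped = str(value).replace("'", "''")
        lines.append(f"    '{key}' = '{escaped}'")
    return ",\n".join(lines)


def build_logs_ddl(topic, bootstrap_servers, group_id, ssl_options):
    """Return a CREATE TABLE statement for the `logs` table from the thread."""
    options = {
        "connector": "kafka",
        "topic": topic,
        "properties.bootstrap.servers": bootstrap_servers,
        "properties.group.id": group_id,
        "scan.startup.mode": "latest-offset",  # assumed value
        "format": "json",                      # assumed value
    }
    options.update(ssl_options)
    return (
        "CREATE TABLE logs (\n"
        "    `user` ROW(`user_id` BIGINT),\n"
        "    `timestamp` ROW(`secs` BIGINT)\n"
        ") WITH (\n"
        f"{render_with_clause(options)}\n"
        ")"
    )


if __name__ == "__main__":
    # Hypothetical paths and password, for illustration only.
    ssl_options = kafka_ssl_options("/path/to/kafka.client.truststore.jks",
                                    "changeit")
    print(build_logs_ddl("logs", "broker:9093", "my_group_id", ssl_options))
```

The resulting string can then be passed to `t_env.execute_sql(...)`. The truststore file presumably has to exist at the same path on every node that runs the job, since the Kafka client reads it from the local file system.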
[1] https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kafka/#security

________________________________
From: gongzhongqiang <gongzhongqi...@apache.org>
Sent: 17 May 2024, 10:44:18
To: Phil Stavridis
Cc: user@flink.apache.org
Subject: Re: SSL Kafka PyFlink

Hi Phil,

The Kafka SSL configuration keys may not be correct. You can refer to the Kafka documentation [1] for the client SSL configuration options.

[1] https://kafka.apache.org/documentation/#security_configclients

Best,
Zhongqiang Gong

On Fri, 17 May 2024 at 01:44, Phil Stavridis <phi...@gmail.com> wrote:

Hi,

I have a PyFlink job that needs to read from a Kafka topic, and communication with the Kafka broker requires SSL.
I have connected to the Kafka cluster using plain Python with something like this:

from confluent_kafka import Consumer, KafkaException, KafkaError

def get_config(bootstrap_servers, ca_file, cert_file, key_file):
    config = {
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SSL',
        'ssl.ca.location': ca_file,
        'ssl.certificate.location': cert_file,
        'ssl.key.location': key_file,
        'ssl.endpoint.identification.algorithm': 'none',
        'enable.ssl.certificate.verification': 'false',
        'group.id': 'my_group_id'
    }
    return config

With this configuration I have been able to read messages from the Kafka topic.

I am trying to set up something similar with Flink SQL:

t_env.execute_sql(f"""
    CREATE TABLE logs (
        `user` ROW(`user_id` BIGINT),
        `timestamp` ROW(`secs` BIGINT)
    ) WITH (
        'connector' = '{CONNECTOR_TYPE}',
        'topic' = '{KAFKA_TOPIC}',
        'properties.bootstrap.servers' = '{BOOTSTRAP_SERVERS}',
        'properties.group.id' = '{CONSUMER_GROUP}',
        'scan.startup.mode' = '{STARTUP_MODE_LATEST}',
        'format' = '{MESSAGE_FORMAT}',
        'properties.security.protocol' = 'SSL',
        'properties.ssl.ca.location' = '{ca_file}',
        'properties.ssl.certificate.location' = '{cert_file}',
        'properties.ssl.key.location' = '{key_file}',
        'properties.ssl.endpoint.identification.algorithm' = ''
    )
""")

But when this runs I get the following error:

Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: kafka_action_logs[1] -> Calc[2]' (operator cbc357ccb763df2852fee8c4fc7d55f2).
    ...
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
    at ...
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [logs].
    at ...
    ... 3 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
    ... 10 more
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    ...
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:373)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:293)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:481)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1413)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1344)
    at java.lang.Thread.run(Thread.java:750)
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:456)
    at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:323)
    at sun.security.validator.Validator.validate(Validator.java:271)
    at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:315)
    at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:278)
    at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:632)
    ... 19 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:148)
    at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:129)
    at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:451)

Any idea how this is usually configured in PyFlink? I am running this on EMR, if it matters.

Thanks

Kind regards
Phil