Hi Phil

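You need to specify a truststore (a keystore containing the CA certificate) so that the Java Kafka client used by Flink can verify the broker [1].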

[1] https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kafka/#security
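For illustration, here is a minimal sketch of such a DDL, written as a plain Python string builder whose result can be passed to `t_env.execute_sql(...)`. It assumes the Kafka client bundled with the connector is 2.7 or newer, which accepts a PEM truststore (`ssl.truststore.type` = `PEM`), so the existing CA file can be used without converting it to JKS. With a JKS or PKCS12 truststore you would set the type accordingly and add `properties.ssl.truststore.password`. The function names, schema, startup mode, format and paths are hypothetical placeholders.

```python
# Sketch only: assumes Kafka clients >= 2.7 (PEM truststore support).
# Schema, topic, startup mode and format are hypothetical placeholders.


def sql_quote(value: str) -> str:
    """Quote a value as a Flink SQL string literal (doubling single quotes)."""
    return "'" + value.replace("'", "''") + "'"


def build_kafka_ssl_ddl(topic: str, bootstrap_servers: str,
                        group_id: str, ca_pem_path: str) -> str:
    """Return a CREATE TABLE statement whose Kafka client trusts the CA file."""
    options = {
        "connector": "kafka",
        "topic": topic,
        "properties.bootstrap.servers": bootstrap_servers,
        "properties.group.id": group_id,
        "scan.startup.mode": "latest-offset",
        "format": "json",
        "properties.security.protocol": "SSL",
        # Java-client counterpart of librdkafka's ssl.ca.location:
        "properties.ssl.truststore.type": "PEM",
        "properties.ssl.truststore.location": ca_pem_path,
    }
    with_clause = ",\n".join(
        f"  {sql_quote(key)} = {sql_quote(value)}" for key, value in options.items()
    )
    return (
        "CREATE TABLE logs (\n"
        "  `user` ROW(`user_id` BIGINT),\n"
        "  `timestamp` ROW(`secs` BIGINT)\n"
        ") WITH (\n"
        f"{with_clause}\n"
        ")"
    )


if __name__ == "__main__":
    # Hypothetical broker address and CA path.
    print(build_kafka_ssl_ddl("logs", "broker:9093", "my_group_id", "/etc/kafka/ca.pem"))
```

Since the failure is raised through the OperatorCoordinator (the Kafka source enumerator runs on the JobManager), the CA file path presumably has to be readable on the JobManager as well as on every TaskManager, which on EMR likely means placing the file on each node.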


________________________________
From: gongzhongqiang <gongzhongqi...@apache.org>
Sent: 17 May 2024 10:44:18
To: Phil Stavridis
Cc: user@flink.apache.org
Subject: Re: SSL Kafka PyFlink

Hi Phil,

The Kafka SSL configuration keys may not be correct. You can refer to the Kafka
documentation [1] for the client SSL configuration options.


[1] https://kafka.apache.org/documentation/#security_configclients
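To make the mismatch concrete: `ssl.ca.location`, `ssl.certificate.location` and `ssl.key.location` in the DDL below are librdkafka (confluent_kafka) settings, while Flink's connector passes `properties.*` through to the Java Kafka client, which does not know them (as far as I know it only logs a warning about unknown configs, so they are silently ineffective). A small hypothetical checker; the mapping is hand-written from the two projects' docs and not exhaustive.

```python
# Hand-written, non-exhaustive mapping from librdkafka (confluent_kafka) SSL
# keys to the closest Java Kafka client settings used by Flink's connector.
LIBRDKAFKA_TO_JAVA_SSL = {
    "ssl.ca.location": "ssl.truststore.location (+ ssl.truststore.type=PEM)",
    "ssl.certificate.location": "ssl.keystore.location (+ ssl.keystore.type=PEM)",
    "ssl.key.location": "ssl.keystore.location (key and cert in one PEM file)",
    "enable.ssl.certificate.verification": "no simple Java-client equivalent",
}


def find_librdkafka_only_keys(table_options: dict) -> dict:
    """Map each 'properties.*' option the Java client won't recognise to a hint."""
    prefix = "properties."
    hints = {}
    for option in table_options:
        if option.startswith(prefix):
            client_key = option[len(prefix):]
            if client_key in LIBRDKAFKA_TO_JAVA_SSL:
                hints[option] = LIBRDKAFKA_TO_JAVA_SSL[client_key]
    return hints


if __name__ == "__main__":
    # The SSL options from the DDL in this thread (paths are placeholders).
    ddl_options = {
        "properties.security.protocol": "SSL",
        "properties.ssl.ca.location": "/path/ca.pem",
        "properties.ssl.certificate.location": "/path/cert.pem",
        "properties.ssl.key.location": "/path/key.pem",
        "properties.ssl.endpoint.identification.algorithm": "",
    }
    for option, hint in find_librdkafka_only_keys(ddl_options).items():
        print(f"{option} -> {hint}")
```

`security.protocol` and `ssl.endpoint.identification.algorithm` are valid Java-client keys, so they are not flagged.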


Best,
Zhongqiang Gong

On Fri, 17 May 2024 at 01:44, Phil Stavridis <phi...@gmail.com> wrote:
Hi,

I have a PyFlink job that needs to read from a Kafka topic, and communication
with the Kafka broker requires SSL.
Using plain Python, I have connected to the Kafka cluster with something like this:

from confluent_kafka import Consumer, KafkaException, KafkaError


def get_config(bootstrap_servers, ca_file, cert_file, key_file):
    # librdkafka (confluent_kafka) SSL settings, using PEM files directly
    config = {
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SSL',
        'ssl.ca.location': ca_file,              # CA certificate (PEM)
        'ssl.certificate.location': cert_file,   # client certificate (PEM)
        'ssl.key.location': key_file,            # client private key (PEM)
        'ssl.endpoint.identification.algorithm': 'none',  # no hostname check
        'enable.ssl.certificate.verification': 'false',   # broker cert NOT verified
        'group.id': 'my_group_id'
    }
    return config

With this config I have been able to read messages from the Kafka topic.
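One likely reason the Python client works is `enable.ssl.certificate.verification` = `false`: librdkafka then never validates the broker certificate, whereas the Java client used by Flink always does, which is exactly what fails with `PKIX path building failed`. A hypothetical helper translating the dict above into Flink connector options, assuming Kafka clients 2.7+ (PEM store types) and that certificate and key have already been combined into a single PEM file passed as `keystore_pem`.

```python
# Hypothetical helper: translate a confluent_kafka (librdkafka) SSL config into
# Flink Kafka connector 'properties.*' options for the Java client.
# Assumes Kafka clients >= 2.7 (PEM truststore/keystore types).

PASS_THROUGH_KEYS = ("bootstrap.servers", "security.protocol", "group.id")


def confluent_to_flink_options(config: dict, keystore_pem: str) -> dict:
    options = {}
    for key in PASS_THROUGH_KEYS:
        if key in config:
            options[f"properties.{key}"] = config[key]
    # ssl.ca.location -> PEM truststore containing the CA certificate
    if "ssl.ca.location" in config:
        options["properties.ssl.truststore.type"] = "PEM"
        options["properties.ssl.truststore.location"] = config["ssl.ca.location"]
    # Separate cert + key files -> one PEM keystore file (combined beforehand)
    if "ssl.certificate.location" in config and "ssl.key.location" in config:
        options["properties.ssl.keystore.type"] = "PEM"
        options["properties.ssl.keystore.location"] = keystore_pem
    # librdkafka disables hostname verification with 'none'; Java uses ''.
    algo = config.get("ssl.endpoint.identification.algorithm")
    if algo is not None:
        options["properties.ssl.endpoint.identification.algorithm"] = (
            "" if algo == "none" else algo
        )
    # enable.ssl.certificate.verification is deliberately dropped: there is no
    # simple Java-client switch, so the broker cert must validate against the
    # truststore.
    return options


if __name__ == "__main__":
    cfg = {
        "bootstrap.servers": "broker:9093",  # hypothetical values
        "security.protocol": "SSL",
        "ssl.ca.location": "/path/ca.pem",
        "ssl.certificate.location": "/path/cert.pem",
        "ssl.key.location": "/path/key.pem",
        "ssl.endpoint.identification.algorithm": "none",
        "enable.ssl.certificate.verification": "false",
        "group.id": "my_group_id",
    }
    for k, v in confluent_to_flink_options(cfg, "/path/keystore.pem").items():
        print(f"'{k}' = '{v}'")
```

Dropping the verification flag is a design choice: if the handshake still fails, the truststore is missing the right CA rather than something to switch off.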

I am trying to set up something similar with Flink SQL:

t_env.execute_sql(f"""
    CREATE TABLE logs (
        `user` ROW(`user_id` BIGINT),
        `timestamp` ROW(`secs` BIGINT)
    ) WITH (
        'connector' = '{CONNECTOR_TYPE}',
        'topic' = '{KAFKA_TOPIC}',
        'properties.bootstrap.servers' = '{BOOTSTRAP_SERVERS}',
        'properties.group.id' = '{CONSUMER_GROUP}',
        'scan.startup.mode' = '{STARTUP_MODE_LATEST}',
        'format' = '{MESSAGE_FORMAT}',
        'properties.security.protocol' = 'SSL',
        'properties.ssl.ca.location' = '{ca_file}',
        'properties.ssl.certificate.location' = '{cert_file}',
        'properties.ssl.key.location' = '{key_file}',
        'properties.ssl.endpoint.identification.algorithm' = ''
    )
""")
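The DDL above points at separate certificate and key files, but the Java client expects a keystore. With `ssl.keystore.type` = `PEM` (Kafka clients 2.7+) the keystore can be a single PEM file holding the private key and the certificate chain. A hypothetical sketch for producing that file; as far as I know Kafka's PEM loader only accepts PKCS#8 private keys, so a traditional "BEGIN RSA PRIVATE KEY" key has to be converted first (e.g. with `openssl pkcs8 -topk8`), and an encrypted key additionally needs `properties.ssl.key.password`.

```python
from pathlib import Path

# Hypothetical helper. Assumption: Kafka clients >= 2.7 accept
# ssl.keystore.type=PEM with ssl.keystore.location pointing at ONE file
# that contains the private key and the certificate chain.
NON_PKCS8_MARKERS = ("BEGIN RSA PRIVATE KEY", "BEGIN EC PRIVATE KEY")


def write_pem_keystore(cert_file: str, key_file: str, out_file: str) -> str:
    """Write the private key followed by the certificate(s) to out_file."""
    key_pem = Path(key_file).read_text()
    cert_pem = Path(cert_file).read_text()
    # Kafka's PEM loader expects PKCS#8 keys ("BEGIN PRIVATE KEY" or
    # "BEGIN ENCRYPTED PRIVATE KEY"); reject traditional formats early.
    if any(marker in key_pem for marker in NON_PKCS8_MARKERS):
        raise ValueError("convert the key to PKCS#8 first (openssl pkcs8 -topk8)")
    combined = key_pem.rstrip("\n") + "\n" + cert_pem.rstrip("\n") + "\n"
    Path(out_file).write_text(combined)
    return out_file
```

The returned path would then go into `'properties.ssl.keystore.location'`, together with `'properties.ssl.keystore.type' = 'PEM'`.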


But when this runs, I get the following error:

Caused by: org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: kafka_action_logs[1] -> Calc[2]' (operator cbc357ccb763df2852fee8c4fc7d55f2).
...
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
    at ...
Caused by: java.lang.RuntimeException: Failed to get metadata for topics [logs].
    at ...
    ... 3 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44)
    ... 10 more
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    ...
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:435)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:523)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:373)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:293)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:481)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1413)
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1344)
    at java.lang.Thread.run(Thread.java:750)
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:456)
    at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:323)
    at sun.security.validator.Validator.validate(Validator.java:271)
    at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:315)
    at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:278)
    at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:632)
    ... 19 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:148)
    at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:129)
    at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:451)


Any idea how this is usually configured in PyFlink? I am running this on EMR,
if that matters.
Thanks

Kind regards
Phil