Hi,

This is a repost with modified subject per Sri Tummala's suggestion.

I'm running Flink 1.11 on EMR and would like to read from Kafka over SSL. I
put the keystore under /usr/lib/flink/... and set its location like this:

export SSL_KEYSTORE_LOCATION=/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks

Note that this is on the EMR master node. Both the JM and the TMs run on EMR
core nodes.

Here is the code snippet:

val stmt = s"""
  |create table ${table.name} (${schema}, ${watermark})
  |with(
  |'connector' = 'kafka',
  |'topic' = '${table.topic}',
  |'scan.startup.mode'= '${table.scanStartUpMode}',
  |'properties.zookeeper.connect'='xxx',
  |'properties.bootstrap.servers'='xxx',
  |'properties.ssl.keystore.location'='/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks',
  |'properties.ssl.keystore.password'='xxx',
  |'properties.ssl.key.password'='xxx',
  |'properties.security.protocol'='SSL',
  |'properties.ssl.keystore.type'='JKS',
  |'properties.ssl.truststore.type'='JKS',
  |'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1',
  |'properties.group.id' = '${table.name}_group_id',
  |'format' = 'json',
  |'json.ignore-parse-errors' = 'true'
  |)
""".stripMargin

tEnv.executeSql(stmt)


However, I got an exception:
Caused by: java.nio.file.NoSuchFileException: /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
even though the file is there:
[hadoop@ip-10-200-41-39 flink]$ ll /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
-rw-r--r-- 1 root root 5565 Nov 17 22:24 /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks

Things I tried:
1. The keystore.jks file itself works, since I can use console-consumer to
read Kafka topics on the EMR master.
2. Setting the location to s3://my-bucket/keystore.jks, which did not work
either.
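
For reference, the bottom frames of the stack trace below show the keystore
being opened with java.nio.file.Files.newInputStream, which reads from the
local file system of the JVM that runs the task. Here is a minimal sketch of
that same check. The object name KeystoreCheck and the helper describe are
made up for illustration and are not part of Flink or Kafka, and the result
only says something about the node the code actually runs on.

```scala
import java.nio.file.{Files, NoSuchFileException, Paths}
import scala.util.{Failure, Success, Try}

object KeystoreCheck {
  // Hypothetical helper: opens the path with Files.newInputStream, the call
  // that appears at the bottom of the stack trace, and reports what happened
  // on the local file system of the JVM running this code.
  def describe(location: String): String =
    Try {
      val in = Files.newInputStream(Paths.get(location))
      in.close()
    } match {
      case Success(_)                      => s"readable: $location"
      case Failure(_: NoSuchFileException) => s"missing on this node: $location"
      case Failure(e)                      => s"failed (${e.getClass.getSimpleName}): $location"
    }

  def main(args: Array[String]): Unit = {
    // The path is taken from the table definition above; pass another one as
    // the first argument to check a different location.
    val default =
      "/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks"
    println(describe(args.headOption.getOrElse(default)))
  }
}
```

Run on the master node, this should presumably report the file as readable,
which matches the ll output above. Whether the same holds on the core nodes
where the JM and TMs run is exactly what I cannot confirm. My guess is that an
s3:// value is also handed to Paths.get as an ordinary local path, but that is
an assumption on my part.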

What value should I set the keystore location to?
Thanks!
Fanbin

I have also attached the full exception log:

2020-11-17 09:35:49
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks of type JKS
at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:71)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741)
... 15 more
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks of type JKS
at org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:163)
at org.apache.kafka.common.security.ssl.SslEngineBuilder.<init>(SslEngineBuilder.java:104)
at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:69)
... 19 more
Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks of type JKS
at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292)
at org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:144)
... 22 more
Caused by: java.nio.file.NoSuchFileException: /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285)
