i have to put the keystore file to the nodes. On Wed, Nov 18, 2020 at 4:29 PM Fanbin Bu <fanbin...@coinbase.com> wrote:
> Hi, > > This is a repost with modified subject per Sri Tummala's suggestion. > > I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I > tried to put keystore.jks location under /usr/lib/flink/... like: > > export > SSL_KEYSTORE_LOCATION=/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks > > Notice that this is on EMR master(master) node. Both JM and TMs are on EMR > core(slave) nodes. > > Here is the code snippet: > > val stmt = s""" > |create table ${table.name} (${schema}, ${watermark}) > |with( > |'connector' = 'kafka', > |'topic' = '${table.topic}', > |'scan.startup.mode'= '${table.scanStartUpMode}', > |'properties.zookeeper.connect'='xxx', > |'properties.bootstrap.servers'='xxx', > * > |'properties.ssl.keystore.location'='/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks',* > |'properties.ssl.keystore.password'='xxx', > |'properties.ssl.key.password'='xxx', > |'properties.security.protocol'='SSL', > |'properties.ssl.keystore.type'='JKS', > |'properties.ssl.truststore.type'='JKS', > |'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1', > |'properties.group.id' = '${table.name}_group_id', > |'format' = 'json', > |'json.ignore-parse-errors' = 'true' > |) > """.stripMargin > > tEnv.executeSql(stmt) > > > However, I got exception: *Caused by: java.nio.file.NoSuchFileException: > /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks* > even though the file is there > [hadoop@ip-10-200-41-39 flink]$ ll > /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks > -rw-r--r-- 1 root root 5565 Nov 17 22:24 > /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks > > Things i tried: > 1. the keystore.jks file itself works since I can use console-consumer to > read kafka topics on EMR master. > 2. set the location to be s3://my-bucket/keystore.jks, not working > > What value should I set the keystore location to? > Thanks! > Fanbin > > Also attached the full exception log: > > 2020-11-17 09:35:49 > org.apache.kafka.common.KafkaException: Failed to construct kafka consumer > at > org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820) > at > org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) > at > org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) > at > org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.kafka.common.KafkaException: > org.apache.kafka.common.KafkaException: > org.apache.kafka.common.KafkaException: Failed to load SSL keystore > /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks > of type JKS > at > org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:71) > at > org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146) > at > org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67) > at > org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99) > at > org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741) > ... 15 more > Caused by: org.apache.kafka.common.KafkaException: > org.apache.kafka.common.KafkaException: Failed to load SSL keystore > /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks > of type JKS > at > org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:163) > at > org.apache.kafka.common.security.ssl.SslEngineBuilder.<init>(SslEngineBuilder.java:104) > at > org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95) > at > org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:69) > ... 19 more > Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL > keystore > /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks > of type JKS > at > org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292) > at > org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:144) > ... 22 more > *Caused by: java.nio.file.NoSuchFileException: > /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks* > at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) > at > sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214) > at java.nio.file.Files.newByteChannel(Files.java:361) > at java.nio.file.Files.newByteChannel(Files.java:407) > at > java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384) > at java.nio.file.Files.newInputStream(Files.java:152) > at > org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285) >