I tried putting the JKS on S3... unfortunately, no luck. I have the property set up as:

'properties.ssl.keystore.location'='s3://application-bucket/kafka.keystore.jks'

I got the following error message:

at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292)
at org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:144)
... 22 more
*Caused by: java.nio.file.NoSuchFileException: s3:/application-bucket/kafka.keystore.jks*
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285)
... 23 more

On Tue, Nov 17, 2020 at 10:01 PM Fanbin Bu <fanbin...@coinbase.com> wrote:

> Let me try to put it on S3 and change the code like:
> 'properties.ssl.keystore.location'='s3://my-bucket/keystore.jks'
>
> Thanks,
> Fanbin
>
> On Tue, Nov 17, 2020 at 6:43 PM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
>
>> Try with an HDFS folder with a hard-coded value inside the code and see what happens.
>>
>> On Tue, 17 Nov 2020 at 18:42, sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
>>
>>> Can you use HDFS as the keystore location? Are you using Oozie to run your job?
>>>
>>> On Tue, 17 Nov 2020 at 17:54, Fanbin Bu <fanbin...@coinbase.com> wrote:
>>>
>>>> Hi Sri, my code is not on GitHub, but here is the skeleton.
>>>>
>>>> val stmt = s"""
>>>> |create table ${table.name} (${schema}, ${watermark})
>>>> |with(
>>>> |'connector' = 'kafka',
>>>> |'topic' = '${table.topic}',
>>>> |'scan.startup.mode'= '${table.scanStartUpMode}',
>>>> |'properties.zookeeper.connect'='${Globals.ZOOKEEPER_CONNECT}',
>>>> |'properties.bootstrap.servers'='${Globals.BOOTSTRAP_SERVERS}',
>>>> |'properties.ssl.keystore.location'='${Globals.SSL_KEYSTORE_LOCATION}',
>>>> |'properties.ssl.keystore.password'='${Globals.KEYSTORE_PASS}',
>>>> |'properties.ssl.key.password'='${Globals.KEYSTORE_PASS}',
>>>> |'properties.security.protocol'='SSL',
>>>> |'properties.ssl.keystore.type'='JKS',
>>>> |'properties.ssl.truststore.type'='JKS',
>>>> |'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1',
>>>> |'properties.group.id' = '${table.name}_group_id',
>>>> |'format' = 'json',
>>>> |'json.ignore-parse-errors' = 'true'
>>>> |)
>>>> """.stripMargin
>>>>
>>>> tEnv.executeSql(stmt)
>>>>
>>>> On Tue, Nov 17, 2020 at 5:40 PM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
>>>>
>>>>> Hi Fanbin,
>>>>>
>>>>> Can you share your Flink code which reads from Kafka using SSL ?
>>>>>
>>>>> Is your code on GitHub ?
>>>>>
>>>>> Thanks
>>>>> Sri
>>>>>
>>>>> On Tue, 17 Nov 2020 at 17:14, Fanbin Bu <fanbin...@coinbase.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I tried to put keystore.jks location under /usr/lib/flink/... like:
>>>>>>
>>>>>> export SSL_KEYSTORE_LOCATION=/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>>>>>>
>>>>>> Notice that this is on EMR master(master) node. Both JM and TMs are on EMR core(slave) nodes.
>>>>>>
>>>>>> However, I got exception: *Caused by: java.nio.file.NoSuchFileException: /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks* even though the file is there
>>>>>>
>>>>>> [hadoop@ip-10-200-41-39 flink]$ ll /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>>>>>> -rw-r--r-- 1 root root 5565 Nov 17 22:24 /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>>>>>>
>>>>>> Where should the keystore.jks be located?
>>>>>>
>>>>>> Thanks,
>>>>>> Fanbin
>>>>>>
>>>>>> Here is the full log.
>>>>>> 2020-11-17 09:35:49
>>>>>> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>>>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
>>>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
>>>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646)
>>>>>> at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>>>>>> at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>>>>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
>>>>>> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks of type JKS
>>>>>> at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:71)
>>>>>> at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
>>>>>> at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
>>>>>> at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
>>>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741)
>>>>>> ... 15 more
>>>>>> Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks of type JKS
>>>>>> at org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:163)
>>>>>> at org.apache.kafka.common.security.ssl.SslEngineBuilder.<init>(SslEngineBuilder.java:104)
>>>>>> at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
>>>>>> at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:69)
>>>>>> ... 19 more
>>>>>> Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks of type JKS
>>>>>> at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292)
>>>>>> at org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:144)
>>>>>> ... 22 more
>>>>>> *Caused by: java.nio.file.NoSuchFileException: /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks*
>>>>>> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>>>>>> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>>>>>> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>>>>>> at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
>>>>>> at java.nio.file.Files.newByteChannel(Files.java:361)
>>>>>> at java.nio.file.Files.newByteChannel(Files.java:407)
>>>>>> at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
>>>>>> at java.nio.file.Files.newInputStream(Files.java:152)
>>>>>> at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285)
>>>>>>
>>>>>>
>>>>> --
>>>>> Thanks & Regards
>>>>> Sri Tummala
>>>>>
>>>>> --
>>> Thanks & Regards
>>> Sri Tummala
>>>
>>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>
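
A note on the `s3:/application-bucket/kafka.keystore.jks` path in the S3 trace at the top of this thread: the exception is raised from `java.nio.file.Files.newInputStream`, so the Kafka client appears to treat `ssl.keystore.location` as a plain local file path. On a Unix-like JVM the `s3://` prefix is then just part of a relative file name, with the double slash collapsed. Below is a minimal sketch of that behavior in Scala using only the JDK. `KeystoreLocation`, `asLocalPath`, `readableHere` and `materialize` are hypothetical names for illustration, not Flink or Kafka APIs, and how the keystore bytes are fetched from S3 or HDFS is deliberately left out (the helper accepts any `InputStream`).

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.file.{Files, Path, Paths, StandardCopyOption}

// Hypothetical helper object for illustration; not part of Flink or Kafka.
object KeystoreLocation {

  // How the JVM's default (local) file system interprets a location string.
  def asLocalPath(location: String): Path = Paths.get(location)

  // True only if the file can be read on the machine running this code.
  def readableHere(location: String): Boolean =
    Files.isReadable(Paths.get(location))

  // Copy keystore bytes from any stream into a local temp file and return
  // its path. Where the stream comes from (S3, HDFS, ...) is an assumption
  // left to the caller.
  def materialize(in: InputStream): Path = {
    val target = Files.createTempFile("kafka.keystore", ".jks")
    try {
      Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING)
    } finally {
      in.close()
    }
    target
  }

  def main(args: Array[String]): Unit = {
    val s3Location = "s3://application-bucket/kafka.keystore.jks"

    // On a Unix-like JVM this prints a relative local path with the double
    // slash collapsed, matching the NoSuchFileException message in the trace.
    println(asLocalPath(s3Location))
    println(readableHere(s3Location))

    // Stand-in bytes; a real job would stream the actual keystore here.
    val local = materialize(new ByteArrayInputStream(Array[Byte](1, 2, 3)))
    println(s"local copy at $local, readable: ${readableHere(local.toString)}")
  }
}
```

The same check would apply to the first error in the thread: the file exists on the EMR master node, but the stack trace shows the consumer being opened inside a `StreamTask`, which runs on the TaskManagers on the core nodes. So `readableHere` would have to be true on every TaskManager host, for example by placing the keystore at the same local path on each node or by writing a local copy before the consumer is created. Treat that as a sketch under those assumptions; I have not tested it on EMR.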