i have to put the keystore file to the nodes.

On Wed, Nov 18, 2020 at 4:29 PM Fanbin Bu <fanbin...@coinbase.com> wrote:

> Hi,
>
> This is a repost with modified subject per Sri Tummala's suggestion.
>
> I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I
> tried to put keystore.jks location under /usr/lib/flink/... like:
>
> export
> SSL_KEYSTORE_LOCATION=/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>
> Notice that this is on EMR master(master) node. Both JM and TMs are on EMR
> core(slave) nodes.
>
> Here is the code snippet:
>
> val stmt = s"""
>   |create table ${table.name} (${schema}, ${watermark})
>   |with(
>   |'connector' = 'kafka',
>   |'topic' = '${table.topic}',
>   |'scan.startup.mode'= '${table.scanStartUpMode}',
>   |'properties.zookeeper.connect'='xxx',
>   |'properties.bootstrap.servers'='xxx',
>  *
> |'properties.ssl.keystore.location'='/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks',*
>   |'properties.ssl.keystore.password'='xxx',
>   |'properties.ssl.key.password'='xxx',
>   |'properties.security.protocol'='SSL',
>   |'properties.ssl.keystore.type'='JKS',
>   |'properties.ssl.truststore.type'='JKS',
>   |'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1',
>   |'properties.group.id' = '${table.name}_group_id',
>   |'format' = 'json',
>   |'json.ignore-parse-errors' = 'true'
>   |)
> """.stripMargin
>
> tEnv.executeSql(stmt)
>
>
> However, I got exception: *Caused by: java.nio.file.NoSuchFileException:
> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks*
> even though the file is there
> [hadoop@ip-10-200-41-39 flink]$ ll
> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
> -rw-r--r-- 1 root root 5565 Nov 17 22:24
> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>
> Things i tried:
> 1. the keystore.jks file itself works since I can use console-consumer to
> read kafka topics on EMR master.
> 2. set the location to be s3://my-bucket/keystore.jks, not working
>
> What value should I set the keystore location to?
> Thanks!
> Fanbin
>
> Also attached the full exception log:
>
> 2020-11-17 09:35:49
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.KafkaException: Failed to load SSL keystore
> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
> of type JKS
> at
> org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:71)
> at
> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
> at
> org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
> at
> org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741)
> ... 15 more
> Caused by: org.apache.kafka.common.KafkaException:
> org.apache.kafka.common.KafkaException: Failed to load SSL keystore
> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
> of type JKS
> at
> org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:163)
> at
> org.apache.kafka.common.security.ssl.SslEngineBuilder.<init>(SslEngineBuilder.java:104)
> at
> org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
> at
> org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:69)
> ... 19 more
> Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL
> keystore
> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
> of type JKS
> at
> org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292)
> at
> org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:144)
> ... 22 more
> *Caused by: java.nio.file.NoSuchFileException:
> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks*
> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
> at
> sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
> at java.nio.file.Files.newByteChannel(Files.java:361)
> at java.nio.file.Files.newByteChannel(Files.java:407)
> at
> java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
> at java.nio.file.Files.newInputStream(Files.java:152)
> at
> org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285)
>

Reply via email to