trying to put the jks on s3... unfortunately, no luck.
i have properties set up:
'properties.ssl.keystore.location'='s3://application-bucket/kafka.keystore.jks'


got the following error message:
    at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore
.load(SslEngineBuilder.java:292)
    at org.apache.kafka.common.security.ssl.SslEngineBuilder
.createSSLContext(SslEngineBuilder.java:144)
    ... 22 more
*Caused by: java.nio.file.NoSuchFileException:
s3:/application-bucket/kafka.keystore.jks*
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86
)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixFileSystemProvider.newByteChannel(
UnixFileSystemProvider.java:214)
    at java.nio.file.Files.newByteChannel(Files.java:361)
    at java.nio.file.Files.newByteChannel(Files.java:407)
    at java.nio.file.spi.FileSystemProvider.newInputStream(
FileSystemProvider.java:384)
    at java.nio.file.Files.newInputStream(Files.java:152)
    at org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore
.load(SslEngineBuilder.java:285)
    ... 23 more

On Tue, Nov 17, 2020 at 10:01 PM Fanbin Bu <fanbin...@coinbase.com> wrote:

> let me try to put it on s3 and change code like:
> 'properties.ssl.keystore.location'='s3://my-bucket/keystore.jks
>
> Thanks,
> Fanbin
>
> On Tue, Nov 17, 2020 at 6:43 PM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Try with hdfs folder with Gard coded value inside the code and see what
>> happens.
>>
>> On Tue, 17 Nov 2020 at 18:42, sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>> Can you use hdfs as keystone location ? Are you using oozie to run your
>>> job ?
>>>
>>> On Tue, 17 Nov 2020 at 17:54, Fanbin Bu <fanbin...@coinbase.com> wrote:
>>>
>>>> Hi Sri, my code is not github. but here is the skeleton.
>>>>
>>>>
>>>> val stmt = s"""
>>>>   |create table ${table.name} (${schema}, ${watermark})
>>>>   |with(
>>>>   |'connector' = 'kafka',
>>>>   |'topic' = '${table.topic}',
>>>>   |'scan.startup.mode'= '${table.scanStartUpMode}',
>>>>   |'properties.zookeeper.connect'='${Globals.ZOOKEEPER_CONNECT}',
>>>>   |'properties.bootstrap.servers'='${Globals.BOOTSTRAP_SERVERS}',
>>>>   |'properties.ssl.keystore.location'='${Globals.SSL_KEYSTORE_LOCATION}',
>>>>   |'properties.ssl.keystore.password'='${Globals.KEYSTORE_PASS}',
>>>>   |'properties.ssl.key.password'='${Globals.KEYSTORE_PASS}',
>>>>   |'properties.security.protocol'='SSL',
>>>>   |'properties.ssl.keystore.type'='JKS',
>>>>   |'properties.ssl.truststore.type'='JKS',
>>>>   |'properties.ssl.enabled.protocols'='TLSv1.2,TLSv1.1,TLSv1',
>>>>   |'properties.group.id' = '${table.name}_group_id',
>>>>   |'format' = 'json',
>>>>   |'json.ignore-parse-errors' = 'true'
>>>>   |)
>>>> """.stripMargin
>>>>
>>>>
>>>> tEnv.executeSql(stmt)
>>>>
>>>>
>>>> On Tue, Nov 17, 2020 at 5:40 PM sri hari kali charan Tummala <
>>>> kali.tumm...@gmail.com> wrote:
>>>>
>>>>> Hi Fanbin,
>>>>>
>>>>> Can you share your Flink code which reads from Kafka using SSL ?
>>>>>
>>>>> Is your code on GitHub ?
>>>>>
>>>>> Thanks
>>>>> Sri
>>>>>
>>>>>
>>>>> On Tue, 17 Nov 2020 at 17:14, Fanbin Bu <fanbin...@coinbase.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm running Flink 1.11 on EMR and would like to read Kafka via SSL. I
>>>>>> tried to put keystore.jks location under /usr/lib/flink/... like:
>>>>>>
>>>>>> export
>>>>>> SSL_KEYSTORE_LOCATION=/usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>>>>>>
>>>>>> Notice that this is on EMR master(master) node. Both JM and TMs are
>>>>>> on EMR core(slave) nodes.
>>>>>>
>>>>>> However, I got exception: *Caused by:
>>>>>> java.nio.file.NoSuchFileException:
>>>>>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks*
>>>>>> even though the file is there
>>>>>>
>>>>>> [hadoop@ip-10-200-41-39 flink]$ ll
>>>>>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>>>>>> -rw-r--r-- 1 root root 5565 Nov 17 22:24
>>>>>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>>>>>>
>>>>>> Where should the keystore.jks be located?
>>>>>>
>>>>>> Thanks,
>>>>>> Fanbin
>>>>>>
>>>>>>
>>>>>> Here is the full log.
>>>>>> 2020-11-17 09:35:49
>>>>>> org.apache.kafka.common.KafkaException: Failed to construct kafka
>>>>>> consumer
>>>>>> at
>>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
>>>>>> at
>>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666)
>>>>>> at
>>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646)
>>>>>> at
>>>>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>>>>>> at
>>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>>>>>> at
>>>>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:550)
>>>>>> at
>>>>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: org.apache.kafka.common.KafkaException:
>>>>>> org.apache.kafka.common.KafkaException:
>>>>>> org.apache.kafka.common.KafkaException: Failed to load SSL keystore
>>>>>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>>>>>> of type JKS
>>>>>> at
>>>>>> org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:71)
>>>>>> at
>>>>>> org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
>>>>>> at
>>>>>> org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
>>>>>> at
>>>>>> org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
>>>>>> at
>>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741)
>>>>>> ... 15 more
>>>>>> Caused by: org.apache.kafka.common.KafkaException:
>>>>>> org.apache.kafka.common.KafkaException: Failed to load SSL keystore
>>>>>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>>>>>> of type JKS
>>>>>> at
>>>>>> org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:163)
>>>>>> at
>>>>>> org.apache.kafka.common.security.ssl.SslEngineBuilder.<init>(SslEngineBuilder.java:104)
>>>>>> at
>>>>>> org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
>>>>>> at
>>>>>> org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:69)
>>>>>> ... 19 more
>>>>>> Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL
>>>>>> keystore
>>>>>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks
>>>>>> of type JKS
>>>>>> at
>>>>>> org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292)
>>>>>> at
>>>>>> org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:144)
>>>>>> ... 22 more
>>>>>> *Caused by: java.nio.file.NoSuchFileException:
>>>>>> /usr/lib/flink/kafka/kafka_2.11-2.3.1/config/ssl/keystore/kafka.keystore.jks*
>>>>>> at
>>>>>> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>>>>>> at
>>>>>> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>>>>>> at
>>>>>> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>>>>>> at
>>>>>> sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
>>>>>> at java.nio.file.Files.newByteChannel(Files.java:361)
>>>>>> at java.nio.file.Files.newByteChannel(Files.java:407)
>>>>>> at
>>>>>> java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
>>>>>> at java.nio.file.Files.newInputStream(Files.java:152)
>>>>>> at
>>>>>> org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285)
>>>>>>
>>>>>>
>>>>> --
>>>>> Thanks & Regards
>>>>> Sri Tummala
>>>>>
>>>>> --
>>> Thanks & Regards
>>> Sri Tummala
>>>
>>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>

Reply via email to