Hi, We are trying to store the truststore.jks file inside the Flex Template Docker but while using it in the pipeline we are unable to locate it.
we tried pulling the image and we can see the file is present in the docker at \tmp\trust.jks but while using the same we get the following error. *Error Logs:* Error message from worker: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39) org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source) org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:889) org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313) org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245) org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791) org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:826) org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313) org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245) org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791) org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:145) org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2360) org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530) org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78) org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018) org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown Source) org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:801) org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313) org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245) org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213) org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158) org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:533) org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151) org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116) java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162) java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:136) org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45) org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.initializeCurrentReader(Read.java:843) org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:975) org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432) org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source) org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2322) org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530) org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78) org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:524) Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820) org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631) org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612) org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.split(KafkaUnboundedSource.java:67) org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:134) org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45) org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.initializeCurrentReader(Read.java:843) org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:975) org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:432) org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source) org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2322) org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530) org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78) org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:524) org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source) org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:889) org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313) org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245) org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791) org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:826) org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313) org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245) org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791) org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:145) org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2360) org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530) org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78) org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018) org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown Source) org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:801) org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313) org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245) org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213) org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158) org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:533) org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151) org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116) java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162) java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /tmp/trust.jks of type JKS org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:158) org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146) org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67) org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99) org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741) ... 42 more Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /tmp/trust.jks of type JKS org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:163) org.apache.kafka.common.security.ssl.SslEngineBuilder.<init>(SslEngineBuilder.java:104) org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95) org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:154) ... 46 more Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /tmp/trust.jks of type JKS org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:292) org.apache.kafka.common.security.ssl.SslEngineBuilder.createSSLContext(SslEngineBuilder.java:155) ... 49 more Caused by: java.nio.file.NoSuchFileException: /tmp/trust.jks java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92) java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111) java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116) java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:219) java.base/java.nio.file.Files.newByteChannel(Files.java:371) java.base/java.nio.file.Files.newByteChannel(Files.java:422) java.base/java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:420) java.base/java.nio.file.Files.newInputStream(Files.java:156) org.apache.kafka.common.security.ssl.SslEngineBuilder$SecurityStore.load(SslEngineBuilder.java:285) ... 50 more *Here is the docker image that we used* FROM gcr.io/dataflow-templates-base/python38-template-launcher-base RUN apt-get update && \ apt-get install -y openjdk-11-jdk ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64 ARG WORKDIR=/dataflow/python/using_flex_template_adv3 RUN mkdir -p ${WORKDIR} WORKDIR ${WORKDIR} COPY . . COPY trust.jks /tmp/trust.jks ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py" ENV PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt" ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py" RUN apt-get update \ && apt-get install -y libffi-dev git \ && rm -rf /var/lib/apt/lists/* \ # Upgrade pip and install the requirements. && pip install --no-cache-dir --upgrade pip \ && pip install --no-cache-dir -r $PYTHON_REQUIREMENTS_FILE \ # Download the requirements to speed up launching the Dataflow job. && pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $PYTHON_REQUIREMENTS_FILE RUN pip install --no-cache-dir apache-beam[gcp]==2.43.0 ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"] *Code Used:* read = p_input | "Read Json from Kafka" >> ReadFromKafka( consumer_config=self.consumer_config, topics=[self.topic], commit_offset_in_finalize=False, with_metadata=False ) *consumer_config contents:* bootstrap.servers: bs:9095 group.id: "Group1" ssl.truststore.location: '/tmp/trust.jks' sasl.mechanism: SCRAM-SHA-512 security.protocol: SASL_SSL sasl.jaas.config: "" We are trying to run this pipeline on GCP Dataflow with apache beam python sdk. -- Thanks and Regards, *Somnath Chouwdhury*Associate Engineer IV - Solutioning & Software Engineering +91 9284112897 | www.datametica.com Datametica Solutions Private Limited, Pune - India <https://www.linkedin.com/company/datametica> <https://twitter.com/DataMetica> <https://instagram.com/datametica_lifeatdm?utm_medium=copy_link> <https://www.facebook.com/datametica1> <https://www.youtube.com/channel/UCTBR2-f1mDSSpwD0BteETPQ>