DirectRunner is not well tested for xlang transforms, and you need to
specify the jar_packages experimental flag for Java dependencies from the
Python SDK. I'd recommend using 2.22 + FlinkRunner for xlang pipelines.
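
For illustration, here is a minimal sketch of how the pipeline arguments for
such a run could be assembled. The helper name build_xlang_pipeline_args and
the jar path are hypothetical, and the exact value syntax of the jar_packages
experiment (a ':'-separated list of local jar paths) is an assumption that
should be checked against the Beam version in use.

```python
from typing import List, Sequence


def build_xlang_pipeline_args(
    jar_paths: Sequence[str],
    runner: str = "FlinkRunner",
) -> List[str]:
    """Return command-line style pipeline arguments for a cross-language run."""
    args = [f"--runner={runner}"]
    if jar_paths:
        # Assumption: the jar_packages experiment accepts a ':'-separated
        # list of local jar paths to stage for the Java SDK harness.
        args.append("--experiments=jar_packages=" + ":".join(jar_paths))
    return args


if __name__ == "__main__":
    # Hypothetical jar path, for illustration only.
    print(build_xlang_pipeline_args(["/path/to/extra-deps.jar"]))
```

The resulting list can be passed to PipelineOptions in the Python SDK when
constructing the pipeline.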

On Mon, Jun 8, 2020 at 3:27 PM Chamikara Jayalath <chamik...@google.com>
wrote:

> To clarify, the Kafka dependency was already available as an embedded
> dependency in the Java SDK Harness, but I'm not sure if this worked for
> DirectRunner. Starting with 2.22, we'll be staging dependencies from the
> environment during pipeline submission.
>
> On Mon, Jun 8, 2020 at 3:23 PM Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>> Seems like the Java dependency is not being properly set up when running the
>> cross-language Kafka step. I don't think this was available in Beam 2.21.
>> Can you try with the latest Beam HEAD, or with Beam 2.22 when it's released?
>> +Heejong Lee <heej...@google.com>
>>
>> On Mon, Jun 8, 2020 at 12:39 PM Piotr Filipiuk <piotr.filip...@gmail.com>
>> wrote:
>>
>>> Pasting the error inline:
>>>
>>> ERROR:root:severity: ERROR
>>> timestamp {
>>>   seconds: 1591405163
>>>   nanos: 815000000
>>> }
>>> message: "Client failed to dequeue and process the value"
>>> trace: "org.apache.beam.sdk.util.UserCodeException:
>>> java.lang.NoClassDefFoundError:
>>> org/springframework/expression/EvaluationContext\n\tat
>>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)\n\tat
>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
>>> Source)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat
>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
>>> Source)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat
>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
>>> Source)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat
>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat
>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat
>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
>>> java.lang.Thread.run(Thread.java:748)\nCaused by:
>>> java.lang.NoClassDefFoundError:
>>> org/springframework/expression/EvaluationContext\n\tat
>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)\n\tat
>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat
>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat
>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\nCaused
>>> by: java.lang.ClassNotFoundException:
>>> org.springframework.expression.EvaluationContext\n\tat
>>> java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat
>>> java.lang.ClassLoader.loadClass(ClassLoader.java:419)\n\tat
>>> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)\n\tat
>>> java.lang.ClassLoader.loadClass(ClassLoader.java:352)\n\tat
>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)\n\tat
>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat
>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat
>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\n\tat
>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown
>>> Source)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat
>>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
>>> Source)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
>>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
>>> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat
>>> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown
>>> Source)\n\tat
>>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
>>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
>>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat
>>> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat
>>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat
>>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat
>>> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
>>> java.lang.Thread.run(Thread.java:748)\n"
>>> log_location: "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient"
>>>
>>> On Fri, Jun 5, 2020 at 3:57 PM Piotr Filipiuk <piotr.filip...@gmail.com>
>>> wrote:
>>>
>>>> Thank you for the suggestions.
>>>>
>>>> Neither Kafka nor Flink runs in a Docker container; they both run
>>>> locally. Furthermore, the same issue happens with DirectRunner. That being
>>>> said, changing from localhost:$PORT to $IP_ADDR:$PORT resulted in a
>>>> different error, see attached.
>>>>
>>>> On Fri, Jun 5, 2020 at 3:47 PM Venkat Muthuswamy <
>>>> venkat_pack...@yahoo.com> wrote:
>>>>
>>>>> Is Kafka itself running inside another container? If so, inspect that
>>>>> container, see if it has a network alias, add that alias to your
>>>>> /etc/hosts file, and map it to 127.0.0.1.
>>>>>
>>>>>
>>>>>
>>>>> *From:* Chamikara Jayalath <chamik...@google.com>
>>>>> *Sent:* Friday, June 5, 2020 2:58 PM
>>>>> *To:* Luke Cwik <lc...@google.com>
>>>>> *Cc:* user <user@beam.apache.org>; dev <d...@beam.apache.org>; Heejong
>>>>> Lee <heej...@google.com>
>>>>> *Subject:* Re: Python SDK ReadFromKafka: Timeout expired while
>>>>> fetching topic metadata
>>>>>
>>>>>
>>>>>
>>>>> Is it possible that 'localhost:9092' is not reachable from the
>>>>> Docker environment where the Flink step is executed? Can you try
>>>>> specifying the actual IP address of the node running the Kafka broker?
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>> +dev <d...@beam.apache.org> +Chamikara Jayalath <chamik...@google.com>
>>>>> +Heejong Lee <heej...@google.com>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk <
>>>>> piotr.filip...@gmail.com> wrote:
>>>>>
>>>>> I am unable to read from Kafka and am getting the following warnings
>>>>> and errors when calling kafka.ReadFromKafka() (Python SDK):
>>>>>
>>>>>
>>>>>
>>>>> WARNING:root:severity: WARN
>>>>> timestamp {
>>>>>   seconds: 1591370012
>>>>>   nanos: 523000000
>>>>> }
>>>>> message: "[Consumer clientId=consumer-2, groupId=] Connection to node
>>>>> -1 could not be established. Broker may not be available."
>>>>> log_location: "org.apache.kafka.clients.NetworkClient"
>>>>> thread: "18"
>>>>>
>>>>>
>>>>>
>>>>> Finally the pipeline fails with:
>>>>>
>>>>>
>>>>>
>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException:
>>>>> java.lang.RuntimeException:
>>>>> org.apache.kafka.common.errors.TimeoutException: Timeout expired while
>>>>> fetching topic metadata
>>>>>
>>>>>
>>>>>
>>>>> See more complete log attached.
>>>>>
>>>>>
>>>>>
>>>>> The relevant code snippet:
>>>>>
>>>>>
>>>>>
>>>>> consumer_conf = {"bootstrap.servers": 'localhost:9092'}
>>>>>
>>>>> ...
>>>>>
>>>>> kafka.ReadFromKafka(
>>>>>     consumer_config=consumer_conf,
>>>>>     topics=[args.topic],
>>>>> )
>>>>>
>>>>> ...
>>>>>
>>>>>
>>>>>
>>>>> Also see full python script attached.
>>>>>
>>>>>
>>>>>
>>>>> I am using Beam version 2.21.0 and DirectRunner. With FlinkRunner I
>>>>> am also not able to read from the topic.
>>>>>
>>>>>
>>>>>
>>>>> I am using Kafka 2.5.0 and started the broker by following
>>>>> https://kafka.apache.org/quickstart, using the default
>>>>> config/server.properties.
>>>>>
>>>>>
>>>>>
>>>>> Everything runs locally, and I verified that I can publish to and
>>>>> consume from that topic using the confluent_kafka library.
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Best regards,
>>>>> Piotr
>>>>>
>>>>>
>>>>
>>>> --
>>>> Best regards,
>>>> Piotr
>>>>
>>>
>>>
>>> --
>>> Best regards,
>>> Piotr
>>>
>>
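
On the connectivity question raised earlier in this thread ('localhost:9092'
versus the node's actual IP address), a plain TCP check run from the
environment where the Kafka step executes can help narrow things down. This
is only a sketch: the helper names parse_bootstrap_server and can_connect are
hypothetical, it uses nothing but the Python standard library, and it does
not speak the Kafka protocol.

```python
import socket
from typing import Tuple


def parse_bootstrap_server(server: str) -> Tuple[str, int]:
    """Split a 'host:port' string as used in bootstrap.servers."""
    host, _, port = server.rpartition(":")
    if not host or not port.isdigit():
        raise ValueError(f"expected host:port, got {server!r}")
    return host, int(port)


def can_connect(server: str, timeout_s: float = 3.0) -> bool:
    """Return True if a TCP connection to 'host:port' can be opened."""
    host, port = parse_bootstrap_server(server)
    try:
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    # Address taken from the snippet in this thread.
    print(can_connect("localhost:9092"))
```

A successful connection only shows that the port is reachable. The broker's
advertised listeners can still point clients at an address that is not
reachable from the same environment.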
