It seems the Java dependency is not being set up properly when running the cross-language Kafka step. I don't think this was supported in Beam 2.21. Can you try with the latest Beam HEAD, or with Beam 2.22 once it's released?
+Heejong Lee <heej...@google.com>
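One way to narrow down the NoClassDefFoundError quoted below is to check whether org.springframework.expression.EvaluationContext is actually packaged in the jar you expect to provide it. A Java jar is a zip archive, so Python's zipfile module can list its entries. This is only a sketch: jar_contains_class is a hypothetical helper, and if the dependency was relocated (shaded) under a different package, the entry name will differ. It also only tells you what is inside that one jar, not whether the jar was staged into the Java SDK harness that ran the Kafka read.

```python
import zipfile


def jar_contains_class(jar_path, class_name):
    """Return True if the jar at jar_path has a .class entry for class_name.

    class_name may be dotted ("org.springframework.expression.EvaluationContext")
    or slash-separated as in NoClassDefFoundError messages
    ("org/springframework/expression/EvaluationContext").
    """
    # Inside a jar, each class is stored as a path-like entry ending in .class.
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()
```

For example, jar_contains_class("/path/to/expansion-service.jar", "org.springframework.expression.EvaluationContext"), with the placeholder path replaced by the jar you actually use, returning False would suggest that jar does not bundle the class under that name.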
On Mon, Jun 8, 2020 at 12:39 PM Piotr Filipiuk <piotr.filip...@gmail.com> wrote:

> Pasting the error inline:
>
> ERROR:root:severity: ERROR
> timestamp {
>   seconds: 1591405163
>   nanos: 815000000
> }
> message: "Client failed to dequeue and process the value"
> trace: "org.apache.beam.sdk.util.UserCodeException: java.lang.NoClassDefFoundError: org/springframework/expression/EvaluationContext\n\tat
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)\n\tat
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat
> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat
> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat
> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
> java.lang.Thread.run(Thread.java:748)\n
> Caused by: java.lang.NoClassDefFoundError: org/springframework/expression/EvaluationContext\n\tat
> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)\n\tat
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\n
> Caused by: java.lang.ClassNotFoundException: org.springframework.expression.EvaluationContext\n\tat
> java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat
> java.lang.ClassLoader.loadClass(ClassLoader.java:419)\n\tat
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)\n\tat
> java.lang.ClassLoader.loadClass(ClassLoader.java:352)\n\tat
> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)\n\tat
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)\n\tat
> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)\n\tat
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)\n\tat
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)\n\tat
> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)\n\tat
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)\n\tat
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)\n\tat
> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)\n\tat
> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)\n\tat
> org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)\n\tat
> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)\n\tat
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)\n\tat
> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)\n\tat
> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)\n\tat
> org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)\n\tat
> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)\n\tat
> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)\n\tat
> org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)\n\tat
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
> java.lang.Thread.run(Thread.java:748)\n"
> log_location: "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient"
>
> On Fri, Jun 5, 2020 at 3:57 PM Piotr Filipiuk <piotr.filip...@gmail.com> wrote:
>
>> Thank you for the suggestions.
>>
>> Neither Kafka nor Flink runs in a Docker container; they all run locally. Furthermore, the same issue happens with the DirectRunner. That being said, changing from localhost:$PORT to $IP_ADDR:$PORT resulted in a different error (see attached).
>>
>> On Fri, Jun 5, 2020 at 3:47 PM Venkat Muthuswamy <venkat_pack...@yahoo.com> wrote:
>>
>>> Is Kafka itself running inside another container? If so, inspect that container to see whether it has a network alias, then add that alias to your /etc/hosts file and map it to 127.0.0.1.
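Before editing /etc/hosts or switching between localhost and an IP address, it can help to confirm, from the same machine or container where the Kafka read actually executes, that the broker address resolves and accepts TCP connections (the original report includes a "Connection to node -1 could not be established" warning). The sketch below uses only Python's socket module; check_broker is a hypothetical helper, and the host and port are whatever you put in bootstrap.servers.

```python
import socket


def check_broker(host, port, timeout=3.0):
    """Resolve host and attempt a TCP connection to host:port.

    Returns (resolved_ip, reachable). resolved_ip is None when the
    name cannot be resolved at all.
    """
    try:
        ip = socket.gethostbyname(host)  # IPv4 lookup, honours /etc/hosts
    except socket.gaierror:
        return None, False
    try:
        # Connection refused and timeouts are both OSError subclasses.
        with socket.create_connection((ip, port), timeout=timeout):
            return ip, True
    except OSError:
        return ip, False
```

For example, check_broker("localhost", 9092). A successful connection only shows that the bootstrap address is reachable: the Kafka client then connects to the addresses the broker advertises (advertised.listeners in config/server.properties), so those need to be reachable from the same environment as well.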
>>>
>>> *From:* Chamikara Jayalath <chamik...@google.com>
>>> *Sent:* Friday, June 5, 2020 2:58 PM
>>> *To:* Luke Cwik <lc...@google.com>
>>> *Cc:* user <user@beam.apache.org>; dev <d...@beam.apache.org>; Heejong Lee <heej...@google.com>
>>> *Subject:* Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
>>>
>>> Is it possible that 'localhost:9092' is not reachable from the Docker environment in which the Flink step is executed? Can you try specifying the actual IP address of the node running the Kafka broker?
>>>
>>> On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>> +dev <d...@beam.apache.org> +Chamikara Jayalath <chamik...@google.com> +Heejong Lee <heej...@google.com>
>>>
>>> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk <piotr.filip...@gmail.com> wrote:
>>>
>>> I am unable to read from Kafka, and I get the following warnings and errors when calling kafka.ReadFromKafka() (Python SDK):
>>>
>>> WARNING:root:severity: WARN
>>> timestamp {
>>>   seconds: 1591370012
>>>   nanos: 523000000
>>> }
>>> message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1 could not be established. Broker may not be available."
>>> log_location: "org.apache.kafka.clients.NetworkClient"
>>> thread: "18"
>>>
>>> Finally, the pipeline fails with:
>>>
>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
>>>
>>> See the more complete log attached.
>>>
>>> The relevant code snippet:
>>>
>>> consumer_conf = {"bootstrap.servers": "localhost:9092"}
>>> ...
>>> kafka.ReadFromKafka(
>>>     consumer_config=consumer_conf,
>>>     topics=[args.topic],
>>> )
>>> ...
>>>
>>> Also see the full Python script attached.
>>>
>>> I am using Beam version 2.21.0 and the DirectRunner. With the FlinkRunner I am also unable to read from the topic.
>>>
>>> I am using Kafka 2.5.0 and started the broker by following https://kafka.apache.org/quickstart, using the default config/server.properties.
>>>
>>> Everything runs locally, and I verified that I can publish to and consume from that topic using the confluent_kafka library.
>>>
>>> --
>>> Best regards,
>>> Piotr
>>>
>>
>> --
>> Best regards,
>> Piotr
>>
>
> --
> Best regards,
> Piotr
>
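Since the suggested fix depends on the Beam release (the report uses 2.21.0, and the top reply suggests 2.22 or HEAD), it is worth confirming which apache-beam version the Python environment actually has installed. This sketch assumes Python 3.8+ for importlib.metadata; beam_version_at_least is a hypothetical helper, and it compares only the major and minor version numbers.

```python
import re
from importlib import metadata


def beam_version_at_least(minimum=(2, 22), version=None):
    """Return True if the Beam version is at least minimum (major, minor).

    If version is None, the installed apache-beam distribution is queried;
    this raises importlib.metadata.PackageNotFoundError if it is missing.
    """
    if version is None:
        version = metadata.version("apache-beam")
    # Only the leading "major.minor" is compared, so anything after it
    # (patch number, ".dev", "rc1", ...) is ignored.
    match = re.match(r"(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognized version string: {version!r}")
    return (int(match.group(1)), int(match.group(2))) >= tuple(minimum)
```

For example, beam_version_at_least(version="2.21.0") returns False, while beam_version_at_least(version="2.22.0") returns True; calling beam_version_at_least() with no arguments checks the installed package.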