Thank you for clarifying. Would you mind clarifying whether the issues I experience running Kafka IO on Flink (or on DirectRunner for testing) are specific to my setup, or whether this setup is not yet fully functional for the Python SDK?
On Thu, Jun 18, 2020 at 12:03 PM Chamikara Jayalath <chamik...@google.com> wrote:

> Beam does not have a concept of general availability. Kafka IO is released
> with Beam, so it is available. Some of the APIs used by Kafka are
> experimental, so they are subject to change (but that is less likely at
> this point). Various runners may offer their own levels of availability
> for cross-language transforms.
>
> Thanks,
> Cham
>
> On Thu, Jun 18, 2020 at 11:26 AM Piotr Filipiuk <piotr.filip...@gmail.com> wrote:
>
>> I also wanted to clarify whether Kafka IO for the Python SDK is generally
>> available, or is it still experimental?
>>
>> On Fri, Jun 12, 2020 at 2:52 PM Piotr Filipiuk <piotr.filip...@gmail.com> wrote:
>>
>>> For completeness, I am also attaching the task manager logs.
>>>
>>> On Fri, Jun 12, 2020 at 2:32 PM Piotr Filipiuk <piotr.filip...@gmail.com> wrote:
>>>
>>>> Thank you for clarifying.
>>>>
>>>> I attempted to use FlinkRunner with 2.22 and I am getting the following
>>>> error, which I am not sure how to debug:
>>>>
>>>> ERROR:root:java.lang.UnsupportedOperationException: The ActiveBundle does not have a registered bundle checkpoint handler.
>>>> INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED
>>>> Starting pipeline kafka = 192.168.1.219:9092, topic = piotr-test
>>>> Traceback (most recent call last):
>>>>   File "apache_beam/examples/streaming_wordcount_kafka.py", line 73, in <module>
>>>>     run()
>>>>   File "apache_beam/examples/streaming_wordcount_kafka.py", line 68, in run
>>>>     | "WriteUserScoreSums" >> beam.io.WriteToText(args.output)
>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/pipeline.py", line 547, in __exit__
>>>>     self.run().wait_until_finish()
>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 583, in wait_until_finish
>>>>     raise self._runtime_exception
>>>> RuntimeError: Pipeline BeamApp-piotr0filipiuk-0612212344-159c632b_45b947f8-bc64-4aad-a96c-6fb2bd834d60 failed in state FAILED: java.lang.UnsupportedOperationException: The ActiveBundle does not have a registered bundle checkpoint handler.
>>>>
>>>> My setup is (everything runs locally):
>>>> Beam version: 2.22.0
>>>> Kafka 2.5.0 (https://kafka.apache.org/quickstart - using the default config/server.properties)
>>>> Flink 1.10 (https://ci.apache.org/projects/flink/flink-docs-stable/getting-started/tutorials/local_setup.html)
>>>>
>>>> I run the pipeline using the following command:
>>>>
>>>> python apache_beam/examples/streaming_wordcount_kafka.py --bootstrap_servers=192.168.1.219:9092 --topic=piotr-test --runner=FlinkRunner --flink_version=1.10 --flink_master=192.168.1.219:8081 --environment_type=LOOPBACK
>>>>
>>>> I can see the following error in the logs:
>>>>
>>>> ERROR:apache_beam.runners.worker.data_plane:Failed to read inputs in the data plane.
>>>> Traceback (most recent call last):
>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 528, in _read_inputs
>>>>     for elements in elements_iterator:
>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
>>>>     return self._next()
>>>>   File "/Users/piotr.filipiuk/.virtualenvs/apache-beam/lib/python3.7/site-packages/grpc/_channel.py", line 689, in _next
>>>>     raise self
>>>> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
>>>>     status = StatusCode.UNAVAILABLE
>>>>     details = "DNS resolution failed"
>>>>     debug_error_string = "{"created":"@1591997030.613849000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3941,"referenced_errors":[{"created":"@1591997030.613847000","description":"Resolver transient failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":262,"referenced_errors":[{"created":"@1591997030.613847000","description":"DNS resolution failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":370,"grpc_status":14,"referenced_errors":[{"created":"@1591997030.613840000","description":"C-ares status is not ARES_SUCCESS: Misformatted domain name","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":244,"referenced_errors":[{"created":"@1591997030.613728000","description":"C-ares status is not ARES_SUCCESS: Misformatted domain name","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":244}]}]}]}]}"
>>>> >
>>>>
>>>> I thought this might be the culprit; however, the same error also appears
>>>> when running the wordcount.py example
>>>> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py>,
>>>> which succeeds. The error appears only with Flink 1.10, not with Flink 1.9.
>>>>
>>>> Full log attached.
>>>>
>>>> I would appreciate help and suggestions on how to proceed.
>>>>
>>>> On Mon, Jun 8, 2020 at 5:49 PM Heejong Lee <heej...@google.com> wrote:
>>>>
>>>>> DirectRunner is not well-tested for cross-language transforms, and you
>>>>> need to specify the jar_packages experimental flag for Java dependencies
>>>>> used from the Python SDK. I'd recommend using 2.22 + FlinkRunner for
>>>>> cross-language pipelines.
>>>>>
>>>>> On Mon, Jun 8, 2020 at 3:27 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>
>>>>>> To clarify, the Kafka dependency was already available as an embedded
>>>>>> dependency in the Java SDK harness, but I am not sure whether that
>>>>>> worked for DirectRunner. Starting with 2.22, we will be staging
>>>>>> dependencies from the environment during pipeline submission.
>>>>>>
>>>>>> On Mon, Jun 8, 2020 at 3:23 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>>
>>>>>>> Seems like the Java dependency is not being properly set up when
>>>>>>> running the cross-language Kafka step. I don't think this was
>>>>>>> available for Beam 2.21. Can you try with the latest Beam HEAD, or
>>>>>>> with Beam 2.22 when it's released?
>>>>>>> +Heejong Lee <heej...@google.com>
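[A minimal sketch of how the jar_packages experiment Heejong mentions might be passed from the Python SDK. The exact flag spelling and the jar path below are assumptions for illustration, not confirmed in this thread:]

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Assumed spelling of the experimental flag; the jar path is a
    # hypothetical placeholder for the Kafka IO artifact and its
    # dependencies staged for the cross-language expansion.
    options = PipelineOptions([
        '--runner=DirectRunner',
        '--streaming',
        '--experiments=jar_packages=/path/to/beam-sdks-java-io-kafka-2.22.0.jar',
    ])

    with beam.Pipeline(options=options) as p:
        _ = p | beam.Create(['placeholder'])  # cross-language transforms would go here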
>>>>>>>
>>>>>>> On Mon, Jun 8, 2020 at 12:39 PM Piotr Filipiuk <piotr.filip...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Pasting the error inline:
>>>>>>>>
>>>>>>>> ERROR:root:severity: ERROR
>>>>>>>> timestamp {
>>>>>>>>   seconds: 1591405163
>>>>>>>>   nanos: 815000000
>>>>>>>> }
>>>>>>>> message: "Client failed to dequeue and process the value"
>>>>>>>> trace: "org.apache.beam.sdk.util.UserCodeException: java.lang.NoClassDefFoundError: org/springframework/expression/EvaluationContext
>>>>>>>>   at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>>>>>>>>   at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)
>>>>>>>>   at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>>>>>>>   at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)
>>>>>>>>   at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)
>>>>>>>>   at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>>>>>>   at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)
>>>>>>>>   at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>>>>>>   at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)
>>>>>>>>   at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>>>>>>>   at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
>>>>>>>>   at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)
>>>>>>>>   at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>>>>>>   at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>>>>>>   at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)
>>>>>>>>   at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
>>>>>>>>   at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)
>>>>>>>>   at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>>>>>>>>   at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>>>>>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>   at java.lang.Thread.run(Thread.java:748)
>>>>>>>> Caused by: java.lang.NoClassDefFoundError: org/springframework/expression/EvaluationContext
>>>>>>>>   at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)
>>>>>>>>   at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)
>>>>>>>>   at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
>>>>>>>>   at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)
>>>>>>>> Caused by: java.lang.ClassNotFoundException: org.springframework.expression.EvaluationContext
>>>>>>>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
>>>>>>>>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
>>>>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
>>>>>>>>   at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:478)
>>>>>>>>   at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:121)
>>>>>>>>   at org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
>>>>>>>>   at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.getSize(Read.java:465)
>>>>>>>>   at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner$2.outputWithTimestamp(FnApiDoFnRunner.java:497)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)
>>>>>>>>   at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>>>>>>>   at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:504)
>>>>>>>>   at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:715)
>>>>>>>>   at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>>>>>>   at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:688)
>>>>>>>>   at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>>>>>>   at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:874)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner.access$600(FnApiDoFnRunner.java:121)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1340)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1335)
>>>>>>>>   at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:75)
>>>>>>>>   at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:139)
>>>>>>>>   at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
>>>>>>>>   at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:672)
>>>>>>>>   at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
>>>>>>>>   at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
>>>>>>>>   at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:177)
>>>>>>>>   at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
>>>>>>>>   at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:294)
>>>>>>>>   at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
>>>>>>>>   at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
>>>>>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>   at java.lang.Thread.run(Thread.java:748)"
>>>>>>>> log_location: "org.apache.beam.fn.harness.data.QueueingBeamFnDataClient"
>>>>>>>>
>>>>>>>> On Fri, Jun 5, 2020 at 3:57 PM Piotr Filipiuk <piotr.filip...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thank you for the suggestions.
>>>>>>>>>
>>>>>>>>> Neither Kafka nor Flink runs in a Docker container; everything runs
>>>>>>>>> locally. Furthermore, the same issue happens with DirectRunner. That
>>>>>>>>> being said, changing from localhost:$PORT to $IP_ADDR:$PORT resulted
>>>>>>>>> in a different error, see attached.
>>>>>>>>>
>>>>>>>>> On Fri, Jun 5, 2020 at 3:47 PM Venkat Muthuswamy <venkat_pack...@yahoo.com> wrote:
>>>>>>>>>
>>>>>>>>>> Is Kafka itself running inside another container? If so, inspect
>>>>>>>>>> that container and see if it has a network alias, then add that
>>>>>>>>>> alias to your /etc/hosts file and map it to 127.0.0.1.
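[As a concrete illustration of Venkat's suggestion: if the broker container advertised itself under a network alias (the alias name below is a hypothetical placeholder), the /etc/hosts entry would look like:]

    127.0.0.1   kafka-broker-alias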
>>>>>>>>>>
>>>>>>>>>> *From:* Chamikara Jayalath <chamik...@google.com>
>>>>>>>>>> *Sent:* Friday, June 5, 2020 2:58 PM
>>>>>>>>>> *To:* Luke Cwik <lc...@google.com>
>>>>>>>>>> *Cc:* user <user@beam.apache.org>; dev <d...@beam.apache.org>; Heejong Lee <heej...@google.com>
>>>>>>>>>> *Subject:* Re: Python SDK ReadFromKafka: Timeout expired while fetching topic metadata
>>>>>>>>>>
>>>>>>>>>> Is it possible that 'localhost:9092' is not available from the
>>>>>>>>>> Docker environment where the Flink step is executed? Can you try
>>>>>>>>>> specifying the actual IP address of the node running the Kafka broker?
>>>>>>>>>>
>>>>>>>>>> On Fri, Jun 5, 2020 at 2:53 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>> +dev <d...@beam.apache.org> +Chamikara Jayalath <chamik...@google.com> +Heejong Lee <heej...@google.com>
>>>>>>>>>>
>>>>>>>>>> On Fri, Jun 5, 2020 at 8:29 AM Piotr Filipiuk <piotr.filip...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> I am unable to read from Kafka, and I am getting the following
>>>>>>>>>> warnings and errors when calling kafka.ReadFromKafka() (Python SDK):
>>>>>>>>>>
>>>>>>>>>> WARNING:root:severity: WARN
>>>>>>>>>> timestamp {
>>>>>>>>>>   seconds: 1591370012
>>>>>>>>>>   nanos: 523000000
>>>>>>>>>> }
>>>>>>>>>> message: "[Consumer clientId=consumer-2, groupId=] Connection to node -1 could not be established. Broker may not be available."
>>>>>>>>>> log_location: "org.apache.kafka.clients.NetworkClient"
>>>>>>>>>> thread: "18"
>>>>>>>>>>
>>>>>>>>>> Finally, the pipeline fails with:
>>>>>>>>>>
>>>>>>>>>> RuntimeError: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
>>>>>>>>>>
>>>>>>>>>> See the more complete log attached.
>>>>>>>>>>
>>>>>>>>>> The relevant code snippet:
>>>>>>>>>>
>>>>>>>>>> consumer_conf = {"bootstrap.servers": 'localhost:9092'}
>>>>>>>>>> ...
>>>>>>>>>> kafka.ReadFromKafka(
>>>>>>>>>>     consumer_config=consumer_conf,
>>>>>>>>>>     topics=[args.topic],
>>>>>>>>>> )
>>>>>>>>>> ...
>>>>>>>>>>
>>>>>>>>>> Also see the full Python script attached.
>>>>>>>>>>
>>>>>>>>>> I am using Beam version 2.21.0 and DirectRunner. With the Flink
>>>>>>>>>> runner I am also not able to read from the topic.
>>>>>>>>>>
>>>>>>>>>> I am using Kafka 2.5.0 and started the broker by following
>>>>>>>>>> https://kafka.apache.org/quickstart - using the default
>>>>>>>>>> config/server.properties.
>>>>>>>>>>
>>>>>>>>>> Everything runs locally, and I verified that I can publish and
>>>>>>>>>> consume from that topic using the confluent_kafka library.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best regards,
>>>>>>>>>> Piotr
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best regards,
>>>>>>>>> Piotr
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards,
>>>>>>>> Piotr
>>>>>>>
>>>>
>>>> --
>>>> Best regards,
>>>> Piotr
>>>
>>> --
>>> Best regards,
>>> Piotr
>>
>> --
>> Best regards,
>> Piotr
>

--
Best regards,
Piotr
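[For reference, a minimal self-contained sketch of the kind of pipeline discussed in this thread, assembled from the snippets and commands above. The broker IP, Flink endpoint, and topic are the placeholder values from Piotr's setup, and the print step is a stand-in for the WriteToText sink used in the example script:]

    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    # Endpoints taken from the setup described above; the broker is addressed
    # by a routable IP rather than 'localhost' so that the Java expansion
    # service and SDK harness can reach it.
    options = PipelineOptions([
        '--runner=FlinkRunner',
        '--flink_version=1.10',
        '--flink_master=192.168.1.219:8081',
        '--environment_type=LOOPBACK',
        '--streaming',
    ])

    with beam.Pipeline(options=options) as p:
        _ = (
            p
            | 'ReadFromKafka' >> ReadFromKafka(
                consumer_config={'bootstrap.servers': '192.168.1.219:9092'},
                topics=['piotr-test'],
            )
            | 'Print' >> beam.Map(print)  # records arrive as (key, value) byte pairs
        )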