Glad you were able to figure it out. This may become moot with Runner v2 becoming the default, but we really should give a clearer error in this case.
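For the archives, a minimal sketch of setting the flag correctly from Python (the project, region, and bucket below are placeholders):

    from apache_beam.options.pipeline_options import PipelineOptions

    # The experiment must be "use_runner_v2"; an unknown value such as
    # "runner_v2" is not rejected, which is what made the typo hard to spot.
    options = PipelineOptions([
        "--runner=DataflowRunner",
        "--project=my-project",                # placeholder
        "--region=us-central1",                # placeholder
        "--temp_location=gs://my-bucket/tmp",  # placeholder
        "--streaming",
        "--experiments=use_runner_v2",
    ])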
On Wed, Jun 2, 2021 at 8:16 PM Chamikara Jayalath <chamik...@google.com> wrote:
>
> Great :)
>
> On Wed, Jun 2, 2021 at 8:15 PM Alex Koay <alexkoa...@gmail.com> wrote:
>>
>> Finally figured out the issue.
>> Can confirm that the kafka_taxi job is working as expected now.
>> The issue was that I ran the Dataflow job with an invalid experiments flag (runner_v2 instead of use_runner_v2), and I was getting log messages (on 2.29) saying that I was using Runner V2 even though it seems I wasn't.
>> Setting the correct flag fixes the issue (and I now see the correctly expanded transforms in the graph).
>> Thanks for your help, Cham!
>>
>> Cheers
>> Alex
>>
>> On Thu, Jun 3, 2021 at 1:07 AM Chamikara Jayalath <chamik...@google.com> wrote:
>>>
>>> Can you mention the Job Logs you see in the Dataflow Cloud Console page for your job? Can you also mention the pipeline and configs you used for Dataflow (assuming they are different from what's given in the example)?
>>> Make sure that you used Dataflow Runner v2 (as given in the example).
>>> Are you providing null keys by any chance? There's a known issue related to that (but if you are just running the example, it should generate appropriate keys).
>>>
>>> Unfortunately, for actually debugging your job, I need a Dataflow customer support ticket.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Wed, Jun 2, 2021 at 9:45 AM Alex Koay <alexkoa...@gmail.com> wrote:
>>>>
>>>> CC-ing Chamikara as he got omitted from the reply-all I did earlier.
>>>>
>>>> On Thu, Jun 3, 2021 at 12:43 AM Alex Koay <alexkoa...@gmail.com> wrote:
>>>>>
>>>>> Yeah, I figured it wasn't supported correctly on DirectRunner. Stumbled upon several threads saying so.
>>>>>
>>>>> On Dataflow, I've encountered a few different kinds of issues:
>>>>> 1. For the kafka_taxi example, the pipeline would start and the PubSub-to-Kafka step would run, but nothing gets read from Kafka (this step does seem to get expanded, as Dataflow shows KafkaIO.Read + Remove Kafka Metadata sub-transforms).
>>>>> 2. For the snippet I shared above, I would vary it with either a "log" transform or a direct "write" back to Kafka. Neither seems to work (and the steps don't get expanded, unlike in the kafka_taxi example). With the "write" step, I believe it didn't get captured in the Dataflow graph a few times.
>>>>> 3. No errors appear in either the Job Logs or the Worker Logs, except for the occasional message emitted from the "log" step.
>>>>>
>>>>> All this is happening while I am producing ~4 messages/sec into Kafka. I can verify that Kafka is working normally, including remote access (I ran into some issues setting it up).
>>>>> I've also tested the KafkaIO.read transform in Java and can confirm that it works as expected.
>>>>>
>>>>> As an aside, I put together an ExternalTransform for MqttIO, which you can find here: https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
>>>>> I can confirm that it works in batch mode, but given that I couldn't get Kafka to work with Dataflow, I don't have much confidence in getting this to work.
>>>>>
>>>>> Thanks for your help.
>>>>>
>>>>> On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>>
>>>>>> What error did you run into with Dataflow? Did you observe any errors in the worker logs?
>>>>>> If you follow the steps given in the example here it should work.
>>>>>> Make sure the Dataflow workers have access to the Kafka bootstrap servers you provide.
>>>>>>
>>>>>> The portable DirectRunner currently doesn't support streaming mode, so you need to convert your pipeline to a batch pipeline and provide 'max_num_records' or 'max_read_time' to convert the Kafka source into a batch source.
>>>>>> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
>>>>>>
>>>>>> Also, portable runners (Flink, Spark, etc.) have a known issue related to SDF checkpointing in streaming mode which results in messages not being pushed to subsequent steps. This is tracked in https://issues.apache.org/jira/browse/BEAM-11998.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay <al...@google.com> wrote:
>>>>>>>
>>>>>>> /cc @Boyuan Zhang for Kafka and @Chamikara Jayalath for multi-language, who might be able to help.
>>>>>>>
>>>>>>> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay <alexkoa...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I have created a simple snippet as such:
>>>>>>>>
>>>>>>>> import apache_beam as beam
>>>>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>
>>>>>>>> import logging
>>>>>>>> logging.basicConfig(level=logging.WARNING)
>>>>>>>>
>>>>>>>> with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner", "--streaming"])) as p:
>>>>>>>>     (
>>>>>>>>         p
>>>>>>>>         | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"}, topics=["topic"])
>>>>>>>>         | "log" >> beam.FlatMap(lambda x: [logging.error("%s", str(x))])
>>>>>>>>     )
>>>>>>>>
>>>>>>>> I've set up a single-node Kafka similar to the kafka_taxi README, and run this on both DirectRunner and DataflowRunner, but it doesn't work.
>>>>>>>> What I mean by this is that the transform seems to be capturing data but doesn't pass it on to subsequent transforms.
>>>>>>>> With DirectRunner, if I send a non-keyed Kafka message to the server it actually crashes (saying that it cannot encode null into a byte[]), hence why I believe the transform is actually running.
>>>>>>>>
>>>>>>>> My main objective really is to create a streaming ExternalTransform for MqttIO and SolaceIO (https://github.com/SolaceProducts/solace-apache-beam).
>>>>>>>> I've implemented the builder and registrars, and they work in batch mode (with maxNumRecords), but otherwise it fails to read.
>>>>>>>>
>>>>>>>> With MqttIO, the streaming transform gets stuck waiting for one bundle to complete (if I continuously send messages into the MQTT server), and after stopping, the bundles finish but nothing gets passed on either.
>>>>>>>>
>>>>>>>> I appreciate any help I can get with this.
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>> Alex
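Following up on Cham's note about the portable DirectRunner: a minimal sketch of the batch-mode workaround, bounding the source with max_num_records (the broker address, topic, and record count are placeholders):

    import logging

    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    logging.basicConfig(level=logging.WARNING)

    # Bounding the Kafka source with max_num_records turns it into a batch
    # source, sidestepping the streaming limitation tracked in BEAM-7514.
    # Note there is no "--streaming" flag here: the pipeline runs as batch.
    with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner"])) as p:
        (
            p
            | "read" >> ReadFromKafka(
                {"bootstrap.servers": "localhost:9092"},  # placeholder broker
                topics=["topic"],                         # placeholder topic
                max_num_records=10,                       # placeholder bound
            )
            | "log" >> beam.Map(lambda x: logging.error("%s", str(x)))
        )

max_read_time works the same way if you would rather bound the read by time than by record count.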