Finally figured out the issue. I can confirm that the kafka_taxi job is working as
expected now. The problem was that I ran the Dataflow job with an invalid experiments
flag (runner_v2 instead of use_runner_v2), and I was getting log messages (on Beam 2.29)
saying that I was using Runner V2 even though, apparently, I wasn't. Setting the correct
flag fixes the issue (and so I get to see the correctly expanded transforms in the
graph). Thanks for your help, Cham!

Cheers
Alex
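For reference, a minimal sketch of a pipeline launched with the correct experiments flag.
This is not the actual job configuration from the thread; the project, region, bucket,
broker address, and topic below are placeholders.

    # Sketch only: a Python Kafka read on Dataflow with Runner v2 enabled.
    # Per the thread, the experiment must be spelled "use_runner_v2"; the
    # misspelled "runner_v2" did not actually enable Runner v2 on 2.29.
    # Project, region, bucket, broker, and topic values are placeholders.
    import logging

    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        "--runner=DataflowRunner",
        "--project=my-project",
        "--region=us-central1",
        "--temp_location=gs://my-bucket/temp",
        "--streaming",
        "--experiments=use_runner_v2",
    ])

    with beam.Pipeline(options=options) as p:
        (
            p
            | "read" >> ReadFromKafka(
                consumer_config={"bootstrap.servers": "broker-host:9092"},
                topics=["topic"])
            | "log" >> beam.Map(lambda record: logging.error("%s", record))
        )
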
On Thu, Jun 3, 2021 at 1:07 AM Chamikara Jayalath <chamik...@google.com> wrote:

> Can you mention the Job Logs you see in the Dataflow Cloud Console page
> for your job? Can you also mention the pipeline and configs you used for
> Dataflow (assuming they're different from what's given in the example)?
> Make sure that you used Dataflow Runner v2 (as given in the example).
> Are you providing null keys by any chance? There's a known issue related
> to that (but if you are just running the example, it should generate
> appropriate keys).
>
> Unfortunately, for actually debugging your job, I need a Dataflow customer
> support ticket <https://cloud.google.com/dataflow/docs/support>.
>
> Thanks,
> Cham
>
> On Wed, Jun 2, 2021 at 9:45 AM Alex Koay <alexkoa...@gmail.com> wrote:
>
>> CC-ing Chamikara, as he got omitted from the reply-all I did earlier.
>>
>> On Thu, Jun 3, 2021 at 12:43 AM Alex Koay <alexkoa...@gmail.com> wrote:
>>
>>> Yeah, I figured it wasn't supported correctly on DirectRunner. I stumbled
>>> upon several threads saying so.
>>>
>>> On Dataflow, I've encountered a few different kinds of issues.
>>> 1. For the kafka_taxi example, the pipeline would start and the
>>> PubSub-to-Kafka step would run, but nothing gets read from Kafka (this
>>> read step does seem to get expanded, as Dataflow shows the KafkaIO.Read +
>>> Remove Kafka Metadata sub-transforms).
>>> 2. For the snippet I shared above, I would vary it with either a "log"
>>> transform or a direct "write" back to Kafka. Neither seems to work (and
>>> the steps don't get expanded, unlike in the kafka_taxi example). With the
>>> "write" step, I believe it didn't get captured in the Dataflow graph a
>>> few times.
>>> 3. No errors appear in either the Job Logs or the Worker Logs, except for
>>> the occasional message emitted from the "log" step.
>>>
>>> All this is happening while I am producing ~4 messages/sec in Kafka. I
>>> can verify that Kafka is working normally when accessed remotely (I ran
>>> into some issues setting it up).
>>> I've also tested the KafkaIO.read transform in Java and can confirm that
>>> it works as expected.
>>>
>>> As an aside, I put together an ExternalTransform for MqttIO, which you
>>> can find here:
>>> https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
>>> I can confirm that it works in batch mode, but given that I couldn't get
>>> Kafka to work with Dataflow, I don't have much confidence in getting this
>>> to work either.
>>>
>>> Thanks for your help.
>>>
>>> On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath <chamik...@google.com>
>>> wrote:
>>>
>>>> What error did you run into with Dataflow? Did you observe any errors in
>>>> the worker logs?
>>>> If you follow the steps given in the example here
>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/README.md>,
>>>> it should work. Make sure the Dataflow workers have access to the Kafka
>>>> bootstrap servers you provide.
>>>>
>>>> The portable DirectRunner currently doesn't support streaming mode, so
>>>> you need to convert your pipeline to a batch pipeline and provide
>>>> 'max_num_records' or 'max_read_time' to convert the Kafka source into a
>>>> batch source.
>>>> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
>>>>
>>>> Also, portable runners (Flink, Spark, etc.) have a known issue related
>>>> to SDF checkpointing in streaming mode which results in messages not
>>>> being pushed to subsequent steps. This is tracked in
>>>> https://issues.apache.org/jira/browse/BEAM-11998.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay <al...@google.com> wrote:
>>>>
>>>>> /cc @Boyuan Zhang <boyu...@google.com> for Kafka and @Chamikara Jayalath
>>>>> <chamik...@google.com> for multi-language transforms; they might be able
>>>>> to help.
>>>>>
>>>>> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay <alexkoa...@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have created a simple snippet as such:
>>>>>>
>>>>>> import logging
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>
>>>>>> logging.basicConfig(level=logging.WARNING)
>>>>>>
>>>>>> with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner",
>>>>>>                                             "--streaming"])) as p:
>>>>>>     (
>>>>>>         p
>>>>>>         | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"},
>>>>>>                                   topics=["topic"])
>>>>>>         | "log" >> beam.Map(lambda x: logging.error("%s", str(x)))
>>>>>>     )
>>>>>>
>>>>>> I've set up a single-node Kafka similar to the kafka_taxi README, and
>>>>>> run this both on DirectRunner and DataflowRunner, but it doesn't work.
>>>>>> What I mean by this is that the transform seems to be capturing data but
>>>>>> doesn't pass it on to subsequent transforms.
>>>>>> With DirectRunner, if I send a non-keyed Kafka message to the server, it
>>>>>> actually crashes (saying that it cannot encode null into a byte[]),
>>>>>> which is why I believe the transform is actually running.
>>>>>>
>>>>>> My main objective really is to create a streaming ExternalTransform for
>>>>>> MqttIO and SolaceIO
>>>>>> (https://github.com/SolaceProducts/solace-apache-beam).
>>>>>> I've implemented the builder and registrars, and they work in batch mode
>>>>>> (with maxNumRecords), but otherwise it fails to read.
>>>>>>
>>>>>> With MqttIO, the streaming transform gets stuck waiting for one bundle
>>>>>> to complete (if I continuously send messages into the MQTT server), and
>>>>>> after stopping, the bundles finish but nothing gets passed on either.
>>>>>>
>>>>>> I appreciate any help I can get with this.
>>>>>> Thanks!
>>>>>>
>>>>>> Cheers
>>>>>> Alex
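For readers who hit the portable DirectRunner limitation Cham describes above
(BEAM-7514), a minimal sketch of the batch-mode workaround, bounding the Kafka source
with max_num_records. The broker address, topic, and record count are placeholders,
not values from the thread.

    # Sketch only: bounding ReadFromKafka so the pipeline can run as a batch
    # job on DirectRunner, per the workaround described above (BEAM-7514).
    # Broker address, topic, and record count are placeholders.
    import logging

    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner"])) as p:
        (
            p
            | "read" >> ReadFromKafka(
                consumer_config={"bootstrap.servers": "localhost:9092"},
                topics=["topic"],
                max_num_records=10,    # bounds the source so the pipeline terminates
                # max_read_time=60,    # alternative bound: stop after a fixed time
            )
            | "log" >> beam.Map(lambda record: logging.error("%s", record))
        )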