Finally figured out the issue.
I can confirm that the kafka_taxi job is now working as expected.
The problem was that I ran the Dataflow job with an invalid experiments flag
(runner_v2 instead of use_runner_v2). The logging messages (on 2.29) said
that I was using Runner V2 even though it seems I wasn't.
Setting the correct flag fixes the issue, and I now see the correctly
expanded transforms in the graph.
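
Since the job ran with the misspelled experiment name, a small pre-flight
check on the pipeline arguments could catch this before submitting. Below is
a minimal sketch using only argparse. The names KNOWN_TYPOS,
parse_experiments and check_experiments are hypothetical helpers, not Beam
APIs, and the sketch assumes experiments are passed as repeated
--experiments=<name> arguments.

```python
import argparse

# Hypothetical helper, not part of Apache Beam. It maps the misspelled
# experiment name from this thread to the name that was intended.
KNOWN_TYPOS = {"runner_v2": "use_runner_v2"}


def parse_experiments(argv):
    """Collect every --experiments value from a pipeline argument list."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--experiments", action="append")
    # parse_known_args ignores unrelated flags such as --runner.
    known, _unknown = parser.parse_known_args(argv)
    return known.experiments or []


def check_experiments(argv):
    """Return (given, suggested) pairs for known misspelled experiments."""
    return [
        (name, KNOWN_TYPOS[name])
        for name in parse_experiments(argv)
        if name in KNOWN_TYPOS
    ]


if __name__ == "__main__":
    args = ["--runner=DataflowRunner", "--experiments=runner_v2"]
    for given, suggested in check_experiments(args):
        print(f"experiment {given!r} looks wrong, did you mean {suggested!r}?")
```

The mapping only lists the one typo seen here, so other experiment names
pass through unchecked.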
Thanks for your help Cham!

Cheers
Alex

On Thu, Jun 3, 2021 at 1:07 AM Chamikara Jayalath <chamik...@google.com>
wrote:

> Can you share the Job Logs you see in the Dataflow Cloud Console page
> for your job? Can you also share the pipeline and configs you used for
> Dataflow (assuming they differ from what's given in the example)?
> Make sure that you used Dataflow Runner v2 (as given in the example).
> Are you providing null keys by any chance? There's a known issue related
> to that (but if you are just running the example, it should generate
> appropriate keys).
>
> Unfortunately for actually debugging your job, I need a Dataflow customer
> support ticket <https://cloud.google.com/dataflow/docs/support>.
>
> Thanks,
> Cham
>
> On Wed, Jun 2, 2021 at 9:45 AM Alex Koay <alexkoa...@gmail.com> wrote:
>
>> CC-ing Chamikara as he got omitted from the reply all I did earlier.
>>
>> On Thu, Jun 3, 2021 at 12:43 AM Alex Koay <alexkoa...@gmail.com> wrote:
>>
>>> Yeah, I figured it wasn't supported correctly on DirectRunner. Stumbled
>>> upon several threads saying so.
>>>
>>> On Dataflow, I've encountered a few different kinds of issues.
>>> 1. For the kafka_taxi example, the pipeline would start, the PubSub to
>>> Kafka would run, but nothing gets read from Kafka (this seems to get
>>> expanded, as Dataflow shows KafkaIO.Read + Remove Kafka Metadata
>>> sub-transforms).
>>> 2. For the snippet I shared above, I would vary it either with a "log"
>>> transform or a direct "write" back to Kafka. Neither seems to work (and the
>>> steps don't get expanded unlike the kafka_taxi example). With the "write"
>>> step, I believe it didn't get captured in the Dataflow graph a few times.
>>> 3. No errors appear in either Job Logs or Worker Logs, except for one
>>> message emitted from the "log" step if that happens.
>>>
>>> All this is happening while I am producing ~4 messages/sec in Kafka. I
>>> can verify remotely that Kafka is working normally (though I ran into
>>> some issues setting it up).
>>> I've also tested the KafkaIO.read transform in Java and can confirm that
>>> it works as expected.
>>>
>>> As an aside, I put together an ExternalTransform for MqttIO which you
>>> can find here:
>>> https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
>>> I can confirm that it works in batch mode, but given that I couldn't get
>>> Kafka to work with Dataflow, I don't have much confidence in getting this
>>> to work.
>>>
>>> Thanks for your help.
>>>
>>> On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath <chamik...@google.com>
>>> wrote:
>>>
>>>> What error did you run into with Dataflow? Did you observe any errors
>>>> in worker logs?
>>>> If you follow the steps given in the example here
>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/README.md>
>>>> it should work. Make sure Dataflow workers have access to Kafka bootstrap
>>>> servers you provide.
>>>>
>>>> Portable DirectRunner currently doesn't support streaming mode so you
>>>> need to convert your pipeline to a batch pipeline and provide
>>>> 'max_num_records' or 'max_read_time' to convert the Kafka source to a batch
>>>> source.
>>>> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
>>>>
>>>> Also portable runners (Flink, Spark etc.) have a known issue related to
>>>> SDF checkpointing in streaming mode which results in messages not being
>>>> pushed to subsequent steps. This is tracked in
>>>> https://issues.apache.org/jira/browse/BEAM-11998.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay <al...@google.com> wrote:
>>>>
>>>>> /cc @Boyuan Zhang <boyu...@google.com> for Kafka and @Chamikara Jayalath
>>>>> <chamik...@google.com> for multi-language, who might be able to help.
>>>>>
>>>>> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay <alexkoa...@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have created a simple snippet as such:
>>>>>>
>>>>>> import logging
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>
>>>>>> logging.basicConfig(level=logging.WARNING)
>>>>>>
>>>>>> # Run locally in streaming mode.
>>>>>> opts = PipelineOptions(["--runner=DirectRunner", "--streaming"])
>>>>>> with beam.Pipeline(options=opts) as p:
>>>>>>     (
>>>>>>         p
>>>>>>         | "read" >> ReadFromKafka(
>>>>>>             {"bootstrap.servers": "localhost:9092"}, topics=["topic"])
>>>>>>         # Log each record at ERROR level so it shows up in the output.
>>>>>>         | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x)))
>>>>>>     )
>>>>>>
>>>>>> I've set up a Kafka single node similar to the kafka_taxi README, and
>>>>>> run this both on DirectRunner and DataflowRunner but it doesn't work.
>>>>>> What I mean by this is that the Transform seems to be capturing data,
>>>>>> but doesn't pass it on to subsequent transforms.
>>>>>> With DirectRunner, if I send a non-keyed Kafka message to the server
>>>>>> it actually crashes (saying that it cannot encode null into a byte[]),
>>>>>> hence why I believe the transform is actually running.
>>>>>>
>>>>>> My main objective really is to create a streaming ExternalTransform
>>>>>> for MqttIO and SolaceIO (
>>>>>> https://github.com/SolaceProducts/solace-apache-beam).
>>>>>> I've implemented the builder and registrars and they work in batch
>>>>>> mode (with maxNumRecords) but otherwise it fails to read.
>>>>>>
>>>>>> With MqttIO, the streaming transform gets stuck waiting for one
>>>>>> bundle to complete (if I continuously send messages into the MQTT
>>>>>> server), and after stopping, the bundles finish but nothing gets
>>>>>> passed on either.
>>>>>>
>>>>>> I appreciate any help I can get with this.
>>>>>> Thanks!
>>>>>>
>>>>>> Cheers
>>>>>> Alex
>>>>>>
>>>>>>
>>>>>>
