Glad you were able to figure it out.

Maybe it's moot with Runner v2 becoming the default, but we really
should give a clearer error in this case.
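
Something as simple as validating experiment names at submission time
would have caught this. A hypothetical sketch (KNOWN_EXPERIMENTS is made
up for illustration; I don't believe we keep such a registry today):

import difflib
import logging

# Hypothetical registry of recognized experiment names (illustrative subset).
KNOWN_EXPERIMENTS = {"use_runner_v2", "beam_fn_api"}

def warn_on_unknown_experiments(experiments):
    # Flag unrecognized experiments and suggest close matches.
    for exp in experiments:
        if exp not in KNOWN_EXPERIMENTS:
            close = difflib.get_close_matches(exp, KNOWN_EXPERIMENTS, n=1)
            hint = " (did you mean %r?)" % close[0] if close else ""
            logging.warning("Unrecognized experiment %r%s", exp, hint)

warn_on_unknown_experiments(["runner_v2"])
# WARNING:root:Unrecognized experiment 'runner_v2' (did you mean 'use_runner_v2'?)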

On Wed, Jun 2, 2021 at 8:16 PM Chamikara Jayalath <chamik...@google.com> wrote:
>
> Great :)
>
> On Wed, Jun 2, 2021 at 8:15 PM Alex Koay <alexkoa...@gmail.com> wrote:
>>
>> Finally figured out the issue.
>> Can confirm that the kafka_taxi job is working as expected now.
>> The issue was that I ran the Dataflow job with an invalid experiments
>> flag (runner_v2 instead of use_runner_v2), and I was getting log
>> messages (on 2.29) saying that I was using Runner V2 even though,
>> apparently, I wasn't.
>> Setting the correct flag fixes the issue (and so I get to see the correctly 
>> expanded transforms in the graph).
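>>
>> For reference, the options that worked look roughly like this (project,
>> region, and the other flags elided):
>>
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>> options = PipelineOptions([
>>     "--runner=DataflowRunner",
>>     "--streaming",
>>     "--experiments=use_runner_v2",  # not "runner_v2"
>> ])
>>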
>> Thanks for your help Cham!
>>
>> Cheers
>> Alex
>>
>> On Thu, Jun 3, 2021 at 1:07 AM Chamikara Jayalath <chamik...@google.com> 
>> wrote:
>>>
>>> Can you share the Job Logs you see on the Dataflow Cloud Console page for
>>> your job? Can you also share the pipeline and configs you used for
>>> Dataflow (assuming they're different from what's given in the example)?
>>> Make sure that you used Dataflow Runner v2 (as given in the example).
>>> Are you providing null keys by any chance? There's a known issue related
>>> to that (but if you are just running the example, it should generate
>>> appropriate keys).
>>>
>>> Unfortunately, to actually debug your job I would need a Dataflow
>>> customer support ticket.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Wed, Jun 2, 2021 at 9:45 AM Alex Koay <alexkoa...@gmail.com> wrote:
>>>>
>>>> CC-ing Chamikara, as he got omitted from the reply-all I did earlier.
>>>>
>>>> On Thu, Jun 3, 2021 at 12:43 AM Alex Koay <alexkoa...@gmail.com> wrote:
>>>>>
>>>>> Yeah, I figured it wasn't supported correctly on DirectRunner. Stumbled 
>>>>> upon several threads saying so.
>>>>>
>>>>> On Dataflow, I've encountered a few different kinds of issues.
>>>>> 1. For the kafka_taxi example, the pipeline would start and the
>>>>> PubSub-to-Kafka step would run, but nothing gets read from Kafka (the
>>>>> read does seem to get expanded, as Dataflow shows KafkaIO.Read + Remove
>>>>> Kafka Metadata sub-transforms).
>>>>> 2. For the snippet I shared above, I varied it with either a "log"
>>>>> transform or a direct "write" back to Kafka. Neither seems to work (and
>>>>> the steps don't get expanded, unlike in the kafka_taxi example). With
>>>>> the "write" step, a few times I believe it didn't even get captured in
>>>>> the Dataflow graph.
>>>>> 3. No errors appear in either the Job Logs or the Worker Logs, except
>>>>> for the occasional message emitted from the "log" step when something
>>>>> does get through.
>>>>>
>>>>> All this is happening while I am producing ~4 messages/sec into Kafka.
>>>>> I can verify that Kafka is working normally and is reachable remotely
>>>>> (I ran into some issues setting it up).
>>>>> I've also tested the KafkaIO.read transform in Java and can confirm that 
>>>>> it works as expected.
>>>>>
>>>>> As an aside, I put together an ExternalTransform for MqttIO which you can 
>>>>> find here: 
>>>>> https://gist.github.com/alexkoay/df35eb12bc2afd8f502ef13bc915b33c
>>>>> I can confirm that it works in batch mode, but given that I couldn't get 
>>>>> Kafka to work with Dataflow, I don't have much confidence in getting this 
>>>>> to work.
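>>>>>
>>>>> For context, invoking it from Python looks roughly like this (the URN,
>>>>> configuration keys, and expansion service address are specific to my
>>>>> setup, so treat them as placeholders):
>>>>>
>>>>> import apache_beam as beam
>>>>> from apache_beam.transforms.external import (
>>>>>     ExternalTransform, ImplicitSchemaPayloadBuilder)
>>>>>
>>>>> with beam.Pipeline() as p:
>>>>>     (
>>>>>         p
>>>>>         | ExternalTransform(
>>>>>             "beam:external:java:mqttio:read:v1",  # URN I registered
>>>>>             ImplicitSchemaPayloadBuilder(
>>>>>                 {"server_uri": "tcp://localhost:1883",
>>>>>                  "topic": "topic"}),
>>>>>             expansion_service="localhost:8097")  # local expansion service
>>>>>         | beam.Map(print)
>>>>>     )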
>>>>>
>>>>> Thanks for your help.
>>>>>
>>>>> On Thu, Jun 3, 2021 at 12:05 AM Chamikara Jayalath <chamik...@google.com> 
>>>>> wrote:
>>>>>>
>>>>>> What error did you run into with Dataflow? Did you observe any errors
>>>>>> in the worker logs?
>>>>>> If you follow the steps given in the example here it should work. Make
>>>>>> sure the Dataflow workers have access to the Kafka bootstrap servers
>>>>>> you provide.
>>>>>>
>>>>>> Portable DirectRunner currently doesn't support streaming mode, so you
>>>>>> need to convert your pipeline to a batch pipeline and provide
>>>>>> 'max_num_records' or 'max_read_time' to convert the Kafka source into a
>>>>>> bounded (batch) source.
>>>>>> This is tracked in https://issues.apache.org/jira/browse/BEAM-7514.
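>>>>>>
>>>>>> For example, a rough batch-mode version of your snippet (bootstrap
>>>>>> server and topic are placeholders):
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>>>
>>>>>> with beam.Pipeline() as p:  # note: no --streaming
>>>>>>     (
>>>>>>         p
>>>>>>         | ReadFromKafka(
>>>>>>             {"bootstrap.servers": "localhost:9092"},
>>>>>>             topics=["topic"],
>>>>>>             max_num_records=10)  # bounds the source so it runs as batch
>>>>>>         | beam.Map(print)
>>>>>>     )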
>>>>>>
>>>>>> Also, portable runners (Flink, Spark, etc.) have a known issue related
>>>>>> to SDF checkpointing in streaming mode, which results in messages not
>>>>>> being pushed to subsequent steps. This is tracked in
>>>>>> https://issues.apache.org/jira/browse/BEAM-11998.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Wed, Jun 2, 2021 at 8:28 AM Ahmet Altay <al...@google.com> wrote:
>>>>>>>
>>>>>>> /cc @Boyuan Zhang (Kafka) and @Chamikara Jayalath (multi-language),
>>>>>>> who might be able to help.
>>>>>>>
>>>>>>> On Tue, Jun 1, 2021 at 9:39 PM Alex Koay <alexkoa...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I have created a simple snippet:
>>>>>>>>
>>>>>>>> import apache_beam as beam
>>>>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>
>>>>>>>> import logging
>>>>>>>> logging.basicConfig(level=logging.WARNING)
>>>>>>>>
>>>>>>>> with beam.Pipeline(options=PipelineOptions(
>>>>>>>>         ["--runner=DirectRunner", "--streaming"])) as p:
>>>>>>>>     (
>>>>>>>>         p
>>>>>>>>         | "read" >> ReadFromKafka(
>>>>>>>>             {"bootstrap.servers": "localhost:9092"}, topics=["topic"])
>>>>>>>>         | "log" >> beam.Map(lambda x: logging.error("%s", str(x)))
>>>>>>>>     )
>>>>>>>>
>>>>>>>> I've set up a single-node Kafka broker similar to the kafka_taxi
>>>>>>>> README, and run this on both DirectRunner and DataflowRunner, but it
>>>>>>>> doesn't work. What I mean by this is that the transform seems to be
>>>>>>>> capturing data but doesn't pass it on to subsequent transforms.
>>>>>>>> With DirectRunner, if I send a non-keyed Kafka message to the server
>>>>>>>> it actually crashes (saying that it cannot encode null into a
>>>>>>>> byte[]), which is why I believe the transform is actually running.
>>>>>>>>
>>>>>>>> My main objective really is to create a streaming ExternalTransform 
>>>>>>>> for MqttIO and SolaceIO 
>>>>>>>> (https://github.com/SolaceProducts/solace-apache-beam).
>>>>>>>> I've implemented the builder and registrars, and they work in batch
>>>>>>>> mode (with maxNumRecords), but in streaming mode the read fails.
>>>>>>>>
>>>>>>>> With MqttIO, the streaming transform gets stuck waiting for one
>>>>>>>> bundle to complete (if I continuously send messages into the MQTT
>>>>>>>> server); after I stop sending, the bundles finish but nothing gets
>>>>>>>> passed on either.
>>>>>>>>
>>>>>>>> I appreciate any help I can get with this.
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>> Alex
>>>>>>>>
>>>>>>>>
