Hi all,

I have created a simple snippet as follows:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

import logging
logging.basicConfig(level=logging.WARNING)

with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner",
                                            "--streaming"])) as p:
    (
        p
        | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"},
                                  topics=["topic"])
        | "log" >> beam.Map(lambda x: logging.error("%s", str(x)))
    )

I've set up a single-node Kafka broker similar to the kafka_taxi README and run
this on both DirectRunner and DataflowRunner, but it doesn't work: the
transform seems to be consuming data, but it doesn't pass anything on to
subsequent transforms.
With DirectRunner, if I send a non-keyed Kafka message to the broker the
pipeline actually crashes (saying that it cannot encode null into a byte[]),
which is why I believe the transform is actually running.
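
For reference, this is the kind of bounded sanity check I have in mind for the
same read (it assumes the max_num_records argument that recent ReadFromKafka
versions expose, so treat it as a sketch rather than a verified call):

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

import logging

# Bounded read: max_num_records turns the Kafka read into a batch source,
# mirroring what maxNumRecords does for the Java IOs mentioned below.
with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner"])) as p:
    (
        p
        | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"},
                                  topics=["topic"],
                                  max_num_records=10)
        | "log" >> beam.Map(lambda x: logging.error("%s", str(x)))
    )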

My main objective really is to create a streaming ExternalTransform for
MqttIO and SolaceIO (https://github.com/SolaceProducts/solace-apache-beam).
I've implemented the builders and registrars, and they work in batch mode
(with maxNumRecords set), but in streaming mode nothing is read.
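
For context, the Python side of the MqttIO read is wired up roughly like the
sketch below; the URN, configuration field names, and expansion service
address are placeholders for my own registrar rather than anything that ships
with Beam:

from apache_beam.transforms.external import ExternalTransform, ImplicitSchemaPayloadBuilder

READ_MQTT_URN = "beam:external:java:mqttio:read:v1"  # placeholder URN

def read_from_mqtt(server_uri, topic, expansion_service="localhost:8097"):
    # Field names must match whatever the Java builder expects; adding a
    # "maxNumRecords" entry here is the batch path that works, while leaving
    # it out (unbounded/streaming) is where nothing comes through.
    return ExternalTransform(
        READ_MQTT_URN,
        ImplicitSchemaPayloadBuilder({
            "serverUri": server_uri,
            "topic": topic,
        }),
        expansion_service)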

With MqttIO, the streaming transform gets stuck waiting for a bundle to
complete while I keep sending messages to the MQTT broker; once I stop
sending, the bundles finish, but nothing is passed downstream either.

I appreciate any help I can get with this.
Thanks!

Cheers
Alex
