Hi all, I have created a simple snippet as follows:
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
import logging

logging.basicConfig(level=logging.WARNING)

with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner", "--streaming"])) as p:
    (
        p
        | "read" >> ReadFromKafka({"bootstrap.servers": "localhost:9092"}, topics=["topic"])
        | "log" >> beam.FlatMap(lambda x: logging.error("%s", str(x)))
    )

I've set up a single-node Kafka broker similar to the kafka_taxi README and run this on both DirectRunner and DataflowRunner, but it doesn't work: the ReadFromKafka transform seems to be consuming data, but nothing is passed on to the subsequent transforms. With DirectRunner, if I send a non-keyed Kafka message to the broker, the pipeline actually crashes (saying that it cannot encode null into a byte[]), which is why I believe the transform is actually running.

My main objective is really to create a streaming ExternalTransform for MqttIO and SolaceIO (https://github.com/SolaceProducts/solace-apache-beam). I've implemented the builders and registrars, and they work in batch mode (with maxNumRecords), but otherwise the read fails. With MqttIO, the streaming transform gets stuck waiting for one bundle to complete (if I continuously send messages to the MQTT server), and after I stop sending, the bundles finish but nothing gets passed on either.

I'd appreciate any help with this. Thanks!

Cheers
Alex
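P.S. For reference, one way to publish a keyed test message (so ReadFromKafka doesn't hit the null-key coder error mentioned above) would be something along these lines with kafka-python; the broker address and topic are just placeholders matching the snippet:

from kafka import KafkaProducer

# Connect to the local single-node broker used in the snippet above.
producer = KafkaProducer(bootstrap_servers="localhost:9092")

# Send a keyed message; a missing key is what triggers the
# "cannot encode null into byte[]" crash on DirectRunner.
producer.send("topic", key=b"test-key", value=b"test-value")
producer.flush()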