To use "ReadFromKafka" from Flink, I believe you additionally need to specify the pipeline option "--experiments=use_deprecated_read". This is due to a known issue: https://github.com/apache/beam/issues/20979
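For example, a launch command for the Flink case might look like this (a sketch only — the YAML file name and the idea of launching via `apache_beam.yaml.main` are taken from Ferran's Dataflow example below; everything else is a placeholder):

```shell
# Run a Beam YAML pipeline on Flink, adding the workaround flag
# for the ReadFromKafka issue (beam#20979).
python3 -m apache_beam.yaml.main \
    --runner=FlinkRunner \
    --pipeline_spec_file=yaml_pipeline.yml \
    --experiments=use_deprecated_read \
    --streaming
```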
Thanks,
Cham

On Wed, Jan 10, 2024 at 9:56 PM Yarden BenMoshe <yarde...@gmail.com> wrote:
> Thanks for the detailed answer.
> I forgot to mention that I am using FlinkRunner as my setup. Will this
> work with this runner as well?
>
> On 2024/01/10 13:34:28 Ferran Fernández Garrido wrote:
> > Hi Yarden,
> >
> > If you are using Dataflow as a runner, you can already use
> > ReadFromKafka (introduced originally in version 2.52). Dataflow will
> > handle the expansion service automatically, so you don't have to do
> > anything.
> >
> > If you want to run it locally for development purposes, you'll have to
> > build the Docker image. You can check out the project and run:
> >
> >     ./gradlew :sdks:java:container:java8:docker \
> >         -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
> >
> > ($DOCKER_ROOT -> repo location)
> >
> > Then, for instance, if you want to run your custom Docker image in
> > Dataflow, you could do this:
> >
> > (Build the Python SDK -> python setup.py sdist to get
> > apache-beam-2.53.0.dev0.tar.gz)
> >
> > You'll have to build the expansion service that Kafka uses (in case
> > you've changed something in KafkaIO):
> >
> >     ./gradlew :sdks:java:io:expansion-service:build
> >
> >     python3 -m apache_beam.yaml.main --runner=DataflowRunner \
> >         --project=project_id --region=region --temp_location=temp_location \
> >         --pipeline_spec_file=yaml_pipeline.yml \
> >         --staging_location=staging_location \
> >         --sdk_location="path/apache-beam-2.53.0.dev0.tar.gz" \
> >         --sdk_harness_container_image_overrides=".*java.*,$DOCKER_ROOT:latest" \
> >         --streaming
> >
> > This is an example of how to read JSON events from Kafka in Beam YAML:
> >
> >     - type: ReadFromKafka
> >       config:
> >         topic: 'TOPIC_NAME'
> >         format: JSON
> >         bootstrap_servers: 'BOOTSTRAP_SERVERS'
> >         schema: 'JSON_SCHEMA'
> >
> > Best,
> > Ferran
> >
> > On Wed, Jan 10, 2024 at 2:11 PM Yarden BenMoshe (<ya...@gmail.com>) wrote:
> > >
> > > Hi,
> > >
> > > I am trying to consume a Kafka topic using the ReadFromKafka transform.
> > >
> > > If I got it right, since ReadFromKafka is originally written in Java,
> > > an expansion service is needed and the default environment is set to
> > > DOCKER, and in the current implementation I can see that the expansion
> > > service field is not adjustable (I'm not able to pass it as part of
> > > the transform's config).
> > > Is there currently a way to use ReadFromKafka from a pipeline written
> > > with the YAML API? If so, an explanation would be much appreciated.
> > >
> > > I saw there's a workaround suggested online using Docker-in-Docker,
> > > but I would prefer to avoid it.
> > >
> > > Thanks
> > > Yarden
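Putting the thread together: a minimal Beam YAML pipeline spec for this use case might look like the following sketch. The `ReadFromKafka` config values are the same placeholders Ferran used above; the `LogForTesting` sink is only an assumed stand-in so the pipeline has a consumer:

```yaml
# yaml_pipeline.yml (sketch; topic, servers, and schema are placeholders)
pipeline:
  transforms:
    - type: ReadFromKafka
      config:
        topic: 'TOPIC_NAME'
        format: JSON
        bootstrap_servers: 'BOOTSTRAP_SERVERS'
        schema: 'JSON_SCHEMA'
    - type: LogForTesting
      input: ReadFromKafka
```

On Flink, this spec would then be launched with the "--experiments=use_deprecated_read" option mentioned at the top of the thread.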