Thanks for the detailed answer. I forgot to mention that I am using FlinkRunner as my setup. Will this work with that runner as well?
On 2024/01/10 13:34:28 Ferran Fernández Garrido wrote:
> Hi Yarden,
>
> If you are using Dataflow as a runner, you can already use
> ReadFromKafka (introduced originally in version 2.52). Dataflow will
> handle the expansion service automatically, so you don't have to do
> anything.
>
> If you want to run it locally for development purposes, you'll have to
> build the Docker image. You can check out the project and run:
>
> ./gradlew :sdks:java:container:java8:docker \
>     -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
>
> ($DOCKER_ROOT -> repo location)
>
> Then, for instance, if you want to run your custom Docker image in
> Dataflow, you could do this:
>
> (Build the Python SDK -> python setup.py sdist to get
> apache-beam-2.53.0.dev0.tar.gz)
>
> You'll have to build the expansion service that Kafka uses (in case
> you've changed something in KafkaIO):
>
> ./gradlew :sdks:java:io:expansion-service:build
>
> python3 -m apache_beam.yaml.main --runner=DataflowRunner \
>     --project=project_id --region=region --temp_location=temp_location \
>     --pipeline_spec_file=yaml_pipeline.yml \
>     --staging_location=staging_location \
>     --sdk_location="path/apache-beam-2.53.0.dev0.tar.gz" \
>     --sdk_harness_container_image_overrides=".*java.*,$DOCKER_ROOT:latest" \
>     --streaming
>
> This is an example of how to read JSON events from Kafka in Beam YAML:
>
> - type: ReadFromKafka
>   config:
>     topic: 'TOPIC_NAME'
>     format: JSON
>     bootstrap_servers: 'BOOTSTRAP_SERVERS'
>     schema: 'JSON_SCHEMA'
>
> Best,
> Ferran
>
> On Wed, Jan 10, 2024 at 14:11, Yarden BenMoshe
> (<ya...@gmail.com>) wrote:
> >
> > Hi,
> >
> > I am trying to consume a Kafka topic using the ReadFromKafka transform.
> >
> > If I got it right, since ReadFromKafka is originally written in Java,
> > an expansion service is needed and the default environment is set to
> > DOCKER, and in the current implementation I can see that the expansion
> > service field is not adjustable (I'm not able to pass it as part of
> > the transform's config).
> >
> > Is there currently a way to use ReadFromKafka from a pipeline written
> > with the YAML API? If so, an explanation would be much appreciated.
> >
> > I saw there's a workaround suggested online of using Docker-in-Docker,
> > but I would prefer to avoid it.
> >
> > Thanks,
> > Yarden
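For reference, the ReadFromKafka snippet quoted above would sit inside a complete pipeline spec file (the yaml_pipeline.yml passed via --pipeline_spec_file) roughly like this. This is only an illustrative sketch: the LogForTesting sink and all uppercase placeholder values are assumptions, not part of the original thread.

```yaml
# Hypothetical yaml_pipeline.yml -- placeholder values in CAPS must be
# replaced with real topic/broker/schema settings before running.
pipeline:
  type: chain
  transforms:
    # Cross-language Kafka source; the Java expansion service behind it
    # is started automatically by the runner.
    - type: ReadFromKafka
      config:
        topic: 'TOPIC_NAME'
        format: JSON
        bootstrap_servers: 'BOOTSTRAP_SERVERS'
        schema: 'JSON_SCHEMA'
    # Simple sink used here just to make the pipeline complete;
    # swap in a real sink (e.g. a writer transform) for production.
    - type: LogForTesting
```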