Thanks for the detailed answer.
I forgot to mention that I am using the FlinkRunner as my setup. Will this
work with that runner as well?


On 2024/01/10 13:34:28 Ferran Fernández Garrido wrote:
> Hi Yarden,
>
> If you are using Dataflow as a runner, you can already use
> ReadFromKafka (introduced originally in version 2.52). Dataflow will
> handle the expansion service automatically, so you don't have to do
> anything.
>
> If you want to run it locally for development purposes, you'll have to
> build the Docker image. You can check out the project and run:
>
> ./gradlew :sdks:java:container:java8:docker \
>     -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
>
> (where DOCKER_ROOT is your Docker repository location)
>
> Then, for instance, if you want to run your custom Docker image in
> Dataflow, you could do this:
>
> (Build the Python SDK with "python setup.py sdist" to get
> apache-beam-2.53.0.dev0.tar.gz.)
>
> You'll have to build the expansion service that Kafka uses (in case
> you've changed something in KafkaIO):
>
> ./gradlew :sdks:java:io:expansion-service:build
>
> python3 -m apache_beam.yaml.main --runner=DataflowRunner
> --project=project_id --region=region --temp_location=temp_location
> --pipeline_spec_file=yaml_pipeline.yml
> --staging_location=staging_location
> --sdk_location="path/apache-beam-2.53.0.dev0.tar.gz"
> --sdk_harness_container_image_overrides=".*java.*,$DOCKER_ROOT:latest"
> --streaming
>
> This is an example of how to read JSON events from Kafka in Beam YAML:
>
> - type: ReadFromKafka
>   config:
>     topic: 'TOPIC_NAME'
>     format: JSON
>     bootstrap_servers: 'BOOTSTRAP_SERVERS'
>     schema: 'JSON_SCHEMA'
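> For context, a minimal complete yaml_pipeline.yml built around that
> transform might look like the sketch below (the LogForTesting sink is
> only a placeholder for illustration; substitute your real sink):
>
> pipeline:
>   transforms:
>     - type: ReadFromKafka
>       config:
>         topic: 'TOPIC_NAME'
>         format: JSON
>         bootstrap_servers: 'BOOTSTRAP_SERVERS'
>         schema: 'JSON_SCHEMA'
>     - type: LogForTesting
>       input: ReadFromKafka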
>
> Best,
> Ferran
>
> El mié, 10 ene 2024 a las 14:11, Yarden BenMoshe
> (<ya...@gmail.com>) escribió:
> >
> > Hi,
> >
> > I am trying to consume a kafka topic using ReadFromKafka transform.
> >
> > If I got it right, since ReadFromKafka is originally written in Java,
> > an expansion service is needed and the default env is set to DOCKER, and
> > in the current implementation I can see that the expansion service field
> > is not adjustable (I'm not able to pass it as part of the transform's
> > config).
> > Is there currently a way to use ReadFromKafka from a pipeline written
> > with the YAML API? If so, an explanation would be much appreciated.
> >
> > I saw there's a workaround suggested online using Docker-in-Docker, but
> > I would prefer to avoid it.
> >
> > Thanks
> > Yarden
>
