To use "ReadFromKafka" from Flink, you additionally need to
specify the pipeline option "--experiments=use_deprecated_read", I
believe. This is due to a known issue: https://github.com/apache/beam/issues/20979
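
For example, the invocation might look something like this (a sketch
only; the --pipeline_spec_file option mirrors the Dataflow command
Ferran posted below, and the file name is a placeholder):

```shell
# Illustrative Flink invocation of a Beam YAML pipeline with the
# use_deprecated_read experiment enabled (flag per the issue above).
python3 -m apache_beam.yaml.main \
  --runner=FlinkRunner \
  --pipeline_spec_file=yaml_pipeline.yml \
  --experiments=use_deprecated_read
```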

Thanks,
Cham

On Wed, Jan 10, 2024 at 9:56 PM Yarden BenMoshe <yarde...@gmail.com> wrote:

> Thanks for the detailed answer.
> I forgot to mention that I am using FlinkRunner as my setup. Will this
> work with that runner as well?
>
>
> On 2024/01/10 13:34:28 Ferran Fernández Garrido wrote:
> > Hi Yarden,
> >
> > If you are using Dataflow as a runner, you can already use
> > ReadFromKafka (introduced originally in version 2.52). Dataflow will
> > handle the expansion service automatically, so you don't have to do
> > anything.
> >
> > If you want to run it locally for development purposes, you'll have to
> > build the Docker image. You can check out the project and run:
> >
> > ./gradlew :sdks:java:container:java8:docker \
> >   -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
> > (where DOCKER_ROOT is your Docker repository location)
> >
> > Then, for instance, if you want to run your custom Docker image in
> > Dataflow, you could do this:
> >
> > (Build the Python SDK with: python setup.py sdist, which produces
> > apache-beam-2.53.0.dev0.tar.gz)
> >
> > You'll also have to build the expansion service that KafkaIO uses (in
> > case you've changed something in KafkaIO): ./gradlew
> > :sdks:java:io:expansion-service:build
> >
> > python3 -m apache_beam.yaml.main --runner=DataflowRunner
> > --project=project_id --region=region --temp_location=temp_location
> > --pipeline_spec_file=yaml_pipeline.yml
> > --staging_location=staging_location
> > --sdk_location="path/apache-beam-2.53.0.dev0.tar.gz"
> > --sdk_harness_container_image_overrides=".*java.*,$DOCKER_ROOT:latest"
> > --streaming
> >
> > This is an example of how to read JSON events from Kafka in Beam YAML:
> >
> > - type: ReadFromKafka
> >   config:
> >     topic: 'TOPIC_NAME'
> >     format: JSON
> >     bootstrap_servers: 'BOOTSTRAP_SERVERS'
> >     schema: 'JSON_SCHEMA'
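> >
> > For context, a sketch of how that transform might sit in a complete
> > Beam YAML pipeline (the LogForTesting sink is just for illustration,
> > and all quoted values are placeholders):

```yaml
# Illustrative Beam YAML pipeline: read JSON events from Kafka and log them.
pipeline:
  transforms:
    - type: ReadFromKafka
      config:
        topic: 'TOPIC_NAME'
        format: JSON
        bootstrap_servers: 'BOOTSTRAP_SERVERS'
        schema: 'JSON_SCHEMA'
    - type: LogForTesting
      input: ReadFromKafka
```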
> >
> > Best,
> > Ferran
> >
> > El mié, 10 ene 2024 a las 14:11, Yarden BenMoshe
> > (<ya...@gmail.com>) escribió:
> > >
> > > Hi,
> > >
> > > I am trying to consume a Kafka topic using the ReadFromKafka transform.
> > >
> > > If I got it right, since ReadFromKafka is originally written in Java,
> > > an expansion service is needed and the default environment is set to
> > > DOCKER, and in the current implementation I can see that the expansion
> > > service field is not adjustable (I'm not able to pass it as part of the
> > > transform's config).
> > > Is there currently a way to use ReadFromKafka from a pipeline written
> > > with the YAML API? If so, an explanation would be much appreciated.
> > >
> > > I saw there's a workaround suggested online of using Docker-in-Docker,
> > > but I would prefer to avoid it.
> > >
> > > Thanks
> > > Yarden
> >
>
