Hi,
I had the same issue before, and I *think* port 8097 only works if you're
using the Flink runner, i.e. --runner=FlinkRunner, but not with
PortableRunner.
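If it helps to narrow this down, a plain TCP probe is a quick way to see whether anything answers on 8097 before launching the pipeline. This is only a sketch: `port_is_open` is a hypothetical helper, and a successful connect just means something accepted the connection (with Docker port publishing that may be the proxy), not that a working expansion service is behind it.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout.

    Hypothetical debugging helper. Success only proves that something
    accepted the connection, not that it speaks the expansion-service
    gRPC protocol.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and unresolvable hosts.
        return False
```

For example, port_is_open("localhost", 8097) returning False would point at the port mapping or the job server container rather than at the pipeline code.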

So far, what seems to work for me is something like this:

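from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service

# consumer_config and topic are defined elsewhere in the pipeline.
ReadFromKafka(
    consumer_config=consumer_config,
    topics=[topic],
    with_metadata=False,
    # Start a local Java expansion service automatically and pass it
    # these extra options.
    expansion_service=default_io_expansion_service(
        append_args=[
            # Run the Java SDK harness as a local process, not in Docker.
            '--defaultEnvironmentType=PROCESS',
            '--defaultEnvironmentConfig={"command":"/opt/apache/beam_java/boot"}',
            # Use the legacy (non-SDF) read.
            '--experiments=use_deprecated_read',
        ]
    ),
)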

This automatically starts an expansion service for you and uses it.
I also have to pass --defaultEnvironmentType; otherwise the expansion
service starts another Docker container, but I don't have Docker available
inside my job manager, so I force it to use PROCESS instead.
The boot binary referenced in --defaultEnvironmentConfig can be obtained by
adding this line to my Dockerfile:
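Getting the quoting of --defaultEnvironmentConfig right by hand is fiddly, because its value is a JSON object embedded in a command-line flag. One way to avoid typos is to build those two flags with json.dumps. This is a minimal sketch: `process_env_args` is a hypothetical helper, and the boot path should be wherever the Java boot binary actually lives in your image (/opt/apache/beam_java/boot assumes the COPY destination used in the Dockerfile line).

```python
import json


def process_env_args(boot_path: str) -> list:
    """Build expansion-service args selecting the PROCESS environment.

    Hypothetical helper: json.dumps emits the {"command": ...} object as a
    compact JSON string, so no manual escaping of quotes is needed.
    """
    config = json.dumps({"command": boot_path}, separators=(",", ":"))
    return [
        "--defaultEnvironmentType=PROCESS",
        "--defaultEnvironmentConfig=" + config,
    ]
```

The result can then be combined with the experiment flag, e.g. append_args=process_env_args("/opt/apache/beam_java/boot") + ['--experiments=use_deprecated_read'].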

COPY --from=apache/beam_java8_sdk:2.41.0 /opt/apache/beam/ /opt/apache/beam_java/
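Since the PROCESS environment runs whatever command is configured, a boot path that doesn't match the COPY destination (or a file without its executable bit) would only surface once the job starts. A small check run inside the image can catch that earlier. This is a sketch: `check_boot_binary` is a hypothetical helper, and /opt/apache/beam_java/boot assumes the COPY destination above.

```python
import os


def check_boot_binary(path: str) -> None:
    """Raise if path is not an existing, executable regular file.

    Hypothetical helper for a container smoke test.
    """
    if not os.path.isfile(path):
        raise FileNotFoundError(f"boot binary not found: {path}")
    if not os.access(path, os.X_OK):
        raise PermissionError(f"boot binary is not executable: {path}")
```

For example, calling check_boot_binary("/opt/apache/beam_java/boot") during the image build or at container start fails fast with a clear message instead of a harness that never comes up.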

Finally, it is really important to add '--experiments=use_deprecated_read'.
It turned out Beam Python doesn't support the Splittable DoFn very well:
in my tests, the newer implementation crashed after about a day, but
everything works fine once I add use_deprecated_read.

Sincerely,
Lydian Lee



On Thu, Jan 5, 2023 at 2:37 AM Florentin Hennecker <fhennec...@scortex.io>
wrote:

> Hello,
>
> I’ve been trying to write a pipeline in Python with apache-beam==2.38.0,
> which starts with a ReadFromKafka step.
> I have a Flink cluster running in docker-compose, as well as the Flink job
> service image, which should serve the expansion service on port 8097,
> according to the docs:
> https://beam.apache.org/releases/pydoc/2.38.0/apache_beam.io.kafka.html
>
> However, when trying to launch the pipeline, the ReadFromKafka step fails
> with:
> failed to connect to all addresses; last error: UNAVAILABLE: ipv4:127.0.0.1:8097: Socket closed
>
> I have tried a range of possible values for the expansion service passed
> to the ReadFromKafka step: internal docker compose DNS, my LAN IP address,
> and they all fail for various reasons. I have tried using different
> runners, changing everything that I could change, but no luck.
>
> What am I doing wrong?
>
> Here are the minimal beam pipeline and docker-compose to reproduce:
>
> *pipeline.py*
>
> import apache_beam as beam
> from apache_beam import window
> from apache_beam.transforms.trigger import (
>     AccumulationMode,
>     Repeatedly,
>     AfterCount,
> )
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import StandardOptions
> from apache_beam.io.kafka import ReadFromKafka
>
>
> pipeline_options = PipelineOptions(
>     [
>         "--runner=PortableRunner",
>         "--job_endpoint=localhost:8099",
>         "--environment_type=LOOPBACK",
>     ]
> )
> pipeline_options.view_as(StandardOptions).streaming = True
> with beam.Pipeline(options=pipeline_options) as pipeline:
>     _ = (
>         pipeline
>         | ReadFromKafka(
>             consumer_config={"bootstrap.servers": "redpanda:9092"},
>             topics=["my-topic"],
>             expansion_service="localhost:8097",
>         )
>         | beam.WindowInto(
>             window.GlobalWindows(),
>             trigger=Repeatedly(AfterCount(1)),
>             accumulation_mode=AccumulationMode.DISCARDING,
>         )
>         | beam.Map(print)
>     )
>
>
> *docker-compose.yml*
>
> version: '3.7'
> services:
>
>   flink-jobmanager:
>     image: flink:1.14
>     ports:
>       - "8081:8081"
>     command: jobmanager
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>
>   flink-taskmanager:
>     image: flink:1.14
>     depends_on:
>       - flink-jobmanager
>     command: taskmanager
>     scale: 1
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>         taskmanager.numberOfTaskSlots: 2
>
>   beam-flink-jobserver:
>     image: apache/beam_flink1.14_job_server:latest
>     ports:
>       - "8097:8097"
>       - "8098:8098"
>       - "8099:8099"
>
> Thanks in advance for your help!
>
> Best,
> Florentin
>
