Hello, 

I’ve been trying to write a pipeline in Python with apache-beam==2.38.0, which 
starts with a ReadFromKafka step.
I have a Flink cluster running in docker-compose, as well as the Flink job 
service image, which should serve the expansion service on port 8097, according 
to the docs: 
https://beam.apache.org/releases/pydoc/2.38.0/apache_beam.io.kafka.html 

However, when trying to launch the pipeline, the ReadFromKafka step fails with: 
failed to connect to all addresses; last error: UNAVAILABLE: 
ipv4:127.0.0.1:8097: Socket closed

I have tried a range of values for the expansion service address passed to the 
ReadFromKafka step, including the internal Docker Compose DNS name and my LAN 
IP address, and they all fail for various reasons. I have also tried different 
runners and changed every setting I could think of, but no luck.
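
For reference, here is a minimal sketch of how the reachability of the published ports could be checked from the host. The helper name port_is_open and the 2-second timeout are only illustrative choices, not part of Beam. A successful connect only shows that something accepts TCP connections on that port; with Docker port publishing this may be the proxy rather than the service itself, so it does not prove the expansion service is actually listening inside the container.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and attempts a TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    # Ports published by the beam-flink-jobserver service in docker-compose.yml.
    for port in (8097, 8098, 8099):
        state = "open" if port_is_open("localhost", port) else "closed or unreachable"
        print(f"localhost:{port} -> {state}")
```

If 8099 reports open while 8097 does not, that would suggest the job endpoint is up but nothing is served on the expansion port; if both report open, the check alone cannot tell the two cases apart.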

What am I doing wrong?

Here are the minimal beam pipeline and docker-compose to reproduce:

pipeline.py

import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.trigger import (
    AccumulationMode,
    Repeatedly,
    AfterCount,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.kafka import ReadFromKafka


pipeline_options = PipelineOptions(
    [
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",
        "--environment_type=LOOPBACK",
    ]
)
pipeline_options.view_as(StandardOptions).streaming = True
with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | ReadFromKafka(
            consumer_config={"bootstrap.servers": "redpanda:9092"},
            topics=["my-topic"],
            expansion_service="localhost:8097",
        )
        | beam.WindowInto(
            window.GlobalWindows(),
            trigger=Repeatedly(AfterCount(1)),
            accumulation_mode=AccumulationMode.DISCARDING,
        )
        | beam.Map(print)
    )


docker-compose.yml

version: '3.7'
services:

  flink-jobmanager:
    image: flink:1.14
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager        

  flink-taskmanager:
    image: flink:1.14
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

  beam-flink-jobserver:
    image: apache/beam_flink1.14_job_server:latest
    ports:
      - "8097:8097"
      - "8098:8098"
      - "8099:8099"

Thanks in advance for your help!

Best,
Florentin
