Hello, I’ve been trying to write a pipeline in Python with apache-beam==2.38.0 that starts with a ReadFromKafka step. I have a Flink cluster running in docker-compose, along with the Flink job service image, which should serve the expansion service on port 8097 according to the docs: https://beam.apache.org/releases/pydoc/2.38.0/apache_beam.io.kafka.html
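For reference, here is a minimal sketch of how the ports can be probed from the host, assuming they are published on localhost as in the compose file further down. The helper name `port_is_open` is hypothetical and not part of Beam. A successful TCP connect only shows that something accepts the connection (possibly just Docker's port forwarding), not that an expansion service is actually answering gRPC on that port.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be established.

    Hypothetical helper for illustration: it checks TCP reachability only
    and says nothing about whether a gRPC service responds on the port.
    """
    try:
        # create_connection raises OSError (refused, timeout, DNS failure)
        # when the connection cannot be established.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Ports published by the job server container in the compose file.
    for port in (8097, 8098, 8099):
        print(port, port_is_open("localhost", port))
```

If a port reports as open here but the gRPC call still ends with "Socket closed", that would suggest the connection reaches the container mapping but nothing inside is serving on that port; this is only a guess, not something the check itself can confirm.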
However, when trying to launch the pipeline, the ReadFromKafka step fails with:

failed to connect to all addresses; last error: UNAVAILABLE: ipv4:127.0.0.1:8097: Socket closed

I have tried a range of possible values for the expansion service passed to the ReadFromKafka step, including the internal docker-compose DNS name and my LAN IP address, and they all fail for various reasons. I have also tried different runners and changed everything I could think of, but no luck.

What am I doing wrong?

Here are the minimal Beam pipeline and docker-compose file to reproduce:

pipeline.py

import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.trigger import (
    AccumulationMode,
    Repeatedly,
    AfterCount,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.kafka import ReadFromKafka

pipeline_options = PipelineOptions(
    [
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",
        "--environment_type=LOOPBACK",
    ]
)
pipeline_options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | ReadFromKafka(
            consumer_config={"bootstrap.servers": "redpanda:9092"},
            topics=["my-topic"],
            expansion_service="localhost:8097",
        )
        | beam.WindowInto(
            window.GlobalWindows(),
            trigger=Repeatedly(AfterCount(1)),
            accumulation_mode=AccumulationMode.DISCARDING,
        )
        | beam.Map(print)
    )

docker-compose.yml

version: '3.7'

services:
  flink-jobmanager:
    image: flink:1.14
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  flink-taskmanager:
    image: flink:1.14
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

  beam-flink-jobserver:
    image: apache/beam_flink1.14_job_server:latest
    ports:
      - "8097:8097"
      - "8098:8098"
      - "8099:8099"

Thanks in advance for your help!

Best,
Florentin