You’re trying to reach the broker via localhost, but inside each container localhost refers to that container itself, not to the Kafka broker.
Have you tried kafka-broker:9092 instead of localhost for the broker?

On Wed, Mar 11, 2026 at 18:19 Juan Romero <[email protected]> wrote:

> Hi!! I have the following services definition in docker compose file :
>
> version: "3"
>
> services:
>   kafka-broker:
>     image: apache/kafka:latest
>     container_name: broker
>     environment:
>       KAFKA_NODE_ID: 1
>       KAFKA_PROCESS_ROLES: broker,controller
>       KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
>       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
>       KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
>       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
>       KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
>       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
>       KAFKA_NUM_PARTITIONS: 3
>     ports:
>       - "9092:9092"
>
>   jobmanager:
>     image: flink:1.19.0-java17
>     depends_on:
>       - kafka-broker
>     ports:
>       - "8081:8081"
>     command: jobmanager
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>   taskmanager:
>     image: flink:1.19.0-java17
>     depends_on:
>       - jobmanager
>       - kafka-broker
>     command: taskmanager
>     scale: 1
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>         taskmanager.numberOfTaskSlots: 2
>
> The pipeline definition is this one:
>
> import json
> import typing
> import argparse
> import apache_beam as beam
> from apache_beam.io.kafka import ReadFromKafka
> from apache_beam import Pipeline
> from apache_beam.options.pipeline_options import PipelineOptions
>
>
> def format_message(element) :
>     key = 'test-key'
>     return (key.encode('utf8'), json.dumps(element).encode('utf8'))
>
> def run():
>     parser = argparse.ArgumentParser()
>     args, beam_args = parser.parse_known_args()
>     print(beam_args)
>     pipeline_options = PipelineOptions(
>         beam_args,
>         streaming=True,
>         save_main_session=True
>     )
>     consumer_config = {
>         'bootstrap.servers': "localhost:9092",
>         'group.id': 'consumer_id',  # Consumer group ID
>         'auto.offset.reset': 'earliest'  # Start reading from the beginning if no offset is committed,
>     }
>
>     with Pipeline(options=pipeline_options) as p:
>         kafka_messages = p | "ReadFromKafka" >> ReadFromKafka(
>             consumer_config=consumer_config,
>             topics=['test-topic']
>         )
>         (kafka_messages | "Prepare " >> beam.Map(format_message
>         ).with_output_types(typing.Tuple[bytes, bytes])
>         | "Print" >> beam.Map(print)
>         )
>
> if __name__ == "__main__":
>     run()
>
>
> The command im using to execute the pipeline is this :
>
> python pipeline.py --runner=FlinkRunner --flink_master=localhost:8081 --environment_type=LOOPBACK --streaming
>
> I'm using apache beam 2.71 . This is the steps recommended by the
> documentation but i got this error :
>
> WARNING:root:Waiting for grpc channel to be ready at localhost:52001.
> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
> ERROR:root:java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @a4114d5
> Traceback (most recent call last):
>   File "/home/juansebastian.romero/git-repos/trusted_data/pipeline.py", line 46, in <module>
>     run()
>   File "/home/juansebastian.romero/git-repos/trusted_data/pipeline.py", line 34, in run
>     with Pipeline(options=pipeline_options) as p:
>   File "/home/juansebastian.romero/git-repos/trusted_data/.env/lib/python3.11/site-packages/apache_beam/pipeline.py", line 601, in __exit__
>     self.result.wait_until_finish()
>   File "/home/juansebastian.romero/git-repos/trusted_data/.env/lib/python3.11/site-packages/apache_beam/runners/portability/portable_runner.py", line 614, in wait_until_finish
>     raise self._runtime_exception
> RuntimeError: Pipeline BeamApp-juansebastian0romero-0311221307-4bfaded2_9fc33fde-6e9b-4454-9c1f-b23109ed0983 failed in state FAILED: java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @a4114d5
>
> I have tried many things like change the image to flink1.19-java11 , and
> put the properties
>
> env.java.opts.all:
>
> --add-opens=java.base/java.lang=ALL-UNNAMED
> --add-opens=java.base/java.util=ALL-UNNAMED
> --add-opens=java.base/java.io=ALL-UNNAMED
> --add-opens=java.base/java.nio=ALL-UNNAMED. , in flink config , but i
> havent beee able to solve the problem. *Anyone have an example of
> kafka reading in apache beam 2.71 running over flink. ?*
>
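
To make the kafka-broker:9092 suggestion concrete, here is a rough, stdlib-only sketch of how the consumer settings could be parameterized and the address checked before submitting the job. It assumes the Kafka read runs inside the Flink containers, where the Compose service name kafka-broker should resolve. The helper names build_consumer_config and can_connect are made up for illustration and are not part of Beam. As far as I understand, Kafka clients are redirected to the advertised listener after the initial bootstrap, so KAFKA_ADVERTISED_LISTENERS would probably need to point at kafka-broker:9092 as well.

```python
import socket


def build_consumer_config(bootstrap_servers: str) -> dict:
    """Return the consumer settings from the pipeline with a configurable broker address."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "group.id": "consumer_id",        # Consumer group ID
        "auto.offset.reset": "earliest",  # Read from the beginning if no offset is committed
    }


def can_connect(address: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to 'host:port' succeeds within the timeout."""
    host, _, port = address.rpartition(":")
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False


# Assumption: "kafka-broker" is the Compose service name and resolves on the Compose network.
consumer_config = build_consumer_config("kafka-broker:9092")
```

Running can_connect("kafka-broker:9092") from a Python shell inside the taskmanager container (if Python is available there) should show whether that name is reachable from where the read executes. The resulting consumer_config can be passed to ReadFromKafka unchanged.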
