Any idea, Apache Beam community?

On Wed, Mar 11, 2026 at 17:49 Juan Romero <[email protected]> wrote:
> The problem is not related to the Kafka broker. The problem appears when
> I submit the job to Apache Flink.
>
> On Wed, Mar 11, 2026 at 5:29 p.m., Nathan Fisher <[email protected]> wrote:
>
>> You're trying to access localhost, which is bound to individual containers.
>>
>> Have you tried kafka-broker:9092 instead of localhost for the broker?
>>
>> On Wed, Mar 11, 2026 at 18:19 Juan Romero <[email protected]> wrote:
>>
>>> Hi!! I have the following services definition in my docker compose file:
>>>
>>> version: "3"
>>>
>>> services:
>>>   kafka-broker:
>>>     image: apache/kafka:latest
>>>     container_name: broker
>>>     environment:
>>>       KAFKA_NODE_ID: 1
>>>       KAFKA_PROCESS_ROLES: broker,controller
>>>       KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
>>>       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
>>>       KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
>>>       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
>>>       KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
>>>       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>>>       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>>>       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>>>       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
>>>       KAFKA_NUM_PARTITIONS: 3
>>>     ports:
>>>       - "9092:9092"
>>>
>>>   jobmanager:
>>>     image: flink:1.19.0-java17
>>>     depends_on:
>>>       - kafka-broker
>>>     ports:
>>>       - "8081:8081"
>>>     command: jobmanager
>>>     environment:
>>>       - |
>>>         FLINK_PROPERTIES=
>>>         jobmanager.rpc.address: jobmanager
>>>
>>>   taskmanager:
>>>     image: flink:1.19.0-java17
>>>     depends_on:
>>>       - jobmanager
>>>       - kafka-broker
>>>     command: taskmanager
>>>     scale: 1
>>>     environment:
>>>       - |
>>>         FLINK_PROPERTIES=
>>>         jobmanager.rpc.address: jobmanager
>>>         taskmanager.numberOfTaskSlots: 2
>>>
>>> The pipeline definition is this one:
>>>
>>> import json
>>> import typing
>>> import argparse
>>> import apache_beam as beam
>>> from apache_beam.io.kafka import ReadFromKafka
>>> from apache_beam import Pipeline
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>
>>>
>>> def format_message(element):
>>>     key = 'test-key'
>>>     return (key.encode('utf8'), json.dumps(element).encode('utf8'))
>>>
>>>
>>> def run():
>>>     parser = argparse.ArgumentParser()
>>>     args, beam_args = parser.parse_known_args()
>>>     print(beam_args)
>>>     pipeline_options = PipelineOptions(
>>>         beam_args,
>>>         streaming=True,
>>>         save_main_session=True
>>>     )
>>>     consumer_config = {
>>>         'bootstrap.servers': "localhost:9092",
>>>         'group.id': 'consumer_id',  # Consumer group ID
>>>         'auto.offset.reset': 'earliest'  # Start from the beginning if no offset is committed
>>>     }
>>>
>>>     with Pipeline(options=pipeline_options) as p:
>>>         kafka_messages = p | "ReadFromKafka" >> ReadFromKafka(
>>>             consumer_config=consumer_config,
>>>             topics=['test-topic']
>>>         )
>>>         (kafka_messages
>>>          | "Prepare" >> beam.Map(format_message).with_output_types(typing.Tuple[bytes, bytes])
>>>          | "Print" >> beam.Map(print)
>>>         )
>>>
>>>
>>> if __name__ == "__main__":
>>>     run()
>>>
>>> The command I'm using to execute the pipeline is:
>>>
>>> python pipeline.py --runner=FlinkRunner --flink_master=localhost:8081 --environment_type=LOOPBACK --streaming
>>>
>>> I'm using Apache Beam 2.71. These are the steps recommended by the documentation, but I got this error:
>>>
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:52001.
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
>>> ERROR:root:java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @a4114d5
>>> Traceback (most recent call last):
>>>   File "/home/juansebastian.romero/git-repos/trusted_data/pipeline.py", line 46, in <module>
>>>     run()
>>>   File "/home/juansebastian.romero/git-repos/trusted_data/pipeline.py", line 34, in run
>>>     with Pipeline(options=pipeline_options) as p:
>>>   File "/home/juansebastian.romero/git-repos/trusted_data/.env/lib/python3.11/site-packages/apache_beam/pipeline.py", line 601, in __exit__
>>>     self.result.wait_until_finish()
>>>   File "/home/juansebastian.romero/git-repos/trusted_data/.env/lib/python3.11/site-packages/apache_beam/runners/portability/portable_runner.py", line 614, in wait_until_finish
>>>     raise self._runtime_exception
>>> RuntimeError: Pipeline BeamApp-juansebastian0romero-0311221307-4bfaded2_9fc33fde-6e9b-4454-9c1f-b23109ed0983 failed in state FAILED: java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @a4114d5
>>>
>>> I have tried many things, like changing the image to flink 1.19-java11 and putting the properties
>>>
>>> env.java.opts.all: >
>>>   --add-opens=java.base/java.lang=ALL-UNNAMED
>>>   --add-opens=java.base/java.util=ALL-UNNAMED
>>>   --add-opens=java.base/java.io=ALL-UNNAMED
>>>   --add-opens=java.base/java.nio=ALL-UNNAMED
>>>
>>> in the Flink config, but I haven't been able to solve the problem. *Does anyone have an example of reading from Kafka in Apache Beam 2.71 running over Flink?*
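The InaccessibleObjectException quoted above is the JDK 17 module system refusing reflective access, so the --add-opens flags are the right direction, but they only help if they actually reach the JVMs inside both Flink containers. One way is the same FLINK_PROPERTIES environment block the compose file already uses. A minimal sketch, assuming the service definitions quoted above (the taskmanager service would get the same env.java.opts.all line):

```yaml
  jobmanager:
    image: flink:1.19.0-java17
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        env.java.opts.all: --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED
```

Whether the flags took effect can be checked in the Flink web UI under Job Manager → Configuration after recreating the containers; if env.java.opts.all is not listed there, the containers are still running with the old environment.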

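Separately, Nathan's point about localhost still applies once the job actually runs on the cluster: inside the jobmanager and taskmanager containers, localhost:9092 is the container itself, not the broker. A sketch of the bootstrap setting, assuming the compose service name kafka-broker from the file above (note that KAFKA_ADVERTISED_LISTENERS would also need to advertise kafka-broker:9092, because Kafka clients reconnect to whatever address the broker advertises, regardless of the bootstrap address):

```python
# Sketch: point the Beam Kafka consumer at the compose service name instead of
# localhost. "kafka-broker" resolves through Docker's network DNS; "localhost"
# inside a Flink container refers to that container, not the broker.
consumer_config = {
    'bootstrap.servers': 'kafka-broker:9092',  # compose service name, not localhost
    'group.id': 'consumer_id',
    'auto.offset.reset': 'earliest',
}
print(consumer_config['bootstrap.servers'])  # → kafka-broker:9092
```

With only this change the two failure modes can be told apart: a connection error points at the listener configuration, while the InaccessibleObjectException points at the JVM options.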