Any ideas, Apache Beam community?

On Wed, Mar 11, 2026 at 5:49 PM, Juan Romero (<[email protected]>) wrote:

> The problem is not related to the Kafka broker. The problem appears when
> I submit the job to Apache Flink.
>
> On Wed, Mar 11, 2026 at 5:29 PM, Nathan Fisher <
> [email protected]> wrote:
>
>> You’re trying to access localhost which is bound to individual containers.
>>
>> Have you tried kafka-broker:9092 instead of localhost for the broker?
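>>
>> For instance (a sketch, assuming the compose service name kafka-broker),
>> you could split the listeners so containers and the host each get a
>> reachable advertised address:
>>
>> KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
>> KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-broker:29092,EXTERNAL://localhost:9092
>> KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
>> KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
>>
>> Containers would then connect to kafka-broker:29092, while tools on the
>> host keep using localhost:9092 through the existing "9092:9092" port
>> mapping.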
>>
>> On Wed, Mar 11, 2026 at 18:19 Juan Romero <[email protected]> wrote:
>>
>>> Hi! I have the following services definition in my docker compose file:
>>> version: "3"
>>>
>>> services:
>>>   kafka-broker:
>>>     image: apache/kafka:latest
>>>     container_name: broker
>>>     environment:
>>>       KAFKA_NODE_ID: 1
>>>       KAFKA_PROCESS_ROLES: broker,controller
>>>       KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
>>>       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
>>>       KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
>>>       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
>>>       KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
>>>       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>>>       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>>>       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>>>       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
>>>       KAFKA_NUM_PARTITIONS: 3
>>>     ports:
>>>       - "9092:9092"
>>>
>>>   jobmanager:
>>>     image: flink:1.19.0-java17
>>>     depends_on:
>>>       - kafka-broker
>>>     ports:
>>>       - "8081:8081"
>>>     command: jobmanager
>>>     environment:
>>>       - |
>>>         FLINK_PROPERTIES=
>>>         jobmanager.rpc.address: jobmanager
>>>
>>>   taskmanager:
>>>     image: flink:1.19.0-java17
>>>     depends_on:
>>>       - jobmanager
>>>       - kafka-broker
>>>     command: taskmanager
>>>     scale: 1
>>>     environment:
>>>       - |
>>>         FLINK_PROPERTIES=
>>>         jobmanager.rpc.address: jobmanager
>>>         taskmanager.numberOfTaskSlots: 2
>>>
>>> The pipeline definition is this one:
>>>
>>> import json
>>> import typing
>>> import argparse
>>> import apache_beam as beam
>>> from apache_beam.io.kafka import ReadFromKafka
>>> from apache_beam import Pipeline
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>
>>>
>>> def format_message(element):
>>>     key = 'test-key'
>>>     return (key.encode('utf8'), json.dumps(element).encode('utf8'))
>>>
>>>
>>> def run():
>>>     parser = argparse.ArgumentParser()
>>>     args, beam_args = parser.parse_known_args()
>>>     print(beam_args)
>>>     pipeline_options = PipelineOptions(
>>>         beam_args,
>>>         streaming=True,
>>>         save_main_session=True
>>>     )
>>>     consumer_config = {
>>>         'bootstrap.servers': "localhost:9092",
>>>         'group.id': 'consumer_id',  # Consumer group ID
>>>         'auto.offset.reset': 'earliest'  # Read from the beginning if no offset is committed
>>>     }
>>>
>>>     with Pipeline(options=pipeline_options) as p:
>>>         kafka_messages = p | "ReadFromKafka" >> ReadFromKafka(
>>>             consumer_config=consumer_config,
>>>             topics=['test-topic']
>>>         )
>>>         (kafka_messages
>>>             | "Prepare" >> beam.Map(format_message).with_output_types(typing.Tuple[bytes, bytes])
>>>             | "Print" >> beam.Map(print)
>>>         )
>>>
>>>
>>> if __name__ == "__main__":
>>>     run()
>>>
>>>
>>> The command I'm using to execute the pipeline is:
>>>
>>> python pipeline.py --runner=FlinkRunner --flink_master=localhost:8081
>>> --environment_type=LOOPBACK --streaming
>>>
>>> I'm using Apache Beam 2.71. These are the steps recommended by the
>>> documentation, but I get this error:
>>>
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:52001.
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
>>> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
>>> ERROR:root:java.lang.reflect.InaccessibleObjectException: Unable to make
>>> field private final byte[] java.lang.String.value accessible: module
>>> java.base does not "opens java.lang" to unnamed module @a4114d5
>>> Traceback (most recent call last):
>>>   File "/home/juansebastian.romero/git-repos/trusted_data/pipeline.py",
>>> line 46, in <module>
>>>     run()
>>>   File "/home/juansebastian.romero/git-repos/trusted_data/pipeline.py",
>>> line 34, in run
>>>     with Pipeline(options=pipeline_options) as p:
>>>   File
>>> "/home/juansebastian.romero/git-repos/trusted_data/.env/lib/python3.11/site-packages/apache_beam/pipeline.py",
>>> line 601, in __exit__
>>>     self.result.wait_until_finish()
>>>   File
>>> "/home/juansebastian.romero/git-repos/trusted_data/.env/lib/python3.11/site-packages/apache_beam/runners/portability/portable_runner.py",
>>> line 614, in wait_until_finish
>>>     raise self._runtime_exception
>>> RuntimeError: Pipeline
>>> BeamApp-juansebastian0romero-0311221307-4bfaded2_9fc33fde-6e9b-4454-9c1f-b23109ed0983
>>> failed in state FAILED: java.lang.reflect.InaccessibleObjectException:
>>> Unable to make field private final byte[] java.lang.String.value
>>> accessible: module java.base does not "opens java.lang" to unnamed module
>>> @a4114d5
>>>
>>> I have tried many things, like changing the image to flink:1.19-java11
>>> and putting the properties
>>>
>>> env.java.opts.all: >
>>>   --add-opens=java.base/java.lang=ALL-UNNAMED
>>>   --add-opens=java.base/java.util=ALL-UNNAMED
>>>   --add-opens=java.base/java.io=ALL-UNNAMED
>>>   --add-opens=java.base/java.nio=ALL-UNNAMED
>>>
>>> in the Flink config, but I haven't been able to solve the problem. *Does
>>> anyone have an example of reading from Kafka with Apache Beam 2.71
>>> running on Flink?*
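>>>
>>> For reference, one of my attempts passed those flags through
>>> FLINK_PROPERTIES in the compose file (a sketch, not a confirmed fix):
>>>
>>> environment:
>>>   - |
>>>     FLINK_PROPERTIES=
>>>     jobmanager.rpc.address: jobmanager
>>>     env.java.opts.all: --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED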
>>>
>>
