Hi Lydian, and thanks for answering. If you have a working minimal example, could you share it? I’ve tried your suggestion, which required me to bump apache-beam to 2.41. I get a different error when I do that (I tried extracting the relevant stack trace bits):

2023-01-06 10:38:04,885 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: ReadFromKafka(beam:transform:org.apache.beam:kafka_read_without_metadata:v1)/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource) -> Flat Map -> Map -> [1]ReadFromKafka(beam:transform:org.apache.beam:kafka_read_without_metadata:v1)/Remove Kafka Metadata -> [2]{WindowInto(WindowIntoFn), Map(print)} (1/1)#0 (d15bfc4aeee2cc9143ec7796cab67a09) switched from INITIALIZING to FAILED with failure cause: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
…
Caused by: org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
…
Caused by: org.apache.beam.vendor.grpc.v1p43p2.io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: localhost/127.0.0.1:50747
Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused
…
Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.InternalSubchannel$4
…
2023-01-06 10:38:44,627 ERROR org.apache.flink.runtime.util.ClusterUncaughtExceptionHandler [] - WARNING: Thread 'grpc-default-executor-0' produced an uncaught exception. If you want to fail on uncaught exceptions, then configure cluster.uncaught-exception-handling accordingly
java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p43p2/io/grpc/internal/LogExceptionRunnable

What could this be?

Thanks,
Florentin

> On 6 Jan 2023, at 09:52, Lydian <lydia...@gmail.com> wrote:
>
> Hi,
> I had the same issue before, and I *think* the 8097 is only work if you're
> using the flink runner, i.e., --runner=FlinkRunner, but it's not working for
> PortableRunner.
>
> So far, what seems work for me is to do things like this:
>
> from apache_beam.io.kafka import default_io_expansion_service
> ReadFromKafka(
>     consumer_config=consumer_config,
>     topics=[topic],
>     with_metadata=False,
>     expansion_service=default_io_expansion_service(
>         append_args=[
>             '--defaultEnvironmentType=PROCESS',
>             '--defaultEnvironmentConfig={"command":"/opt/apache/beam/boot"}',
>             '--experiments=use_deprecated_read',
>         ]
>     )
> )
>
> this would ensure it will start an expansion service for you automatically
> use that expansion service.
> Also I have to provide extra --defaultEnvironmentType as well, otherwise it
> will start another docker container, but i don't have docker available inside
> my job manager, and thus have to force it to use PROCESS instead.
> The /opt/apache/beam/boot can be get via adding this line to my Dockerfile
>
> COPY --from=apache/beam_java8_sdk:2.41.0 /opt/apache/beam/ /opt/apache/beam_java/
>
> Finally it is really important to add '--experiments=use_deprecated_read'.
> Turned out beam python doesn't support the Splitable DoFn very well, and when
> I test, the newer version will crash in about a day, but it would be totally
> working well if I add this use_deprecated_read.
>
> Sincerely,
> Lydian Lee
>
> On Thu, Jan 5, 2023 at 2:37 AM Florentin Hennecker <fhennec...@scortex.io> wrote:
> Hello,
>
> I’ve been trying to write a pipeline in Python with apache-beam==2.38.0,
> which starts with a ReadFromKafka step.
> I have a flink cluster running in docker-compose, as well as the Flink job
> service image, which should serve the expansion service on port 8097,
> according to the docs:
> https://beam.apache.org/releases/pydoc/2.38.0/apache_beam.io.kafka.html
>
> However, when trying to launch the pipeline, the ReadFromKafka step fails
> with:
> failed to connect to all addresses; last error: UNAVAILABLE:
> ipv4:127.0.0.1:8097: Socket closed
>
> I have tried a range of possible values for the expansion service passed to
> the ReadFromKafka step: internal docker compose DNS, my LAN IP address, and
> they all fail with various reasons. I have tried using different runners,
> changing everything that I could change, but no luck.
>
> What am I doing wrong?
>
> Here are the minimal beam pipeline and docker-compose to reproduce:
>
> pipeline.py
>
> import apache_beam as beam
> from apache_beam import window
> from apache_beam.transforms.trigger import (
>     AccumulationMode,
>     Repeatedly,
>     AfterCount,
> )
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import StandardOptions
> from apache_beam.io.kafka import ReadFromKafka
>
>
> pipeline_options = PipelineOptions(
>     [
>         "--runner=PortableRunner",
>         "--job_endpoint=localhost:8099",
>         "--environment_type=LOOPBACK",
>     ]
> )
> pipeline_options.view_as(StandardOptions).streaming = True
> with beam.Pipeline(options=pipeline_options) as pipeline:
>     _ = (
>         pipeline
>         | ReadFromKafka(
>             consumer_config={"bootstrap.servers": "redpanda:9092"},
>             topics=["my-topic"],
>             expansion_service="localhost:8097",
>         )
>         | beam.WindowInto(
>             window.GlobalWindows(),
>             trigger=Repeatedly(AfterCount(1)),
>             accumulation_mode=AccumulationMode.DISCARDING,
>         )
>         | beam.Map(print)
>     )
>
>
> docker-compose.yml
>
> version: '3.7'
> services:
>
>   flink-jobmanager:
>     image: flink:1.14
>     ports:
>       - "8081:8081"
>     command: jobmanager
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>
>   flink-taskmanager:
>     image: flink:1.14
>     depends_on:
>       - flink-jobmanager
>     command: taskmanager
>     scale: 1
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>         taskmanager.numberOfTaskSlots: 2
>
>   beam-flink-jobserver:
>     image: apache/beam_flink1.14_job_server:latest
>     ports:
>       - "8097:8097"
>       - "8098:8098"
>       - "8099:8099"
>
> Thanks in advance for your help!
>
> Best,
> Florentin
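
A small diagnostic sketch, not taken from the thread: both failures quoted above are connection errors (127.0.0.1:8097 "Socket closed" and localhost/127.0.0.1:50747 "Connection refused"), so it may help to check which endpoints accept TCP connections from the host or container that reports the error. The helper name port_is_open and the endpoint labels are hypothetical; ports 8097 and 8099 come from the pipeline and 8098 from the compose file. A successful connect only shows that something is listening, not that it is the Beam service expected there.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and name resolution failures.
        return False


# Endpoints mentioned in the thread; the labels are only for readability.
ENDPOINTS = {
    "expansion service (8097)": ("localhost", 8097),
    "published port (8098)": ("localhost", 8098),
    "job endpoint (8099)": ("localhost", 8099),
}

if __name__ == "__main__":
    for label, (host, port) in ENDPOINTS.items():
        state = "open" if port_is_open(host, port) else "closed or unreachable"
        print(f"{label}: {host}:{port} is {state}")
```

Running the same script on the machine that submits the pipeline and inside the taskmanager container (where, with default Docker networking, localhost is the container itself) shows whether the two see the same endpoints.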