Hi Lydian, and thanks for answering.

If you have a working minimal example, could you share it? 
I’ve tried your suggestion, which required me to bump apache-beam to 2.41. I 
get a different error when I do that (I’ve tried to extract the relevant parts 
of the stack trace):

2023-01-06 10:38:04,885 WARN  org.apache.flink.runtime.taskmanager.Task         
           [] - Source: 
ReadFromKafka(beam:transform:org.apache.beam:kafka_read_without_metadata:v1)/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource)
 -> Flat Map -> Map -> 
[1]ReadFromKafka(beam:transform:org.apache.beam:kafka_read_without_metadata:v1)/Remove
 Kafka Metadata -> [2]{WindowInto(WindowIntoFn), Map(print)} (1/1)#0 
(d15bfc4aeee2cc9143ec7796cab67a09) switched from INITIALIZING to FAILED with 
failure cause: 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
 org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: 
UNAVAILABLE: io exception
…
Caused by: org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: 
UNAVAILABLE: io exception
…
Caused by: 
org.apache.beam.vendor.grpc.v1p43p2.io.netty.channel.AbstractChannel$AnnotatedConnectException:
 finishConnect(..) failed: Connection refused: localhost/127.0.0.1:50747
Caused by: java.net.ConnectException: finishConnect(..) failed: Connection 
refused
…
Caused by: java.lang.ClassNotFoundException: 
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.InternalSubchannel$4
…
2023-01-06 10:38:44,627 ERROR 
org.apache.flink.runtime.util.ClusterUncaughtExceptionHandler [] - WARNING: 
Thread 'grpc-default-executor-0' produced an uncaught exception. If you want to 
fail on uncaught exceptions, then configure cluster.uncaught-exception-handling 
accordingly
java.lang.NoClassDefFoundError: 
org/apache/beam/vendor/grpc/v1p43p2/io/grpc/internal/LogExceptionRunnable

What could this be?
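
For what it's worth, one quick check I can run from inside the task manager 
container is a plain TCP probe against the address in the trace. This is only 
a minimal sketch: can_connect is a hypothetical helper name, and 50747 is just 
the port from my log above, which may differ between runs.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and attempts a TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and name resolution failures.
        return False


if __name__ == "__main__":
    # Port taken from the stack trace above (assumption: it may change per run).
    print(can_connect("localhost", 50747))
```

If this prints False inside the container but True on my host, that would 
suggest "localhost" refers to different machines in the two places.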

Thanks,
Florentin

> On 6 Jan 2023, at 09:52, Lydian <lydia...@gmail.com> wrote:
> 
> Hi, 
> I had the same issue before, and I *think* port 8097 only works if you're 
> using the Flink runner, i.e., --runner=FlinkRunner, and does not work with 
> PortableRunner. 
> 
> So far, what seems to work for me is something like this:
> 
> from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
> 
> # consumer_config and topic are defined elsewhere in the pipeline.
> ReadFromKafka(
>     consumer_config=consumer_config,
>     topics=[topic],
>     with_metadata=False,
>     expansion_service=default_io_expansion_service(
>         append_args=[
>             '--defaultEnvironmentType=PROCESS',
>             '--defaultEnvironmentConfig={"command":"/opt/apache/beam/boot"}',
>             '--experiments=use_deprecated_read',
>         ]
>     ),
> )
> 
> This ensures that an expansion service is started for you automatically and 
> that the transform uses it. 
> I also have to pass --defaultEnvironmentType; otherwise it will try to start 
> another Docker container. Docker isn't available inside my job manager, so I 
> have to force it to use PROCESS instead. 
> /opt/apache/beam/boot can be obtained by adding this line to my Dockerfile:
> 
> COPY --from=apache/beam_java8_sdk:2.41.0 /opt/apache/beam/ /opt/apache/beam_java/
> 
> Finally, it is really important to add '--experiments=use_deprecated_read'. 
> It turned out that Beam Python doesn't support Splittable DoFn very well: in 
> my tests, the newer version crashed after about a day, but it works fine 
> once I add use_deprecated_read. 
> 
> Sincerely,
> Lydian Lee
> 
> 
> 
> On Thu, Jan 5, 2023 at 2:37 AM Florentin Hennecker <fhennec...@scortex.io> 
> wrote:
> Hello, 
> 
> I’ve been trying to write a pipeline in Python with apache-beam==2.38.0, 
> which starts with a ReadFromKafka step.
> I have a flink cluster running in docker-compose, as well as the Flink job 
> service image, which should serve the expansion service on port 8097, 
> according to the docs: 
> https://beam.apache.org/releases/pydoc/2.38.0/apache_beam.io.kafka.html
> 
> However, when trying to launch the pipeline, the ReadFromKafka step fails 
> with: 
> failed to connect to all addresses; last error: UNAVAILABLE: 
> ipv4:127.0.0.1:8097: Socket closed
> 
> I have tried a range of possible values for the expansion service passed to 
> the ReadFromKafka step (internal Docker Compose DNS names, my LAN IP 
> address), and they all fail for various reasons. I have tried using different 
> runners and changing everything I could change, but no luck.
> 
> What am I doing wrong?
> 
> Here are the minimal beam pipeline and docker-compose to reproduce:
> 
> pipeline.py
> 
> import apache_beam as beam
> from apache_beam import window
> from apache_beam.transforms.trigger import (
>     AccumulationMode,
>     Repeatedly,
>     AfterCount,
> )
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import StandardOptions
> from apache_beam.io.kafka import ReadFromKafka
> 
> 
> pipeline_options = PipelineOptions(
>     [
>         "--runner=PortableRunner",
>         "--job_endpoint=localhost:8099",
>         "--environment_type=LOOPBACK",
>     ]
> )
> pipeline_options.view_as(StandardOptions).streaming = True
> with beam.Pipeline(options=pipeline_options) as pipeline:
>     _ = (
>         pipeline
>         | ReadFromKafka(
>             consumer_config={"bootstrap.servers": "redpanda:9092"},
>             topics=["my-topic"],
>             expansion_service="localhost:8097",
>         )
>         | beam.WindowInto(
>             window.GlobalWindows(),
>             trigger=Repeatedly(AfterCount(1)),
>             accumulation_mode=AccumulationMode.DISCARDING,
>         )
>         | beam.Map(print)
>     )
> 
> 
> docker-compose.yml
> 
> version: '3.7'
> services:
> 
>   flink-jobmanager:
>     image: flink:1.14
>     ports:
>       - "8081:8081"
>     command: jobmanager
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager        
> 
>   flink-taskmanager:
>     image: flink:1.14
>     depends_on:
>       - flink-jobmanager
>     command: taskmanager
>     scale: 1
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>         taskmanager.numberOfTaskSlots: 2
> 
>   beam-flink-jobserver:
>     image: apache/beam_flink1.14_job_server:latest
>     ports:
>       - "8097:8097"
>       - "8098:8098"
>       - "8099:8099"
> 
> Thanks in advance for your help!
> 
> Best,
> Florentin
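
The expansion-service arguments suggested in the reply quoted above could be 
assembled with a small helper, so that the JSON in --defaultEnvironmentConfig 
is generated rather than typed by hand. This is only a sketch under stated 
assumptions: build_expansion_args is a hypothetical name, the boot path is 
the one mentioned in the thread, and none of this has been verified against 
a running cluster.

```python
import json
from typing import List


def build_expansion_args(boot_command: str = "/opt/apache/beam/boot") -> List[str]:
    """Build the append_args list from the quoted reply (hypothetical helper)."""
    # Compact separators reproduce the exact JSON form used in the thread.
    environment_config = json.dumps(
        {"command": boot_command}, separators=(",", ":")
    )
    return [
        "--defaultEnvironmentType=PROCESS",
        f"--defaultEnvironmentConfig={environment_config}",
        "--experiments=use_deprecated_read",
    ]


if __name__ == "__main__":
    for arg in build_expansion_args():
        print(arg)
```

The resulting list would then be passed as 
default_io_expansion_service(append_args=build_expansion_args()), as in the 
quoted snippet.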
