It works fine if I only use Kafka read/write, and I only see a single container - two transforms (read and write) but a single container.
If I add SqlTransform, however, another container is created and errors start to appear. My speculation is that the containers don't recognise each other and get killed by the Flink task manager. I see containers being created and killed repeatedly. Does every multi-language pipeline run in a separate container?

On Thu, 7 Mar 2024, 12:35 pm Robert Bradshaw via user, <user@beam.apache.org> wrote:
> Oh, sorry, I didn't see that.
>
> I would look earlier in the logs and see why it failed to bring up the
> containers (or, if they started, look in the container logs to see why
> they died).
>
> On Wed, Mar 6, 2024 at 5:28 PM Jaehyeon Kim <dott...@gmail.com> wrote:
> >
> > I am not using the python local runner but the flink runner. A flink
> > cluster is started locally.
> >
> > On Thu, 7 Mar 2024 at 12:13, Robert Bradshaw via user <user@beam.apache.org> wrote:
> >>
> >> Streaming portable pipelines are not yet supported on the Python local runner.
> >>
> >> On Wed, Mar 6, 2024 at 5:03 PM Jaehyeon Kim <dott...@gmail.com> wrote:
> >> >
> >> > Hello,
> >> >
> >> > I use the python SDK and my pipeline reads messages from Kafka and
> >> > transforms them via SQL. I see two containers are created but it seems
> >> > they don't communicate with each other, so the Flink task manager keeps
> >> > killing the containers. The Flink cluster runs locally. Is there a way
> >> > to run two multi-language pipelines (running on Docker) that communicate
> >> > with each other?
> >> >
> >> > Cheers,
> >> > Jaehyeon
> >> >
> >> > def run():
> >> >     parser = argparse.ArgumentParser(
> >> >         description="Process statistics by user from website visit event"
> >> >     )
> >> >     parser.add_argument(
> >> >         "--inputs",
> >> >         default="inputs",
> >> >         help="Specify folder name that event records are saved",
> >> >     )
> >> >     parser.add_argument(
> >> >         "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
> >> >     )
> >> >     opts = parser.parse_args()
> >> >
> >> >     options = PipelineOptions()
> >> >     pipeline_opts = {
> >> >         "runner": opts.runner,
> >> >         "flink_master": "localhost:8081",
> >> >         "job_name": "traffic-agg-sql",
> >> >         "environment_type": "LOOPBACK",
> >> >         "streaming": True,
> >> >         "parallelism": 3,
> >> >         "experiments": [
> >> >             "use_deprecated_read"
> >> >         ],  ## https://github.com/apache/beam/issues/20979
> >> >         "checkpointing_interval": "60000",
> >> >     }
> >> >     options = PipelineOptions([], **pipeline_opts)
> >> >     # Required, else it will complain when importing worker functions
> >> >     options.view_as(SetupOptions).save_main_session = True
> >> >
> >> >     query = """
> >> >     WITH cte AS (
> >> >         SELECT id, CAST(event_datetime AS TIMESTAMP) AS ts
> >> >         FROM PCOLLECTION
> >> >     )
> >> >     SELECT
> >> >         CAST(TUMBLE_START(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_start,
> >> >         CAST(TUMBLE_END(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_end,
> >> >         COUNT(*) AS page_view
> >> >     FROM cte
> >> >     GROUP BY
> >> >         TUMBLE(ts, INTERVAL '10' SECOND), id
> >> >     """
> >> >
> >> >     p = beam.Pipeline(options=options)
> >> >     (
> >> >         p
> >> >         | "Read from Kafka"
> >> >         >> kafka.ReadFromKafka(
> >> >             consumer_config={
> >> >                 "bootstrap.servers": os.getenv(
> >> >                     "BOOTSTRAP_SERVERS",
> >> >                     "host.docker.internal:29092",
> >> >                 ),
> >> >                 "auto.offset.reset": "earliest",
> >> >                 # "enable.auto.commit": "true",
> >> >                 "group.id": "traffic-agg",
> >> >             },
> >> >             topics=["website-visit"],
> >> >         )
> >> >         | "Decode messages" >> beam.Map(decode_message)
> >> >         | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
> >> >         | "Format timestamp" >> beam.Map(format_timestamp).with_output_types(EventLog)
> >> >         | "Count per minute" >> SqlTransform(query)
> >> >         | beam.Map(print)
> >> >     )
> >> >
> >> >     logging.getLogger().setLevel(logging.INFO)
> >> >     logging.info("Building pipeline ...")
> >> >
> >> >     p.run().wait_until_finish()
> >> >
> >> > Here is the error message from the Flink UI.
> >> >
> >> > 2024-03-07 12:01:41
> >> > org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:458)
> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:443)
> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:310)
> >> >     at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
> >> >     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
> >> >     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
> >> >     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
> >> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
> >> >     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> >> >     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
> >> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> >> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> >> >     at java.base/java.lang.Thread.run(Thread.java:829)
> >> > Caused by: java.lang.IllegalStateException: No container running for id cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
> >> >     at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:137)
> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:259)
> >> >     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
> >> >     at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
> >> >     ... 20 more
> >> >     Suppressed: java.io.IOException: Received exit code 1 for command 'docker kill cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8'. stderr: Error response from daemon: Cannot kill container: cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8: Container cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8 is not running
> >> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:255)
> >> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:181)
> >> >         at org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer(DockerCommand.java:161)
> >> >         at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:161)
> >> >
> >>
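[Editor's note, not part of the thread] For context on the "separate container" question: each cross-language Java transform in a Python pipeline (ReadFromKafka, SqlTransform) is expanded through an expansion service and runs in its own Java SDK harness environment, which on the portable Flink runner defaults to a Docker container; the Python-side environment_type=LOOPBACK option only affects the Python environment. The sketch below shows that both transforms accept an explicit expansion_service argument, so the Java side can be managed deliberately instead of relying on whatever is auto-started per transform. The localhost:8097 address, the PageView row type, and the simplified query are assumptions for illustration, not a verified fix for the error above.

```python
import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.sql import SqlTransform


# SqlTransform needs a schema'd input; a NamedTuple registered with RowCoder works.
class PageView(typing.NamedTuple):
    id: str


coders.registry.register_coder(PageView, coders.RowCoder)


def run():
    options = PipelineOptions(
        runner="FlinkRunner",
        flink_master="localhost:8081",
        streaming=True,
        environment_type="LOOPBACK",  # applies to the Python environment only
    )
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read from Kafka"
            >> ReadFromKafka(
                consumer_config={
                    "bootstrap.servers": "host.docker.internal:29092",
                    "group.id": "traffic-agg",
                },
                topics=["website-visit"],
                # This is already ReadFromKafka's default; shown explicitly to make
                # the Java expansion step visible. It starts the Beam Kafka IO
                # expansion service jar locally.
                expansion_service=default_io_expansion_service(),
            )
            | "To rows"
            >> beam.Map(lambda kv: PageView(id=kv[1].decode("utf-8"))).with_output_types(PageView)
            | "Filter via SQL"
            >> SqlTransform(
                "SELECT id FROM PCOLLECTION",
                # Hypothetical: an expansion service you started yourself and keep
                # running at this address; the port is an assumption.
                expansion_service="localhost:8097",
            )
            | beam.Map(print)
        )


if __name__ == "__main__":
    run()
```

Whether the Java transforms end up in one container or several depends on the environments the expansion services report back, so inspecting the task manager's Docker container logs (as suggested earlier in the thread) is still the first diagnostic step.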