The problem is that Kafka is a "cross-language" transform implemented in Java. You have configured your Python pipeline to run with environment_type=EXTERNAL, but the Kafka transform carries its own environment with environment_type=DOCKER; it does not respect the environment_type you set for the pipeline. Currently I don't think there is a way to configure the environment for an external transform; I brought up this issue in a recent thread [1]. The error you are seeing occurs because environment_type=DOCKER tries to start a Docker container inside your Flink workers, which evidently do not have Docker installed.
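For what it's worth, ReadFromKafka does accept an expansion_service argument, so you can point it at an expansion service you run yourself. Note this is only a partial knob: it changes where the Java transform is expanded, not (as far as I can tell) the DOCKER environment embedded in the expansion response. A minimal sketch, assuming Beam 2.27 and a hypothetical expansion service already listening on localhost:8097:

from apache_beam.io.external.kafka import ReadFromKafka

# Sketch only: 'localhost:8097' is a hypothetical address where a Java
# expansion service was started by hand. Overriding it controls where
# expansion happens; the expanded Kafka transform still carries its own
# (DOCKER) execution environment.
kafka_read = ReadFromKafka(
    consumer_config={'bootstrap.servers': 'zookeeper.libra.ubiquant:31090'},
    topics=['test001'],
    expansion_service='localhost:8097')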
[1] https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E

On Thu, Mar 4, 2021 at 2:28 AM yilun zhang <ilyak1...@gmail.com> wrote:

> hmmm, looks like I may fail due to docker environment:
>
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main
>     "__main__", mod_spec)
>   File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
>     exec(code, run_globals)
>   File "/tmp/kafka_test.py", line 26, in <module>
>     run_pipeline()
>   File "/tmp/kafka_test.py", line 22, in run_pipeline
>     |beam.Map(print)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 583, in __exit__
>     self.result.wait_until_finish()
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 581, in wait_until_finish
>     raise self._runtime_exception
> RuntimeError: Pipeline job-0f33f7f0-4fb4-4a57-a0fe-c4b2c34caff8 failed in state FAILED:
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
>   at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
>   at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
>   at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
>   at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
>   at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
>   at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:243)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
>   at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot run program "docker": error=2, No such file or directory
>   at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
>   at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:189)
>   at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:171)
>   at org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:95)
>   at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
>   at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
>   at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
>   at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
>   ... 14 more
> Caused by: java.io.IOException: error=2, No such file or directory
>   at java.lang.UNIXProcess.forkAndExec(Native Method)
>   at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
>   at java.lang.ProcessImpl.start(ProcessImpl.java:134)
>   at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
>   ... 28 more
>
> I tried to create yaml like to mount docker in local like:
>
> apiVersion: v1
> kind: Pod
> metadata:
>   name: beam-pod
>   namespace: default
> spec:
>   volumes:
>   - name: docker-sock
>     hostPath:
>       path: "/var/run/docker.sock"
>       type: Socket
>   - name: docker-directory
>     hostPath:
>       path: "/var/lib/docker"
>       type: Directory
>   containers:
>   - image: python3.7sdk_java11_beam:2.27.0
>     command: ["sleep","3600"]
>     name: beam-pod
>     volumeMounts:
>     - mountPath: "/var/run/docker.sock"
>       name: docker-sock
>       readOnly: false
>     - mountPath: "/var/lib/docker"
>       name: docker-directory
>       readOnly: false
>     securityContext:
>       privileged: true
>       runAsUser: 0
>     imagePullPolicy: Never
>   restartPolicy: Never
>
> And hello-world example runs fine:
>
> root@beam-pod:/tmp# docker run hello-world
>
> Hello from Docker!
>
> Thanks again!
> Yilun
>
> On Thu, Mar 4, 2021 at 4:44 PM yilun zhang <ilyak1...@gmail.com> wrote:
>
>> We create a custom docker image, which include java runtime, python and
>> docker environment to run our job. But encountered timeout exception:
>>
>> root@beam-pod:/tmp# PYTHONPATH='./' python -m kafka_test --runner=FlinkRunner --flink_master=beam-flink-cluster-jobmanager:8081 --flink_submit_uber_jar --environment_type=EXTERNAL --environment_config=localhost:50000
>> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
>> ERROR:root:java.util.concurrent.TimeoutException: The heartbeat of TaskManager with id 10.190.29.80:6122-88ce88 timed out.
>>   at org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1442)
>>   at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
>>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
>>   at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
>>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>   at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>   at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>   at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Our test code is super simple:
>>
>> import apache_beam as beam
>> import apache_beam.transforms.window as window
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>> from apache_beam.io import WriteToText
>>
>> def run_pipeline():
>>     with beam.Pipeline(options=PipelineOptions(
>>             runner="FlinkRunner",
>>             flink_master="beam-flink-cluster-jobmanager:8081",
>>             environment_type="EXTERNAL",
>>             environment_config="localhost:50000")) as p:
>>         (p
>>          | 'Read from Kafka' >> ReadFromKafka(
>>                consumer_config={'bootstrap.servers': 'zookeeper.libra.ubiquant:31090',
>>                                 'auto.offset.reset': 'latest'},
>>                topics=['test001'])
>>          | 'Par with 1' >> beam.Map(lambda word: (word, 1))
>>          | 'Window of 10 seconds' >> beam.WindowInto(window.FixedWindows(5))
>>          | 'Group by key' >> beam.GroupByKey()
>>          | 'Sum word counts' >> beam.Map(lambda kv: (kv[0], sum(kv[1])))
>>          # | "Write to Kafka" >> WriteToKafka(producer_config={'bootstrap.servers': 'zookeeper.libra.ubiquant:31090'}, topic='test001')
>>          | 'Write to text' >> WriteToText("/tmp/output2")
>>         )
>>
>> if __name__ == '__main__':
>>     run_pipeline()
>>
>> Is there any suggestion on debugging direction? In flink UI, it looks
>> like it failed from first step, ReadFromKafka.
>>
>> Thanks,
>> Yilun
>>
>> On Sat, Feb 27, 2021 at 2:16 AM Kyle Weaver <kcwea...@google.com> wrote:
>>
>>> In Beam, the Kafka connector does not know anything about the underlying
>>> execution engine (here Flink). It is instead translated by the runner into
>>> a user defined function in Flink. So it is expected that the resulting DAG
>>> does not look the same as it would with a native Flink source.
>>>
>>> On Fri, Feb 26, 2021 at 5:18 AM yilun zhang <ilyak1...@gmail.com> wrote:
>>>
>>>> So sorry for subscribing errors on my side resulted in multiple
>>>> duplicate email!
>>>>
>>>> Thanks for reply and it does help!
>>>>
>>>> I am confused when submitting beam job with kafka connector to flink, I
>>>> noticed that flink DAG diagram will included readFromKafka as part of flink
>>>> workflow. while if we submit a pyflink job (connected with kafka) directly
>>>> to flink, the flink workflow will exclude reading from kafka (which is the
>>>> resource) but only has data processing parts.
>>>>
>>>> Is that how beam want flink to do?
>>>>
>>>> Thanks a lot and sincerely apologize again for silly duplicated emails!
>>>>
>>>> Yilun
>>>>
>>>> Sam Bourne <samb...@gmail.com> wrote on Thu, Feb 25, 2021 at 11:58 AM:
>>>>
>>>>> Hi Yilun!
>>>>>
>>>>> I made a quick proof of concept repo showcasing how to run a beam
>>>>> pipeline in flink on k8s. It may be useful for you as reference.
>>>>>
>>>>> https://github.com/sambvfx/beam-flink-k8s
>>>>>
>>>>> On Wed, Feb 24, 2021, 8:13 AM yilun zhang <ilyak1...@gmail.com> wrote:
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> Our team is trying to use beam with connector Kafka and runner flink
>>>>>> to gather information and process data. We adopt python sdk and build in
>>>>>> java 11 in python 3.7 sdk image as java runtime for kafka expansion
>>>>>> service.
>>>>>> so:
>>>>>> image: beam python 3.7 docker image + build in java 11
>>>>>> connector: kafka
>>>>>> runner: flink
>>>>>> container: kubernetes
>>>>>>
>>>>>> We encounter an docker not found error when running:
>>>>>> python3 -m kafka_test --runner=FlinkRunner
>>>>>> --flink_master=flink-job-manager:8081 --flink_submit_uber_jar
>>>>>> --environment_type=EXTERNAL --environment_config=localhost:50000
>>>>>>
>>>>>> We notice that in https://beam.apache.org/roadmap/portability/ it
>>>>>> mentioned the prerequisite also includes Docker. We wonder what is the
>>>>>> docker usage here? Is there any suggested way to build docker in
>>>>>> k8s container? (something maybe like sysbox for docker in docker?)
>>>>>>
>>>>>> Or maybe we should not use beam sdk+runner in k8s?
>>>>>>
>>>>>> Thanks,
>>>>>> Yilun
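
P.S. For anyone finding this thread later: the Docker prerequisite on the portability page comes from the default environment_type=DOCKER, where each Flink task manager launches an SDK harness container through the local Docker daemon. environment_type=EXTERNAL avoids that by expecting a worker pool you run yourself, typically as a sidecar container in the task manager pod. A minimal sketch using the stock SDK image (your custom image should accept the same --worker_pool entrypoint flag if it keeps the Beam boot entrypoint):

docker run --net=host apache/beam_python3.7_sdk:2.27.0 --worker_pool

The worker pool listens on port 50000 by default, which is what --environment_config=localhost:50000 points at. But as noted above, this only covers the Python parts of the pipeline; the Java Kafka transform still brings its own DOCKER environment.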