The problem is that Kafka is a "cross-language" transform that is
implemented in Java. You have  configured your Python pipeline to run with
environment_type=EXTERNAL. However the Kafka transform has its own
environment that has environment_type=DOCKER, it does not respect the
environment_type you set for the pipeline. Currently I don't think there's
a way to configure the environment for an external transform; I brought up
this issue in a recent thread [1]. The reason for the error you are seeing
is that environment_type=DOCKER tries to start up Docker inside your Flink
workers, which must not have Docker installed.

[1]
https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E

On Thu, Mar 4, 2021 at 2:28 AM yilun zhang <ilyak1...@gmail.com> wrote:

> hmmm, looks like I may fail due to docker environment:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/runpy.py", line 193, in
> _run_module_as_main
>     "__main__", mod_spec)
>   File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code
>     exec(code, run_globals)
>   File "/tmp/kafka_test.py", line 26, in <module>
>     run_pipeline()
>   File "/tmp/kafka_test.py", line 22, in run_pipeline
>     |beam.Map(print)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 583, in __exit__
>     self.result.wait_until_finish()
>   File
> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
> line 581, in wait_until_finish
>     raise self._runtime_exception
> RuntimeError: Pipeline job-0f33f7f0-4fb4-4a57-a0fe-c4b2c34caff8 failed in
> state FAILED:
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
> java.io.IOException: Cannot run program "docker": error=2, No such file or
> directory
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
> at
> org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
> at
> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202)
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:243)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot run program "docker": error=2, No
> such file or directory
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
> at
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:189)
> at
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:171)
> at
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:95)
> at
> org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:131)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
> ... 14 more
> Caused by: java.io.IOException: error=2, No such file or directory
> at java.lang.UNIXProcess.forkAndExec(Native Method)
> at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
> at java.lang.ProcessImpl.start(ProcessImpl.java:134)
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
> ... 28 more
>
>
> I tried to create yaml like to mount docker in local like:
> apiVersion: v1
> kind: Pod
> metadata:
>   name: beam-pod
>   namespace: default
> spec:
>   volumes:
>     - name: docker-sock
>       hostPath:
>         path: "/var/run/docker.sock"
>         type: Socket
>     - name: docker-directory
>       hostPath:
>         path: "/var/lib/docker"
>         type: Directory
>   containers:
>   - image: python3.7sdk_java11_beam:2.27.0
>     command: ["sleep","3600"]
>     name: beam-pod
>     volumeMounts:
>       - mountPath: "/var/run/docker.sock"
>         name: docker-sock
>         readOnly: false
>       - mountPath: "/var/lib/docker"
>         name: docker-directory
>         readOnly: false
>     securityContext:
>       privileged: true
>       runAsUser: 0
>     imagePullPolicy: Never
>   restartPolicy: Never
>
> And hello-world example runs fine:
> root@beam-pod:/tmp# docker run hello-world
>
> Hello from Docker!
>
>
> Thanks again!
> Yilun
>
> On Thu, Mar 4, 2021 at 4:44 PM yilun zhang <ilyak1...@gmail.com> wrote:
>
>> We create a custom docker image, which include java runtime, python and
>> docker environment to run our job. But encountered timeout exception:
>>
>> root@beam-pod:/tmp# PYTHONPATH='./' python  -m kafka_test
>> --runner=FlinkRunner --flink_master=beam-flink-cluster-jobmanager:8081
>> --flink_submit_uber_jar --environment_type=EXTERNAL
>> --environment_config=localhost:50000
>> WARNING:root:Make sure that locally built Python SDK docker image has
>> Python 3.7 interpreter.
>> ERROR:root:java.util.concurrent.TimeoutException: The heartbeat of
>> TaskManager with id 10.190.29.80:6122-88ce88  timed out.
>> at
>> org.apache.flink.runtime.resourcemanager.ResourceManager$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(ResourceManager.java:1442)
>> at
>> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>>
>> Our test code is super simple:
>>
>>
>> import apache_beam as beam
>> import apache_beam.transforms.window as window
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
>> from apache_beam.io import WriteToText
>>
>> def run_pipeline():
>>   with beam.Pipeline(options=PipelineOptions( runner="FlinkRunner",
>>             flink_master="beam-flink-cluster-jobmanager:8081",
>>             environment_type="EXTERNAL",
>>             environment_config="localhost:50000")) as p:
>>     (p
>>      | 'Read from Kafka' >>
>> ReadFromKafka(consumer_config={'bootstrap.servers':
>> 'zookeeper.libra.ubiquant:31090',
>>
>>  'auto.offset.reset': 'latest'},
>>                                           topics=['test001'])
>>      | 'Par with 1' >> beam.Map(lambda word: (word, 1))
>>      | 'Window of 10 seconds' >> beam.WindowInto(window.FixedWindows(5))
>>      | 'Group by key' >> beam.GroupByKey()
>>      | 'Sum word counts' >> beam.Map(lambda kv: (kv[0], sum(kv[1])))
>>    #  | "Write to Kafka" >>
>> WriteToKafka(producer_config={'bootstrap.servers':
>> 'zookeeper.libra.ubiquant:31090'}, topic='test001')
>>      | 'Write to text' >> WriteToText("/tmp/output2")
>>     )
>>
>> if __name__ == '__main__':
>>   run_pipeline()
>>
>>
>> Is there any suggestion on debugging direction? In flink UI, it looks
>> like it failed from first step, ReadFromKafka.
>>
>> Thanks,
>> Yilun
>>
>> On Sat, Feb 27, 2021 at 2:16 AM Kyle Weaver <kcwea...@google.com> wrote:
>>
>>> In Beam, the Kafka connector does not know anything about the underlying
>>> execution engine (here Flink). It is instead translated by the runner into
>>> a user defined function in Flink. So it is expected that the resulting DAG
>>> does not look the same as it would with a native Flink source.
>>>
>>> On Fri, Feb 26, 2021 at 5:18 AM yilun zhang <ilyak1...@gmail.com> wrote:
>>>
>>>> So sorry for subscribing errors on my side resulted in multiple
>>>> duplicate email!
>>>>
>>>> Thanks for reply and it does help!
>>>>
>>>> I am confused when submitting beam job with kafka connector to flink, I
>>>> noticed that flink DAG diagram will included readFromKafka as part of flink
>>>> workflow. while if we submit a pyflink job(connected with kafka) directly
>>>> to flink, the flink workflow will exclude reading from kafka(which is the
>>>> resource) but only has data processing parts.
>>>>
>>>> Is that how beam want flink to do?
>>>>
>>>> Thanks a lot and sincerely apologize again for silly duplicated emails!
>>>>
>>>> Yilun
>>>>
>>>> Sam Bourne <samb...@gmail.com>于2021年2月25日 周四上午11:58写道:
>>>>
>>>>> Hi Yilun!
>>>>>
>>>>> I made a quick proof of concept repo showcasing how to run a beam
>>>>> pipeline in flink on k8s. It may be useful for you as reference.
>>>>>
>>>>> https://github.com/sambvfx/beam-flink-k8s
>>>>>
>>>>>
>>>>> On Wed, Feb 24, 2021, 8:13 AM yilun zhang <ilyak1...@gmail.com> wrote:
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> Our team is trying to use beam with connector Kafka and runner flink
>>>>>> to gather information and process data. We adopt python sdk and build in
>>>>>> java 11 in python 3.7 sdk image as java runtime for kafka expansion 
>>>>>> service.
>>>>>>  so :
>>>>>> image: beam python 3.7 docker image + build in java 11
>>>>>> connector: kafka
>>>>>> runner: flink
>>>>>> container: kubernetes
>>>>>>
>>>>>> We encounter an docker not found error when running:
>>>>>>  python3 -m kafka_test --runner=FlinkRunner
>>>>>> --flink_master=flink-job-manager:8081 --flink_submit_uber_jar
>>>>>> --environment_type=EXTERNAL --environment_config=localhost:50000
>>>>>>
>>>>>> We notice that in https://beam.apache.org/roadmap/portability/ it
>>>>>> mentioned the prerequisite also includes Docker. We wonder what is the
>>>>>> docker usage here? Is there any suggested way to build docker in
>>>>>> k8s container? (something maybe like sysbox for docker in docker?)
>>>>>>
>>>>>> Or maybe we should not use beam sdk+runner in k8s?
>>>>>>
>>>>>> Thanks,
>>>>>> Yilun
>>>>>>
>>>>>

Reply via email to