Hi Sam,
 
Thanks for your help.
"When the Flink taskmanager executes the WriteToText transform, it spins up a beam python SDK docker container"
Hmm... that's weird. I thought Beam translates the pipeline into Flink's own language, so once it is sent to Flink there is no relation to Beam anymore. That is what they say, at least. And why some exotic 'docker' rather than just a library?
 
I am trying to learn Beam/Flink but I'm stuck. They say 'it's easy', but I see it is not, especially when the official examples do not work.
 
Maybe you know: is there any other way I could prepare a pipeline in Beam [Python] and run it in Flink with 'flink run'?
 
Best
 
M.
 
Sent: Monday, June 20, 2022 at 7:38 PM
From: "Sam Bourne" <samb...@gmail.com>
To: user@beam.apache.org
Subject: Re: How to run Beam pipeline in Flink [Python]?

Hi Mike,

I’m not an expert, but I have some experience running beam pipelines in Flink that require access to something on disk. When the Flink taskmanager executes the WriteToText transform, it spins up a beam python SDK docker container to perform the work*. At the moment there is not a way to mount a directory into the SDK docker container, but there is an open ticket [1] to provide a way to specify a directory to mount.

I was running a fork for a little while with these changes [2] that allowed us to pass along some docker run options for how the SDK container was started (so we could include a mount). One thing to note is that the flink taskmanager workers need access to whatever directory you’re specifying (the E:\ drive in your example). I also created a quick sample of how to deploy Flink in Kubernetes here [3] which solved some problems we were running into dealing with the Flink job server sharing the same disk staging area.
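
The container behaviour described above is controlled by Beam's portable --environment_type pipeline option. Below is a minimal sketch, not a verified fix, assuming a local Flink cluster at localhost:8081 and assuming LOOPBACK mode is acceptable for local testing. In that mode the Python code runs in the process that submitted the pipeline instead of a docker container, so files land on the submitter's own disk. As far as I understand, when no --environment_type is given the default is DOCKER, which may be why a bare --environment_config=localhost:50000 does not behave as an external worker address. The helper names flink_args and run, the sample elements, and the output prefix are hypothetical.

```python
def flink_args(flink_master, flink_version="1.14",
               environment_type="LOOPBACK", environment_config=None):
    """Build the Beam option list for a Flink run (hypothetical helper)."""
    args = [
        "--runner=FlinkRunner",
        f"--flink_version={flink_version}",
        f"--flink_master={flink_master}",
        # LOOPBACK is an assumption for local testing; DOCKER, PROCESS and
        # EXTERNAL are the other documented environment types.
        f"--environment_type={environment_type}",
    ]
    # Only EXTERNAL/PROCESS/DOCKER setups need an environment_config value.
    if environment_config is not None:
        args.append(f"--environment_config={environment_config}")
    return args


def run(output_prefix, flink_master="localhost:8081"):
    # Imported here so flink_args() can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(flink_args(flink_master))
    with beam.Pipeline(options=options) as p:
        (p
         | "Create file lines" >> beam.Create(["first line", "second line"])
         | "Write to files" >> beam.io.WriteToText(output_prefix))


if __name__ == "__main__":
    # Print the options only; call run("output") once a Flink cluster is up.
    print(flink_args("localhost:8081"))
```

With a worker pool already listening on localhost:50000, the same helper could be called as flink_args("localhost:8081", environment_type="EXTERNAL", environment_config="localhost:50000"). This is again an assumption about the setup rather than something tested here.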

Hopefully some of that helps,
-Sam

*There’s an exception to this where some transforms are replaced with something runner-specific. For example, the apache_beam.io.gcp.pubsub.ReadFromPubSub transform gets “swapped out” for its Java implementation, which executes directly on the Flink taskmanager worker and not within the SDK container.

[1] https://github.com/apache/beam/issues/19240
[2] https://github.com/apache/beam/pull/8982
[3] https://github.com/sambvfx/beam-flink-k8s

 
 
On Sat, Jun 18, 2022 at 11:39 AM <pod...@gmx.com> wrote:
 
I'll try again; maybe someone can help me with this?
 
How to run Beam on Flink?
 
I have code:
 
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run():
    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_version=1.14",
        "--flink_master=localhost:8081",
        "--environment_config=localhost:50000",
    ])
    output_file = 'E:\\directory\\output.txt'
    with beam.Pipeline(options=options) as p:
        (p
         | 'Create file lines' >> beam.Create([
             'Each element must be a string.',
             'It writes one element per line.',
             'There are no guarantees on the line order.',
             'The data might be written into multiple files.',
         ])
         | 'Write to files' >> beam.io.WriteToText(output_file)
        )


if __name__ == "__main__":
    run()
 
This should work, but for some reason Flink is not able to save to the file:
 
CHAIN MapPartition (MapPartition at [2]Write to files/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:3320>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) FAILED
 
I get the same problem if I try to open a file.
What is wrong here? I tried several example scripts and none of them works.
I would be grateful if you could help me take the first step with Beam and Flink.
Regards
Mike
 
 
