From: "Sam Bourne" <samb...@gmail.com>
To: user@beam.apache.org
Subject: Re: How to run Beam pipeline in Flink [Python]?
Hi Mike,
I’m not an expert, but I have some experience running Beam pipelines in Flink that need access to something on disk. When the Flink taskmanager executes the WriteToText transform, it spins up a Beam Python SDK Docker container to perform the work*. At the moment there is no way to mount a directory into the SDK container, but there is an open ticket [1] to provide a way to specify a directory to mount.
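To make that concrete, here is a rough sketch (the image tag below is only a placeholder; pick one matching your Beam version): the SDK harness environment is selected by the --environment_type pipeline option, which defaults to DOCKER with the portable FlinkRunner, and that is why the taskmanager starts an SDK container.

from apache_beam.options.pipeline_options import PipelineOptions

# Sketch: make the default DOCKER environment explicit.
options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",
    "--environment_type=DOCKER",
    # With DOCKER, environment_config names the SDK container image to run;
    # this tag is just a placeholder.
    "--environment_config=apache/beam_python3.9_sdk:2.41.0",
])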
I ran a fork for a little while with these changes [2] that allowed us to pass along some docker run options for how the SDK container was started (so we could include a mount). One thing to note is that the Flink taskmanager workers need access to whatever directory you’re specifying (the E:\ drive in your example). I also put together a quick sample of deploying Flink on Kubernetes here [3], which solved some problems we ran into with the Flink job server sharing the same disk staging area.
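If patching Beam isn’t an option, one way to sidestep the mount limitation entirely (a suggestion of mine, not something covered by the ticket above) is the EXTERNAL environment: run a Beam worker pool directly on each taskmanager host so the Python user code executes outside Docker and can see the host’s filesystem. A minimal sketch, assuming a worker pool is already listening on port 50000 on every taskmanager host and that /data/output exists there:

# Start a worker pool on each taskmanager host first, for example
# (flag names may differ slightly between Beam versions):
#   python -m apache_beam.runners.worker.worker_pool_main --service_port=50000
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",
    "--environment_type=EXTERNAL",
    # With EXTERNAL, environment_config is the worker pool's host:port.
    "--environment_config=localhost:50000",
])

with beam.Pipeline(options=options) as p:
    # The worker now runs on the host itself, so host paths are visible to it.
    _ = (p
         | beam.Create(["written from a host-local worker"])
         | beam.io.WriteToText("/data/output/out"))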
Hopefully some of that helps,
-Sam
*There’s an exception to this where some transforms are replaced with something runner-specific. For example, the apache_beam.io.gcp.pubsub.ReadFromPubSub transform gets “swapped out” for the Java implementation, which executes directly on the Flink taskmanager worker and not within the SDK container.
[1] https://github.com/apache/beam/issues/19240
[2] https://github.com/apache/beam/pull/8982
[3] https://github.com/sambvfx/beam-flink-k8s
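To illustrate the footnote about runner-replaced transforms, here is a small sketch (the project and topic names are placeholders): a source like ReadFromPubSub is translated into its Java implementation by the Flink runner, so it runs on the taskmanager itself, while the downstream Python transforms still run in the SDK harness environment.

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions(["--runner=FlinkRunner", "--flink_master=localhost:8081"])
options.view_as(StandardOptions).streaming = True  # Pub/Sub reads are streaming

with beam.Pipeline(options=options) as p:
    (p
     # Swapped out for the runner's Java Pub/Sub source at translation time.
     | "Read" >> ReadFromPubSub(topic="projects/my-project/topics/my-topic")
     # This Python transform still runs in the SDK harness environment.
     | "Print" >> beam.Map(print))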
I’m trying again, maybe someone can help me with this: how do I run Beam on Flink?

I have this code:

def run():
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_version=1.14",
        "--flink_master=localhost:8081",
        "--environment_config=localhost:50000"
    ])

    output_file = 'E:\\directory\\output.txt'

    with beam.Pipeline(options=options) as p:
        (p
         | 'Create file lines' >> beam.Create([
               'Each element must be a string.',
               'It writes one element per line.',
               'There are no guarantees on the line order.',
               'The data might be written into multiple files.',
           ])
         | 'Write to files' >> beam.io.WriteToText(output_file)
        )


if __name__ == "__main__":
    run()

It should work, but for some reason Flink is not able to save to the file:

CHAIN MapPartition (MapPartition at [2]Write to files/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:3320>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) FAILED
The same problem occurs if I want to open some file.
What is wrong here? I tried several example scripts - none of them works.
I would appreciate it if you could help me take my first steps with Beam and Flink.
Regards
Mike