Further to that: at the Flink Job/Task Manager end, I configured / set up the following:
- HADOOP_CONF_DIR
- HADOOP_USER_NAME
- Hadoop jars copied under $FLINK_HOME/lib
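To verify that setup, I ran roughly the following from the Task Manager host (a minimal sketch; the host, port and paths here are placeholders, not my real values):

----- hdfs connectivity check (sketch, placeholder values) -----
from apache_beam.options.pipeline_options import HadoopFileSystemOptions
from apache_beam.io.hadoopfilesystem import HadoopFileSystem

# Placeholder connection details -- substitute the real namenode host/port.
opts = HadoopFileSystemOptions(hdfs_host='namenode.example.com',
                               hdfs_port=50070,
                               hdfs_user='hdfs')
fs = HadoopFileSystem(opts)

# Write a small file, then read it back (placeholder path).
writer = fs.create('hdfs://tmp/beam_hdfs_check.txt')
writer.write(b'hello from beam')
writer.close()

reader = fs.open('hdfs://tmp/beam_hdfs_check.txt')
print(reader.read())
reader.close()
----------------------------------------------------------------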
And made sure a pyflink script is able to read from and write to the HDFS system.

Should I set up / configure anything at the Job Server? I came across this thread, which helped to some extent, but not completely:
https://lists.apache.org/thread.html/a9b6f019b22a65640c272a9497a3c9cc34d68cc2b5c1c9bdebc7ff38%40%3Cuser.beam.apache.org%3E

From: "Ramanan, Buvana (Nokia - US/Murray Hill)" <buvana.rama...@nokia-bell-labs.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Wednesday, May 27, 2020 at 8:30 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Flink Runner with HDFS

Hello,

I am trying to read data from, process, and write data to HDFS with Beam pipelines in Python, using the Flink Runner. Beam version 2.19.0, Flink 1.9.2.

My initial code (pasted below my signature), written to make sure I can read and write, works fine on the Local Runner. However, I get the error message pasted below my signature at the Flink Job Manager when I use the Portable Runner with the Flink Runner.

I invoke the Job Server as:

docker run --net=host apachebeam/flink1.9_job_server:latest --flink-master $IP:8081 --job-host $IP --job-port 8099

I am supplying the pipeline options in my Python code, yet the error message is about the missing pipeline_options. Strangely, the python command at the client side does not return any error message and simply terminates.

Please let me know how I can fix my code and get this running.

Thank you,
Regards,
Buvana

----- Code that reads and writes to hdfs ------
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import HadoopFileSystemOptions
from apache_beam.io.hadoopfilesystem import HadoopFileSystem

options = PipelineOptions([
    "--hdfs_host=XXXXXX.143",
    "--hdfs_user=hdfs",
    "--hdfs_port=50070",
    "--runner=PortableRunner",
    "--job_endpoint=XXXXXXX.134:8099",
])
p = beam.Pipeline(options=options)

HDFS_HOSTNAME = 'XXXXXX.143'
HDFS_PORT = 50070
hdfs_client_options = HadoopFileSystemOptions(hdfs_host=HDFS_HOSTNAME,
                                              hdfs_port=HDFS_PORT,
                                              hdfs_user='hdfs')
hdfs_client = HadoopFileSystem(hdfs_client_options)

input_file_hdfs = "hdfs://user/buvana/manifest.csv"
lines = p | 'ReadMyFile' >> beam.Create(hdfs_client.open(input_file_hdfs))
res = lines | "WriteMyFile" >> beam.io.WriteToText("hdfs://user/buvana/copy_frfr", ".csv")
p.run()

-------------- error message at the Flink Job Manager -------------
2020-05-27 20:13:10
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 667, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 748, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/home/tfs/venv_beam_2_19_0/lib/python3.6/site-packages/apache_beam/transforms/core.py", line 1435, in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
  File "/home/tfs/venv_beam_2_19_0/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1001, in <lambda>
    lambda _, sink: sink.initialize_write(), self.sink)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 163, in initialize_write
    tmp_dir = self._create_temp_dir(file_path_prefix)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 168, in _create_temp_dir base_path, last_component = FileSystems.split(file_path_prefix) File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 147, in split filesystem = FileSystems.get_filesystem(path) File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 110, in get_filesystem return systems[0](pipeline_options=options) File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 115, in __init__ raise ValueError('pipeline_options is not set') ValueError: pipeline_options is not set