Hi Max, I incorporated your suggestion and I still get the same error message.
-Buvana

On 5/28/20, 7:35 AM, "Maximilian Michels" <m...@apache.org> wrote:

The configuration looks good, but the HDFS file system implementation is
not intended to be used directly. Instead of:

> lines = p | 'ReadMyFile' >> beam.Create(hdfs_client.open(input_file_hdfs))

use:

> lines = p | 'ReadMyFile' >> beam.io.ReadFromText(input_file_hdfs)

Best,
Max
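As a reference for the change Max suggests, a minimal sketch of the whole
pipeline rewritten around ReadFromText/WriteToText follows. The host values
are placeholders for the masked addresses in the original code, and the
`with` block (which runs the pipeline on exit) is an idiom substituted here;
this illustrates the suggested direction, not a confirmed fix for the error
further down:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # The --hdfs_* options are what Beam's HadoopFileSystem reads when it
    # resolves hdfs:// paths; the hosts below are placeholders.
    options = PipelineOptions([
        "--hdfs_host=<namenode-host>",
        "--hdfs_port=50070",
        "--hdfs_user=hdfs",
        "--runner=PortableRunner",
        "--job_endpoint=<job-server-host>:8099",
    ])

    # ReadFromText/WriteToText go through Beam's FileSystems layer, so no
    # HadoopFileSystem client is constructed by hand.
    with beam.Pipeline(options=options) as p:
        lines = p | 'ReadMyFile' >> beam.io.ReadFromText(
            "hdfs://user/buvana/manifest.csv")
        lines | 'WriteMyFile' >> beam.io.WriteToText(
            "hdfs://user/buvana/copy_frfr", file_name_suffix=".csv")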
On 28.05.20 06:06, Ramanan, Buvana (Nokia - US/Murray Hill) wrote:
> Further to that:
>
> At the Flink Job/Task Manager end, I configured / set up the following:
>
>   HADOOP_CONF_DIR
>   HADOOP_USER_NAME
>   Hadoop jars copied under $FLINK_HOME/lib
>
> and made sure a PyFlink script is able to read from and write into the
> HDFS system.
>
> Should I set up / configure anything at the Job Server?
>
> I came across this thread, which helped to some extent, but not completely:
> https://lists.apache.org/thread.html/a9b6f019b22a65640c272a9497a3c9cc34d68cc2b5c1c9bdebc7ff38%40%3Cuser.beam.apache.org%3E
>
> From: "Ramanan, Buvana (Nokia - US/Murray Hill)" <buvana.rama...@nokia-bell-labs.com>
> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
> Date: Wednesday, May 27, 2020 at 8:30 PM
> To: "user@beam.apache.org" <user@beam.apache.org>
> Subject: Flink Runner with HDFS
>
> Hello,
>
> I am trying to read data from, process it, and write it back to HDFS with
> Beam pipelines in Python, using the Flink runner (Beam 2.19.0, Flink 1.9.2).
>
> My initial code (pasted below my signature), written to make sure I can
> read and write, works fine on the local runner. However, I get the error
> message pasted below at the Flink Job Manager when I use the Portable
> Runner against Flink. I invoke the Job Server as:
>
> docker run --net=host apachebeam/flink1.9_job_server:latest \
>     --flink-master $IP:8081 --job-host $IP --job-port 8099
>
> I am supplying the pipeline options in my Python code, yet the error
> message is about missing pipeline_options. Strangely, the python command
> at the client side does not return any error message and simply terminates.
>
> Please let me know how I can fix my code and get this running.
>
> Thank you,
> Regards,
> Buvana
>
> ----- Code that reads and writes to hdfs ------
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import HadoopFileSystemOptions
> from apache_beam.io.hadoopfilesystem import HadoopFileSystem
>
> options = PipelineOptions([
>     "--hdfs_host=XXXXXX.143",
>     "--hdfs_user=hdfs",
>     "--hdfs_port=50070",
>     "--runner=PortableRunner",
>     "--job_endpoint=XXXXXXX.134:8099"
> ])
> p = beam.Pipeline(options=options)
>
> HDFS_HOSTNAME = 'XXXXXX.143'
> HDFS_PORT = 50070
> hdfs_client_options = HadoopFileSystemOptions(
>     hdfs_host=HDFS_HOSTNAME, hdfs_port=HDFS_PORT, hdfs_user='hdfs')
> hdfs_client = HadoopFileSystem(hdfs_client_options)
>
> input_file_hdfs = "hdfs://user/buvana/manifest.csv"
>
> lines = p | 'ReadMyFile' >> beam.Create(hdfs_client.open(input_file_hdfs))
> res = lines | "WriteMyFile" >> beam.io.WriteToText(
>     "hdfs://user/buvana/copy_frfr", ".csv")
> p.run()
>
> -------------- error message at the Flink Job Manager -------------
>
> 2020-05-27 20:13:10
> java.util.concurrent.ExecutionException: java.lang.RuntimeException:
> Error received from SDK harness for instruction 2: Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 667, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 748, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "/home/tfs/venv_beam_2_19_0/lib/python3.6/site-packages/apache_beam/transforms/core.py", line 1435, in <lambda>
>     wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
>   File "/home/tfs/venv_beam_2_19_0/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1001, in <lambda>
>     lambda _, sink: sink.initialize_write(), self.sink)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
>     return fnc(self, *args, **kwargs)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 163, in initialize_write
>     tmp_dir = self._create_temp_dir(file_path_prefix)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 168, in _create_temp_dir
>     base_path, last_component = FileSystems.split(file_path_prefix)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 147, in split
>     filesystem = FileSystems.get_filesystem(path)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 110, in get_filesystem
>     return systems[0](pipeline_options=options)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 115, in __init__
>     raise ValueError('pipeline_options is not set')
> ValueError: pipeline_options is not set
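Reading the traceback bottom-up: WriteToText's sink initialization on the
SDK harness calls FileSystems.get_filesystem, which constructs
HadoopFileSystem with empty pipeline options and hits the ValueError. That
suggests the --hdfs_* options are not reaching the worker even though they
are set on the client. A quick local sanity check of the HDFS settings
themselves, mirroring the hdfs_client construction in the code above (the
exists() probe is an illustrative assumption; it only exercises the client
side, not option propagation to workers):

    from apache_beam.options.pipeline_options import HadoopFileSystemOptions
    from apache_beam.io.hadoopfilesystem import HadoopFileSystem

    # Same construction as hdfs_client in the pipeline above; the host is a
    # placeholder for the masked address.
    client_options = HadoopFileSystemOptions(
        hdfs_host='<namenode-host>', hdfs_port=50070, hdfs_user='hdfs')
    fs = HadoopFileSystem(client_options)

    # True here means the client-side HDFS settings work; the worker-side
    # ValueError above would then point at how the options are propagated.
    print(fs.exists('hdfs://user/buvana/manifest.csv'))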