Please remove any instantiation of HadoopFileSystem; it is not meant to be used directly. It will be created automatically when you use the 'hdfs://' scheme in your path.
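
Roughly what happens under the hood: at runtime, Beam matches the path's scheme against the registered FileSystem implementations and constructs the matching one with the pipeline options, which is why the options must carry the HDFS settings. A minimal sketch of that dispatch (the path is a placeholder):

    from apache_beam.io.filesystems import FileSystems

    # 'hdfs://...' is matched to HadoopFileSystem, which Beam constructs with
    # the current pipeline options. Outside a running pipeline no options are
    # registered, so this bare call fails with the same "pipeline_options is
    # not set" ValueError that appears in the trace further down this thread.
    try:
        fs = FileSystems.get_filesystem("hdfs://user/example/file.csv")
        print(type(fs).__name__)  # HadoopFileSystem
    except ValueError as e:
        print(e)  # pipeline_options is not set
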
It is sufficient to only set the PipelineOptions for HDFS, e.g.:

> options = PipelineOptions([
>     "--hdfs_host=XXXXXX.143",
>     "--hdfs_user=hdfs",
>     "--hdfs_port=50070",
>     "--runner=PortableRunner",
>     "--job_endpoint=XXXXXXX.134:8099"
> ])
>
> input_file_hdfs = "hdfs://..."
> output_file_hdfs = "hdfs://..."
>
> with beam.Pipeline(options=options) as p:
>     (p | 'ReadMyFile' >> beam.io.ReadFromText(input_file_hdfs)
>        | "WriteMyFile" >> beam.io.WriteToText(output_file_hdfs))

-Max

On 28.05.20 17:00, Ramanan, Buvana (Nokia - US/Murray Hill) wrote:
> Hi Max,
>
> I incorporated your suggestion and I still get the same error message.
>
> -Buvana
>
> On 5/28/20, 7:35 AM, "Maximilian Michels" <m...@apache.org> wrote:
>
> The configuration looks good, but the HDFS file system implementation is
> not intended to be used directly.
>
> Instead of:
>
> > lines = p | 'ReadMyFile' >> beam.Create(hdfs_client.open(input_file_hdfs))
>
> Use:
>
> > lines = p | 'ReadMyFile' >> beam.io.ReadFromText(input_file_hdfs)
>
> Best,
> Max
>
> On 28.05.20 06:06, Ramanan, Buvana (Nokia - US/Murray Hill) wrote:
> > Further to that:
> >
> > At the Flink Job/Task Manager end, I configured / set up the following:
> >
> > HADOOP_CONF_DIR
> > HADOOP_USER_NAME
> > hadoop jars copied under $FLINK_HOME/lib
> >
> > And made sure a pyflink script is able to read from and write to the
> > HDFS system.
> >
> > Should I set up / configure anything at the Job Server?
> >
> > I came across this thread, which helped to some extent, but not
> > completely:
> > https://lists.apache.org/thread.html/a9b6f019b22a65640c272a9497a3c9cc34d68cc2b5c1c9bdebc7ff38%40%3Cuser.beam.apache.org%3E
> >
> > *From: *"Ramanan, Buvana (Nokia - US/Murray Hill)"
> > <buvana.rama...@nokia-bell-labs.com>
> > *Reply-To: *"user@beam.apache.org" <user@beam.apache.org>
> > *Date: *Wednesday, May 27, 2020 at 8:30 PM
> > *To: *"user@beam.apache.org" <user@beam.apache.org>
> > *Subject: *Flink Runner with HDFS
> >
> > Hello,
> >
> > I am trying to read from, process, and write data to HDFS with Beam
> > pipelines in Python, using the Flink Runner. Beam version 2.19.0,
> > Flink 1.9.2.
> >
> > My initial code (pasted below my sign), written to make sure I can read
> > and write, works fine on the Local Runner. However, I get the following
> > error message (pasted below my sign) at the Flink Job Manager when I use
> > the Portable Runner with the Flink Runner. I invoke the Job Server as:
> >
> > docker run --net=host apachebeam/flink1.9_job_server:latest \
> >     --flink-master $IP:8081 --job-host $IP --job-port 8099
> >
> > I am supplying the pipeline options in my Python code, yet the error
> > message is about the missing pipeline_options. And strangely, the python
> > command at the client side does not return any error message; it simply
> > terminates.
> >
> > Please let me know how I can fix my code and get this running.
> > Thank you,
> > Regards,
> > Buvana
> >
> > ----- Code that reads and writes to hdfs ------
> >
> > import apache_beam as beam
> > from apache_beam.options.pipeline_options import PipelineOptions
> > from apache_beam.options.pipeline_options import HadoopFileSystemOptions
> > from apache_beam.io.hadoopfilesystem import HadoopFileSystem
> >
> > options = PipelineOptions([
> >     "--hdfs_host=XXXXXX.143",
> >     "--hdfs_user=hdfs",
> >     "--hdfs_port=50070",
> >     "--runner=PortableRunner",
> >     "--job_endpoint=XXXXXXX.134:8099"
> > ])
> > p = beam.Pipeline(options=options)
> >
> > HDFS_HOSTNAME = 'XXXXXX.143'
> > HDFS_PORT = 50070
> > hdfs_client_options = HadoopFileSystemOptions(hdfs_host=HDFS_HOSTNAME,
> >     hdfs_port=HDFS_PORT, hdfs_user='hdfs')
> > hdfs_client = HadoopFileSystem(hdfs_client_options)
> >
> > input_file_hdfs = "hdfs://user/buvana/manifest.csv"
> >
> > lines = p | 'ReadMyFile' >> beam.Create(hdfs_client.open(input_file_hdfs))
> > res = lines | "WriteMyFile" >> beam.io.WriteToText("hdfs://user/buvana/copy_frfr", ".csv")
> > p.run()
> >
> > -------------- error message at the Flink Job Manager -------------
> >
> > 2020-05-27 20:13:10
> > java.util.concurrent.ExecutionException: java.lang.RuntimeException:
> > Error received from SDK harness for instruction 2: Traceback (most recent call last):
> >   File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
> >   File "apache_beam/runners/common.py", line 667, in apache_beam.runners.common.PerWindowInvoker.invoke_process
> >   File "apache_beam/runners/common.py", line 748, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
> >   File "/home/tfs/venv_beam_2_19_0/lib/python3.6/site-packages/apache_beam/transforms/core.py", line 1435, in <lambda>
> >     wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
> >   File "/home/tfs/venv_beam_2_19_0/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1001, in <lambda>
> >     lambda _, sink: sink.initialize_write(), self.sink)
> >   File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
> >     return fnc(self, *args, **kwargs)
> >   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 163, in initialize_write
> >     tmp_dir = self._create_temp_dir(file_path_prefix)
> >   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 168, in _create_temp_dir
> >     base_path, last_component = FileSystems.split(file_path_prefix)
> >   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 147, in split
> >     filesystem = FileSystems.get_filesystem(path)
> >   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 110, in get_filesystem
> >     return systems[0](pipeline_options=options)
> >   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 115, in __init__
> >     raise ValueError('pipeline_options is not set')
> > ValueError: pipeline_options is not set
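
For reference, here is the script above rewritten along the lines Max suggests, with the direct HadoopFileSystem / hdfs_client usage removed and HDFS access driven entirely by the pipeline options. This is a sketch, not a tested program; the hosts, ports, and paths are the placeholders from the thread.

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # All HDFS configuration travels with the pipeline options; Beam creates
    # the HadoopFileSystem itself when it encounters the 'hdfs://' scheme.
    options = PipelineOptions([
        "--hdfs_host=XXXXXX.143",
        "--hdfs_user=hdfs",
        "--hdfs_port=50070",
        "--runner=PortableRunner",
        "--job_endpoint=XXXXXXX.134:8099",
    ])

    input_file_hdfs = "hdfs://user/buvana/manifest.csv"
    output_file_hdfs = "hdfs://user/buvana/copy_frfr"

    # The context manager runs the pipeline on exit, replacing p.run().
    with beam.Pipeline(options=options) as p:
        (p
         | 'ReadMyFile' >> beam.io.ReadFromText(input_file_hdfs)
         | "WriteMyFile" >> beam.io.WriteToText(output_file_hdfs,
                                                file_name_suffix=".csv"))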