Please remove any instantiation of HadoopFileSystem; it is not meant to
be used directly. It will be created automatically when you use the
'hdfs://' schema in your path.

It is sufficient to only set the PipelineOptions for HDFS, e.g.:

> options = PipelineOptions([
>     "--hdfs_host=XXXXXX.143",
>     "--hdfs_user=hdfs",
>     "--hdfs_port=50070",
>     "--runner=PortableRunner",
>     "--job_endpoint=XXXXXXX.134:8099"
> ])
> > input_file_hdfs = "hdfs://..."
> output_file_hdfs = "hdfs://...">> with beam.Pipeline(options=options) as p:
>   (p | 'ReadMyFile' >> beam.io.ReadFromText(input_file_hdfs)
>      |"WriteMyFile" >> beam.io.WriteToText(output_file_hdfs))


-Max

On 28.05.20 17:00, Ramanan, Buvana (Nokia - US/Murray Hill) wrote:
> Hi Max,
> 
> I incorporated your suggestion and I still get the same error message.
> 
> -Buvana
> 
> On 5/28/20, 7:35 AM, "Maximilian Michels" <m...@apache.org> wrote:
> 
>     The configuration looks good but the HDFS file system implementation is
>     not intended to be used directly.
> 
>     Instead of:
> 
>     > lines = p | 'ReadMyFile' >> 
> beam.Create(hdfs_client.open(input_file_hdfs))
> 
>     Use:
> 
>     >  lines = p | 'ReadMyFile' >> beam.io.ReadFromText(input_file_hdfs)
> 
> 
>     Best,
>     Max
> 
>     On 28.05.20 06:06, Ramanan, Buvana (Nokia - US/Murray Hill) wrote:
>     > Further to that:
>     > 
>     >  
>     > 
>     > At the Flink Job/Task Manager end, I configured/setup the following:
>     > 
>     > HADOOP_CONF_DIR
>     > 
>     > HADOOP_USER_NAME
>     > 
>     > hadoop jars copied under $FLINK_HOME/lib
>     > 
>     >  
>     > 
>     > And made sure a pyflink script is able to read and write into the hdfs
>     > system.
>     > 
>     >  
>     > 
>     > Should I setup / configure anything at the Job Server?
>     > 
>     >  
>     > 
>     > I came across this thread, which helped to some extent, but not 
> completely:
>     > 
>     > 
> https://lists.apache.org/thread.html/a9b6f019b22a65640c272a9497a3c9cc34d68cc2b5c1c9bdebc7ff38%40%3Cuser.beam.apache.org%3E
>     > 
>     > 
>     >  
>     > 
>     > *From: *"Ramanan, Buvana (Nokia - US/Murray Hill)"
>     > <buvana.rama...@nokia-bell-labs.com>
>     > *Reply-To: *"user@beam.apache.org" <user@beam.apache.org>
>     > *Date: *Wednesday, May 27, 2020 at 8:30 PM
>     > *To: *"user@beam.apache.org" <user@beam.apache.org>
>     > *Subject: *Flink Runner with HDFS
>     > 
>     >  
>     > 
>     > Hello,
>     > 
>     >  
>     > 
>     > I am trying to read from, process and write data to HDFS with beam
>     > pipelines in python. Using Flink Runner. Beam version 2.19.0. Flink 
> 1.9.2
>     > 
>     >  
>     > 
>     > My initial code (pasted below my sign) to make sure I can read and
>     > write, works fine on Local Runner. However, I get the following error
>     > message (pasted below my sign) at the Flink Job manager when I use
>     > Portable Runner with Flink Runner I invoke Job server as:
>     > 
>     >  
>     > 
>     > docker run --net=host apachebeam/flink1.9_job_server:latest
>     > --flink-master $IP:8081 --job-host $IP  --job-port 8099
>     > 
>     >  
>     > 
>     > I am supplying the pipeline options in my python code. Yet the error
>     > message is regarding the missing pipeline_options. And strangely the
>     > python command at the client side does not return any error message and
>     > simply terminates.  
>     > 
>     >  
>     > 
>     > Please let me know how I can fix my code and get this running.
>     > 
>     >  
>     > 
>     > Thank you,
>     > 
>     > Regards,
>     > 
>     > Buvana
>     > 
>     >  
>     > 
>     > ----- Code that reads and writes to hdfs ------
>     > 
>     > import apache_beam as beam
>     > 
>     > from apache_beam.options.pipeline_options import PipelineOptions
>     > 
>     > from apache_beam.options.pipeline_options import HadoopFileSystemOptions
>     > 
>     > from apache_beam.io.hadoopfilesystem import HadoopFileSystem
>     > 
>     >  
>     > 
>     > options = PipelineOptions([
>     > 
>     >     "--hdfs_host=XXXXXX.143",
>     > 
>     >     "--hdfs_user=hdfs",
>     > 
>     >     "--hdfs_port=50070",
>     > 
>     >     "--runner=PortableRunner",
>     > 
>     >     "--job_endpoint=XXXXXXX.134:8099"
>     > 
>     > ])
>     > 
>     > p = beam.Pipeline(options=options)
>     > 
>     >  
>     > 
>     > HDFS_HOSTNAME = XXXXXX.143'
>     > 
>     > HDFS_PORT = 50070
>     > 
>     > hdfs_client_options = HadoopFileSystemOptions(hdfs_host=HDFS_HOSTNAME,
>     > hdfs_port=HDFS_PORT, hdfs_user='hdfs')
>     > 
>     > hdfs_client = HadoopFileSystem(hdfs_client_options)
>     > 
>     >  
>     > 
>     > input_file_hdfs = "hdfs://user/buvana/manifest.csv"
>     > 
>     >  
>     > 
>     > lines = p | 'ReadMyFile' >> 
> beam.Create(hdfs_client.open(input_file_hdfs))
>     > 
>     > res = lines | "WriteMyFile" >>
>     > beam.io.WriteToText("hdfs://user/buvana/copy_frfr", ".csv")
>     > 
>     > p.run()
>     > 
>     >  
>     > 
>     > -------------- error message at the Flink Job Manager -------------
>     > 
>     > 2020-05-2720:13:10
>     > 
>     > java.util.concurrent.ExecutionException: java.lang.RuntimeException:
>     > Errorreceived fromSDKharness forinstruction 2: Traceback(most recent
>     > call last):
>     > 
>     >   File"apache_beam/runners/common.py", line 883,
>     > inapache_beam.runners.common.DoFnRunner.process
>     > 
>     >   File"apache_beam/runners/common.py", line 667,
>     > inapache_beam.runners.common.PerWindowInvoker.invoke_process
>     > 
>     >   File"apache_beam/runners/common.py", line 748,
>     > inapache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>     > 
>     >  
>     > 
> File"/home/tfs/venv_beam_2_19_0/lib/python3.6/site-packages/apache_beam/transforms/core.py",
>     > line 1435, in<lambda>
>     > 
>     >     wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
>     > 
>     >  
>     > 
> File"/home/tfs/venv_beam_2_19_0/lib/python3.6/site-packages/apache_beam/io/iobase.py",
>     > line 1001, in<lambda>
>     > 
>     >     lambda _, sink: sink.initialize_write(), self.sink)
>     > 
>     >  
>     > 
> File"/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py",
>     > line 140, in_f
>     > 
>     >     returnfnc(self, *args, **kwargs)
>     > 
>     >  
>     > 
> File"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py",
>     > line 163, ininitialize_write
>     > 
>     >     tmp_dir = self._create_temp_dir(file_path_prefix)
>     > 
>     >  
>     > 
> File"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py",
>     > line 168, in_create_temp_dir
>     > 
>     >     base_path, last_component = FileSystems.split(file_path_prefix)
>     > 
>     >  
>     > 
> File"/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py",
>     > line 147, insplit
>     > 
>     >     filesystem = FileSystems.get_filesystem(path)
>     > 
>     >  
>     > 
> File"/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py",
>     > line 110, inget_filesystem
>     > 
>     >     returnsystems[0](pipeline_options=options)
>     > 
>     >  
>     > 
> File"/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py",
>     > line 115, in__init__
>     > 
>     >     raise ValueError('pipeline_options is not set')
>     > 
>     > ValueError: pipeline_options is notset
>     > 
>     >  
>     > 
> 

Reply via email to