Hi Max,

I incorporated your suggestion and I still get the same error message.
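
For reference, here is a minimal sketch of what the pipeline might look like with the suggested ReadFromText change applied. The helper names build_option_flags and build_pipeline are hypothetical, and the host, port and path values are the placeholders from the original post. It is not a confirmed fix, since the same error still appears.

```python
def build_option_flags(hdfs_host, hdfs_port, hdfs_user, job_endpoint):
    """Build the flag list handed to PipelineOptions (hypothetical helper)."""
    return [
        "--hdfs_host=%s" % hdfs_host,
        "--hdfs_port=%d" % hdfs_port,
        "--hdfs_user=%s" % hdfs_user,
        "--runner=PortableRunner",
        "--job_endpoint=%s" % job_endpoint,
    ]


def build_pipeline(flags, input_path, output_prefix):
    """Construct (but do not run) the read/write pipeline (hypothetical helper)."""
    # Imported here so the flag helper above works without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    p = beam.Pipeline(options=PipelineOptions(flags))
    # ReadFromText resolves the hdfs:// path through Beam's FileSystems layer,
    # so no HadoopFileSystem client is created by hand.
    lines = p | "ReadMyFile" >> beam.io.ReadFromText(input_path)
    lines | "WriteMyFile" >> beam.io.WriteToText(output_prefix, file_name_suffix=".csv")
    return p


# Usage (placeholder values from the original post):
#   flags = build_option_flags("XXXXXX.143", 50070, "hdfs", "XXXXXXX.134:8099")
#   p = build_pipeline(flags, "hdfs://user/buvana/manifest.csv",
#                      "hdfs://user/buvana/copy_frfr")
#   p.run().wait_until_finish()
```

Calling wait_until_finish() on the result of p.run() blocks until the job ends, which may make a failure more visible on the client side than a bare p.run().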

-Buvana

On 5/28/20, 7:35 AM, "Maximilian Michels" <m...@apache.org> wrote:

    The configuration looks good but the HDFS file system implementation is
    not intended to be used directly.

    Instead of:

    > lines = p | 'ReadMyFile' >> beam.Create(hdfs_client.open(input_file_hdfs))

    Use:

    >  lines = p | 'ReadMyFile' >> beam.io.ReadFromText(input_file_hdfs)


    Best,
    Max

    On 28.05.20 06:06, Ramanan, Buvana (Nokia - US/Murray Hill) wrote:
    > Further to that:
    > 
    >  
    > 
    > At the Flink Job/Task Manager end, I configured/setup the following:
    > 
    > HADOOP_CONF_DIR
    > 
    > HADOOP_USER_NAME
    > 
    > hadoop jars copied under $FLINK_HOME/lib
    > 
    >  
    > 
    > And made sure a pyflink script is able to read and write into the hdfs
    > system.
    > 
    >  
    > 
    > Should I set up or configure anything at the Job Server?
    > 
    >  
    > 
    > I came across this thread, which helped to some extent, but not completely:
    > 
    > https://lists.apache.org/thread.html/a9b6f019b22a65640c272a9497a3c9cc34d68cc2b5c1c9bdebc7ff38%40%3Cuser.beam.apache.org%3E
    > 
    > 
    >  
    > 
    > *From: *"Ramanan, Buvana (Nokia - US/Murray Hill)"
    > <buvana.rama...@nokia-bell-labs.com>
    > *Reply-To: *"user@beam.apache.org" <user@beam.apache.org>
    > *Date: *Wednesday, May 27, 2020 at 8:30 PM
    > *To: *"user@beam.apache.org" <user@beam.apache.org>
    > *Subject: *Flink Runner with HDFS
    > 
    >  
    > 
    > Hello,
    > 
    >  
    > 
    > I am trying to read from, process and write data to HDFS with Beam
    > pipelines in Python, using the Flink Runner (Beam version 2.19.0, Flink 1.9.2).
    > 
    >  
    > 
    > My initial code (pasted below my signature), which checks that I can read
    > and write, works fine on the Local Runner. However, I get the following
    > error message (also pasted below) at the Flink Job Manager when I use the
    > Portable Runner with the Flink Runner. I invoke the Job Server as:
    > 
    >  
    > 
    > docker run --net=host apachebeam/flink1.9_job_server:latest
    > --flink-master $IP:8081 --job-host $IP  --job-port 8099
    > 
    >  
    > 
    > I am supplying the pipeline options in my python code. Yet the error
    > message is regarding the missing pipeline_options. And strangely the
    > python command at the client side does not return any error message and
    > simply terminates.  
    > 
    >  
    > 
    > Please let me know how I can fix my code and get this running.
    > 
    >  
    > 
    > Thank you,
    > 
    > Regards,
    > 
    > Buvana
    > 
    >  
    > 
    > ----- Code that reads and writes to hdfs ------
    > 
    > import apache_beam as beam
    > 
    > from apache_beam.options.pipeline_options import PipelineOptions
    > 
    > from apache_beam.options.pipeline_options import HadoopFileSystemOptions
    > 
    > from apache_beam.io.hadoopfilesystem import HadoopFileSystem
    > 
    >  
    > 
    > options = PipelineOptions([
    > 
    >     "--hdfs_host=XXXXXX.143",
    > 
    >     "--hdfs_user=hdfs",
    > 
    >     "--hdfs_port=50070",
    > 
    >     "--runner=PortableRunner",
    > 
    >     "--job_endpoint=XXXXXXX.134:8099"
    > 
    > ])
    > 
    > p = beam.Pipeline(options=options)
    > 
    >  
    > 
    > HDFS_HOSTNAME = 'XXXXXX.143'
    > 
    > HDFS_PORT = 50070
    > 
    > hdfs_client_options = HadoopFileSystemOptions(hdfs_host=HDFS_HOSTNAME,
    > hdfs_port=HDFS_PORT, hdfs_user='hdfs')
    > 
    > hdfs_client = HadoopFileSystem(hdfs_client_options)
    > 
    >  
    > 
    > input_file_hdfs = "hdfs://user/buvana/manifest.csv"
    > 
    >  
    > 
    > lines = p | 'ReadMyFile' >> beam.Create(hdfs_client.open(input_file_hdfs))
    > 
    > res = lines | "WriteMyFile" >> beam.io.WriteToText("hdfs://user/buvana/copy_frfr", ".csv")
    > 
    > p.run()
    > 
    >  
    > 
    > -------------- error message at the Flink Job Manager -------------
    > 
    > 2020-05-27 20:13:10
    > 
    > java.util.concurrent.ExecutionException: java.lang.RuntimeException:
    > Error received from SDK harness for instruction 2: Traceback (most recent
    > call last):
    > 
    >   File "apache_beam/runners/common.py", line 883,
    > in apache_beam.runners.common.DoFnRunner.process
    > 
    >   File "apache_beam/runners/common.py", line 667,
    > in apache_beam.runners.common.PerWindowInvoker.invoke_process
    > 
    >   File "apache_beam/runners/common.py", line 748,
    > in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    > 
    >   File "/home/tfs/venv_beam_2_19_0/lib/python3.6/site-packages/apache_beam/transforms/core.py",
    > line 1435, in <lambda>
    > 
    >     wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
    > 
    >   File "/home/tfs/venv_beam_2_19_0/lib/python3.6/site-packages/apache_beam/io/iobase.py",
    > line 1001, in <lambda>
    > 
    >     lambda _, sink: sink.initialize_write(), self.sink)
    > 
    >   File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py",
    > line 140, in _f
    > 
    >     return fnc(self, *args, **kwargs)
    > 
    >   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py",
    > line 163, in initialize_write
    > 
    >     tmp_dir = self._create_temp_dir(file_path_prefix)
    > 
    >   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py",
    > line 168, in _create_temp_dir
    > 
    >     base_path, last_component = FileSystems.split(file_path_prefix)
    > 
    >   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py",
    > line 147, in split
    > 
    >     filesystem = FileSystems.get_filesystem(path)
    > 
    >   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py",
    > line 110, in get_filesystem
    > 
    >     return systems[0](pipeline_options=options)
    > 
    >   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py",
    > line 115, in __init__
    > 
    >     raise ValueError('pipeline_options is not set')
    > 
    > ValueError: pipeline_options is not set
    > 
    >  
    > 
