Further to that:

On the Flink Job/Task Manager side, I configured/set up the following:
HADOOP_CONF_DIR
HADOOP_USER_NAME
Hadoop JARs copied under $FLINK_HOME/lib

I also made sure a PyFlink script is able to read from and write to HDFS.
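Concretely, the per-node setup looks like this (the paths below are illustrative, not my actual values):

export HADOOP_CONF_DIR=/etc/hadoop/conf
export HADOOP_USER_NAME=hdfs
cp /path/to/hadoop/jars/*.jar $FLINK_HOME/lib/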

Do I need to set up or configure anything at the Job Server as well?
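For example, does the Job Server container need to see the same Hadoop configuration? Something along these lines (the -v/-e wiring below is my guess at how it would be done, not something I have verified):

docker run --net=host \
  -v /etc/hadoop/conf:/etc/hadoop/conf \
  -e HADOOP_CONF_DIR=/etc/hadoop/conf \
  apachebeam/flink1.9_job_server:latest \
  --flink-master $IP:8081 --job-host $IP --job-port 8099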

I came across this thread, which helped to some extent, but not completely:
https://lists.apache.org/thread.html/a9b6f019b22a65640c272a9497a3c9cc34d68cc2b5c1c9bdebc7ff38%40%3Cuser.beam.apache.org%3E

From: "Ramanan, Buvana (Nokia - US/Murray Hill)" 
<buvana.rama...@nokia-bell-labs.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Wednesday, May 27, 2020 at 8:30 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Flink Runner with HDFS

Hello,

I am trying to read data from, process it, and write it back to HDFS with Beam 
pipelines in Python, using the Flink runner (Beam 2.19.0, Flink 1.9.2).

My initial code (pasted below my signature), which simply checks that I can read 
and write, works fine with the DirectRunner. However, when I run it with the 
PortableRunner against Flink, I get the error message pasted below my signature 
at the Flink Job Manager. I invoke the Job Server as:

docker run --net=host apachebeam/flink1.9_job_server:latest \
  --flink-master $IP:8081 --job-host $IP --job-port 8099

I am supplying the pipeline options in my Python code, yet the error message 
complains that pipeline_options is missing. Strangely, the python command on the 
client side does not report any error and simply terminates.
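
For what it's worth, the ValueError itself is easy to reproduce locally by 
constructing the filesystem without any options (a minimal check, assuming Beam 
2.19.0, where hadoopfilesystem.py line 115 raises it), so my guess is that the 
SDK harness ends up building HadoopFileSystem without the options I supplied on 
the client:

from apache_beam.io.hadoopfilesystem import HadoopFileSystem

try:
    # Apparently the same code path the SDK harness hits (see traceback below):
    HadoopFileSystem(pipeline_options=None)
except ValueError as e:
    print(e)  # -> pipeline_options is not set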

Please let me know how I can fix my code and get this running.

Thank you,
Regards,
Buvana

----- Code that reads and writes to HDFS ------
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import HadoopFileSystemOptions
from apache_beam.io.hadoopfilesystem import HadoopFileSystem

options = PipelineOptions([
    "--hdfs_host=XXXXXX.143",
    "--hdfs_user=hdfs",
    "--hdfs_port=50070",
    "--runner=PortableRunner",
    "--job_endpoint=XXXXXXX.134:8099"
])
p = beam.Pipeline(options=options)

# Standalone HDFS client, used only to read the input at pipeline-construction time.
HDFS_HOSTNAME = 'XXXXXX.143'
HDFS_PORT = 50070
hdfs_client_options = HadoopFileSystemOptions(
    hdfs_host=HDFS_HOSTNAME, hdfs_port=HDFS_PORT, hdfs_user='hdfs')
hdfs_client = HadoopFileSystem(hdfs_client_options)

input_file_hdfs = "hdfs://user/buvana/manifest.csv"

# The input is read on the client and its lines become an in-memory PCollection;
# only the write below goes through Beam's HDFS filesystem on the workers.
lines = p | 'ReadMyFile' >> beam.Create(hdfs_client.open(input_file_hdfs))
res = lines | "WriteMyFile" >> beam.io.WriteToText(
    "hdfs://user/buvana/copy_frfr", ".csv")
p.run()

-------------- error message at the Flink Job Manager -------------
2020-05-27 20:13:10
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 667, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 748, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/home/tfs/venv_beam_2_19_0/lib/python3.6/site-packages/apache_beam/transforms/core.py", line 1435, in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
  File "/home/tfs/venv_beam_2_19_0/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1001, in <lambda>
    lambda _, sink: sink.initialize_write(), self.sink)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 140, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 163, in initialize_write
    tmp_dir = self._create_temp_dir(file_path_prefix)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 168, in _create_temp_dir
    base_path, last_component = FileSystems.split(file_path_prefix)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 147, in split
    filesystem = FileSystems.get_filesystem(path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 110, in get_filesystem
    return systems[0](pipeline_options=options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 115, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set
