Hello Kyle,

That works. Produces the expected output.

-Buvana

________________________________
From: Kyle Weaver <kcwea...@google.com>
Sent: Thursday, May 28, 2020 9:19 PM
To: user@beam.apache.org <user@beam.apache.org>
Subject: Re: HDFS I/O with Beam on Spark and Flink runners - consistent Error 
messages

Hi Buvana,

I suspect this is a bug. Could you try running your pipeline again with these 
changes:

1. Remove `--spark-master-url spark://YYYYYYYY:7077` from your Docker run 
command.
2. Add `--environment_type=LOOPBACK` to your pipeline options.

That will help us confirm the cause of the issue.
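
For reference, the second change could look roughly like the sketch below. It reuses the flags and placeholder hosts from the client code quoted further down; the helper name `build_pipeline_args` is hypothetical and only there for illustration. With `LOOPBACK`, the user code runs in the same process that submitted the pipeline, which helps isolate whether the worker environment is the cause.

```python
def build_pipeline_args(job_endpoint, hdfs_host, loopback=True):
    """Return the option flags from the client code in this thread.

    Hypothetical helper: only the flags themselves come from the thread.
    """
    args = [
        "--hdfs_host=" + hdfs_host,
        "--hdfs_user=hdfs",
        "--hdfs_port=50070",
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
    ]
    if loopback:
        # Suggested diagnostic change: execute user code in the submitting process.
        args.append("--environment_type=LOOPBACK")
    return args


# Placeholder hosts are copied from the thread, not real values.
print(build_pipeline_args("XXXXXXXXX:8099", "ZZZZZZZZZ"))

# Usage with Beam installed (mirrors the client code below):
#   from apache_beam.options.pipeline_options import PipelineOptions
#   options = PipelineOptions(build_pipeline_args("XXXXXXXXX:8099", "ZZZZZZZZZ"))
```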

On Thu, May 28, 2020 at 7:12 PM Ramanan, Buvana (Nokia - US/Murray Hill) 
<buvana.rama...@nokia-bell-labs.com> wrote:

Kyle, Max, All,



I am desperately trying to get Beam working on at least one of the Flink or 
Spark runners. I am facing failures in both cases, with a similar message.



I reported the Flink runner issue (Beam v2.19.0) yesterday; permalink: 
https://lists.apache.org/thread.html/r4977083014eb2d252710ad24ed32d5ff3c402ba161e7b36328a3bd87%40%3Cuser.beam.apache.org%3E



I also came across this related discussion:

https://lists.apache.org/thread.html/a9b6f019b22a65640c272a9497a3c9cc34d68cc2b5c1c9bdebc7ff38%40%3Cuser.beam.apache.org%3E



I get a similar error message with the Spark runner as I got with the Flink 
runner (although it's now the newer version of Beam). I have pasted my 
environment details, code, and the error message below. The code runs fine on 
the Direct Runner.



HADOOP_CONF_DIR is configured appropriately before starting the Spark Master and Slave.



I hope to make some headway soon. Please help. Maybe I have to downgrade to a 
lower version of Beam where this issue did not exist; if so, please let me know 
the version number.



Thank you,

Regards,

Buvana



Spark Runner scenario:



Beam version 2.21.0 on both the client and the job server.



Docker Spark Job Server:

https://hub.docker.com/r/apache/beam_spark_job_server

docker run --net=host apache/beam_spark_job_server:latest --job-host XXXXXXX 
--job-port 8099 --spark-master-url spark://YYYYYYYY:7077



Client code:



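import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# HDFS connection settings plus the portable runner pointed at the job server.
options = PipelineOptions([
    "--hdfs_host=ZZZZZZZZZ",
    "--hdfs_user=hdfs",
    "--hdfs_port=50070",
    "--runner=PortableRunner",
    "--job_endpoint=XXXXXXXXX:8099"
])

p = beam.Pipeline(options=options)

input_file_hdfs = "hdfs://user/buvana/manifest"

# Read the manifest from HDFS and write it back out as a .csv copy.
lines = p | 'ReadMyFile' >> beam.io.ReadFromText(input_file_hdfs)
res = lines | "WriteMyFile" >> beam.io.WriteToText(
    "hdfs://user/buvana/copy-manifest", ".csv")

p.run()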



Error message at the Spark Master UI:



worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/transforms/core.py", line 1501, in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1005, in <lambda>
    lambda _, sink: sink.initialize_write(), self.sink)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 167, in initialize_write
    tmp_dir = self._create_temp_dir(file_path_prefix)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 172, in _create_temp_dir
    base_path, last_component = FileSystems.split(file_path_prefix)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 151, in split
    filesystem = FileSystems.get_filesystem(path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
    return systems[0](pipeline_options=options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set [while running 'WriteMyFile/Write/WriteImpl/InitializeWrite']
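
The last two frames of the traceback suggest the shape of the failure: the worker looks up a filesystem for the `hdfs://` path and constructs it with options that were never set. The sketch below imitates that pattern with stand-in classes. `StandInFileSystems` and `StandInHadoopFileSystem` are hypothetical names, not Beam's actual code, and the `is None` check is an assumption based only on the error message.

```python
class StandInHadoopFileSystem:
    """Hypothetical stand-in: mirrors only the check implied by the traceback."""

    def __init__(self, pipeline_options):
        # Assumption: the real check rejects missing options in a similar way.
        if pipeline_options is None:
            raise ValueError('pipeline_options is not set')
        self.pipeline_options = pipeline_options


class StandInFileSystems:
    """Hypothetical registry holding options at class level."""

    _pipeline_options = None

    @classmethod
    def set_options(cls, pipeline_options):
        # Something on the worker must call this before any filesystem lookup.
        cls._pipeline_options = pipeline_options

    @classmethod
    def get_filesystem(cls, path):
        # Same shape as the frame `systems[0](pipeline_options=options)`.
        return StandInHadoopFileSystem(pipeline_options=cls._pipeline_options)


try:
    StandInFileSystems.get_filesystem("hdfs://user/buvana/copy-manifest")
except ValueError as err:
    print("lookup before options are set fails:", err)

StandInFileSystems.set_options({"hdfs_host": "ZZZZZZZZZ"})
fs = StandInFileSystems.get_filesystem("hdfs://user/buvana/copy-manifest")
print("lookup after options are set succeeds:", fs.pipeline_options)
```

If this reading is right, the pipeline code itself is fine and the options are simply not reaching the worker process.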
