Kyle, Max, All,

I am desperately trying to get Beam working on at least one of the Flink or
Spark runners, but I am facing failures in both cases with a similar error
message.

The Flink runner issue (Beam v2.19.0) was reported yesterday; permalink:
https://lists.apache.org/thread.html/r4977083014eb2d252710ad24ed32d5ff3c402ba161e7b36328a3bd87%40%3Cuser.beam.apache.org%3E

I also came across this related discussion:
https://lists.apache.org/thread.html/a9b6f019b22a65640c272a9497a3c9cc34d68cc2b5c1c9bdebc7ff38%40%3Cuser.beam.apache.org%3E

I get a similar error message with the Spark runner as I did with the Flink
runner (although it is now a newer version of Beam). I paste my environment
details, code, and the error message below. The code runs fine on the Direct
Runner.

HADOOP_CONF_DIR is configured appropriately before starting the Spark master
and worker.
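(A trivial sanity check, nothing Beam-specific, that I can use to confirm the
variable is actually visible to the process environment:)

import os

# HADOOP_CONF_DIR should point at the directory holding core-site.xml
# and hdfs-site.xml.
print(os.environ.get("HADOOP_CONF_DIR"))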

I hope to make some headway soon. Please help; maybe I have to downgrade to an
older version of Beam where this issue did not exist. If so, please let me
know the version number.

Thank you,
Regards,
Buvana

Spark Runner scenario:

Beam version 2.21.0 on both the client end and the job server end.

Docker Spark Job Server:
https://hub.docker.com/r/apache/beam_spark_job_server
docker run --net=host apache/beam_spark_job_server:latest --job-host XXXXXXX --job-port 8099 --spark-master-url spark://YYYYYYYY:7077
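
(For what it's worth, a plain socket check like the one below, nothing
Beam-specific and using the same placeholder host, can be used to confirm the
job server's gRPC port is reachable from the client machine:)

import socket

# Quick reachability check against the job server endpoint used below.
socket.create_connection(("XXXXXXX", 8099), timeout=5).close()
print("job endpoint reachable")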

Client code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# The hdfs_* options configure Beam's HadoopFileSystem; job_endpoint points
# at the Spark job server started above.
options = PipelineOptions([
    "--hdfs_host=ZZZZZZZZZ",
    "--hdfs_user=hdfs",
    "--hdfs_port=50070",
    "--runner=PortableRunner",
    "--job_endpoint=XXXXXXXXX:8099",
])
p = beam.Pipeline(options=options)
input_file_hdfs = "hdfs://user/buvana/manifest"
lines = p | 'ReadMyFile' >> beam.io.ReadFromText(input_file_hdfs)
res = lines | "WriteMyFile" >> beam.io.WriteToText(
    "hdfs://user/buvana/copy-manifest", ".csv")
p.run()
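
(For comparison, the Direct Runner variant that succeeds differs only in the
runner options; a minimal sketch with the same placeholders:)

# Identical pipeline options, but targeting the Direct Runner, where the job
# completes fine.
options = PipelineOptions([
    "--hdfs_host=ZZZZZZZZZ",
    "--hdfs_user=hdfs",
    "--hdfs_port=50070",
    "--runner=DirectRunner",
])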

Error message at the Spark Master UI:

  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/transforms/core.py", line 1501, in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1005, in <lambda>
    lambda _, sink: sink.initialize_write(), self.sink)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 167, in initialize_write
    tmp_dir = self._create_temp_dir(file_path_prefix)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 172, in _create_temp_dir
    base_path, last_component = FileSystems.split(file_path_prefix)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 151, in split
    filesystem = FileSystems.get_filesystem(path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
    return systems[0](pipeline_options=options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set [while running 'WriteMyFile/Write/WriteImpl/InitializeWrite']
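
If I am reading the traceback right, the failure is the guard at the top of
HadoopFileSystem's constructor; roughly (my paraphrase of the 2.21.0 sources,
not a verbatim copy):

from apache_beam.io.filesystem import FileSystem

class HadoopFileSystem(FileSystem):
    def __init__(self, pipeline_options):
        super(HadoopFileSystem, self).__init__(pipeline_options)
        if pipeline_options is None:
            raise ValueError('pipeline_options is not set')  # <- the error I hit
        ...

So it looks like the hdfs_* options never reach the worker-side filesystem
registration when running portably, which would also match the Flink symptom
in the threads above.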
