Kyle, Max, All,

I am desperately trying to get Beam working on at least one of the Flink or Spark runners, and I am facing failures in both cases with a similar message.
The Flink runner issue (Beam v2.19.0) was reported yesterday; permalink: https://lists.apache.org/thread.html/r4977083014eb2d252710ad24ed32d5ff3c402ba161e7b36328a3bd87%40%3Cuser.beam.apache.org%3E

I also came across this related discussion: https://lists.apache.org/thread.html/a9b6f019b22a65640c272a9497a3c9cc34d68cc2b5c1c9bdebc7ff38%40%3Cuser.beam.apache.org%3E

I get a similar error message with the Spark runner as I got with the Flink runner (although it's now a newer version of Beam). My environment details, code, and the error message are below. The code runs fine on the DirectRunner, and HADOOP_CONF_DIR is configured correctly before the Spark master and slave are started.

I hope to make some headway soon. Please help -- maybe I have to downgrade to an earlier version of Beam where this issue did not exist; if so, please let me know which version.

Thank you,
Regards,
Buvana

Spark runner scenario:

Beam version 2.21.0 on both the client end and the job server end.

Docker Spark job server (https://hub.docker.com/r/apache/beam_spark_job_server):

    docker run --net=host apache/beam_spark_job_server:latest \
        --job-host XXXXXXX --job-port 8099 \
        --spark-master-url spark://YYYYYYYY:7077

Client code:

    options = PipelineOptions([
        "--hdfs_host=ZZZZZZZZZ",
        "--hdfs_user=hdfs",
        "--hdfs_port=50070",
        "--runner=PortableRunner",
        "--job_endpoint=XXXXXXXXX:8099"
    ])
    p = beam.Pipeline(options=options)

    input_file_hdfs = "hdfs://user/buvana/manifest"
    lines = p | 'ReadMyFile' >> beam.io.ReadFromText(input_file_hdfs)
    res = lines | "WriteMyFile" >> beam.io.WriteToText("hdfs://user/buvana/copy-manifest", ".csv")

    p.run()

Error message at the Spark master UI:

    File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
    File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
    File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
    File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
      raise exc.with_traceback(traceback)
    File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
    File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
    File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/transforms/core.py", line 1501, in <lambda>
      wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
    File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1005, in <lambda>
      lambda _, sink: sink.initialize_write(), self.sink)
    File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
      return fnc(self, *args, **kwargs)
    File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 167, in initialize_write
      tmp_dir = self._create_temp_dir(file_path_prefix)
    File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 172, in _create_temp_dir
      base_path, last_component = FileSystems.split(file_path_prefix)
    File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 151, in split
      filesystem = FileSystems.get_filesystem(path)
    File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
      return systems[0](pipeline_options=options)
    File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
      raise ValueError('pipeline_options is not set')
    ValueError: pipeline_options is not set [while running 'WriteMyFile/Write/WriteImpl/InitializeWrite']
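From reading the traceback, my understanding is that the worker picks a filesystem implementation by the URI scheme of the path, then constructs it with whatever pipeline options it was handed, and the HDFS filesystem refuses to initialize when those options never reach it. A rough illustration of that behavior (my own mock sketch, not Beam's actual code; the class and function names here are made up):

```python
# Mock illustration (NOT Beam's real implementation) of why the
# worker-side ValueError fires: the filesystem is chosen by URI
# scheme, and the HDFS one needs pipeline options at construction.
from urllib.parse import urlparse

class MockHadoopFileSystem:
    def __init__(self, pipeline_options=None):
        # Mirrors the check at apache_beam/io/hadoopfilesystem.py:114
        if pipeline_options is None:
            raise ValueError('pipeline_options is not set')
        self.options = pipeline_options

def get_filesystem(path, pipeline_options=None):
    # Dispatch on the URI scheme, as FileSystems.get_filesystem does.
    scheme = urlparse(path).scheme
    if scheme == 'hdfs':
        return MockHadoopFileSystem(pipeline_options=pipeline_options)
    raise ValueError('unsupported scheme: %r' % scheme)

# With options supplied (as on the DirectRunner), construction works:
fs = get_filesystem('hdfs://user/buvana/manifest',
                    pipeline_options={'hdfs_host': 'ZZZZZZZZZ'})

# With the options lost on the way to the SDK harness, we get the
# exact error from the traceback:
try:
    get_filesystem('hdfs://user/buvana/copy-manifest')
except ValueError as e:
    print(e)  # pipeline_options is not set
```

So my reading is that the options are present on the client side but are not making it to the worker that runs InitializeWrite.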
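For completeness, this is the kind of pre-flight check I run on the host before starting the Spark master and slave, to confirm that HADOOP_CONF_DIR really points at the Hadoop config. It is a hypothetical helper of my own, not part of Beam or Spark, and the file names assume a standard Hadoop layout:

```python
import os

def check_hadoop_conf(env=None):
    """Return diagnostics for the HADOOP_CONF_DIR setup.

    Hypothetical helper: checks that HADOOP_CONF_DIR is set and that
    the usual Hadoop config files exist under it.
    """
    env = os.environ if env is None else env
    conf_dir = env.get('HADOOP_CONF_DIR')
    if conf_dir is None:
        return ['HADOOP_CONF_DIR is not set']
    report = []
    for name in ('core-site.xml', 'hdfs-site.xml'):
        path = os.path.join(conf_dir, name)
        status = 'exists' if os.path.exists(path) else 'MISSING'
        report.append('%s %s' % (path, status))
    return report

print('\n'.join(check_hadoop_conf()))
```

This comes back clean in my setup, which is why I believe the problem is on the Beam/job-server side rather than in the Hadoop configuration.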