Hi Kyle,

As reported earlier, LOOPBACK with the Portable Runner/Job Server works fine.

Further to that, I tried PortableRunner with the following additional options:

    "--runner=PortableRunner", "--job_endpoint=embed", "--environment_config=apache/beam_python3.6_sdk"

I get an error message (see attachment) similar to the one I get with the Spark and Flink runners when the clusters are external.

Thanks,
Buvana

________________________________
From: Ramanan, Buvana (Nokia - US/Murray Hill) <buvana.rama...@nokia-bell-labs.com>
Sent: Thursday, May 28, 2020 11:47 PM
To: user@beam.apache.org <user@beam.apache.org>
Subject: Re: HDFS I/O with Beam on Spark and Flink runners - consistent Error messages

Hello Kyle,

That works. It produces the expected output.

-Buvana

________________________________
From: Kyle Weaver <kcwea...@google.com>
Sent: Thursday, May 28, 2020 9:19 PM
To: user@beam.apache.org <user@beam.apache.org>
Subject: Re: HDFS I/O with Beam on Spark and Flink runners - consistent Error messages

Hi Buvana,

I suspect this is a bug. If you can, try running your pipeline again with these changes:

1. Remove `--spark-master-url spark://YYYYYYYY:7077` from your Docker run command.
2. Add `--environment_type=LOOPBACK` to your pipeline options.

It will help us confirm the cause of the issue.

On Thu, May 28, 2020 at 7:12 PM Ramanan, Buvana (Nokia - US/Murray Hill) <buvana.rama...@nokia-bell-labs.com> wrote:

Kyle, Max, All,

I am desperately trying to get Beam working on at least one of the runners, Flink or Spark. I am facing failures in both cases with a similar message.
The Flink runner issue (Beam v2.19.0) was reported yesterday; permalink:
https://lists.apache.org/thread.html/r4977083014eb2d252710ad24ed32d5ff3c402ba161e7b36328a3bd87%40%3Cuser.beam.apache.org%3E

I also came across this related discussion:
https://lists.apache.org/thread.html/a9b6f019b22a65640c272a9497a3c9cc34d68cc2b5c1c9bdebc7ff38%40%3Cuser.beam.apache.org%3E

I get a similar error message with the Spark runner as I got with the Flink runner (although it's now a newer version of Beam). I have pasted my environment details, code and the error message below. The code runs fine on the Direct Runner. HADOOP_CONF_DIR is configured appropriately before running the Spark master and slave.

I hope to make some headway soon. Please help. Maybe I have to downgrade to an older version of Beam where this issue did not exist; if so, please let me know the version number.

Thank you,
Regards,
Buvana

Spark Runner scenario:

Beam version 2.21.0 on both the client end and the job server end.

Docker Spark Job Server: https://hub.docker.com/r/apache/beam_spark_job_server

    docker run --net=host apache/beam_spark_job_server:latest --job-host XXXXXXX --job-port 8099 --spark-master-url spark://YYYYYYYY:7077

Client code:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # HDFS connection settings plus a portable job server listening on port 8099.
    options = PipelineOptions([
        "--hdfs_host=ZZZZZZZZZ",
        "--hdfs_user=hdfs",
        "--hdfs_port=50070",
        "--runner=PortableRunner",
        "--job_endpoint=XXXXXXXXX:8099"
    ])
    p = beam.Pipeline(options=options)

    # Read every line of the HDFS file and write it back out with a ".csv" suffix.
    input_file_hdfs = "hdfs://user/buvana/manifest"
    lines = p | 'ReadMyFile' >> beam.io.ReadFromText(input_file_hdfs)
    res = lines | "WriteMyFile" >> beam.io.WriteToText("hdfs://user/buvana/copy-manifest", ".csv")
    p.run()

Error message at the Spark Master UI:

    worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
      File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
      File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
        raise exc.with_traceback(traceback)
      File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
      File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
      File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
      File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/transforms/core.py", line 1501, in <lambda>
        wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
      File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1005, in <lambda>
        lambda _, sink: sink.initialize_write(), self.sink)
      File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
        return fnc(self, *args, **kwargs)
      File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 167, in initialize_write
        tmp_dir = self._create_temp_dir(file_path_prefix)
      File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 172, in _create_temp_dir
        base_path, last_component = FileSystems.split(file_path_prefix)
      File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 151, in split
        filesystem = FileSystems.get_filesystem(path)
      File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
        return systems[0](pipeline_options=options)
      File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
        raise ValueError('pipeline_options is not set')
    ValueError: pipeline_options is not set [while running 'WriteMyFile/Write/WriteImpl/InitializeWrite']
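The traceback above ends in two frames. In `filesystems.py`, `get_filesystem` calls `systems[0](pipeline_options=options)`. In `hadoopfilesystem.py`, the constructor then raises `pipeline_options is not set`. In other words, the process that built the HDFS filesystem had no options available.

The toy below is a rough, self-contained model of that failure mode, not Beam's code. `ToyFileSystems`, `ToyHdfs` and `open_in_worker` are hypothetical names. It assumes the options are held in per-process state that a worker only has if something sets them there. That assumption is at least consistent with the thread: the Direct Runner and LOOPBACK work, while the containerized workers fail.

```python
from typing import Dict, Optional

# Hypothetical stand-ins, NOT Beam's classes. They only mimic the two frames at
# the bottom of the traceback: the registry lookup in filesystems.py and the
# guard in hadoopfilesystem.py that raises "pipeline_options is not set".
Options = Dict[str, str]


class ToyHdfs:
    scheme = "hdfs"

    def __init__(self, pipeline_options: Optional[Options]):
        # Same kind of guard as the frame at hadoopfilesystem.py, line 114.
        if pipeline_options is None:
            raise ValueError("pipeline_options is not set")
        self.host = pipeline_options["hdfs_host"]
        self.port = int(pipeline_options["hdfs_port"])
        self.user = pipeline_options["hdfs_user"]


class ToyFileSystems:
    # Process-wide options: they stay None in any process that never sets them.
    _pipeline_options: Optional[Options] = None
    _registry = {ToyHdfs.scheme: ToyHdfs}

    @classmethod
    def set_options(cls, options: Optional[Options]) -> None:
        cls._pipeline_options = options

    @classmethod
    def get_filesystem(cls, path: str):
        scheme = path.split("://", 1)[0]
        # Mirrors "return systems[0](pipeline_options=options)" in the traceback.
        return cls._registry[scheme](pipeline_options=cls._pipeline_options)


def open_in_worker(path: str, options_seen_by_worker: Optional[Options]) -> str:
    """Simulate one worker process, with or without the client's options."""
    ToyFileSystems.set_options(options_seen_by_worker)
    try:
        fs = ToyFileSystems.get_filesystem(path)
    except ValueError as err:
        return "ValueError: " + str(err)
    return "connected to %s:%d as %s" % (fs.host, fs.port, fs.user)


if __name__ == "__main__":
    client_opts = {"hdfs_host": "ZZZZZZZZZ", "hdfs_port": "50070", "hdfs_user": "hdfs"}
    # Assumed analogue of LOOPBACK: worker inside the submitting process, options present.
    print(open_in_worker("hdfs://user/buvana/manifest", client_opts))
    # Assumed analogue of the Docker SDK container: options never reached it.
    print(open_in_worker("hdfs://user/buvana/manifest", None))
```

The point of the toy is that whether the guard fires depends on where the worker runs, not on which flags were passed when the job was submitted. This matches Kyle's suggestion of switching to LOOPBACK as a diagnostic, but it is only a model and not a confirmed root cause.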
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
Using default tag: latest
latest: Pulling from apache/beam_python3.6_sdk
Digest: sha256:48bd82920212ce2acea17d142048aa1c667f47c82b35c04b134df4638d7b8926
Status: Image is up to date for apache/beam_python3.6_sdk:latest
docker.io/apache/beam_python3.6_sdk:latest
ERROR:root:('info', pipeline_options {
  fields { key: "beam:option:artifact_port:v1" value { string_value: "0" } }
  fields { key: "beam:option:dataflow_endpoint:v1" value { string_value: "https://dataflow.googleapis.com" } }
  fields { key: "beam:option:direct_num_workers:v1" value { string_value: "1" } }
  fields { key: "beam:option:direct_runner_bundle_repeat:v1" value { string_value: "0" } }
  fields { key: "beam:option:direct_runner_use_stacked_bundle:v1" value { bool_value: true } }
  fields { key: "beam:option:direct_running_mode:v1" value { string_value: "in_memory" } }
  fields { key: "beam:option:dry_run:v1" value { bool_value: false } }
  fields { key: "beam:option:enable_streaming_engine:v1" value { bool_value: false } }
  fields { key: "beam:option:environment_cache_millis:v1" value { string_value: "0" } }
  fields { key: "beam:option:environment_config:v1" value { string_value: "apache/beam_python3.6_sdk" } }
  fields { key: "beam:option:expansion_port:v1" value { string_value: "0" } }
  fields { key: "beam:option:experiments:v1" value { list_value { values { string_value: "beam_fn_api" } } } }
  fields { key: "beam:option:flink_master:v1" value { string_value: "[auto]" } }
  fields { key: "beam:option:flink_submit_uber_jar:v1" value { bool_value: false } }
  fields { key: "beam:option:flink_version:v1" value { string_value: "1.10" } }
  fields { key: "beam:option:hdfs_full_urls:v1" value { bool_value: false } }
  fields { key: "beam:option:hdfs_host:v1" value { string_value: "ZZZZZZZZZZ" } }
  fields { key: "beam:option:hdfs_port:v1" value { string_value: "50070" } }
  fields { key: "beam:option:hdfs_user:v1" value { string_value: "hdfs" } }
  fields { key: "beam:option:job_endpoint:v1" value { string_value: "embed" } }
  fields { key: "beam:option:job_port:v1" value { string_value: "0" } }
  fields { key: "beam:option:job_server_timeout:v1" value { string_value: "60" } }
  fields { key: "beam:option:no_auth:v1" value { bool_value: false } }
  fields { key: "beam:option:pipeline_type_check:v1" value { bool_value: true } }
  fields { key: "beam:option:profile_cpu:v1" value { bool_value: false } }
  fields { key: "beam:option:profile_memory:v1" value { bool_value: false } }
  fields { key: "beam:option:profile_sample_rate:v1" value { number_value: 1.0 } }
  fields { key: "beam:option:runtime_type_check:v1" value { bool_value: false } }
  fields { key: "beam:option:save_main_session:v1" value { bool_value: false } }
  fields { key: "beam:option:sdk_location:v1" value { string_value: "container" } }
  fields { key: "beam:option:sdk_worker_parallelism:v1" value { string_value: "1" } }
  fields { key: "beam:option:spark_master_url:v1" value { string_value: "local[4]" } }
  fields { key: "beam:option:spark_submit_uber_jar:v1" value { bool_value: false } }
  fields { key: "beam:option:streaming:v1" value { bool_value: false } }
  fields { key: "beam:option:type_check_strictness:v1" value { string_value: "DEFAULT_TO_ANY" } }
  fields { key: "beam:option:update:v1" value { bool_value: false } }
}
retrieval_token: "/tmp/tmpzjfxi5nc/b6aaf7812dbef9ccaef8f57e980b40b1005b6dc63f2966bd74d25ab66f9a6b37/MANIFEST"
, 'context', <grpc._server._Context object at 0x7fb61f745d30>)
ERROR:root:('info', pipeline_options {
  fields { key: "beam:option:artifact_port:v1" value { string_value: "0" } }
  fields { key: "beam:option:dataflow_endpoint:v1" value { string_value: "https://dataflow.googleapis.com" } }
  fields { key: "beam:option:direct_num_workers:v1" value { string_value: "1" } }
  fields { key: "beam:option:direct_runner_bundle_repeat:v1" value { string_value: "0" } }
  fields { key: "beam:option:direct_runner_use_stacked_bundle:v1" value { bool_value: true } }
  fields { key: "beam:option:direct_running_mode:v1" value { string_value: "in_memory" } }
  fields { key: "beam:option:dry_run:v1" value { bool_value: false } }
  fields { key: "beam:option:enable_streaming_engine:v1" value { bool_value: false } }
  fields { key: "beam:option:environment_cache_millis:v1" value { string_value: "0" } }
  fields { key: "beam:option:environment_config:v1" value { string_value: "apache/beam_python3.6_sdk" } }
  fields { key: "beam:option:expansion_port:v1" value { string_value: "0" } }
  fields { key: "beam:option:experiments:v1" value { list_value { values { string_value: "beam_fn_api" } } } }
  fields { key: "beam:option:flink_master:v1" value { string_value: "[auto]" } }
  fields { key: "beam:option:flink_submit_uber_jar:v1" value { bool_value: false } }
  fields { key: "beam:option:flink_version:v1" value { string_value: "1.10" } }
  fields { key: "beam:option:hdfs_full_urls:v1" value { bool_value: false } }
  fields { key: "beam:option:hdfs_host:v1" value { string_value: "ZZZZZZZZZZ" } }
  fields { key: "beam:option:hdfs_port:v1" value { string_value: "50070" } }
  fields { key: "beam:option:hdfs_user:v1" value { string_value: "hdfs" } }
  fields { key: "beam:option:job_endpoint:v1" value { string_value: "embed" } }
  fields { key: "beam:option:job_port:v1" value { string_value: "0" } }
  fields { key: "beam:option:job_server_timeout:v1" value { string_value: "60" } }
  fields { key: "beam:option:no_auth:v1" value { bool_value: false } }
  fields { key: "beam:option:pipeline_type_check:v1" value { bool_value: true } }
  fields { key: "beam:option:profile_cpu:v1" value { bool_value: false } }
  fields { key: "beam:option:profile_memory:v1" value { bool_value: false } }
  fields { key: "beam:option:profile_sample_rate:v1" value { number_value: 1.0 } }
  fields { key: "beam:option:runtime_type_check:v1" value { bool_value: false } }
  fields { key: "beam:option:save_main_session:v1" value { bool_value: false } }
  fields { key: "beam:option:sdk_location:v1" value { string_value: "container" } }
  fields { key: "beam:option:sdk_worker_parallelism:v1" value { string_value: "1" } }
  fields { key: "beam:option:spark_master_url:v1" value { string_value: "local[4]" } }
  fields { key: "beam:option:spark_submit_uber_jar:v1" value { bool_value: false } }
  fields { key: "beam:option:streaming:v1" value { bool_value: false } }
  fields { key: "beam:option:type_check_strictness:v1" value { string_value: "DEFAULT_TO_ANY" } }
  fields { key: "beam:option:update:v1" value { bool_value: false } }
}
retrieval_token: "/tmp/tmpzjfxi5nc/b6aaf7812dbef9ccaef8f57e980b40b1005b6dc63f2966bd74d25ab66f9a6b37/MANIFEST"
logging_endpoint { url: "localhost:40674" }
artifact_endpoint { url: "localhost:39934" }
control_endpoint { url: "localhost:39934" }
, 'worker_id', 'worker_0')
WARNING:root:severity: WARN timestamp { seconds: 1590724880 nanos: 840057611 } message: "No session file found: /tmp/staged/pickled_main_session. Functions defined in __main__ (interactive session) may fail." log_location: "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker_main.py:240" thread: "MainThread"
WARNING:root:severity: WARN timestamp { seconds: 1590724880 nanos: 844359397 } message: "Discarding unparseable args: ['--direct_runner_use_stacked_bundle', '--job_server_timeout=60', '--pipeline_type_check']" log_location: "/usr/local/lib/python3.6/site-packages/apache_beam/options/pipeline_options.py:309" thread: "MainThread"
ERROR:root:severity: ERROR timestamp { seconds: 1590724880 nanos: 958515882 } message: "Error processing instruction bundle_1. Original traceback is
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 812, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1395, in process
    element)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1448, in initial_restriction
    range_tracker = self._source.get_range_tracker(None, None)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 210, in get_range_tracker
    return self._get_concat_source().get_range_tracker(
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 145, in _get_concat_source
    match_result = FileSystems.match([pattern])[0]
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 203, in match
    filesystem = FileSystems.get_filesystem(patterns[0])
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
    return systems[0](pipeline_options=options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
    response = task()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle
    element.data)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 812, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1395, in process
    element)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1448, in initial_restriction
    range_tracker = self._source.get_range_tracker(None, None)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 210, in get_range_tracker
    return self._get_concat_source().get_range_tracker(
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 145, in _get_concat_source
    match_result = FileSystems.match([pattern])[0]
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 203, in match
    filesystem = FileSystems.get_filesystem(patterns[0])
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
    return systems[0](pipeline_options=options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set [while running 'ReadMyFile/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']

" instruction_id: "bundle_1" log_location: "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py:252" thread: "Thread-13"
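The "Discarding unparseable args" warning in the log lists the flags the SDK worker dropped when it re-parsed the pipeline options. The `--hdfs_*` flags are not in that list. The wording suggests a known/unknown split of the kind `argparse.ArgumentParser.parse_known_args` performs.

The sketch below is a stdlib analogue under that assumption, not Beam's `PipelineOptions` code. `parse_worker_flags` is a hypothetical helper, and its set of known flags is hand-picked so the output reproduces the three discarded flags from the log.

```python
import argparse
from typing import List, Tuple


def parse_worker_flags(flags: List[str]) -> Tuple[argparse.Namespace, List[str]]:
    """Split flags into ones this toy parser knows and ones it would discard.

    Hypothetical: the known options below are hand-picked to mirror the log,
    in which the hdfs_* flags were kept and three other flags were discarded.
    """
    parser = argparse.ArgumentParser(allow_abbrev=False)
    parser.add_argument("--hdfs_host")
    parser.add_argument("--hdfs_port", type=int)
    parser.add_argument("--hdfs_user")
    parser.add_argument("--environment_config")
    # parse_known_args returns (namespace, leftover_args) instead of erroring out.
    return parser.parse_known_args(flags)


if __name__ == "__main__":
    flags = [
        "--hdfs_host=ZZZZZZZZZZ",
        "--hdfs_port=50070",
        "--hdfs_user=hdfs",
        "--environment_config=apache/beam_python3.6_sdk",
        "--direct_runner_use_stacked_bundle",
        "--job_server_timeout=60",
        "--pipeline_type_check",
    ]
    known, discarded = parse_worker_flags(flags)
    print("Discarding unparseable args: %s" % discarded)
    print(known.hdfs_host, known.hdfs_port, known.hdfs_user)
```

Read this way, the log is at least consistent with the HDFS settings surviving the worker's flag parsing. The traceback, however, shows they were not available when the HDFS filesystem was constructed. That gap is only an inference from the log, not a confirmed diagnosis.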
0f9c70c36c0a24caaa8fd6f4fd60e33168c25e63a79d4deff94712399c248266
ERROR:apache_beam.runners.portability.local_job_service:Error running pipeline.
Traceback (most recent call last):
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/local_job_service.py", line 280, in _run_job
    self._pipeline_proto)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 189, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 335, in run_stages
    bundle_context_manager,
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 545, in _run_stage
    expected_timer_output)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1051, in process_bundle
    for result, split_result in executor.map(execute, part_inputs):
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 44, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1048, in execute
    part_map, expected_outputs, fired_timers, expected_output_timers)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 986, in process_bundle
    raise RuntimeError(result.error)
RuntimeError: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 812, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1395, in process
    element)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1448, in initial_restriction
    range_tracker = self._source.get_range_tracker(None, None)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 210, in get_range_tracker
    return self._get_concat_source().get_range_tracker(
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 145, in _get_concat_source
    match_result = FileSystems.match([pattern])[0]
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 203, in match
    filesystem = FileSystems.get_filesystem(patterns[0])
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
    return systems[0](pipeline_options=options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
    response = task()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle
    element.data)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 812, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1395, in process
    element)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1448, in initial_restriction
    range_tracker = self._source.get_range_tracker(None, None)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 210, in get_range_tracker
    return self._get_concat_source().get_range_tracker(
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 145, in _get_concat_source
    match_result = FileSystems.match([pattern])[0]
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 203, in match
    filesystem = FileSystems.get_filesystem(patterns[0])
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
    return systems[0](pipeline_options=options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set [while running 'ReadMyFile/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
ERROR:apache_beam.runners.portability.local_job_service:<module 'traceback' from '/usr/lib64/python3.6/traceback.py'>
Traceback (most recent call last):
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/local_job_service.py", line 280, in _run_job
    self._pipeline_proto)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 189, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 335, in run_stages
    bundle_context_manager,
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 545, in _run_stage
    expected_timer_output)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1051, in process_bundle
    for result, split_result in executor.map(execute, part_inputs):
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 44, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1048, in execute
    part_map, expected_outputs, fired_timers, expected_output_timers)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 986, in process_bundle
    raise RuntimeError(result.error)
RuntimeError: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 812, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1395, in process
    element)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1448, in initial_restriction
    range_tracker = self._source.get_range_tracker(None, None)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 210, in get_range_tracker
    return self._get_concat_source().get_range_tracker(
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 145, in _get_concat_source
    match_result = FileSystems.match([pattern])[0]
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 203, in match
    filesystem = FileSystems.get_filesystem(patterns[0])
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
    return systems[0](pipeline_options=options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
    response = task()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle
    element.data)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 812, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1395, in process
    element)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1448, in initial_restriction
    range_tracker = self._source.get_range_tracker(None, None)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 210, in get_range_tracker
    return self._get_concat_source().get_range_tracker(
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 145, in _get_concat_source
    match_result = FileSystems.match([pattern])[0]
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 203, in match
    filesystem = FileSystems.get_filesystem(patterns[0])
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
    return systems[0](pipeline_options=options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set [while running 'ReadMyFile/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib64/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/local_job_service.py", line 280, in _run_job
    self._pipeline_proto)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 189, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 335, in run_stages
    bundle_context_manager,
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 545, in _run_stage
    expected_timer_output)
File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1051, in process_bundle for result, split_result in executor.map(execute, part_inputs): File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 586, in result_iterator yield fs.pop().result() File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 432, in result return self.__get_result() File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in __get_result raise self._exception File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 44, in run self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs)) File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1048, in execute part_map, expected_outputs, fired_timers, expected_output_timers) File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 986, in process_bundle raise RuntimeError(result.error) RuntimeError: Traceback (most recent call last): File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process File "apache_beam/runners/common.py", line 812, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1395, in process element) File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1448, in initial_restriction range_tracker = self._source.get_range_tracker(None, None) File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 210, in 
get_range_tracker return self._get_concat_source().get_range_tracker( File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f return fnc(self, *args, **kwargs) File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 145, in _get_concat_source match_result = FileSystems.match([pattern])[0] File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 203, in match filesystem = FileSystems.get_filesystem(patterns[0]) File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem return systems[0](pipeline_options=options) File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__ raise ValueError('pipeline_options is not set') ValueError: pipeline_options is not set During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute response = task() File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction getattr(request, request_type), request.instruction_id) File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle element.data) File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output 
File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process File "apache_beam/runners/common.py", line 812, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1395, in process element) File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1448, in initial_restriction range_tracker = self._source.get_range_tracker(None, None) File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 210, in get_range_tracker return self._get_concat_source().get_range_tracker( File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f return fnc(self, *args, **kwargs) File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 145, in _get_concat_source match_result = 
FileSystems.match([pattern])[0] File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 203, in match filesystem = FileSystems.get_filesystem(patterns[0]) File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem return systems[0](pipeline_options=options) File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__ raise ValueError('pipeline_options is not set') ValueError: pipeline_options is not set [while running 'ReadMyFile/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']