Hi Kyle,

As reported earlier, LOOPBACK with the PortableRunner/job server works fine.

Following up on that, I tried the PortableRunner with additional options as follows:

    "--runner=PortableRunner",
    "--job_endpoint=embed",
    "--environment_config=apache/beam_python3.6_sdk"

I get an error message (see attachment) similar to the one I get with the Spark and Flink runners when the clusters are external.
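For comparison, the two environment setups discussed in this thread can be written side by side as flag lists. This is a minimal sketch: `portable_args` is a hypothetical helper, not a Beam API, and `XXXXXXXXX:8099` is the placeholder job-server address used elsewhere in the thread. I have written `--environment_type=DOCKER` explicitly; my understanding is that DOCKER is the default when the flag is omitted, as in the options above.

```python
def portable_args(job_endpoint, environment_type=None, environment_config=None):
    """Assemble PortableRunner flags (hypothetical helper, not part of Beam)."""
    args = ["--runner=PortableRunner", "--job_endpoint=" + job_endpoint]
    if environment_type is not None:
        args.append("--environment_type=" + environment_type)
    if environment_config is not None:
        args.append("--environment_config=" + environment_config)
    return args


# Reported to work: SDK workers run inside the launching Python process.
loopback_args = portable_args("XXXXXXXXX:8099", environment_type="LOOPBACK")

# Reported to fail: SDK workers run in a container started from this image.
docker_args = portable_args(
    "embed",
    environment_type="DOCKER",
    environment_config="apache/beam_python3.6_sdk",
)

if __name__ == "__main__":
    print(loopback_args)
    print(docker_args)
```

Either list can be passed to `PipelineOptions(...)` together with the `--hdfs_*` flags.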

thanks,
Buvana

________________________________
From: Ramanan, Buvana (Nokia - US/Murray Hill) <buvana.rama...@nokia-bell-labs.com>
Sent: Thursday, May 28, 2020 11:47 PM
To: user@beam.apache.org <user@beam.apache.org>
Subject: Re: HDFS I/O with Beam on Spark and Flink runners - consistent Error messages

Hello Kyle,

That works; it produces the expected output.

-Buvana

________________________________
From: Kyle Weaver <kcwea...@google.com>
Sent: Thursday, May 28, 2020 9:19 PM
To: user@beam.apache.org <user@beam.apache.org>
Subject: Re: HDFS I/O with Beam on Spark and Flink runners - consistent Error messages

Hi Buvana,

I suspect this is a bug. Please try running your pipeline again with these changes:

1. Remove `--spark-master-url spark://YYYYYYYY:7077` from your Docker run command.
2. Add `--environment_type=LOOPBACK` to your pipeline options.

It will help us confirm the cause of the issue.
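Applied to the Docker command and pipeline options from the original message quoted below, the two changes amount to the edit sketched here. A minimal sketch: `drop_flag` is a hypothetical helper, and the host names are the placeholders from the thread. My understanding, not verified here, is that without `--spark-master-url` the job server falls back to an embedded local Spark master.

```python
import shlex


def drop_flag(argv, flag):
    """Return a copy of argv without `flag`, accepting '--flag value' or '--flag=value'."""
    result = []
    skip_value = False
    for arg in argv:
        if skip_value:
            skip_value = False  # this was the value that followed the flag
            continue
        if arg == flag:
            skip_value = True
            continue
        if arg.startswith(flag + "="):
            continue
        result.append(arg)
    return result


job_server_cmd = [
    "docker", "run", "--net=host", "apache/beam_spark_job_server:latest",
    "--job-host", "XXXXXXX", "--job-port", "8099",
    "--spark-master-url", "spark://YYYYYYYY:7077",
]
pipeline_args = [
    "--hdfs_host=ZZZZZZZZZ", "--hdfs_user=hdfs", "--hdfs_port=50070",
    "--runner=PortableRunner", "--job_endpoint=XXXXXXXXX:8099",
]

# Change 1: start the job server without the external Spark master.
job_server_cmd = drop_flag(job_server_cmd, "--spark-master-url")
# Change 2: run the SDK workers inside the launching process.
pipeline_args = pipeline_args + ["--environment_type=LOOPBACK"]

if __name__ == "__main__":
    print(" ".join(shlex.quote(a) for a in job_server_cmd))
    print(pipeline_args)
```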

On Thu, May 28, 2020 at 7:12 PM Ramanan, Buvana (Nokia - US/Murray Hill) <buvana.rama...@nokia-bell-labs.com> wrote:

Kyle, Max, All,

I am desperately trying to get Beam working on at least one of the Flink or Spark runners, but I am facing failures in both cases, with similar error messages.

The Flink runner issue (Beam v2.19.0) was reported yesterday; here is the permalink:
https://lists.apache.org/thread.html/r4977083014eb2d252710ad24ed32d5ff3c402ba161e7b36328a3bd87%40%3Cuser.beam.apache.org%3E

I also came across this related discussion:
https://lists.apache.org/thread.html/a9b6f019b22a65640c272a9497a3c9cc34d68cc2b5c1c9bdebc7ff38%40%3Cuser.beam.apache.org%3E

I get a similar error message with the Spark runner as I got with the Flink runner (although it's now a newer version of Beam). My environment details, code, and the error message are pasted below. The code runs fine on the Direct Runner.

HADOOP_CONF_DIR is configured appropriately before the Spark master and slave are started.
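`HADOOP_CONF_DIR` is read by the JVM-side Hadoop client. As far as I know, the Python SDK's HDFS filesystem does not use it; it talks to WebHDFS using `--hdfs_host`, `--hdfs_port` and `--hdfs_user`. A quick, Beam-independent check is to query WebHDFS from the machine or container where the SDK workers actually run. A minimal stdlib sketch: the helper names are hypothetical, while `GETFILESTATUS` and `user.name` are standard WebHDFS REST parameters.

```python
import json
import urllib.parse
import urllib.request


def webhdfs_status_url(host, port, path, user):
    """Build a WebHDFS GETFILESTATUS URL for an absolute HDFS path."""
    if not path.startswith("/"):
        raise ValueError("expected an absolute HDFS path, got %r" % path)
    query = urllib.parse.urlencode({"op": "GETFILESTATUS", "user.name": user})
    return "http://%s:%d/webhdfs/v1%s?%s" % (
        host, port, urllib.parse.quote(path), query)


def check_webhdfs(host, port, path, user, timeout=10):
    """Fetch the file status; raises urllib.error.URLError if unreachable."""
    url = webhdfs_status_url(host, port, path, user)
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.load(resp)["FileStatus"]


if __name__ == "__main__":
    # ZZZZZZZZZ is the placeholder host name used in this message.
    print(webhdfs_status_url("ZZZZZZZZZ", 50070, "/user/buvana/manifest", "hdfs"))
```

Calling `check_webhdfs(...)` inside the SDK worker container would show whether the workers can reach the NameNode at all.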

I hope to make some headway soon. Please help; maybe I have to downgrade to an older version of Beam where this issue did not exist. If so, please let me know the version number.

Thank you,

Regards,
Buvana

Spark Runner scenario:

Beam version 2.21.0 on both the client and the job server.

Docker Spark Job Server:
https://hub.docker.com/r/apache/beam_spark_job_server

docker run --net=host apache/beam_spark_job_server:latest \
    --job-host XXXXXXX --job-port 8099 --spark-master-url spark://YYYYYYYY:7077

Client code:

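import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# HDFS connection settings plus the portable job server endpoint.
options = PipelineOptions([
    "--hdfs_host=ZZZZZZZZZ",
    "--hdfs_user=hdfs",
    "--hdfs_port=50070",
    "--runner=PortableRunner",
    "--job_endpoint=XXXXXXXXX:8099"
])
p = beam.Pipeline(options=options)

input_file_hdfs = "hdfs://user/buvana/manifest"

# Copy the file line by line; output shards get a ".csv" suffix.
lines = p | 'ReadMyFile' >> beam.io.ReadFromText(input_file_hdfs)
res = lines | "WriteMyFile" >> beam.io.WriteToText(
    "hdfs://user/buvana/copy-manifest", file_name_suffix=".csv")

p.run()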
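The `hdfs://user/buvana/...` paths rely on `--hdfs_full_urls` being false (the default, and the value shown in the log below). The sketch mirrors my understanding of how the Python HDFS filesystem then reads such a URL: everything after `hdfs:/` is a path on the configured `--hdfs_host`, so no server name belongs in the URL. It is a hypothetical re-implementation for illustration, not Beam's code.

```python
import re

# Assumption: these patterns approximate how Beam's Python HDFS filesystem
# splits URLs; they are an illustration, not Beam's actual source.
_PATH_ONLY_RE = re.compile(r"^hdfs:/(/.*)$")
_FULL_URL_RE = re.compile(r"^hdfs://([^/]+)(/.*)$")


def parse_hdfs_url(url, full_urls=False):
    """Return the HDFS path, or (server, path) when full_urls is True."""
    if full_urls:
        match = _FULL_URL_RE.match(url)
        if match is None:
            raise ValueError("Could not parse full URL: %s" % url)
        return match.group(1), match.group(2)
    match = _PATH_ONLY_RE.match(url)
    if match is None:
        raise ValueError("Could not parse URL: %s" % url)
    return match.group(1)


if __name__ == "__main__":
    print(parse_hdfs_url("hdfs://user/buvana/manifest"))
    # → /user/buvana/manifest
    print(parse_hdfs_url("hdfs://user/buvana/manifest", full_urls=True))
    # → ('user', '/buvana/manifest'): "user" would be taken as the server name
```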

Error message at the Spark Master UI:

worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/transforms/core.py", line 1501, in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1005, in <lambda>
    lambda _, sink: sink.initialize_write(), self.sink)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 167, in initialize_write
    tmp_dir = self._create_temp_dir(file_path_prefix)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 172, in _create_temp_dir
    base_path, last_component = FileSystems.split(file_path_prefix)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 151, in split
    filesystem = FileSystems.get_filesystem(path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
    return systems[0](pipeline_options=options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set [while running 'WriteMyFile/Write/WriteImpl/InitializeWrite']
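Reading the last frames, the HDFS filesystem is constructed on the worker with no pipeline options. One possible explanation, consistent with LOOPBACK working, is that the options live in per-process state that is populated in the launching process but not in a separately started worker. The toy model below only illustrates that hypothesis; `ToyProcess` and `run_read` are hypothetical names, not Beam code.

```python
class ToyProcess:
    """Hypothetical model of one Python process and its per-process filesystem options."""

    def __init__(self):
        self.filesystem_options = None  # starts empty in every new process

    def set_options(self, options):
        self.filesystem_options = options

    def get_hdfs_filesystem(self):
        # Same check and message as the one that fails in the traceback above.
        if self.filesystem_options is None:
            raise ValueError("pipeline_options is not set")
        opts = self.filesystem_options
        return "HDFS client for %s:%s (user %s)" % (
            opts["hdfs_host"], opts["hdfs_port"], opts["hdfs_user"])


def run_read(launcher, environment_type):
    """Return the filesystem a worker would see under the given environment."""
    if environment_type == "LOOPBACK":
        worker = launcher  # the worker shares the launching process
    else:
        worker = ToyProcess()  # e.g. DOCKER: a fresh process with no options set
    return worker.get_hdfs_filesystem()


if __name__ == "__main__":
    launcher = ToyProcess()
    launcher.set_options(
        {"hdfs_host": "ZZZZZZZZZ", "hdfs_port": 50070, "hdfs_user": "hdfs"})
    print(run_read(launcher, "LOOPBACK"))
    # → HDFS client for ZZZZZZZZZ:50070 (user hdfs)
    try:
        run_read(launcher, "DOCKER")
    except ValueError as err:
        print(err)  # → pipeline_options is not set
```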
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.6 interpreter.
Using default tag: latest
latest: Pulling from apache/beam_python3.6_sdk
Digest: sha256:48bd82920212ce2acea17d142048aa1c667f47c82b35c04b134df4638d7b8926
Status: Image is up to date for apache/beam_python3.6_sdk:latest
docker.io/apache/beam_python3.6_sdk:latest
ERROR:root:('info', pipeline_options {
  fields {
    key: "beam:option:artifact_port:v1"
    value {
      string_value: "0"
    }
  }
  fields {
    key: "beam:option:dataflow_endpoint:v1"
    value {
      string_value: "https://dataflow.googleapis.com"
    }
  }
  fields {
    key: "beam:option:direct_num_workers:v1"
    value {
      string_value: "1"
    }
  }
  fields {
    key: "beam:option:direct_runner_bundle_repeat:v1"
    value {
      string_value: "0"
    }
  }
  fields {
    key: "beam:option:direct_runner_use_stacked_bundle:v1"
    value {
      bool_value: true
    }
  }
  fields {
    key: "beam:option:direct_running_mode:v1"
    value {
      string_value: "in_memory"
    }
  }
  fields {
    key: "beam:option:dry_run:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:enable_streaming_engine:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:environment_cache_millis:v1"
    value {
      string_value: "0"
    }
  }
  fields {
    key: "beam:option:environment_config:v1"
    value {
      string_value: "apache/beam_python3.6_sdk"
    }
  }
  fields {
    key: "beam:option:expansion_port:v1"
    value {
      string_value: "0"
    }
  }
  fields {
    key: "beam:option:experiments:v1"
    value {
      list_value {
        values {
          string_value: "beam_fn_api"
        }
      }
    }
  }
  fields {
    key: "beam:option:flink_master:v1"
    value {
      string_value: "[auto]"
    }
  }
  fields {
    key: "beam:option:flink_submit_uber_jar:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:flink_version:v1"
    value {
      string_value: "1.10"
    }
  }
  fields {
    key: "beam:option:hdfs_full_urls:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:hdfs_host:v1"
    value {
      string_value: "ZZZZZZZZZZ"
    }
  }
  fields {
    key: "beam:option:hdfs_port:v1"
    value {
      string_value: "50070"
    }
  }
  fields {
    key: "beam:option:hdfs_user:v1"
    value {
      string_value: "hdfs"
    }
  }
  fields {
    key: "beam:option:job_endpoint:v1"
    value {
      string_value: "embed"
    }
  }
  fields {
    key: "beam:option:job_port:v1"
    value {
      string_value: "0"
    }
  }
  fields {
    key: "beam:option:job_server_timeout:v1"
    value {
      string_value: "60"
    }
  }
  fields {
    key: "beam:option:no_auth:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:pipeline_type_check:v1"
    value {
      bool_value: true
    }
  }
  fields {
    key: "beam:option:profile_cpu:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:profile_memory:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:profile_sample_rate:v1"
    value {
      number_value: 1.0
    }
  }
  fields {
    key: "beam:option:runtime_type_check:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:save_main_session:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:sdk_location:v1"
    value {
      string_value: "container"
    }
  }
  fields {
    key: "beam:option:sdk_worker_parallelism:v1"
    value {
      string_value: "1"
    }
  }
  fields {
    key: "beam:option:spark_master_url:v1"
    value {
      string_value: "local[4]"
    }
  }
  fields {
    key: "beam:option:spark_submit_uber_jar:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:streaming:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:type_check_strictness:v1"
    value {
      string_value: "DEFAULT_TO_ANY"
    }
  }
  fields {
    key: "beam:option:update:v1"
    value {
      bool_value: false
    }
  }
}
retrieval_token: "/tmp/tmpzjfxi5nc/b6aaf7812dbef9ccaef8f57e980b40b1005b6dc63f2966bd74d25ab66f9a6b37/MANIFEST"
, 'context', <grpc._server._Context object at 0x7fb61f745d30>)
ERROR:root:('info', pipeline_options {
  fields {
    key: "beam:option:artifact_port:v1"
    value {
      string_value: "0"
    }
  }
  fields {
    key: "beam:option:dataflow_endpoint:v1"
    value {
      string_value: "https://dataflow.googleapis.com"
    }
  }
  fields {
    key: "beam:option:direct_num_workers:v1"
    value {
      string_value: "1"
    }
  }
  fields {
    key: "beam:option:direct_runner_bundle_repeat:v1"
    value {
      string_value: "0"
    }
  }
  fields {
    key: "beam:option:direct_runner_use_stacked_bundle:v1"
    value {
      bool_value: true
    }
  }
  fields {
    key: "beam:option:direct_running_mode:v1"
    value {
      string_value: "in_memory"
    }
  }
  fields {
    key: "beam:option:dry_run:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:enable_streaming_engine:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:environment_cache_millis:v1"
    value {
      string_value: "0"
    }
  }
  fields {
    key: "beam:option:environment_config:v1"
    value {
      string_value: "apache/beam_python3.6_sdk"
    }
  }
  fields {
    key: "beam:option:expansion_port:v1"
    value {
      string_value: "0"
    }
  }
  fields {
    key: "beam:option:experiments:v1"
    value {
      list_value {
        values {
          string_value: "beam_fn_api"
        }
      }
    }
  }
  fields {
    key: "beam:option:flink_master:v1"
    value {
      string_value: "[auto]"
    }
  }
  fields {
    key: "beam:option:flink_submit_uber_jar:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:flink_version:v1"
    value {
      string_value: "1.10"
    }
  }
  fields {
    key: "beam:option:hdfs_full_urls:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:hdfs_host:v1"
    value {
      string_value: "ZZZZZZZZZZ"
    }
  }
  fields {
    key: "beam:option:hdfs_port:v1"
    value {
      string_value: "50070"
    }
  }
  fields {
    key: "beam:option:hdfs_user:v1"
    value {
      string_value: "hdfs"
    }
  }
  fields {
    key: "beam:option:job_endpoint:v1"
    value {
      string_value: "embed"
    }
  }
  fields {
    key: "beam:option:job_port:v1"
    value {
      string_value: "0"
    }
  }
  fields {
    key: "beam:option:job_server_timeout:v1"
    value {
      string_value: "60"
    }
  }
  fields {
    key: "beam:option:no_auth:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:pipeline_type_check:v1"
    value {
      bool_value: true
    }
  }
  fields {
    key: "beam:option:profile_cpu:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:profile_memory:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:profile_sample_rate:v1"
    value {
      number_value: 1.0
    }
  }
  fields {
    key: "beam:option:runtime_type_check:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:save_main_session:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:sdk_location:v1"
    value {
      string_value: "container"
    }
  }
  fields {
    key: "beam:option:sdk_worker_parallelism:v1"
    value {
      string_value: "1"
    }
  }
  fields {
    key: "beam:option:spark_master_url:v1"
    value {
      string_value: "local[4]"
    }
  }
  fields {
    key: "beam:option:spark_submit_uber_jar:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:streaming:v1"
    value {
      bool_value: false
    }
  }
  fields {
    key: "beam:option:type_check_strictness:v1"
    value {
      string_value: "DEFAULT_TO_ANY"
    }
  }
  fields {
    key: "beam:option:update:v1"
    value {
      bool_value: false
    }
  }
}
retrieval_token: "/tmp/tmpzjfxi5nc/b6aaf7812dbef9ccaef8f57e980b40b1005b6dc63f2966bd74d25ab66f9a6b37/MANIFEST"
logging_endpoint {
  url: "localhost:40674"
}
artifact_endpoint {
  url: "localhost:39934"
}
control_endpoint {
  url: "localhost:39934"
}
, 'worker_id', 'worker_0')
WARNING:root:severity: WARN
timestamp {
  seconds: 1590724880
  nanos: 840057611
}
message: "No session file found: /tmp/staged/pickled_main_session. Functions defined in __main__ (interactive session) may fail."
log_location: "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker_main.py:240"
thread: "MainThread"

WARNING:root:severity: WARN
timestamp {
  seconds: 1590724880
  nanos: 844359397
}
message: "Discarding unparseable args: [\'--direct_runner_use_stacked_bundle\', \'--job_server_timeout=60\', \'--pipeline_type_check\']"
log_location: "/usr/local/lib/python3.6/site-packages/apache_beam/options/pipeline_options.py:309"
thread: "MainThread"

ERROR:root:severity: ERROR
timestamp {
  seconds: 1590724880
  nanos: 958515882
}
message: "Error processing instruction bundle_1. Original traceback 
is\nTraceback (most recent call last):\n  File 
\"apache_beam/runners/common.py\", line 961, in 
apache_beam.runners.common.DoFnRunner.process\n  File 
\"apache_beam/runners/common.py\", line 726, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process\n  File 
\"apache_beam/runners/common.py\", line 812, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window\n  File 
\"apache_beam/runners/common.py\", line 1095, in 
apache_beam.runners.common._OutputProcessor.process_outputs\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py\",
 line 1395, in process\n    element)\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py\", line 1448, 
in initial_restriction\n    range_tracker = 
self._source.get_range_tracker(None, None)\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py\", 
line 210, in get_range_tracker\n    return 
self._get_concat_source().get_range_tracker(\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py\",
 line 135, in _f\n    return fnc(self, *args, **kwargs)\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py\", 
line 145, in _get_concat_source\n    match_result = 
FileSystems.match([pattern])[0]\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py\", line 
203, in match\n    filesystem = FileSystems.get_filesystem(patterns[0])\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py\", line 
113, in get_filesystem\n    return systems[0](pipeline_options=options)\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py\", 
line 114, in __init__\n    raise ValueError(\'pipeline_options is not 
set\')\nValueError: pipeline_options is not set\n\nDuring handling of the above 
exception, another exception occurred:\n\nTraceback (most recent call last):\n  
File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py\",
 line 245, in _execute\n    response = task()\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py\",
 line 302, in <lambda>\n    lambda: 
self.create_worker().do_instruction(request), request)\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py\",
 line 471, in do_instruction\n    getattr(request, request_type), 
request.instruction_id)\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py\",
 line 506, in process_bundle\n    
bundle_processor.process_bundle(instruction_id))\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py\",
 line 972, in process_bundle\n    element.data)\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py\",
 line 218, in process_encoded\n    self.output(decoded_value)\n  File 
\"apache_beam/runners/worker/operations.py\", line 330, in 
apache_beam.runners.worker.operations.Operation.output\n  File 
\"apache_beam/runners/worker/operations.py\", line 332, in 
apache_beam.runners.worker.operations.Operation.output\n  File 
\"apache_beam/runners/worker/operations.py\", line 195, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive\n  File 
\"apache_beam/runners/worker/operations.py\", line 670, in 
apache_beam.runners.worker.operations.DoOperation.process\n  File 
\"apache_beam/runners/worker/operations.py\", line 671, in 
apache_beam.runners.worker.operations.DoOperation.process\n  File 
\"apache_beam/runners/common.py\", line 963, in 
apache_beam.runners.common.DoFnRunner.process\n  File 
\"apache_beam/runners/common.py\", line 1045, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented\n  File 
\"/usr/local/lib/python3.6/site-packages/future/utils/__init__.py\", line 421, 
in raise_with_traceback\n    raise exc.with_traceback(traceback)\n  File 
\"apache_beam/runners/common.py\", line 961, in 
apache_beam.runners.common.DoFnRunner.process\n  File 
\"apache_beam/runners/common.py\", line 726, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process\n  File 
\"apache_beam/runners/common.py\", line 812, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window\n  File 
\"apache_beam/runners/common.py\", line 1095, in 
apache_beam.runners.common._OutputProcessor.process_outputs\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py\",
 line 1395, in process\n    element)\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py\", line 1448, 
in initial_restriction\n    range_tracker = 
self._source.get_range_tracker(None, None)\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py\", 
line 210, in get_range_tracker\n    return 
self._get_concat_source().get_range_tracker(\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py\",
 line 135, in _f\n    return fnc(self, *args, **kwargs)\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py\", 
line 145, in _get_concat_source\n    match_result = 
FileSystems.match([pattern])[0]\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py\", line 
203, in match\n    filesystem = FileSystems.get_filesystem(patterns[0])\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py\", line 
113, in get_filesystem\n    return systems[0](pipeline_options=options)\n  File 
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py\", 
line 114, in __init__\n    raise ValueError(\'pipeline_options is not 
set\')\nValueError: pipeline_options is not set [while running 
\'ReadMyFile/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction\']\n\n"
instruction_id: "bundle_1"
log_location: "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py:252"
thread: "Thread-13"

0f9c70c36c0a24caaa8fd6f4fd60e33168c25e63a79d4deff94712399c248266
ERROR:apache_beam.runners.portability.local_job_service:Error running pipeline.
Traceback (most recent call last):
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/local_job_service.py", line 280, in _run_job
    self._pipeline_proto)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 189, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 335, in run_stages
    bundle_context_manager,
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 545, in _run_stage
    expected_timer_output)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1051, in process_bundle
    for result, split_result in executor.map(execute, part_inputs):
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 44, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1048, in execute
    part_map, expected_outputs, fired_timers, expected_output_timers)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 986, in process_bundle
    raise RuntimeError(result.error)
RuntimeError: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 812, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1395, in process
    element)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1448, in initial_restriction
    range_tracker = self._source.get_range_tracker(None, None)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 210, in get_range_tracker
    return self._get_concat_source().get_range_tracker(
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 145, in _get_concat_source
    match_result = FileSystems.match([pattern])[0]
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 203, in match
    filesystem = FileSystems.get_filesystem(patterns[0])
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
    return systems[0](pipeline_options=options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
    response = task()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle
    element.data)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 812, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1395, in process
    element)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1448, in initial_restriction
    range_tracker = self._source.get_range_tracker(None, None)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 210, in get_range_tracker
    return self._get_concat_source().get_range_tracker(
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 145, in _get_concat_source
    match_result = FileSystems.match([pattern])[0]
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 203, in match
    filesystem = FileSystems.get_filesystem(patterns[0])
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
    return systems[0](pipeline_options=options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set [while running 'ReadMyFile/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']

ERROR:apache_beam.runners.portability.local_job_service:<module 'traceback' 
from '/usr/lib64/python3.6/traceback.py'>
Traceback (most recent call last):
  File 
"/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/local_job_service.py",
 line 280, in _run_job
    self._pipeline_proto)
  File 
"/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 189, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File 
"/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 335, in run_stages
    bundle_context_manager,
  File 
"/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 545, in _run_stage
    expected_timer_output)
  File 
"/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 1051, in process_bundle
    for result, split_result in executor.map(execute, part_inputs):
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 586, in 
result_iterator
    yield fs.pop().result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in 
__get_result
    raise self._exception
  File 
"/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py",
 line 44, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File 
"/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 1048, in execute
    part_map, expected_outputs, fired_timers, expected_output_timers)
  File 
"/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
 line 986, in process_bundle
    raise RuntimeError(result.error)
RuntimeError: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 812, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1395, in process
    element)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1448, in initial_restriction
    range_tracker = self._source.get_range_tracker(None, None)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 210, in get_range_tracker
    return self._get_concat_source().get_range_tracker(
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 145, in _get_concat_source
    match_result = FileSystems.match([pattern])[0]
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 203, in match
    filesystem = FileSystems.get_filesystem(patterns[0])
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
    return systems[0](pipeline_options=options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
    response = task()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle
    element.data)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 812, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1395, in process
    element)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1448, in initial_restriction
    range_tracker = self._source.get_range_tracker(None, None)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 210, in get_range_tracker
    return self._get_concat_source().get_range_tracker(
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 145, in _get_concat_source
    match_result = FileSystems.match([pattern])[0]
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 203, in match
    filesystem = FileSystems.get_filesystem(patterns[0])
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
    return systems[0](pipeline_options=options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set [while running 'ReadMyFile/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib64/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/local_job_service.py", line 280, in _run_job
    self._pipeline_proto)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 189, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 335, in run_stages
    bundle_context_manager,
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 545, in _run_stage
    expected_timer_output)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1051, in process_bundle
    for result, split_result in executor.map(execute, part_inputs):
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 44, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1048, in execute
    part_map, expected_outputs, fired_timers, expected_output_timers)
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 986, in process_bundle
    raise RuntimeError(result.error)
RuntimeError: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 812, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1395, in process
    element)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1448, in initial_restriction
    range_tracker = self._source.get_range_tracker(None, None)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 210, in get_range_tracker
    return self._get_concat_source().get_range_tracker(
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 145, in _get_concat_source
    match_result = FileSystems.match([pattern])[0]
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 203, in match
    filesystem = FileSystems.get_filesystem(patterns[0])
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
    return systems[0](pipeline_options=options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
    response = task()
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle
    element.data)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 812, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1395, in process
    element)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1448, in initial_restriction
    range_tracker = self._source.get_range_tracker(None, None)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 210, in get_range_tracker
    return self._get_concat_source().get_range_tracker(
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 145, in _get_concat_source
    match_result = FileSystems.match([pattern])[0]
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 203, in match
    filesystem = FileSystems.get_filesystem(patterns[0])
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
    return systems[0](pipeline_options=options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set [while running 'ReadMyFile/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
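
A note on reading this log: the frames under /usr/local/lib/python3.6/site-packages appear to come from the SDK worker inside the apache/beam_python3.6_sdk container, while the /home/tfs/venv_beam_2.21.0 frames come from the local job service. So the ValueError seems to be raised on the worker side, when the HDFS filesystem is constructed for the ReadMyFile step. One hypothesis consistent with LOOPBACK working (as I understand it, LOOPBACK runs the worker from the submitting Python process) is that the filesystem options registered in the driver process never reach a separate worker process. The snippet below is only a toy, plain-Python model of that hypothesis. FileSystemRegistry, HdfsLikeFileSystem and the run_in_* helpers are hypothetical names, not Beam APIs, and the option value is a placeholder.

```python
# Hypothetical model of the suspected failure mode, NOT Beam's implementation.
# Assumption: filesystem options live in process-global state that is
# populated only in the process that constructed the pipeline.


class FileSystemRegistry:
    """Toy stand-in for a registry that hands options to filesystems."""

    _pipeline_options = None  # class-level, i.e. shared per interpreter process

    @classmethod
    def set_options(cls, options):
        cls._pipeline_options = options

    @classmethod
    def get_filesystem(cls, path):
        if not path.startswith("hdfs://"):
            raise ValueError("no filesystem registered for %r" % path)
        return HdfsLikeFileSystem(pipeline_options=cls._pipeline_options)


class HdfsLikeFileSystem:
    """Toy filesystem that refuses to start without options."""

    def __init__(self, pipeline_options):
        # Same kind of guard as the last frame in the traceback above.
        if pipeline_options is None:
            raise ValueError("pipeline_options is not set")
        self.options = pipeline_options


def _try_open(path):
    try:
        FileSystemRegistry.get_filesystem(path)
        return "ok"
    except ValueError as exc:
        return "error: %s" % exc


def run_in_same_process(path):
    # LOOPBACK-like: the worker shares the driver's interpreter, so the
    # options the driver registered are still visible.
    return _try_open(path)


def run_in_fresh_process(path):
    # Docker-like: a new interpreter starts with the class attribute unset,
    # because the driver's set_options() call never ran there.
    saved = FileSystemRegistry._pipeline_options
    FileSystemRegistry._pipeline_options = None
    try:
        return _try_open(path)
    finally:
        FileSystemRegistry._pipeline_options = saved


if __name__ == "__main__":
    # Placeholder option value; only its presence matters in this model.
    FileSystemRegistry.set_options({"hdfs_host": "example-namenode"})
    print(run_in_same_process("hdfs://path/to/input"))   # ok
    print(run_in_fresh_process("hdfs://path/to/input"))  # error: pipeline_options is not set
```

In this model only the "fresh process" case fails, which matches the LOOPBACK versus Docker difference reported earlier in the thread; it does not prove that Beam behaves this way internally.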

