Jiaxin Shan created BEAM-10813:
----------------------------------
Summary: TypeError: Expected bytes, got list in
apache_beam.coders.coder_impl.BytesCoderImpl.encode_to_stream
Key: BEAM-10813
URL: https://issues.apache.org/jira/browse/BEAM-10813
Project: Beam
Issue Type: Bug
Components: runner-flink, sdk-py-harness
Environment: Flink 1.10.1
Beam worker pool: apache/beam_python3.6_sdk:2.22.0
SDK: apache beam 2.22.0
Python 3.6
Reporter: Jiaxin Shan
I am trying to run
[https://github.com/tensorflow/tfx/tree/master/tfx/examples/chicago_taxi_pipeline]
using Flink Runner on Kubernetes.
I resolved a few issues and finally got the beam-worker-pool to pick up the tasks.
Then I got the following error messages:
{code:java}
// code placeholder
Traceback (most recent call last):
File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py",
line 545, in <lambda>
target=lambda: self._read_inputs(elements_iterator),
File
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py",
line 528, in _read_inputs
for elements in elements_iterator:
File "/usr/local/lib/python3.6/site-packages/grpc/_channel.py", line 416, in
__next__
return self._next()
File "/usr/local/lib/python3.6/site-packages/grpc/_channel.py", line 689, in
_next
raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that
terminated with:
status = StatusCode.UNAVAILABLE
details = "DNS resolution failed for service: "
debug_error_string =
"{"created":"@1598390105.406081334","description":"Resolver transient
failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":213,"referenced_errors":[{"created":"@1598390105.406079944","description":"DNS
resolution failed for service:
","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":377,"grpc_status":14,"referenced_errors":[{"created":"@1598390105.406076335","description":"unparseable
host:port","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":410,"target_address":""}]}]}"
>Traceback (most recent call last):
File "apache_beam/runners/common.py", line 961, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 553, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1122, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 194, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 164, in
apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
File "apache_beam/runners/worker/opcounters.py", line 210, in
apache_beam.runners.worker.opcounters.OperationCounters.update_from
File "apache_beam/runners/worker/opcounters.py", line 230, in
apache_beam.runners.worker.opcounters.OperationCounters.do_sample
File "apache_beam/coders/coder_impl.py", line 1187, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1198, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1000, in
apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 192, in
apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 165, in
apache_beam.coders.coder_impl.CoderImpl.estimate_size
File "apache_beam/coders/coder_impl.py", line 488, in
apache_beam.coders.coder_impl.BytesCoderImpl.encode_to_stream
TypeError: Expected bytes, got listDuring handling of the above exception,
another exception occurred:Traceback (most recent call last):
File
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 245, in _execute
response = task()
File
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 302, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 471, in do_instruction
getattr(request, request_type), request.instruction_id)
File
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 506, in process_bundle
bundle_processor.process_bundle(instruction_id))
File
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 972, in process_bundle
element.data)
File
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 218, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 330, in
apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 332, in
apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 195, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 755, in
apache_beam.runners.worker.operations.SdfProcessSizedElements.process
File "apache_beam/runners/worker/operations.py", line 764, in
apache_beam.runners.worker.operations.SdfProcessSizedElements.process
File "apache_beam/runners/common.py", line 971, in
apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
File "apache_beam/runners/common.py", line 711, in
apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 807, in
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1122, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 195, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 670, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 671, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 963, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1045, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line
446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 961, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 553, in
apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1122, in
apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 194, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 164, in
apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
File "apache_beam/runners/worker/opcounters.py", line 210, in
apache_beam.runners.worker.opcounters.OperationCounters.update_from
File "apache_beam/runners/worker/opcounters.py", line 230, in
apache_beam.runners.worker.opcounters.OperationCounters.do_sample
File "apache_beam/coders/coder_impl.py", line 1187, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1198, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1000, in
apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 192, in
apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 165, in
apache_beam.coders.coder_impl.CoderImpl.estimate_size
File "apache_beam/coders/coder_impl.py", line 488, in
apache_beam.coders.coder_impl.BytesCoderImpl.encode_to_stream
TypeError: Expected bytes, got list [while running
'InputToSerializedExample/InputSourceToExample/ParseCSVLine']2020/08/25
21:15:10 Python exited: <nil>
{code}
This code runs with DirectRunner without any problems. Not sure why it
doesn't work on Flink. TFX job information:
{code:java}
- --pipeline_name - parameterized_tfx_oss - --pipeline_root -
'{{inputs.parameters.pipeline-root}}' - --kubeflow_metadata_config -
|- { "grpc_config": { "grpc_service_host": {
"environment_variable": "METADATA_GRPC_SERVICE_SERVICE_HOST" },
"grpc_service_port": { "environment_variable":
"METADATA_GRPC_SERVICE_SERVICE_PORT" } } } -
--beam_pipeline_args - '["--runner=FlinkRunner",
"--flink_master=beam-flink-cluster-jobmanager:8081",
"--flink_submit_uber_jar", "--environment_type=EXTERNAL",
"--environment_config=localhost:50000"]' - --additional_pipeline_args
- '{}' - --component_launcher_class_path -
tfx.orchestration.launcher.in_process_component_launcher.InProcessComponentLauncher
- --serialized_component - '{"__class__": "NodeWrapper",
"__module__": "tfx.orchestration.kubeflow.node_wrapper",
"__tfx_object_type__": "jsonable", "_exec_properties": {"custom_config": null,
"input_config": "{\n \"splits\": [\n {\n \"name\":
\"single_split\",\n \"pattern\": \"*\"\n }\n ]\n}",
"output_config": "{\n \"split_config\": {\n \"splits\": [\n {\n
\"hash_buckets\": 2,\n \"name\": \"train\"\n },\n {\n
\"hash_buckets\": 1,\n \"name\": \"eval\"\n }\n ]\n
}\n}"}, "_id": "CsvExampleGen", "_inputs": {"__class__":
"_PropertyDictWrapper", "__module__": "tfx.types.node_common",
"__tfx_object_type__": "jsonable", "_compat_aliases": {"input_base": "input"},
"_data": {"input": {"__class__": "Channel", "__module__":
"tfx.types.channel", "__tfx_object_type__": "jsonable", "artifacts":
[{"__artifact_class_module__": "tfx.types.standard_artifacts",
"__artifact_class_name__": "ExternalArtifact", "artifact": {"uri":
"{{inputs.parameters.data-root}}"}, "artifact_type": {"name":
"ExternalArtifact"}}], "output_key": null, "producer_component_id": null,
"type": {"name": "ExternalArtifact"}}}}, "_outputs": {"__class__":
"_PropertyDictWrapper", "__module__": "tfx.types.node_common",
"__tfx_object_type__": "jsonable", "_compat_aliases": {}, "_data":
{"examples": {"__class__": "Channel", "__module__": "tfx.types.channel",
"__tfx_object_type__": "jsonable", "artifacts": [{"__artifact_class_module__":
"tfx.types.standard_artifacts", "__artifact_class_name__": "Examples",
"artifact": {"custom_properties": {"name": {"string_value": "examples"},
"pipeline_name": {"string_value": "parameterized_tfx_oss"},
"producer_component": {"string_value": "CsvExampleGen"}}, "properties":
{"split_names": {"string_value": "[\"train\", \"eval\"]"}}},
"artifact_type": {"name": "Examples", "properties": {"span": "INT",
"split_names": "STRING"}}}], "output_key": "examples", "producer_component_id":
"CsvExampleGen", "type": {"name": "Examples", "properties": {"span":
"INT", "split_names": "STRING"}}}}}, "_type":
"tfx.components.example_gen.csv_example_gen.component.CsvExampleGen",
"driver_class": {"__class__": "Driver", "__module__":
"tfx.components.example_gen.driver", "__tfx_object_type__": "class"},
"executor_spec": {"__class__": "ExecutorClassSpec", "__module__":
"tfx.components.base.executor_spec", "__tfx_object_type__": "jsonable",
"executor_class": {"__class__": "Executor", "__module__":
"tfx.components.example_gen.csv_example_gen.executor",
"__tfx_object_type__": "class"}}}' - --component_config - 'null'
- --enable_cache command: [python,
/tfx-src/tfx/orchestration/kubeflow/container_entrypoint.py]
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)