[ https://issues.apache.org/jira/browse/BEAM-14112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ning updated BEAM-14112:
------------------------
    Fix Version/s: 2.38.0
    Resolution: Fixed
    Status: Resolved (was: Triage Needed)

> ReadFromBigQuery cannot be used with the interactive runner
> -----------------------------------------------------------
>
> Key: BEAM-14112
> URL: https://issues.apache.org/jira/browse/BEAM-14112
> Project: Beam
> Issue Type: Improvement
> Components: io-py-gcp, runner-py-interactive
> Affects Versions: 2.35.0, 2.36.0, 2.37.0
> Reporter: Chun Yang
> Assignee: Chun Yang
> Priority: P2
> Fix For: 2.38.0
>
> Time Spent: 4h 20m
> Remaining Estimate: 0h
>
> A change in Apache Beam 2.35.0 caused ReadFromBigQuery to no longer work with
> the Python interactive runner.
> The error can be reproduced with the following code:
> {code:python}#!/usr/bin/env python
> """Reproduce pickle issue when using RFBQ in interactive runner."""
> import apache_beam as beam
>
> from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
> import apache_beam.runners.interactive.interactive_beam as ib
>
>
>
>
>
> options = beam.options.pipeline_options.PipelineOptions(
>
>     project="...",
>     temp_location="...",
> )
>
>
>
> pipeline = beam.Pipeline(InteractiveRunner(), options=options)
>
> pcoll = pipeline | beam.io.ReadFromBigQuery(query="SELECT 1")
>
> print(ib.collect(pcoll)){code}
> {code:none}Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 214, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 211, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 250, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1425, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1436, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1514, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 246, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 441, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
>   File "apache_beam/coders/coder_impl.py", line 268, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 802, in <lambda>
>     lambda x: dumps(x, protocol), pickle.loads)
> TypeError: can't pickle generator objects
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "repro.py", line 16, in <module>
>     print(ib.collect(pcoll))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/utils.py", line 270, in run_within_progress_indicator
>     return func(*args, **kwargs)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_beam.py", line 664, in collect
>     recording = recording_manager.record([pcoll], max_n=n, max_duration=duration)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/recording_manager.py", line 458, in record
>     self.user_pipeline.options).run()
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/pipeline_fragment.py", line 113, in run
>     return self.deduce_fragment().run()
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/pipeline.py", line 573, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/interactive/interactive_runner.py", line 195, in run_pipeline
>     pipeline_to_execute.run(), pipeline_instrument)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/pipeline.py", line 573, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 200, in run_pipeline
>     pipeline.to_runner_api(default_environment=self._default_environment))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 210, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 396, in run_stages
>     runner_execution_context, bundle_context_manager)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 667, in _run_stage
>     bundle_manager))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 784, in _run_bundle
>     data_input, data_output, input_timers, expected_timer_output)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1094, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 378, in push
>     response = self.worker.do_instruction(request)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 581, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 618, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
>     element.data)
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 346, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 348, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1265, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1265, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1281, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 536, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 214, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 211, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 250, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1425, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1436, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 987, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1514, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 246, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 441, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
>   File "apache_beam/coders/coder_impl.py", line 268, in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
>   File "/home/chuck.yang/src/venv/py3-general/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 802, in <lambda>
>     lambda x: dumps(x, protocol), pickle.loads)
> TypeError: can't pickle generator objects [while running 'ReadFromBigQuery/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction']{code}
> I suspect the error is caused by this change that was first released in
> 2.35.0: https://github.com/apache/beam/pull/15610

--
This message was sent by Atlassian Jira (v8.20.1#820001)