[ 
https://issues.apache.org/jira/browse/BEAM-11158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17548941#comment-17548941
 ] 

Danny McCormick commented on BEAM-11158:
----------------------------------------

This issue has been migrated to https://github.com/apache/beam/issues/20551

> Side Inputs to beam.Partition
> -----------------------------
>
>                 Key: BEAM-11158
>                 URL: https://issues.apache.org/jira/browse/BEAM-11158
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Joseph Toth
>            Priority: P3
>
> Side inputs work with a regular ParDo and function, but I can't seem to get 
> it to work with beam.Partition. The code and exception below demonstrates the 
> problem.
> ```
> import apache_beam as beam
> def main():
>   class SideFn(beam.PartitionFn):
>     def partition_for(self, element, *args, **kwargs):
>       print(element, args, kwargs)
>   def just_print(element, *args, **kwargs):
>       print(element, args, kwargs)
>   with beam.Pipeline() as p:
>       side = p | 'CreateSide' >> beam.Create(['a'])
>       p | beam.Create([1, 2, 3]) | beam.Partition(SideFn(), 99,  
> side=beam.pvalue.AsSingleton(side))
>       # p | beam.Create([1, 2, 3]) | beam.ParDo(just_print, 99,  
> side=beam.pvalue.AsSingleton(side))
> if __name__ == '__main__':
>     main()
> ```
> /Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/bin/python 
> /Users/joetoth/projects/joetoth.com/psy/part.py
> Traceback (most recent call last):
>   File 
> "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/internal/util.py",
>  line 134, in <genexpr>
>     (k, next(v_iter)) if isinstance(v, ArgumentPlaceholder) else (k, v) for k,
> StopIteration
> The above exception was the direct cause of the following exception:
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1213, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 742, in 
> apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 804, in 
> apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File 
> "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/internal/util.py",
>  line 133, in insert_values_in_args
>     new_kwargs = dict(
> RuntimeError: generator raised StopIteration
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "/Users/joetoth/projects/joetoth.com/psy/part.py", line 19, in <module>
>     main()
>   File "/Users/joetoth/projects/joetoth.com/psy/part.py", line 14, in main
>     p | beam.Create([1, 2, 3]) | beam.Partition(SideFn(), 99,  
> side=beam.pvalue.AsSingleton(side))
>   File 
> "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/pipeline.py",
>  line 568, in __exit__
>     self.result = self.run()
>   File 
> "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/pipeline.py",
>  line 547, in run
>     return self.runner.run_pipeline(self, self._options)
>   File 
> "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py",
>  line 119, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File 
> "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 175, in run_pipeline
>     self._latest_run_result = self.run_via_runner_api(
>   File 
> "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 186, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File 
> "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 344, in run_stages
>     stage_results = self._run_stage(
>   File 
> "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 527, in _run_stage
>     last_result, deferred_inputs, fired_timers = self._run_bundle(
>   File 
> "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 571, in _run_bundle
>     result, splits = bundle_manager.process_bundle(
>   File 
> "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
>  line 852, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File 
> "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py",
>  line 353, in push
>     response = self.worker.do_instruction(request)
>   File 
> "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 483, in do_instruction
>     return getattr(self, request_type)(
>   File 
> "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 519, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File 
> "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 984, in process_bundle
>     input_op_by_transform_id[element.transform_id].process_encoded(
>   File 
> "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 221, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 354, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 356, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 218, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 703, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 704, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1215, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1279, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1213, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 569, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 218, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 703, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 704, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1215, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1279, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1213, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 569, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 218, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 703, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 704, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1215, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1279, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "apache_beam/runners/common.py", line 1213, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 569, in 
> apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 1374, in 
> apache_beam.runners.common._OutputProcessor.process_outputs
>   File "apache_beam/runners/worker/operations.py", line 218, in 
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 703, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 704, in 
> apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 1215, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 1294, in 
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File 
> "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/future/utils/__init__.py",
>  line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 1213, in 
> apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 742, in 
> apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 804, in 
> apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File 
> "/Users/joetoth/.local/share/virtualenvs/joetoth.com-s5in2Toy/lib/python3.8/site-packages/apache_beam/internal/util.py",
>  line 133, in insert_values_in_args
>     new_kwargs = dict(
> RuntimeError: generator raised StopIteration [while running 
> 'Partition(SideFn)/ParDo(ApplyPartitionFnFn)/ParDo(ApplyPartitionFnFn)']
> Process finished with exit code 1



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to