[ https://issues.apache.org/jira/browse/BEAM-14294?focusedWorklogId=765184&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765184 ]
ASF GitHub Bot logged work on BEAM-14294:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/May/22 22:45
            Start Date: 02/May/22 22:45
    Worklog Time Spent: 10m
      Work Description: TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863241349

##########
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:
##########
@@ -121,6 +126,148 @@ def test_pardo(self):
           | beam.Map(lambda e: e + 'x'))
       assert_that(res, equal_to(['aax', 'bcbcx']))

+  def test_batch_pardo(self):
+    with self.create_pipeline() as p:
+      res = (
+          p
+          | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
+              np.int64)
+          | beam.ParDo(ArrayMultiplyDoFn())
+          | beam.Map(lambda x: x * 3))
+
+      assert_that(res, equal_to([6, 12, 18]))
+
+  def test_batch_rebatch_pardos(self):
+    with self.create_pipeline() as p:
+      res = (
+          p
+          | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
+              np.int64)
+          | beam.ParDo(ArrayMultiplyDoFn())
+          | beam.ParDo(ListPlusOneDoFn())
+          | beam.Map(lambda x: x * 3))
+
+      assert_that(res, equal_to([9, 15, 21]))
+
+  def test_batch_pardo_fusion_break(self):
+    class NormalizeDoFn(beam.DoFn):
+      @no_type_check
+      def process_batch(
+          self,
+          batch: np.ndarray,
+          mean: np.float64,
+      ) -> Iterator[np.ndarray]:
+        assert isinstance(batch, np.ndarray)
+        yield batch - mean
+
+      # infer_output_type must be defined (when there's no process method),

Review Comment:
   It does fall back to Any, but in this case I want the element type to be specific, since it also represents the element type of the ndarray.

Issue Time Tracking
-------------------

    Worklog Id:     (was: 765184)
    Time Spent: 1h 20m  (was: 1h 10m)

> MVP for SDK worker changes to support process_batch
> ---------------------------------------------------
>
>                 Key: BEAM-14294
>                 URL: https://issues.apache.org/jira/browse/BEAM-14294
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Brian Hulette
>            Priority: P2
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> The initial MVP may only work in some restricted circumstances (e.g.
> @yields_element on process_batch, or batch-to-batch without a 1:1
> input:output mapping might not be supported). These cases should fail early.

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
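The expected values in `test_batch_pardo` and `test_batch_rebatch_pardos` follow from simple per-element arithmetic, and the issue asks that unsupported batch-to-batch mappings fail early. What follows is a minimal, dependency-free emulation of that flow, not Beam's implementation. Assumptions: plain lists stand in for the `np.ndarray` and `List` batches; `array_multiply` (multiply by 2) and `list_plus_one` (add 1) are hypothetical stand-ins whose behavior is inferred from the asserted outputs, because the real `ArrayMultiplyDoFn` and `ListPlusOneDoFn` are not shown in this excerpt; `run_batch_stage` and `run` are hypothetical helpers showing one way a 1:1 batch-to-batch check could fail early.

```python
from typing import Callable, Iterable, List

# Hypothetical stand-ins: the real ArrayMultiplyDoFn / ListPlusOneDoFn live
# elsewhere in fn_runner_test.py. The x2 and +1 behavior below is inferred
# from the expected test outputs, not copied from Beam.
Batch = List[int]


def array_multiply(batch: Batch) -> Batch:
  return [x * 2 for x in batch]


def list_plus_one(batch: Batch) -> Batch:
  return [x + 1 for x in batch]


def normalize(batch: List[float], mean: float) -> List[float]:
  # Mirrors NormalizeDoFn.process_batch: subtract a (side-input) mean.
  return [x - mean for x in batch]


def run_batch_stage(batch: Batch, fn: Callable[[Batch], Batch]) -> Batch:
  """Apply a batch-to-batch function; fail early unless it is 1:1."""
  out = fn(batch)
  if len(out) != len(batch):
    raise ValueError(
        f"batch-to-batch stage produced {len(out)} outputs for "
        f"{len(batch)} inputs; only 1:1 mappings are supported")
  return out


def run(elements: Iterable[int],
        stages: List[Callable[[Batch], Batch]]) -> List[int]:
  batch = list(elements)  # Create, then gather everything into one batch
  for stage in stages:
    batch = run_batch_stage(batch, stage)
  # Explode the batch back into elements, then Map(lambda x: x * 3).
  return [x * 3 for x in batch]


print(run([1, 2, 3], [array_multiply]))  # -> [6, 12, 18]
print(run([1, 2, 3], [array_multiply, list_plus_one]))  # -> [9, 15, 21]
```

The two printed lists match the `assert_that` expectations in the diff. A stage such as `lambda b: b + b` would raise `ValueError` here instead of silently emitting twice as many elements, which is the "fail early" behavior the issue describes for non-1:1 batch-to-batch mappings.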