[ 
https://issues.apache.org/jira/browse/BEAM-14294?focusedWorklogId=765184&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765184
 ]

ASF GitHub Bot logged work on BEAM-14294:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/May/22 22:45
            Start Date: 02/May/22 22:45
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863241349


##########
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:
##########
@@ -121,6 +126,148 @@ def test_pardo(self):
           | beam.Map(lambda e: e + 'x'))
       assert_that(res, equal_to(['aax', 'bcbcx']))
 
+  def test_batch_pardo(self):
+    with self.create_pipeline() as p:
+      res = (
+          p
+          | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
+              np.int64)
+          | beam.ParDo(ArrayMultiplyDoFn())
+          | beam.Map(lambda x: x * 3))
+
+      assert_that(res, equal_to([6, 12, 18]))
+
+  def test_batch_rebatch_pardos(self):
+    with self.create_pipeline() as p:
+      res = (
+          p
+          | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
+              np.int64)
+          | beam.ParDo(ArrayMultiplyDoFn())
+          | beam.ParDo(ListPlusOneDoFn())
+          | beam.Map(lambda x: x * 3))
+
+      assert_that(res, equal_to([9, 15, 21]))
+
+  def test_batch_pardo_fusion_break(self):
+    class NormalizeDoFn(beam.DoFn):
+      @no_type_check
+      def process_batch(
+          self,
+          batch: np.ndarray,
+          mean: np.float64,
+      ) -> Iterator[np.ndarray]:
+        assert isinstance(batch, np.ndarray)
+        yield batch - mean
+
+      # infer_output_type must be defined (when there's no process method),

Review Comment:
   It does fall back to Any, but in this case I want the element type to be
specific, since it also represents the element type of the ndarray.
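
To illustrate why a specific element type matters for an ndarray batch, here is a minimal sketch in plain NumPy. It is not Beam's batching code: `make_batch` and `normalize` are hypothetical helpers, and the mapping of `Any` to an object-dtype array is an assumption made only for this illustration.

```python
from typing import Any

import numpy as np


def make_batch(elements, element_type):
    """Hypothetical helper (not a Beam API): build an ndarray batch.

    A specific element type becomes the array dtype; Any is assumed here
    to degrade to a generic object array.
    """
    dtype = object if element_type is Any else np.dtype(element_type)
    return np.array(elements, dtype=dtype)


def normalize(batch, mean):
    # Mirrors the `yield batch - mean` step from the diff above.
    return batch - mean


typed = normalize(make_batch([1.0, 2.0, 3.0], np.float64), np.float64(2.0))
untyped = normalize(make_batch([1.0, 2.0, 3.0], Any), np.float64(2.0))

print(typed.dtype)    # float64
print(untyped.dtype)  # object
```

Under these assumptions, the typed batch keeps a numeric dtype through the subtraction, while the `Any` batch stays an object array, so downstream code can no longer rely on the element type of the ndarray.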





Issue Time Tracking
-------------------

    Worklog Id:     (was: 765184)
    Time Spent: 1h 20m  (was: 1h 10m)

> MVP for SDK worker changes to support process_batch
> ---------------------------------------------------
>
>                 Key: BEAM-14294
>                 URL: https://issues.apache.org/jira/browse/BEAM-14294
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Brian Hulette
>            Priority: P2
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> The initial MVP may only work in restricted circumstances (e.g. 
> @yields_element on process_batch, or batch-to-batch without a 1:1 
> input:output mapping, might not be supported). Unsupported cases should fail early. 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
