[ https://issues.apache.org/jira/browse/BEAM-14294?focusedWorklogId=765209&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765209 ]
ASF GitHub Bot logged work on BEAM-14294: ----------------------------------------- Author: ASF GitHub Bot Created on: 02/May/22 23:39 Start Date: 02/May/22 23:39 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on code in PR #17384: URL: https://github.com/apache/beam/pull/17384#discussion_r863260992 ########## sdks/python/apache_beam/utils/windowed_value.py: ########## @@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN): return wv +class BatchingMode(Enum): + CONCRETE = 1 + HOMOGENEOUS = 2 + + +class WindowedBatch(object): + """A batch of N windowed values, each having a value, a timestamp and set of + windows.""" + def with_values(self, new_values): + # type: (Any) -> WindowedBatch + + """Creates a new WindowedBatch with the same timestamps and windows as this. + + This is the fasted way to create a new WindowedValue. + """ + raise NotImplementedError + + def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]: + raise NotImplementedError + + @staticmethod + def from_windowed_values( + windowed_values: Sequence[WindowedValue], + *, + produce_fn: Callable, + mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']: + if mode == BatchingMode.HOMOGENEOUS: + import collections + grouped = collections.defaultdict(lambda: []) + for wv in windowed_values: + grouped[(wv.timestamp, tuple(wv.windows), + wv.pane_info)].append(wv.value) + + for key, values in grouped.items(): + timestamp, windows, pane_info = key + yield HomogeneousWindowedBatch.of( + produce_fn(values), timestamp, windows, pane_info) + elif mode == BatchingMode.CONCRETE: + yield ConcreteWindowedBatch( + produce_fn([wv.value for wv in windowed_values]), + [wv.timestamp + for wv in windowed_values], [wv.windows for wv in windowed_values], + [wv.pane_info for wv in windowed_values]) + else: + raise AssertionError( + "Unrecognized BatchingMode in " + f"WindowedBatch.from_windowed_values: {mode!r}") + + +class 
HomogeneousWindowedBatch(WindowedBatch): + """A WindowedBatch with Homogeneous event-time information, represented + internally as a WindowedValue. + """ + def __init__(self, wv): + self._wv = wv + + @staticmethod + def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN): + return HomogeneousWindowedBatch( + WindowedValue(values, timestamp, windows, pane_info)) + + @property + def values(self): + return self._wv.value + + @property + def timestamp(self): Review Comment: These are used in `PerWindowInvoker._invoke_process_batch_per_window` to populate DoFn params. Issue Time Tracking ------------------- Worklog Id: (was: 765209) Time Spent: 2h 10m (was: 2h) > MVP for SDK worker changes to support process_batch > --------------------------------------------------- > > Key: BEAM-14294 > URL: https://issues.apache.org/jira/browse/BEAM-14294 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core > Reporter: Brian Hulette > Assignee: Brian Hulette > Priority: P2 > Time Spent: 2h 10m > Remaining Estimate: 0h > > The initial MVP may only work in some restricted circumstances (e.g. @yields_element on process_batch, or batch-to-batch without a 1:1 input:output mapping, might not be supported). These cases should fail early. -- This message was sent by Atlassian Jira (v8.20.7#820007)