Hi, I am seeing inconsistent behavior between DirectRunner and DataflowRunner when using the side input pattern with `FlatMap` in a streaming pipeline. Consider the logic in the unit test below:
```
import unittest
from apache_beam import FlatMap, Flatten, Map, ParDo, pvalue, WindowInto
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms import window
from apache_beam.transforms.trigger import Repeatedly, AfterCount, AccumulationMode
from typing import List, Iterator


class Event:
    def __init__(self, timestamp):
        self.timestamp = timestamp

    def __str__(self):
        return f"Event: {self.timestamp}"


class Message:
    def __init__(self, timestamp):
        self.timestamp = timestamp

    def __str__(self):
        return f"MSG: {self.timestamp}"


def match_message_to_events(
    message: Message,
    events: List[Event],
) -> Iterator[Event]:
    print(f"Trying to match message: {message} with events: {[str(event) for event in events]}")
    matched = False
    for event in events:
        if True:  # placeholder for the real matching condition
            matched = True
            yield event
    if not matched:
        return


class TestMatch(unittest.TestCase):
    def test__match_event_with_message(self):
        event_created_at = 23
        message_create_at = [5, 16]

        event_input = TimestampedValue(
            value=Event(timestamp=event_created_at), timestamp=event_created_at
        )
        message_inputs = [
            TimestampedValue(value=Message(timestamp=timestamp), timestamp=timestamp)
            for timestamp in message_create_at
        ]

        stream = (
            TestStream()
            .add_elements([message_inputs[0]], tag="Message")
            .advance_watermark_to(message_inputs[0].timestamp, tag="Message")
            .add_elements([message_inputs[1]], tag="Message")
            .advance_watermark_to(message_inputs[1].timestamp, tag="Message")
            .add_elements([event_input], tag="Event")
            .advance_watermark_to(event_input.timestamp, tag="Event")
            .advance_watermark_to_infinity(tag="Message")
        )

        with TestPipeline() as pipeline:
            input_pcolls = pipeline | "Create" >> stream

            windowed_events = input_pcolls["Event"] | "Window Events" >> WindowInto(
                window.SlidingWindows(10, 5),
                trigger=Repeatedly(AfterCount(1)),
                accumulation_mode=AccumulationMode.ACCUMULATING,
            )
            windowed_messages = input_pcolls[
                "Message"
            ] | "Window Message Main Events" >> WindowInto(
                window.SlidingWindows(10, 5),
                trigger=Repeatedly(AfterCount(1)),
                accumulation_mode=AccumulationMode.ACCUMULATING,
            )

            output = windowed_messages | "Match Message to previous Events" >> FlatMap(
                match_message_to_events,
                pvalue.AsList(windowed_events),
            )
```

In short, I have two windowed PCollections in a streaming pipeline, and one of them serves as a side input to `FlatMap`.

The problem is that when this code runs on Dataflow and the side input PCollection's window is empty, the `FlatMap` step is not executed at all. The main-input element stays stuck in the pipeline, which holds back the watermark and breaks any later steps that depend on it (e.g. timers in the watermark time domain).

On DirectRunner, however, the test produces the expected output, with empty lists passed as the side input:

```
Trying to match message: MSG: 5 with events: []
Trying to match message: MSG: 5 with events: []
Trying to match message: MSG: 16 with events: ['Event: 23']
Trying to match message: MSG: 16 with events: []
```

Dataflow passes empty windows to the side input only when the job is drained and the watermark advances to infinity.

What is the suggested solution to this issue? Is there a way to force Dataflow to pass empty windows as the side input?
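For reference, here is why the DirectRunner output above contains three empty lists. With `SlidingWindows(10, 5)` every element lands in two windows, and only the `[15, 25)` window holds both a message (16) and the event (23). The pure-Python sketch below has no Beam dependency. The helper names `sliding_windows` and `expected_side_inputs` are hypothetical, and the sketch assumes the default window offset of 0 and integer-second timestamps. It lists the side-input contents each message/window pair should see. Per the behavior described above, the pairs with an empty list are the ones Dataflow does not process until drain.

```python
from typing import Dict, List, Tuple

Window = Tuple[int, int]  # half-open interval [start, end), in seconds


def sliding_windows(ts: int, size: int = 10, period: int = 5) -> List[Window]:
    """Return every sliding window containing ts (assumes offset 0)."""
    start = ts - ts % period  # latest window start that is <= ts
    # Step backwards by `period` while the window [s, s + size) still contains ts.
    return [(s, s + size) for s in range(start, ts - size, -period)]


def expected_side_inputs(
    messages: List[int], events: List[int]
) -> List[Tuple[int, Window, List[int]]]:
    # Assign each event timestamp to all sliding windows it falls into.
    events_by_window: Dict[Window, List[int]] = {}
    for e in events:
        for w in sliding_windows(e):
            events_by_window.setdefault(w, []).append(e)
    # Each (message, window) pair sees the events of that same window,
    # or an empty list when no event landed in it.
    return [
        (m, w, events_by_window.get(w, []))
        for m in messages
        for w in sliding_windows(m)
    ]


for msg, win, evs in expected_side_inputs([5, 16], [23]):
    print(f"MSG: {msg} in {win} -> events: {evs}")
```

Running it prints one line per message/window pair. Only `MSG: 16` in `(15, 25)` gets a non-empty list (`[23]`), which matches the single non-empty side input in the DirectRunner log.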