Hi,

My Beam pipeline is designed to work with an unbounded source, KafkaIO. It roughly looks like this:

    p.apply(KafkaIO.read() ...)                       // (A-1)
     .apply(WithKeys.of(...).withKeyType(...))
     .apply(Window.into(FixedWindows.of(...)))
     .apply(Combine.perKey(...))                      // (B)
     .apply(Window.into(new GlobalWindows()))         // to have per-key stats in (C)
     .apply(ParDo.of(new MyStatefulDoFn()));          // (C)

Note that (C) has its own state, which is expected to be fetched and updated by the window results from (B) in event-time order.
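For reference, (C) is shaped roughly like the sketch below (details elided; the state id "stats" and the element types are placeholders, not the actual code):

    class MyStatefulDoFn extends DoFn<KV<String, Stats>, Result> {
      // per-key mutable state holding the running per-key stats
      @StateId("stats")
      private final StateSpec<ValueState<Stats>> statsSpec = StateSpecs.value();

      @ProcessElement
      public void processElement(ProcessContext c,
                                 @StateId("stats") ValueState<Stats> stats) {
        // fetch the previous per-key state, update it with the incoming
        // window result from (B), and write it back -- this logic assumes
        // the window results arrive in event-time order
        ...
      }
    }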
Now I'm writing an integration test where (A-1) is replaced by (A-2):

    p.apply(TextIO.read().from("test.txt"))           // (A-2)

"test.txt" contains samples having a single key. I get a wrong result, and it turns out that the window results didn't feed into (C) in order. Is it because (A-2) makes the pipeline a bounded one?

Q1. How can I prevent this from happening?
Q2. How do you usually write an integration test for an unbounded pipeline with a stateful function?

Best,
Dongwon