Hi,

My Beam pipeline is designed to work with an unbounded source KafkaIO.
It roughly looks like below:
p.apply(KafkaIO.read() ...)                      // (A-1)
  .apply(WithKeys.of(...).withKeyType(...))
  .apply(Window.into(FixedWindows.of(...)))
  .apply(Combine.perKey(...))                    // (B)
  .apply(Window.into(new GlobalWindows()))       // to have per-key stats in (C)
  .apply(ParDo.of(new MyStatefulDoFn()))         // (C)
Note that (C) maintains its own state, which is expected to be fetched and
updated by the window results from (B) in event-time order.

Now I'm writing an integration test where (A-1) is replaced by (A-2):

> p.apply(TextIO.read().from("test.txt"))                  // (A-2)

"test.txt" contains samples that all share a single key.

I get a wrong result, and it turns out that the window results from (B)
didn't feed into (C) in order.
Is it because (A-2) makes the pipeline a bounded one?

Q1. How can I prevent this from happening?
Q2. How do you usually write an integration test for an unbounded pipeline
with a stateful function?
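
For reference, here is a rough sketch of what I imagine such a test could
look like, using Beam's TestStream to simulate an unbounded source and to
control the watermark explicitly (the element values and timestamps below
are made up, and I haven't verified this against my pipeline):

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

// TestStream emits elements with explicit event timestamps and lets the
// test advance the watermark between batches, so windows should fire and
// reach (C) in event-time order, unlike a plain bounded TextIO read.
TestStream<String> source =
    TestStream.create(StringUtf8Coder.of())
        .addElements(TimestampedValue.of("sample-1", new Instant(0L)))
        .advanceWatermarkTo(new Instant(60_000L))    // close the first window
        .addElements(TimestampedValue.of("sample-2", new Instant(60_000L)))
        .advanceWatermarkToInfinity();

p.apply(source)   // would replace (A-1)/(A-2)
  .apply(...)     // rest of the pipeline as above

As far as I understand, this needs a runner that supports TestStream
(e.g. the DirectRunner), but I'm not sure whether it counts as a proper
integration test.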

Best,

Dongwon
