What is the best way to run unit tests on streams that contain ProcessTimeWindows?
Example: def bufferDataStreamByProcessId(ds: DataStream[MaalkaRecord]): DataStream[MaalkaRecord] = { ds.map { r => println(s"data in: $r") // Data shows up here r }.keyBy { mr => val r = mr.asInstanceOf[MaalkaDataRecord] r.dataProcessingId -> r.isBefore } .window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(10))) .reduce { (_, r2) => println(s"Reducing: r2") // Data does NOT show up here r2 } .map { r => println(s"Emitted: $r") // Data does NOT show up here r } } This stream completes about 100ms after the first element is received by the ProcessingTimeSessionWindow but no data is emitted from the sessionWindow. If i change the window to use a TumblingProcessTimeWindow.of(Time.milliseconds(1)) some of windows do emit, but the many of the expected values are missing. Any ideas? Cheers, Clay