Hi, I have added the code below to the start of processElement2 in CoProcessFunction. It prints timestamps and watermarks for the first 3 elements for each new watermark. Shouldn't the timestamp always be lower than the next watermark? The 3 timestamps before the last watermark are all larger than the watermark time
The output I get is wm -9223372036854775808 ts 1478815851242 ts 1478816075096 ts 1478816114186 wm 1478822353934 ts 1478835814359 ts 1478835083219 ts 1478836126621 wm 1478827220420 ts 1478836408336 ts 1478836469247 ts 1478836759959 if (getRuntimeContext.getIndexOfThisSubtask == 0 ) { if (context.timerService().currentWatermark() != printedWatermark ) { printedWatermark = context.timerService().currentWatermark() println ("wm " + printedWatermark ) n = 0 } else { n += 1 } if (n < 3 ) { println ("ts " + context.timestamp()) } }