Dear Community,

I'm really struggling with a co-grouped stream. The job is set up as follows:

    val firstStream: DataStream[FirstType] =
      firstRaw.assignTimestampsAndWatermarks(new MyCustomFirstExtractor(maxOutOfOrder))

    val secondStream: DataStream[SecondType] = secondRaw
      .assignTimestampsAndWatermarks(new MyCustomSecondExtractor(maxOutOfOrder))
      .map(new toSecondsStreamMapper())

where both extractors extend BoundedOutOfOrdernessTimestampExtractor and override the extractTimestamp method to return the timestamp carried by the FirstType and SecondType objects respectively:

    override def extractTimestamp(first: FirstType): Long = first.timestamp

Then I call coGroup as follows:

    val stockDetails = firstStream
      .coGroup(secondStream)
      .where(_.id)
      .equalTo(_.id)
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .apply(new MyCogroupFunction())
      .uid("myCogroup")
      .name("My CoGroup")

The problem is that the CoGroup function is never triggered. I ran several tests and was not able to solve it.

The first relevant point is that event time can be seriously out of order; I can even run into timestamps of 0. I also faked the timestamps so that they are spread over a span of two seconds, five seconds, and so forth. These attempts did not change the behavior at all: no window ever fires.

Another relevant point: I'm running locally and reading from a pre-loaded Kafka topic, so all the events are read sequentially at startup.
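
For reference, the window arithmetic behind the pipeline above can be sketched in plain Scala, without Flink on the classpath. This is only a rough model under stated assumptions, not the actual job: it assumes one-second tumbling windows aligned to the epoch with no offset, a watermark equal to the largest timestamp seen minus the out-of-orderness bound, a two-input operator that advances on the minimum of its two input watermarks, and a window [start, end) that fires once the watermark reaches end - 1. The object name, the function names and the value of maxOutOfOrder (1000 ms) are hypothetical, since the post does not give the bound. The timestamps are the distinct values from Workload 1 listed further down.

```scala
object WindowSketch {
  // Tumbling window [start, end) of `size` ms that contains `ts` (no offset assumed).
  def windowFor(ts: Long, size: Long): (Long, Long) = {
    val start = ts - (ts % size)
    (start, start + size)
  }

  // Watermark of a bounded-out-of-orderness extractor after seeing `timestamps`.
  def watermark(timestamps: Seq[Long], maxOutOfOrder: Long): Long =
    timestamps.max - maxOutOfOrder

  // An event-time window fires once the watermark reaches its last timestamp (end - 1).
  def fires(window: (Long, Long), wm: Long): Boolean = wm >= window._2 - 1

  def main(args: Array[String]): Unit = {
    val size = 1000L          // Time.seconds(1), as in the coGroup call
    val maxOutOfOrder = 1000L // hypothetical value, not given in the post

    // Distinct timestamps from Workload 1.
    val first = Seq(1490280129517L, 1490280131191L, 1490280131214L,
      1490280131250L, 1490280131294L, 1490280131328L)
    val second = Seq(1490280130465L, 1490280131027L, 1490280131051L,
      1490280131070L, 1490280131085L, 1490280131103L, 1490280131124L,
      1490280131143L, 1490280131158L, 1490280131175L)

    // A two-input operator advances on the minimum of its input watermarks.
    val combined = math.min(watermark(first, maxOutOfOrder),
      watermark(second, maxOutOfOrder))

    val windows = (first ++ second).map(windowFor(_, size)).distinct.sorted
    windows.foreach { w =>
      println(s"window [${w._1}, ${w._2}) fires: ${fires(w, combined)}")
    }
  }
}
```

Under this model a window can only fire after both inputs have delivered an event whose timestamp is at least end - 1 + maxOutOfOrder, so on a finite, pre-loaded input the last windows would stay open, and a large bound or a lagging input would hold back every window.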

Here are a couple of example workloads.

Workload 1 (faked timestamps), fields (id, timestamp):

    FirstType(9781783433803, 1490280129517)
    FirstType(9781783433803, 1490280129517)
    FirstType(9781783433803, 1490280131191)
    FirstType(9781783433803, 1490280131191)
    FirstType(9781783433803, 1490280131214)
    FirstType(9781783433803, 1490280131214)
    FirstType(9781783433803, 1490280131250)
    FirstType(9781783433803, 1490280131250)
    FirstType(9781783433803, 1490280131294)
    FirstType(9781783433803, 1490280131294)
    FirstType(9781783433803, 1490280131328)
    FirstType(9781783433803, 1490280131328)

    SecondType(9781783433803, 1490280130465)
    SecondType(9781783433803, 1490280131027)
    SecondType(9781783433803, 1490280131051)
    SecondType(9781783433803, 1490280131070)
    SecondType(9781783433803, 1490280131085)
    SecondType(9781783433803, 1490280131103)
    SecondType(9781783433803, 1490280131124)
    SecondType(9781783433803, 1490280131143)
    SecondType(9781783433803, 1490280131158)
    SecondType(9781783433803, 1490280131175)

Workload 2 (real-case timestamps):

    > FirstType(9781783433803, 1490172958602)
    1> FirstType(9781783433803, 1490172958611)
    1> FirstType(9781783433803, 1490172958611)
    1> FirstType(9781783433803, 1490172958620)
    1> FirstType(9781783433803, 1490172958620)
    1> FirstType(9781783433803, 1490196171869)
    1> FirstType(9781783433803, 1490196171869)

    SecondType(9781783433803, 0)
    SecondType(9781783433803, 0)
    SecondType(9781783433803, 1488834670490)
    SecondType(9781783433803, 1489577984143)
    SecondType(9781783433803, 0)
    SecondType(9781783433803, 0)
    SecondType(9781783433803, 0)
    SecondType(9781783433803, 1488834670490)
    SecondType(9781783433803, 1489577984143)
    SecondType(9781783433803, 1489689399726)
    SecondType(9781783433803, 1489689399726)

I can confirm that both incoming streams are healthy where they enter the coGroup operator. I'm probably missing something simple. Any help would be really appreciated.

Sincerely,
Andrea

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cogrouped-Stream-never-triggers-tumbling-event-time-window-tp12373.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.