Dear Community,

I'm really struggling with a co-grouped stream. The workload is the following:

*
    import org.apache.flink.streaming.api.scala._

    val firstStream: DataStream[FirstType] =
      firstRaw.assignTimestampsAndWatermarks(new MyCustomFirstExtractor(maxOutOfOrder))

    val secondStream: DataStream[SecondType] = secondRaw
      .assignTimestampsAndWatermarks(new MyCustomSecondExtractor(maxOutOfOrder))
      .map(new toSecondsStreamMapper())
*

where both extractors extend BoundedOutOfOrdernessTimestampExtractor and
override its extractTimestamp method to return the timestamp field carried
by the FirstType and SecondType objects, respectively:

*override def extractTimestamp(first: FirstType): Long = first.timestamp*
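For reference, the watermark such an extractor emits can be modeled in plain
Scala. This is a sketch of the behavior documented for
BoundedOutOfOrdernessTimestampExtractor (the watermark lags the highest
timestamp seen so far by a fixed bound), not Flink's actual source; the class
name BoundedLagWatermarkModel is hypothetical. Because only the maximum
matters, an out-of-order element, even one with timestamp 0, never moves the
watermark backwards.

```scala
// Hypothetical plain-Scala model (not Flink code) of a bounded-out-of-orderness
// watermark: watermark = highest timestamp seen so far - maxOutOfOrdernessMs.
final class BoundedLagWatermarkModel(maxOutOfOrdernessMs: Long) {
  // Start low enough that the initial watermark equals Long.MinValue.
  private var maxSeen: Long = Long.MinValue + maxOutOfOrdernessMs

  // Plays the role of extractTimestamp: remember the highest timestamp seen
  // and return the element's own timestamp unchanged.
  def observe(timestamp: Long): Long = {
    if (timestamp > maxSeen) maxSeen = timestamp
    timestamp
  }

  // Never decreases, since maxSeen only grows; late or zero timestamps
  // leave it untouched.
  def currentWatermark: Long = maxSeen - maxOutOfOrdernessMs
}
```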

Then I call coGroup as follows:

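*
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val stockDetails = firstStream
      .coGroup(secondStream)
      .where(_.id)
      .equalTo(_.id)
      .window(TumblingEventTimeWindows.of(Time.seconds(1))) // 1-second event-time windows
      .apply(new MyCogroupFunction())
      .uid("myCogroup")
      .name("My CoGroup")
*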
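A rough model of when such a window should fire, assuming Flink's documented
event-time semantics: a tumbling window covers [start, start + size), the
event-time trigger fires once the watermark reaches end - 1, and an operator
fed by both streams, such as this coGroup, only advances to the minimum of the
watermarks arriving from its inputs. The object and method names are
hypothetical; this is a plain-Scala sketch, not Flink code.

```scala
// Hypothetical plain-Scala model of tumbling event-time windows (not Flink code).
object TumblingWindowModel {
  // Start of the window [start, start + sizeMs) containing `timestamp`.
  // For non-negative timestamps this equals timestamp - timestamp % sizeMs.
  def windowStart(timestamp: Long, sizeMs: Long): Long =
    timestamp - Math.floorMod(timestamp, sizeMs)

  // The event-time trigger fires once the watermark reaches the window's
  // last millisecond, i.e. end - 1.
  def fires(windowStartMs: Long, sizeMs: Long, watermark: Long): Boolean =
    watermark >= windowStartMs + sizeMs - 1

  // An operator consuming both inputs advances to the smaller watermark.
  def combined(first: Long, second: Long): Long = math.min(first, second)
}
```

In this model a single input whose watermark is still at Long.MinValue holds
the combined watermark there, so no window can fire.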

The problem is that the CoGroup function is never triggered. I ran several
tests and could not solve it.
The first relevant point is that event time can be seriously out of order; I
can even run into timestamps of 0. I also tried faking the timestamps so that
they are spread over two seconds, five seconds, and so forth. None of these
attempts changed the behavior: no window is ever fired.

Another relevant point: I'm running locally and reading from a pre-loaded
Kafka topic, so all the events are read sequentially at startup.
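Since the topic is finite and fully replayed at startup, one way to reason
about it is to ask which windows the final watermarks can close. The sketch
below is a plain-Scala model under stated assumptions: the
bounded-out-of-orderness rule (highest timestamp minus the bound) on each
input, the minimum of the two as the combined watermark, a single parallel
instance per input, and no handling of elements dropped as late. The names
ReplayModel, finalWatermark and firedWindowStarts are hypothetical, and the
bound is a parameter because the real maxOutOfOrder value is not given here.

```scala
// Hypothetical plain-Scala model (not Flink code): after a finite topic has been
// fully replayed, which tumbling windows could an event-time coGroup close?
object ReplayModel {
  // Final watermark of one input: highest timestamp seen minus the bound.
  // An input with no elements never advances (Long.MinValue).
  def finalWatermark(timestamps: Seq[Long], boundMs: Long): Long =
    if (timestamps.isEmpty) Long.MinValue else timestamps.max - boundMs

  // Starts of the windows that hold at least one element from either input and
  // whose last millisecond (end - 1) is covered by the combined watermark,
  // taken as the minimum of both inputs' final watermarks.
  def firedWindowStarts(first: Seq[Long], second: Seq[Long],
                        boundMs: Long, sizeMs: Long): Set[Long] = {
    val watermark =
      math.min(finalWatermark(first, boundMs), finalWatermark(second, boundMs))
    (first ++ second)
      .map(ts => ts - Math.floorMod(ts, sizeMs)) // window start for each element
      .filter(start => start + sizeMs - 1 <= watermark)
      .toSet
  }
}
```

In this model the combined watermark never exceeds the latest timestamp seen,
so the window containing that timestamp stays open until later data arrives
(apart from the edge case of a zero bound with that timestamp on the window's
last millisecond).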

Here are a couple of examples:

Workload 1 (faked timestamps)
Fields: (id, timestamp)
FirstType(9781783433803, 1490280129517)
FirstType(9781783433803, 1490280129517)
FirstType(9781783433803, 1490280131191)
FirstType(9781783433803, 1490280131191)
FirstType(9781783433803, 1490280131214)
FirstType(9781783433803, 1490280131214)
FirstType(9781783433803, 1490280131250)
FirstType(9781783433803, 1490280131250)
FirstType(9781783433803, 1490280131294)
FirstType(9781783433803, 1490280131294)
FirstType(9781783433803, 1490280131328)
FirstType(9781783433803, 1490280131328)

SecondType(9781783433803, 1490280130465)
SecondType(9781783433803, 1490280131027)
SecondType(9781783433803, 1490280131051)
SecondType(9781783433803, 1490280131070)
SecondType(9781783433803, 1490280131085)
SecondType(9781783433803, 1490280131103)
SecondType(9781783433803, 1490280131124)
SecondType(9781783433803, 1490280131143)
SecondType(9781783433803, 1490280131158)
SecondType(9781783433803, 1490280131175)

Workload 2 (real-case timestamps)

FirstType(9781783433803, 1490172958602)
FirstType(9781783433803, 1490172958611)
FirstType(9781783433803, 1490172958611)
FirstType(9781783433803, 1490172958620)
FirstType(9781783433803, 1490172958620)
FirstType(9781783433803, 1490196171869)
FirstType(9781783433803, 1490196171869)

SecondType(9781783433803, 0)
SecondType(9781783433803, 0)
SecondType(9781783433803, 1488834670490)
SecondType(9781783433803, 1489577984143)
SecondType(9781783433803, 0)
SecondType(9781783433803, 0)
SecondType(9781783433803, 0)
SecondType(9781783433803, 1488834670490)
SecondType(9781783433803, 1489577984143)
SecondType(9781783433803, 1489689399726)
SecondType(9781783433803, 1489689399726)

I have confirmed that both streams arrive healthy at the input of the
coGroup operator.
I suspect I'm missing something simple.

Any help would be really appreciated.

Sincerely,

Andrea




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cogrouped-Stream-never-triggers-tumbling-event-time-window-tp12373.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.
