I have a streaming job based on event time, with a 60-second window that slides every 10 seconds.
Data arrives in batches every 10 seconds.


Here's the code.
```
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(watermarkGenInterval)

env.setParallelism(parallel)

env.addSource(source)
  .map(json => new InvokingInfoWrapper(xxx))
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seconds(5)) {
      override def extractTimestamp(invoking: InvokingInfoWrapper): Long =
        invoking.timestamp
    })
  .keyBy(invokingInfo => s"${invokingInfo.caller}_${invokingInfo.callee}")
  .timeWindow(Time.seconds(60), Time.seconds(10))
  .reduce(innerReducer)
  .map(invokingInfo => {
    // ##2map =================================================
    // some mapping code
    invokingInfo
  })
  .addSink(new WebSocketSink[InvokingInfoWrapper](wsHost))
  .name("Pangolin-websocket-sink")

```
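For reference, `timeWindow(Time.seconds(60), Time.seconds(10))` assigns each element to size/slide = 6 overlapping windows. Here is a minimal plain-Scala sketch of that assignment (no Flink involved; the window-start formula mirrors my understanding of Flink's `SlidingEventTimeWindows`, and the timestamps are the ones from the question expressed as millisecond offsets within the hour):

```scala
object WindowSketch {
  val sizeMs  = 60000L // 60 s window
  val slideMs = 10000L // 10 s slide

  // All sliding windows [start, start + size) that contain `ts`.
  def windowsFor(ts: Long): Seq[(Long, Long)] = {
    val lastStart = ts - (ts % slideMs)
    Iterator
      .iterate(lastStart)(_ - slideMs)
      .takeWhile(_ > ts - sizeMs)
      .map(start => (start, start + sizeMs))
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    // 03:15:48 and 03:15:59 as ms offsets within the hour.
    val first  = windowsFor((15 * 60 + 48) * 1000L).toSet
    val second = windowsFor((15 * 60 + 59) * 1000L).toSet
    println(first.size)                   // 6 windows per element
    println(first.intersect(second).size) // 5 shared windows -> 5 reduce calls
  }
}
```

By this arithmetic the second record shares 5 of its 6 windows with the first, which matches the 5 reduce invocations observed below.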

And I have noticed something wrong:
1. The first record came with timestamp 03:15:48.
2. The second record came with timestamp 03:15:59 and triggered the reduce operation (5 reduce invocations; there should be 5 windows).
3. The third one, 03:16:06, also triggered reduce operations.
4. Then the fourth record came with timestamp 03:17:55. At that point a new window should open, the previous windows should close, and their results should reach the "##2map" line above. But they didn't.
5. The fifth record came with timestamp 03:18:01 and triggered the reduce operation together with the fourth.


So it seems that the first three records were silently dropped.
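For what it's worth, with `BoundedOutOfOrdernessTimestampExtractor(Time.seconds(5))` the watermark trails the highest timestamp seen so far by 5 seconds, and an event-time window fires once the watermark passes its end. A plain-Scala sketch of that arithmetic with the timestamps above (this is my reading of the semantics, not Flink code):

```scala
object WatermarkSketch {
  val boundMs = 5000L // the 5 s out-of-orderness bound from the job

  // Watermark = highest event timestamp seen so far, minus the bound.
  def watermark(maxTsSeen: Long): Long = maxTsSeen - boundMs

  // A window [start, end) fires once the watermark reaches end - 1 ms.
  def fires(windowEndMs: Long, wm: Long): Boolean = wm >= windowEndMs - 1

  def main(args: Array[String]): Unit = {
    def ts(m: Int, s: Int): Long = (m * 60L + s) * 1000L

    // After the fourth record (03:17:55) the watermark is 03:17:50.
    val wm = watermark(ts(17, 55))
    println(wm == ts(17, 50)) // true

    // The last window containing 03:16:06 ends at 03:17:00, so by this
    // arithmetic it should fire (not drop) once the watermark advances.
    println(fires(ts(17, 0), wm)) // true
  }
}
```

Since windows are evaluated per key after `keyBy`, it may also be worth checking whether the early records and the later ones actually share the same `caller`/`callee` key.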


Can somebody help with this?
