I have a streaming job based on event time, with a 60-second window that slides every 10 seconds. Data arrives in batches every 10 seconds.
Here's the code.

```
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(watermarkGenInterval)
env.setParallelism(parallel)

env.addSource(source)
  .map(json => {
    new InvokingInfoWrapper(xxx)
  })
  // Watermark = highest timestamp seen so far minus 5 seconds
  .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seconds(5)) {
    override def extractTimestamp(invoking: InvokingInfoWrapper): Long = {
      invoking.timestamp
    }
  })
  .keyBy(invokingInfo => {
    s"${invokingInfo.caller}_${invokingInfo.callee}"
  })
  // 60-second windows, sliding every 10 seconds
  .timeWindow(Time.seconds(60), Time.seconds(10))
  .reduce(innerReducer)
  .map(invokingInfo => { // ##2map =================================================
    // some mapping code
    invokingInfo
  })
  .addSink(new WebSocketSink[InvokingInfoWrapper](wsHost))
  .name("Pangolin-websocket-sink")
```

I have noticed that something is wrong:

1. The first record arrived with timestamp 03:15:48.
2. The second record arrived with timestamp 03:15:59 and triggered the reduce operation (5 reduce operations; there should be 5 windows).
3. The third record arrived with timestamp 03:16:06 and also triggered reduce operations.
4. The fourth record arrived with timestamp 03:17:55. At that point a new window should be opened, the previous window should be closed, and its result should reach the line marked "##2map" above. But it didn't.
5. The fifth record arrived with timestamp 03:18:01 and triggered the reduce operation with the fourth record.

So it seems that the first three records were dropped silently. Can somebody help with this?
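
To make the window arithmetic behind the observations above easier to check, here is a minimal plain-Scala sketch with no Flink dependency. It assumes the usual sliding-window rule (window starts aligned to the slide with no offset) and that the watermark is the highest timestamp seen minus the 5-second bound. The object `SlidingWindowSketch` and the helpers `ts`, `windowStarts`, `watermarkAfter` and `hasFired` are hypothetical names introduced only for illustration, and timestamps are simplified to milliseconds since midnight.

```scala
object SlidingWindowSketch {
  val sizeMs: Long = 60000L              // window size: 60 seconds
  val slideMs: Long = 10000L             // slide: 10 seconds
  val maxOutOfOrdernessMs: Long = 5000L  // bound passed to the timestamp extractor

  // Hypothetical helper: time of day as milliseconds since midnight.
  def ts(h: Int, m: Int, s: Int): Long = ((h * 60L + m) * 60L + s) * 1000L

  // Start timestamps of every sliding window that contains `timestamp`,
  // newest first (assumes zero window offset).
  def windowStarts(timestamp: Long): Seq[Long] = {
    val lastStart = timestamp - (timestamp % slideMs)
    lastStart until (timestamp - sizeMs) by -slideMs
  }

  // Assumed watermark rule: highest timestamp seen so far minus the bound.
  def watermarkAfter(maxTimestampSeen: Long): Long =
    maxTimestampSeen - maxOutOfOrdernessMs

  // An event-time window [start, start + size) is assumed to fire once the
  // watermark reaches its last included millisecond.
  def hasFired(windowStart: Long, watermark: Long): Boolean =
    watermark >= windowStart + sizeMs - 1

  def main(args: Array[String]): Unit = {
    val first = windowStarts(ts(3, 15, 48))
    val second = windowStarts(ts(3, 15, 59))
    println(s"windows containing the first record:  ${first.size}")
    println(s"windows containing the second record: ${second.size}")
    println(s"windows shared by both records:       ${first.intersect(second).size}")

    // Which windows of the first record would have fired after each later record?
    val later = Seq(ts(3, 15, 59), ts(3, 16, 6), ts(3, 17, 55))
    later.foreach { t =>
      val wm = watermarkAfter(t)
      println(s"after record at $t ms: ${first.count(hasFired(_, wm))} of ${first.size} fired")
    }
  }
}
```

Under these assumptions, the records at 03:15:48 and 03:15:59 each fall into six windows and share five of them, which would match the five reduce calls seen for the second record. Comparing the `hasFired` output with what actually reaches "##2map" may help narrow down where the results are lost.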