Hi,

In a datastream processing problem, the source generated data every 8
millisecond and timestamp is a field of the data. In default Flink time
behavior data enter the time window but when I set Flink time to EventTime
it will output nothing! Here is the code:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

SingleOutputStreamOperator<Tuple3<String,Long, JSONObject>> res =
aggregatedTuple
                .assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long,
JSONObject>>(Time.milliseconds(8)) {

            @Override
            public long extractTimestamp(Tuple3<String, Long,
JSONObject> element) {
                return element.f1 ;
            }
        }).keyBy(1).timeWindow(Time.milliseconds(8))
                .allowedLateness(Time.milliseconds(3))
                .sideOutputLateData(lateOutputTag)
                .reduce(processing...);
        DataStream<Tuple3<String, Long, JSONObject>> lateData =
res.getSideOutput(lateOutputTag);
        res.print();

What is the problem with my code?
According to the Flink documents, my understanding about EventTime is that
for example in case of time window when a new data received it start a new
(logical window) based on new data event timestamp and wait 8 milliseconds
(according to my code) to see if any other data with the same key received
or not and after 8 millisecond (from timestamp of the first element of the
window) it will be triggered. Since data source generated data in a
constant periodic interval, I set a watermarck of  8, too. Is my
understanding about Flink window in EventTime correct?

Reply via email to