Hi, All

  When i use the Tumbling Windows, find lost some record. My code as follow

*env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);*

*env.addSource(FlinkKafkaConsumer011......)*







*.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor<JSONObject>(Time.minutes(3)) {
              @Override                    public long
extractTimestamp(JSONObject jsonObject) {                        long
logTime = jsonObject.getLongValue("logTime");                        return
logTime;                    }                })*


*.keyBy(jsonObject -> {                    return
jsonObject.getString("userId");                })*

*.timeWindow(Time.seconds(30))*

*.process(new ProcessWindowFunction<JSONObject, JSONObject, String,
TimeWindow>() {*
*                 public void process(String key, Context context,
Iterable<JSONObject> iterable, Collector<JSONObject> collector) throws
Exception {*




*                        SimpleDateFormat sdf = new
SimpleDateFormat("yyyy-MM-dd HH:mm:ss");                        String
start = sdf.format(new Date(context.window().getStart()));
      String end = sdf.format(new Date(context.window().getEnd()));
                System.out.println(start + "----" + end);*
*                        for (JSONObject jsonObject : iterable) {*
*                           collector.collect(jsonObject);*
*}}}*
*.print("====");*

>From the print result, i found lost some record in the tumbling window. I
can't figure out, any one can help me ?

Reply via email to