Hello!
I was unable to do event-time window aggregation with a Kafka source, but had no problem with a "fromElements" source.

The code is attached below. It has two data sources, named "streamSource" and "kafkaSource" respectively. The program works well with "streamSource", but not when the input is "kafkaSource" (via "watermarkedStream").

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Event2 and Config are my own classes (not shown here).
public class WindowReduceTest2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use the fromElements data source
        DataStreamSource<Event2> streamSource = env.fromElements(
                new Event2("Alice", "./home", "2023-02-04 17:10:11"),
                new Event2("Bob", "./cart", "2023-02-04 17:10:12"),
                new Event2("Alice", "./home", "2023-02-04 17:10:13"),
                new Event2("Alice", "./home", "2023-02-04 17:10:15"),
                new Event2("Cary", "./home", "2023-02-04 17:10:16"),
                new Event2("Cary", "./home", "2023-02-04 17:10:16")
        );

        // Use the Kafka data source
        JsonDeserializationSchema<Event2> jsonFormat = new JsonDeserializationSchema<>(Event2.class);
        KafkaSource<Event2> source = KafkaSource.<Event2>builder()
                .setBootstrapServers(Config.KAFKA_BROKERS)
                .setTopics(Config.KAFKA_TOPIC)
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(jsonFormat)
                .build();
        DataStreamSource<Event2> kafkaSource =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        kafkaSource.print();

        // Generate watermarks, extracting the time from each record as the event time
        SingleOutputStreamOperator<Event2> watermarkedStream = kafkaSource.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event2>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event2>() {
                            @Override
                            public long extractTimestamp(Event2 element, long recordTimestamp) {
                                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                Date date = null;
                                try {
                                    date = simpleDateFormat.parse(element.getTime());
                                } catch (ParseException e) {
                                    throw new RuntimeException(e);
                                }
                                long time = date.getTime();
                                System.out.println(time);
                                return time;
                            }
                        }));

        // Window aggregation
        watermarkedStream.map(new MapFunction<Event2, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Event2 value) throws Exception {
                        // Convert each record to a 2-tuple for easier counting
                        return Tuple2.of(value.getUser(), 1L);
                    }
                })
                .keyBy(r -> r.f0)
                // Set a tumbling event-time window
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        // Define the accumulation rule; when the window closes, the accumulated result is sent downstream
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .print("Aggregated stream");

        env.execute();
    }
}

Notably, if TumblingEventTimeWindows is replaced with TumblingProcessingTimeWindows, the program works well even with "watermarkedStream".

Thanks for your time!
Lucas
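
For reference, the event-time arithmetic in the job above can be checked outside Flink. The following is a minimal standalone sketch, not part of the original program: the class and method names are hypothetical, it assumes the time strings are interpreted as UTC (the job itself uses the JVM default time zone through SimpleDateFormat), and it assumes tumbling windows aligned to the epoch with zero offset. It prints which 5-second window each sample timestamp would be assigned to.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper class for illustration only; it does not use Flink.
class WindowAssignmentSketch {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Parse the event time string into epoch milliseconds (assumption: UTC).
    static long toEpochMillis(String time) {
        return LocalDateTime.parse(time, FORMAT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Start of the epoch-aligned tumbling window that contains the timestamp.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    public static void main(String[] args) {
        // Same event times as the fromElements sample data in the job.
        String[] times = {
            "2023-02-04 17:10:11", "2023-02-04 17:10:12", "2023-02-04 17:10:13",
            "2023-02-04 17:10:15", "2023-02-04 17:10:16", "2023-02-04 17:10:16"
        };
        long size = 5_000L; // 5-second tumbling window
        for (String t : times) {
            long ts = toEpochMillis(t);
            long start = windowStart(ts, size);
            System.out.println(t + " -> ts=" + ts
                    + ", window=[" + start + ", " + (start + size) + ")");
        }
    }
}
```

Under these assumptions the six sample events fall into two windows, [17:10:10, 17:10:15) and [17:10:15, 17:10:20).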