Hello!

I was unable to do event-time window aggregation with a Kafka source, although the same job had no problem with a "fromElements" source. The code is attached below. It has two data sources, named "streamSource" and "kafkaSource" respectively. The program works well with "streamSource", but not with "watermarkedStream" (the Kafka stream after timestamps and watermarks have been assigned).

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Event2 and Config are my own classes and are not shown here.
public class WindowReduceTest2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use the fromElements data source
        DataStreamSource<Event2> streamSource = env.fromElements(
                new Event2("Alice", "./home", "2023-02-04 17:10:11"),
                new Event2("Bob", "./cart", "2023-02-04 17:10:12"),
                new Event2("Alice", "./home", "2023-02-04 17:10:13"),
                new Event2("Alice", "./home", "2023-02-04 17:10:15"),
                new Event2("Cary", "./home", "2023-02-04 17:10:16"),
                new Event2("Cary", "./home", "2023-02-04 17:10:16")
        );

        // Use the Kafka data source
        JsonDeserializationSchema<Event2> jsonFormat = new JsonDeserializationSchema<>(Event2.class);
        KafkaSource<Event2> source = KafkaSource.<Event2>builder()
                .setBootstrapServers(Config.KAFKA_BROKERS)
                .setTopics(Config.KAFKA_TOPIC)
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(jsonFormat)
                .build();
        DataStreamSource<Event2> kafkaSource =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        kafkaSource.print();

        // Generate watermarks, using the time field of each record as its event time
        SingleOutputStreamOperator<Event2> watermarkedStream = kafkaSource.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event2>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event2>() {
                            @Override
                            public long extractTimestamp(Event2 element, long recordTimestamp) {
                                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                Date date = null;
                                try {
                                    date = simpleDateFormat.parse(element.getTime());
                                } catch (ParseException e) {
                                    throw new RuntimeException(e);
                                }
                                long time = date.getTime();
                                System.out.println(time);
                                return time;
                            }
                        }));

        // Window aggregation
        watermarkedStream.map(new MapFunction<Event2, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Event2 value) throws Exception {
                        // Convert each record into a (user, 1L) tuple to make counting easier
                        return Tuple2.of(value.getUser(), 1L);
                    }
                })
                .keyBy(r -> r.f0)
                // Use a 5-second tumbling event-time window
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1,
                                                       Tuple2<String, Long> value2) throws Exception {
                        // Accumulation rule: when the window closes, the accumulated count is sent downstream
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .print("Aggregated stream");

        env.execute();
    }
}
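
A possible explanation for the difference between the two sources, offered as a hedged sketch rather than a confirmed diagnosis: with forBoundedOutOfOrderness(Duration.ZERO), the watermark trails the largest timestamp seen so far by 1 ms, and a 5-second event-time window only fires once the watermark reaches the window's last millisecond. A bounded source such as fromElements emits a final Long.MAX_VALUE watermark when its input ends, which flushes every open window; an unbounded Kafka source never does, so the last window stays open until a later record arrives. The plain-Java simulation below has no Flink dependency, uses hypothetical class and method names, and advances the watermark per record instead of periodically. It replays the six sample events and reads the timestamps as UTC for simplicity (the original job uses the JVM default time zone; window boundaries are the same for any whole-minute offset).

```java
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (no Flink) of 5 s tumbling event-time windows fed by a
// bounded-out-of-orderness watermark with zero allowed out-of-orderness.
public class EventTimeWindowSketch {
    static final long WINDOW_SIZE_MS = 5_000L;
    static final long OUT_OF_ORDERNESS_MS = 0L; // Duration.ZERO in the original job
    static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Assumption: timestamps are read as UTC (the original job uses the JVM default zone).
    static long toEpochMillis(String time) {
        return LocalDateTime.parse(time, FORMAT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Start of the tumbling window (offset 0) that contains the timestamp.
    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, WINDOW_SIZE_MS);
    }

    // The six (user, time) pairs from the fromElements source.
    static List<String[]> sampleEvents() {
        return List.of(
                new String[] {"Alice", "2023-02-04 17:10:11"},
                new String[] {"Bob", "2023-02-04 17:10:12"},
                new String[] {"Alice", "2023-02-04 17:10:13"},
                new String[] {"Alice", "2023-02-04 17:10:15"},
                new String[] {"Cary", "2023-02-04 17:10:16"},
                new String[] {"Cary", "2023-02-04 17:10:16"});
    }

    // Replays events in order and returns "user [windowStart]=count" in firing order.
    static List<String> run(List<String[]> events, boolean boundedSource) {
        TreeMap<Long, TreeMap<String, Long>> openWindows = new TreeMap<>();
        List<String> fired = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        for (String[] event : events) {
            long ts = toEpochMillis(event[1]);
            openWindows.computeIfAbsent(windowStart(ts), k -> new TreeMap<>())
                    .merge(event[0], 1L, Long::sum);
            maxTimestamp = Math.max(maxTimestamp, ts);
            // Watermark = largest timestamp seen - allowed delay - 1 ms.
            fireReadyWindows(openWindows, maxTimestamp - OUT_OF_ORDERNESS_MS - 1, fired);
        }
        if (boundedSource) {
            // A bounded source ends with a Long.MAX_VALUE watermark, flushing all windows.
            fireReadyWindows(openWindows, Long.MAX_VALUE, fired);
        }
        return fired;
    }

    // A window fires once the watermark reaches its last millisecond (end - 1).
    static void fireReadyWindows(TreeMap<Long, TreeMap<String, Long>> openWindows,
                                 long watermark, List<String> fired) {
        while (!openWindows.isEmpty()
                && watermark >= openWindows.firstKey() + WINDOW_SIZE_MS - 1) {
            Map.Entry<Long, TreeMap<String, Long>> window = openWindows.pollFirstEntry();
            LocalTime start = LocalDateTime.ofEpochSecond(
                    window.getKey() / 1000, 0, ZoneOffset.UTC).toLocalTime();
            for (Map.Entry<String, Long> count : window.getValue().entrySet()) {
                fired.add(count.getKey() + " [" + start + "]=" + count.getValue());
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("bounded (fromElements): " + run(sampleEvents(), true));
        System.out.println("unbounded (Kafka):      " + run(sampleEvents(), false));
    }
}
```

With the sample data, the window [17:10:10, 17:10:15) fires as soon as the 17:10:15 record is seen, but [17:10:15, 17:10:20) fires only on the end-of-input watermark; in the unbounded case it would need a record stamped 17:10:20 or later.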

Notably, if TumblingEventTimeWindows is replaced with TumblingProcessingTimeWindows, the program works well even with "watermarkedStream".
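
Another commonly reported cause, again only a hedged guess for this job: a downstream operator's event-time clock is the minimum of the watermarks arriving on its input channels. If some parallel instances never receive records (for example, when the topic has fewer partitions than the job's parallelism), their watermark can stay at Long.MIN_VALUE, so the combined watermark never advances and no event-time window fires. Processing-time windows are driven by the wall clock and ignore watermarks, which would be consistent with the observation above. Flink's WatermarkStrategy.withIdleness(Duration) lets idle inputs be excluded from that minimum. The plain-Java model below uses hypothetical names and is a simplification, not Flink's actual implementation.

```java
// Simplified model (hypothetical names, not Flink's implementation) of how an
// operator combines the watermarks of its parallel input channels.
public class CombinedWatermarkSketch {
    // Event time only advances to the smallest watermark among non-idle channels.
    static long combinedWatermark(long[] channelWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        // In this model, event time does not advance if every channel is idle.
        return anyActive ? min : Long.MIN_VALUE;
    }

    // Event-time firing rule: the watermark must reach the window's last millisecond.
    static boolean windowFires(long windowEndMillis, long watermark) {
        return watermark >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 15_000L;                      // hypothetical window [10000, 15000) ms
        long[] watermarks = {14_999L, Long.MIN_VALUE}; // channel 1 never saw a record
        boolean[] noneIdle = {false, false};
        boolean[] secondIdle = {false, true};          // channel 1 excluded as idle

        long stuck = combinedWatermark(watermarks, noneIdle);
        long advanced = combinedWatermark(watermarks, secondIdle);
        System.out.println("without idleness: watermark=" + stuck
                + ", fires=" + windowFires(windowEnd, stuck));
        System.out.println("with idleness:    watermark=" + advanced
                + ", fires=" + windowFires(windowEnd, advanced));
    }
}
```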

Thanks for your time!

Lucas