reduce()方法的状态在窗口间未被隔离,多个窗口聚合时使用的是同一对象.一个数据进入时,被重复累加
是reduce的特性吗? 还是reduce中的窗口间隔离出现问题? 希望得到回复
测试输入如下:
1001,/home,1000
1002,/home,2000
输出如下:
input> test.Event(user=1001, page=/home, ts=1000)
input> test.Event(user=1002, page=/home, ts=2000)
test.WordCount(word=/home, count=2)
test.WordCount(word=/home, count=3)
代码如下:
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.io.Serializable;
import java.time.Duration;
public class test {
public static void main(String[] args) {
//准备环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//从端口读数据
SingleOutputStreamOperator<Event> ds1 =
env.socketTextStream("hadoop102", 55555).map(
value->{
String[] strings = value.split(",");
return new
Event(strings[0].trim(),strings[1].trim(),Long.valueOf(strings[2].trim()) );
}
).assignTimestampsAndWatermarks(
//增加水位线策略
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner((Event,
l) -> Event.getTs())
);
//检查输入流
ds1.print("input");
ds1.map(event -> new WordCount(event.getPage(), 1)
).keyBy(WordCount::getWord
//按键分组
).window(
//TumblingEventTimeWindows.of(Time.seconds(10))
SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))
//size为10步长为5的滑动窗口
).reduce(
//先增量聚合.将多个数据处理为一个中间结果
(wordCount1, wordCount2) -> {
Integer count = wordCount1.getCount();
wordCount1.setCount(count + 1);
System.out.println(wordCount1);
return wordCount1;
}
);
try {
env.execute();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Event {
private String user;
private String page;
private Long ts;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class WordCount implements Serializable {
private String word;
private Integer count;
}
}