Hi,
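I looked into it. The problem appears to be in the reduce function, which reuses (mutates) the wordCount1 object. I tried returning a newly created WordCount instead, and that seems to fix it.
My guess is that this is related to the heap-based state backend: the heap state of multiple windows may hold a reference to the same object. With a sliding window (size 10 s, slide 5 s), each element belongs to two windows, so the same WordCount instance can end up stored in both windows' state, and mutating it for one window also changes it for the other.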
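```
.reduce(
        (wordCount1, wordCount2) -> {
            // Build a new object instead of mutating wordCount1, which may be
            // shared with the state of another (overlapping) window.
            WordCount newWC =
                    new WordCount(
                            wordCount1.getWord(),
                            wordCount1.getCount() + wordCount2.getCount());
            System.err.println(newWC);
            return newWC;
        })
```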
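To make the suspected mechanism concrete, here is a Flink-free Java sketch. It assumes (this is the guess above, not something checked against Flink's source) that a heap state backend stores the first element of each window by reference and later calls reduce(previous, value); the window labels and the WordCount stand-in are hypothetical. With the mutating reduce from the original code it emits [2, 3], matching the reported output; returning a new object gives [2, 2].

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class HeapStateAliasingDemo {

    // Hypothetical stand-in for the WordCount POJO from the thread.
    static final class WordCount {
        final String word;
        int count;

        WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    // Simulated per-window reducing state that keeps object references (assumption):
    // the first element is stored as-is, later ones go through reduce(previous, value).
    static List<Integer> run(BinaryOperator<WordCount> reduce) {
        Map<String, WordCount> stateByWindow = new LinkedHashMap<>();
        List<String> windows = List.of("[-5000,5000)", "[0,10000)");
        List<Integer> emitted = new ArrayList<>();
        // Two input records, each assigned to both overlapping windows.
        for (int i = 0; i < 2; i++) {
            WordCount element = new WordCount("/home", 1); // one object per record
            for (String window : windows) {
                WordCount previous = stateByWindow.get(window);
                if (previous == null) {
                    stateByWindow.put(window, element); // reference stored, no copy
                } else {
                    WordCount result = reduce.apply(previous, element);
                    emitted.add(result.count); // what the println inside reduce would show
                    stateByWindow.put(window, result);
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Reduce from the original code: increments wordCount1 in place.
        List<Integer> mutating = run((a, b) -> { a.count = a.count + 1; return a; });
        // Fixed reduce: always returns a fresh object.
        List<Integer> fresh = run((a, b) -> new WordCount(a.word, a.count + b.count));
        System.out.println("mutating: " + mutating);   // mutating: [2, 3]
        System.out.println("new object: " + fresh);    // new object: [2, 2]
    }
}
```

Because both windows start out pointing at the same first object, the in-place increment for one window is visible to the other; creating a new object in reduce breaks that sharing.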
--
Best!
Xuyang
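On 2023-11-03 10:53:37, "tao zhang" <[email protected]> wrote:
>The state of reduce() is not isolated between windows: when several windows aggregate, they use the same object, so a single incoming record gets accumulated more than once.
>Is this how reduce is supposed to behave, or is window isolation broken in reduce? I'd appreciate a reply.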
>
>Test input:
>1001,/home,1000
>1002,/home,2000
>
>Output:
>input> test.Event(user=1001, page=/home, ts=1000)
>input> test.Event(user=1002, page=/home, ts=2000)
>test.WordCount(word=/home, count=2)
>test.WordCount(word=/home, count=3)
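With a 10 s window sliding every 5 s, each timestamp falls into two windows. The sketch below is a plain-Java approximation of sliding-window assignment with zero offset (modeled on how sliding event-time windows are usually assigned, not Flink's own code); `assign` and `lastWindowStart` are hypothetical helpers. It shows that ts=1000 and ts=2000 land in the same two windows, which is why the second record triggers two reduce calls and prints two lines.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssignment {

    // Start of the latest window containing `timestamp`, aligned to `slide`
    // (assumption: window offset is 0; floorMod also handles negative timestamps).
    static long lastWindowStart(long timestamp, long slide) {
        return timestamp - Math.floorMod(timestamp, slide);
    }

    // Every [start, start + size) window that contains `timestamp`, newest first.
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        for (long start = lastWindowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        for (long ts : new long[] {1000L, 2000L}) {
            StringBuilder sb = new StringBuilder("ts=" + ts + " ->");
            for (long[] w : assign(ts, 10_000L, 5_000L)) {
                sb.append(" [").append(w[0]).append(", ").append(w[1]).append(")");
            }
            System.out.println(sb);
        }
    }
}
```

Both lines print `[0, 10000) [-5000, 5000)`, so the two records share both windows.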
>
>Code:
>
>import lombok.AllArgsConstructor;
>import lombok.Data;
>import lombok.NoArgsConstructor;
>import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
>import org.apache.flink.streaming.api.windowing.time.Time;
>import java.io.Serializable;
>import java.time.Duration;
>
>public class test {
>    public static void main(String[] args) {
>        // Set up the environment
>        StreamExecutionEnvironment env =
>                StreamExecutionEnvironment.getExecutionEnvironment();
>        env.setParallelism(1);
>
>        // Read data from the socket
>        SingleOutputStreamOperator<Event> ds1 =
>                env.socketTextStream("hadoop102", 55555)
>                        .map(value -> {
>                            String[] strings = value.split(",");
>                            return new Event(
>                                    strings[0].trim(),
>                                    strings[1].trim(),
>                                    Long.valueOf(strings[2].trim()));
>                        })
>                        // Add a watermark strategy
>                        .assignTimestampsAndWatermarks(
>                                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
>                                        .withTimestampAssigner((event, l) -> event.getTs()));
>        // Check the input stream
>        ds1.print("input");
>
>        ds1.map(event -> new WordCount(event.getPage(), 1))
>                // Group by key
>                .keyBy(WordCount::getWord)
>                // Sliding window with size 10 s and slide 5 s
>                .window(
>                        // TumblingEventTimeWindows.of(Time.seconds(10))
>                        SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
>                // Incremental aggregation: fold multiple records into one intermediate result
>                .reduce(
>                        (wordCount1, wordCount2) -> {
>                            Integer count = wordCount1.getCount();
>                            wordCount1.setCount(count + 1);
>                            System.out.println(wordCount1);
>                            return wordCount1;
>                        });
>
>        try {
>            env.execute();
>        } catch (Exception e) {
>            throw new RuntimeException(e);
>        }
>    }
>
>    @Data
>    @AllArgsConstructor
>    @NoArgsConstructor
>    public static class Event {
>        private String user;
>        private String page;
>        private Long ts;
>    }
>
>    @Data
>    @AllArgsConstructor
>    @NoArgsConstructor
>    public static class WordCount implements Serializable {
>        private String word;
>        private Integer count;
>    }
>}
>