实际使用你肯定不会是console producer吧。或者你换java代码写kafka,方便控制些。
wei_yuze <wei_y...@qq.com.invalid> 于2023年2月8日周三 13:30写道: > > 非常感谢各位的回答! > > > > Weihua和飞雨正确定位出了问题。问题出在Flink 并发数大于Kafka分区数,导致部分Flink task slot > 接收不到数据,进而导致watermark(取所有task slot的最小值)无法推进。 > > > 我尝试了Weihua提供的两个解决方案后都可以推进watermark求得窗口聚合结果。 > > > 后来我想,理想的解决方式应该是使Flink的并发数接近于或等于Kafka的分区数。我的Kafka分区数为3,于是Flink setParallelism > 为3。后来发现又无法推进watermark。检查Kafka后发现,kafka Console Producer把所有的数据都推送到了第0号分区。 > > > > 请问哪位能指点一下,让Kafka topic的每个分区都能收到数据? > > > > > > Best, > > Lucas > > > > Original Email > > > > Sender:"Weihua Hu"< huweihua....@gmail.com >; > > Sent Time:2023/2/7 18:48 > > To:"user-zh"< user-zh@flink.apache.org >; > > Subject:Re: Kafka 数据源无法实现基于事件时间的窗口聚合 > > > Hi, > > 问题应该是 kafka source 配置了多并发运行,但数据量比较少(或者 topic 的 partition 数量小于 task > 的并发数量),不是所有的 source task 都消费到了数据并产生 watermark,导致下游聚合算子无法对齐 watermark 触发计算。 > 可以尝试通过以下办法解决: > 1. 将 source 并发控制为 1 > 2. 为 watermark 策略开始 idleness 处理,参考 [#1] > > fromElement 数据源会强制指定并发为 1 > > [#1] > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources > > > Best, > Weihua > > > On Tue, Feb 7, 2023 at 1:31 PM wei_yuze wrote: > > > 您好! > > > > > > > > > > > 我在进行基于事件时间的窗口聚合操作时,使用fromElement数据源可以实现,但替换为Kafka数据源就不行了,但程序并不报错。以下贴出代码。代码中给了两个数据源,分别命名为:streamSource > > 和 kafkaSource > > 。当使用streamSource生成watermarkedStream的时候,可以完成聚合计算并输出结果。但使用kafkaSource却不行。 > > > > > > > > > > public class WindowReduceTest2 { public static void > > main(String[] args) throws Exception { > > StreamExecutionEnvironment env = > > StreamExecutionEnvironment.getExecutionEnvironment(); > > > > > > // 使用fromElement数据源 > > DataStreamSource> env.fromElements( > > new > > Event2("Alice", "./home", "2023-02-04 17:10:11"), > > new Event2("Bob", > > "./cart", "2023-02-04 17:10:12"), > > new > > Event2("Alice", "./home", "2023-02-04 17:10:13"), > > new > > Event2("Alice", "./home", "2023-02-04 17:10:15"), > > new > Event2("Cary", > > "./home", "2023-02-04 17:10:16"), > > new > Event2("Cary", > > "./home", "2023-02-04 17:10:16") > > ); > > > > > > // 使用Kafka数据源 > > JsonDeserializationSchema<Event2> > > jsonFormat = new JsonDeserializationSchema<>(Event2.class); > > KafkaSource> > KafkaSource.<Event2>builder() > > > > .setBootstrapServers(Config.KAFKA_BROKERS) > > > > .setTopics(Config.KAFKA_TOPIC) > > > > .setGroupId("my-group") > > > > .setStartingOffsets(OffsetsInitializer.earliest()) > > > > .setValueOnlyDeserializer(jsonFormat) > > .build(); > > DataStreamSource> env.fromSource(source, > WatermarkStrategy.noWatermarks(), "Kafka Source"); > > kafkaSource.print(); > > > > > > // 生成watermark,从数据中提取时间作为事件时间 > > SingleOutputStreamOperator<Event2> > > watermarkedStream = > > > kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event2>forBoundedOutOfOrderness(Duration.ZERO) > > > > .withTimestampAssigner(new SerializableTimestampAssigner> > > > @Override > > > > public long extractTimestamp(Event2 element, long recordTimestamp) { > > > > SimpleDateFormat simpleDateFormat = new > > SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); > > > > Date date = null; > > > > try { > > > > date = > > simpleDateFormat.parse(element.getTime()); > > > > } catch (ParseException e) { > > > > throw new RuntimeException(e); > > > > } > > > > long time = date.getTime(); > > > > System.out.println(time); > > > > return time; > > } > > })); > > > > > > // 窗口聚合 > > watermarkedStream.map(new MapFunction<Event2, > > Tuple2> > > > @Override > > > > public Tuple2> > > > // 将数据转换成二元组,方便计算 > > > > return Tuple2.of(value.getUser(), 1L); > > } > > }) > > .keyBy(r -> > > r.f0) > > // 设置滚动事件时间窗口 > > > > .window(TumblingEventTimeWindows.of(Time.seconds(5))) > > .reduce(new > > ReduceFunction<Tuple2> > > > @Override > > > > public Tuple2> Tuple2> > > > // 定义累加规则,窗口闭合时,向下游发送累加结果 > > > > return Tuple2.of(value1.f0, value1.f1 + value2.f1); > > } > > }) > > > .print("Aggregated > > stream"); > > > > > > env.execute(); > > } > > } > > > > > > > > > > > > > > 值得注意的是,若将代码中的 TumblingEventTimeWindows 替换为 TumblingProcessingTimeWindows > > ,即使使用 Kafka 数据源也是可以完成聚合计算并输出结果的。 > > > > > > > > 感谢您花时间查看这个问题! > > Lucas