Hey Yidan,

KafkaShuffle was initially motivated by the need to materialize shuffle data on Kafka, and it started as a limited version that supports hash partitioning only. Watermarks are maintained and forwarded as part of the shuffle data. So you are right: the watermark storing/forwarding logic has nothing to do with whether the stream is keyed or not. The current approach in KafkaShuffle should also work for non-keyed streams, if I remember correctly. So yes, the logic can be extracted and generalized.
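To illustrate why the watermark part does not depend on keying, here is a minimal sketch in plain Python (no Flink or Kafka involved). All names (`write_record`, `broadcast_watermark`, `WatermarkRegenerator`, `read_partition`) are made up for illustration, each partition is modeled as a simple list, and taking the minimum over the latest watermark from each producer is my assumption of a reasonable way to regenerate it, not a copy of the actual KafkaShuffle code.

```python
# Hypothetical sketch: each "partition" is a plain list standing in for a Kafka partition log.

def write_record(partitions, partition_index, value, timestamp):
    """Append a data record to one partition. How partition_index is chosen
    (hash of a key, round-robin, ...) is independent of the watermark logic."""
    partitions[partition_index].append(("record", value, timestamp))


def broadcast_watermark(partitions, producer_id, watermark):
    """Append the producer's watermark to every partition, so each consumer sees it."""
    for log in partitions:
        log.append(("watermark", producer_id, watermark))


class WatermarkRegenerator:
    """Tracks the latest watermark per producer and emits the minimum across
    all producers once every producer has reported at least once."""

    def __init__(self, num_producers):
        self.num_producers = num_producers
        self.latest = {}      # producer_id -> highest watermark seen so far
        self.current = None   # last watermark emitted downstream

    def on_watermark(self, producer_id, watermark):
        previous = self.latest.get(producer_id)
        if previous is None or watermark > previous:
            self.latest[producer_id] = watermark
        if len(self.latest) < self.num_producers:
            return None  # still waiting for some producer
        candidate = min(self.latest.values())
        if self.current is None or candidate > self.current:
            self.current = candidate
            return candidate
        return None  # the combined watermark did not advance


def read_partition(log, num_producers):
    """Replay one partition: pass records through and emit regenerated watermarks."""
    regenerator = WatermarkRegenerator(num_producers)
    output = []
    for entry in log:
        if entry[0] == "record":
            output.append(entry)
        else:
            _, producer_id, watermark = entry
            advanced = regenerator.on_watermark(producer_id, watermark)
            if advanced is not None:
                output.append(("watermark", advanced))
    return output
```

Nothing in the sketch looks at a key: the consumer only needs to know how many producers exist and which producer each watermark came from, so the stream read back from the partitions could be keyed afterwards or left as-is.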
Best,
Yuan

On Thu, Mar 4, 2021 at 4:26 PM yidan zhao <hinobl...@gmail.com> wrote:

> One more question: if I only need the watermark logic, not a keyedStream, why
> not provide methods such as writeDataStream and readDataStream? They would work
> in a similar way: the Kafka producer sinks records and broadcasts watermarks to
> the partitions, and then the Kafka consumers read them and regenerate the
> watermark. I think that would be more general. This way, the Kafka consumer reads
> the stream from Kafka and can then call keyBy to get a keyedStream. I
> don't know why KafkaShuffle only considers the 'keyedStream' case.
>
> On Thu, Mar 4, 2021 at 3:54 PM, Piotr Nowojski <pnowoj...@apache.org> wrote:
>
>> Great :)
>>
>> Just one more note. Currently FlinkKafkaShuffle has a critical bug [1]
>> that will probably prevent you from using it directly. I hope it will be
>> fixed in an upcoming release. In the meantime, you can use the source code
>> as inspiration for your own solution.
>>
>> Best,
>> Piotrek
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-21317
>>
>> On Thu, Mar 4, 2021 at 03:48, yidan zhao <hinobl...@gmail.com> wrote:
>>
>>> Yes, you are right, and thank you. I took a brief look at what
>>> FlinkKafkaShuffle is doing; it seems to be what I need, and I will give it a try.