1996fanrui commented on code in PR #22852: URL: https://github.com/apache/flink/pull/22852#discussion_r1245546244
##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -631,8 +631,9 @@ private void ensureStarted() {
         }
     }
-    private static class WatermarkAggregator<T> {
+    static class WatermarkAggregator<T> {
         private final Map<T, Watermark> watermarks = new HashMap<>();
+        /** The aggregatedWatermark is the smallest watermark of all keys. */

Review Comment:
Hi @pnowojski @StefanRRichter, thanks for your review and the great suggestion. It is very helpful, and it is a more general and easier-to-understand solution.

I tried it with both Flink's `HeapPriorityQueue` and Java's `PriorityQueue`, and found that `HeapPriorityQueue` is 20 to 30 times faster than Java's `PriorityQueue`. The benchmark durations are:

- master branch: 1 min 44 s
- master branch with the old PR: 469 ms
- master branch with the current PR (Flink's `HeapPriorityQueue`): 284 ms
- master branch with the current PR (Java's `PriorityQueue`): about 6 s

So `HeapPriorityQueue` performs very well, and I will study its implementation later. Thanks to @StefanRRichter for the great work.

I chose Flink's `HeapPriorityQueue` based on the benchmark and updated this PR.

The benchmark test has been removed from the PR, so I am backing it up here. It can be run directly inside `SourceCoordinatorAlignmentTest`.

```
@Test
void testWatermarkAggregatorBenchmark() {
    testWatermarkAggregatorRandomly(20, 10000, false, true);
    testWatermarkAggregatorRandomly(20, 10000, false, false);
    testWatermarkAggregatorRandomly(10, 20000, false, true);
    testWatermarkAggregatorRandomly(10, 20000, false, false);
}
```
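For readers who want to see the shape of the approach under discussion, here is a minimal sketch of tracking the smallest watermark per key with a heap. It is an illustration only, not the code in this PR: `MinWatermarkTracker`, `Entry`, and `update` are hypothetical names, and it uses `java.util.PriorityQueue` rather than Flink's `HeapPriorityQueue`. One likely contributor to the gap measured above is that `java.util.PriorityQueue.remove(Object)` scans the queue linearly, so each update of an existing key costs O(n) in this variant.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;

/** Hypothetical illustration; this is not Flink's WatermarkAggregator. */
class MinWatermarkTracker<K> {

    /** Heap entry pairing a key with its latest reported watermark. */
    private static final class Entry<K> {
        final K key;
        final long watermark;

        Entry(K key, long watermark) {
            this.key = key;
            this.watermark = watermark;
        }
    }

    // Latest entry per key, so the stale heap entry can be found on update.
    private final Map<K, Entry<K>> latest = new HashMap<>();

    // Min-heap ordered by watermark; the head is the aggregated watermark.
    private final PriorityQueue<Entry<K>> heap =
            new PriorityQueue<>(Comparator.comparingLong((Entry<K> e) -> e.watermark));

    /**
     * Records a new watermark for the key. Returns the new minimum across all
     * keys if it changed, otherwise an empty Optional.
     */
    Optional<Long> update(K key, long watermark) {
        Long before = heap.isEmpty() ? null : heap.peek().watermark;

        Entry<K> fresh = new Entry<>(key, watermark);
        Entry<K> old = latest.put(key, fresh);
        if (old != null) {
            // remove(Object) is a linear scan in java.util.PriorityQueue.
            heap.remove(old);
        }
        heap.add(fresh);

        long after = heap.peek().watermark;
        return (before == null || before != after) ? Optional.of(after) : Optional.empty();
    }
}
```

A heap whose elements remember their own position in the backing array can remove an arbitrary element in O(log n) instead, which is the kind of difference worth checking when comparing the two queue implementations.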