1996fanrui commented on code in PR #22852:
URL: https://github.com/apache/flink/pull/22852#discussion_r1245546244


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -631,8 +631,9 @@ private void ensureStarted() {
         }
     }
 
-    private static class WatermarkAggregator<T> {
+    static class WatermarkAggregator<T> {
         private final Map<T, Watermark> watermarks = new HashMap<>();
+        /** The aggregatedWatermark is the smallest watermark of all keys. */

Review Comment:
   Hi @pnowojski @StefanRRichter, thanks for your review and the great suggestion.
   
   Your suggestion is very helpful, and it is a more general and easier-to-understand solution. I tried it with both Flink's `HeapPriorityQueue` and Java's `PriorityQueue`, and found that `HeapPriorityQueue` performs roughly 20~30 times better than Java's `PriorityQueue`.
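   The idea behind the suggestion, keeping the aggregated watermark (the smallest watermark of all keys) in a heap instead of rescanning the whole map on every update, can be sketched as follows. This is a minimal, hypothetical sketch and not Flink's `HeapPriorityQueue` API: `MinWatermarkAggregator` is an illustrative name, and watermarks are modeled as plain `long` timestamps instead of Flink's `Watermark` class. Each entry remembers its own position in the heap array, so updating one key only sifts that entry up or down (O(log n)), and the minimum is always at the root.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   /** Hypothetical sketch: min watermark over all keys via an indexed binary min-heap. */
   class MinWatermarkAggregator<T> {
       private static final class Entry<T> {
           final T key;
           long watermark;
           int index; // current position of this entry inside the heap array

           Entry(T key, long watermark) {
               this.key = key;
               this.watermark = watermark;
           }
       }

       private final Map<T, Entry<T>> entries = new HashMap<>();
       private final List<Entry<T>> heap = new ArrayList<>();

       /** Sets the watermark of {@code key} and returns the smallest watermark of all keys. */
       long aggregate(T key, long watermark) {
           Entry<T> e = entries.get(key);
           if (e == null) {
               // New key: append at the end of the heap and restore the heap order upwards.
               e = new Entry<>(key, watermark);
               e.index = heap.size();
               heap.add(e);
               entries.put(key, e);
               siftUp(e.index);
           } else {
               // Existing key: update in place, then move it only in the direction needed.
               long old = e.watermark;
               e.watermark = watermark;
               if (watermark < old) {
                   siftUp(e.index);
               } else {
                   siftDown(e.index);
               }
           }
           return heap.get(0).watermark; // the root always holds the minimum
       }

       private void siftUp(int i) {
           while (i > 0) {
               int parent = (i - 1) / 2;
               if (heap.get(parent).watermark <= heap.get(i).watermark) {
                   return;
               }
               swap(i, parent);
               i = parent;
           }
       }

       private void siftDown(int i) {
           int n = heap.size();
           while (true) {
               int left = 2 * i + 1;
               int right = left + 1;
               int smallest = i;
               if (left < n && heap.get(left).watermark < heap.get(smallest).watermark) {
                   smallest = left;
               }
               if (right < n && heap.get(right).watermark < heap.get(smallest).watermark) {
                   smallest = right;
               }
               if (smallest == i) {
                   return;
               }
               swap(i, smallest);
               i = smallest;
           }
       }

       private void swap(int a, int b) {
           Entry<T> ea = heap.get(a);
           Entry<T> eb = heap.get(b);
           heap.set(a, eb);
           eb.index = a; // keep each entry's stored index in sync with its slot
           heap.set(b, ea);
           ea.index = b;
       }
   }
   ```

   For example, after `aggregate("a", 10)` and `aggregate("b", 5)` the result is 5; raising `b` to 20 makes the result 10 again. By contrast, `java.util.PriorityQueue` has no in-place update operation, so changing one key's watermark typically means `remove(Object)` (documented as linear time) followed by `add`, which may partly explain why it was slower in this benchmark.
   
   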
   
   The benchmark durations are:
   
   - master branch: 1 min 44 s
   - master branch with the old version of this PR: 469 ms
   - master branch with the current PR (Flink's `HeapPriorityQueue`): 284 ms
   - master branch with the current PR (Java's `PriorityQueue`): about 6 s
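   For reference, the relative speedups of the current PR with Flink's `HeapPriorityQueue` (284 ms) implied by these durations, taking 1 min 44 s = 104 s and treating "about 6 s" as exactly 6 s, work out roughly as follows (versus master, versus the old version of the PR, and versus Java's `PriorityQueue`):

   ```math
   \frac{104\,000\ \text{ms}}{284\ \text{ms}} \approx 366, \qquad
   \frac{469\ \text{ms}}{284\ \text{ms}} \approx 1.65, \qquad
   \frac{6\,000\ \text{ms}}{284\ \text{ms}} \approx 21
   ```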
   
   So `HeapPriorityQueue` performs very well, and I will study it later. Thanks to @StefanRRichter for the great work.
   
   Based on the benchmark results, I chose Flink's `HeapPriorityQueue` and updated this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
