mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r609948744



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
##########
@@ -38,20 +48,36 @@
     private final String otherWindowName;
     private final long joinBeforeMs;
     private final long joinAfterMs;
+    private final long joinGraceMs;
 
     private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends R> joiner;
     private final boolean outer;
+    private final Optional<String> outerJoinWindowName;
+    private final boolean thisJoin;
+
+    // Observed time is AtomicLong because this time is shared between the left and side processor nodes. However,
+    // this time is not updated in parallel, so we can call get() several times without worry about getting different
+    // times.
+    private final AtomicLong maxObservedStreamTime;

Review comment:
Do we need to maintain it manually? Could we use `context.streamTime()`
instead?

Note that `context.streamTime()` might be slightly different, because we
advance it for every input record. Thus, if there is a filter before the join,
the join might not see all records, and its locally observed stream-time
could differ from the task stream-time.

The semantic difference is small, and it's unclear to me whether we should
prefer processor-local stream-time or task stream-time.

cc @guozhangwang @vvcephei
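
To make the difference concrete, here is a minimal, self-contained sketch. It does not use the Kafka Streams API at all: the class `StreamTimeSketch`, both helper methods, and the sample timestamps are hypothetical and only model the two notions of stream-time described above, assuming a filter sits upstream of the join.

```java
import java.util.List;
import java.util.function.LongPredicate;

public class StreamTimeSketch {

    // Task stream-time (as modeled here): advances for every input record
    // the task consumes, whether or not the record reaches the join.
    static long taskStreamTime(List<Long> timestamps) {
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        return max;
    }

    // Processor-local stream-time (as modeled here): advances only for
    // records that pass the upstream filter and are seen by the join.
    static long localStreamTime(List<Long> timestamps, LongPredicate upstreamFilter) {
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            if (upstreamFilter.test(ts)) {
                max = Math.max(max, ts);
            }
        }
        return max;
    }

    public static void main(String[] args) {
        // Hypothetical record timestamps arriving at the task, in order.
        List<Long> timestamps = List.of(10L, 20L, 30L, 40L);
        // Hypothetical upstream filter that drops the two latest records.
        LongPredicate upstreamFilter = ts -> ts <= 20L;

        System.out.println("task stream-time:  " + taskStreamTime(timestamps));
        System.out.println("local stream-time: " + localStreamTime(timestamps, upstreamFilter));
    }
}
```

With these inputs the join's local view stops at 20 while the task has already advanced to 40, so any window-close or grace-period check would fire at different points depending on which of the two times is used.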





