vcrfxia commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1158941945


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {

Review Comment:
   > the value-getter should return the same thing as if the result state-store 
would have been materialized
   
   Why is this true? I agree that this is not happening for this particular 
value getter, but I don't understand why this is the property that we wish to 
preserve. 
   
   Whenever the KTable that results from a processor computation is 
materialized, the value getter for that table uses the materialization 
directly. It's only when the result is not materialized that the value getter 
may need to compute results on the fly. When a join value getter needs to 
compute on the fly, it can actually return correct results for a timestamped 
get (as long as both inputs are versioned), by performing timestamped lookups 
on both inputs and joining the results. 
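   
   To make the on-the-fly computation concrete, here is a minimal sketch, not 
the code in this PR. `AsOfJoinSketch`, `VersionedTable`, and `joinAsOf` are 
hypothetical names; the in-memory `TreeMap` history stands in for a versioned 
store, tombstones are not modeled, and taking the max of the two input 
timestamps as the result timestamp is an assumption of the sketch.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

// Hypothetical, simplified model of a timestamped (as-of) inner join.
final class AsOfJoinSketch {

    // Stand-in for a versioned store: per key, maps record timestamp -> value.
    static final class VersionedTable<K, V> {
        private final Map<K, TreeMap<Long, V>> history = new HashMap<>();

        void put(final K key, final V value, final long timestamp) {
            history.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        }

        // Latest version with timestamp <= asOfTimestamp, or null if none exists.
        Map.Entry<Long, V> get(final K key, final long asOfTimestamp) {
            final TreeMap<Long, V> versions = history.get(key);
            return versions == null ? null : versions.floorEntry(asOfTimestamp);
        }
    }

    // Looks up both inputs as of the same timestamp and joins whatever is found.
    static <K, V1, V2, VOut> Map.Entry<Long, VOut> joinAsOf(
            final K key,
            final long asOfTimestamp,
            final VersionedTable<K, V1> left,
            final VersionedTable<K, V2> right,
            final BiFunction<V1, V2, VOut> joiner) {
        final Map.Entry<Long, V1> l = left.get(key, asOfTimestamp);
        final Map.Entry<Long, V2> r = right.get(key, asOfTimestamp);
        if (l == null || r == null) {
            return null; // inner join: both sides must have a version
        }
        // Assumption: the result timestamp is the max of the two input timestamps.
        final long resultTimestamp = Math.max(l.getKey(), r.getKey());
        return new AbstractMap.SimpleImmutableEntry<>(
            resultTimestamp, joiner.apply(l.getValue(), r.getValue()));
    }

    public static void main(final String[] args) {
        final VersionedTable<String, String> left = new VersionedTable<>();
        final VersionedTable<String, String> right = new VersionedTable<>();
        left.put("k", "A", 10L);
        left.put("k", "B", 7L); // older version, kept as history
        right.put("k", "X", 5L);
        System.out.println(joinAsOf("k", 8L, left, right, (a, b) -> a + "+" + b));
    }
}
```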
   
   What you point out is that this join result may never have been emitted 
downstream, and that is true, but the two concerns feel separate to me. In an 
"ideal" 
world I think we would emit all older join results downstream when joining 
versioned tables. (It's just not practical from a computation standpoint, and 
it's also not clear that users want this by default.) I guess I don't see why 
the fact that we did not emit the result downstream means we should not return 
it if asked to compute it on-the-fly. It seems like if we have the correct 
value available, then we should return it.
   
   It does lead to an interesting/confusing situation where if the result of a 
join between two versioned tables is materialized with a versioned store, then 
timestamped gets will not necessarily return correct results, even though 
correct results could be obtained by not materializing the join result and 
simply using the value getter instead. The alternative is to say that this 
value getter does not support timestamped gets, even though it does have a way 
to compute them (correctly), which feels odd. Curious to hear your thoughts on 
this.
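   
   A toy model of that discrepancy, as a sketch only: `MaterializedVsOnTheFly` 
and its methods are hypothetical, it tracks a single key, and it assumes that 
an out-of-order update is stored as history in its own input table but does 
not trigger a join emission.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key model; none of these names come from Kafka Streams.
final class MaterializedVsOnTheFly {
    // Version history of each input table: record timestamp -> value.
    private final TreeMap<Long, String> left = new TreeMap<>();
    private final TreeMap<Long, String> right = new TreeMap<>();
    // Versioned materialization of the join result: only emitted results land here.
    private final TreeMap<Long, String> materialized = new TreeMap<>();

    void updateLeft(final String value, final long timestamp) {
        update(left, value, timestamp);
    }

    void updateRight(final String value, final long timestamp) {
        update(right, value, timestamp);
    }

    // Assumption: an update older than the table's latest version is out-of-order;
    // it is kept as history but produces no join result downstream.
    private void update(final TreeMap<Long, String> table, final String value, final long timestamp) {
        final boolean outOfOrder = !table.isEmpty() && timestamp < table.lastKey();
        table.put(timestamp, value);
        if (!outOfOrder && !left.isEmpty() && !right.isEmpty()) {
            final long resultTimestamp = Math.max(left.lastKey(), right.lastKey());
            materialized.put(
                resultTimestamp,
                left.lastEntry().getValue() + "+" + right.lastEntry().getValue());
        }
    }

    // Timestamped get served from the materialized join result.
    String materializedGet(final long asOfTimestamp) {
        final Map.Entry<Long, String> e = materialized.floorEntry(asOfTimestamp);
        return e == null ? null : e.getValue();
    }

    // Timestamped get computed on the fly from both versioned inputs.
    String onTheFlyGet(final long asOfTimestamp) {
        final Map.Entry<Long, String> l = left.floorEntry(asOfTimestamp);
        final Map.Entry<Long, String> r = right.floorEntry(asOfTimestamp);
        return (l == null || r == null) ? null : l.getValue() + "+" + r.getValue();
    }

    public static void main(final String[] args) {
        final MaterializedVsOnTheFly join = new MaterializedVsOnTheFly();
        join.updateRight("X", 5L);
        join.updateLeft("A", 10L); // in order: join result emitted and materialized
        join.updateLeft("B", 7L);  // out-of-order: stored as history, nothing emitted
        System.out.println("materialized @8: " + join.materializedGet(8L));
        System.out.println("on-the-fly   @8: " + join.onTheFlyGet(8L));
    }
}
```

   In this model the two getters agree for the latest timestamp and diverge 
only for timestamps covered by a result that was never emitted.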



