vcrfxia commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1160829732


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {

Review Comment:
   > Because materialization vs non-materialization should be a non-functional 
(ie, non semantical) change, but only a perf/resource footprint change.
   
   Hm, maybe I'm not understanding what's considered a "functional" change and
what isn't. For example, is caching a functional change? Setting versioned
stores aside for a moment: if a table processor is materialized with a
non-versioned store with caching enabled, then the downstream will see a
different set of results than if the table were not materialized or caching
were not enabled. In either case, the guarantee is that the latest (by offset)
record is the "correct" value for the key, and the others should effectively
be ignored.
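
   To make the caching point concrete, here is a minimal sketch. It is not Kafka Streams code: `CachingSketch` and its methods are hypothetical names, and the "cache" is simplified to a single flush at the end of the input, whereas the real cache also flushes on commit or when it fills up. It only illustrates that the two modes emit different sequences while agreeing on the latest value per key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Kafka Streams.
final class CachingSketch {

    // No caching: every upstream update is forwarded downstream as-is.
    static List<String> emitUncached(List<String[]> updates) {
        List<String> out = new ArrayList<>();
        for (String[] kv : updates) {
            out.add(kv[0] + "=" + kv[1]);
        }
        return out;
    }

    // Caching (simplified to one flush at the end): later updates overwrite
    // earlier ones for the same key, so only the latest per key is forwarded.
    static List<String> emitCached(List<String[]> updates) {
        Map<String, String> cache = new LinkedHashMap<>();
        for (String[] kv : updates) {
            cache.put(kv[0], kv[1]);
        }
        List<String> out = new ArrayList<>();
        cache.forEach((k, v) -> out.add(k + "=" + v));
        return out;
    }

    // What a downstream consumer ends up with if it keeps only the latest
    // (by position in the emitted sequence) value for each key.
    static Map<String, String> latestPerKey(List<String> emitted) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String e : emitted) {
            int i = e.indexOf('=');
            latest.put(e.substring(0, i), e.substring(i + 1));
        }
        return latest;
    }
}
```

   For the updates `a=1, b=1, a=2, a=3`, the uncached path emits all four records while the cached path emits only `a=3, b=1`, yet `latestPerKey` is the same for both.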
   
   Now turning to versioned stores: the only guarantee today when joining two
versioned tables is that the latest (by timestamp and offset) result is correct
and the others should be ignored. (In the future, we could expand this
guarantee by also emitting correct older join results, but we don't do this
today.) But we still have to answer the question of what happens if a "user"
tries to query for an older join result (via the KTableValueGetter). We don't
guarantee that the result is correct, but they could still do this. If the
result table is materialized (as a versioned store), then they may get
incorrect results. If the result table is not materialized, then it cannot be
versioned (as you point out), so they technically can't do this (at least
from the join merge node). But if they somehow had access to the
inner/left/outer join processor (before the join merge processor) and called
`get(key, ts)` on the KTableValueGetter, what should be returned? I think it's
natural to return the correct older join result because we have it available.

   If you prefer to make the case that we should simply disallow this, based
on the fact that versioned joins do not guarantee correct older join results
today, I think that's fine too, but we'll have to reimplement this same logic
in the future when we do support correct older join history. And having the
logic here doesn't introduce any incorrectness (IMO) in the meantime, so it
seems better to leave it?
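
   As a rough sketch of what "return the correct older join result" means for `get(key, ts)`: the snippet below is not the code in this PR. `AsOfJoinSketch`, `VersionedTable`, `Stamped`, and `joinAsOf` are hypothetical stand-ins for the versioned stores and the value getter, and the `Math.max` result timestamp is an assumption about how the join stamps its output. The idea is just to look up each side as of the requested timestamp and join those two versions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

// Hypothetical illustration only; not the Kafka Streams versioned store API.
final class AsOfJoinSketch {

    // A value together with the timestamp from which it is valid.
    static final class Stamped<V> {
        final V value;
        final long timestamp;

        Stamped(V value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Toy versioned table: per key, a timestamp-ordered history of values.
    static final class VersionedTable<K, V> {
        private final Map<K, TreeMap<Long, V>> history = new HashMap<>();

        void put(K key, V value, long timestamp) {
            history.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        }

        // Latest version with timestamp <= asOfTimestamp, or null if none.
        Stamped<V> get(K key, long asOfTimestamp) {
            TreeMap<Long, V> versions = history.get(key);
            if (versions == null) {
                return null;
            }
            Map.Entry<Long, V> entry = versions.floorEntry(asOfTimestamp);
            return entry == null ? null : new Stamped<>(entry.getValue(), entry.getKey());
        }
    }

    // Inner join as of a timestamp: both sides need a version valid at that
    // time. The result timestamp is the larger of the two input timestamps
    // (an assumption made for this sketch).
    static <K, V1, V2, R> Stamped<R> joinAsOf(
            K key,
            long asOfTimestamp,
            VersionedTable<K, V1> left,
            VersionedTable<K, V2> right,
            BiFunction<V1, V2, R> joiner) {
        Stamped<V1> l = left.get(key, asOfTimestamp);
        Stamped<V2> r = right.get(key, asOfTimestamp);
        if (l == null || r == null) {
            return null;
        }
        return new Stamped<>(
            joiner.apply(l.value, r.value),
            Math.max(l.timestamp, r.timestamp));
    }
}
```

   With left versions at timestamps 10 and 30 and right versions at 20 and 40, a lookup as of 25 joins the left version from 10 with the right version from 20, while a lookup as of 15 returns null because the right side has no version yet.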



