vcrfxia commented on code in PR #13496: URL: https://github.com/apache/kafka/pull/13496#discussion_r1158941945
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java: ########## @@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) { @Override public ValueAndTimestamp<VOut> get(final K key) { - final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key); + return computeJoin(key, valueGetter1::get, valueGetter2::get); + } + + @Override + public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) { Review Comment: > the value-getter should return the same thing as if the result state-store would have been materialized Why is this true? I agree that this is not happening for this particular value getter, but I don't understand why this is the property that we wish to preserve. Wherever the KTable that is a result of a processor computation is materialized, then the value getter for that table uses the materialization directly. It's only when the result is not materialized where the value getter may need to compute results on the fly. When a join value getter needs to perform computation on the fly, this value getter can actually return correct results for a timestamped get (as long as both inputs are versioned), by performing timestamped lookups from both inputs and joining the result. What you point out is that this join result may have never been emitted downstream, and that is true, but the two feel separate to me. In an "ideal" world I think we would emit all older join results downstream when joining versioned tables. (It's just not practical from a computation standpoint, and it's also not clear that users want this by default.) I guess I don't see why the fact that we did not emit the result downstream means we should not return it if asked to compute it on-the-fly. It seems like if we have the correct value available, then we should return it. It does lead to an interesting/confusing situation where if the result of a join between two versioned tables is materialized with a versioned store, then timestamped gets will not necessarily return correct results, even though correct results could be obtained by not materializing the join result and simply using the value getter instead. The alternative is to say that this value getter does not support timestamped gets, even though it does have a way to compute them (correctly), which feels odd. Curious to hear your thoughts on this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org