vcrfxia commented on code in PR #13496: URL: https://github.com/apache/kafka/pull/13496#discussion_r1160829732
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java: ##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {

Review Comment:
> Because materialization vs non-materialization should be a non-functional (ie, non semantical) change, but only a perf/resource footprint change.

Hm, maybe I'm not understanding what's considered a "functional" change and what isn't. For example, is caching a functional change? Setting versioned stores aside for a moment: if a table processor is materialized with a non-versioned store and caching is enabled, the downstream will see a different set of results than if the table were not materialized or caching were disabled. In either case, the guarantee is that the latest (by offset) record is the "correct" value for the key, and the others should effectively be ignored.

Now turning to versioned stores: the only guarantee today when joining two versioned tables is that the latest (by timestamp and offset) join result is correct and the others should be ignored. (In the future, we could expand this guarantee by also emitting correct older join results, but we don't do this today.) We still have to answer what happens if a "user" tries to query for an older join result (via the KTableValueGetter): we don't guarantee that the result is correct, but they can still issue the query. If the result table is materialized (as a versioned store), they may get incorrect results.
If the result table is not materialized, then it cannot be versioned (as you point out), so technically they can't do this (at least not from the join merge node). But if they somehow had access to the inner/left/outer join processor (upstream of the join merge processor) and called `get(key, ts)` on its KTableValueGetter, what should be returned? I think it's natural to return the correct older join result, because we have it available. If you'd prefer to argue that we should simply disallow this, given that versioned joins do not guarantee correct older join results today, I'm fine with that too, but we'd have to reimplement this same logic in the future when we do support correct older join history. Having the logic here doesn't introduce any incorrectness (IMO) in the meantime, so it seems better to keep it?
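To make the "we have it available" argument concrete, here is a minimal, self-contained sketch of the pattern the diff suggests: `get(key)` delegates to a shared `computeJoin` helper with the two getters' lookup functions, and an as-of overload pushes `asOfTimestamp` down to both inputs. The hunk does not show the bodies of `computeJoin` or `get(key, asOfTimestamp)`, so everything below is an assumption: `ValueAndTs`, `ToyVersionedTable`, and `ToyInnerJoinGetter` are hypothetical stand-ins, not the Kafka Streams `ValueAndTimestamp`, `KTableValueGetter`, or versioned store APIs; the result timestamp is taken as the max of the two input timestamps; tombstones and history retention are not modeled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;

public class VersionedJoinSketch {

    // Hypothetical stand-in for ValueAndTimestamp; not the Kafka Streams class.
    record ValueAndTs<V>(V value, long timestamp) { }

    // Hypothetical toy versioned table: per key, versions sorted by timestamp.
    // Tombstones and history retention are deliberately not modeled.
    static final class ToyVersionedTable<K, V> {
        private final Map<K, NavigableMap<Long, V>> versions = new HashMap<>();

        // A later put with the same timestamp overwrites, so offset breaks ties.
        void put(K key, V value, long timestamp) {
            versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        }

        // Latest version by timestamp.
        ValueAndTs<V> get(K key) {
            return get(key, Long.MAX_VALUE);
        }

        // Newest version whose timestamp is <= asOfTimestamp, or null if none.
        ValueAndTs<V> get(K key, long asOfTimestamp) {
            NavigableMap<Long, V> history = versions.get(key);
            if (history == null) {
                return null;
            }
            Map.Entry<Long, V> entry = history.floorEntry(asOfTimestamp);
            return entry == null ? null : new ValueAndTs<>(entry.getValue(), entry.getKey());
        }
    }

    // Hypothetical inner-join value getter mirroring the shape of the diff:
    // both get() overloads share one computeJoin helper.
    static final class ToyInnerJoinGetter<K, V1, V2, VOut> {
        private final ToyVersionedTable<K, V1> left;
        private final ToyVersionedTable<K, V2> right;
        private final BiFunction<V1, V2, VOut> joiner;

        ToyInnerJoinGetter(ToyVersionedTable<K, V1> left,
                           ToyVersionedTable<K, V2> right,
                           BiFunction<V1, V2, VOut> joiner) {
            this.left = left;
            this.right = right;
            this.joiner = joiner;
        }

        ValueAndTs<VOut> get(K key) {
            return computeJoin(key, left::get, right::get);
        }

        // Push the as-of timestamp down to both inputs instead of reading
        // a materialized join result.
        ValueAndTs<VOut> get(K key, long asOfTimestamp) {
            return computeJoin(key,
                k -> left.get(k, asOfTimestamp),
                k -> right.get(k, asOfTimestamp));
        }

        private ValueAndTs<VOut> computeJoin(K key,
                                             Function<K, ValueAndTs<V1>> getter1,
                                             Function<K, ValueAndTs<V2>> getter2) {
            ValueAndTs<V1> v1 = getter1.apply(key);
            if (v1 == null) {
                return null; // inner join: no left value, no result
            }
            ValueAndTs<V2> v2 = getter2.apply(key);
            if (v2 == null) {
                return null; // inner join: no right value, no result
            }
            // Assumption: result timestamp is the max of the input timestamps.
            return new ValueAndTs<>(joiner.apply(v1.value(), v2.value()),
                Math.max(v1.timestamp(), v2.timestamp()));
        }
    }

    public static void main(String[] args) {
        ToyVersionedTable<String, String> left = new ToyVersionedTable<>();
        ToyVersionedTable<String, String> right = new ToyVersionedTable<>();
        left.put("k", "a1", 10);
        right.put("k", "b1", 12);
        left.put("k", "a2", 20);

        ToyInnerJoinGetter<String, String, String, String> join =
            new ToyInnerJoinGetter<>(left, right, (a, b) -> a + "+" + b);

        System.out.println(join.get("k"));     // ValueAndTs[value=a2+b1, timestamp=20]
        System.out.println(join.get("k", 15)); // ValueAndTs[value=a1+b1, timestamp=12]
        System.out.println(join.get("k", 11)); // null (no right version at or before 11)
    }
}
```

With left versions a1@10 and a2@20 and a right version b1@12, the plain lookup yields a2+b1 at 20, while the as-of-15 lookup reconstructs the older result a1+b1 at 12 purely from the two inputs. The design choice in this sketch is that the as-of query never consults a materialized join result, so its answer does not depend on whether (or how) the join output was materialized.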