[ https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16853186#comment-16853186 ]

Guozhang Wang commented on KAFKA-8377:
--------------------------------------

Yes, I'm suggesting we add another condition in `KTableImpl#valueGetterSupplier()`, similar to the one for `KTableSourceValueGetterSupplier`: if some descendant processor needs to get values through a getter that delegates all the way to it, then it would materialize. For transformValuesGetterSupplier, however, we can choose to materialize only if it is actually associated with some stores.

Regarding `KTableSourceValueGetterSupplier`: good catch, I agree it can be replaced with `KTableMaterializedValueGetterSupplier`.

> KTable#transformValue might lead to incorrect result in joins
> -------------------------------------------------------------
>
>                 Key: KAFKA-8377
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8377
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Matthias J. Sax
>            Priority: Major
>              Labels: newbie++
>
> Kafka Streams uses an optimization to not materialize every result KTable. If
> a non-materialized KTable is input to a join, the lookup into the table
> results in a lookup into the parent table plus a call to the operator. For
> example,
> {code:java}
> KTable nonMaterialized = materializedTable.filter(...);
> KTable table2 = ...
> table2.join(nonMaterialized,...)
> {code}
> If there is a table2 input record, the lookup to the other side is performed
> as a lookup into materializedTable plus applying the filter().
> For a stateless operation like filter(), this is safe. However,
> #transformValues() might have an attached state store. Hence, when an input
> record r is processed by #transformValues() with current state S, it might
> produce an output record r' (that is not materialized). When the join later
> does a lookup to get r from the parent table, there is no guarantee that
> #transformValues() again produces r' because its state might not be the same
> any longer.
> Hence, it seems necessary to always materialize the result of a
> KTable#transformValues() operation if there is state. Note that if there
> were a consecutive filter() after transformValues(), it would also be OK to
> materialize the filter() result. Furthermore, if there is no downstream
> join(), materialization is also not required.
> Basically, it seems to be unsafe to apply `KTableValueGetter` on a stateful
> `#transformValues()` operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
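
The failure mode described in the issue can be reproduced outside Kafka Streams with a minimal plain-Java sketch. Nothing below is Kafka Streams API: `StatefulGetterDemo`, `CountingTransformer`, and `lookupViaParent` are hypothetical names made up for illustration, a `HashMap` stands in for a materialized table, and an `int` counter stands in for the state store attached to `#transformValues()`. It only shows why re-applying a stateful operator at lookup time need not reproduce the record r' that was originally emitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

final class StatefulGetterDemo {

    /** Hypothetical stand-in for a stateful transformValues(): output depends on internal state. */
    static final class CountingTransformer implements UnaryOperator<String> {
        private int calls = 0; // plays the role of the attached state store

        @Override
        public String apply(String value) {
            calls++;                    // state S changes on every invocation
            return value + "#" + calls; // so the same input yields a different output later
        }
    }

    /** Non-materialized lookup: read the parent table, then re-apply the operator. */
    static String lookupViaParent(Map<String, String> parent, UnaryOperator<String> op, String key) {
        String parentValue = parent.get(key);
        return parentValue == null ? null : op.apply(parentValue);
    }

    /** Returns {emitted r', value recomputed at join time, value read from a materialized result}. */
    static String[] simulate() {
        Map<String, String> parent = new HashMap<>();       // materialized parent table
        Map<String, String> materialized = new HashMap<>(); // materialized operator result
        CountingTransformer transformer = new CountingTransformer();

        // Record r = ("k", "v") is processed once; r' is emitted downstream and also stored.
        parent.put("k", "v");
        String emitted = transformer.apply("v");
        materialized.put("k", emitted);

        // Later, a record on the other join side triggers a lookup for key "k".
        String recomputed = lookupViaParent(parent, transformer, "k");
        String stored = materialized.get("k");
        return new String[] {emitted, recomputed, stored};
    }

    public static void main(String[] args) {
        String[] r = simulate();
        System.out.println("emitted r'            = " + r[0]);
        System.out.println("recomputed via parent = " + r[1]);
        System.out.println("read from store       = " + r[2]);
    }
}
```

In this sketch the recomputed value differs from the emitted one, while the value read from the materialized result matches it, which is the argument for materializing when the operator has state.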