[ https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931696#comment-16931696 ]
Matthias J. Sax commented on KAFKA-8377:
----------------------------------------

{quote}1. Does this mean that every stateful operation on a KTable must be materialized?
{quote}
No. We need to materialize the result of a KTable operation only if the next operation is a stateful transformValues().

{quote}2. If that's the case, should the user be notified that they need to use a materialized store for such operations, e.g. after the topology optimization we can suggest that a materialized store needs to be created. I'm not 100% sure if we must force create a StateStore (since users may want to pass specific configurations to the statestore)
{quote}
The user does not need to create a store IMHO; we can just do this internally. However, to be able to materialize the result, we need to know the corresponding `Serdes`, and I'm not sure if we need to do anything special about it. For some cases, if we force materialization we might not have the correct `Serdes`, and thus a user would need to specify them upstream via `Materialized` to avoid runtime errors. (If they don't do this and hit a runtime error, it might be hard for them to understand the problem.)

{quote}3. Is it also possible that users are materializing the state on Redis or some other caching mechanism?
{quote}
Theoretically yes. But we don't need to worry about this case. If a user plugs in a custom store, they would need to specify this upstream anyway, forcing a materialization explicitly.

> KTable#transformValue might lead to incorrect result in joins
> -------------------------------------------------------------
>
>                 Key: KAFKA-8377
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8377
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Aishwarya Pradeep Kumar
>            Priority: Major
>              Labels: newbie++
>
> Kafka Streams uses an optimization to not materialize every result KTable.
> If a non-materialized KTable is input to a join, the lookup into the table
> results in a lookup of the parent table plus a call to the operator. For
> example,
> {code:java}
> KTable nonMaterialized = materializedTable.filter(...);
> KTable table2 = ...
> table2.join(nonMaterialized,...){code}
> If there is a table2 input record, the lookup to the other side is performed
> as a lookup into materializedTable plus applying the filter().
> For a stateless operation like filter(), this is safe. However,
> #transformValues() might have an attached state store. Hence, when an input
> record r is processed by #transformValues() with current state S, it might
> produce an output record r' (that is not materialized). When the join later
> does a lookup to get r from the parent table, there is no guarantee that
> #transformValues() again produces r' because its state might no longer be
> the same.
> Hence, it seems necessary to always materialize the result of a
> KTable#transformValues() operation if there is state. Note that if a
> filter() directly followed transformValues(), it would also be OK to
> materialize the filter() result instead. Furthermore, if there is no
> downstream join(), materialization is not required either.
> Basically, it seems to be unsafe to apply `KTableValueGetter` to a stateful
> `#transformValues()` operator.
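
To make the failure mode in the description concrete, here is a minimal sketch in plain Java. It is a toy model under stated assumptions, not Kafka Streams code: the two `HashMap`s stand in for the materialized parent table and for a forced materialization of the transformValues() result, and the hypothetical `CountingTransformer` stands in for a stateful transformer whose output depends on a counter held in its state. All class and method names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class StatefulGetterSketch {

    /** Hypothetical stand-in for a stateful transformer: output depends on a counter (its "state store"). */
    static final class CountingTransformer {
        private int seen = 0; // the attached state S

        String transform(String key, String value) {
            seen++;
            return value + "#" + seen;
        }
    }

    /** Returns { forwarded r', value recomputed via parent lookup, value read from materialized result }. */
    public static String[] demo() {
        Map<String, String> parentTable = new HashMap<>();        // plays the role of materializedTable
        Map<String, String> materializedResult = new HashMap<>(); // forced materialization of the transformed table
        CountingTransformer transformer = new CountingTransformer();

        // Input record r = ("k", "v") is processed with current state S; r' is forwarded downstream.
        parentTable.put("k", "v");
        String forwarded = transformer.transform("k", "v");
        materializedResult.put("k", forwarded);

        // A record for another key arrives and changes the transformer's state.
        parentTable.put("other", "x");
        materializedResult.put("other", transformer.transform("other", "x"));

        // Join-side lookup WITHOUT materialization: read r from the parent and re-apply the operator.
        String recomputed = transformer.transform("k", parentTable.get("k"));

        // Join-side lookup WITH materialization: read r' directly from the result store.
        String stored = materializedResult.get("k");

        return new String[] {forwarded, recomputed, stored};
    }

    public static void main(String[] args) {
        String[] r = demo();
        System.out.println("forwarded r'           = " + r[0]); // v#1
        System.out.println("recomputed via parent  = " + r[1]); // v#3
        System.out.println("read from materialized = " + r[2]); // v#1
    }
}
```

Because a record for another key advanced the state between the original forward and the join lookup, the recomputed value differs from the r' that was forwarded, while the lookup into the materialized result still returns r'. A stateless operator such as filter() would not show this difference, since re-applying it to the parent value always gives the same answer.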