[ 
https://issues.apache.org/jira/browse/KAFKA-8377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931696#comment-16931696
 ] 

Matthias J. Sax commented on KAFKA-8377:
----------------------------------------

{quote}1. Does this mean that every stateful operation on a KTable must be 
materialized?
{quote}
No. We need to materialize the result of a KTable operation if and only if the 
next operation is a stateful transformValues().
{quote}2. If that's the case should the user be notified that they need to use 
a materialized store for such operations e.g. after the topology optimization 
we can suggest that a materialized store needs to be created. I'm not a 100% 
sure if we must force create a StateStore (since users may want to pass 
specific configurations to the statestore)
{quote}
The user does not need to create a store, IMHO; we can just do this internally. 
However, to materialize the result we need to know the corresponding `Serdes`, 
and I am not sure whether we need to do anything special about that. In some 
cases, if we force materialization, we might not have the correct `Serdes`, and 
the user would then need to specify them upstream via `Materialized` to avoid 
runtime errors. If they don't and hit a runtime error, the problem might be 
hard for them to understand.
{quote}3. Is it also possible that users are materializing the state on REDIS 
or some other caching mechanism?
{quote}
Theoretically, yes. But we don't need to worry about this case. If a user plugs 
in a custom store, they would need to specify this upstream anyway, which 
forces a materialization explicitly.
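
To make the underlying problem from the issue description concrete, here is a 
minimal sketch in plain Java. It is a simulation under simplifying assumptions 
and does not use the Kafka Streams API: the maps stand in for state stores, and 
`CountingTransformer`, `StatefulGetterSketch`, and `run()` are hypothetical 
names chosen for illustration. It compares a join-side lookup that re-applies a 
stateful transformer on the parent table with a lookup into a materialized 
result.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class StatefulGetterSketch {

    /** Stateful value transformer: appends how many records it has seen so far. */
    static final class CountingTransformer implements BiFunction<String, String, String> {
        private int seen = 0; // stands in for the attached state store

        @Override
        public String apply(String key, String value) {
            seen++;
            return value + "#" + seen;
        }
    }

    /** Returns {emitted downstream, recomputed by lookup, read from materialized result}. */
    static String[] run() {
        Map<String, String> parentTable = new HashMap<>();        // materialized parent table
        Map<String, String> materializedResult = new HashMap<>(); // materialized transform output
        CountingTransformer transformer = new CountingTransformer();

        // Record r = ("k", "v") arrives: update the parent, transform, forward r'.
        parentTable.put("k", "v");
        String emitted = transformer.apply("k", parentTable.get("k"));
        materializedResult.put("k", emitted);

        // A second record changes the transformer's state.
        parentTable.put("other", "x");
        materializedResult.put("other", transformer.apply("other", parentTable.get("other")));

        // Lookup without materialization: read the parent and re-apply the transformer.
        String recomputed = transformer.apply("k", parentTable.get("k"));

        // Lookup with materialization: read the stored r' directly.
        String stored = materializedResult.get("k");

        return new String[] {emitted, recomputed, stored};
    }

    public static void main(String[] args) {
        String[] r = run();
        System.out.println("emitted=" + r[0] + " recomputed=" + r[1] + " stored=" + r[2]);
    }
}
```

In this sketch the recomputed value differs from the value that was originally 
emitted, while the materialized lookup returns the emitted value unchanged. 
That is the inconsistency a join could observe.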

> KTable#transformValue might lead to incorrect result in joins
> -------------------------------------------------------------
>
>                 Key: KAFKA-8377
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8377
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Aishwarya Pradeep Kumar
>            Priority: Major
>              Labels: newbie++
>
> Kafka Streams uses an optimization to not materialize every result KTable. If 
> a non-materialized KTable is input to a join, the lookup into the table 
> results in a lookup of the parent table plus a call to the operator. For 
> example,
> {code:java}
> KTable nonMaterialized = materializedTable.filter(...);
> KTable table2 = ...
> table2.join(nonMaterialized,...){code}
> If there is a table2 input record, the lookup to the other side is performed 
> as a lookup into materializedTable plus applying the filter().
> For a stateless operation like filter(), this is safe. However, 
> #transformValues() might have an attached state store. Hence, when an input 
> record r is processed by #transformValues() with current state S, it might 
> produce an output record r' (that is not materialized). When the join later 
> does a lookup to get r from the parent table, there is no guarantee that 
> #transformValues() again produces r' because its state might not be the same 
> any longer.
> Hence, it seems to be required to always materialize the result of a 
> KTable#transformValues() operation if there is state. Note that if there 
> were a consecutive filter() after transformValues(), it would also be ok to 
> materialize the filter() result. Furthermore, if there is no downstream 
> join(), materialization is not required either.
> Basically, it seems to be unsafe to apply `KTableValueGetter` on a stateful 
> #transformValues() operator.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
