[ 
https://issues.apache.org/jira/browse/KAFKA-5581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16197940#comment-16197940
 ] 

Guozhang Wang commented on KAFKA-5581:
--------------------------------------

[~johnma] I'd like to break this story into finer-grained tasks under 
https://issues.apache.org/jira/browse/KAFKA-6034 and mark this one as 
unassigned. Please feel free to take a look at the subtasks of KAFKA-6034 and 
see if you are interested in working on some of them.

> Streams can be smarter in deciding when to create changelog topics for state 
> stores
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-5581
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5581
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Mariam John
>              Labels: architecture, performance
>
> Today Streams backs every state store with a changelog topic by default, 
> unless the user overrides this with {{disableLogging}} when creating the 
> state store / materializing the KTable. However, there are a few cases where 
> a materialized store does not need a separate changelog topic because an 
> existing topic can be reused instead, for example:
> 1) If a KTable is read directly from a source topic and is materialized, i.e. 
> {code}
> table1 = builder.table("topic1", "store1");
> {code}
> In this case {{table1}}'s changelog topic can just be {{topic1}}, and we do 
> not need to create a separate {{table1-changelog}} topic.
> 2) If a KTable is materialized and then sent directly to a sink topic with 
> the same key, e.g.
> {code}
> stream.groupBy(...).aggregate("state1").to("topic2");
> {code}
> In this case {{state1}}'s changelog topic can just be {{topic2}}, and we do 
> not need to create a separate {{state1-changelog}} topic.
> 3) If a KStream is materialized for joins where the streams are read 
> directly from a topic, e.g.:
> {code}
> stream1 = builder.stream("topic1");
> stream2 = builder.stream("topic2");
> // stream1 and stream2 are materialized with a changelog topic
> stream3 = stream1.join(stream2, windows);
> {code}
> Since stream materialization is append-only, we do not need a changelog for 
> these state stores either and can just use the source topics {{topic1}} and 
> {{topic2}}.
> 4) When simple transformation operations, or even join operations, generate 
> new KTables that need to be materialized with a state store, we can reuse 
> the changelog topic of the previous KTable and apply the transformation 
> logic upon restoration instead of creating a new changelog topic. For example:
> {code}
> table1 = builder.table("topic1");
> // table2 needs to be materialized for joining
> table2 = table1.filter(..).join(table3);
> {code}
> We can set the {{getter}} function of table2's materialized store, say 
> {{state2}}, to read from {{topic1}} and then apply the filter operator, 
> instead of creating a new {{state2-changelog}} topic in this case.
> 5) more use cases ...
> We can come up with general internal optimizations to determine when / how 
> to set the changelog topic for those materialized stores at runtime startup, 
> when generating the topology.
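
As a rough illustration of cases 1) to 3) in the description above, the choice of changelog topic could be sketched as a small decision function. This is only a sketch under stated assumptions: {{ChangelogPlanner}}, {{StoreKind}}, {{StoreSpec}} and {{changelogTopicFor}} are hypothetical names that do not exist in Kafka Streams, and the {{<store>-changelog}} naming simply follows the shorthand used in the description. Case 4) is left out because it would also need the transformation logic to be replayed on restoration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model for illustration; none of these types exist in Kafka Streams.
final class ChangelogPlanner {

    // How a materialized store relates to an already existing topic.
    enum StoreKind {
        SOURCE_TABLE,   // case 1: KTable read directly from a source topic
        TABLE_TO_SINK,  // case 2: KTable written directly to a sink topic, same key
        STREAM_JOIN,    // case 3: KStream materialized for a join, read from a topic
        OTHER           // anything else keeps its own changelog topic
    }

    static final class StoreSpec {
        final String name;
        final StoreKind kind;
        final String existingTopic; // null when there is no candidate topic to reuse

        StoreSpec(String name, StoreKind kind, String existingTopic) {
            this.name = name;
            this.kind = kind;
            this.existingTopic = existingTopic;
        }
    }

    // Returns the topic the store should be restored from: the existing topic
    // when one of the reuse cases applies, otherwise a dedicated changelog topic.
    static String changelogTopicFor(StoreSpec store) {
        boolean reusable = store.kind != StoreKind.OTHER && store.existingTopic != null;
        return reusable ? store.existingTopic : store.name + "-changelog";
    }

    public static void main(String[] args) {
        List<StoreSpec> stores = Arrays.asList(
                new StoreSpec("store1", StoreKind.SOURCE_TABLE, "topic1"),
                new StoreSpec("state1", StoreKind.TABLE_TO_SINK, "topic2"),
                new StoreSpec("counts", StoreKind.OTHER, null));
        for (StoreSpec s : stores) {
            System.out.println(s.name + " -> " + changelogTopicFor(s));
        }
    }
}
```

In a real implementation this decision would presumably be made while the topology is generated, as the description suggests, rather than from a hand-built list of stores.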



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
