Guozhang Wang created KAFKA-5581:
------------------------------------

             Summary: Streams can be smarter in deciding when to create changelog topics for state stores
                 Key: KAFKA-5581
                 URL: https://issues.apache.org/jira/browse/KAFKA-5581
             Project: Kafka
          Issue Type: Bug
          Components: streams
            Reporter: Guozhang Wang

Today Streams backs every state store with a changelog topic by default, unless the user overrides this with {{disableLogging}} when creating the state store or materializing the KTable. However, in a few cases a separate changelog topic is not required, because an existing topic can be reused for that purpose. For example:

1) If a KTable is read directly from a source topic and is materialized, i.e.

{code}
table1 = builder.table("topic1", "store1");
{code}

In this case {{table1}}'s changelog topic can just be {{topic1}}, and we do not need to create a separate {{table1-changelog}} topic.

2) If a KTable is materialized and then sent directly into a sink topic with the same key, e.g.

{code}
table1 = stream.groupBy(...).aggregate("state1").to("topic2");
{code}

In this case {{state1}}'s changelog topic can just be {{topic2}}, and we no longer need to create a separate {{state1-changelog}} topic.

3) If a KStream is materialized for joins where the streams are read directly from a topic, e.g.

{code}
stream1 = builder.stream("topic1");
stream2 = builder.stream("topic2");
stream3 = stream1.join(stream2, windows);  // stream1 and stream2 are materialized with a changelog topic
{code}

Since stream materialization is append-only, these state stores do not need changelog topics either; they can be restored from the source topics {{topic1}} and {{topic2}}.

4) When simple transformations, or even joins, generate a new KTable that needs to be materialized with a state store, we can reuse the changelog topic of the upstream KTable and apply the transformation logic upon restoration, instead of creating a new changelog topic.
For example:

{code}
table1 = builder.table("topic1");
table2 = table1.filter(..).join(table3); // table2 needs to be materialized for joining
{code}

We can set the {{getter}} function of table2's materialized store, say {{state2}}, to read from {{topic1}} and then apply the filter operator, instead of creating a new {{state2-changelog}} topic in this case.

5) more use cases ...

We can come up with general internal optimizations that determine when and how to set the changelog topic for those materialized stores at runtime startup, when generating the topology.
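
To make the decision rule for cases 1) to 3) concrete, here is a minimal, self-contained sketch in plain Java. It is not Kafka Streams code: the class {{ChangelogPlanner}}, the {{Origin}} enum and the method {{changelogTopicFor}} are hypothetical names invented for illustration, and the {{<store>-changelog}} naming simply follows the simplified names used in this description. Case 4) is left out on purpose, since it would also require replaying the transformation during restoration.

```java
/**
 * Hypothetical model of the proposed optimization. None of these names
 * exist in Kafka Streams; they are assumptions made for illustration only.
 */
final class ChangelogPlanner {

    /** How a materialized store relates to a topic that already exists. */
    enum Origin {
        SOURCE_TABLE,        // case 1: KTable read directly from a source topic
        SINK_WITH_SAME_KEY,  // case 2: materialized, then written to a sink topic with the same key
        JOIN_WINDOW_SOURCE,  // case 3: stream read directly from a topic and materialized for a join
        OTHER                // no existing topic mirrors the store contents
    }

    /**
     * Returns the topic to restore the store from: the existing topic when
     * it can be reused, otherwise a dedicated "<store>-changelog" topic.
     */
    static String changelogTopicFor(String storeName, Origin origin, String existingTopic) {
        boolean reusable = origin != Origin.OTHER && existingTopic != null;
        return reusable ? existingTopic : storeName + "-changelog";
    }
}
```

Under these assumptions, {{store1}} in case 1) resolves to {{topic1}} and {{state1}} in case 2) resolves to {{topic2}}, while a store with no reusable topic falls back to its own changelog topic.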