[ https://issues.apache.org/jira/browse/KAFKA-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16645852#comment-16645852 ]

ASF GitHub Bot commented on KAFKA-6036:
---------------------------------------

guozhangwang opened a new pull request #5779: KAFKA-6036: Local Materialization for Source KTable
URL: https://github.com/apache/kafka/pull/5779
 
 
   Refactor the materialization of source KTables as follows:
   
   1. If `Materialized.as(queryableName)` is specified, materialize;
   2. If a downstream operator needs to fetch from this KTable via ValueGetters, materialize;
   3. If a downstream operator needs to send old values, materialize.
   
   Otherwise, do not materialize the KTable. For example, `builder.table("topic").filter().toStream().to("topic")` would not create any state stores.
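
   The three rules above reduce to a single predicate. The following is a hypothetical, stdlib-only sketch, not code from the PR: the class name `SourceTableMaterialization` and the boolean flags standing in for "a ValueGetter was requested" and "old values were requested" are assumptions made for illustration.

```java
/**
 * Hypothetical sketch (not the actual Kafka Streams implementation) of the
 * rule deciding whether a source KTable gets a physical state store.
 */
final class SourceTableMaterialization {

    private SourceTableMaterialization() {}

    /**
     * @param queryableName          name given via Materialized.as(...), or null if none
     * @param valueGetterRequested   true if a downstream operator fetches via a ValueGetter
     * @param sendOldValuesRequested true if a downstream operator needs old values
     * @return true if a state store should be created for the source KTable
     */
    static boolean shouldMaterialize(String queryableName,
                                     boolean valueGetterRequested,
                                     boolean sendOldValuesRequested) {
        // Rule 1: the user explicitly asked for a queryable store.
        if (queryableName != null) {
            return true;
        }
        // Rules 2 and 3: a downstream operator must read from this table,
        // or must receive its old values.
        return valueGetterRequested || sendOldValuesRequested;
    }
}
```

   Per the PR description, the `builder.table("topic").filter().toStream().to("topic")` pipeline corresponds to `shouldMaterialize(null, false, false)`, which returns `false`, so no store is created.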
   
   This PR also includes a few minor changes:
   
   1. `KTableImpl`'s `queryableStoreName` and `isQueryable` are merged into `queryableStoreName` alone: if it is null, the KTable is not queryable; if it is not null, the KTable is queryable (i.e., internally generated names are no longer used for it).
   
   To achieve this, `MaterializedInternal.storeName()` and `MaterializedInternal.queryableName()` are now split. The former can be internally generated and is not exposed to users. `queryableName` can be set to the internal store name if we decide to materialize the KTable during the DSL parsing / physical topology generation phase, and a KTable is materialized only if its `queryableName` is specified.
   
   2. Removed some overlapping unit tests between `KTableImplTest` and the `KTableXXTest` classes.
   
   3. Fixed a few typing bugs found along the way.
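
   The `storeName()` / `queryableName()` split in item 1 can be sketched as follows. This is a hypothetical, stdlib-only illustration, not the real `MaterializedInternal` class: the name `MaterializedSketch`, the `Supplier` used to generate internal names, and the method `materializeUnderStoreName()` are assumptions.

```java
import java.util.function.Supplier;

/**
 * Hypothetical sketch of separating the (possibly internal) store name from
 * the user-visible queryable name. Not the actual MaterializedInternal class.
 */
final class MaterializedSketch {

    private final String userQueryableName;              // from Materialized.as(...), may be null
    private final Supplier<String> internalNameGenerator; // produces an internal name on demand
    private String internalStoreName;                     // generated lazily, never shown to users
    private String queryableName;                         // null means "not materialized / not queryable"

    MaterializedSketch(String userQueryableName, Supplier<String> internalNameGenerator) {
        this.userQueryableName = userQueryableName;
        this.internalNameGenerator = internalNameGenerator;
        this.queryableName = userQueryableName;
    }

    /** Name used in the topology: the user's name if given, otherwise an internal one. */
    String storeName() {
        if (userQueryableName != null) {
            return userQueryableName;
        }
        if (internalStoreName == null) {
            internalStoreName = internalNameGenerator.get();
        }
        return internalStoreName;
    }

    /** Non-null only once the table is known to be materialized. */
    String queryableName() {
        return queryableName;
    }

    boolean isMaterialized() {
        return queryableName != null;
    }

    /** Called while building the topology if materialization is decided internally. */
    void materializeUnderStoreName() {
        if (queryableName == null) {
            queryableName = storeName();
        }
    }
}
```

   Keeping `queryableName` null until materialization is actually decided lets a single null check replace the separate `isQueryable` flag.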
   
   -----------------------
   
   This PR also serves as a proof of concept (POC) toward logical materialization.
   
   Today we already logically materialize the KTable for filter / mapValues / transformValues if no queryableName is specified via Materialized, but whenever users do specify a queryableName we still always materialize physically. My original goal was to also consider logical materialization for queryable stores, but when implementing it via a wrapped store that applies the transformations on the fly, I realized it is harder than I thought: we need to support not only `fetch` or `get`, but also range queries, `approximateNumEntries`, `isOpen`, etc., which are hard to support efficiently. So in the end I'd suggest we stick with the rule of always materializing if a queryableName is specified, and only consider logical materialization otherwise.
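
   A hypothetical, stdlib-only sketch of such a wrapped store shows the asymmetry described above. The class `LogicalViewStore` is an assumption, not a Kafka Streams API, and a `NavigableMap` stands in for the parent KTable's state store. A point `get` costs one parent lookup plus the transformation, while range queries and `approximateNumEntries` must visit and transform every parent entry in scope, because the filter may drop any of them.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Predicate;

/**
 * Hypothetical read-only "logical" store: it keeps no data of its own and
 * applies filter + mapValues to the parent store on every read.
 */
final class LogicalViewStore<K, V, VR> {

    private final NavigableMap<K, V> parent; // stands in for the parent KTable's store
    private final Predicate<V> filter;       // e.g. the filter() predicate
    private final Function<V, VR> mapper;    // e.g. the mapValues() mapper

    LogicalViewStore(NavigableMap<K, V> parent, Predicate<V> filter, Function<V, VR> mapper) {
        this.parent = parent;
        this.filter = filter;
        this.mapper = mapper;
    }

    /** Point lookup: one parent read plus the transformation. */
    VR get(K key) {
        V value = parent.get(key);
        if (value == null || !filter.test(value)) {
            return null;
        }
        return mapper.apply(value);
    }

    /** Inclusive range query: every parent entry in [from, to] is read and transformed. */
    NavigableMap<K, VR> range(K from, K to) {
        NavigableMap<K, VR> result = new TreeMap<>(parent.comparator());
        for (Map.Entry<K, V> entry : parent.subMap(from, true, to, true).entrySet()) {
            if (filter.test(entry.getValue())) {
                result.put(entry.getKey(), mapper.apply(entry.getValue()));
            }
        }
        return result;
    }

    /** The filter may drop entries, so the parent's size cannot be reused: full scan. */
    long approximateNumEntries() {
        long count = 0;
        for (V value : parent.values()) {
            if (filter.test(value)) {
                count++;
            }
        }
        return count;
    }
}
```

   For simplicity, `range` builds its result eagerly; a real store would return a lazy iterator, but it would still have to read every parent entry in the range.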
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Enable logical materialization to physical materialization
> ----------------------------------------------------------
>
>                 Key: KAFKA-6036
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6036
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Guozhang Wang
>            Priority: Major
>
> Today, whenever users specify a queryable store name for a KTable, we always add a physical state store to the translated processor topology.
> In some scenarios, we should consider not physically materializing the KTable but only "logically" materializing it. When simple transformation operations, or even join operations, generate new KTables that need to be materialized with a state store, we can use the changelog topic of the previous KTable and apply the transformation logic upon restoration, instead of creating a new changelog topic. For example:
> {code}
> table1 = builder.table("topic1");
> table2 = table1.filter(..).join(table3); // table2 needs to be materialized for joining
> {code}
> We can actually set the {{getter}} function of table2's materialized store, say {{state2}}, to read from {{topic1}} and then apply the filter operator, instead of creating a new {{state2-changelog}} topic in this case.
> We can come up with general internal optimizations to determine when to logically materialize, and consider whether we should allow DSL users to "hint" whether to materialize or not (which may then need a KIP).
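
The restoration idea in the issue can be sketched as a replay loop. In this hypothetical, stdlib-only sketch, a `List` of `Change` records stands in for the contents of {{topic1}} and a `HashMap` stands in for {{state2}}; neither `FilteredRestore` nor `Change` is a Kafka Streams API. Each record is run through table2's filter during restoration, so no separate {{state2-changelog}} topic is read. Only the filter step of the example is covered; the join is out of scope.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/**
 * Hypothetical sketch: rebuild state2 by replaying topic1 and applying
 * table2's filter, instead of replaying a dedicated state2-changelog.
 */
final class FilteredRestore {

    /** A changelog record; a null value is a tombstone (delete). */
    static final class Change<K, V> {
        final K key;
        final V value;

        Change(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    private FilteredRestore() {}

    static <K, V> Map<K, V> restore(List<Change<K, V>> topic1, Predicate<V> filter) {
        Map<K, V> state2 = new HashMap<>();
        for (Change<K, V> change : topic1) {
            if (change.value != null && filter.test(change.value)) {
                state2.put(change.key, change.value);
            } else {
                // Tombstone, or the new value no longer passes the filter:
                // the key must disappear from the filtered table.
                state2.remove(change.key);
            }
        }
        return state2;
    }
}
```

An update that fails the predicate is treated as a delete, mirroring how a KTable filter forwards a tombstone when a key's new value no longer matches.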



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
