[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15592931#comment-15592931
 ] 

Guozhang Wang commented on KAFKA-4113:
--------------------------------------

[~gfodor] The main reason we do not log the source KTable is that the source 
topic itself is exactly the changelog of this source KTable's materialized 
state store, and if we log it with another topic, these two topics will have 
exactly the same messages. Instead, we register the source topic as the 
changelog of this state store.

During the state store restoration process, the store should be brought 
back to current state by replaying the changelog topic (i.e. the source topic) 
from offset 0 up to the LEO (log end offset). From your description, however, 
that does not seem to be what happens for you. I checked the source code in 
trunk and did not find an obvious bug in 
the {{ProcessorStateManager}} class: when the KTable is created from a 
source topic via {{builder.table(...)}} then the source topic should be 
registered as the changelog and hence used to restore the state upon 
(re-)initialization. If you are running the current trunk of Kafka, then I think 
there might be some hidden bugs in the code that have not been exposed yet.
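
To make the expected restoration behavior concrete, here is a minimal, self-contained sketch. It is not the actual {{ProcessorStateManager}} code; the class {{ChangelogRestoreSketch}}, the {{Record}} type and the {{restore}} method are hypothetical names used only for illustration, and the in-memory list stands in for the source topic. It shows a store being rebuilt by replaying records from offset 0 up to (but not including) the LEO, with a null value treated as a delete.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not Kafka Streams' real restoration code.
final class ChangelogRestoreSketch {

    /** One changelog record; a null value is a tombstone (delete). */
    static final class Record {
        final String key;
        final String value;

        Record(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Rebuilds a key-value store by replaying the log from offset 0 up to,
     * but not including, logEndOffset (the offset of the next record to be written).
     */
    static Map<String, String> restore(List<Record> log, long logEndOffset) {
        Map<String, String> store = new HashMap<>();
        for (int offset = 0; offset < logEndOffset && offset < log.size(); offset++) {
            Record r = log.get(offset);
            if (r.value == null) {
                store.remove(r.key);       // tombstone: drop the key
            } else {
                store.put(r.key, r.value); // later records overwrite earlier ones
            }
        }
        return store;
    }

    public static void main(String[] args) {
        // The list stands in for the source topic, which doubles as the changelog.
        List<Record> sourceTopic = new ArrayList<>();
        sourceTopic.add(new Record("a", "1"));
        sourceTopic.add(new Record("b", "2"));
        sourceTopic.add(new Record("a", "3"));
        sourceTopic.add(new Record("b", null));

        Map<String, String> store = restore(sourceTopic, sourceTopic.size());
        System.out.println(store); // prints {a=3}
    }
}
```

If the restored store does not match the result of such a full replay of the source topic, that would point to the source topic not being used as the changelog during restoration.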

> Allow KTable bootstrap
> ----------------------
>
>                 Key: KAFKA-4113
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4113
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>
> On the mailing list, there are multiple requests about the possibility to 
> "fully populate" a KTable before actual stream processing starts.
> Even though it is somewhat difficult to define when the initial populating 
> phase should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the 
> data. Only after this topic has been read completely and the KTable is ready 
> should the application start processing. This would mean that, on startup, 
> the current partition sizes must be fetched and stored, and after the KTable 
> has been populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, i.e., if no update is made to a KTable for a certain
> time, we consider it populated
> 3) a timestamp cutoff point, i.e., all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (3)) was:
> {noformat}
> // Populate the table without reading any other topics until a record
> // with timestamp 1000 is seen.
> KTable table = builder.table("topic", 1000);
> {noformat}
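
The main idea in the description above (record the partition sizes at startup, populate the KTable up to those offsets, then start processing) can be sketched roughly as follows. This is only an illustration under stated assumptions: {{TableBootstrapSketch}}, {{snapshotEndOffsets}} and {{bootstrap}} are hypothetical names, partitions are modeled as in-memory lists of key/value pairs, and nothing here is a proposed or existing Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; partitions are plain in-memory lists.
final class TableBootstrapSketch {

    /** Records the current size (end offset) of every partition at startup. */
    static Map<Integer, Integer> snapshotEndOffsets(Map<Integer, List<String[]>> partitions) {
        Map<Integer, Integer> endOffsets = new HashMap<>();
        for (Map.Entry<Integer, List<String[]>> e : partitions.entrySet()) {
            endOffsets.put(e.getKey(), e.getValue().size());
        }
        return endOffsets;
    }

    /**
     * Populates the table from each partition up to the offsets captured at
     * startup. Records appended after the snapshot are not part of the
     * bootstrap phase and are left for regular stream processing.
     */
    static Map<String, String> bootstrap(Map<Integer, List<String[]>> partitions,
                                         Map<Integer, Integer> endOffsets) {
        Map<String, String> table = new HashMap<>();
        for (Map.Entry<Integer, Integer> e : endOffsets.entrySet()) {
            List<String[]> records = partitions.get(e.getKey());
            for (int offset = 0; offset < e.getValue(); offset++) {
                String[] keyValue = records.get(offset);
                table.put(keyValue[0], keyValue[1]);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        Map<Integer, List<String[]>> partitions = new HashMap<>();
        partitions.put(0, new ArrayList<>());
        partitions.get(0).add(new String[] {"a", "1"});
        partitions.get(0).add(new String[] {"a", "2"});

        Map<Integer, Integer> endOffsets = snapshotEndOffsets(partitions);

        // A record arriving after startup is outside the bootstrap range.
        partitions.get(0).add(new String[] {"a", "3"});

        Map<String, String> table = bootstrap(partitions, endOffsets);
        System.out.println(table); // prints {a=2}
    }
}
```

Once {{bootstrap}} returns, the table reflects the topic as it was at startup, and regular processing could begin from the captured offsets.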



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
