[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15468817#comment-15468817 ]
Guozhang Wang commented on KAFKA-4113:
--------------------------------------

More thoughts regarding this issue after discussing with [~jkreps] offline:

1. Within a single stream, records may not be ordered by their timestamps. Since Kafka Streams always tries to process records in the source topic's log-append order, this results in so-called {{late arrived records}}.

2. When a single Kafka Streams task has multiple input streams, it tries to "synchronize" these streams based on their timestamps in a best-effort manner. Therefore, we may process a record with a higher timestamp from one stream before a record with a lower timestamp from another stream.

3. For table-stream joins, ideally we want to process records strictly in timestamp order across all input streams. In that case, I believe users do NOT really need to {{bootstrap a KTable}}: as long as the timestamps of the stream and of the table changelog are defined correctly, we are guaranteed to have the correct table snapshot when joining with the stream. For example, say your table is updated only once a week, and each update's timestamp is defined as the end of the week (EOW). Then, when you (re)start the application, any stream record whose timestamp is before the EOW time is guaranteed to be joined against the "old" table snapshot, and any stream record whose timestamp is after the EOW time is joined against the "updated" table snapshot, which is the right behavior.

4. However, because of 1) and 2) above, we do not strictly follow the timestamp ordering, so the join result is not guaranteed to be "correct" or deterministic.
In addition, note that the above reasoning assumes that the timestamps on both streams are defined in a consistent manner, for example, when the records of both streams are generated by the same application, which uses the same clock to set the timestamps. Otherwise, for example if the joined streams are written by different applications or services and there is a drift between their clocks, then even {{strictly following the timestamp-based ordering}} may not produce the correct result. This is the scenario [~mjsax] mentioned: a KTable record may not yet be available when a KStream record arrives and is to be enriched with the KTable, if the stream record's timestamp is indeed defined to be smaller than the corresponding KTable record's timestamp.

5. Therefore, users propose to {{bootstrap a KTable}} mainly as a way to give the table's changelog stream a bit of a time "advantage" over the timestamp-based ordering, so that its records are more likely to be processed before the other stream's records with similar timestamps. On the other hand, because of 4) above, I think it is very hard, or even impossible, to get absolutely "correct" answers; we can only aim for answers that are as deterministic as possible (that is also the motivation for the window retention period in Kafka Streams and, I think, for watermarks / hints indicating that no more late records will arrive, along with triggering mechanisms, in other frameworks).

Following these arguments, here is a list of proposals I'm thinking about to tackle this requirement:

1. Give users the flexibility to define ordering priorities across multiple streams when determining which record to process next (i.e., how to "synchronize" them). There are different ways to expose this API; for example, Samza uses a user-customizable {{MessageChooser}} interface.

2. More restrictive than 1) above: we only allow users to specify an amount of time by which one stream should be "in advance" of the other streams, such that its records with timestamp {{t + delta}}, where {{delta}} is configurable, are considered simultaneous with the other streams' records with timestamp {{t}}. I think most of the proposed options in the description of this ticket fall into this category.

3. Different from proposals 1) / 2) above: we change the implementation of KTable-KStream joins to also materialize the KStream based on a sliding window. When a record from the KStream arrives, it is joined with the current snapshot of the KTable in its backing state store; when a record from the KTable arrives, it is joined with any matching records in the KStream's materialized window store whose timestamp is smaller than that of the KTable update record.

4. This is complementary to 2) / 3) above: if we do not make the stream synchronization mechanism customizable as proposed in 1), we can at least consider making it deterministic, so that all join types generate deterministic results as well.

Thoughts [~mjsax] [~enothereska] [~damianguy]?

> Allow KTable bootstrap
> ----------------------
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Guozhang Wang
>
> On the mailing list, there are multiple requests about the possibility to
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the
> data. Only after this topic has been read completely and the KTable is ready
> should the application start processing.
> This would indicate that on startup, the current partition sizes must be
> fetched and stored, and after the KTable has been populated up to those
> offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, i.e., if no update is made to a KTable for a certain
> time, we consider it populated
> 3) a timestamp cut-off point, i.e., all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without reading any other topics until we see one record with timestamp 1000.
> {noformat}
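To make proposals 2) and 4) of the comment more concrete, here is a minimal, self-contained Java sketch of best-effort timestamp synchronization across input streams in which one input (e.g. a table changelog) gets a configurable head start. It is not how Kafka Streams actually picks the next record; the class `DeltaSynchronizer`, the helper `input`, and the `favored` / `deltaMs` parameters are hypothetical names chosen for illustration. It only compares the head record of each input, so records within one input stay in log-append order (point 1), and it breaks ties by input index so the resulting order is deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class DeltaSynchronizer {

    // Hypothetical minimal record: the name of its input stream and its timestamp.
    static final class Rec {
        final String stream;
        final long timestamp;
        Rec(String stream, long timestamp) { this.stream = stream; this.timestamp = timestamp; }
        @Override public String toString() { return stream + "@" + timestamp; }
    }

    // Builds one input as a FIFO queue in "log-append" order.
    static Deque<Rec> input(String name, long... timestamps) {
        Deque<Rec> q = new ArrayDeque<>();
        for (long ts : timestamps) q.addLast(new Rec(name, ts));
        return q;
    }

    // Drains all inputs, each time choosing among the head records only.
    // The input at index `favored` gets an advantage of `deltaMs`: its record
    // with timestamp t + deltaMs competes as if it had timestamp t.
    // Ties go to the lowest input index, which keeps the order deterministic.
    static List<String> process(List<Deque<Rec>> inputs, int favored, long deltaMs) {
        List<String> order = new ArrayList<>();
        while (true) {
            int best = -1;
            long bestTs = Long.MAX_VALUE;
            for (int i = 0; i < inputs.size(); i++) {
                Rec head = inputs.get(i).peekFirst();
                if (head == null) continue; // this input is drained
                long effective = head.timestamp - (i == favored ? deltaMs : 0L);
                if (best < 0 || effective < bestTs) { // strict '<': lowest index wins ties
                    best = i;
                    bestTs = effective;
                }
            }
            if (best < 0) return order; // every input is drained
            order.add(inputs.get(best).pollFirst().toString());
        }
    }

    public static void main(String[] args) {
        // Input 0: table changelog with one update at t=100; input 1: the stream.
        System.out.println(process(
            Arrays.asList(input("table", 100), input("stream", 95, 102, 108)), 0, 0));
        // prints [stream@95, table@100, stream@102, stream@108]
        System.out.println(process(
            Arrays.asList(input("table", 100), input("stream", 95, 102, 108)), 0, 5));
        // prints [table@100, stream@95, stream@102, stream@108]
    }
}
```

With `deltaMs = 0`, the stream record at t=95 is processed before the table update at t=100; with `deltaMs = 5`, the table update competes as if it had timestamp 95, wins the tie, and is applied first. As point 4 of the comment notes, such a head start only makes the ordering more favorable for the table; it cannot guarantee a "correct" join when producers' clocks drift.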
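The main idea in the ticket description (fetch the current partition sizes on startup, populate the KTable up to those offsets, and only then start stream processing) can be illustrated with an in-memory Java sketch. Partitions are modeled as plain lists, so a partition's "size" is its end offset; `OffsetBootstrap`, `Entry`, `captureEndOffsets`, `bootstrap`, and `join` are hypothetical names, not Kafka Streams API. Against a real cluster, the end offsets could presumably be obtained with the consumer's `endOffsets(Collection<TopicPartition>)` method, but this sketch does not depend on any Kafka library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetBootstrap {

    // Hypothetical changelog entry: a key and its new value.
    static final class Entry {
        final String key;
        final String value;
        Entry(String key, String value) { this.key = key; this.value = value; }
    }

    // Step 1: capture each partition's end offset ("partition size") at startup.
    static long[] captureEndOffsets(List<List<Entry>> partitions) {
        long[] end = new long[partitions.size()];
        for (int p = 0; p < partitions.size(); p++) end[p] = partitions.get(p).size();
        return end;
    }

    // Step 2: populate the table from each partition up to its captured offset.
    // Entries appended after startup are NOT waited for; they would be handled
    // later by normal, interleaved processing.
    static Map<String, String> bootstrap(List<List<Entry>> partitions, long[] endOffsets) {
        Map<String, String> table = new HashMap<>();
        for (int p = 0; p < partitions.size(); p++) {
            List<Entry> log = partitions.get(p);
            for (int offset = 0; offset < endOffsets[p]; offset++) {
                Entry e = log.get(offset);
                table.put(e.key, e.value); // changelog semantics: latest value per key wins
            }
        }
        return table;
    }

    // Step 3: only now process the stream, enriching each key from the table.
    static List<String> join(List<String> streamKeys, Map<String, String> table) {
        List<String> out = new ArrayList<>();
        for (String k : streamKeys) out.add(k + "->" + table.getOrDefault(k, "null"));
        return out;
    }

    public static void main(String[] args) {
        List<List<Entry>> changelog = new ArrayList<>();
        changelog.add(new ArrayList<>(Arrays.asList(new Entry("a", "v1"), new Entry("a", "v2"))));
        changelog.add(new ArrayList<>(Arrays.asList(new Entry("b", "w1"))));

        long[] end = captureEndOffsets(changelog);
        changelog.get(1).add(new Entry("c", "x1")); // appended after startup

        Map<String, String> table = bootstrap(changelog, end);
        System.out.println(join(Arrays.asList("a", "b", "c"), table));
        // prints [a->v2, b->w1, c->null]
    }
}
```

The entry for `c` is appended after the end offsets were captured, so the bootstrap does not wait for it and the stream record for `c` finds no match. This mirrors the comment's point that bootstrapping gives the table a head start rather than a guarantee of correct results.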