[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-3514:
-----------------------------------
    Description:
Our current stream task's timestamp is used for the punctuate function as well as for selecting which stream to process next (i.e. best-effort stream synchronization). It is defined as the smallest timestamp over all partitions in the task's partition group. This results in two unintuitive corner cases:

1) Observing a late-arriving record keeps that stream's timestamp low for a period of time, so that stream keeps being processed until the late record is reached. For example, take two partitions within the same task, annotated by their timestamps:
{code}
Stream A: 5, 6, 7, 8, 9, 1, 10
{code}
{code}
Stream B: 2, 3, 4, 5
{code}
The late-arriving record with timestamp "1" will cause stream A to be selected continuously in the thread loop, i.e. the messages with timestamps 5, 6, 7, 8, 9 are processed until the late record itself is dequeued and processed; only then will stream B be selected, starting with timestamp 2.

2) An empty buffered partition will cause its timestamp not to advance, and hence the task timestamp will not advance either, since it is the smallest among all partitions. This may not be as severe a problem as 1) above, though.

*Update*

There is one more thing to consider (full discussion found here: http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor)

{quote}
Let's assume the following case.
- a stream processor that uses the Processor API
- context.schedule(1000) is called in the init()
- the processor reads only one topic that has one partition
- using a custom timestamp extractor, but that timestamp is just wall-clock time

Imagine the following events:
1., for 10 seconds I send in 5 messages / second
2., I do not send any messages for 3 seconds
3., I start sending 5 messages / second again

I see that punctuate() is not called during the 3 seconds when I do not send any messages. This is ok according to the documentation, because there are no new messages to trigger the punctuate() call. When the first few messages arrive after sending restarts (point 3. above), I see the following sequence of method calls:
1., process() on the 1st message
2., punctuate() is called 3 times
3., process() on the 2nd message
4., process() on each following message

What I would expect instead is that punctuate() is called first and then process() is called on the messages, because the first message's timestamp is already 3 seconds later than the last punctuate() call, so the first message belongs after the 3 punctuate() calls.
{quote}

  was:
Our current stream task's timestamp is used for the punctuate function as well as for selecting which stream to process next (i.e. best-effort stream synchronization). It is defined as the smallest timestamp over all partitions in the task's partition group. This results in two unintuitive corner cases:

1) Observing a late-arriving record keeps that stream's timestamp low for a period of time, so that stream keeps being processed until the late record is reached. For example, take two partitions within the same task, annotated by their timestamps:
{code}
Stream A: 5, 6, 7, 8, 9, 1, 10
{code}
{code}
Stream B: 2, 3, 4, 5
{code}
The late-arriving record with timestamp "1" will cause stream A to be selected continuously in the thread loop, i.e. the messages with timestamps 5, 6, 7, 8, 9 are processed until the late record itself is dequeued and processed; only then will stream B be selected, starting with timestamp 2.

2) An empty buffered partition will cause its timestamp not to advance, and hence the task timestamp will not advance either, since it is the smallest among all partitions. This may not be as severe a problem as 1) above, though.

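Corner case 1) can be reproduced with a small simulation. The sketch below is a simplified model, not the actual Kafka Streams implementation: it assumes that a partition's timestamp is the smallest timestamp among its buffered records, that the task always dequeues the head record of the non-empty partition with the smallest such timestamp, and that empty partitions are simply skipped. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of per-task partition selection.
final class TimestampSelectionSketch {

    // Assumption: a partition's timestamp is the minimum over its buffered records.
    static long partitionTimestamp(Deque<Long> buffered) {
        return Collections.min(buffered);
    }

    // Returns the order in which records are dequeued, as "partition:timestamp".
    static List<String> processingOrder(Map<String, List<Long>> partitions) {
        Map<String, Deque<Long>> queues = new LinkedHashMap<>();
        partitions.forEach((name, ts) -> queues.put(name, new ArrayDeque<>(ts)));

        List<String> order = new ArrayList<>();
        while (true) {
            String chosen = null;
            long smallest = Long.MAX_VALUE;
            for (Map.Entry<String, Deque<Long>> e : queues.entrySet()) {
                if (e.getValue().isEmpty()) {
                    continue; // assumption: empty partitions are ignored
                }
                long ts = partitionTimestamp(e.getValue());
                if (chosen == null || ts < smallest) {
                    smallest = ts;
                    chosen = e.getKey();
                }
            }
            if (chosen == null) {
                break; // every partition is drained
            }
            // Records leave a partition in FIFO order, regardless of timestamp.
            order.add(chosen + ":" + queues.get(chosen).poll());
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> partitions = new LinkedHashMap<>();
        partitions.put("A", Arrays.asList(5L, 6L, 7L, 8L, 9L, 1L, 10L));
        partitions.put("B", Arrays.asList(2L, 3L, 4L, 5L));
        System.out.println(processingOrder(partitions));
    }
}
```

Under these assumptions, the two example streams yield A's records 5, 6, 7, 8, 9 and the late 1 before any record of B, then B's 2, 3, 4, 5, and finally A's 10, which matches the behavior described in corner case 1).
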
> Stream timestamp computation needs some further thoughts
> --------------------------------------------------------
>
>                 Key: KAFKA-3514
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3514
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Eno Thereska
>              Labels: architecture
>             Fix For: 0.11.0.0
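The punctuate()/process() ordering from the quoted report can be illustrated with a toy model. This is a sketch under assumptions, not the Streams runtime: it assumes that stream time only advances when a record arrives and that every overdue punctuation fires once that record's timestamp has been seen. The concrete timestamps (next punctuation due at 11000 ms, records resuming at 13000 ms) and all names are illustrative only. The `punctuateFirst` flag switches between the reported order and the order the reporter expected.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how punctuate() and process() calls interleave.
final class PunctuateOrderSketch {

    static List<String> run(long intervalMs, long nextPunctuateMs,
                            long[] recordTimestamps, boolean punctuateFirst) {
        List<String> calls = new ArrayList<>();
        long next = nextPunctuateMs;
        for (long ts : recordTimestamps) {
            if (!punctuateFirst) {
                calls.add("process(" + ts + ")"); // reported: record handled first
            }
            // Catch up on every punctuation that is due at or before this record.
            while (next <= ts) {
                calls.add("punctuate(" + next + ")");
                next += intervalMs;
            }
            if (punctuateFirst) {
                calls.add("process(" + ts + ")"); // expected: record handled last
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        long[] resumed = {13000L, 13200L, 13400L};
        System.out.println("reported: " + run(1000L, 11000L, resumed, false));
        System.out.println("expected: " + run(1000L, 11000L, resumed, true));
    }
}
```

In the reported order the first record is processed before the three catch-up punctuations; with `punctuateFirst` set, the same three punctuations fire before that record, which is the sequence the reporter argues for.
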