[ https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin updated KAFKA-3514:
----------------------------
    Fix Version/s: 2.1.0

> Stream timestamp computation needs some further thoughts
> --------------------------------------------------------
>
>                 Key: KAFKA-3514
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3514
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>              Labels: architecture, kip
>             Fix For: 2.1.0
>
>
> KIP-353: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization]
> Our current stream task's timestamp is used for the punctuate function as well as 
> for selecting which stream to process next (i.e., best-effort stream 
> synchronization). It is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) Observing a late-arriving record keeps that stream's timestamp low for 
> a period of time, so that stream keeps being selected for processing until the 
> late record is reached. For example, take two partitions within the same task, 
> annotated by their timestamps:
> {code:java}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code:java}
> Stream B: 2, 3, 4, 5
> {code}
> The late-arriving record with timestamp "1" causes stream A to be selected 
> continuously in the thread loop, i.e., the records with timestamps 5, 6, 7, 8, 9 
> are processed until the late record itself is dequeued and processed; only then 
> is stream B selected, starting with timestamp 2.
> 2) A partition with an empty buffer does not advance its timestamp, 
> and hence neither does the task timestamp, since the task timestamp is the smallest 
> among all partitions. This may be a less severe problem than 1) above, though.
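To make the two corner cases concrete, here is a small Java model of the timestamp rules as described above. It is a sketch under stated assumptions, not Kafka's actual PartitionGroup/RecordQueue code. The class TaskTimestampModel and its methods are hypothetical. A partition's timestamp is taken to be the smallest timestamp still buffered in it. An empty partition simply keeps its last timestamp.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the timestamp rules described in this ticket;
// names are illustrative, not Kafka's PartitionGroup/RecordQueue code.
public class TaskTimestampModel {

    // Corner case 1: a partition's timestamp is the smallest timestamp still buffered in it.
    static long minBuffered(Deque<Long> buffer) {
        long min = Long.MAX_VALUE;
        for (long ts : buffer) {
            min = Math.min(min, ts);
        }
        return min;
    }

    // Repeatedly pick the partition with the smallest timestamp and process its head record.
    static List<String> processingOrder(Deque<Long> a, Deque<Long> b) {
        List<String> order = new ArrayList<>();
        while (!a.isEmpty() || !b.isEmpty()) {
            boolean pickA = !a.isEmpty() && (b.isEmpty() || minBuffered(a) <= minBuffered(b));
            Deque<Long> chosen = pickA ? a : b;
            order.add((pickA ? "A" : "B") + chosen.pollFirst());
        }
        return order;
    }

    // Corner case 2: the task timestamp is the minimum over all partition timestamps,
    // so a partition whose timestamp cannot advance (empty buffer) holds it back.
    static long taskTime(long[] partitionTimes) {
        return Arrays.stream(partitionTimes).min().getAsLong();
    }

    public static void main(String[] args) {
        Deque<Long> a = new ArrayDeque<>(List.of(5L, 6L, 7L, 8L, 9L, 1L, 10L));
        Deque<Long> b = new ArrayDeque<>(List.of(2L, 3L, 4L, 5L));
        System.out.println(processingOrder(a, b));
        // -> [A5, A6, A7, A8, A9, A1, B2, B3, B4, B5, A10]

        // Partition A keeps advancing; partition B is empty and stuck at its last timestamp, 5.
        long[] partitionTimes = {10L, 5L};
        for (long ts : new long[] {20L, 30L, 40L}) {
            partitionTimes[0] = ts;
            System.out.println("A=" + ts + " task=" + taskTime(partitionTimes));
        }
        // -> A=20 task=5, A=30 task=5, A=40 task=5
    }
}
```

Under these assumptions, running main reproduces the order described in 1): stream A is drained through its late record before stream B gets a turn. It also shows the task timestamp pinned at 5 while partition A advances, as in 2).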
> *Update*
> There is one more thing to consider (full discussion found here: 
> [http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor])
> {quote}Let's assume the following case.
>  - a stream processor that uses the Processor API
>  - context.schedule(1000) is called in the init()
>  - the processor reads only one topic that has one partition
>  - using a custom timestamp extractor, but the extracted timestamp is just 
>  wall-clock time
>  Imagine the following events:
> 1. For 10 seconds, I send in 5 messages/second.
> 2. I do not send any messages for 3 seconds.
> 3. I start sending 5 messages/second again.
> I see that punctuate() is not called during the 3 seconds when I do not 
>  send any messages. This is OK according to the documentation, because 
>  there are no new messages to trigger the punctuate() call. When the 
>  first few messages arrive after I restart sending (point 3 above), I 
>  see the following sequence of method calls:
> 1. process() on the 1st message
> 2. punctuate() is called 3 times
> 3. process() on the 2nd message
> 4. process() on each following message
> What I would expect instead is that punctuate() is called first and then 
>  process() is called on the messages, because the first message's timestamp 
>  is already 3 seconds later than the time of the last punctuate() call, so the 
>  first message belongs after the 3 punctuate() calls.
> {quote}
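The ordering the reporter expected can be sketched in Java. Before a record is handed to process(), fire every punctuation whose scheduled time is at or before that record's timestamp. This is a hypothetical model, not the Kafka Streams Processor API. The names PunctuateBeforeProcess and handle, and the 10000/13000 ms timestamps, are made up for illustration. It assumes punctuation is driven purely by record timestamps with a 1000 ms interval, matching context.schedule(1000) in the scenario.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the ordering the reporter expected; not the Kafka Streams API.
public class PunctuateBeforeProcess {
    // Assumed punctuation interval, matching context.schedule(1000) in the scenario.
    static final long INTERVAL_MS = 1000L;

    // Returns the sequence of calls for records arriving after a punctuation at lastPunctuation.
    static List<String> handle(long lastPunctuation, long[] recordTimestamps) {
        List<String> calls = new ArrayList<>();
        long nextPunctuation = lastPunctuation + INTERVAL_MS;
        for (long ts : recordTimestamps) {
            // Fire every punctuation due at or before this record's timestamp first.
            while (nextPunctuation <= ts) {
                calls.add("punctuate(" + nextPunctuation + ")");
                nextPunctuation += INTERVAL_MS;
            }
            calls.add("process(" + ts + ")");
        }
        return calls;
    }

    public static void main(String[] args) {
        // Last punctuation at 10000 ms; after a ~3 s gap, records resume at 13000 ms.
        System.out.println(handle(10_000L, new long[] {13_000L, 13_200L, 13_400L}));
        // -> [punctuate(11000), punctuate(12000), punctuate(13000), process(13000), process(13200), process(13400)]
    }
}
```

In this model the three catch-up punctuations come before the first record that resumes the stream. That is the order the reporter expected, in contrast to the observed process() followed by three punctuate() calls. Whether a punctuation scheduled exactly at a record's timestamp should fire before or after that record is a design choice; the sketch fires it first.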



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
