[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15926036#comment-15926036
 ] 

Arun Mathew commented on KAFKA-3514:
------------------------------------

Hi [~mjsax] [~mihbor]. We were building an audit trail for Kafka based on 
Kafka Streams and encountered a similar issue.

Our workaround was to use a hybrid of event time and system time. During 
regular operation we use event time. But when we create a punctuation 
schedule (the object representing the next punctuation time), we also record 
the system time at which the schedule was created. The punctuate code was 
modified to punctuate anyway if the interval specified in the punctuation 
schedule has elapsed in terms of system time (current time - time of 
punctuation schedule creation). In that case a new punctuation schedule 
corresponding to the next expected punctuation time (current punctuation time 
+ punctuation interval) is also created.
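
The hybrid check described above could be sketched roughly as follows. This is 
a standalone illustration, not the actual patch: the class 
HybridPunctuationSketch, the PunctuationSchedule fields, and the isDue and 
maybePunctuate helpers are hypothetical names and are not part of the Kafka 
Streams API.

```java
import java.util.ArrayList;
import java.util.List;

class HybridPunctuationSketch {

    /** Hypothetical schedule: next punctuation time in event time, plus the
     *  system (wall-clock) time at which this schedule object was created. */
    static final class PunctuationSchedule {
        final long nextEventTimeMs;
        final long intervalMs;
        final long createdAtSystemMs;

        PunctuationSchedule(long nextEventTimeMs, long intervalMs, long createdAtSystemMs) {
            this.nextEventTimeMs = nextEventTimeMs;
            this.intervalMs = intervalMs;
            this.createdAtSystemMs = createdAtSystemMs;
        }

        /** Next schedule: current punctuation time + punctuation interval. */
        PunctuationSchedule next(long nowSystemMs) {
            return new PunctuationSchedule(nextEventTimeMs + intervalMs, intervalMs, nowSystemMs);
        }
    }

    /** Due if event time reached the schedule, or if the interval has
     *  elapsed in system time since the schedule was created. */
    static boolean isDue(PunctuationSchedule s, long streamTimeMs, long nowSystemMs) {
        boolean dueByEventTime = streamTimeMs >= s.nextEventTimeMs;
        boolean dueBySystemTime = nowSystemMs - s.createdAtSystemMs >= s.intervalMs;
        return dueByEventTime || dueBySystemTime;
    }

    /** Records the punctuation and returns the follow-up schedule when due;
     *  otherwise returns the schedule unchanged. */
    static PunctuationSchedule maybePunctuate(PunctuationSchedule s, long streamTimeMs,
                                              long nowSystemMs, List<Long> fired) {
        if (!isDue(s, streamTimeMs, nowSystemMs)) {
            return s;
        }
        fired.add(s.nextEventTimeMs);
        return s.next(nowSystemMs);
    }

    public static void main(String[] args) {
        List<Long> fired = new ArrayList<>();
        PunctuationSchedule s = new PunctuationSchedule(1000L, 1000L, 0L);
        // Event time is stalled at 500, but 1000 ms of system time have elapsed.
        s = maybePunctuate(s, 500L, 1000L, fired);
        System.out.println(fired + " next=" + s.nextEventTimeMs);
    }
}
```

The system-time branch only matters when event time stalls, for example when 
no records arrive. It still needs something to invoke the check periodically, 
which is the polling issue described next.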

In an earlier version of Kafka the above logic sufficed, as mayBePunctuate 
was called as part of the polling for events (in the absence of events). But 
the current version doesn't seem to call it, so we had to patch that portion a 
bit too.

Please let me know your thoughts.

> Stream timestamp computation needs some further thoughts
> --------------------------------------------------------
>
>                 Key: KAFKA-3514
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3514
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Eno Thereska
>              Labels: architecture
>             Fix For: 0.11.0.0
>
>
> Our current stream task's timestamp is used for the punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late-arriving record would keep that stream's timestamp low for 
> a period of time, and hence that stream keeps being processed until the late 
> record is reached. For example, take two partitions within the same task, 
> annotated by their timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late-arriving record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed; then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp not to advance, 
> and hence the task timestamp as well, since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.
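
Corner case 1) from the description above can be reproduced with a small 
simulation. This is a sketch under the assumption that a partition's timestamp 
is the smallest timestamp among its currently buffered records, which is how 
the example reads. The class and method names are hypothetical, and empty 
partitions are simply skipped, so corner case 2) is not modelled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MinTimestampSelectionSketch {

    /** Assumed partition timestamp: smallest timestamp among buffered records. */
    static long partitionTimestamp(Deque<Long> buffered) {
        long min = Long.MAX_VALUE;
        for (long ts : buffered) {
            min = Math.min(min, ts);
        }
        return min;
    }

    /** Repeatedly picks the non-empty partition with the smallest partition
     *  timestamp and dequeues the record at its head. */
    static List<String> processingOrder(Map<String, Deque<Long>> partitions) {
        List<String> order = new ArrayList<>();
        while (true) {
            String chosen = null;
            long best = Long.MAX_VALUE;
            for (Map.Entry<String, Deque<Long>> e : partitions.entrySet()) {
                if (e.getValue().isEmpty()) {
                    continue; // simplification: empty partitions are ignored
                }
                long ts = partitionTimestamp(e.getValue());
                if (chosen == null || ts < best) {
                    chosen = e.getKey();
                    best = ts;
                }
            }
            if (chosen == null) {
                return order;
            }
            order.add(chosen + ":" + partitions.get(chosen).pollFirst());
        }
    }

    public static void main(String[] args) {
        Map<String, Deque<Long>> partitions = new LinkedHashMap<>();
        partitions.put("A", new ArrayDeque<>(Arrays.asList(5L, 6L, 7L, 8L, 9L, 1L, 10L)));
        partitions.put("B", new ArrayDeque<>(Arrays.asList(2L, 3L, 4L, 5L)));
        // prints [A:5, A:6, A:7, A:8, A:9, A:1, B:2, B:3, B:4, B:5, A:10]
        System.out.println(processingOrder(partitions));
    }
}
```

Stream A is drained up to and including the late record "1" before any record 
from stream B is processed, which matches the behaviour described in 1).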



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
