Lorenzo Cagnatel created KAFKA-13678:
----------------------------------------

             Summary: 2nd punctuation using STREAM_TIME does not respect 
scheduled interval
                 Key: KAFKA-13678
                 URL: https://issues.apache.org/jira/browse/KAFKA-13678
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.0.0
            Reporter: Lorenzo Cagnatel


When a punctuator is scheduled using stream time, the first punctuation occurs 
immediately, as documented, but the second one is not triggered at *t_schedule + 
interval*: it can happen before that time. 
For example, assume that we schedule a punctuation every 10 seconds at 
timestamp 5 (t5). The system currently behaves like this:
{noformat}
t5 -> schedule, punctuate, next schedule at t10
t6 -> no punctuation
t7 -> no punctuation
t8 -> no punctuation
t9 -> no punctuation
t10 -> punctuate, next schedule at t20
...{noformat}
In this example, the 2nd punctuation occurs only 5 seconds after the first one, 
violating the scheduled interval.

From my point of view, a reasonable behaviour would be:
{noformat}
t5 -> schedule, punctuate, next schedule at t15
t6 -> no punctuation
t7 -> no punctuation
t8 -> no punctuation
t9 -> no punctuation
t10 -> no punctuation
t11 -> no punctuation
t12 -> no punctuation
t13 -> no punctuation
t14 -> no punctuation
t15 -> punctuate, next schedule at t25
...{noformat}
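The two timelines above can be reproduced with a small model. This is only a 
sketch under assumptions: the class PunctuationModel and the method 
punctuationTimes are hypothetical names, not Kafka code, and the rule "after 
firing, move to the next multiple of the interval counted from the start time" 
is a simplification of how the scheduler appears to behave in this report.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of interval-aligned punctuation (not Kafka code).
class PunctuationModel {

    /**
     * Returns the stream times at which a punctuation would fire.
     *
     * @param startTime   first scheduled punctuation time (the alignment anchor)
     * @param interval    punctuation interval, in the same unit as the stream times
     * @param streamTimes observed stream times, in non-decreasing order
     */
    public static List<Long> punctuationTimes(long startTime, long interval, long[] streamTimes) {
        List<Long> fired = new ArrayList<>();
        long next = startTime;
        for (long now : streamTimes) {
            if (now >= next) {
                fired.add(now);
                // Assumption: the next punctuation stays aligned to startTime,
                // i.e. it is the first startTime + k * interval that is after now.
                long intervalsElapsed = (now - startTime) / interval;
                next = startTime + (intervalsElapsed + 1) * interval;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] streamTimes = new long[21];
        for (int i = 0; i < streamTimes.length; i++) {
            streamTimes[i] = 5 + i; // t5 .. t25
        }
        // Anchored at 0, as in the reported behaviour.
        System.out.println(punctuationTimes(0L, 10L, streamTimes));
        // Anchored at the time of scheduling (t5), as in the proposed behaviour.
        System.out.println(punctuationTimes(5L, 10L, streamTimes));
    }
}
```

With the anchor at 0 the model fires at t5, t10 and t20; with the anchor at t5 
it fires at t5, t15 and t25, so in this model the only difference between the 
two timelines is the start time handed to the scheduler.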
The origin of this problem can be found in {*}StreamTask.schedule{*}:
{code:java}
/**
* Schedules a punctuation for the processor
*
* @param interval the interval in milliseconds
* @param type the punctuation type
* @throws IllegalStateException if the current node is not null
*/
public Cancellable schedule(final long interval, final PunctuationType type, final Punctuator punctuator) {
   switch (type) {
      case STREAM_TIME:
         // align punctuation to 0L, punctuate as soon as we have data
         return schedule(0L, interval, type, punctuator);
      case WALL_CLOCK_TIME:
         // align punctuation to now, punctuate after interval has elapsed
         return schedule(time.milliseconds() + interval, interval, type, punctuator);
      default:
         throw new IllegalArgumentException("Unrecognized PunctuationType: " + type);
   }
}{code}
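For stream time, it calls *schedule* with {*}startTime=0{*}, so punctuations 
are aligned to multiples of the interval instead of to the time at which the 
punctuator was scheduled.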



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
