Hello Fred,

Thanks for reporting the issue.

1) Nice find about the punctuation start time with the WALL_CLOCK_TIME type. I agree with you that it would be better initialized as current time + interval. Do you mind creating a JIRA for Kafka? And if you'd like to submit a patch for it, that would be best :)

2) About the OOM error: if it is consistently reproducible, could you take a thread dump when the exception is thrown? That would help me investigate further and see whether anything in the Streams library is behind it. I took a pass over the code but could not find any obvious leaks. Note that in PunctuationQueue#mayPunctuate:

    PunctuationSchedule sched = top;
    pq.poll();                      // the head is popped unconditionally
    if (!sched.isCancelled()) {     // only live schedules fire and are re-queued
        processorNodePunctuator.punctuate(sched.node(), timestamp, type, sched.punctuator());
        pq.add(sched.next(timestamp));
        punctuated = true;
    }

I.e. a cancelled punctuation is still popped from the queue before its "cancelled" flag is checked.

Guozhang

On Tue, Dec 5, 2017 at 7:42 PM, frederic arno <frederica...@gmail.com> wrote:

> Hello all,
>
> I am using kafka and kafka-streams 1.0.0.
>
> While working on a custom Processor from which I schedule a
> punctuation using WALL_CLOCK_TIME, I've noticed that whatever
> punctuation interval I set, a call to my Punctuator is always
> triggered immediately. Is that a bug?
>
> Having a quick look at kafka-streams' code, I found that all
> PunctuationSchedule timestamps are matched against the current time
> in order to decide whether or not to trigger the punctuator
> (org.apache.kafka.streams.processor.internals.PunctuationQueue#mayPunctuate).
> However, I've only seen code that initializes the PunctuationSchedule
> timestamp to 0, which I guess is what causes the immediate
> punctuation. At least when using WALL_CLOCK_TIME, shouldn't the
> PunctuationSchedule's timestamp be initialized to current time +
> interval?
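
To make the initialization point concrete, here is a minimal simulation. It is not Kafka's PunctuationSchedule: the class and method names (FirstPunctuationSketch, fireTimes) are hypothetical, the clock is simulated, and rescheduling uses a simplified "now + interval" rule. It only applies the same due check as mayPunctuate (fire once the schedule's timestamp is <= the current time) to two starting timestamps: 0, and current time + interval.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Kafka's PunctuationSchedule: compares an initial
// timestamp of 0 with "current time + interval" for a wall-clock schedule.
public class FirstPunctuationSketch {

    // Returns the simulated wall-clock times (ms) at which a schedule fires
    // while the clock advances from startMs to endMs in steps of tickMs.
    static List<Long> fireTimes(long initialTimestamp, long intervalMs,
                                long startMs, long endMs, long tickMs) {
        List<Long> fired = new ArrayList<>();
        long next = initialTimestamp;
        for (long now = startMs; now <= endMs; now += tickMs) {
            // Same due check as the queue: fire once the timestamp is <= now.
            if (next <= now) {
                fired.add(now);
                // Simplified rescheduling (assumption): next fire is now + interval.
                next = now + intervalMs;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long start = 1_000_000L;   // assumed simulated wall-clock start time
        long interval = 60_000L;   // 1 minute, as in the report
        long end = start + 150_000L;
        long tick = 10_000L;

        // Initialized to 0: fires at the very first check.
        System.out.println(fireTimes(0L, interval, start, end, tick));
        // Initialized to now + interval: first fire one full interval later.
        System.out.println(fireTimes(start + interval, interval, start, end, tick));
    }
}
```

Running main prints [1000000, 1060000, 1120000] and then [1060000, 1120000]: with an initial timestamp of 0 the first check already satisfies 0 <= now, so the punctuator fires at once, while starting from now + interval the first call waits a full interval.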
>
> I am also hitting an OutOfMemoryError when running integration tests:
>
> java.lang.OutOfMemoryError: Java heap space
>   at java.util.Arrays.copyOf(Arrays.java:3181)
>   at java.util.PriorityQueue.grow(PriorityQueue.java:300)
>   at java.util.PriorityQueue.offer(PriorityQueue.java:339)
>   at java.util.PriorityQueue.add(PriorityQueue.java:321)
>   at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:55)
>   at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:619)
>   at org.apache.kafka.streams.processor.internals.AssignedTasks.punctuate(AssignedTasks.java:430)
>   at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:324)
>   at org.apache.kafka.streams.processor.internals.StreamThread.punctuate(StreamThread.java:969)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:834)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
>
> I am only using the WALL_CLOCK_TIME punctuation type, from a single
> processor (4 instances are running, as I have 4 partitions on the
> processed topic). The punctuation interval is set to 1 minute, and I
> cancel the schedule at each punctuation and schedule a new one (to
> deal with variable intervals).
> Although I run my tests in a JVM with little heap space, I wonder
> whether there could be a memory leak there, as I have not seen where
> canceled PunctuationSchedules are removed from the
> PunctuationQueue...
>
> Thank you,
> Fred
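
The following hypothetical sketch models the behaviour described in the reply: cancelling only sets a flag, and a cancelled entry leaves the java.util.PriorityQueue only when it reaches the head and is polled. The names (LazyCancelSketch, Entry, schedule, mayPunctuate) are illustrative and do not reproduce Kafka's internal classes; new entries are assumed to start at current time + interval.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Simplified, hypothetical model of the loop in PunctuationQueue#mayPunctuate:
// cancelled entries are not removed eagerly; they are dropped only when they
// reach the head of the queue and are polled.
public class LazyCancelSketch {

    static final class Entry {
        final String name;
        final long timestamp;   // time at which this entry is due (ms)
        final long intervalMs;
        boolean cancelled;      // set on cancel; the entry stays queued

        Entry(String name, long timestamp, long intervalMs) {
            this.name = name;
            this.timestamp = timestamp;
            this.intervalMs = intervalMs;
        }

        Entry next(long now) {
            return new Entry(name, now + intervalMs, intervalMs);
        }
    }

    final PriorityQueue<Entry> pq =
        new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.timestamp));
    final StringBuilder log = new StringBuilder();

    // Assumption: the first due time is now + interval.
    Entry schedule(String name, long now, long intervalMs) {
        Entry e = new Entry(name, now + intervalMs, intervalMs);
        pq.add(e);
        return e;
    }

    // Pops every due entry; only non-cancelled ones fire and are re-queued.
    boolean mayPunctuate(long now) {
        boolean punctuated = false;
        Entry top = pq.peek();
        while (top != null && top.timestamp <= now) {
            Entry sched = pq.poll();   // popped before the flag is checked
            if (!sched.cancelled) {
                log.append(sched.name).append('@').append(now).append(' ');
                pq.add(sched.next(now));
                punctuated = true;
            }
            top = pq.peek();
        }
        return punctuated;
    }

    public static void main(String[] args) {
        LazyCancelSketch q = new LazyCancelSketch();
        Entry a = q.schedule("A", 0L, 60_000L);  // due at 60000
        a.cancelled = true;                      // cancel A ...
        q.schedule("B", 0L, 30_000L);            // ... and schedule B, due at 30000
        System.out.println(q.pq.size());         // A is still queued
        q.mayPunctuate(30_000L);                 // B fires; A is not yet due
        System.out.println(q.pq.size());
        q.mayPunctuate(60_000L);                 // B fires again; A is popped and dropped
        System.out.println(q.pq.size() + " " + q.log.toString().trim());
    }
}
```

main prints 2, 2 and then "1 B@30000 B@60000": the cancelled entry A stays in the queue, counting toward its size, until its own due time, when it is popped and silently dropped. Lazy removal of this kind avoids PriorityQueue.remove(Object), which takes linear time; the trade-off is that cancelled entries keep occupying memory until they reach the head.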