Hello all,

I am using Kafka and Kafka Streams 1.0.0.

While working on a custom Processor that schedules a punctuation using WALL_CLOCK_TIME, I noticed that whatever punctuation interval I set, my Punctuator is always called immediately. Is that a bug?

Having had a quick look at the Kafka Streams code, I found that every PunctuationSchedule's timestamp is compared against the current time to decide whether or not to trigger the punctuator (org.apache.kafka.streams.processor.internals.PunctuationQueue#mayPunctuate). However, the only code I have seen initializes the PunctuationSchedule's timestamp to 0, which I guess is what causes the immediate punctuation. At least when using WALL_CLOCK_TIME, shouldn't the PunctuationSchedule's timestamp be initialized to current time + interval?

I am also hitting an OutOfMemoryError when running integration tests:

java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3181)
    at java.util.PriorityQueue.grow(PriorityQueue.java:300)
    at java.util.PriorityQueue.offer(PriorityQueue.java:339)
    at java.util.PriorityQueue.add(PriorityQueue.java:321)
    at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:55)
    at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:619)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.punctuate(AssignedTasks.java:430)
    at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:324)
    at org.apache.kafka.streams.processor.internals.StreamThread.punctuate(StreamThread.java:969)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:834)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

I am only using the WALL_CLOCK_TIME punctuation type, from a single processor (4 instances are running, as I have 4 partitions on the processed topic). The punctuation interval is set to 1 minute, and at each punctuation I cancel the scheduled punctuation and schedule a new one (to deal with variable intervals). Although I run my tests in a JVM with little heap space, I am wondering whether there could be a memory leak there, as I have not seen where canceled PunctuationSchedules are removed from the PunctuationQueue...

Thank you,
Fred
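
For illustration, the immediate-punctuation behaviour described above can be reproduced with a small stand-alone toy model. This is only a sketch and not the Kafka Streams source: the names PunctuationModel, Schedule and firstCheck are hypothetical, and it assumes the queue does nothing more than compare each schedule's timestamp with the current time, as the message describes.

```java
import java.util.PriorityQueue;

// Toy model only (NOT Kafka Streams code); every name here is hypothetical.
class PunctuationModel {

    // One pending punctuation: due once the clock reaches `timestamp`.
    static final class Schedule implements Comparable<Schedule> {
        final long timestamp;
        final long interval;

        Schedule(long timestamp, long interval) {
            this.timestamp = timestamp;
            this.interval = interval;
        }

        // The follow-up schedule, one interval after the time it fired.
        Schedule next(long now) {
            return new Schedule(now + interval, interval);
        }

        @Override
        public int compareTo(Schedule other) {
            return Long.compare(timestamp, other.timestamp);
        }
    }

    // Fires every schedule whose timestamp is <= now and re-enqueues its
    // successor. Returns how many punctuator calls would have happened.
    static int mayPunctuate(PriorityQueue<Schedule> queue, long now) {
        int fired = 0;
        Schedule top = queue.peek();
        while (top != null && top.timestamp <= now) {
            queue.poll();
            fired++;
            queue.add(top.next(now));
            top = queue.peek();
        }
        return fired;
    }

    // Enqueues a single schedule with the given initial timestamp and
    // runs the very first check at time `now`.
    public static int firstCheck(long initialTimestamp, long interval, long now) {
        PriorityQueue<Schedule> queue = new PriorityQueue<>();
        queue.add(new Schedule(initialTimestamp, interval));
        return mayPunctuate(queue, now);
    }

    public static void main(String[] args) {
        long now = 1_000_000L;   // arbitrary "current time" in ms
        long interval = 60_000L; // 1 minute, as in the message

        // Initial timestamp 0 is already in the past: fires at once.
        System.out.println("initial 0:              " + firstCheck(0L, interval, now));
        // Initial timestamp now + interval: nothing fires yet.
        System.out.println("initial now + interval: " + firstCheck(now + interval, interval, now));
    }
}
```

In this model the first check fires once when the initial timestamp is 0 and not at all when it is current time + interval, which matches the initialization suggested in the question. Whether the real PunctuationQueue behaves the same way is exactly what is being asked.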