Hello all,

I am using Kafka and kafka-streams 1.0.0.

While working on a custom Processor that schedules a punctuation
using WALL_CLOCK_TIME, I've noticed that whatever punctuation
interval I set, a call to my Punctuator is always triggered
immediately. Is that a bug?

Having had a quick look at the kafka-streams code, I found that every
PunctuationSchedule's timestamp is compared against the current time
to decide whether or not to trigger the punctuator
(org.apache.kafka.streams.processor.internals.PunctuationQueue#mayPunctuate).
However, the only code I have seen initializes a PunctuationSchedule's
timestamp to 0, which I guess is what causes the immediate
punctuation. At least when using WALL_CLOCK_TIME, shouldn't the
PunctuationSchedule's timestamp be initialized to current time +
interval?
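
To make the question concrete, here is a toy model in plain Java of the
check as I understand it. It is not the Kafka Streams code: the Schedule
class and the firesAt method are hypothetical names of mine, and the loop
is only my reading of what mayPunctuate does.

```java
import java.util.PriorityQueue;

class PunctuationTimestampSketch {

    // Hypothetical stand-in for a schedule entry; NOT the Kafka Streams class.
    static final class Schedule implements Comparable<Schedule> {
        final long timestamp;   // time at which this entry becomes due
        final long intervalMs;  // punctuation interval

        Schedule(long timestamp, long intervalMs) {
            this.timestamp = timestamp;
            this.intervalMs = intervalMs;
        }

        @Override
        public int compareTo(Schedule other) {
            return Long.compare(timestamp, other.timestamp);
        }
    }

    // Returns how many times the punctuator would fire for one check at `now`,
    // given the timestamp the schedule was first created with.
    // Assumption: a fired entry is re-queued at now + interval.
    static int firesAt(long firstTimestamp, long intervalMs, long now) {
        PriorityQueue<Schedule> queue = new PriorityQueue<>();
        queue.add(new Schedule(firstTimestamp, intervalMs));

        int fired = 0;
        Schedule top = queue.peek();
        while (top != null && top.timestamp <= now) {
            queue.poll();
            fired++; // the punctuator would be called here
            queue.add(new Schedule(now + top.intervalMs, top.intervalMs));
            top = queue.peek();
        }
        return fired;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long interval = 60_000L;
        System.out.println("initialized to 0: " + firesAt(0L, interval, now));
        System.out.println("initialized to now + interval: "
                + firesAt(now + interval, interval, now));
    }
}
```

In this model, an entry created with timestamp 0 fires once on the very
first check, while an entry created with now + interval does not fire
until the interval has elapsed.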

I am also hitting an OutOfMemoryError when running integration tests:
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3181)
        at java.util.PriorityQueue.grow(PriorityQueue.java:300)
        at java.util.PriorityQueue.offer(PriorityQueue.java:339)
        at java.util.PriorityQueue.add(PriorityQueue.java:321)
        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:55)
        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateSystemTime(StreamTask.java:619)
        at org.apache.kafka.streams.processor.internals.AssignedTasks.punctuate(AssignedTasks.java:430)
        at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:324)
        at org.apache.kafka.streams.processor.internals.StreamThread.punctuate(StreamThread.java:969)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:834)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

I am only using the WALL_CLOCK_TIME punctuation type, from a single
processor (4 instances are running, as I have 4 partitions on the
processed topic). The punctuation interval is set to 1 minute, and at
each punctuation I cancel the current schedule and schedule a new
one (to deal with variable intervals).
Although I run my tests in a JVM with little heap space, I am
wondering whether there could be a memory leak here, as I have not
seen where canceled PunctuationSchedules are removed from the
PunctuationQueue...
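
Here is a second toy model of what I suspect could be happening when I
cancel and re-schedule from inside the punctuator. Again, this is plain
Java and not the Kafka Streams code, and it rests on two assumptions that
I have not confirmed: (1) a newly created schedule starts at timestamp 0,
and (2) the successor of a fired schedule is put back in the queue even
if the schedule was canceled during the callback. All names are
hypothetical, and the loop is capped so that the sketch terminates.

```java
import java.util.PriorityQueue;

class CancelRescheduleSketch {

    // Hypothetical stand-in for a schedule entry; NOT the Kafka Streams class.
    static final class Schedule implements Comparable<Schedule> {
        final long timestamp;
        boolean cancelled;

        Schedule(long timestamp) {
            this.timestamp = timestamp;
        }

        @Override
        public int compareTo(Schedule other) {
            return Long.compare(timestamp, other.timestamp);
        }
    }

    // Runs a single check at time `now`, capped at maxIterations loop passes,
    // and returns the number of entries left in the queue afterwards.
    static int queueSizeAfter(int maxIterations, long now, long intervalMs) {
        PriorityQueue<Schedule> queue = new PriorityQueue<>();
        queue.add(new Schedule(0L)); // assumption (1): new schedules start at 0

        int iterations = 0;
        Schedule top = queue.peek();
        while (top != null && top.timestamp <= now && iterations < maxIterations) {
            queue.poll();
            if (!top.cancelled) {
                // What my punctuator does: cancel the current schedule...
                top.cancelled = true;
                // ...and schedule a new one, which again starts at 0.
                queue.add(new Schedule(0L));
                // assumption (2): the canceled schedule's successor is re-added
                // and is only flagged as canceled, never removed.
                Schedule next = new Schedule(now + intervalMs);
                next.cancelled = true;
                queue.add(next);
            }
            top = queue.peek();
            iterations++;
        }
        return queue.size();
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println("queue size after 1000 iterations: "
                + queueSizeAfter(1000, now, 60_000L));
    }
}
```

Under these assumptions the head of the queue is always due, so the loop
never ends on its own, and the queue gains one entry per pass. That would
be consistent with the PriorityQueue.grow frame in the stack trace above,
but I may well be misreading the code.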

Thank you, Fred
