I am working on a program that uses a complex window and have run into some
issues. It is a 1-hour window with 7 days of allowed lateness and a custom
trigger that gives us intermediate results every 5 minutes of processing time
until the end of the 7 days in event time, when a final firing is triggered and
the window is purged. The window functions are an incremental reduce function
and a RichWindowFunction that performs some final computation before
outputting each result. I am building up a collection of objects, so each time
the RichWindowFunction runs I want to diff the current set against the
previous one and output only the elements that have changed.
Example:
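import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.scala._ // provides createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.RichWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// In reality I am working with more complex objects than ints.
// `Key` stands for the key type of the stream.
class CustomRichWindowFunction
    extends RichWindowFunction[java.util.Collection[Int], Int, Key, TimeWindow] {

  // Elements emitted by the previous firing of this window.
  @transient var state: ListState[Int] = _

  override def open(parameters: Configuration): Unit = {
    val info = new ListStateDescriptor[Int]("previous", createTypeInformation[Int])
    state = getRuntimeContext.getListState(info)
  }

  override def apply(key: Key, window: TimeWindow,
                     input: Iterable[java.util.Collection[Int]],
                     out: Collector[Int]): Unit = {
    // The incremental reduce leaves exactly one pre-aggregated collection.
    val current = input.iterator.next().asScala
    // get() may return null for empty state in some Flink versions.
    val previous = Option(state.get()).map(_.asScala.toSet).getOrElse(Set.empty[Int])
    state.clear() // replace the stored set with the current one
    for (elem <- current) {
      if (!previous.contains(elem)) {
        out.collect(elem) // emit only elements missing from the last firing
      }
      state.add(elem) // store for the next run
    }
  }
}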
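The diff itself can be checked without a cluster. Below is a minimal plain-Scala sketch, assuming the reduced result can be treated as a Scala Iterable; `DiffingEmitter` is a hypothetical name, and its `previous` field stands in for the ListState.

```scala
// Flink-free model of the per-firing diff. `DiffingEmitter` is hypothetical;
// the `previous` field plays the role of the ListState in the window function.
class DiffingEmitter[T] {
  // Elements stored by the previous firing (empty before the first firing).
  private var previous: Set[T] = Set.empty[T]

  // Returns the elements of `current` that were not present last time,
  // then remembers `current` for the next firing.
  def fire(current: Iterable[T]): Seq[T] = {
    val changed = current.filterNot(previous.contains).toSeq
    previous = current.toSet
    changed
  }
}
```

Firing with List(1, 2), then List(1, 2, 3), then List(3, 4) returns Seq(1, 2), then Seq(3), then Seq(4). As in the window function, elements that disappear between firings are not reported.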
The issue with this is that it leaks state in RocksDB. When the WindowOperator
executes clearAllState
(https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L527)
at the end of the window's lifetime, it does not clear the ListState or any
other custom partitioned state that may have been created. This causes my
state size to grow indefinitely. It appears to me that a RichWindowFunction
should have a clear method, similar to the one on triggers, for cleaning up
state when the window is destroyed.
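To make the growth concrete, here is a toy model, not Flink code: `WindowId`, `PerWindowStore`, and `LeakDemo` are hypothetical names, and the `clear` method stands in for the clear hook proposed above. It fires and purges consecutive 1-hour windows for one key and counts the state entries left behind.

```scala
import scala.collection.mutable

// Hypothetical model, not Flink's API: custom per-window state kept in a map keyed by window.
final case class WindowId(start: Long, end: Long)

class PerWindowStore {
  private val state = mutable.Map.empty[WindowId, Set[Int]]

  def update(w: WindowId, elems: Set[Int]): Unit = state(w) = elems

  // Hypothetical hook: what a clear() analogous to Trigger.clear would do on purge.
  def clear(w: WindowId): Unit = state -= w

  def size: Int = state.size
}

object LeakDemo {
  val HourMs: Long = 60L * 60 * 1000

  // Fires and then purges `hours` consecutive 1-hour windows for a single key
  // and returns how many state entries survive afterwards.
  def run(hours: Int, clearOnPurge: Boolean): Int = {
    val store = new PerWindowStore
    for (h <- 0 until hours) {
      val w = WindowId(h * HourMs, (h + 1) * HourMs)
      store.update(w, Set(h))
      if (clearOnPurge) store.clear(w) // the window's lifetime is over
    }
    store.size
  }
}
```

`LeakDemo.run(720, clearOnPurge = false)` leaves 720 entries after 30 days of windows, one per window, while `clearOnPurge = true` leaves none. In the real job a window is only purged after its 7 days of lateness, so, assuming tumbling windows, such a hook would bound live state at roughly 7 × 24 = 168 windows per key instead of letting it grow without limit.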
Barring that, I can envision two ways of solving this problem, but I have
fallen short of implementing either successfully.
1) If I had access to the watermark from within apply, I could use it in
conjunction with the TimeWindow passed in to tell whether it was my final
event-time timer that had gone off, which would allow me to clear the state
manually, i.e.:
    if (watermark < window.getEnd + Time.days(7).toMilliseconds) {
      state.add(elem) // I know that my window is not finished, so I can store state.
    }
2) Pass my elements into a second window with a count trigger of 1 and a
custom evictor that always keeps the two most recent elements, and then do my
diff there. Semantically this seems to work, but in practice it makes my
checkpoint times grow 10x, and every 5th to 7th checkpoint seems to fail.
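Both workarounds can also be modeled in plain Scala, which at least pins down the intended semantics. This is a sketch under assumptions: `Workarounds`, `isFinalFiring`, and `LastTwoBuffer` are hypothetical names, and `isFinalFiring` assumes the custom trigger registers its final event-time timer at exactly `window.getEnd` plus the allowed lateness.

```scala
// Flink-free models of the two workarounds; all names here are hypothetical.
object Workarounds {
  // 7 days of allowed lateness in milliseconds (the value of Time.days(7).toMilliseconds).
  val AllowedLatenessMs: Long = 7L * 24 * 60 * 60 * 1000

  // Approach 1: assumes the final event-time timer fires once the watermark
  // reaches windowEnd + allowedLateness, matching the condition in the post.
  def isFinalFiring(watermark: Long, windowEnd: Long,
                    allowedLatenessMs: Long = AllowedLatenessMs): Boolean =
    watermark >= windowEnd + allowedLatenessMs
}

// Approach 2: like the custom evictor, keeps only the two most recent reduced
// collections; each call mirrors one firing of a count trigger of 1.
class LastTwoBuffer[T] {
  private var buffer = Vector.empty[Set[T]]

  def onElement(current: Set[T]): Set[T] = {
    buffer = (buffer :+ current).takeRight(2) // evict everything but the newest two
    val previous = if (buffer.size == 2) buffer.head else Set.empty[T]
    current -- previous // elements new since the previous collection
  }
}
```

With a window ending at 3,600,000 ms, `isFinalFiring` is false at a watermark of 608,399,999 and true at 608,400,000. Feeding `LastTwoBuffer` the sets {1, 2}, {1, 2, 3}, and {3, 4} returns {1, 2}, {3}, and {4}. The model captures only the semantics, not the checkpoint cost.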
I am curious whether anyone here has ideas about what I could do to solve this
problem.
Thank you,
Seth Wiesman