Hi, I am noticing that the checkpointing state has been constantly growing for the below subtask. Only the current active window elements should be checkpointed ? why is it constantly growing ?
finalStream.keyBy("<>").countWindow(2,1) .apply((_, _, input: scala.Iterable[], out: Collector[]) => { val inputArray = input.toArray ... do something }