Hi, I am working with Flink CEP 1.17 and load testing it for potential memory leaks related to checkpoint data. While reading the CepOperator code, I noticed a pattern in timer registration and event processing that I believe could be optimized. Here is the behavior I observed:
For every incoming event, CepOperator registers a timer at the event's timestamp t1. When that timer fires at t1, CepOperator processes all buffered elements with timestamps less than the current watermark. This includes events with timestamps equal to t1, but it can also include events with timestamps greater than t1 that are still less than the current watermark.

This behavior is functional, but it raises a question: since the events with timestamps greater than t1 and less than the watermark have already been processed by CepOperator, would it not be beneficial to delete the timers registered for those events? We already remove the mapping from timestamps to events (or event counts) via the advanceTime method, so why not remove the timers associated with those timestamps as well?

I believe deleting these timers could reduce memory consumption and CPU utilization, because they would otherwise fire later with nothing left to process. That could be especially beneficial in high-throughput scenarios or when resources are constrained.

I would greatly appreciate your insights on this, and on whether it could be considered as an optimization within the Flink CEP library.
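To make the question concrete, here is a minimal sketch of the behavior as I understand it. It is a toy model in plain Java, not the actual CepOperator code: the class name `TimerCleanupSketch`, the method `countTimerFirings`, and the two in-memory collections are all hypothetical stand-ins for the operator's element queue and timer service. I also assume that timers and buffered elements at or below the watermark are eligible; the exact boundary does not change the point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model only: none of these names come from Flink.
class TimerCleanupSketch {

    /**
     * Buffers one entry and registers one timer per event timestamp, then
     * advances to the given watermark and returns how many timer callbacks fire.
     */
    static int countTimerFirings(long[] eventTimestamps, long watermark,
                                 boolean deleteDrainedTimers) {
        // Stand-in for the buffered elements, keyed by timestamp.
        TreeMap<Long, List<Long>> buffer = new TreeMap<>();
        // Stand-in for the registered event-time timers.
        TreeSet<Long> timers = new TreeSet<>();

        for (long ts : eventTimestamps) {
            buffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(ts);
            timers.add(ts);
        }

        int firings = 0;
        // Assumption: a timer is due once its timestamp is at or below the watermark.
        while (!timers.isEmpty() && timers.first() <= watermark) {
            timers.pollFirst();
            firings++;

            // One firing drains every buffered timestamp up to the watermark,
            // not only the timestamp of the timer that fired.
            while (!buffer.isEmpty() && buffer.firstKey() <= watermark) {
                long drained = buffer.pollFirstEntry().getKey();
                if (deleteDrainedTimers) {
                    // Proposed change: drop the timer whose events were just processed.
                    timers.remove(drained);
                }
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        long[] events = {1, 2, 3, 4, 5};
        System.out.println("timers left in place: " + countTimerFirings(events, 10, false));
        System.out.println("timers deleted:       " + countTimerFirings(events, 10, true));
    }
}
```

With five distinct timestamps below the watermark, the first call counts five firings, four of which find an empty buffer, and the second counts one. In the real operator I would expect the deletion to go through the timer service (for example `InternalTimerService#deleteEventTimeTimer`), but I have not tried that change myself.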