[ https://issues.apache.org/jira/browse/FLINK-32701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-32701:
-----------------------------------
    Labels: CEP cep stale-critical  (was: CEP cep)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Critical but is unassigned, and neither it nor its Sub-Tasks have been updated for 14 days. I have gone ahead and marked it "stale-critical". If this ticket is critical, please either assign yourself or give an update. Afterwards, please remove the label, or in 7 days the issue will be deprioritized.

> Potential Memory Leak in Flink CEP due to Persistent Starting States in NFAState
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-32701
>                 URL: https://issues.apache.org/jira/browse/FLINK-32701
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.17.0, 1.16.1, 1.16.2, 1.17.1
>            Reporter: Puneet Duggal
>            Priority: Critical
>              Labels: CEP, cep, stale-critical
>         Attachments: Screenshot 2023-07-26 at 11.45.06 AM.png, Screenshot 2023-07-26 at 11.50.28 AM.png
>
>
> Our team has encountered a potential memory leak while working with the Complex Event Processing (CEP) library in Flink v1.17.
>
> h2. Context
>
> The CEP operator maintains a keyed state called NFAState, which holds two queues: one for partial matches and one for completed matches. When a key is first encountered, CEP creates a starting computation state and stores it in the partial-matches queue. As more events arrive that match the defined conditions (e.g., a TAKE condition), additional computation states are added to the queue, with their specific type (normal, pending, end) depending on the pattern sequence.
>
> However, I have noticed that the starting computation state remains in the partial-matches queue even after the pattern sequence has been completely matched. The same holds for keys that have already timed out. As a result, state is retained for every key CEP ever encounters, leading to a continual increase in checkpoint size.
>
> h2. How to reproduce this
>
> # Pattern Sequence - A not_followed_by B within 5 mins (a sketch of this pattern is shown below)
> # Time Characteristic - EventTime
> # StateBackend - FsStateBackend
>
> On my local machine, I started this pipeline and sent events at a rate of 10 events per second (only A). As expected, after 5 minutes CEP started emitting pattern-matched output at the same rate. The issue was that with every checkpoint (2-minute interval), the checkpoint size kept increasing. The expectation was that after 5 minutes (2-3 checkpoints) the checkpoint size would remain constant, since any 5-minute window contains the same number of unique keys (older keys are matched or timed out and hence removed from state). But as the attached images show, the checkpoint size kept increasing through 40 checkpoints (around 1.5 hrs).
>
> P.S. - After 3 checkpoints (6 mins), the checkpoint size was around 1.78 MB. Hence the assumption is that the ideal checkpoint size for a 5-minute window should be less than 1.78 MB.
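> For reference, the reproduction pattern can be expressed roughly as below. This is a minimal sketch, not the actual job code: the Event POJO, its fields, and the key extractor are illustrative placeholders.
> {code:java}
> import org.apache.flink.cep.CEP;
> import org.apache.flink.cep.PatternStream;
> import org.apache.flink.cep.pattern.Pattern;
> import org.apache.flink.cep.pattern.conditions.SimpleCondition;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.KeyedStream;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> public class ReproPatternSketch {
>
>     // Placeholder event type; assumed shape: a key plus a type ("A" or "B").
>     public static class Event {
>         private String key;
>         private String type;
>         public String getKey() { return key; }
>         public String getType() { return type; }
>     }
>
>     public static PatternStream<Event> build(DataStream<Event> events) {
>         KeyedStream<Event, String> keyed = events.keyBy(Event::getKey);
>
>         // "A not_followed_by B within 5 mins": notFollowedBy() may only end a
>         // pattern when a time window is declared, as it is here (Flink 1.16+).
>         Pattern<Event, Event> pattern =
>                 Pattern.<Event>begin("A")
>                         .where(new SimpleCondition<Event>() {
>                             @Override
>                             public boolean filter(Event e) {
>                                 return "A".equals(e.getType());
>                             }
>                         })
>                         .notFollowedBy("B")
>                         .where(new SimpleCondition<Event>() {
>                             @Override
>                             public boolean filter(Event e) {
>                                 return "B".equals(e.getType());
>                             }
>                         })
>                         .within(Time.minutes(5));
>
>         // Event-time matching, per the reproduction setup.
>         return CEP.pattern(keyed, pattern).inEventTime();
>     }
> }
> {code}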
> As you can see, after 39 checkpoints I triggered a savepoint for this pipeline. I then used a savepoint reader to investigate what is being stored in the CEP states. The code below inspects the NFAState of the CEP operator for the potential memory leak.
> {code:java}
> import lombok.AllArgsConstructor;
> import lombok.Data;
> import lombok.NoArgsConstructor;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.cep.nfa.NFAState;
> import org.apache.flink.cep.nfa.NFAStateSerializer;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> import org.apache.flink.state.api.OperatorIdentifier;
> import org.apache.flink.state.api.SavepointReader;
> import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.util.Collector;
> import org.junit.jupiter.api.Test;
>
> import java.io.Serializable;
> import java.util.Objects;
>
> public class NFAStateReaderTest {
>
>     private static final String NFA_STATE_NAME = "nfaStateName";
>
>     @Test
>     public void testNfaStateReader() throws Exception {
>         StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
>         SavepointReader savepointReader =
>                 SavepointReader.read(
>                         environment,
>                         "file:///opt/flink/savepoints/savepoint-093404-9bc0a38654df",
>                         new FsStateBackend("file:///abc"));
>         // The UID must match the one assigned to the CEP operator in the original job.
>         DataStream<NFAStateOutput> stream =
>                 savepointReader.readKeyedState(
>                         OperatorIdentifier.forUid("select_pattern_events"),
>                         new NFAStateReaderFunction());
>         stream.print();
>         environment.execute();
>     }
>
>     // DynamicTuple is the key type of the original job (definition omitted here).
>     static class NFAStateReaderFunction extends KeyedStateReaderFunction<DynamicTuple, NFAStateOutput> {
>
>         private ValueState<NFAState> computationStates;
>
>         // Static so the running aggregates survive across keys
>         // (fine for this single-JVM, parallelism-1 test).
>         private static Long danglingNfaCount = 0L;
>         private static Long newNfaCount = 0L;
>         private static Long minTimestamp = Long.MAX_VALUE;
>         private static Long minKeyForCurrentNfa = Long.MAX_VALUE;
>         private static Long minKeyForDanglingNfa = Long.MAX_VALUE;
>         private static Long maxKeyForDanglingNfa = Long.MIN_VALUE;
>         private static Long maxKeyForCurrentNfa = Long.MIN_VALUE;
>
>         @Override
>         public void open(Configuration parameters) {
>             computationStates =
>                     getRuntimeContext()
>                             .getState(new ValueStateDescriptor<>(NFA_STATE_NAME, new NFAStateSerializer()));
>         }
>
>         @Override
>         public void readKey(DynamicTuple key, Context ctx, Collector<NFAStateOutput> out) throws Exception {
>             NFAState nfaState = computationStates.value();
>             // The starting computation state reports a start timestamp of -1;
>             // anything else means a real partial match is in progress for this key.
>             if (Objects.requireNonNull(nfaState.getPartialMatches().peek()).getStartTimestamp() != -1) {
>                 minTimestamp = Math.min(minTimestamp, nfaState.getPartialMatches().peek().getStartTimestamp());
>                 minKeyForCurrentNfa = Math.min(minKeyForCurrentNfa, Long.parseLong(key.getTuple().getField(0)));
>                 maxKeyForCurrentNfa = Math.max(maxKeyForCurrentNfa, Long.parseLong(key.getTuple().getField(0)));
>                 newNfaCount++;
>             } else {
>                 // Only the starting state is left: a "dangling" entry for an expired key.
>                 danglingNfaCount++;
>                 minKeyForDanglingNfa = Math.min(minKeyForDanglingNfa, Long.parseLong(key.getTuple().getField(0)));
>                 maxKeyForDanglingNfa = Math.max(maxKeyForDanglingNfa, Long.parseLong(key.getTuple().getField(0)));
>             }
>             out.collect(
>                     new NFAStateOutput(
>                             danglingNfaCount,
>                             minTimestamp,
>                             newNfaCount,
>                             minKeyForCurrentNfa,
>                             maxKeyForCurrentNfa,
>                             minKeyForDanglingNfa,
>                             maxKeyForDanglingNfa));
>         }
>     }
>
>     @Data
>     @NoArgsConstructor
>     @AllArgsConstructor
>     static class NFAStateOutput implements Serializable {
>         private Long danglingNfaCount;
>         private Long minTimestamp;
>         private Long newNfaCount;
>         private Long minKeyForCurrentNfa;
>         private Long maxKeyForCurrentNfa;
>         private Long minKeyForDanglingNfa;
>         private Long maxKeyForDanglingNfa;
>     }
> }
> {code}
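> Note: the reader above is built on the State Processor API, so the flink-state-processor-api dependency must be on the classpath, and the UID passed to OperatorIdentifier.forUid() has to match the UID assigned to the CEP operator in the original job. The savepoint path and the FsStateBackend path shown are from my local setup.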
> As output, it printed an NFAStateOutput for each key, but since all of its attributes are running aggregates, the final output printed was:
> {code:java}
> NFAStateReaderTest.NFAStateOutput(danglingNfaCount=34391, minTimestamp=1690359951958, newNfaCount=3000, minKeyForCurrentNfa=6244230, maxKeyForCurrentNfa=6247229, minKeyForDanglingNfa=629818, maxKeyForDanglingNfa=6244229){code}
> As we can see, the checkpoint is storing approximately 34,391 dangling states (for keys that have expired, i.e., matched or timed out), whereas there are only 3,000 active keys (keys with partial matches still eligible for further pattern-sequence matching). The latter is expected, since a throughput of 10 events per second amounts to 3,000 unique keys in 5 minutes.
>
> h2. Questions
>
> Hence, I am curious about the reasoning behind this design choice, specifically why the starting state remains in the partial-matches queue for all keys, even those that have timed out or completed their matches. Additionally, I wonder what the implications would be of deleting this starting state, assuming that
> # it is the only state left in the partial-match queue, and
> # the completed-match queue in the NFAState is empty.
> A sketch of such a cleanup is shown below.
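> The following is a hypothetical illustration of that cleanup, written against the same NFAState accessors used in the reader above; it is a sketch of the proposed condition only, not actual Flink code or a patch.
> {code:java}
> import java.util.Queue;
>
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.cep.nfa.ComputationState;
> import org.apache.flink.cep.nfa.NFAState;
>
> public class DanglingNfaCleanupSketch {
>
>     // Hypothetical cleanup: drop the keyed NFAState entry once only the dangling
>     // starting state remains. Relies on the convention used in the reader above:
>     // the starting computation state reports a start timestamp of -1.
>     static void clearIfDangling(NFAState nfaState, ValueState<NFAState> computationStates) {
>         Queue<ComputationState> partialMatches = nfaState.getPartialMatches();
>         boolean onlyStartingStateLeft =
>                 partialMatches.size() == 1
>                         && partialMatches.peek().getStartTimestamp() == -1;
>         if (onlyStartingStateLeft && nfaState.getCompletedMatches().isEmpty()) {
>             computationStates.clear(); // removes the NFAState entry for this key entirely
>         }
>     }
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)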