[ https://issues.apache.org/jira/browse/FLINK-9642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16521380#comment-16521380 ]
ASF GitHub Bot commented on FLINK-9642: --------------------------------------- Github user Aitozi commented on a diff in the pull request: https://github.com/apache/flink/pull/6205#discussion_r197629205 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java --- @@ -346,16 +351,87 @@ private void lockEvent(EventId eventId) throws Exception { * @throws Exception Thrown if the system cannot access the state. */ public void releaseEvent(EventId eventId) throws Exception { - Lockable<V> eventWrapper = eventsBuffer.get(eventId); + Lockable<V> eventWrapper = getEvent(eventId); if (eventWrapper != null) { if (eventWrapper.release()) { eventsBuffer.remove(eventId); + eventsBufferCache.remove(eventId); } else { - eventsBuffer.put(eventId, eventWrapper); + cacheEvent(eventId, eventWrapper); } } } + // Cache related method + + ///////////////////////////////////////////// + // Put + ///////////////////////////////////////////// + + /** + * Put a event to cache. + * @param eventId id of the event + * @param event event body + */ + private void cacheEvent(EventId eventId, Lockable<V> event) { + this.eventsBufferCache.put(eventId, event); + } + + /** + * Put a ShareBufferNode to cache. + * @param nodeId id of the event + * @param entry SharedBufferNode + */ + private void cacheEntry(NodeId nodeId, Lockable<SharedBufferNode> entry) { + this.entryCache.put(nodeId, entry); + } + + ///////////////////////////////////////////// + // Get + ///////////////////////////////////////////// + + /** + * Try to get the sharedBufferNode from state iff the node has not been quered during this turn process. + * @param nodeId id of the event --- End diff -- it means `if and only if`. > Reduce the count to deal with state during a CEP process > --------------------------------------------------------- > > Key: FLINK-9642 > URL: https://issues.apache.org/jira/browse/FLINK-9642 > Project: Flink > Issue Type: Improvement > Components: CEP > Affects Versions: 1.6.0 > Reporter: aitozi > Assignee: aitozi > Priority: Major > Labels: pull-request-available > > With the rework of sharedBuffer Flink-9418, the lock & release operation is > deal with rocksdb state which is different from the previous version which > will read the state of sharedBuffer all to memory, i think we can add a cache > or variable in sharedbuffer to cache the LockAble Object to mark the ref > change in once process in NFA, this will reduce the count when the events > point to the same NodeId.. And flush the result to MapState at the end of > process. -- This message was sent by Atlassian JIRA (v7.6.3#76005)