[ https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066521#comment-16066521 ]
ASF GitHub Bot commented on FLINK-7008:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4195#discussion_r124550121

    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java ---
    @@ -297,6 +297,85 @@ public void testKeyedAdvancingTimeWithoutElements() throws Exception {
     	}

     	@Test
    +	public void testKeyedCEPOperatorNFAChanged() throws Exception {
    --- End diff --

    I meant rather checking that resetting the flag works correctly. We could do it by verifying the correct number of invocations of the `update` method. Also, we should test it with both the RocksDB and in-memory state backends. We can do it with Mockito. Just a suggestion for the code:

        @Test
        public void testKeyedCEPOperatorNFAChanged() throws Exception {

            String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
            RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
            rocksDBStateBackend.setDbStoragePath(rocksDbPath);

            KeyedCEPPatternOperator<Event, Integer> operator = new KeyedCEPPatternOperator<>(
                Event.createTypeSerializer(),
                true,
                IntSerializer.INSTANCE,
                new SimpleNFAFactory(),
                true);
            OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(operator);

            try {
                harness.setStateBackend(rocksDBStateBackend);

                harness.open();

                // replace the operator's state handle with a spy so that calls to update() can be counted
                final ValueState nfaOperatorState = Whitebox.<ValueState>getInternalState(operator, "nfaOperatorState");
                final ValueState nfaOperatorStateSpy = Mockito.spy(nfaOperatorState);
                Whitebox.setInternalState(operator, "nfaOperatorState", nfaOperatorStateSpy);

                Event startEvent = new Event(42, "c", 1.0);
                SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0);
                Event endEvent = new Event(42, "b", 1.0);

                harness.processElement(new StreamRecord<>(startEvent, 1L));
                harness.processElement(new StreamRecord<>(new Event(42, "d", 1.0), 4L));
                harness.processElement(new StreamRecord<Event>(middleEvent, 4L));
                harness.processElement(new StreamRecord<>(endEvent, 4L));

                // four elements were processed, but only three state updates are expected
                Mockito.verify(nfaOperatorStateSpy, Mockito.times(3)).update(Mockito.any());

                // get and verify the output
                Queue<Object> result = harness.getOutput();

                assertEquals(1, result.size());

                verifyPattern(result.poll(), startEvent, middleEvent, endEvent);
            } finally {
                harness.close();
            }
        }


> Update NFA state only when the NFA changes.
> -------------------------------------------
>
>                 Key: FLINK-7008
>                 URL: https://issues.apache.org/jira/browse/FLINK-7008
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>    Affects Versions: 1.3.1
>            Reporter: Kostas Kloudas
>            Assignee: Dian Fu
>             Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we
> update the NFA state every time the NFA is touched. This leads to redundant
> puts/gets to the state when there are no changes to the NFA itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
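
For readers following the thread, the idea discussed above (write the state only when the NFA actually changed, and reset the flag after each write) can be illustrated with a small stand-alone sketch. This is not the Flink implementation: `CountingState`, `TinyNfa` and `run` are hypothetical names invented for this illustration, and the rule that only the events "a", "b" and "c" modify the automaton is an assumption of the toy model.

```java
import java.util.ArrayList;
import java.util.List;

public class DirtyFlagSketch {

    /** Hypothetical stand-in for a state handle; it only counts writes. */
    static final class CountingState<T> {
        private T value;
        private int updateCalls;

        void update(T newValue) {
            value = newValue;
            updateCalls++;
        }

        T value() {
            return value;
        }

        int updateCalls() {
            return updateCalls;
        }
    }

    /** Hypothetical toy automaton that remembers whether it was modified. */
    static final class TinyNfa {
        private final List<String> partialMatch = new ArrayList<>();
        private boolean changed;

        void process(String event) {
            // Assumption of this toy model: only "a", "b" and "c" are relevant;
            // every other event leaves the automaton untouched.
            if (event.equals("a") || event.equals("b") || event.equals("c")) {
                partialMatch.add(event);
                changed = true;
            }
        }

        boolean isChanged() {
            return changed;
        }

        void resetChanged() {
            changed = false;
        }
    }

    /** Processes the events and returns how often the state was written. */
    static int run(String... events) {
        CountingState<TinyNfa> state = new CountingState<>();
        TinyNfa nfa = new TinyNfa();
        for (String event : events) {
            nfa.process(event);
            if (nfa.isChanged()) {
                // Write back only when something changed, then reset the flag
                // so that the next untouched event does not trigger a write.
                state.update(nfa);
                nfa.resetChanged();
            }
        }
        return state.updateCalls();
    }

    public static void main(String[] args) {
        // Four events, one of them irrelevant to the toy automaton.
        System.out.println(run("c", "d", "a", "b")); // prints 3
    }
}
```

If the flag were never reset, the irrelevant event "d" would also cause a write and the count would be 4 instead of 3. Counting the calls to `update` is what makes such a mistake visible.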