[ 
https://issues.apache.org/jira/browse/FLINK-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066521#comment-16066521
 ] 

ASF GitHub Bot commented on FLINK-7008:
---------------------------------------

Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4195#discussion_r124550121
  
    --- Diff: 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 ---
    @@ -297,6 +297,85 @@ public void testKeyedAdvancingTimeWithoutElements() 
throws Exception {
        }
     
        @Test
    +   public void testKeyedCEPOperatorNFAChanged() throws Exception {
    --- End diff --
    
    I meant rather checking if the resetting for the flag works correct. We 
could do it by veryfying correct number of invocations of `update` method. Also 
we should test it with both RocksDB and InMemory state backends.
    
    We can do it with Mockito. Just a suggestion for the code:
    
        @Test
        public void testKeyedCEPOperatorNFAChanged() throws Exception {
    
                String rocksDbPath = tempFolder.newFolder().getAbsolutePath();
                RocksDBStateBackend rocksDBStateBackend = new 
RocksDBStateBackend(new MemoryStateBackend());
                rocksDBStateBackend.setDbStoragePath(rocksDbPath);
    
                KeyedCEPPatternOperator<Event, Integer> operator = new 
KeyedCEPPatternOperator<>(
                        Event.createTypeSerializer(),
                        true,
                        IntSerializer.INSTANCE,
                        new SimpleNFAFactory(),
                        true);
                OneInputStreamOperatorTestHarness<Event, Map<String, 
List<Event>>> harness = getCepTestHarness(operator);
    
                try {
                        harness.setStateBackend(rocksDBStateBackend);
    
                        harness.open();
    
                        final ValueState nfaOperatorState = 
Whitebox.<ValueState>getInternalState(operator, "nfaOperatorState");
                        final ValueState nfaOperatorStateSpy = 
Mockito.spy(nfaOperatorState);
                        Whitebox.setInternalState(operator, "nfaOperatorState", 
nfaOperatorStateSpy);
    
                        Event startEvent = new Event(42, "c", 1.0);
                        SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0);
                        Event endEvent = new Event(42, "b", 1.0);
    
                        harness.processElement(new StreamRecord<>(startEvent, 
1L));
                        harness.processElement(new StreamRecord<>(new Event(42, 
"d", 1.0), 4L));
                        harness.processElement(new 
StreamRecord<Event>(middleEvent, 4L));
                        harness.processElement(new StreamRecord<>(endEvent, 
4L));
    
                        Mockito.verify(nfaOperatorStateSpy, 
Mockito.times(3)).update(Mockito.any());
                        // get and verify the output
    
                        Queue<Object> result = harness.getOutput();
    
                        assertEquals(1, result.size());
    
                        verifyPattern(result.poll(), startEvent, middleEvent, 
endEvent);
                } finally {
                        harness.close();
                }
        }


> Update NFA state only when the NFA changes.
> -------------------------------------------
>
>                 Key: FLINK-7008
>                 URL: https://issues.apache.org/jira/browse/FLINK-7008
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>    Affects Versions: 1.3.1
>            Reporter: Kostas Kloudas
>            Assignee: Dian Fu
>             Fix For: 1.4.0
>
>
> Currently in the {{AbstractKeyedCEPPatternOperator.updateNFA()}} method we 
> update the NFA state every time the NFA is touched. This leads to redundant 
> puts/gets to the state when there are no changes to the NFA itself.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to