Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3477#discussion_r105854967
  
    --- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -247,27 +275,148 @@ public int compare(final StateTransition<T> o1, 
final StateTransition<T> o2) {
         * @return Collection of computation states which result from the 
current one
         */
        private Collection<ComputationState<T>> computeNextStates(
    -                   final ComputationState<T> computationState,
    -                   final T event,
    -                   final long timestamp) {
    -           Stack<State<T>> states = new Stack<>();
    -           ArrayList<ComputationState<T>> resultingComputationStates = new 
ArrayList<>();
    -           State<T> state = computationState.getState();
    +           final ComputationState<T> computationState,
    +           final T event,
    +           final long timestamp) {
    +           final ArrayList<ComputationState<T>> resultingComputationStates 
= new ArrayList<>();
    +
    +           final OutgoingEdges<T> outgoingEdges = 
createDecisionGraph(computationState, event);
    +
    +           // Create the computing version based on the previously 
computed edges
    +           // We need to defer the creation of computation states until we 
know how many edges start
    +           // at this computation state so that we can assign proper 
version
    +           final List<StateTransition<T>> edges = outgoingEdges.getEdges();
    +           Integer takeBranchesToVisit = Math.max(0, 
outgoingEdges.getTotalTakeBranches() - 1);
    +           Integer ignoreBranchesToVisit = 
outgoingEdges.getTotalIgnoreBranches();
    +           for (StateTransition<T> edge : edges) {
    +                   switch (edge.getAction()) {
    +                           case IGNORE: {
    +                                   if (!computationState.isStartState()) {
    +                                           final DeweyNumber version;
    +                                           if 
(!isEquivalentState(edge.getTargetState(), computationState.getState())) {
    +                                                   version = 
computationState.getVersion().increase(ignoreBranchesToVisit).addStage();
    +                                                   ignoreBranchesToVisit--;
    +                                           } else {
    +                                                   final int toIncrease = 
calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(),
    +                                                           
outgoingEdges.getTotalTakeBranches());
    +                                                   version = 
computationState.getVersion().increase(toIncrease);
    +                                           }
    +
    +                                           resultingComputationStates.add(
    +                                                   
ComputationState.createState(
    +                                                           
edge.getTargetState(),
    +                                                           
computationState.getPreviousState(),
    +                                                           
computationState.getEvent(),
    +                                                           
computationState.getTimestamp(),
    +                                                           version,
    +                                                           
computationState.getStartTimestamp()
    +                                                   )
    +                                           );
    +                                           sharedBuffer.lock(
    +                                                   
edge.getTargetState().getName(),
    +                                                   
computationState.getEvent(),
    +                                                   
computationState.getTimestamp());
    +                                   }
    +                           }
    +                           break;
    +                           case TAKE:
    +                                   final State<T> newState = 
edge.getTargetState();
    +                                   final State<T> consumingState = 
edge.getSourceState();
    +                                   final State<T> previousEventState = 
computationState.getPreviousState();
    +
    +                                   final T previousEvent = 
computationState.getEvent();
    +                                   final DeweyNumber currentVersion = 
computationState.getVersion();
    +
    +                                   final DeweyNumber 
newComputationStateVersion = new 
DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit);
    +                                   takeBranchesToVisit--;
    +
    +                                   final long startTimestamp;
    +                                   if (computationState.isStartState()) {
    +                                           startTimestamp = timestamp;
    +                                           sharedBuffer.put(
    +                                                   
consumingState.getName(),
    +                                                   event,
    +                                                   timestamp,
    +                                                   currentVersion);
    +                                   } else {
    +                                           startTimestamp = 
computationState.getStartTimestamp();
    +                                           sharedBuffer.put(
    +                                                   
consumingState.getName(),
    +                                                   event,
    +                                                   timestamp,
    +                                                   
previousEventState.getName(),
    +                                                   previousEvent,
    +                                                   
computationState.getTimestamp(),
    +                                                   currentVersion);
    +                                   }
    +
    +                                   // a new computation state is referring 
to the shared entry
    +                                   
sharedBuffer.lock(consumingState.getName(), event, timestamp);
    +
    +                                   
resultingComputationStates.add(ComputationState.createState(
    +                                           newState,
    +                                           consumingState,
    +                                           event,
    +                                           timestamp,
    +                                           newComputationStateVersion,
    +                                           startTimestamp
    +                                   ));
    +                                   break;
    +                   }
    +           }
     
    -           states.push(state);
    +           if (computationState.isStartState()) {
    +                   final int totalBranches = 
calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(), 
outgoingEdges.getTotalTakeBranches());
    +                   final ComputationState<T> startState = 
createStartState(computationState, totalBranches);
    +                   resultingComputationStates.add(startState);
    +           }
    +
    +           if (computationState.getEvent() != null) {
    +                   // release the shared entry referenced by the current 
computation state.
    +                   sharedBuffer.release(
    +                           computationState.getState().getName(),
    +                           computationState.getEvent(),
    +                           computationState.getTimestamp());
    +                   // try to remove unnecessary shared buffer entries
    +                   sharedBuffer.remove(
    +                           computationState.getState().getName(),
    +                           computationState.getEvent(),
    +                           computationState.getTimestamp());
    +           }
    +
    +           return resultingComputationStates;
    +   }
    +
    +   private int calculateIncreasingSelfState(int ignoreBranches, int 
takeBranches) {
    +           if (takeBranches == 0 && ignoreBranches == 0) {
    +                   return 0;
    +           }
    +
    +           return ignoreBranches + 1;
    +   }
    +
    --- End diff --
    
    Done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to