Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/3477#discussion_r105854967 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -247,27 +275,148 @@ public int compare(final StateTransition<T> o1, final StateTransition<T> o2) { * @return Collection of computation states which result from the current one */ private Collection<ComputationState<T>> computeNextStates( - final ComputationState<T> computationState, - final T event, - final long timestamp) { - Stack<State<T>> states = new Stack<>(); - ArrayList<ComputationState<T>> resultingComputationStates = new ArrayList<>(); - State<T> state = computationState.getState(); + final ComputationState<T> computationState, + final T event, + final long timestamp) { + final ArrayList<ComputationState<T>> resultingComputationStates = new ArrayList<>(); + + final OutgoingEdges<T> outgoingEdges = createDecisionGraph(computationState, event); + + // Create the computing version based on the previously computed edges + // We need to defer the creation of computation states until we know how many edges start + // at this computation state so that we can assign proper version + final List<StateTransition<T>> edges = outgoingEdges.getEdges(); + Integer takeBranchesToVisit = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1); + Integer ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches(); + for (StateTransition<T> edge : edges) { + switch (edge.getAction()) { + case IGNORE: { + if (!computationState.isStartState()) { + final DeweyNumber version; + if (!isEquivalentState(edge.getTargetState(), computationState.getState())) { + version = computationState.getVersion().increase(ignoreBranchesToVisit).addStage(); + ignoreBranchesToVisit--; + } else { + final int toIncrease = calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(), + outgoingEdges.getTotalTakeBranches()); + version = 
computationState.getVersion().increase(toIncrease); + } + + resultingComputationStates.add( + ComputationState.createState( + edge.getTargetState(), + computationState.getPreviousState(), + computationState.getEvent(), + computationState.getTimestamp(), + version, + computationState.getStartTimestamp() + ) + ); + sharedBuffer.lock( + edge.getTargetState().getName(), + computationState.getEvent(), + computationState.getTimestamp()); + } + } + break; + case TAKE: + final State<T> newState = edge.getTargetState(); + final State<T> consumingState = edge.getSourceState(); + final State<T> previousEventState = computationState.getPreviousState(); + + final T previousEvent = computationState.getEvent(); + final DeweyNumber currentVersion = computationState.getVersion(); + + final DeweyNumber newComputationStateVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit); + takeBranchesToVisit--; + + final long startTimestamp; + if (computationState.isStartState()) { + startTimestamp = timestamp; + sharedBuffer.put( + consumingState.getName(), + event, + timestamp, + currentVersion); + } else { + startTimestamp = computationState.getStartTimestamp(); + sharedBuffer.put( + consumingState.getName(), + event, + timestamp, + previousEventState.getName(), + previousEvent, + computationState.getTimestamp(), + currentVersion); + } + + // a new computation state is referring to the shared entry + sharedBuffer.lock(consumingState.getName(), event, timestamp); + + resultingComputationStates.add(ComputationState.createState( + newState, + consumingState, + event, + timestamp, + newComputationStateVersion, + startTimestamp + )); + break; + } + } - states.push(state); + if (computationState.isStartState()) { + final int totalBranches = calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(), outgoingEdges.getTotalTakeBranches()); + final ComputationState<T> startState = createStartState(computationState, totalBranches); + 
resultingComputationStates.add(startState); + } + + if (computationState.getEvent() != null) { + // release the shared entry referenced by the current computation state. + sharedBuffer.release( + computationState.getState().getName(), + computationState.getEvent(), + computationState.getTimestamp()); + // try to remove unnecessary shared buffer entries + sharedBuffer.remove( + computationState.getState().getName(), + computationState.getEvent(), + computationState.getTimestamp()); + } + + return resultingComputationStates; + } + + private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) { + if (takeBranches == 0 && ignoreBranches == 0) { + return 0; + } + + return ignoreBranches + 1; + } + --- End diff -- Done
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---