Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3477#discussion_r105700201
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -247,27 +275,148 @@ public int compare(final StateTransition<T> o1, final StateTransition<T> o2) {
         * @return Collection of computation states which result from the current one
         */
        private Collection<ComputationState<T>> computeNextStates(
    -                   final ComputationState<T> computationState,
    -                   final T event,
    -                   final long timestamp) {
    -           Stack<State<T>> states = new Stack<>();
    -           ArrayList<ComputationState<T>> resultingComputationStates = new ArrayList<>();
    -           State<T> state = computationState.getState();
    +           final ComputationState<T> computationState,
    +           final T event,
    +           final long timestamp) {
    +           final ArrayList<ComputationState<T>> resultingComputationStates = new ArrayList<>();
    +
    +           final OutgoingEdges<T> outgoingEdges = createDecisionGraph(computationState, event);
    +
    +           // Create the computing version based on the previously computed edges
    +           // We need to defer the creation of computation states until we know how many edges start
    +           // at this computation state so that we can assign proper version
    +           final List<StateTransition<T>> edges = outgoingEdges.getEdges();
    +           Integer takeBranchesToVisit = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1);
    +           Integer ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches();
    +           for (StateTransition<T> edge : edges) {
    +                   switch (edge.getAction()) {
    +                           case IGNORE: {
    +                                   if (!computationState.isStartState()) {
    +                                           final DeweyNumber version;
    +                                           if (!isEquivalentState(edge.getTargetState(), computationState.getState())) {
    +                                                   version = computationState.getVersion().increase(ignoreBranchesToVisit).addStage();
    +                                                   ignoreBranchesToVisit--;
    +                                           } else {
    +                                                   final int toIncrease = calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(),
    --- End diff --
    
    Please put each of the two arguments on its own line.


