Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/3477#discussion_r106106765 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java --- @@ -74,88 +76,233 @@ */ @SuppressWarnings("unchecked") public static <T> NFAFactory<T> compileFactory( - Pattern<T, ?> pattern, - TypeSerializer<T> inputTypeSerializer, + final Pattern<T, ?> pattern, + final TypeSerializer<T> inputTypeSerializer, boolean timeoutHandling) { if (pattern == null) { // return a factory for empty NFAs - return new NFAFactoryImpl<T>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling); + return new NFAFactoryImpl<>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling); } else { - // set of all generated states - Map<String, State<T>> states = new HashMap<>(); - long windowTime; + final NFAFactoryCompiler<T> nfaFactoryCompiler = new NFAFactoryCompiler<>(pattern); + nfaFactoryCompiler.compileFactory(); + return new NFAFactoryImpl<>(inputTypeSerializer, nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), timeoutHandling); + } + } - // this is used to enforse pattern name uniqueness. - Set<String> patternNames = new HashSet<>(); + private static class NFAFactoryCompiler<T> { - Pattern<T, ?> succeedingPattern; - State<T> succeedingState; - Pattern<T, ?> currentPattern = pattern; + private final Set<String> usedNames = new HashSet<>(); + private final List<State<T>> states = new ArrayList<>(); + private long windowTime = 0; + private Pattern<T, ?> currentPattern; + + NFAFactoryCompiler(final Pattern<T, ?> pattern) { + this.currentPattern = pattern; + } + + /** + * Compiles the given pattern into a {@link NFAFactory}. The NFA factory can be used to create + * multiple NFAs. 
+ */ + void compileFactory() { // we're traversing the pattern from the end to the beginning --> the first state is the final state - State<T> currentState = new State<>(currentPattern.getName(), State.StateType.Final); - patternNames.add(currentPattern.getName()); + State<T> sinkState = createEndingState(); + // add all the normal states + sinkState = createMiddleStates(sinkState); + // add the beginning state + createStartState(sinkState); + } + + List<State<T>> getStates() { + return states; + } - states.put(currentPattern.getName(), currentState); + long getWindowTime() { + return windowTime; + } + + private State<T> createEndingState() { + State<T> sinkState = new State<>(ENDING_STATE_NAME, State.StateType.Final); + states.add(sinkState); + usedNames.add(ENDING_STATE_NAME); windowTime = currentPattern.getWindowTime() != null ? currentPattern.getWindowTime().toMilliseconds() : 0L; + return sinkState; + } - while (currentPattern.getPrevious() != null) { - succeedingPattern = currentPattern; - succeedingState = currentState; - currentPattern = currentPattern.getPrevious(); + private State<T> createMiddleStates(final State<T> sinkState) { - if (!patternNames.add(currentPattern.getName())) { - throw new MalformedPatternException("Duplicate pattern name: " + currentPattern.getName() + ". " + - "Pattern names must be unique."); + State<T> lastSink = sinkState; + while (currentPattern.getPrevious() != null) { + State<T> sourceState; --- End diff -- You do not need to declare `sourceState` here and then assign it three lines below; instead, declare the variable at the point where you first assign it: `State<T> sourceState = new State<>(currentPattern.getName(), State.StateType.Normal);`
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wants it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---