[ https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16139638#comment-16139638 ]
Dawid Wysakowicz commented on FLINK-7129:
-----------------------------------------

My initial plan was to use {{connect}} as you described, but unfortunately the key is not propagated in that case, so we cannot use a {{KeyedStateBackend}} in the {{operator}}.

Recently I gave it another go and managed to glue it together, but it would require some changes in the {{flink-streaming-java}} module (as the constructor of {{SingleOutputStreamOperator}} is protected). It also relies on the assumption that the {{KeySelector}} of the second {{inputStream}} is only used when accessing {{KeyedState}} in {{processElement2}}.

I would love to hear opinions on that approach from [~kkl0u] and [~aljoscha].

This is the interesting part of creating {{CoCepOperator}}:

{code}
KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) inputStream;
TypeSerializer<K> keySerializer =
    keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());

// First input: the keyed event stream. Second input: the broadcast pattern stream.
TwoInputTransformation<T, Pattern<T, ?>, Map<String, List<T>>> transform =
    new TwoInputTransformation<>(
        keyedStream.getTransformation(),
        dynamicPatternsStream.broadcast().getTransformation(),
        "KeyedCEPCoPatternOperator",
        new CoCepPatternOperator<>(
            inputSerializer,
            isProcessingTime,
            keySerializer,
            nfaFactory,
            true),
        (TypeInformation<Map<String, List<T>>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
        keyedStream.getExecutionEnvironment().getParallelism());

// Only the first input has a key selector; the broadcast input gets null.
transform.setStateKeySelectors(keyedStream.getKeySelector(), null);
transform.setStateKeyType(keyedStream.getKeyType());

// Requires access to the protected constructor of SingleOutputStreamOperator.
patternStream = new SingleOutputStreamOperator<>(keyedStream.getExecutionEnvironment(), transform);
keyedStream.getExecutionEnvironment().addOperator(transform);
{code}

I have some WIP dynamic CEP patterns in my branch: https://github.com/dawidwys/flink/tree/cep-dynamic-nfa

> Dynamically changing patterns
> -----------------------------
>
>                 Key: FLINK-7129
>                 URL: https://issues.apache.org/jira/browse/FLINK-7129
>             Project: Flink
>          Issue Type: New Feature
>          Components: CEP
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>
> An umbrella task for introducing a mechanism for injecting patterns through
> a coStream
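
To make the assumption about the second input's {{KeySelector}} more concrete, here is a toy model in plain Java. It is not Flink code and does not use any Flink API; the class {{KeyedBroadcastSketch}} and all of its members are hypothetical names made up for illustration. It only shows the idea that a current key exists while an element of the first (keyed) input is processed, whereas the second (broadcast) input has no key selector and therefore must not touch keyed state.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy stand-in for a two-input operator: input 1 is keyed, input 2 is broadcast.
// All names here are hypothetical; this does not mirror Flink's real classes.
final class KeyedBroadcastSketch<T, K> {
    private final Function<T, K> keySelector1;                   // key selector of input 1 only
    private final Map<K, List<T>> keyedState = new HashMap<>();  // stands in for keyed state
    private final List<String> patterns = new ArrayList<>();     // stands in for broadcast patterns

    KeyedBroadcastSketch(Function<T, K> keySelector1) {
        this.keySelector1 = keySelector1;
    }

    // Input 1: a key is derived from the element, so per-key state is accessible.
    void processElement1(T element) {
        K currentKey = keySelector1.apply(element);
        keyedState.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(element);
    }

    // Input 2: there is no key selector, hence no current key.
    // The method only updates non-keyed data and never reads or writes keyedState.
    void processElement2(String pattern) {
        patterns.add(pattern);
    }

    List<T> stateFor(K key) {
        return keyedState.getOrDefault(key, Collections.emptyList());
    }

    List<String> patterns() {
        return patterns;
    }

    public static void main(String[] args) {
        // Key the first input by string length, purely as an example.
        KeyedBroadcastSketch<String, Integer> op = new KeyedBroadcastSketch<>(String::length);
        op.processElement2("pattern-1"); // broadcast side, no key involved
        op.processElement1("ab");
        op.processElement1("cd");
        op.processElement1("xyz");
        System.out.println(op.stateFor(2));
        System.out.println(op.patterns());
    }
}
```

In this model, passing {{null}} as the second key selector is harmless precisely because {{processElement2}} never asks for the current key. If it did access keyed state, there would be nothing to scope that access to, which is the risk the assumption is meant to rule out.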