We couldn't insert a map phase in between when working directly with the stream transformation classes; doing so created a dangling Mapper. Wiring the partitioner and transformation directly to the window operator worked, though:
WindowOperator operator = ...

KeyGroupStreamPartitioner<MetricSignalSet, String> partitioner =
    new KeyGroupStreamPartitioner<MetricSignalSet, String>(
        new DimensionKeySelector<MetricSignalSet>(config), parallel);

PartitionTransformation<MetricSignalSet> partitioned =
    new PartitionTransformation<MetricSignalSet>(inputs, partitioner);

OneInputTransformation<MetricSignalSet, MetricSignalSet> trans =
    new OneInputTransformation<MetricSignalSet, MetricSignalSet>(
        partitioned, name, operator, ess, parallel);
trans.setStateKeySelector(new DimensionKeySelector<MetricSignalSet>(config));
trans.setStateKeyType(new GenericTypeInfo<String>(String.class));

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Window-Function-on-AllWindowed-Stream-Combining-Kafka-Topics-tp12941p14373.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.