Hi,

Below is my code:

    splitStream.select(duringTime + "")
        .map(new KeyMapFunc())
        .assignTimestampsAndWatermarks(new DelaySaltWatermarks())
        .setParallelism(300)
        .keyBy(_SQL, _KEY, _SALT)
        .window(TumblingEventTimeWindows.of(Time.seconds(duringTime / 10)))
        .apply(new WindowSaltFunc())
        .keyBy(_SQL, _KEY)
        .window(TumblingEventTimeWindows.of(Time.seconds(duringTime)))
        .apply(new WindowFunc())
        .addSink(new FlinkKafkaProducer010<>("topic", new SimpleSerializationSchema(), this.properties));

and

    public class DelaySaltWatermarks implements AssignerWithPeriodicWatermarks<ContentMessage> {

        // Highest event timestamp seen so far by this parallel instance.
        private long currentMaxTimestamp;

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            // The watermark trails the highest timestamp by the allowed lateness.
            return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDER);
        }

        @Override
        public long extractTimestamp(ContentMessage contentMessage, long l) {
            long timestamp = contentMessage.getTimestamp();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }
    }

When I changed the parallelism of assignTimestampsAndWatermarks to 300 (setParallelism(300)), the window could be fired.

Thanks,
aitozi

Aljoscha Krettek wrote
> Hi,
>
> So I understood that you have roughly this pipeline:
>
> Input 1 --\
>            |- CoFlatMap - TimestampAndWatermarkAssigner - KeyBy - Window
> Input 2 --/
>
> If the timestamp assigner is after the CoFlatMap, the processInput() method
> of the extractor should still be called. Not by the StreamInputProcessor
> but by ChainingOutput [1], which basically connects the two-input
> CoFlatMap to the one-input operator that comes after it. There could still
> be a bug in there somewhere, however.
>
> Could you maybe send me the relevant parts of your code, so that I can
> have a look? Or provide a minimal example.
>
> Best,
> Aljoscha
>
> [1]
> https://github.com/apache/flink/blob/6f5fa7f741538207244368c275bee9958c43a25a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L394
>
>> On 7. Aug 2017, at 19:21, aitozi <gjying1314@> wrote:
>>
>> Hi,
>>
>> My Flink version is 1.2.
>>
>> I have been working on this problem these days. Below is what I found.
>>
>> When I use "assignTimestampsAndWatermark" with the same parallelism (240)
>> as the preceding operator, which has two inputs (it is a "connected"
>> Co-FlatMap operator with parallelism 240), the watermark is not updated.
>>
>> Then I looked into the source code. StreamTwoInputProcessor.java#processInput,
>> which is called by TwoInputStreamTask, calls processElement1() and
>> processElement2(), but neither of them runs processElement the way
>> StreamInputProcessor does to extract the timestamp (as shown in
>> TimestampsAndPeriodicWatermarksOperator).
>>
>> As a result, the timestamp is not updated. My watermark is updated in the
>> same way as in the class BoundedOutOfOrdernessTimestampExtractor.
>>
>> So, is it a bug that the timestamp is not updated when dealing with a
>> two-input stream?
>>
>> PS: My English is not very good; I don't know whether you can understand me :)
>>
>> Thanks,
>> aitozi
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WaterMark-Eventwindow-not-fired-correctly-tp14668p14727.html
>> Sent from the Apache Flink User Mailing List archive at Nabble.com.

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WaterMark-Eventwindow-not-fired-correctly-tp14668p14753.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.
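
A note for readers of this thread: one possible explanation for the behaviour described above, not confirmed anywhere in the thread, is that an operator with several parallel inputs advances its event time to the minimum of the watermarks it has received, so a single watermark-assigner instance that sees no data can hold every downstream window back. The sketch below simulates that rule in plain Java without any Flink dependency. The class WatermarkMinDemo, the helper names, and the value chosen for MAX_OUT_OF_ORDER are all hypothetical; only the per-instance logic is copied from DelaySaltWatermarks.

```java
public class WatermarkMinDemo {

    // Hypothetical value; the thread does not say what MAX_OUT_OF_ORDER is.
    static final long MAX_OUT_OF_ORDER = 5_000L;

    /** Per-instance state, mirroring the logic of DelaySaltWatermarks. */
    static final class SubtaskAssigner {
        private long currentMaxTimestamp; // starts at 0, as in the original class

        void onElement(long timestamp) {
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        }

        long currentWatermark() {
            return currentMaxTimestamp - MAX_OUT_OF_ORDER;
        }
    }

    /** Event time seen downstream: the minimum watermark over all upstream instances. */
    static long combinedWatermark(SubtaskAssigner... subtasks) {
        long min = Long.MAX_VALUE;
        for (SubtaskAssigner subtask : subtasks) {
            min = Math.min(min, subtask.currentWatermark());
        }
        return min;
    }

    /** Simplified event-time trigger: fire once the watermark reaches the window's last timestamp. */
    static boolean windowCanFire(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        SubtaskAssigner busyA = new SubtaskAssigner();
        SubtaskAssigner busyB = new SubtaskAssigner();
        SubtaskAssigner idle = new SubtaskAssigner();

        busyA.onElement(20_000L);
        busyB.onElement(80_000L);
        // "idle" receives nothing, so its watermark stays at 0 - MAX_OUT_OF_ORDER.

        long windowEnd = 10_000L; // tumbling window [0, 10000)
        System.out.println(windowCanFire(windowEnd, combinedWatermark(busyA, busyB, idle))); // false

        idle.onElement(50_000L); // once every instance sees data, the minimum advances
        System.out.println(windowCanFire(windowEnd, combinedWatermark(busyA, busyB, idle))); // true
    }
}
```

If this is what happens in the job, it would be consistent with the window firing after the parallelism change, since the records are then redistributed across the assigner instances instead of being forwarded one-to-one from the Co-FlatMap; that reading is an assumption and would need to be checked against the actual watermark values of each subtask.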