Hi,

So when the parallelism of the timestamp assigner is different from the 
parallelism of the map(KeyMapFunc()) or the window then it works? But when the 
parallelism is the same it does not work?

If this is true, then I would assume that some parallel instances of the 
timestamp assigner don't get any events and therefore don't advance their 
watermark. Since a downstream operator's watermark is the minimum of the 
watermarks of all its inputs, this would mean that the downstream watermark 
also doesn't advance. Could you check in the web interface whether all parallel 
instances of the assigner are processing elements when you have the same 
parallelism for all operations?
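
To illustrate the effect, here is a minimal sketch in plain Java. It is not 
Flink code: the class name, the method names and the value used for 
MAX_OUT_OF_ORDER are made up for the illustration. It only assumes that the 
assigner computes its watermark as in your DelaySaltWatermarks and that a 
downstream operator takes the minimum watermark over its inputs.

```java
import java.util.Arrays;

/**
 * Plain-Java illustration (not Flink API): how one idle parallel assigner
 * instance holds back the watermark seen by a downstream operator.
 * All names in this class are hypothetical.
 */
public class IdleAssignerSketch {

    /** Mirrors getCurrentWatermark() of the assigner posted in this thread. */
    public static long assignerWatermark(long currentMaxTimestamp, long maxOutOfOrder) {
        return currentMaxTimestamp - maxOutOfOrder;
    }

    /** A downstream operator's watermark is the minimum over all of its inputs. */
    public static long downstreamWatermark(long[] inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    /** An event-time window fires once the watermark reaches its max timestamp (end - 1). */
    public static boolean windowFires(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        long maxOutOfOrder = 5_000L; // assumed value for MAX_OUT_OF_ORDER

        // Max timestamps seen by three parallel assigner instances. The third
        // one received no events, so its field still holds the default value 0.
        long[] maxTimestamps = {1_502_271_000_000L, 1_502_271_002_000L, 0L};

        long[] watermarks = Arrays.stream(maxTimestamps)
                .map(ts -> assignerWatermark(ts, maxOutOfOrder))
                .toArray();

        long downstream = downstreamWatermark(watermarks);
        long windowEnd = 1_502_270_990_000L; // end of some already "complete" window

        System.out.println("downstream watermark = " + downstream);
        System.out.println("window fires = " + windowFires(windowEnd, downstream));
    }
}
```

With these numbers the idle instance keeps the downstream watermark at -5000, 
so the window does not fire even though the two active instances are well past 
its end.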

Best,
Aljoscha

> On 9. Aug 2017, at 11:33, aitozi <gjying1...@gmail.com> wrote:
> 
> Hi, below is my code:
> 
> splitStream.select(duringTime + "")
>                .map(new KeyMapFunc())
>                .assignTimestampsAndWatermarks(new DelaySaltWatermarks())
>                .setParallelism(300)
>                .keyBy(_SQL, _KEY, _SALT)
>                .window(TumblingEventTimeWindows.of(Time.seconds(duringTime/10)))
>                .apply(new WindowSaltFunc())
>                .keyBy(_SQL, _KEY)
>                .window(TumblingEventTimeWindows.of(Time.seconds(duringTime)))
>                .apply(new WindowFunc())
>                .addSink(new FlinkKafkaProducer010<>("topic",
>                        new SimpleSerializationSchema(), this.properties));
> 
> and 
> 
> public class DelaySaltWatermarks
>        implements AssignerWithPeriodicWatermarks<ContentMessage> {
> 
>    private long currentMaxTimestamp;
> 
>    @Nullable
>    @Override
>    public Watermark getCurrentWatermark() {
>        return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDER);
>    }
> 
>    @Override
>    public long extractTimestamp(ContentMessage contentMessage, long l) {
>        long timestamp = contentMessage.getTimestamp();
>        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>        return timestamp;
>    }
> }
> 
> When I changed the parallelism (300) of assignTimestampsAndWatermarks,
> the windows fired.
> 
> thanks,
> aitozi
> 
> 
> Aljoscha Krettek wrote
>> Hi,
>> 
>> So I understood that you have roughly this pipeline:
>> 
>> Input 1 --\
>>           |- CoFlatMap - TimestampAndWatermarkAssigner - KeyBy - Window    
>> Input 2 --/
>> 
>> If the timestamp assigner is after the CoFlatMap the processInput() method
>> of the extractor should still be called. Not by the StreamInputProcessor
>> but by ChainingOutput [1], which basically connects the Two-Input
>> CoFlatMap to the one-input operator that comes after that. There could still
>> be a bug in there somewhere, however.
>> 
>> Could you maybe send me the relevant parts of your code so that I can
>> have a look, or provide a minimal example?
>> 
>> Best,
>> Aljoscha
>> 
>> [1]
>> https://github.com/apache/flink/blob/6f5fa7f741538207244368c275bee9958c43a25a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L394
>> 
>>> On 7. Aug 2017, at 19:21, aitozi <gjying1314@> wrote:
>>> 
>>> 
>>> Hi,
>>> 
>>> My Flink version is 1.2.
>>> 
>>> I have been working on this problem for the past few days. Below are my
>>> findings.
>>> 
>>> When I use "assignTimestampsAndWatermarks" with the same parallelism (240)
>>> as the preceding operator, which has two inputs (it is a "connected"
>>> Co-FlatMap operator with parallelism 240), the watermark is not updated.
>>> 
>>> I then looked into the source code. StreamTwoInputProcessor.java#processInput,
>>> which is called by TwoInputStreamTask, calls processElement1() and
>>> processElement2(), but neither of them runs processElement in
>>> StreamInputProcessor, which is what would call extractTimestamp (shown in
>>> TimestampsAndPeriodicWatermarksOperator).
>>> 
>>> As a result, the timestamp is not updated, and my watermark is updated just
>>> like in the class BoundedOutOfOrdernessTimestampExtractor.
>>> 
>>> So, is it a bug that the timestamp is not updated when dealing with a
>>> two-input stream?
>>> 
>>> PS: My English is not very good, so I don't know whether you can
>>> understand me :)
>>> 
>>> thanks,
>>> aitozi
>>> 
>>> 
>>> 
>>> --
>>> View this message in context:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WaterMark-Eventwindow-not-fired-correctly-tp14668p14727.html
>>> Sent from the Apache Flink User Mailing List archive at Nabble.com.
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/WaterMark-Eventwindow-not-fired-correctly-tp14668p14753.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
