Thanks, I feel I'm getting closer to the truth. 

So parallelism is the cause? Say my parallelism is 2. Does that mean I get 2 
tasks running after `keyBy` if even all elements have the same key so go to 1 
down stream(say task 1)? And it is the other task(task 2) with no incoming data 
that caused the `timeWindowAll` stream unable to progress? Because both task 1 
and task 2 are its input streams and one is idling so its event time cannot 
make progress?

On 2019/04/22 01:57:39, Guowei Ma <guowei....@gmail.com> wrote: 
> HI,
> 
> BoundedOutOfOrdernessTimestampExtractors can send a WM at least after it
> receives an element.
> 
> For after Keyby:
> Flink uses the HashCode of key and the parallelism of down stream to decide
> which subtask would receive the element. This means if your key is always
> same, all the sources will only send the elements to the same down stream
> task, for example only no. 3 BoundedOutOfOrdernessTimestampExtractor.
> 
> For before Keyby:
> In your case, the Source and BoundedOutOfOrdernessTimestampExtractors would
> be chained together, which means every
> BoundedOutOfOrdernessTimestampExtractors will receive elements.
> 
> Best,
> Guowei
> 
> 
> an0 <an0...@gmail.com> 于2019年4月19日周五 下午10:41写道:
> 
> > Hi,
> >
> > First of all, thank you for the `shuffle()` tip. It works. However, I
> > still don't understand why it doesn't work without calling `shuffle()`.
> >
> > Why would not all BoundedOutOfOrdernessTimestampExtractors receive trips?
> > All the trips has keys and timestamps. As I said in my reply to Paul, I see
> > the same watermarks being extracted.
> >
> > How could calling `assignTimestampsAndWatermarks` before VS after `keyBy`
> > matter? My understanding is any specific window for a specific key always
> > receives the exactly same data, and the calling order of
> > `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
> >
> > To make `keyBy` as irrelevant as possible, I tried letting it always
> > return the same key so that there is only 1 keyed stream and it is exactly
> > the same as the original unkeyed stream. It still doesn't trigger windows:
> > ```java
> > DataStream<Trip> trips = env.addSource(consumer);
> > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
> > DataStream<Trip> featurizedUserTrips =
> >         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new
> > BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >     @Override
> >     public long extractTimestamp(Trip trip) {
> >         return trip.endTime.getTime();
> >     }
> > });
> > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > featurizedUserTrips.timeWindowAll(Time.days(7),
> >         Time.days(1));
> > ```
> >
> > It makes no sense to me. Please help me understand why it doesn't work.
> > Thanks!
> >
> > On 2019/04/19 04:14:31, Guowei Ma <guowei....@gmail.com> wrote:
> > > Hi,
> > > After keyby maybe only some of BoundedOutOfOrdernessTimestampExtractors
> > > could receive the elements(trip). If that is the case
> > > BoundedOutOfOrdernessTimestampExtractor, which does not receive element
> > > would not send the WM. Since that the timeWindowAll operator could not be
> > > triggered.
> > > You could add a shuffle() before the assignTimestampsAndWatermarks in
> > your
> > > second case and check if the window is triggered.  If it could be
> > triggered
> > > you could check the distribution of elements generated by the source.
> > >
> > > Best,
> > > Guowei
> > >
> > >
> > > an0...@gmail.com <an0...@gmail.com> 于2019年4月19日周五 上午4:10写道:
> > >
> > > > I don't think it is the watermark. I see the same watermarks from the
> > two
> > > > versions of code.
> > > >
> > > > The processing on the keyed stream doesn't change event time at all. I
> > can
> > > > simply change my code to use `map` on the keyed stream to return back
> > the
> > > > input data, so that the window operator receives the exactly same
> > data. The
> > > > only difference is when I do `assignTimestampsAndWatermarks`. The
> > result is
> > > > the same, `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > ```java
> > > > DataStream<Trip> trips =
> > > >         env.addSource(consumer).assignTimestampsAndWatermarks(new
> > > > BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >     @Override
> > > >     public long extractTimestamp(Trip trip) {
> > > >         return trip.endTime.getTime();
> > > >     }
> > > > });
> > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> > > > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > featurizedUserTrips.timeWindowAll(Time.days(7),
> > > >         Time.days(1));
> > > > ```
> > > >
> > > > `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> > > > ```java
> > > > DataStream<Trip> trips = env.addSource(consumer);
> > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > DataStream<Trip> featurizedUserTrips =
> > > >         userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new
> > > > BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >     @Override
> > > >     public long extractTimestamp(Trip trip) {
> > > >         return trip.endTime.getTime();
> > > >     }
> > > > });
> > > > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> > > > featurizedUserTrips.timeWindowAll(Time.days(7),
> > > >         Time.days(1));
> > > > ```
> > > >
> > > > It feels a bug to me, but I want to confirm it before I file the bug
> > > > report.
> > > >
> > > > On 2019/04/18 03:38:34, Paul Lam <paullin3...@gmail.com> wrote:
> > > > > Hi,
> > > > >
> > > > > Could you check the watermark of the window operator? One possible
> > > > situation would be some of the keys are not getting enough inputs, so
> > their
> > > > watermarks remain below the window end time and hold the window
> > operator
> > > > watermark back. IMO, it’s a good practice to assign watermark earlier
> > in
> > > > the data pipeline.
> > > > >
> > > > > Best,
> > > > > Paul Lam
> > > > >
> > > > > > 在 2019年4月17日,23:04,an0...@gmail.com 写道:
> > > > > >
> > > > > > `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > > > ```java
> > > > > > DataStream<Trip> trips =
> > > > > >        env.addSource(consumer).assignTimestampsAndWatermarks(new
> > > > BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > > > >            @Override
> > > > > >            public long extractTimestamp(Trip trip) {
> > > > > >                return trip.endTime.getTime();
> > > > > >            }
> > > > > >        });
> > > > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip ->
> > trip.userId);
> > > > > > DataStream<FeaturizedTrip> featurizedUserTrips =
> > userTrips.process(new
> > > > Featurization());
> > > > > > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > > > >        featurizedUserTrips.timeWindowAll(Time.days(7),
> > > > > >                Time.days(1));
> > > > > > ```
> > > > > >
> > > > > > But not after `keyBy` and `process`:
> > > > > > ```java
> > > > > > DataStream<Trip> trips = env.addSource(consumer);
> > > > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip ->
> > trip.userId);
> > > > > > DataStream<FeaturizedTrip> featurizedUserTrips =
> > > > > >        userTrips.process(new
> > > > Featurization()).assignTimestampsAndWatermarks(new
> > > > BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> > > > > >            @Override
> > > > > >            public long extractTimestamp(FeaturizedTrip trip) {
> > > > > >                return trip.endTime.getTime();
> > > > > >            }
> > > > > >        });
> > > > > > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > > > >        featurizedUserTrips.timeWindowAll(Time.days(7),
> > > > > >                Time.days(1));
> > > > > > ```
> > > > > > Windows are never triggered.
> > > > > >
> > > > > > Is it a bug or expected behavior? If the latter, where is it
> > > > documented?
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> 

Reply via email to