Hi,
After keyBy, it may be that only some of the parallel
BoundedOutOfOrdernessTimestampExtractor instances receive elements (trips).
If that is the case, an instance that receives no elements never emits a
watermark. A downstream operator's watermark is the minimum of the
watermarks from all of its inputs, so the timeWindowAll operator's watermark
does not advance and the window cannot be triggered.
You could add a shuffle() before assignTimestampsAndWatermarks in your
second case and check whether the window is triggered. If it is, you could
then check the distribution of the elements generated by the source.
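
To make the reasoning above concrete, here is a minimal plain-Java sketch of
it. It is not Flink code and does not use any Flink API: the class
WatermarkSketch and its methods are hypothetical names, and the model is a
simplification in which each parallel assigner reports "max timestamp seen
minus the out-of-orderness bound", an assigner with no input reports
Long.MIN_VALUE, and the downstream operator takes the minimum over its
inputs.

```java
import java.util.Arrays;

// Simplified model of watermark propagation; all names are hypothetical.
public class WatermarkSketch {
    // Stands for "this subtask has not emitted any watermark yet".
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // Watermark of one parallel assigner: max timestamp seen minus the bound.
    // An assigner that saw no elements has nothing to report.
    static long subtaskWatermark(long[] timestamps, long maxOutOfOrderness) {
        if (timestamps.length == 0) {
            return NO_WATERMARK;
        }
        long maxSeen = Arrays.stream(timestamps).max().getAsLong();
        return maxSeen - maxOutOfOrderness;
    }

    // The downstream operator's watermark is the minimum over all inputs,
    // so a single idle input holds it back.
    static long downstreamWatermark(long maxOutOfOrderness, long[]... perSubtask) {
        if (perSubtask.length == 0) {
            return NO_WATERMARK;
        }
        long min = Long.MAX_VALUE;
        for (long[] timestamps : perSubtask) {
            min = Math.min(min, subtaskWatermark(timestamps, maxOutOfOrderness));
        }
        return min;
    }

    // An event-time window [start, end) can fire once the watermark
    // reaches its last timestamp, end - 1.
    static boolean windowCanFire(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        // Subtask 0 received trips, subtask 1 received nothing.
        long skewed = downstreamWatermark(day, new long[] {10 * day}, new long[] {});
        // Both subtasks received trips.
        long balanced = downstreamWatermark(day, new long[] {10 * day}, new long[] {8 * day});
        System.out.println("one idle subtask, window fires: " + windowCanFire(skewed, 7 * day));
        System.out.println("all subtasks fed, window fires: " + windowCanFire(balanced, 7 * day));
    }
}
```

In this model, adding shuffle() corresponds to making sure every array passed
to downstreamWatermark is non-empty, which is why it is a useful check.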

Best,
Guowei


an0...@gmail.com <an0...@gmail.com> wrote on Fri, Apr 19, 2019 at 4:10 AM:

> I don't think it is the watermark. I see the same watermarks from the two
> versions of code.
>
> The processing on the keyed stream doesn't change event time at all. I can
> simply change my code to use `map` on the keyed stream to return the input
> data unchanged, so that the window operator receives exactly the same data.
> The only difference is where I call `assignTimestampsAndWatermarks`. The
> result is the same. `assignTimestampsAndWatermarks` before `keyBy` works:
> ```java
> DataStream<Trip> trips = env.addSource(consumer)
>         .assignTimestampsAndWatermarks(
>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>                     @Override
>                     public long extractTimestamp(Trip trip) {
>                         return trip.endTime.getTime();
>                     }
>                 });
> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> ```
>
> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> ```java
> DataStream<Trip> trips = env.addSource(consumer);
> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip)
>         .assignTimestampsAndWatermarks(
>                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>                     @Override
>                     public long extractTimestamp(Trip trip) {
>                         return trip.endTime.getTime();
>                     }
>                 });
> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
>         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> ```
>
> It feels like a bug to me, but I want to confirm that before I file a bug
> report.
>
> On 2019/04/18 03:38:34, Paul Lam <paullin3...@gmail.com> wrote:
> > Hi,
> >
> > Could you check the watermark of the window operator? One possible
> > situation is that some of the keys are not getting enough input, so their
> > watermarks remain below the window end time and hold the window operator's
> > watermark back. IMO, it's good practice to assign watermarks earlier in
> > the data pipeline.
> >
> > Best,
> > Paul Lam
> >
> > > On Apr 17, 2019, at 23:04, an0...@gmail.com wrote:
> > >
> > > `assignTimestampsAndWatermarks` before `keyBy` works:
> > > ```java
> > > DataStream<Trip> trips = env.addSource(consumer)
> > >         .assignTimestampsAndWatermarks(
> > >                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > >                     @Override
> > >                     public long extractTimestamp(Trip trip) {
> > >                         return trip.endTime.getTime();
> > >                     }
> > >                 });
> > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > DataStream<FeaturizedTrip> featurizedUserTrips =
> > >         userTrips.process(new Featurization());
> > > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > ```
> > >
> > > But not after `keyBy` and `process`:
> > > ```java
> > > DataStream<Trip> trips = env.addSource(consumer);
> > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > DataStream<FeaturizedTrip> featurizedUserTrips =
> > >         userTrips.process(new Featurization())
> > >                 .assignTimestampsAndWatermarks(
> > >                         new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> > >                             @Override
> > >                             public long extractTimestamp(FeaturizedTrip trip) {
> > >                                 return trip.endTime.getTime();
> > >                             }
> > >                         });
> > > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > ```
> > > Windows are never triggered.
> > >
> > > Is it a bug or expected behavior? If the latter, where is it
> > > documented?
> > >
> >
> >
>
