Hi,
First of all, thank you for the `shuffle()` tip. It works. However, I still
don't understand why it doesn't work without calling `shuffle()`.
Why wouldn't all the BoundedOutOfOrdernessTimestampExtractors receive trips? All
the trips have keys and timestamps. As I said in my reply to Paul, I see the
same watermarks being extracted.
How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
matter? My understanding is that any specific window for a specific key always
receives exactly the same data, and the order in which
`assignTimestampsAndWatermarks` and `keyBy` are called shouldn't affect that.
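The intuition about window contents is correct: for a given key, each event-time window collects the same elements no matter where timestamps are assigned. What differs is whether the window ever *fires*, and that is decided by the watermark reaching the window operator, not by the data itself. Below is a plain-Java model (not Flink code) of sliding-window assignment and the event-time firing condition, assuming zero window offset and non-negative millisecond timestamps; `SlidingWindowModel` and its methods are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Flink code) of event-time sliding windows:
// which windows a timestamp belongs to, and when a window may fire.
class SlidingWindowModel {
    static final long DAY = 24L * 60 * 60 * 1000;

    // Start timestamps of every window [start, start + size) containing ts.
    // Assumes non-negative timestamps and a zero window offset.
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // An event-time window can fire only once the watermark reaches
    // the window's last timestamp (end - 1).
    static boolean canFire(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long ts = 10 * DAY + 5; // an arbitrary event timestamp
        List<Long> starts = windowStarts(ts, 7 * DAY, DAY);
        System.out.println("windows containing ts: " + starts.size());
        long end = starts.get(0) + 7 * DAY;
        // The window's contents are fixed, but it fires only if the watermark advances.
        System.out.println(canFire(end, Long.MIN_VALUE)); // false: watermark never advanced
        System.out.println(canFire(end, end - 1));        // true
    }
}
```

With a 7-day size and a 1-day slide, each element belongs to seven windows, and none of them fires while the operator's watermark stays at `Long.MIN_VALUE`.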
To make `keyBy` as irrelevant as possible, I tried letting it always return the
same key, so that there is only one key and the keyed stream contains exactly
the same elements as the original unkeyed stream. It still doesn't trigger
windows:
```java
DataStream<Trip> trips = env.addSource(consumer);
KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
DataStream<Trip> featurizedUserTrips = userTrips
        .map(trip -> trip)
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
                    @Override
                    public long extractTimestamp(Trip trip) {
                        return trip.endTime.getTime();
                    }
                });
AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
        featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
```
It makes no sense to me. Please help me understand why it doesn't work. Thanks!
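A constant key does not make the keyed stream behave like the unkeyed one: `keyBy` still partitions records across all parallel instances of the downstream operator, and with a single key every record lands on the same instance. Assuming the job runs with parallelism greater than 1 (the thread does not say), the other instances of the `map` and of the timestamp/watermark assigner receive no records at all. The sketch below uses a plain `hashCode` modulo as a hypothetical stand-in for Flink's real key-to-subtask mapping, which goes through key groups and a different hash but shares the property that one key always maps to one subtask:

```java
import java.util.Arrays;

// Simplified model (not Flink's actual key-group assignment) of how keyBy
// routes records: every record with the same key goes to the same subtask.
class ConstantKeyRouting {
    // Hypothetical stand-in for Flink's key-to-subtask mapping.
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Counts how many records each downstream subtask receives.
    static int[] route(Object[] keys, int parallelism) {
        int[] counts = new int[parallelism];
        for (Object key : keys) {
            counts[subtaskFor(key, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        Object[] keys = new Object[1000];
        Arrays.fill(keys, 0L); // like keyBy(trip -> 0L): every record has the same key
        System.out.println(Arrays.toString(route(keys, 4))); // prints [1000, 0, 0, 0]
    }
}
```

Three of the four simulated subtasks stay idle, which is exactly the situation in which an assigner placed after `keyBy` has nothing to derive a watermark from.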
On 2019/04/19 04:14:31, Guowei Ma wrote:
> Hi,
> After keyBy, perhaps only some of the BoundedOutOfOrdernessTimestampExtractor
> instances receive elements (trips). If that is the case, any
> BoundedOutOfOrdernessTimestampExtractor that receives no elements will not
> emit a watermark, so the timeWindowAll operator cannot be triggered.
> You could add a shuffle() before the assignTimestampsAndWatermarks in your
> second case and check whether the window is triggered. If it is, you could
> then check the distribution of the elements generated by the source.
>
> Best,
> Guowei
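Guowei's explanation can be illustrated with a small plain-Java model (not Flink's implementation): each parallel assigner instance derives its watermark from the largest timestamp it has seen minus the out-of-orderness bound, so an instance that never receives an element stays at `Long.MIN_VALUE` and never moves event time forward. Sending every element to instance 0 models the constant key; sending each element to a random instance models `shuffle()`. The class name, the 10 ms bound, and the parallelism of 4 are arbitrary assumptions:

```java
import java.util.Arrays;
import java.util.Random;

// Simplified model (not Flink code) of several parallel instances of a
// bounded-out-of-orderness assigner placed after a keyBy or a shuffle().
class AssignerInstances {
    static final long BOUND = 10; // hypothetical out-of-orderness bound, in ms

    // Returns the watermark each instance would report. An instance that
    // received no elements stays at Long.MIN_VALUE.
    static long[] watermarks(int parallelism, long[] timestamps, boolean shuffle, long seed) {
        long[] maxTs = new long[parallelism];
        Arrays.fill(maxTs, Long.MIN_VALUE);
        Random random = new Random(seed);
        for (long ts : timestamps) {
            // Constant key: everything goes to instance 0.
            // shuffle(): each element goes to a randomly chosen instance.
            int target = shuffle ? random.nextInt(parallelism) : 0;
            maxTs[target] = Math.max(maxTs[target], ts);
        }
        long[] wm = new long[parallelism];
        for (int i = 0; i < parallelism; i++) {
            wm[i] = maxTs[i] == Long.MIN_VALUE ? Long.MIN_VALUE : maxTs[i] - BOUND;
        }
        return wm;
    }

    public static void main(String[] args) {
        long[] timestamps = new long[1000];
        for (int i = 0; i < timestamps.length; i++) timestamps[i] = 1000L + i;
        System.out.println("constant key: " + Arrays.toString(watermarks(4, timestamps, false, 42)));
        System.out.println("shuffle():    " + Arrays.toString(watermarks(4, timestamps, true, 42)));
    }
}
```

With enough elements, random redistribution gives every instance some input, which is why adding `shuffle()` before the assigner lets the windows fire.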
>
>
> On Fri, Apr 19, 2019 at 4:10 AM, the original poster wrote:
>
> > I don't think the watermark is the problem. I see the same watermarks in
> > both versions of the code.
> >
> > The processing on the keyed stream doesn't change event time at all. I can
> > simply change my code to use an identity `map` on the keyed stream that
> > returns the input unchanged, so that the window operator receives exactly
> > the same data. The only difference is where I call
> > `assignTimestampsAndWatermarks`. The result is the same:
> > `assignTimestampsAndWatermarks` before `keyBy` works:
> > ```java
> > DataStream<Trip> trips = env.addSource(consumer)
> >         .assignTimestampsAndWatermarks(
> >                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >                     @Override
> >                     public long extractTimestamp(Trip trip) {
> >                         return trip.endTime.getTime();
> >                     }
> >                 });
> > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > ```
> >
> >
> > `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> > ```java
> > DataStream<Trip> trips = env.addSource(consumer);
> > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > DataStream<Trip> featurizedUserTrips = userTrips
> >         .map(trip -> trip)
> >         .assignTimestampsAndWatermarks(
> >                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >                     @Override
> >                     public long extractTimestamp(Trip trip) {
> >                         return trip.endTime.getTime();
> >                     }
> >                 });
> > AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
> >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > ```
> >
> >
> > It feels like a bug to me, but I want to confirm it before I file a bug
> > report.
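This is the piece that makes the placement of `assignTimestampsAndWatermarks` matter. Across a `keyBy`, each record is sent to exactly one downstream subtask, but each watermark is broadcast to all of them. When watermarks are assigned before `keyBy` (and every source subtask produces them), every downstream subtask receives watermarks even if it gets no records; when they are assigned after `keyBy`, an instance with no records has no watermark to forward. A plain-Java model of that channel behavior, with a hypothetical key-to-channel mapping and hypothetical class name:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Flink code) of a keyBy connection: records are routed
// to exactly one downstream channel, but watermarks are sent to every channel.
class KeyByChannels {
    final List<List<String>> channels = new ArrayList<>();

    KeyByChannels(int parallelism) {
        for (int i = 0; i < parallelism; i++) channels.add(new ArrayList<>());
    }

    // A record goes only to the channel its key maps to (hypothetical mapping).
    void emitRecord(long key, String record) {
        int target = (int) Math.floorMod(key, (long) channels.size());
        channels.get(target).add("record " + record);
    }

    // A watermark is broadcast to all channels.
    void emitWatermark(long watermark) {
        for (List<String> channel : channels) channel.add("watermark " + watermark);
    }

    public static void main(String[] args) {
        KeyByChannels keyBy = new KeyByChannels(3);
        keyBy.emitRecord(0L, "trip-a"); // all records share one key
        keyBy.emitRecord(0L, "trip-b");
        keyBy.emitWatermark(100L);      // watermark assigned upstream of keyBy
        for (int i = 0; i < keyBy.channels.size(); i++) {
            System.out.println("subtask " + i + ": " + keyBy.channels.get(i));
        }
    }
}
```

Subtasks 1 and 2 receive no records but still see `watermark 100`, so they keep event time moving for the operators after them.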
> >
> > On 2019/04/18 03:38:34, Paul Lam wrote:
> > > Hi,
> > >
> > > Could you check the watermark of the window operator? One possible
> > > situation is that some of the keys are not getting enough input, so their
> > > watermarks remain below the window end time and hold back the window
> > > operator's watermark. IMO, it's good practice to assign watermarks early
> > > in the data pipeline.
> > >
> > > Best,
> > > Paul Lam
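Paul's point about one input holding the window operator back follows from how an operator with several input channels computes its watermark: it keeps the latest watermark from each channel and advances only to the minimum of them. A single channel that never emits a watermark therefore pins the operator at `Long.MIN_VALUE`. A plain-Java sketch of that rule, with made-up channel values and a hypothetical class name:

```java
// Simplified model (not Flink code): an operator with several input channels
// tracks the latest watermark per channel and uses the minimum as its own
// event-time clock.
class MinWatermark {
    static long combined(long[] latestPerChannel) {
        long min = Long.MAX_VALUE;
        for (long wm : latestPerChannel) min = Math.min(min, wm);
        return min;
    }

    public static void main(String[] args) {
        long windowEnd = 7L * 24 * 60 * 60 * 1000; // end of a hypothetical 7-day window
        // Three channels have advanced past the window end; one never emitted a watermark.
        long[] channels = {windowEnd + 5, windowEnd + 9, windowEnd + 1, Long.MIN_VALUE};
        long wm = combined(channels);
        System.out.println("operator watermark: " + wm);
        System.out.println("window fires: " + (wm >= windowEnd - 1)); // false
    }
}
```

Dropping the idle channel from the array would let the combined watermark reach `windowEnd + 1` and the window would fire.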
> > >
> > > > On Apr 17, 2019, at 23:04, the original poster wrote:
> > > >
> > > > `assignTimestampsAndWatermarks` before `keyBy` works:
> > > > ```java
> > > > DataStream<Trip> trips = env.addSource(consumer)
> > > >         .assignTimestampsAndWatermarks(
> > > >                 new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > > >                     @Override
> > > >                     public long extractTimestamp(Trip trip) {
> > > >                         return trip.endTime.getTime();
> > > >                     }
> > > >                 });
> > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > DataStream<FeaturizedTrip> featurizedUserTrips =
> > > >         userTrips.process(new Featurization());
> > > > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > ```
> > > >
> > > > But not after `keyBy` and `process`:
> > > > ```java
> > > > DataStream<Trip> trips = env.addSource(consumer);
> > > > KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> trip.userId);
> > > > DataStream<FeaturizedTrip> featurizedUserTrips = userTrips
> > > >         .process(new Featurization())
> > > >         .assignTimestampsAndWatermarks(
> > > >                 new BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> > > >                     @Override
> > > >                     public long extractTimestamp(FeaturizedTrip trip) {
> > > >                         return trip.endTime.getTime();
> > > >                     }
> > > >                 });
> > > > AllWindowedStream<FeaturizedTrip, TimeWindow> windowedUserTrips =
> > > >         featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> > > > ```
> > > > Windows are never triggered.
> > > >
> > > > Is it a bug or expected behavior? If the latter, where is it documented?