Each parallel instance of a TimestampAssigner independently assigns timestamps and generates watermarks for the records it processes. After a shuffle, each operator forwards the minimum of the watermarks it has received across all of its input connections. For details, have a look at the watermarks documentation [1].
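To make the "minimum across all input connections" rule concrete, here is a small toy model in plain Python. It is not Flink code; the names `MinWatermarkTracker` and `simulate` are made up for illustration. It assumes, as I understand the linked documentation, that every downstream task receives the watermarks of every upstream task, and that order is preserved within a single input connection. It replays the 2/5 and 3/10 example from the question.

```python
from typing import List, Optional, Tuple

NO_WATERMARK = float("-inf")  # stands in for "no watermark seen yet"


class MinWatermarkTracker:
    """Toy model (not Flink API) of an operator with several input connections."""

    def __init__(self, num_inputs: int) -> None:
        self.per_input = [NO_WATERMARK] * num_inputs
        self.current = NO_WATERMARK

    def on_watermark(self, input_index: int, watermark: float) -> Optional[float]:
        """Record a watermark from one input; return the new operator
        watermark if it advanced, otherwise None."""
        # A watermark on a single connection never moves backwards.
        self.per_input[input_index] = max(self.per_input[input_index], watermark)
        # The operator's event time is the minimum over all inputs.
        candidate = min(self.per_input)
        if candidate > self.current:
            self.current = candidate
            return candidate
        return None


def simulate(arrivals: List[Tuple[int, float]], num_inputs: int = 2) -> List[float]:
    """Feed (input_index, watermark) pairs in arrival order and collect
    the watermarks the operator would forward downstream."""
    tracker = MinWatermarkTracker(num_inputs)
    forwarded = []
    for input_index, watermark in arrivals:
        emitted = tracker.on_watermark(input_index, watermark)
        if emitted is not None:
            forwarded.append(emitted)
    return forwarded


if __name__ == "__main__":
    # Source task 0 emits watermarks 2 and 5, source task 1 emits 3 and 10.
    # Two different interleavings, each preserving per-input order:
    print(simulate([(0, 2), (1, 3), (1, 10), (0, 5)]))  # [2, 5]
    print(simulate([(1, 3), (1, 10), (0, 2), (0, 5)]))  # [2, 5]
```

In this model the operator forwards 2 and then 5 for both interleavings: it cannot advance past what the slowest input has promised, so the arrival order across inputs does not change the forwarded watermarks.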
Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#watermarks-in-parallel-streams

2017-06-11 17:22 GMT+02:00 Ray Ruvinskiy <ray.ruvins...@arcticwolf.com>:

> Thanks for the explanation, Fabian.
>
> Suppose I have a parallel source that does not inject watermarks, and the
> first operation on the DataStream is assignTimestampsAndWatermarks. Does
> each parallel task that makes up the source independently inject watermarks
> for the records that it has read? Suppose I then call keyBy and a shuffle
> ensues. Will the resulting partitions after the shuffle have interleaved
> watermarks from the various source tasks?
>
> More concretely, suppose a source has a degree of parallelism of two. One
> of the source tasks injects the watermarks 2 and 5, while the other injects
> 3 and 10. There is then a shuffle, creating two different partitions. Will
> all the watermarks be broadcast to all the partitions? Or is it possible
> for, say, one partition to end up with watermarks 2 and 10 and another with
> 3 and 5? And after the shuffle, how do we ensure that the watermarks are
> processed in order by the operators receiving them?
>
> Thanks,
>
> Ray
>
> From: Fabian Hueske <fhue...@gmail.com>
> Date: Saturday, June 10, 2017 at 3:56 PM
> To: Ray Ruvinskiy <ray.ruvins...@arcticwolf.com>
> Cc: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: At what point do watermarks get injected into the stream?
>
> Hi Ray,
>
> in principle, watermarks can be injected anywhere in a stream by calling
> DataStream.assignTimestampsAndWatermarks().
>
> However, timestamps are usually injected as soon as possible after a
> stream is ingested (before the first shuffle). The reason is that
> watermarks depend on the order of events (and their timestamps) in the
> stream. While Flink guarantees the order of events within a partition, a
> shuffle interleaves events of different partitions in an unpredictable way
> such that it is not possible to reason about the order of timestamps
> afterwards.
>
> The most common way to inject watermarks is directly inside of a
> SourceFunction or with a TimestampAssigner before the first shuffle.
>
> Best, Fabian
>
> 2017-06-09 0:46 GMT+02:00 Ray Ruvinskiy <ray.ruvins...@arcticwolf.com>:
>
> I’m trying to build a mental model of how watermarks get injected into the
> stream. Suppose I have a stream with a parallel source, and I’m running a
> cluster with multiple task managers. Does each parallel source reader
> inject watermarks, which are then forwarded to downstream consumers and
> shuffled between task managers? Or are watermarks created after the
> shuffle, when the stream records reach their destined task manager and
> right before they’re processed by the operator?
>
> Thanks,
>
> Ray