Hey Jamie!

Key-based progress tracking sounds like local-only progress tracking to me: there is no need for a low-watermarking mechanism at all, since all streams for a key are handled by a single partition at a time (per operator). Thus, this could be much easier to implement and support (i.e., no need to broadcast the progress state of each partition all the time). State-wise it should be fine too if it is backed by RocksDB, especially if we have MapState in the future.
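To make that a bit more concrete, here is a rough sketch of what I mean on top of the current keyed-state API, applied after a keyBy on the device id. The Tuple2<deviceId, timestampMillis> event type and the 5-second per-key bound are made up for illustration, not a proposal for an actual API:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Sketch only: per-key progress tracked in keyed state, local to the
 * partition that owns the key, so no cross-partition watermark broadcast
 * is needed.
 */
public class PerKeyProgress extends ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    // Tolerated out-of-orderness *within* one key (illustrative value).
    private static final long PER_KEY_SKEW_MS = 5_000L;

    // Highest timestamp seen so far for the current key; lives in the
    // configured state backend (e.g. RocksDB).
    private transient ValueState<Long> maxTimestamp;

    @Override
    public void open(Configuration parameters) {
        maxTimestamp = getRuntimeContext().getState(
                new ValueStateDescriptor<>("max-ts", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> event, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        Long max = maxTimestamp.value();
        long perKeyWatermark = (max == null) ? Long.MIN_VALUE : max - PER_KEY_SKEW_MS;

        if (event.f1 < perKeyWatermark) {
            // Late for this key only; other keys are unaffected.
            return;
        }
        if (max == null || event.f1 > max) {
            maxTimestamp.update(event.f1);
        }
        out.collect(event);
    }
}

Since the state lives with the key, a device that falls hours behind only stalls its own key, and nothing has to be exchanged between subtasks.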
Just my quick thoughts on this, to get the discussion going :)

cheers
Paris

> On 23 Feb 2017, at 01:01, Jamie Grier <ja...@data-artisans.com> wrote:
>
> Hi Flink Devs,
>
> Use cases that I see quite frequently in the real world would benefit from a different watermarking / event-time model than the one currently implemented in Flink.
>
> I would call Flink's current approach partition-based or maybe subtask-based watermarking. In this model the current "event time" is a property local to each subtask instance in a dataflow graph. The event time at any subtask is the minimum of the watermarks it has received on each of its input streams.
>
> There are a couple of issues with this model that are not optimal for some (maybe many) use cases.
>
> 1) A single slow subtask (or, say, source partition) anywhere in the dataflow can mean no progress can be made on the computation at all.
>
> 2) In many real-world scenarios the time skew across keys can be *many* times greater than the time skew within the data for the same key.
>
> In this discussion I'll use "time skew" to refer to the out-of-orderness of the data with respect to timestamp. Out-of-orderness is a mouthful ;)
>
> Anyway, let me provide an example or two.
>
> In IoT applications the source of events is a particular device out in the world, let's say a device in a connected-car application. The data for some particular device may be very bursty, and we will certainly get events from these devices in Flink out of order just because of things like partitions in Kafka, shuffles in Flink, etc. However, the time skew in the data for a single device should likely be very small (milliseconds or maybe seconds).
>
> However, in the same application the time skew across different devices can be huge (hours or even days). An obvious example of this, again using connected cars as a representative example, is the following: Car A is recording data locally at 12:00 pm on Saturday but doesn't currently have a network connection. Car B is doing the same thing but does have a network connection. Car A will transmit its data when the network comes back online; let's say this is at 4:00 pm. Car B was transmitting its data immediately. This creates a huge time skew (4 hours) in the observed datastream when looked at as a whole. However, the time skew in that data for Car A or Car B alone could be tiny. It will be out of order, of course, but maybe by only milliseconds or seconds.
>
> What the above means in the end for Flink is that the watermarks must be delayed by up to 4 hours or more because we're looking at the data stream as a whole -- otherwise the data for Car A will be considered late. The time skew in the data stream when looked at as a whole is large even though the time skew for any one key may be tiny.
>
> This is the problem I would like to see a solution for. The basic idea of keeping track of watermarks and event time "per key" rather than per partition or subtask would, I think, solve both of the problems stated above, and both of these are real issues for production applications.
>
> The obvious downside of trying to do this per key is that the amount of state you have to track is much larger and potentially unbounded. However, I could see this approach working if the keyspace isn't growing rapidly but is stable or grows slowly. The saving grace here is that this may actually be true of the types of applications where this would be especially useful. Think IoT use cases. Another approach to keeping state size in check would be a configurable TTL for a key.
>
> Anyway, I'm throwing this out here on the mailing list in case anyone is interested in this discussion, has thought about the problem deeply already, has use cases of their own they've run into, or has ideas for a solution to this problem.
>
> Thanks for reading..
>
> -Jamie
>
>
> --
>
> Jamie Grier
> data Artisans, Director of Applications Engineering
> @jamiegrier <https://twitter.com/jamiegrier>
> ja...@data-artisans.com
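To make the 4-hour example above concrete: with the current whole-stream model, the out-of-orderness bound handed to the timestamp/watermark assigner has to cover the worst device, not the typical one. A rough sketch with the existing API (the Tuple2<deviceId, timestampMillis> event type is again made up for illustration):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Sketch of the status quo: one out-of-orderness bound for the whole stream.
 * To keep Car A's 4-hours-delayed data from being dropped as late, every
 * key's results are held back by 4 hours, even though the skew within any
 * single device is only milliseconds or seconds.
 */
public class WholeStreamWatermarks
        extends BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>> {

    public WholeStreamWatermarks() {
        // Must cover the cross-key skew (hours), not the per-key skew (seconds).
        super(Time.hours(4));
    }

    @Override
    public long extractTimestamp(Tuple2<String, Long> event) {
        return event.f1; // timestamp recorded on the device
    }
}

With per-key progress tracking, that bound could shrink to the per-device skew, and for the state-size concern a processing-time timer per key (or the configurable TTL Jamie mentions) could clear a key's entry after a period of inactivity.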