Thinking about this a bit more... I think it may be interesting to enable two modes for event-time advancement in Flink:

1) The current mode, which I'll call partition-based, pessimistic event-time advancement.
2) Key-based, eager event-time advancement.

In this key-based, eager mode things are actually quite simple, and event-time tracking basically becomes a completely local thing, as Paris stated. In this mode you would advance event time, per key, along with the maximum (adjusted) timestamp you've seen rather than the minimum. So the current event time at any node for some key is simply the maximum timestamp you've seen, adjusted (as is done now) with the logic from a timestamp extractor -- for example the BoundedOutOfOrderness extractor. This is very simple and could possibly work well as long as the delay used in the event-time calculation is enough to adjust for the actual time skew you're likely to observe for any key. I wonder how well this might work in practice?
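To make that concrete, here is a minimal sketch of what such a per-key, eager tracker might look like. It is purely illustrative -- the class and method names are made up and this is not existing Flink API; it just captures the "maximum timestamp seen, minus a bounded out-of-orderness delay, per key" idea:

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch only (not existing Flink API): key-based, eager
    // event-time advancement. Per key, event time is the maximum timestamp
    // seen so far minus an allowed out-of-orderness, in the spirit of the
    // BoundedOutOfOrderness extractor logic.
    public class PerKeyEagerEventTime<K> {

        private final long maxOutOfOrderness; // allowed delay per key, e.g. a few seconds
        private final Map<K, Long> maxTimestampPerKey = new HashMap<>();

        public PerKeyEagerEventTime(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
        }

        // Called for every element; eagerly advances the key's event time.
        public long onElement(K key, long timestamp) {
            long max = Math.max(timestamp, maxTimestampPerKey.getOrDefault(key, Long.MIN_VALUE));
            maxTimestampPerKey.put(key, max);
            return currentEventTime(key);
        }

        // Current event time for a key: max timestamp seen, adjusted for out-of-orderness.
        public long currentEventTime(K key) {
            Long max = maxTimestampPerKey.get(key);
            return max == null ? Long.MIN_VALUE : max - maxOutOfOrderness;
        }
    }

Nothing in this sketch needs coordination across subtasks; each operator could keep such a map locally for the keys it receives, which is exactly what makes the eager mode attractive.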
On Tue, Feb 28, 2017 at 6:22 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> @Tzu-Li Yes, the deluxe stream would not allow another keyBy(). Or we could allow it, but then we would exit the world of the deluxe stream and per-key watermarks and go back to the realm of normal streams and keyed streams.
>
> On Tue, 28 Feb 2017 at 10:08 Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
> > Throwing in some thoughts:
> >
> > When a source determines that no more data will come for a key (which in itself is a bit of a tricky problem) then it should signal to downstream operations to take the key out of watermark calculations, that is, that we can release some space.
> >
> > I don't think this is possible without exposing an API for the UDF to signal that there will be no more data for a specific key. We could detect idleness of a key at the source operator, but without any help from user logic it can essentially only be seen as "temporarily idle", which is not helpful in reducing the state, as the watermark state for that key still needs to be kept downstream.
> >
> > So to achieve this, I think the only option would be to expose new APIs here too.
> >
> > It's like how we recently exposed a new `markAsTemporarilyIdle` method in the SourceFunction.SourceContext interface, except it would be a `markKeyTerminated` method that must be called by the source UDF in order to save state space, and for which there is no feasible fallback detection strategy.
> >
> > DeluxeKeyedStream input = env.addSourceWithKey(source, keyExtractor);
> > input
> >   .map()
> >   .window(...) // notice that we don't need keyBy because it is implicit
> >   .reduce(...)
> >   .map(...)
> >   .window(...)
> >   ...
> >
> > Would this mean that another `keyBy` isn't allowed downstream? Or still allowed, but we're using the keys in `DeluxeKeyedStream` as the "meta key" to track key lineage?
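For illustration, a source-side contract along the lines Gordon describes might look roughly like the sketch below. It is hypothetical: only markAsTemporarilyIdle() exists in SourceFunction.SourceContext today; the interface name and the per-key methods are invented here just to show the shape of the API:

    // Hypothetical sketch of a key-aware source context. markAsTemporarilyIdle()
    // mirrors the existing concept; collectWithKey() and markKeyTerminated() are
    // made up to illustrate the API a source UDF would need to call.
    public interface KeyAwareSourceContext<T, K> {

        // Emit an element together with the key it was extracted under.
        void collectWithKey(T element, K key, long timestamp);

        // Existing concept: the whole source instance is idle for now.
        void markAsTemporarilyIdle();

        // Hypothetical: the source promises that no more data will ever come
        // for this key, so downstream operators may drop its watermark state.
        void markKeyTerminated(K key);
    }

Whether that is acceptable API surface, and whether a source can realistically make such a promise, is of course exactly the question Gordon raises.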
> > On February 27, 2017 at 9:37:27 PM, Aljoscha Krettek (aljos...@apache.org) wrote:
> >
> > This is indeed an interesting topic, thanks for starting the discussion, Jamie!
> >
> > I have now thought about this for a while, since more and more people seem to be asking about it lately. First, I thought that per-key watermark handling would not be necessary because it can be done locally (as Paris suggested), then I realised that that's not actually the case and thought that this wouldn't be possible. In the end, I came to realise that it is indeed possible (with some caveats), although with a huge overhead in the amount of state that we have to keep and with changes to our API. I'll try and walk you through my thought process.
> >
> > Let's first look at local watermark tracking, that is, tracking the watermark locally at the operator that needs it, for example a WindowOperator. I initially thought that this would be sufficient. Assume we have a pipeline like this:
> >
> > Source -> KeyBy -> WindowOperator -> ...
> >
> > If we have parallelism=1, then all elements for a given key k will be read by the same source operator instance and they will arrive (in order) at the WindowOperator. It doesn't matter whether we track the per-key watermarks at the Source or at the WindowOperator because we see the same elements in the same order at each operator, per key.
> >
> > Now, think about this pipeline:
> >
> > Source1 --+
> >           |-> Union -> KeyBy -> WindowOperator -> ...
> > Source2 --+
> >
> > (you can either think of two sources or of one source that has several parallel instances, i.e. parallelism > 1)
> >
> > Here, both Source1 and Source2 can emit elements with our key k. If Source1 is faster than Source2 and the watermarking logic at the WindowOperator determines the watermark based on the incoming element timestamps (for example, using the BoundedLatenessTimestampExtractor), then the elements coming from Source2 will be considered late at the WindowOperator.
> >
> > From this we know that our WindowOperator needs to calculate the watermark similarly to how watermark calculation currently happens in Flink: the watermark is the minimum of the watermarks of all upstream operations. In this case it would be: the minimum of the upstream watermarks of operations that emit elements with key k. For per-partition watermarks this works because the number of upstream operations is known and we simply keep an array that has the current upstream watermark for each input operation. For per-key watermarks this would mean that we have to keep k*u upstream watermarks, where k is the number of keys and u is the number of upstream operations. This can be quite large. Another problem is that the observed keys change, i.e. the key space is evolving, and we need to retire keys from our calculations lest we run out of space.
> >
> > We could find a solution based on a feature we recently introduced in Flink: https://github.com/apache/flink/pull/2801. The sources keep track of whether they have input and signal to downstream operations whether they should be included in the watermark calculation logic. A similar thing could be done per key, where each source signals to downstream operations that there is a new key and that we should start calculating watermarks for it. When a source determines that no more data will come for a key (which in itself is a bit of a tricky problem) then it should signal to downstream operations to take the key out of watermark calculations, that is, that we can release some space.
> >
> > The above is analysing, on a purely technical level, the feasibility of such a feature. I think it is feasible but can be very expensive in terms of state size requirements. Gabor also pointed this out above and gave a few suggestions on reducing that size.
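To make the k*u bookkeeping concrete, a naive version of that per-key watermark calculation might look like the following sketch (illustrative only, not Flink code; all names are made up). It keeps one watermark per key and per upstream input and takes the minimum:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch: per-key watermark calculation at an operator with
    // u upstream inputs. For every key we keep one watermark per input
    // channel, i.e. k * u longs in total, and the key's watermark is the
    // minimum over its channels.
    public class PerKeyMinWatermarks<K> {

        private final int numInputs; // u: number of upstream operations
        private final Map<K, long[]> perInputWatermarks = new HashMap<>(); // k entries

        public PerKeyMinWatermarks(int numInputs) {
            this.numInputs = numInputs;
        }

        // Record a watermark for the given key from the given input channel
        // and return the key's new combined watermark.
        public long onWatermark(K key, int inputIndex, long watermark) {
            long[] marks = perInputWatermarks.computeIfAbsent(key, ignored -> {
                long[] init = new long[numInputs];
                Arrays.fill(init, Long.MIN_VALUE);
                return init;
            });
            marks[inputIndex] = Math.max(marks[inputIndex], watermark);
            long min = Long.MAX_VALUE;
            for (long m : marks) {
                min = Math.min(min, m);
            }
            return min;
        }

        // Retire a key that no upstream will emit anymore, to free space.
        public void retireKey(K key) {
            perInputWatermarks.remove(key);
        }
    }

The map itself is the k*u state Aljoscha mentions, and retireKey() is where the "no more data for this key" signal from the sources would have to hook in.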
> > We would also need to change our API to allow tracking the lineage of keys, or to enforce that a key stays the same throughout a pipeline. Consider this pipeline:
> >
> > Source -> KeyBy1 -> WindowOperator -> KeyBy2 -> WindowOperator
> >
> > where KeyBy1 and KeyBy2 extract a different key, respectively. How would watermarks be tracked across this change of keys? Would we know which of the prior keys end up as which keys according to KeyBy2, i.e. do we have some kind of key lineage information?
> >
> > One approach for solving this would be to introduce a new API that allows extracting a key at the source and keeps this key on the elements until the sink. For example:
> >
> > DeluxeKeyedStream input = env.addSourceWithKey(source, keyExtractor);
> > input
> >   .map()
> >   .window(...) // notice that we don't need keyBy because it is implicit
> >   .reduce(...)
> >   .map(...)
> >   .window(...)
> >   ...
> >
> > The DeluxeKeyedStream (name preliminary ;-) would allow the operations that we today have on KeyedStream and on DataStream, and it would always maintain the key that was assigned at the sources. The result of each operation would again be a DeluxeKeyedStream. This way, we could track watermarks per key.
> >
> > I know it's a bit of a (very) lengthy mail, but what do you think?
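Reading this, the type Aljoscha describes might look roughly like the sketch below. It is entirely hypothetical (name preliminary, as he says), and the function and window types are deliberately simplified stand-ins rather than Flink's real ones:

    import java.util.function.Function;

    // Hypothetical sketch only, not an existing Flink type. The point is
    // purely that every operation returns a DeluxeKeyedStream again, so the
    // key K extracted at the source is carried through the whole pipeline
    // and per-key watermarks can follow it from source to sink.
    public interface DeluxeKeyedStream<K, T> {

        // A map-like transformation; the implicit key K is preserved on the result.
        <R> DeluxeKeyedStream<K, R> map(Function<T, R> mapper);

        // Windowing without an explicit keyBy(); grouping uses the implicit key.
        // WindowSpec and the reduce function stand in for Flink's real window API.
        <R> DeluxeKeyedStream<K, R> window(WindowSpec spec, Function<Iterable<T>, R> reduce);

        // Placeholder for whatever window definition would be used; illustrative only.
        interface WindowSpec {}
    }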
> > On Thu, 23 Feb 2017 at 11:14 Gábor Hermann <m...@gaborhermann.com> wrote:
> >
> > > Hey all,
> > >
> > > Let me share some ideas about this.
> > >
> > > @Paris: The local-only progress tracking indeed seems easier; we do not need to broadcast anything. Implementation-wise it is easier, but performance-wise probably not. If one key can come from multiple sources, there could be a lot more network overhead with per-key tracking than with broadcasting, somewhat paradoxically. Say source instance S1 sends messages and watermarks to operator instances O1, O2. In the broadcasting case, S1 would send one message to O1 and one to O2 per watermark (of course it depends on how fast the watermarks arrive), a total of 2. However, if we keep track of per-key watermarks, S1 would need to send watermarks for every key directed to O1, and likewise for O2. So if 10 keys go from S1 to O1, and 10 keys from S1 to O2, then (if watermarks arrive at the same rate per key as per source in the previous case) S1 would send a total of 20 watermarks.
> > >
> > > Another question is how large the state per key is. If it's really small (an integer maybe, or the state of a small state machine), then the overhead of keeping track of a (Long) watermark is large memory-wise. E.g. Int state vs. Long watermark results in 3x as large state. Also, the checkpointing would be ~3x as slow. Of course, for large states a Long watermark would not mean much overhead.
> > >
> > > We could resolve the memory issue by using some kind of sketch data structure. Right now the granularity of watermark handling is per operator instance. On the other hand, per-key granularity might be costly. What if we increased the granularity of watermarks inside an operator by keeping more than one watermark tracker in one operator? This could be done quite simply with a hash table. With a hash table of size 1, we would get the current semantics (per-operator-instance granularity). With a hash table large enough to have at most one key per bucket, we would get per-key watermark tracking. In between lies the trade-off between handling time skew and a lot of memory overhead. This does not seem hard to implement.
> > >
> > > Of course, at some point we would still need to take care of watermarks per key. Imagine that keys A and B go to the same bucket of the hash table, and watermarks are coming in like this: (B,20), (A,10), (A,15), (A,40). Then the watermark of the bucket should be the minimum as time passes, i.e. 0, 10, 15, 20. For this we need to keep track of the watermarks of A and B separately. But after we have a correct watermark for the bucket, all we need to care about is the bucket watermarks. So somewhere (most probably at the source) we would have to pay the memory overhead of tracking every key, but nowhere else in the topology.
> > >
> > > Regarding the potentially large network overhead, the same compression could be useful. I.e. we would not send watermarks from one operator per key, but rather per hash. Again, the trade-off between time skew and memory consumption is configurable by the size of the hash table used.
> > >
> > > Cheers,
> > > Gabor
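A rough sketch of that bucketed tracking, just to illustrate the trade-off Gábor describes (illustrative only; the class and method names are made up, and key registration and retirement are left out):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative sketch: track watermarks per hash bucket rather than per
    // key. A table of size 1 reproduces today's per-operator-instance
    // behaviour; a table with at most one key per bucket approaches per-key
    // tracking. Only the bucket watermark would be sent downstream; the
    // per-key detail is needed only where the bucket watermark is produced
    // (most probably at the source).
    public class BucketedWatermarks<K> {

        private final List<Map<K, Long>> buckets;

        public BucketedWatermarks(int numBuckets) {
            this.buckets = new ArrayList<>(numBuckets);
            for (int i = 0; i < numBuckets; i++) {
                buckets.add(new HashMap<>());
            }
        }

        // Record a watermark for a key and return its bucket's new watermark.
        public long onKeyWatermark(K key, long watermark) {
            Map<K, Long> bucket = buckets.get(Math.floorMod(key.hashCode(), buckets.size()));
            bucket.merge(key, watermark, Math::max);
            // The bucket watermark is the minimum over the keys tracked in it,
            // as in Gabor's (A, B) example. This naive version only considers
            // keys that have already been seen; whether a known-but-silent key
            // should hold the bucket back is a separate design choice.
            long min = Long.MAX_VALUE;
            for (long w : bucket.values()) {
                min = Math.min(min, w);
            }
            return min;
        }
    }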
However, the time skew > in > > > the > > > >> data for a single device should likely be very small (milliseconds > or > > > maybe > > > >> seconds).. > > > >> > > > >> However, in the same application the time skew across different > > devices > > > can > > > >> be huge (hours or even days). An obvious example of this, again > using > > > >> connected cars as a representative example is the following: Car A > is > > > >> recording data locally at 12:00 pm on Saturday but doesn't currently > > > have a > > > >> network connection. Car B is doing the same thing but does have a > > > network > > > >> connection. Car A will transmit it's data when the network comes > back > > > on > > > >> line. Let's say this is at 4pm. Car B was transmitting it's data > > > >> immediately. This creates a huge time skew (4 hours) in the observed > > > >> datastream when looked at as a whole. However, the time skew in that > > > data > > > >> for Car A or Car B alone could be tiny. It will be out of order of > > > course > > > >> but maybe by only milliseconds or seconds. > > > >> > > > >> What the above means in the end for Flink is that the watermarks > must > > be > > > >> delayed by up to 4 hours or more because we're looking at the data > > > stream > > > >> as a whole -- otherwise the data for Car A will be considered late. > > The > > > >> time skew in the data stream when looked at as a whole is large even > > > though > > > >> the time skew for any key may be tiny. > > > >> > > > >> This is the problem I would like to see a solution for. The basic > idea > > > of > > > >> keeping track of watermarks and event time "per-key" rather than per > > > >> partition or subtask would solve I think both of these problems > stated > > > >> above and both of these are real issues for production applications. > > > >> > > > >> The obvious downside of trying to do this per-key is that the amount > > of > > > >> state you have to track is much larger and potentially unbounded. > > > However, > > > >> I could see this approach working if the keyspace isn't growing > > rapidly > > > but > > > >> is stable or grows slowly. The saving grace here is that this may > > > actually > > > >> be true of the types of applications where this would be especially > > > >> useful. Think IoT use cases. Another approach to keeping state size > in > > > >> check would be a configurable TTL for a key. > > > >> > > > >> Anyway, I'm throwing this out here on the mailing list in case > anyone > > is > > > >> interested in this discussion, has thought about the problem deeply > > > >> already, has use cases of their own they've run into or has ideas > for > > a > > > >> solution to this problem. > > > >> > > > >> Thanks for reading.. > > > >> > > > >> -Jamie > > > >> > > > >> > > > >> -- > > > >> > > > >> Jamie Grier > > > >> data Artisans, Director of Applications Engineering > > > >> @jamiegrier <https://twitter.com/jamiegrier> > > > >> ja...@data-artisans.com > > > > > > > > > > > > > -- Jamie Grier data Artisans, Director of Applications Engineering @jamiegrier <https://twitter.com/jamiegrier> ja...@data-artisans.com