Throwing in some thoughts:

When a source determines that no more data will come for a key (which 
in itself is a bit of a tricky problem) then it should signal to downstream 
operations to take the key out of watermark calculations, that is that we 
can release some space. 
I don’t think this is possible without exposing an API for the UDF to signal that 
there will be no more data for a specific key. We could detect idleness of a key at 
the source operator, but without any help from user logic it can only be treated 
as "temporarily idle", which does not help reduce the state, because the 
watermark state for that key still needs to be kept downstream.

So to achieve this, I think the only option would be to expose new APIs here 
too.

It’s like how we recently exposed a new `markAsTemporarilyIdle` method in the 
SourceFunction.SourceContext interface, except that this would be a 
`markKeyTerminated` method. The source UDF would have to call it for us to be 
able to save state space, because there is no feasible fallback detection strategy.
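
To make the difference concrete, here is a minimal Java sketch. Everything in it is hypothetical: `KeyAwareSourceContext`, `markKeyTerminated` and `ToyKeyWatermarkState` are names made up for illustration, and only `markAsTemporarilyIdle` corresponds to a method that exists on SourceFunction.SourceContext. It is not how Flink would implement this; it only shows that an idle signal has to keep the per-key state, while a termination signal can release it.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical interface; markKeyTerminated is only a proposal from this mail. */
interface KeyAwareSourceContext<K> {
    void markAsTemporarilyIdle();
    void markKeyTerminated(K key);
}

/** Toy stand-in for the per-key watermark state that would be kept downstream. */
final class ToyKeyWatermarkState<K> implements KeyAwareSourceContext<K> {
    private final Map<K, Long> watermarks = new HashMap<>();
    private boolean idle;

    /** Records a watermark for a key; watermarks never move backwards. */
    void onWatermark(K key, long watermark) {
        idle = false;
        watermarks.merge(key, watermark, Long::max);
    }

    @Override
    public void markAsTemporarilyIdle() {
        // Keys may resume later, so nothing can be dropped here.
        idle = true;
    }

    @Override
    public void markKeyTerminated(K key) {
        // The UDF promises no more data for this key, so its state can go.
        watermarks.remove(key);
    }

    int trackedKeys() { return watermarks.size(); }

    boolean isIdle() { return idle; }

    public static void main(String[] args) {
        ToyKeyWatermarkState<String> state = new ToyKeyWatermarkState<>();
        state.onWatermark("carA", 10L);
        state.onWatermark("carB", 20L);
        state.markAsTemporarilyIdle();
        System.out.println("after idle: " + state.trackedKeys());
        state.markKeyTerminated("carA");
        System.out.println("after terminate: " + state.trackedKeys());
    }
}
```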

DeluxeKeyedStream input = env.addSourceWithKey(source, keyExtractor); 
input 
    .map() 
    .window(...) // notice that we don't need keyBy because it is implicit 
    .reduce(...) 
    .map(...) 
    .window(...) 
    ... 

Would this mean that another `keyBy` isn’t allowed downstream? Or still 
allowed, but we’re using the keys in `DeluxeKeyedStream` as the “meta key” to 
track key lineage?

On February 27, 2017 at 9:37:27 PM, Aljoscha Krettek (aljos...@apache.org) 
wrote:

This is indeed an interesting topic, thanks for starting the discussion,  
Jamie!  

I have now thought about this for a while, since more and more people seem to be  
asking about it lately. First, I thought that per-key watermark handling  
would not be necessary because it can be done locally (as Paris suggested),  
then I realised that that's not actually the case and thought that this  
wouldn't be possible. In the end, I came to realise that it is indeed  
possible (with some caveats), although with a huge overhead in the amount  
of state that we have to keep and with changes to our API. I'll try and  
walk you through my thought process.  

Let's first look at local watermark tracking, that is, tracking the  
watermark locally at the operator that needs it, for example a  
WindowOperator. I initially thought that this would be sufficient. Assume  
we have a pipeline like this:  

Source -> KeyBy -> WindowOperator -> ...  

If we have parallelism=1, then all elements for a given key k will be read  
by the same source operator instance and they will arrive (in-order) at the  
WindowOperator. It doesn't matter whether we track the per-key watermarks  
at the Source or at the WindowOperator because we see the same elements in  
the same order at each operator, per key.  

Now, think about this pipeline:  

Source1 --+  
          |-> Union -> KeyBy -> WindowOperator -> ...  
Source2 --+  

(you can either think about two sources or one source that has several  
parallel instances, i.e. parallelism > 1)  

Here, both Source1 and Source2 can emit elements with our key k. If Source1  
is faster than Source2 and the watermarking logic at the WindowOperator  
determines the watermark based on the incoming element timestamps (for  
example, using the BoundedLatenessTimestampExtractor) then the elements  
coming from Source2 will be considered late at the WindowOperator.  

From this we know that our WindowOperator needs to calculate the watermark  
similarly to how watermark calculation currently happens in Flink: the  
watermark is the minimum of the watermarks of all upstream operations. In  
this case it would be the minimum of the upstream watermarks of operations that  
emit elements with key k. For per-partition watermarks this works because  
the number of upstream operations is known and we simply keep an array that  
has the current upstream watermark for each input operation. For per-key  
watermarks this would mean that we have to keep k*u upstream watermarks,  
where k is the number of keys and u is the number of upstream operations.  
This can be quite large.  
Another problem is that the observed keys change, i.e. the key space is  
evolving and we need to retire keys from our calculations lest we run out  
of space.  
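
A minimal Java sketch of this per-key minimum, to show where the k*u values come from. `PerKeyWatermarkTracker` is a hypothetical helper, not a Flink class, and it makes one simplifying assumption: every upstream channel may emit every key, so a key's watermark only advances once all u channels have reported for it. It also never retires keys.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: one watermark per (key, upstream channel). */
final class PerKeyWatermarkTracker<K> {
    private final int numUpstream; // u
    private final Map<K, long[]> watermarks = new HashMap<>(); // k entries of u longs

    PerKeyWatermarkTracker(int numUpstream) {
        this.numUpstream = numUpstream;
    }

    /** Records a watermark for a key from one channel; returns the key's combined watermark. */
    long onWatermark(K key, int channel, long watermark) {
        long[] perChannel = watermarks.computeIfAbsent(key, ignored -> {
            long[] init = new long[numUpstream];
            Arrays.fill(init, Long.MIN_VALUE); // nothing seen yet on any channel
            return init;
        });
        // Watermarks never move backwards on a channel.
        perChannel[channel] = Math.max(perChannel[channel], watermark);
        return currentWatermark(key);
    }

    /** Minimum over all upstream channels, mirroring the per-partition rule. */
    long currentWatermark(K key) {
        long[] perChannel = watermarks.get(key);
        if (perChannel == null) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long wm : perChannel) {
            min = Math.min(min, wm);
        }
        return min;
    }

    /** Number of long values held: k * u. */
    long trackedValues() {
        return (long) watermarks.size() * numUpstream;
    }

    public static void main(String[] args) {
        PerKeyWatermarkTracker<String> tracker = new PerKeyWatermarkTracker<>(2);
        tracker.onWatermark("k", 0, 100L); // Source1 is ahead
        System.out.println(tracker.onWatermark("k", 1, 40L)); // Source2 holds the key back
        System.out.println(tracker.trackedValues());
    }
}
```

With two upstream channels, the faster source alone cannot advance the watermark for the key, which is the behaviour the Union example above needs.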

We could find a solution based on a feature we recently introduced in  
Flink: https://github.com/apache/flink/pull/2801. The sources keep track of  
whether they have input and signal to downstream operations whether they  
should be included in the watermark calculation logic. A similar thing  
could be done per-key, where each source signals to downstream operations  
that there is a new key and that we should start calculating watermarks for  
this. When a source determines that no more data will come for a key (which  
in itself is a bit of a tricky problem) then it should signal to downstream  
operations to take the key out of watermark calculations, that is that we  
can release some space.  

The above is analysing, on a purely technical level, the feasibility of  
such a feature. I think it is feasible but can be very expensive in terms  
of state size requirements. Gabor also pointed this out above and gave a  
few suggestions on reducing that size.  
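
One of those suggestions, the hash-table compression, can be sketched in Java as follows. `BucketedWatermarkTracker` is a hypothetical name, not a Flink class. The sketch assumes keys are announced up front with an initial watermark of 0 (which is what the leading 0 in Gabor's example sequence implies), and it scans all keys to compute a bucket minimum, which is fine for illustration but not for production.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: per-key watermarks compressed into hash buckets. */
final class BucketedWatermarkTracker<K> {
    private final int numBuckets;
    private final Map<K, Long> perKey = new HashMap<>();

    BucketedWatermarkTracker(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    int bucketOf(K key) {
        return Math.floorMod(key.hashCode(), numBuckets);
    }

    /** Announces a key; it starts at watermark 0 (an assumption of this sketch). */
    void register(K key) {
        perKey.putIfAbsent(key, 0L);
    }

    /** Updates one key and returns the watermark of the bucket it falls into. */
    long onWatermark(K key, long watermark) {
        perKey.merge(key, watermark, Long::max);
        return bucketWatermark(bucketOf(key));
    }

    /** Bucket watermark = minimum over all keys hashed to that bucket. */
    long bucketWatermark(int bucket) {
        long min = Long.MAX_VALUE; // returned as-is for an empty bucket
        for (Map.Entry<K, Long> e : perKey.entrySet()) {
            if (bucketOf(e.getKey()) == bucket) {
                min = Math.min(min, e.getValue());
            }
        }
        return min;
    }

    public static void main(String[] args) {
        // One bucket forces A and B to collide, as in the example from the thread.
        BucketedWatermarkTracker<String> tracker = new BucketedWatermarkTracker<>(1);
        tracker.register("A");
        tracker.register("B");
        System.out.println(tracker.onWatermark("B", 20L)); // 0
        System.out.println(tracker.onWatermark("A", 10L)); // 10
        System.out.println(tracker.onWatermark("A", 15L)); // 15
        System.out.println(tracker.onWatermark("A", 40L)); // 20
    }
}
```

A table of size 1 behaves like today's per-operator-instance watermark, and a table with one key per bucket behaves like per-key tracking. Only the bucket watermarks would have to travel downstream.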

We would also need to change our API to allow tracking the lineage of keys  
or to enforce that a key stays the same throughout a pipeline. Consider  
this pipeline:  

Source -> KeyBy1 -> WindowOperator -> KeyBy2 -> WindowOperator  

where KeyBy1 and KeyBy2 each extract a different key. How would  
watermarks be tracked across this change of keys? Would we know which of  
the prior keys end up being keys according to KeyBy2, i.e. do we have some  
kind of key lineage information?  

One approach for solving this would be to introduce a new API that allows  
extracting a key at the source and will keep this key on the elements until  
the sink. For example:  

DeluxeKeyedStream input = env.addSourceWithKey(source, keyExtractor);  
input  
    .map()  
    .window(...) // notice that we don't need keyBy because it is implicit  
    .reduce(...)  
    .map(...)  
    .window(...)  
    ...  

The DeluxeKeyedStream (name preliminary ;-) would allow the operations that  
we today have on KeyedStream and on DataStream and it would always maintain  
the key that was assigned at the sources. The result of each operation  
would again be a DeluxeKeyedStream. This way, we could track watermarks per  
key.  
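
As a toy illustration of "the key assigned at the source stays on the elements", here is an in-memory Java sketch. `KeyCarryingStream` is a hypothetical stand-in and has nothing to do with the real DataStream classes; it only models a key that is extracted once and a `map` whose result is again keyed by that same key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Toy, in-memory stand-in for the DeluxeKeyedStream idea; not a Flink API. */
final class KeyCarryingStream<K, V> {
    private final List<K> keys;   // key assigned at the source, never changed
    private final List<V> values; // current payloads, aligned with keys

    private KeyCarryingStream(List<K> keys, List<V> values) {
        this.keys = keys;
        this.values = values;
    }

    /** Extracts the key once, at the source. */
    static <K, V> KeyCarryingStream<K, V> fromSource(List<V> source, Function<V, K> keyExtractor) {
        List<K> keys = new ArrayList<>();
        for (V v : source) {
            keys.add(keyExtractor.apply(v));
        }
        return new KeyCarryingStream<>(keys, new ArrayList<>(source));
    }

    /** Transforms the values; the result is still keyed by the source key. */
    <R> KeyCarryingStream<K, R> map(Function<V, R> fn) {
        List<R> mapped = new ArrayList<>();
        for (V v : values) {
            mapped.add(fn.apply(v));
        }
        return new KeyCarryingStream<>(keys, mapped);
    }

    List<K> keys() { return keys; }

    List<V> values() { return values; }

    public static void main(String[] args) {
        KeyCarryingStream<String, Integer> lengths =
            KeyCarryingStream.fromSource(
                    Arrays.asList("carA:12", "carB:7"),
                    (String s) -> s.split(":")[0])
                .map(String::length);
        System.out.println(lengths.keys() + " " + lengths.values());
    }
}
```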

I know it's a bit of a (very) lengthy mail, but what do you think?  


On Thu, 23 Feb 2017 at 11:14 Gábor Hermann <m...@gaborhermann.com> wrote:  

> Hey all,  
>  
> Let me share some ideas about this.  
>  
> @Paris: The local-only progress tracking indeed seems easier, we do not  
> need to broadcast anything. Implementation-wise it is easier, but  
> performance-wise probably not. If one key can come from multiple  
> sources, there could be a lot more network overhead with per-key  
> tracking than with broadcasting, somewhat paradoxically. Say source instance  
> S1 sends messages and watermarks to operator instances O1, O2. In the  
> broadcasting case, S1 would send one message to O1 and one to O2 per  
> watermark (of course it depends on how fast the watermarks arrive),  
> total of 2. Although, if we keep track of per-key watermarks, S1 would  
> need to send watermarks for every key directed to O1, also for O2. So if  
> 10 keys go from S1 to O1, and 10 keys from S1 to O2, then (if watermarks  
> arrive at the same rate per-key as per-source in the previous case)  
> S1 would send a total of 20 watermarks.  
>  
> Another question is how large the state-per-key is. If it's  
> really small (an integer maybe, or the state of a small state machine), then  
> the overhead of keeping track of a (Long) watermark is large  
> memory-wise. E.g. a 4-byte Int state plus an 8-byte Long watermark results  
> in 3x as large a state. Also, the checkpointing would be ~3x as slow. Of  
> course, for large states a Long watermark would not mean much overhead.  
>  
> We could resolve the memory issue by using some kind of sketch data  
> structure. Right now the granularity of watermark handling is  
> per-operator-instance. On the other hand, per-key granularity might be  
> costly. What if we increased the granularity of watermarks inside an  
> operator by keeping more than one watermark tracker in one operator?  
> This could be done quite simply with a hash table. With a hash table of  
> size 1, we would get the current semantics (per-operator-instance  
> granularity). With a hash table large enough to have at most one key per  
> bucket, we would get per-key watermark tracking. In between lies the  
> trade-off between handling time-skew and a lot of memory overhead. This  
> does not seem hard to implement.  
>  
> Of course, at some point we would still need to take care of watermarks  
> per-key. Imagine that keys A and B would go to the same bucket of the  
> hash table, and watermarks are coming in like this: (B,20), (A,10),  
> (A,15), (A,40). Then the watermark of the bucket should be the minimum  
> as time passes, i.e. 0, 10, 15, 20. For this we need to keep track of  
> the watermarks of A and B separately. But after we have a correct  
> watermark for the bucket, all we need to care about is the bucket  
> watermarks. So somewhere (most probably at the source) we would have to  
> pay memory overhead of tracking every key, but nowhere else in the  
> topology.  
>  
> Regarding the potentially large network overhead, the same compression  
> could be useful. I.e. we would not send watermarks from one operator  
> per-key, but rather per-hash. Again, the trade-off between time skew and  
> memory consumption is configurable by the size of the hash table used.  
>  
> Cheers,  
> Gabor  
>  
> On 2017-02-23 08:57, Paris Carbone wrote:  
>  
> > Hey Jamie!  
> >  
> > Key-based progress tracking sounds like local-only progress tracking to  
> > me, there is no need to use a low watermarking mechanism at all since all  
> > streams of a key are handled by a single partition at a time (per operator).  
> > Thus, this could be much easier to implement and support (i.e., no need  
> > to broadcast the progress state of each partition all the time).  
> > State-wise it should be fine too if it is backed by rocksdb, especially  
> > if we have MapState in the future.  
> >  
> > Just my quick thoughts on this, to get the discussion going :)  
> >  
> > cheers  
> > Paris  
> >  
> >> On 23 Feb 2017, at 01:01, Jamie Grier <ja...@data-artisans.com> wrote:  
> >>  
> >> Hi Flink Devs,  
> >>  
> >> Use cases that I see quite frequently in the real world would benefit from  
> >> a different watermarking / event time model than the one currently  
> >> implemented in Flink.  
> >>  
> >> I would call Flink's current approach partition-based watermarking or maybe  
> >> subtask-based watermarking. In this model the current "event time" is a  
> >> property local to each subtask instance in a dataflow graph. The event  
> >> time at any subtask is the minimum of the watermarks it has received on  
> >> each of its input streams.  
> >>  
> >> There are a couple of issues with this model that are not optimal for some  
> >> (maybe many) use cases.  
> >>  
> >> 1) A single slow subtask (or say source partition) anywhere in the dataflow  
> >> can mean no progress can be made on the computation at all.  
> >>  
> >> 2) In many real world scenarios the time skew across keys can be *many*  
> >> times greater than the time skew within the data with the same key.  
> >>  
> >> In this discussion I'll use "time skew" to refer to the out-of-orderness  
> >> with respect to timestamp of the data. Out-of-orderness is a mouthful ;)  
> >>  
> >> Anyway, let me provide an example or two.  
> >>  
> >> In IoT applications the source of events is a particular device out in the  
> >> world, let's say a device in a connected car application. The data for  
> >> some particular device may be very bursty and we will certainly get events  
> >> from these devices in Flink out-of-order just because of things like  
> >> partitions in Kafka, shuffles in Flink, etc. However, the time skew in the  
> >> data for a single device should likely be very small (milliseconds or maybe  
> >> seconds).  
> >>  
> >> However, in the same application the time skew across different devices can  
> >> be huge (hours or even days). An obvious example of this, again using  
> >> connected cars as a representative example, is the following: Car A is  
> >> recording data locally at 12:00 pm on Saturday but doesn't currently have a  
> >> network connection. Car B is doing the same thing but does have a network  
> >> connection. Car A will transmit its data when the network comes back  
> >> online. Let's say this is at 4 pm. Car B was transmitting its data  
> >> immediately. This creates a huge time skew (4 hours) in the observed  
> >> datastream when looked at as a whole. However, the time skew in that data  
> >> for Car A or Car B alone could be tiny. It will be out of order of course  
> >> but maybe by only milliseconds or seconds.  
> >>  
> >> What the above means in the end for Flink is that the watermarks must be  
> >> delayed by up to 4 hours or more because we're looking at the data stream  
> >> as a whole -- otherwise the data for Car A will be considered late. The  
> >> time skew in the data stream when looked at as a whole is large even though  
> >> the time skew for any key may be tiny.  
> >>  
> >> This is the problem I would like to see a solution for. The basic idea of  
> >> keeping track of watermarks and event time "per-key" rather than per  
> >> partition or subtask would, I think, solve both of the problems stated  
> >> above, and both of these are real issues for production applications.  
> >>  
> >> The obvious downside of trying to do this per-key is that the amount of  
> >> state you have to track is much larger and potentially unbounded. However,  
> >> I could see this approach working if the keyspace isn't growing rapidly but  
> >> is stable or grows slowly. The saving grace here is that this may actually  
> >> be true of the types of applications where this would be especially  
> >> useful. Think IoT use cases. Another approach to keeping state size in  
> >> check would be a configurable TTL for a key.  
> >>  
> >> Anyway, I'm throwing this out here on the mailing list in case anyone is  
> >> interested in this discussion, has thought about the problem deeply  
> >> already, has use cases of their own they've run into or has ideas for a  
> >> solution to this problem.  
> >>  
> >> Thanks for reading..  
> >>  
> >> -Jamie  
> >>  
> >>  
> >> --  
> >>  
> >> Jamie Grier  
> >> data Artisans, Director of Applications Engineering  
> >> @jamiegrier <https://twitter.com/jamiegrier>  
> >> ja...@data-artisans.com  
> >  
>  
>  
