Thinking about this a bit more...

I think it may be interesting to enable two modes for event-time
advancement in Flink:

1) The current mode which I'll call partition-based, pessimistic,
event-time advancement
2) Key-based, eager, event-time advancement

The key-based eager mode is actually quite simple, and it becomes a
completely local computation, as Paris stated.  In this mode you would
advance event time per key, following the maximum (adjusted) timestamp
you've seen rather than the minimum.  The current event time at any node
for some key is then simply the maximum timestamp seen for that key,
adjusted (as it is now) by the logic of a timestamp extractor such as the
BoundedOutOfOrdernessTimestampExtractor.  This could work well as long as
the delay used in the event-time calculation is large enough to cover the
real time skew you're likely to observe for any one key.
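
To make the key-based mode concrete, here is a minimal sketch, assuming
String keys and millisecond timestamps.  The class KeyedEventTimeTracker
and its methods are hypothetical names for illustration only; nothing
like this exists in Flink today, and the fixed delay merely stands in for
whatever a bounded-out-of-orderness style extractor would apply.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of key-based, eager event-time advancement.
 * Not a Flink API: event time per key is the largest timestamp seen
 * for that key minus a fixed allowed delay.
 */
final class KeyedEventTimeTracker {
    // Assumed fixed delay, playing the role of the out-of-orderness bound.
    private final long maxOutOfOrdernessMillis;
    // Largest timestamp observed so far, per key.
    private final Map<String, Long> maxTimestampPerKey = new HashMap<>();

    KeyedEventTimeTracker(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /** Records an element and returns the current event time for its key. */
    long observe(String key, long timestampMillis) {
        // Keep the maximum, so event time for a key never moves backwards.
        long max = maxTimestampPerKey.merge(key, timestampMillis, Long::max);
        return max - maxOutOfOrdernessMillis;
    }

    /** Current event time for a key, or Long.MIN_VALUE if the key is unseen. */
    long currentEventTime(String key) {
        Long max = maxTimestampPerKey.get(key);
        return max == null ? Long.MIN_VALUE : max - maxOutOfOrdernessMillis;
    }

    /** An element is late if it is not newer than its own key's event time. */
    boolean isLate(String key, long timestampMillis) {
        return timestampMillis <= currentEventTime(key);
    }
}
```

With a delay of a few seconds, an element for Car B stamped 4 pm does not
make Car A's 12:00 pm data late in this sketch, because each key advances
only on its own timestamps.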

I wonder how well this might work in practice?

On Tue, Feb 28, 2017 at 6:22 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> @Tzu-Li Yes, the deluxe stream would not allow another keyBy(). Or we could
> allow it but then we would exit the world of the deluxe stream and per-key
> watermarks and go back to the realm of normal streams and keyed streams.
>
> On Tue, 28 Feb 2017 at 10:08 Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
> > Throwing in some thoughts:
> >
> > When a source determines that no more data will come for a key (which
> > in itself is a bit of a tricky problem) then it should signal to downstream
> > operations to take the key out of watermark calculations, that is that we
> > can release some space.
> > I don’t think this is possible without exposing an API for the UDF to signal
> > that there will be no more data for a specific key. We could detect idleness
> > of a key at the source operator, but without any help from user logic it can
> > essentially only be seen as "temporarily idle", which does not help in
> > reducing the state, as the watermark state for that key still needs to be
> > kept downstream.
> >
> > So to achieve this, I think the only option would be to expose new APIs
> > here too.
> >
> > It’s like how we recently exposed a new `markAsTemporarilyIdle` method in
> > the SourceFunction.SourceContext interface, but here it would be a
> > `markKeyTerminated` method that the source UDF must call for us to be able
> > to save state space, since there is no feasible fallback detection strategy.
> >
> > DeluxeKeyedStream input = env.addSourceWithKey(source, keyExtractor);
> > input
> > .map()
> > .window(...) // notice that we don't need keyBy because it is implicit
> > .reduce(...)
> > .map(...)
> > .window(...)
> > ...
> >
> > Would this mean that another `keyBy` isn’t allowed downstream? Or still
> > allowed, but we’re using the keys in `DeluxeKeyedStream` as the “meta key”
> > to track key lineage?
> >
> > On February 27, 2017 at 9:37:27 PM, Aljoscha Krettek (aljos...@apache.org)
> > wrote:
> >
> > This is indeed an interesting topic, thanks for starting the discussion,
> > Jamie!
> >
> > I have now thought about this for a while, since more and more people seem
> > to be asking about it lately. First, I thought that per-key watermark
> > handling would not be necessary because it can be done locally (as Paris
> > suggested), then I realised that that's not actually the case and thought
> > that this wouldn't be possible. In the end, I came to realise that it is
> > indeed possible (with some caveats), although with a huge overhead in the
> > amount of state that we have to keep and with changes to our API. I'll try
> > and walk you through my thought process.
> >
> > Let's first look at local watermark tracking, that is, tracking the
> > watermark locally at the operator that needs it, for example a
> > WindowOperator. I initially thought that this would be sufficient. Assume
> > we have a pipeline like this:
> >
> > Source -> KeyBy -> WindowOperator -> ...
> >
> > If we have parallelism=1, then all elements for a given key k will be read
> > by the same source operator instance and they will arrive (in-order) at the
> > WindowOperator. It doesn't matter whether we track the per-key watermarks
> > at the Source or at the WindowOperator because we see the same elements in
> > the same order at each operator, per key.
> >
> > Now, think about this pipeline:
> >
> > Source1 --+
> > |-> Union -> KeyBy -> WindowOperator -> ...
> > Source2 --+
> >
> > (you can either think about two sources or one source that has several
> > parallel instances, i.e. parallelism > 1)
> >
> > Here, both Source1 and Source2 can emit elements with our key k. If Source1
> > is faster than Source2 and the watermarking logic at the WindowOperator
> > determines the watermark based on the incoming element timestamps (for
> > example, using the BoundedOutOfOrdernessTimestampExtractor) then the
> > elements coming from Source2 will be considered late at the WindowOperator.
> >
> > From this we know that our WindowOperator needs to calculate the watermark
> > similarly to how watermark calculation currently happens in Flink: the
> > watermark is the minimum of the watermarks of all upstream operations. In
> > this case it would be the minimum of the upstream watermarks of the
> > operations that emit elements with key k. For per-partition watermarks this
> > works because the number of upstream operations is known and we simply keep
> > an array that has the current upstream watermark for each input operation.
> > For per-key watermarks this would mean that we have to keep k*u upstream
> > watermarks, where k is the number of keys and u is the number of upstream
> > operations. This can be quite large. Another problem is that the observed
> > keys change, i.e. the key space is evolving and we need to retire keys from
> > our calculations lest we run out of space.
> >
> > We could find a solution based on a feature we recently introduced in
> > Flink: https://github.com/apache/flink/pull/2801. The sources keep track of
> > whether they have input and signal to downstream operations whether they
> > should be included in the watermark calculation logic. A similar thing
> > could be done per-key, where each source signals to downstream operations
> > that there is a new key and that we should start calculating watermarks for
> > this. When a source determines that no more data will come for a key (which
> > in itself is a bit of a tricky problem) then it should signal to downstream
> > operations to take the key out of watermark calculations, that is that we
> > can release some space.
> >
> > The above is analysing, on a purely technical level, the feasibility of
> > such a feature. I think it is feasible but can be very expensive in terms
> > of state size requirements. Gabor also pointed this out above and gave a
> > few suggestions on reducing that size.
> >
> > We would also need to change our API to allow tracking the lineage of keys
> > or to enforce that a key stays the same throughout a pipeline. Consider
> > this pipeline:
> >
> > Source -> KeyBy1 -> WindowOperator -> KeyBy2 -> WindowOperator
> >
> > where KeyBy1 and KeyBy2 extract a different key, respectively. How would
> > watermarks be tracked across this change of keys? Would we know which of
> > the prior keys end up being keys according to KeyBy2, i.e. do we have some
> > kind of key lineage information?
> >
> > One approach for solving this would be to introduce a new API that allows
> > extracting a key at the source and will keep this key on the elements until
> > the sink. For example:
> >
> > DeluxeKeyedStream input = env.addSourceWithKey(source, keyExtractor);
> > input
> > .map()
> > .window(...) // notice that we don't need keyBy because it is implicit
> > .reduce(...)
> > .map(...)
> > .window(...)
> > ...
> >
> > The DeluxeKeyedStream (name preliminary ;-) would allow the operations that
> > we today have on KeyedStream and on DataStream and it would always maintain
> > the key that was assigned at the sources. The result of each operation
> > would again be a DeluxeKeyedStream. This way, we could track watermarks per
> > key.
> >
> > I know it's a bit of a (very) lengthy mail, but what do you think?
> >
> >
> > On Thu, 23 Feb 2017 at 11:14 Gábor Hermann <m...@gaborhermann.com> wrote:
> >
> > > Hey all,
> > >
> > > Let me share some ideas about this.
> > >
> > > @Paris: The local-only progress tracking indeed seems easier, we do not
> > > need to broadcast anything. Implementation-wise it is easier, but
> > > performance-wise probably not. If one key can come from multiple
> > > sources, there could be a lot more network overhead with per-key
> > > tracking than with broadcasting, somewhat paradoxically. Say source
> > > instance S1 sends messages and watermarks to operator instances O1, O2.
> > > In the broadcasting case, S1 would send one message to O1 and one to O2
> > > per watermark (of course it depends on how fast the watermarks arrive),
> > > a total of 2. However, if we keep track of per-key watermarks, S1 would
> > > need to send watermarks for every key directed to O1, and also for O2.
> > > So if 10 keys go from S1 to O1, and 10 keys from S1 to O2, then (if
> > > watermarks arrive at the same rate per-key as per-source in the previous
> > > case) S1 would send a total of 20 watermarks.
> > >
> > > Another question is how large the state-per-key is. If it's
> > > really small (an integer maybe, or state of a small state machine), then
> > > the overhead of keeping track of a (Long) watermark is large
> > > memory-wise. E.g. a 4-byte Int state plus an 8-byte Long watermark
> > > results in 3x as large state. Also, the checkpointing would be ~3x as
> > > slow. Of course, for large states a Long watermark would not mean much
> > > overhead.
> > >
> > > We could resolve the memory issue by using some kind of sketch data
> > > structure. Right now the granularity of watermark handling is
> > > per-operator-instance. On the other hand, per-key granularity might be
> > > costly. What if we increased the granularity of watermarks inside an
> > > operator by keeping more than one watermark tracker in one operator?
> > > This could be quite simply done with a hash table. With a hash table of
> > > size 1, we would get the current semantics (per-operator-instance
> > > granularity). With a hash table large enough to have at most one key per
> > > bucket, we would get per-key watermark tracking. In between lies the
> > > trade-off between handling time-skew and a lot of memory overhead. This
> > > does not seem hard to implement.
> > >
> > > Of course, at some point we would still need to take care of watermarks
> > > per-key. Imagine that keys A and B would go to the same bucket of the
> > > hash table, and watermarks are coming in like this: (B,20), (A,10),
> > > (A,15), (A,40). Then the watermark of the bucket should be the minimum
> > > as time passes, i.e. 0, 10, 15, 20. For this we need to keep track of
> > > the watermarks of A and B separately. But after we have a correct
> > > watermark for the bucket, all we need to care about is the bucket
> > > watermarks. So somewhere (most probably at the source) we would have to
> > > pay memory overhead of tracking every key, but nowhere else in the
> > > topology.
> > >
> > > Regarding the potentially large network overhead, the same compression
> > > could be useful. I.e. we would not send watermarks from one operator
> > > per-key, but rather per-hash. Again, the trade-off between time skew and
> > > memory consumption is configurable by the size of the hash table used.
> > >
> > > Cheers,
> > > Gabor
> > >
> > > On 2017-02-23 08:57, Paris Carbone wrote:
> > >
> > > > Hey Jamie!
> > > >
> > > > Key-based progress tracking sounds like local-only progress tracking to
> > > > me, there is no need to use a low watermarking mechanism at all since
> > > > all streams of a key are handled by a single partition at a time (per
> > > > operator).
> > > > Thus, this could be much easier to implement and support (i.e., no need
> > > > to broadcast the progress state of each partition all the time).
> > > > State-wise it should be fine too if it is backed by rocksdb, especially
> > > > if we have MapState in the future.
> > > >
> > > > Just my quick thoughts on this, to get the discussion going :)
> > > >
> > > > cheers
> > > > Paris
> > > >
> > > > >> On 23 Feb 2017, at 01:01, Jamie Grier <ja...@data-artisans.com> wrote:
> > > >>
> > > >> Hi Flink Devs,
> > > >>
> > > > >> Use cases that I see quite frequently in the real world would
> > > > >> benefit from a different watermarking / event time model than the
> > > > >> one currently implemented in Flink.
> > > >>
> > > > >> I would call Flink's current approach partition-based watermarking
> > > > >> or maybe subtask-based watermarking. In this model the current
> > > > >> "event time" is a property local to each subtask instance in a
> > > > >> dataflow graph. The event time at any subtask is the minimum of the
> > > > >> watermarks it has received on each of its input streams.
> > > >>
> > > > >> There are a couple of issues with this model that are not optimal
> > > > >> for some (maybe many) use cases.
> > > >>
> > > > >> 1) A single slow subtask (or say source partition) anywhere in the
> > > > >> dataflow can mean no progress can be made on the computation at all.
> > > > >>
> > > > >> 2) In many real world scenarios the time skew across keys can be
> > > > >> *many* times greater than the time skew within the data with the
> > > > >> same key.
> > > >>
> > > > >> In this discussion I'll use "time skew" to refer to the
> > > > >> out-of-orderness with respect to timestamp of the data.
> > > > >> Out-of-orderness is a mouthful ;)
> > > >>
> > > >> Anyway, let me provide an example or two.
> > > >>
> > > > >> In IoT applications the source of events is a particular device out
> > > > >> in the world, let's say a device in a connected car application. The
> > > > >> data for some particular device may be very bursty and we will
> > > > >> certainly get events from these devices in Flink out-of-order just
> > > > >> because of things like partitions in Kafka, shuffles in Flink, etc.
> > > > >> However, the time skew in the data for a single device should likely
> > > > >> be very small (milliseconds or maybe seconds).
> > > >>
> > > > >> However, in the same application the time skew across different
> > > > >> devices can be huge (hours or even days). An obvious example of
> > > > >> this, again using connected cars as a representative example, is the
> > > > >> following: Car A is recording data locally at 12:00 pm on Saturday
> > > > >> but doesn't currently have a network connection. Car B is doing the
> > > > >> same thing but does have a network connection. Car A will transmit
> > > > >> its data when the network comes back on line. Let's say this is at
> > > > >> 4pm. Car B was transmitting its data immediately. This creates a
> > > > >> huge time skew (4 hours) in the observed datastream when looked at
> > > > >> as a whole. However, the time skew in that data for Car A or Car B
> > > > >> alone could be tiny. It will be out of order of course but maybe by
> > > > >> only milliseconds or seconds.
> > > >>
> > > > >> What the above means in the end for Flink is that the watermarks
> > > > >> must be delayed by up to 4 hours or more because we're looking at
> > > > >> the data stream as a whole -- otherwise the data for Car A will be
> > > > >> considered late. The time skew in the data stream when looked at as
> > > > >> a whole is large even though the time skew for any key may be tiny.
> > > >>
> > > > >> This is the problem I would like to see a solution for. The basic
> > > > >> idea of keeping track of watermarks and event time "per-key" rather
> > > > >> than per partition or subtask would, I think, solve both of the
> > > > >> problems stated above, and both of these are real issues for
> > > > >> production applications.
> > > >>
> > > > >> The obvious downside of trying to do this per-key is that the amount
> > > > >> of state you have to track is much larger and potentially unbounded.
> > > > >> However, I could see this approach working if the keyspace isn't
> > > > >> growing rapidly but is stable or grows slowly. The saving grace here
> > > > >> is that this may actually be true of the types of applications where
> > > > >> this would be especially useful. Think IoT use cases. Another
> > > > >> approach to keeping state size in check would be a configurable TTL
> > > > >> for a key.
> > > >>
> > > > >> Anyway, I'm throwing this out here on the mailing list in case
> > > > >> anyone is interested in this discussion, has thought about the
> > > > >> problem deeply already, has use cases of their own they've run into
> > > > >> or has ideas for a solution to this problem.
> > > >>
> > > >> Thanks for reading..
> > > >>
> > > >> -Jamie
> > > >>
> > > >>
> > > >> --
> > > >>
> > > >> Jamie Grier
> > > >> data Artisans, Director of Applications Engineering
> > > >> @jamiegrier <https://twitter.com/jamiegrier>
> > > >> ja...@data-artisans.com
> > > >
> > >
> > >
> >
>



-- 

Jamie Grier
data Artisans, Director of Applications Engineering
@jamiegrier <https://twitter.com/jamiegrier>
ja...@data-artisans.com
