> On Jul 22, 2016, at 2:54 AM, Josh <jof...@gmail.com> wrote:
>
> Hi all,
>
> >(1) Only write to the DB upon a checkpoint, at which point it is known that
> >no replay of that data will occur any more. Values from partially successful
> >writes will be overwritten with correct value. I assume that is what you
> >thought of when referring to the State Backend, because in some sense, that
> >is what that state backend would do.
I feel the problem is really about how to commit all snapshots as a single
transaction. Partial writes pose cleanup challenges when a job is restored.
An easy hack would be to treat RocksDB as a cache and keep state updates
there, i.e. keep a manifest and do a cleanup check before the actual restore.

> >I think it is simpler to realize that in a custom sink, than developing a
> >new state backend. Another Flink committer (Chesnay) has developed some
> >nice tooling for that, to be merged into Flink soon.
>
> I am planning to implement something like this:
>
> Say I have a topology which looks like this: [source => operator => sink], I
> would like it to work like this:
> 1. Upon receiving an element, the operator retrieves some state from an
> external key-value store (I would like to put an in-memory cache with a TTL
> on top of this)
> 2. The operator emits a new state (and updates its in-memory cache with the
> new state)
> 3. The sink batches up all the new states and, upon checkpoint, flushes them
> to the external store
>
> Could anyone point me at the work that's already been done on this? Has it
> already been merged into Flink?
>
> Thanks,
> Josh
>
>> On Thu, Apr 7, 2016 at 12:53 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>> Hi,
>> regarding windows and incremental aggregation: this is already happening in
>> Flink as of now. When you give a ReduceFunction on a window, which "sum"
>> internally does, the result for a window is incrementally updated whenever a
>> new element comes in. This incremental aggregation only happens when you
>> specify a ReduceFunction or a FoldFunction, not for the general case of a
>> WindowFunction, where all elements in the window are required.
>>
>> You are right about incremental snapshots. We mainly want to introduce them
>> to reduce the latency incurred by snapshotting. Right now, processing stalls
>> when a checkpoint happens.
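[Editor's note: the three-step plan above (TTL cache in front of an external
store, plus a sink that batches writes and flushes only on checkpoint) can be
sketched in a language-agnostic way. This is a minimal simulation of the
pattern, not Flink code; the class and method names (`BufferingSink`,
`TtlCache`, `invoke`, `on_checkpoint`) are hypothetical, and a real
implementation would hook into Flink's checkpointing interfaces.]

```python
import time

class BufferingSink:
    """Sketch of step 3: buffer writes and flush to the external store
    only when a checkpoint completes, so replayed elements between
    checkpoints overwrite the buffer rather than the store."""

    def __init__(self, store):
        self.store = store   # external key-value store (a dict here)
        self.buffer = {}     # pending writes since the last checkpoint

    def invoke(self, key, new_state):
        # Step 2: the newest state replaces any pending write for the key.
        self.buffer[key] = new_state

    def on_checkpoint(self):
        # Step 3: flush the batch; against a real store this should be
        # a batch write or transaction.
        self.store.update(self.buffer)
        self.buffer.clear()

class TtlCache:
    """Sketch of step 1: an in-memory cache with a TTL, sitting in
    front of the external store."""

    def __init__(self, store, ttl_seconds, clock=time.monotonic):
        self.store = store
        self.ttl = ttl_seconds
        self.clock = clock
        self.entries = {}    # key -> (value, expiry timestamp)

    def get(self, key):
        hit = self.entries.get(key)
        if hit is not None and hit[1] > self.clock():
            return hit[0]
        value = self.store.get(key)   # fall back to the external store
        self.entries[key] = (value, self.clock() + self.ttl)
        return value

    def put(self, key, value):
        self.entries[key] = (value, self.clock() + self.ttl)
```

In this sketch the operator would call `cache.get(key)`, compute the new
state, call `cache.put(key, new_state)`, and emit the new state downstream
to the sink.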
>>
>> Cheers,
>> Aljoscha
>>
>>> On Thu, 7 Apr 2016 at 13:12 Shannon Carey <sca...@expedia.com> wrote:
>>> Thanks very kindly for your response, Stephan!
>>>
>>> We will definitely use a custom sink for persistence of idempotent
>>> mutations whenever possible. Exposing state as read-only to external
>>> systems is a complication we will try to avoid. Also, we will definitely
>>> only write to the DB upon checkpoint, and the write will be synchronous and
>>> transactional (no possibility of partial success/failure).
>>>
>>> However, we do want Flink state to be durable, we want it to be in memory
>>> when possible, and we want to avoid running out of memory due to the size
>>> of the state. For example, if you have a wide window that hasn't gotten an
>>> event for a long time, we want to evict that window state from memory.
>>> We're now thinking of using Redis (via AWS ElastiCache), which also
>>> conveniently has TTL, instead of DynamoDB.
>>>
>>> I just wanted to check whether eviction of (inactive/quiet) state from
>>> memory is something that I should consider implementing, or whether Flink
>>> already has some built-in way of doing it.
>>>
>>> Along the same lines, I am also wondering whether Flink already has a means
>>> of compacting the state of a window by applying an aggregation function to
>>> the elements so far (e.g. every time the window is triggered)? For example,
>>> if you are only executing a sum on the contents of the window, the window
>>> state doesn't need to store all the individual items in the window; it only
>>> needs to store the sum. Aggregations other than "sum" might have that
>>> characteristic too. I don't know if Flink is already that intelligent or
>>> whether I should figure out how to aggregate window contents myself when
>>> possible with something like a window fold? Another poster (Aljoscha) was
>>> talking about adding incremental snapshots, but it sounds like that would
>>> only improve the write throughput, not the memory usage.
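[Editor's note: the incremental aggregation Aljoscha describes, which answers
Shannon's question about compacting window state, can be illustrated with a
small sketch. This simulates the idea only; it is not Flink's actual window
operator, and the names (`IncrementalWindowSum`, `add`, `trigger`) are
hypothetical.]

```python
class IncrementalWindowSum:
    """Sketch of incremental window aggregation: instead of buffering
    every element of a window, keep only the running reduce result
    (here, a sum), which is what Flink does when a ReduceFunction or
    FoldFunction is supplied."""

    def __init__(self, reduce_fn):
        self.reduce_fn = reduce_fn
        self.state = {}   # window key -> running aggregate (one value!)

    def add(self, window_key, element):
        # Fold the new element into the running result immediately.
        if window_key in self.state:
            self.state[window_key] = self.reduce_fn(self.state[window_key],
                                                    element)
        else:
            self.state[window_key] = element

    def trigger(self, window_key):
        # Firing the window needs no per-element replay; the result is
        # already materialized.
        return self.state.get(window_key)
```

The point is that per-window state stays O(1) in the number of elements, so
the compaction Shannon asks about happens automatically on the
ReduceFunction/FoldFunction path, with no user-side work needed.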
>>>
>>> Thanks again!
>>> Shannon Carey
>>>
>>>
>>> From: Stephan Ewen <se...@apache.org>
>>> Date: Wednesday, April 6, 2016 at 10:37 PM
>>> To: <user@flink.apache.org>
>>> Subject: Re: State in external db (dynamodb)
>>>
>>> Hi Shannon!
>>>
>>> Welcome to the Flink community!
>>>
>>> You are right, sinks in general need to be idempotent if you want
>>> "exactly-once" semantics, because there can be a replay of elements that
>>> were already written.
>>>
>>> However, what you describe later, overwriting of a key with a new value (or
>>> the same value again), is pretty much sufficient. That means that when a
>>> duplicate write happens during replay, the value for the key is simply
>>> overwritten with the same value again.
>>> As long as all computation is purely in Flink and you only write to the
>>> key/value store (rather than read from k/v, modify in Flink, write to k/v),
>>> you get the consistency that, for example, counts/aggregates never have
>>> duplicates.
>>>
>>> If Flink needs to look up state from the database (because it is no longer
>>> in Flink), it is a bit more tricky. I assume that is where you are going
>>> with "Subsequently, when an event is processed, we must be able to quickly
>>> load up any evicted state". In that case, there are two things you can do:
>>>
>>> (1) Only write to the DB upon a checkpoint, at which point it is known
>>> that no replay of that data will occur any more. Values from partially
>>> successful writes will be overwritten with the correct value. I assume that
>>> is what you thought of when referring to the State Backend, because in some
>>> sense, that is what that state backend would do.
>>>
>>> I think it is simpler to realize that in a custom sink than developing a
>>> new state backend. Another Flink committer (Chesnay) has developed some
>>> nice tooling for that, to be merged into Flink soon.
>>>
>>> (2) You could attach version numbers to every write, and increment the
>>> versions upon each checkpoint.
That allows you to always refer to a
>>> consistent previous value, if some writes were made but a failure occurred
>>> before the checkpoint completed.
>>>
>>> I hope these answers apply to your case. Let us know if some things are
>>> still unclear, or if I misunderstood your question!
>>>
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>>
>>>> On Wed, Apr 6, 2016 at 8:14 AM, Sanne de Roever
>>>> <sanne.de.roe...@gmail.com> wrote:
>>>> FYI Cassandra has a TTL on data:
>>>> https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_t.html
>>>>
>>>>> On Wed, Apr 6, 2016 at 7:55 AM, Shannon Carey <sca...@expedia.com> wrote:
>>>>> Hi, new Flink user here!
>>>>>
>>>>> I found a discussion on user@flink.apache.org about using DynamoDB as a
>>>>> sink. However, as noted, sinks have an at-least-once guarantee, so your
>>>>> operations must be idempotent.
>>>>>
>>>>> However, another way to go about this (and correct me if I'm wrong) is to
>>>>> write the state to the external store via a custom State Backend. Since
>>>>> the state participates in checkpointing, you don't have to worry about
>>>>> idempotency: every time state is checkpointed, overwrite the value of
>>>>> that key.
>>>>>
>>>>> We are starting a project with Flink, and we are interested in evicting
>>>>> state from memory once a TTL is reached during which no events have
>>>>> come in for that state. Subsequently, when an event is processed, we must
>>>>> be able to quickly load up any evicted state. Does this sound reasonable?
>>>>> We are considering using DynamoDB for our state backend because it seems
>>>>> like all we will need is a key-value store. The only weakness of this is
>>>>> that if state gets older than, say, 2 years, we would like to get rid of
>>>>> it, which might not be easy in DynamoDB. I don't suppose Flink has any
>>>>> behind-the-scenes features that deal with getting rid of old state
>>>>> (either evicting from memory or TTL/aging out entirely)?
>>>>>
>>>>> Thanks for your time!
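[Editor's note: Stephan's option (2), versioned writes that are only made
visible once the checkpoint that produced them completes, can be sketched as
follows. This is a minimal simulation under assumed semantics, not a real
store client; the names (`VersionedStore`, `checkpoint_complete`, `epoch`)
are hypothetical.]

```python
class VersionedStore:
    """Sketch of versioned writes: every write carries the current
    checkpoint epoch as its version. If a failure happens mid-epoch,
    readers fall back to the newest value from a *completed* epoch, so
    partial writes never become visible."""

    def __init__(self):
        self.rows = {}       # key -> {epoch: value}
        self.committed = 0   # highest checkpoint epoch that completed

    def write(self, key, value, epoch):
        # Writes land immediately but stay invisible until their epoch
        # is marked complete.
        self.rows.setdefault(key, {})[epoch] = value

    def checkpoint_complete(self, epoch):
        self.committed = epoch

    def read(self, key):
        # Return the newest value whose epoch has been committed.
        versions = self.rows.get(key, {})
        ok = [e for e in versions if e <= self.committed]
        return versions[max(ok)] if ok else None
```

A periodic cleanup pass could also delete versions older than the last
committed epoch, which keeps the per-key history bounded.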
>>>>> Shannon Carey
>>>>
>>>
>