Hi Shannon!

Welcome to the Flink community!

You are right, sinks generally need to be idempotent if you want
"exactly-once" semantics, because elements that were already written can be
replayed.

However, what you describe later, overwriting of a key with a new value (or
the same value again) is pretty much sufficient. That means that when a
duplicate write happens during replay, the value for the key is simply
overwritten with the same value again.
As long as all computation happens purely in Flink and you only write to the
key/value store (rather than read from k/v, modify in Flink, write to k/v),
you get consistent results: counts/aggregates, for example, never include
duplicates.
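
To illustrate the point about overwrites, here is a minimal sketch in Python
(not Flink code). KeyValueStore and run_counts are hypothetical names made up
for this example; the store is an in-memory stand-in for something like
DynamoDB. The job keeps the counts itself and only writes to the store, so
replaying events after a failure just overwrites keys with the same values.

```python
# Hypothetical in-memory stand-in for an external key/value store.
class KeyValueStore:
    def __init__(self):
        self.data = {}

    def put(self, key, value):
        # Upsert: writing the same (key, value) twice leaves the same state.
        self.data[key] = value


def run_counts(events, store, counts=None):
    """Count events per key inside the job and write each running count out."""
    counts = dict(counts or {})
    for key in events:
        counts[key] = counts.get(key, 0) + 1
        store.put(key, counts[key])
    return counts


events = ["a", "b", "a", "a", "b"]
store = KeyValueStore()

# First attempt: a checkpoint is taken after two events, two more events are
# processed and written, and then the job fails.
checkpoint = run_counts(events[:2], store)
run_counts(events[2:4], store, checkpoint)

# Recovery: restore the checkpointed counts and replay everything after the
# checkpoint. The duplicate writes overwrite keys instead of adding to them.
run_counts(events[2:], store, checkpoint)

print(store.data)
```

The final counts are the same as in a run without any failure. That would not
hold if the sink incremented a counter in the store, because the replayed
events would then be counted twice.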

If Flink needs to look up state from the database (because it is no longer
in Flink), it is a bit more tricky. I assume that is where you are going
with "Subsequently, when an event is processed, we must be able to quickly
load up any evicted state".  In that case, there are two things you can do:

(1)  Only write to the DB upon a checkpoint, at which point it is known
that no replay of that data will occur any more. Values from partially
successful writes will be overwritten with the correct value. I assume that is
what you thought of when referring to the State Backend, because in some
sense, that is what that state backend would do.

I think it is simpler to implement that in a custom sink than to develop a
new state backend.  Another Flink committer (Chesnay) has developed some
nice tooling for that, to be merged into Flink soon.

(2) You could attach version numbers to every write, and increment the
versions upon each checkpoint. That allows you to always refer to a
consistent previous value, if some writes were made, but a failure occurred
before the checkpoint completed.
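
Both options can be sketched roughly as follows, in plain Python rather than
Flink code. All class and method names (CheckpointedSink, VersionedStore,
on_checkpoint_complete, on_failure, read_consistent) are hypothetical and only
serve the illustration; a dict stands in for the database, and the sketch
assumes something calls on_checkpoint_complete once a checkpoint has
completed.

```python
class CheckpointedSink:
    """Option (1): hold writes back until the covering checkpoint completes."""

    def __init__(self, store):
        self.store = store   # dict standing in for the external database
        self.pending = {}    # writes made since the last completed checkpoint

    def write(self, key, value):
        self.pending[key] = value

    def on_checkpoint_complete(self):
        # Plain upserts: repeating a flush writes the same values again.
        self.store.update(self.pending)
        self.pending.clear()

    def on_failure(self):
        # Uncommitted writes are dropped; the replay will produce them again.
        self.pending.clear()


class VersionedStore:
    """Option (2): tag each write with a version that grows per checkpoint."""

    def __init__(self):
        self.rows = {}       # (key, version) -> value
        self.committed = 0   # highest version covered by a completed checkpoint
        self.current = 1     # version attached to writes in progress

    def write(self, key, value):
        self.rows[(key, self.current)] = value

    def on_checkpoint_complete(self):
        self.committed = self.current
        self.current += 1

    def read_consistent(self, key):
        # Newest value at or below the committed version; later writes are
        # ignored because their checkpoint has not completed.
        versions = [v for (k, v) in self.rows if k == key and v <= self.committed]
        return self.rows[(key, max(versions))] if versions else None


# Option (1): the write made after the checkpoint never reaches the store.
db = {}
sink = CheckpointedSink(db)
sink.write("a", 1)
sink.on_checkpoint_complete()
sink.write("a", 2)
sink.on_failure()

# Option (2): the write made after the checkpoint is stored but not visible.
vs = VersionedStore()
vs.write("a", 1)
vs.on_checkpoint_complete()
vs.write("a", 99)            # failure happens before the next checkpoint
consistent = vs.read_consistent("a")
```

In the first variant the database only ever contains checkpointed values. In
the second, uncommitted values do land in the database, but a reader that
knows the last committed version can skip them, and the replay overwrites them
under the same version number.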

I hope these answers apply to your case. Let us know if some things are
still unclear, or if I misunderstood your question!


Greetings,
Stephan



On Wed, Apr 6, 2016 at 8:14 AM, Sanne de Roever <sanne.de.roe...@gmail.com>
wrote:

> FYI Cassandra has a TTL on data:
> https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_t.html
>
> On Wed, Apr 6, 2016 at 7:55 AM, Shannon Carey <sca...@expedia.com> wrote:
>
>> Hi, new Flink user here!
>>
>> I found a discussion on user@flink.apache.org about using DynamoDB as a
>> sink. However, as noted, sinks have an at-least-once guarantee so your
>> operations must be idempotent.
>>
>> However, another way to go about this (and correct me if I'm wrong) is to
>> write the state to the external store via a custom State Backend. Since the
>> state participates in checkpointing, you don't have to worry about
>> idempotency: every time state is checkpointed, overwrite the value of that
>> key.
>>
>> We are starting a project with Flink, and we are interested in evicting
>> the state from memory once a TTL is reached during which no events have
>> come in for that state. Subsequently, when an event is processed, we must
>> be able to quickly load up any evicted state. Does this sound reasonable?
>> We are considering using DynamoDB for our state backend because it seems
>> like all we will need is a key-value store. The only weakness of this is
>> that if state gets older than, say, 2 years we would like to get rid of it
>> which might not be easy in DynamoDB. I don't suppose Flink has any
>> behind-the-scenes features that deal with getting rid of old state (either
>> evicting from memory or TTL/aging out entirely)?
>>
>> Thanks for your time!
>> Shannon Carey
>>
>
>
