Hi!

You have an interesting use case that I think comes up in many applications
(in fact I will be working on something very similar shortly).

Stephan has made some nice changes (this
<https://github.com/apache/flink/pull/1239> PR) to the State interfaces
supporting flexible backends, which can be used to implement this
functionality. While the new state backends support more efficient
checkpoints for KV states it still does not support incremental snapshots
and keeping out-of-core states.

My current plan is to implement a caching layer on top of the storage layer
(Cassandra in your case) which will be used to only keep the "hot" keys in
the Flink streaming operators and would evict the cold keys to the external
storage on checkpoints.

Some things we need to worry about this case is that we should only
overwrite elements in our storage when we know that a checkpoint is
complete. (we can use checkpointnotifications for this) This assumes some
sort of versioning in the storage layer.

I think this will definitely not make it to 0.10, but I am confident that
we will have something working for the next (1.0) release. I suggest you
try experimenting with the logic, maybe you can get something working much
quicker :)

Cheers,
Gyula

Krzysztof Zarzycki <k.zarzy...@gmail.com> ezt írta (időpont: 2015. okt.
14., Sze, 11:02):

> Hi guys!
> I'm sorry I have abandoned this thread but I had to give up Flink for some
> time. Now I'm back and would like to resurrect this thread. Flink has
> rapidly evolved in this time too, so maybe new features will allow me what
> I want to do. By the way, I heard really only good stuff about you from
> Flink Forward conference!
>
> First, about back-pressure. As you said, it is working well so I'm taking
> it as granted. Sounds great!
>
> Let's focus now on stateful processing:
>
> To back up what I mean, I'm citing some numbers of the state I'm currently
> holding:
> My stream processing program keeps around 300GB in 1 month state, but it
> will be holding around 2 months, so twice as much (600 GB). The state is
> key-value store, where key is some user id & value is actually a list of
> events correlated with the user. There are tens of millions of keys -
> unique user ids. The stream is partitioned on user id, so my state can be
> partitioned on user id as well.
> Currently I keep this "state" in Cassandra, so externally to the program,
> but this is my biggest pain as the communication cost is large, especially
> when I do reprocessing of my streaming data.
>
> Now what I would like to have is some abstraction available in Flink, that
> allows me to keep the state out-of-core, but embedded. I would use it as
> key-value store and Flink will journal & replicate all the update
> operations, so they are recoverable on failure, when the state (or its
> partition) is lost.
> To describe my idea in code, I imagine the following pseudocode (totally
> abstracted from Flink):
> class MyProcessor {
>   val keyValueState = Flink.createKeyValueState("name-it")
>
>   def processRecord(r: Record) {
>      val userList = keyValueState.get(r.get("userId"))
>      userList += r.get("someData")
>      keyValueState.put(r.get("userId"), userList)
>   }
> }
>
> Something similar is in Samza, with grants:
> - all puts are replicated (by saving the put in separate Kafka topic).
> - on failure & recover, the state is recovered from the saved puts, before
> starting the processing.
>
>
> Last time, you said that you're "working on making incrementally
> backed-up key/value state a first-class citizen in Flink, but is is still
> WIP".  How this change since then? Do you already support the case that I
> just described?
>
>
> Thanks for the idea of MapDB. I couldn't  find any benchmark of MapDB
> out-of-core performance , and I don't know yet if it can match performance
> of RocksDB-like database, but I will try to find time to check it.
> In meantime, this is the performance that attracts me to RocksDb:
>
>> Measure performance to load 1B keys into the database. The keys are
>> inserted in random order.
>>  rocksdb: 103 minutes, 80 MB/sec (total data size 481 GB, 1 billion
>> key-values)
>> Measure performance to load 1B keys into the database. The keys are
>> inserted in sequential order.
>> rocksdb: 36 minutes, 370 MB/sec (total data size 760 GB)
>
>
> [1] https://github.com/facebook/rocksdb/wiki/Performance-Benchmarks
>
> Cheers!
> Krzysiek
>
> 2015-06-30 15:00 GMT+02:00 Ufuk Celebi <u...@apache.org>:
>
>>
>> On 30 Jun 2015, at 14:23, Gyula Fóra <gyula.f...@gmail.com> wrote:
>> > 2. We have support for stateful processing in Flink in many ways you
>> have described in your question. Unfortunately the docs are down currently
>> but you should check out the 'Stateful processing' section in the 0.10 docs
>> (once its back online). We practically support an OperatorState interface
>> which let's you keep partitioned state by some key and access it from
>> runtime operators. The states declared using these interfaces are
>> checkpointed and will be restored on failure. Currently all the states are
>> stored in-memory but we are planning to extend it to allow writing state
>> updates to external systems.
>>
>>
>> http://flink.apache.org/docs/master/apis/streaming_guide.html#stateful-computation
>
>
>

Reply via email to