>
> What are the other techniques for bootstrapping rocksdb state?

Bootstrapping state involves somehow creating a snapshot (typically a
savepoint, but a retained checkpoint can be a better choice in some cases)
containing the necessary state -- meaning that the state has the same
operator uid and state descriptor used by the real streaming job.

You can do this by either: (1) running a variant of the live streaming job
against the data used for bootstrapping and taking a snapshot when the data
has been fully ingested, or (2) by using the State Processor API [1].
You'll find a trivial example of the second approach in [2]. Once you have
a suitable snapshot, you can run your real job against it.
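
For concreteness, here is a minimal sketch of the second approach using the
Flink 1.12 State Processor API (still DataSet-based in 1.12). The Event class,
the "dedupe" operator uid, the "seen" state descriptor, and the paths below are
placeholders for illustration -- they have to match whatever your real
streaming job actually uses:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction
import org.apache.flink.state.api.{OperatorTransformation, Savepoint}

// Placeholder event type; it must mirror the type the streaming job keys on.
case class Event(uuid: String)

// Writes one "seen" flag per uuid into keyed state.
class SeenBootstrapper extends KeyedStateBootstrapFunction[String, Event] {
  @transient private var seen: ValueState[java.lang.Boolean] = _

  override def open(parameters: Configuration): Unit = {
    // The descriptor name and type must match the streaming job exactly.
    seen = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Boolean]("seen", classOf[java.lang.Boolean]))
  }

  override def processElement(
      event: Event,
      ctx: KeyedStateBootstrapFunction[String, Event]#Context): Unit = {
    seen.update(true)
  }
}

object BootstrapSeenState {
  def main(args: Array[String]): Unit = {
    val bEnv = ExecutionEnvironment.getExecutionEnvironment

    // In practice the historical events would come from files, a DB dump, etc.
    val history = bEnv.fromElements(Event("uuid-1"), Event("uuid-2"))

    val transformation = OperatorTransformation
      .bootstrapWith(history)
      .keyBy(new KeySelector[Event, String] {
        override def getKey(e: Event): String = e.uuid // same key as the real job
      })
      .transform(new SeenBootstrapper)

    Savepoint
      .create(new RocksDBStateBackend("file:///tmp/checkpoints"), 128) // 128 = max parallelism
      .withOperator("dedupe", transformation) // "dedupe" = uid of the stateful operator
      .write("file:///tmp/bootstrap-savepoint")

    bEnv.execute("bootstrap dedupe state")
  }
}

Once that batch job has run, you can submit the streaming job with
-s file:///tmp/bootstrap-savepoint so it starts from the bootstrapped state.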

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/libs/state_processor_api.html
[2] https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf

Regards,
David

On Sat, Apr 24, 2021 at 3:01 PM Omngr <sosyalmedya.oguz...@gmail.com> wrote:

> Hi David, first of all thank you for your response!
>
> The state size is about 1 TB for now, but it will grow quickly, and I also
> cannot use TTL for the states. It will grow indefinitely.
> What are the other techniques for bootstrapping rocksdb state?
>
> On Sat, Apr 24, 2021 at 15:43, David Anderson <dander...@apache.org>
> wrote:
>
>> Oguzhan,
>>
>>> Note that the state size is very large and I have to feed the state from
>>> a batch flow first. Thus I cannot use internal state like RocksDB.
>>
>>
>> How large is "very large"? Using RocksDB, several users have reported
>> working with jobs using many TBs of state.
>>
>> And there are techniques for bootstrapping the state. That doesn't have
>> to be a showstopper.
>>
>>> Could there be any bottleneck in that flow? I am thinking of using async
>>> map functions for the state read/write operations.
>>
>>
>> That's a good reason to reconsider using Flink state.
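>>
>> For reference, de-duplication on top of Flink keyed state can be as simple
>> as something like this (just a rough sketch; the Event type and the "seen"
>> state name are placeholders):
>>
>> import org.apache.flink.api.common.functions.RichFlatMapFunction
>> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
>> import org.apache.flink.configuration.Configuration
>> import org.apache.flink.util.Collector
>>
>> case class Event(uuid: String)
>>
>> class DeDuplicator extends RichFlatMapFunction[Event, Event] {
>>   @transient private var seen: ValueState[java.lang.Boolean] = _
>>
>>   override def open(parameters: Configuration): Unit = {
>>     seen = getRuntimeContext.getState(
>>       new ValueStateDescriptor[java.lang.Boolean]("seen", classOf[java.lang.Boolean]))
>>   }
>>
>>   override def flatMap(event: Event, out: Collector[Event]): Unit = {
>>     if (seen.value() == null) { // first time this uuid is seen
>>       seen.update(true)
>>       out.collect(event)
>>     }
>>     // otherwise it is a duplicate and is dropped
>>   }
>> }
>>
>> // usage (on a keyed stream, so the state is scoped per uuid):
>> // events.keyBy(_.uuid).flatMap(new DeDuplicator)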
>>
>> Regards,
>> David
>>
>>
>>
>> On Fri, Apr 23, 2021 at 12:22 PM Oğuzhan Mangır <
>> sosyalmedya.oguz...@gmail.com> wrote:
>>
>>> I'm trying to design a streaming flow that *de-duplicates* events
>>> and sends them to a Kafka topic.
>>>
>>> Basically, the flow looks like this:
>>>
>>> kafka (multiple topics) =>  flink (checking de-duplication and event
>>> enrichment) => kafka (single topic)
>>>
>>> For de-duplication, I'm thinking of using Cassandra as an external state
>>> store. The details of my job:
>>>
>>> I have an event payload with a *uuid* field. If an event with the same
>>> uuid arrives again, it should be discarded. In my case, I read from two
>>> Kafka topics. The first topic has a lot of fields, but the second topic
>>> has only a *uuid* field, so I have to enrich the data using the same
>>> uuid for the events coming from the second topic.
>>>
>>> Stream1: Messages read from the first topic. Read the state from
>>> Cassandra using the *uuid*. If the state exists, ignore this event and *do
>>> not* emit it to Kafka. If the state does not exist, save this event to
>>> Cassandra, then emit it to Kafka.
>>>
>>> Stream2: Messages read from the second topic. Read the state from
>>> Cassandra using the *uuid*. If the state exists, check a column that
>>> indicates whether this event has already arrived from topic2. If the value
>>> of this column is false, enrich the event using the state and update the
>>> Cassandra column to true. If it is true, ignore this event because it is a
>>> duplicate.
>>>
>>> def checkDeDuplication(event: Event): Option[Event] = {
>>>   val state = readFromCassandra(event.uuid)
>>>   if (state.isDefined) None // duplicate, ignore this event
>>>   else {
>>>     saveEventToCassandra(event)
>>>     Some(event)
>>>   }
>>> }
>>>
>>> def checkDeDuplicationAndEnrichEvent(event: Event): Option[Event] = {
>>>   val state = readFromCassandra(event.uuid)
>>>   if (state.isEmpty) None // nothing to enrich with yet, ignore this event
>>>   else if (state.get.flag) None // already enriched, ignore this duplicate
>>>   else {
>>>     updateFlagAsTrueInCassandra(event)
>>>     Some(enrich(event, state.get)) // enrich the event using the stored state
>>>   }
>>> }
>>>
>>>
>>> val stream1 = readKafkaTopic1().flatMap(e => checkDeDuplication(e).toSeq)
>>> val stream2 = readKafkaTopic2().flatMap(e => checkDeDuplicationAndEnrichEvent(e).toSeq)
>>> stream1.union(stream2).addSink(kafkaSink)
>>>
>>> 1- Is that a good approach?
>>>
>>> 2- Is Cassandra the right choice here? Note that the state size is very
>>> large and I have to feed the state from a batch flow first. Thus I cannot
>>> use internal state like RocksDB.
>>>
>>> 3- Can I improve this logic?
>>>
>>> 4- Could there be any bottleneck in that flow? I am thinking of using async
>>> map functions for the state read/write operations (see the sketch below).
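>>>
>>> For example, roughly something like this with Flink's Async I/O
>>> (AsyncDataStream); readFromCassandraAsync and saveEventToCassandra are
>>> placeholders for whatever async Cassandra client calls I end up using:
>>>
>>> import java.util.concurrent.TimeUnit
>>> import scala.concurrent.{ExecutionContext, Future}
>>> import scala.util.{Failure, Success}
>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
>>>
>>> class AsyncDeDuplication extends AsyncFunction[Event, Event] {
>>>   implicit lazy val ec: ExecutionContext = ExecutionContext.global
>>>
>>>   override def asyncInvoke(event: Event, resultFuture: ResultFuture[Event]): Unit = {
>>>     // placeholder async lookup, e.g. via the driver's executeAsync
>>>     val lookup: Future[Option[Event]] = readFromCassandraAsync(event.uuid)
>>>     lookup.onComplete {
>>>       case Success(None) =>
>>>         saveEventToCassandra(event) // should also be non-blocking in a real job
>>>         resultFuture.complete(Iterable(event))
>>>       case Success(Some(_)) =>
>>>         resultFuture.complete(Iterable.empty) // duplicate, drop it
>>>       case Failure(t) =>
>>>         resultFuture.completeExceptionally(t)
>>>     }
>>>   }
>>> }
>>>
>>> // usage:
>>> // val deduped = AsyncDataStream.unorderedWait(
>>> //   stream1, new AsyncDeDuplication, 1, TimeUnit.SECONDS, 100)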
>>>
>>
