Why not use upserts? Wouldn't that solve the issue of duplicates and also
remove the need to query the database?
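
What I have in mind is a single conditional write that both records the
uuid and reports whether it was already there, so there is no separate
read. A minimal sketch in Scala, assuming a Cassandra events table keyed
by uuid and the DataStax 4.x Java driver (names are illustrative):

import com.datastax.oss.driver.api.core.CqlSession

// Insert only if the uuid is new, and report whether the insert happened,
// so the caller knows whether to emit the event or drop it as a duplicate.
// (In practice the prepared statement would be cached, not rebuilt per call.)
def insertIfAbsent(session: CqlSession, uuid: String): Boolean = {
  val prepared = session.prepare(
    "INSERT INTO events (uuid) VALUES (?) IF NOT EXISTS")
  session.execute(prepared.bind(uuid)).wasApplied()
}

As far as I know, IF NOT EXISTS is a lightweight transaction and is more
expensive than a plain write, so I am not sure how it behaves at this volume.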

On Sat, Apr 24, 2021, 8:12 PM David Anderson <dander...@apache.org> wrote:

> What are the other techniques for bootstrapping rocksdb state?
>
>
> Bootstrapping state involves somehow creating a snapshot (typically a
> savepoint, but a retained checkpoint can be a better choice in some cases)
> containing the necessary state -- meaning that the state has the same
> operator uid and state descriptor used by the real streaming job.
>
> You can do this either by (1) running a variant of the live streaming job
> against the data used for bootstrapping and taking a snapshot when the data
> has been fully ingested, or (2) using the State Processor API [1].
> You'll find a trivial example of the second approach in [2]. Once you have
> a suitable snapshot, you can run your real job against it.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/libs/state_processor_api.html
> [2] https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf
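>
> To make the second approach concrete, here is a minimal Scala sketch
> against the 1.12 State Processor API. The Event type, the "dedupe-uid"
> operator uid, and the "seen" state descriptor are illustrative; they must
> match whatever the real streaming job uses:
>
> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
> import org.apache.flink.api.java.ExecutionEnvironment
> import org.apache.flink.api.java.functions.KeySelector
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
> import org.apache.flink.state.api.{OperatorTransformation, Savepoint}
> import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction
>
> case class Event(uuid: String)
>
> // Writes one "seen" flag per uuid into keyed state, so the streaming job
> // starts with every historical uuid already marked as processed.
> class SeenBootstrapper extends KeyedStateBootstrapFunction[String, Event] {
>   @transient private var seen: ValueState[Boolean] = _
>
>   override def open(parameters: Configuration): Unit =
>     seen = getRuntimeContext.getState(
>       new ValueStateDescriptor("seen", classOf[Boolean]))
>
>   override def processElement(
>       e: Event, ctx: KeyedStateBootstrapFunction[String, Event]#Context): Unit =
>     seen.update(true)
> }
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> val historical = env.fromElements(Event("a"), Event("b")) // stand-in batch source
>
> val transformation = OperatorTransformation
>   .bootstrapWith(historical)
>   .keyBy(new KeySelector[Event, String] { def getKey(e: Event): String = e.uuid })
>   .transform(new SeenBootstrapper)
>
> Savepoint
>   .create(new RocksDBStateBackend("file:///tmp/checkpoints"), 128) // max parallelism
>   .withOperator("dedupe-uid", transformation) // must match the streaming job's uid
>   .write("file:///tmp/bootstrap-savepoint")
>
> env.execute("bootstrap dedupe state")
>
> Running this batch job produces a savepoint at the given path, and the
> streaming job can then be started from it with -s / --fromSavepoint.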
>
> Regards,
> David
>
> On Sat, Apr 24, 2021 at 3:01 PM Omngr <sosyalmedya.oguz...@gmail.com>
> wrote:
>
>> Hi David, thank you for your response first!
>>
>> The state size is about 1 TB for now, but it will grow quickly, and I
>> cannot use TTL for the states, so it will grow indefinitely.
>> What are the other techniques for bootstrapping rocksdb state?
>>
>> On Sat, Apr 24, 2021 at 15:43, David Anderson <dander...@apache.org>
>> wrote:
>>
>>> Oguzhan,
>>>
>>> Note, the state size is very large and I have to bootstrap the state from
>>>> a batch flow first, so I cannot use internal state like RocksDB.
>>>
>>>
>>> How large is "very large"? Using RocksDB, several users have reported
>>> working with jobs using many TBs of state.
>>>
>>> And there are techniques for bootstrapping the state. That doesn't have
>>> to be a showstopper.
>>>
>>> Could there be any bottleneck in that flow? I am thinking of using async
>>>> map functions for the state read/write operations.
>>>
>>>
>>> That's a good reason to reconsider using Flink state.
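>>>
>>> Concretely, keeping the "seen" flag in Flink keyed state removes the
>>> external lookup entirely. A minimal sketch (the Event type and the names
>>> are illustrative, not from your job):
>>>
>>> import org.apache.flink.api.common.functions.RichFlatMapFunction
>>> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
>>> import org.apache.flink.configuration.Configuration
>>> import org.apache.flink.util.Collector
>>>
>>> case class Event(uuid: String)
>>>
>>> // Emits an event only the first time its uuid is seen; the flag lives in
>>> // Flink keyed state (e.g. RocksDB), local to the operator.
>>> class DedupeFunction extends RichFlatMapFunction[Event, Event] {
>>>   @transient private var seen: ValueState[Boolean] = _
>>>
>>>   override def open(parameters: Configuration): Unit =
>>>     seen = getRuntimeContext.getState(
>>>       new ValueStateDescriptor("seen", classOf[Boolean]))
>>>
>>>   override def flatMap(event: Event, out: Collector[Event]): Unit =
>>>     if (!seen.value()) { // an unset flag reads back as false
>>>       seen.update(true)
>>>       out.collect(event)
>>>     }
>>> }
>>>
>>> // usage: events.keyBy(_.uuid).flatMap(new DedupeFunction)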
>>>
>>> Regards,
>>> David
>>>
>>>
>>>
>>> On Fri, Apr 23, 2021 at 12:22 PM Oğuzhan Mangır <
>>> sosyalmedya.oguz...@gmail.com> wrote:
>>>
>>>> I'm trying to design a stream flow that de-duplicates events and sends
>>>> them to a Kafka topic.
>>>>
>>>> Basically, the flow looks like this:
>>>>
>>>> kafka (multiple topics) =>  flink (checking de-duplication and event
>>>> enrichment) => kafka (single topic)
>>>>
>>>> For de-duplication, I'm thinking of using Cassandra as an external
>>>> state store. The details of my job;
>>>>
>>>> I have an event payload with a *uuid* field. If an event with the same
>>>> uuid arrives, it should be discarded. In my case, I am reading from two
>>>> kafka topics. The first topic has a lot of fields, but the second topic
>>>> has only a *uuid* field, so I have to enrich the data using the same
>>>> uuid for events coming from the second topic.
>>>>
>>>> Stream1: Messages read from the first topic. Read state from Cassandra
>>>> using the *uuid*. If a state exists, ignore this event and *do not*
>>>> emit it to Kafka. If the state does not exist, save this event to
>>>> Cassandra, then emit it to Kafka.
>>>>
>>>> Stream2: Messages read from the second topic. Read state from Cassandra
>>>> using the *uuid*. If the state exists, check a column that indicates
>>>> whether this event has already arrived from topic2. If that column is
>>>> false, enrich the event using the state and update the column to true.
>>>> If it is true, ignore this event because it is a duplicate.
>>>>
>>>> def checkDeDuplication(event: Event): Option[Event] = {
>>>>   val state = readFromCassandra(event.uuid)
>>>>   if (state.isDefined) None // duplicate, ignore this event
>>>>   else {
>>>>     saveEventToCassandra(event)
>>>>     Some(event)
>>>>   }
>>>> }
>>>>
>>>> def checkDeDuplicationAndEnrichEvent(event: Event): Option[Event] =
>>>>   readFromCassandra(event.uuid) match {
>>>>     case None => None // no state for this uuid, ignore this event
>>>>     case Some(state) if state.flag => None // already enriched, duplicate
>>>>     case Some(state) =>
>>>>       updateFlagAsTrueInCassandra(event)
>>>>       Some(enrichEvent(event, state)) // enrichment step from the description
>>>>   }
>>>>
>>>>
>>>> val stream1 = readKafkaTopic1().flatMap(checkDeDuplication)
>>>> val stream2 = readKafkaTopic2().flatMap(checkDeDuplicationAndEnrichEvent)
>>>> stream1.union(stream2).addSink(kafkaSink)
>>>>
>>>> 1- Is that a good approach?
>>>>
>>>> 2- Is Cassandra the right choice here? Note, the state size is very
>>>> large and I have to bootstrap the state from a batch flow first, so I
>>>> cannot use internal state like RocksDB.
>>>>
>>>> 3- Can I improve this logic?
>>>>
>>>> 4- Could there be any bottleneck in that flow? I am thinking of using
>>>> async map functions for the state read/write operations, as sketched
>>>> below.
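>>>>
>>>> A minimal sketch of that idea with Flink's Async I/O (AsyncDataStream).
>>>> readFromCassandraAsync is a placeholder for a non-blocking driver call
>>>> (e.g. wrapping executeAsync), and this covers only the read path; the
>>>> write-back would follow the same pattern:
>>>>
>>>> import java.util.concurrent.TimeUnit
>>>> import scala.concurrent.ExecutionContext.Implicits.global
>>>> import scala.concurrent.Future
>>>> import scala.util.{Failure, Success}
>>>> import org.apache.flink.streaming.api.scala._
>>>> import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
>>>>
>>>> case class Event(uuid: String)
>>>> case class State(flag: Boolean)
>>>>
>>>> // Placeholder for a non-blocking Cassandra lookup.
>>>> def readFromCassandraAsync(uuid: String): Future[Option[State]] = ???
>>>>
>>>> class AsyncDedupe extends AsyncFunction[Event, Event] {
>>>>   override def asyncInvoke(event: Event, resultFuture: ResultFuture[Event]): Unit =
>>>>     readFromCassandraAsync(event.uuid).onComplete {
>>>>       case Success(None)    => resultFuture.complete(Seq(event)) // first sighting
>>>>       case Success(Some(_)) => resultFuture.complete(Seq.empty)  // duplicate, drop
>>>>       case Failure(t)       => resultFuture.completeExceptionally(t)
>>>>     }
>>>> }
>>>>
>>>> // usage, with at most 100 lookups in flight:
>>>> // AsyncDataStream.unorderedWait(stream, new AsyncDedupe, 1000, TimeUnit.MILLISECONDS, 100)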
>>>>
>>>
