Why not use upserts? Wouldn't that solve the duplicate issue, with no need to query the database first?
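
For instance, a conditional insert (a Cassandra lightweight transaction) would fold the existence check and the write into a single atomic operation, so there is no separate read. A minimal, untested sketch, assuming the DataStax 4.x Java driver, the Event type from the thread below, and a hypothetical events.dedup table keyed by uuid:

import com.datastax.oss.driver.api.core.CqlSession

val session: CqlSession = CqlSession.builder().build()

// Returns the event if this uuid was not seen before, None if it is a duplicate.
def insertIfAbsent(event: Event): Option[Event] = {
  val rs = session.execute(
    "INSERT INTO events.dedup (uuid, enriched) VALUES (?, false) IF NOT EXISTS",
    event.uuid)
  if (rs.wasApplied()) Some(event) else None
}

My understanding is that IF NOT EXISTS runs a Paxos round under the hood, so it costs more than a plain write, but it removes the read-then-write race entirely.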
On Sat, Apr 24, 2021, 8:12 PM David Anderson <dander...@apache.org> wrote:

>> What are the other techniques for bootstrapping rocksdb state?
>
> Bootstrapping state involves somehow creating a snapshot (typically a
> savepoint, but a retained checkpoint can be a better choice in some cases)
> containing the necessary state -- meaning that the state has the same
> operator uid and state descriptor used by the real streaming job.
>
> You can do this either by: (1) running a variant of the live streaming job
> against the data used for bootstrapping and taking a snapshot when the
> data has been fully ingested, or (2) using the State Processor API [1].
> You'll find a trivial example of the second approach in [2]. Once you have
> a suitable snapshot, you can run your real job against it.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/libs/state_processor_api.html
> [2] https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf
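>
> To make (2) concrete, a minimal, untested sketch against the
> DataSet-based State Processor API in Flink 1.12 could look like the
> following. The operator uid ("dedupe"), the "seen" ValueState descriptor,
> and the loadHistoricalEvents() helper are placeholders; the uid and the
> descriptor must match whatever the real streaming job uses.
>
> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
> import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
> import org.apache.flink.api.java.functions.KeySelector
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
> import org.apache.flink.state.api.{OperatorTransformation, Savepoint}
> import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction
>
> // Marks each historical uuid as already seen, mirroring the streaming job's state.
> class SeenBootstrapper extends KeyedStateBootstrapFunction[String, Event] {
>   @transient private var seen: ValueState[java.lang.Boolean] = _
>
>   override def open(parameters: Configuration): Unit =
>     seen = getRuntimeContext.getState(
>       new ValueStateDescriptor("seen", classOf[java.lang.Boolean]))
>
>   override def processElement(
>       event: Event,
>       ctx: KeyedStateBootstrapFunction[String, Event]#Context): Unit =
>     seen.update(true)
> }
>
> class UuidSelector extends KeySelector[Event, String] {
>   override def getKey(event: Event): String = event.uuid
> }
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> val historical: DataSet[Event] =
>   env.fromCollection(loadHistoricalEvents()) // hypothetical batch loader
>
> val transformation = OperatorTransformation
>   .bootstrapWith(historical)
>   .keyBy(new UuidSelector)
>   .transform(new SeenBootstrapper)
>
> Savepoint
>   .create(new RocksDBStateBackend("file:///tmp/checkpoints"), 128)
>   .withOperator("dedupe", transformation)
>   .write("file:///tmp/bootstrap-savepoint")
>
> env.execute("bootstrap deduplication state")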
>
> Regards,
> David
>
> On Sat, Apr 24, 2021 at 3:01 PM Omngr <sosyalmedya.oguz...@gmail.com>
> wrote:
>
>> Hi David, thank you for your response first!
>>
>> The state size is about 1 TB for now, but it will grow quickly, and I
>> also cannot use TTL for the state; it will grow indefinitely.
>> What are the other techniques for bootstrapping RocksDB state?
>>
>> On Sat, Apr 24, 2021 at 3:43 PM David Anderson <dander...@apache.org>
>> wrote:
>>
>>> Oguzhan,
>>>
>>>> Note, the state size is very large and I have to feed the state from
>>>> a batch flow first. Thus I can not use internal state like RocksDB.
>>>
>>> How large is "very large"? Using RocksDB, several users have reported
>>> working with jobs using many TBs of state.
>>>
>>> And there are techniques for bootstrapping the state. That doesn't have
>>> to be a showstopper.
>>>
>>>> Might there be any bottleneck in that flow? I'm thinking of using
>>>> async map functions for the state read/write operations.
>>>
>>> That's a good reason to reconsider using Flink state.
>>>
>>> Regards,
>>> David
>>>
>>> On Fri, Apr 23, 2021 at 12:22 PM Oğuzhan Mangır <
>>> sosyalmedya.oguz...@gmail.com> wrote:
>>>
>>>> I'm trying to design a stream flow that *de-duplicates* events and
>>>> sends them to a Kafka topic.
>>>>
>>>> Basically, the flow looks like this:
>>>>
>>>> kafka (multiple topics) => flink (de-duplication and event enrichment)
>>>> => kafka (single topic)
>>>>
>>>> For de-duplication, I'm thinking of using Cassandra as an external
>>>> state store. The details of my job:
>>>>
>>>> Every event payload has a *uuid* field. If an event with the same uuid
>>>> arrives again, it should be discarded. In my case, I am reading from
>>>> two Kafka topics. The first topic has a lot of fields, but the second
>>>> topic has just a *uuid* field, so I have to enrich the events coming
>>>> from the second topic using the state stored under the same uuid.
>>>>
>>>> Stream1: Messages read from the first topic. Read state from Cassandra
>>>> using the *uuid*. If state exists, ignore this event and *do not* emit
>>>> it to Kafka. If state does not exist, save this event to Cassandra,
>>>> then emit it to Kafka.
>>>>
>>>> Stream2: Messages read from the second topic. Read state from
>>>> Cassandra using the *uuid*. If state does not exist, ignore this
>>>> event. If it exists, check a column that indicates whether an event
>>>> with this uuid has already come from topic2. If that column is false,
>>>> enrich the event using the state and update the column in Cassandra to
>>>> true. If it is already true, ignore this event because it is a
>>>> duplicate.
>>>>
>>>> def checkDeDuplication(event: Event): Option[Event] = {
>>>>   val state = readFromCassandra(event.uuid)
>>>>   if (state.isDefined) None // duplicate, ignore this event
>>>>   else {
>>>>     saveEventToCassandra(event)
>>>>     Some(event)
>>>>   }
>>>> }
>>>>
>>>> def checkDeDuplicationAndEnrichEvent(event: Event): Option[Event] = {
>>>>   readFromCassandra(event.uuid) match {
>>>>     case None => None // no matching event from topic1 yet, ignore
>>>>     case Some(state) if state.flag => None // duplicate, ignore
>>>>     case Some(state) =>
>>>>       updateFlagAsTrueInCassandra(event)
>>>>       Some(enrich(event, state)) // enrich using the stored state
>>>>   }
>>>> }
>>>>
>>>> val stream1 = readKafkaTopic1().flatMap(checkDeDuplication)
>>>> val stream2 = readKafkaTopic2().flatMap(checkDeDuplicationAndEnrichEvent)
>>>> stream1.union(stream2).addSink(kafkaSink)
>>>>
>>>> 1- Is that a good approach?
>>>>
>>>> 2- Is Cassandra the right choice here? Note, the state size is very
>>>> large and I have to feed the state from a batch flow first. Thus I can
>>>> not use internal state like RocksDB.
>>>>
>>>> 3- Can I improve this logic?
>>>>
>>>> 4- Might there be any bottleneck in that flow? I'm thinking of using
>>>> async map functions for the state read/write operations.
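>>>>
>>>> For question 4, what I have in mind is Flink's Async I/O operator, so
>>>> that Cassandra round trips do not block the pipeline. A rough,
>>>> untested sketch of the Stream1 side, assuming a hypothetical
>>>> non-blocking readFromCassandraAsync(uuid) that returns a
>>>> Future[Option[State]]:
>>>>
>>>> import java.util.concurrent.TimeUnit
>>>> import scala.concurrent.ExecutionContext.Implicits.global
>>>> import scala.util.{Failure, Success}
>>>> import org.apache.flink.streaming.api.scala._
>>>> import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
>>>>
>>>> class AsyncDeDuplicate extends RichAsyncFunction[Event, Event] {
>>>>   override def asyncInvoke(event: Event, resultFuture: ResultFuture[Event]): Unit =
>>>>     readFromCassandraAsync(event.uuid).onComplete {
>>>>       case Success(None) => // first time this uuid is seen: persist, then emit
>>>>         saveEventToCassandra(event)
>>>>         resultFuture.complete(Iterable(event))
>>>>       case Success(Some(_)) => // duplicate: emit nothing
>>>>         resultFuture.complete(Iterable.empty)
>>>>       case Failure(t) =>
>>>>         resultFuture.completeExceptionally(t)
>>>>     }
>>>> }
>>>>
>>>> // At most 100 in-flight lookups, each timing out after one second.
>>>> val dedupedStream1 = AsyncDataStream.unorderedWait(
>>>>   readKafkaTopic1(), new AsyncDeDuplicate, 1, TimeUnit.SECONDS, 100)
>>>>
>>>> With unorderedWait, results are emitted as soon as each lookup
>>>> completes, which maximizes throughput when ordering across different
>>>> uuids does not matter.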