> > What are the other techniques for bootstrapping rocksdb state?
Bootstrapping state involves somehow creating a snapshot (typically a savepoint, but a retained checkpoint can be a better choice in some cases) containing the necessary state -- meaning that the state has the same operator uid and state descriptor used by the real streaming job.

You can do this either by (1) running a variant of the live streaming job against the data used for bootstrapping and taking a snapshot once that data has been fully ingested, or by (2) using the State Processor API [1]. You'll find a trivial example of the second approach in [2]. Once you have a suitable snapshot, you can run your real job against it.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/libs/state_processor_api.html
[2] https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf
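To make option (2) a little more concrete, here is a rough, untested sketch of bootstrapping de-duplication state with the State Processor API (the DataSet-based API as of Flink 1.12). Everything named here -- the Event type, the uuid key, the "seen" ValueState, the "dedup-uid" operator uid, the max parallelism of 128, and the paths -- is a placeholder that has to match whatever your real streaming job actually uses:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction
import org.apache.flink.state.api.{OperatorTransformation, Savepoint}

// Placeholder event type -- must match the type used by the real job.
case class Event(uuid: String)

// Marks every uuid in the bootstrap data set as already seen.
class SeenUuidBootstrapper extends KeyedStateBootstrapFunction[String, Event] {
  @transient private var seen: ValueState[java.lang.Boolean] = _

  override def open(parameters: Configuration): Unit = {
    // Must be exactly the state descriptor the streaming job will use.
    seen = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Boolean]("seen", classOf[java.lang.Boolean]))
  }

  override def processElement(
      event: Event,
      ctx: KeyedStateBootstrapFunction[String, Event]#Context): Unit = {
    seen.update(true)
  }
}

object BootstrapDedupState {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Read the historical events from wherever they live (files, a DB dump, ...).
    val historical = env.fromElements(Event("uuid-1"), Event("uuid-2"))

    val transformation = OperatorTransformation
      .bootstrapWith(historical)
      .keyBy(new KeySelector[Event, String] {
        override def getKey(e: Event): String = e.uuid
      })
      .transform(new SeenUuidBootstrapper)

    Savepoint
      // For state this large you would plug in a RocksDB state backend instead.
      .create(new MemoryStateBackend(), 128)      // max parallelism must match the real job
      .withOperator("dedup-uid", transformation)  // same operator uid as in the streaming job
      .write("file:///tmp/bootstrap-savepoint")   // placeholder path

    env.execute("bootstrap de-duplication state")
  }
}

The crucial part is that the key type, the state descriptor, and the operator uid all line up exactly with the streaming job; otherwise the job won't find this state when it is started from the savepoint.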
Regards,
David

On Sat, Apr 24, 2021 at 3:01 PM Omngr <sosyalmedya.oguz...@gmail.com> wrote:

> Hi David, thank you for your response first!
>
> The state size is about 1 TB for now, but it will grow quickly, and I
> also cannot use TTL for the states. They will grow indefinitely.
> What are the other techniques for bootstrapping rocksdb state?
>
> On Sat, Apr 24, 2021 at 3:43 PM David Anderson <dander...@apache.org>
> wrote:
>
>> Oguzhan,
>>
>>> Note that the state size is very large and I have to feed the state from
>>> a batch flow first. Thus I cannot use internal state like RocksDB.
>>
>> How large is "very large"? Using RocksDB, several users have reported
>> working with jobs using many TBs of state.
>>
>> And there are techniques for bootstrapping the state. That doesn't have
>> to be a showstopper.
>>
>>> Might there be any bottleneck in that flow? I'm thinking of using async
>>> map functions for the state read/write operations.
>>
>> That's a good reason to reconsider using Flink state.
>>
>> Regards,
>> David
>>
>> On Fri, Apr 23, 2021 at 12:22 PM Oğuzhan Mangır <
>> sosyalmedya.oguz...@gmail.com> wrote:
>>
>>> I'm trying to design a stream flow that *de-duplicates* events and
>>> sends them to a Kafka topic.
>>>
>>> Basically, the flow looks like this:
>>>
>>> kafka (multiple topics) => flink (de-duplication and event enrichment)
>>> => kafka (single topic)
>>>
>>> For de-duplication, I'm thinking of using Cassandra as an external state
>>> store. The details of my job:
>>>
>>> I have an event payload with a *uuid* field. If another event with the
>>> same uuid arrives, it should be discarded. In my case, I am reading from
>>> two Kafka topics. The first topic has a lot of fields, but the second
>>> topic has just a *uuid* field, so I have to enrich the events coming from
>>> the second topic using that same uuid.
>>>
>>> Stream1: Messages read from the first topic. Read state from Cassandra
>>> using the *uuid*. If a state exists, ignore this event and *do not* emit
>>> it to Kafka. If the state does not exist, save this event to Cassandra,
>>> then emit it to Kafka.
>>>
>>> Stream2: Messages read from the second topic. Read state from Cassandra
>>> using the *uuid*. If a state exists, check a column that represents
>>> whether this event has already arrived from topic2. If the value of this
>>> column is false, enrich the event using the state and update the Cassandra
>>> column to true. If it is true, ignore this event because it is a duplicate.
>>>
>>> def checkDeDuplication(event: Event): Option[Event] = {
>>>   val state = readFromCassandra(event.uuid)
>>>   if (state.isDefined) None // duplicate, ignore this event
>>>   else {
>>>     saveEventToCassandra(event)
>>>     Some(event)
>>>   }
>>> }
>>>
>>> def checkDeDuplicationAndEnrichEvent(event: Event): Option[Event] = {
>>>   val state = readFromCassandra(event.uuid)
>>>   if (state.isEmpty) None // nothing to enrich with, ignore this event
>>>   else if (state.get.flag) None // already enriched, ignore this duplicate
>>>   else {
>>>     updateFlagAsTrueInCassandra(event)
>>>     Some(event)
>>>   }
>>> }
>>>
>>> val stream1 = readKafkaTopic1().flatMap(checkDeDuplication(_))
>>> val stream2 = readKafkaTopic2().flatMap(checkDeDuplicationAndEnrichEvent(_))
>>> stream1.union(stream2).addSink(kafkaSink)
>>>
>>> 1- Is that a good approach?
>>>
>>> 2- Is Cassandra the right choice here? Note that the state size is very
>>> large and I have to feed the state from a batch flow first. Thus I cannot
>>> use internal state like RocksDB.
>>>
>>> 3- Can I improve this logic?
>>>
>>> 4- Might there be any bottleneck in that flow? I'm thinking of using
>>> async map functions for the state read/write operations.
>>
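In case it helps, below is a similarly rough, untested sketch of what the de-duplication step itself could look like using Flink keyed state instead of Cassandra, matching the "seen" descriptor and "dedup-uid" uid assumed in the bootstrap sketch above (again, all names are placeholders):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Same placeholder event type as in the bootstrap sketch.
case class Event(uuid: String)

// Emits each uuid at most once; the "seen" flag lives in the configured
// state backend (e.g. RocksDB), so it can grow well beyond memory.
class DeDuplicator extends RichFlatMapFunction[Event, Event] {
  @transient private var seen: ValueState[java.lang.Boolean] = _

  override def open(parameters: Configuration): Unit = {
    seen = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Boolean]("seen", classOf[java.lang.Boolean]))
  }

  override def flatMap(event: Event, out: Collector[Event]): Unit = {
    if (seen.value() == null) { // first time this uuid has been seen
      seen.update(true)
      out.collect(event)
    }
    // otherwise it is a duplicate and is dropped
  }
}

object DedupJobSketch {
  def deduplicate(events: DataStream[Event]): DataStream[Event] =
    events
      .keyBy(_.uuid)
      .flatMap(new DeDuplicator)
      .uid("dedup-uid") // must match the uid used when writing the bootstrap savepoint
}

The stream from the first topic would go through something like deduplicate(...) before being emitted to Kafka; the enrichment of the second topic could be handled by a similar keyed operator over the two connected streams (e.g. a CoProcessFunction), which is more involved and omitted here.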