Oguzhan,

> Note, the state size is very large and I have to feed the state from a
> batch flow first. Thus I can not use internal state like rocksdb.

How large is "very large"? Using RocksDB, several users have reported working
with jobs using many TBs of state. And there are techniques for bootstrapping
the state: the State Processor API, for example, can build an initial
savepoint from a batch source. That doesn't have to be a showstopper.

> Might there be any bottleneck in that flow? I'm thinking of using
> asyncMap functions for state read/write operations.

Yes, a synchronous round trip to Cassandra for every event would be the
bottleneck, and that's a good reason to reconsider using Flink state, where
the lookup is local to the operator. Rough sketches of both options follow.
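First, de-duplication on Flink's own keyed state. This is only a minimal
sketch, not a drop-in implementation; the Event type and its field names are
assumptions of mine, and a production job would likely attach a TTL:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Hypothetical event type, for illustration only.
case class Event(uuid: String, payload: String)

// Keyed by uuid, so the "seen" marker lives in the operator's keyed state
// (RocksDB-backed if that state backend is configured); there is no
// external round trip per event.
class DeDuplicate extends KeyedProcessFunction[String, Event, Event] {

  @transient private var seen: ValueState[java.lang.Boolean] = _

  override def open(parameters: Configuration): Unit = {
    // A StateTtlConfig could be attached to the descriptor here to
    // bound the state size.
    seen = getRuntimeContext.getState(
      new ValueStateDescriptor("seen", classOf[java.lang.Boolean]))
  }

  override def processElement(
      event: Event,
      ctx: KeyedProcessFunction[String, Event, Event]#Context,
      out: Collector[Event]): Unit = {
    if (seen.value() == null) { // first time this uuid is seen
      seen.update(true)
      out.collect(event)
    } // else: duplicate, drop it
  }
}

// Usage: stream.keyBy(_.uuid).process(new DeDuplicate)

The same keyed state can be pre-populated from your batch data with the State
Processor API, which writes a savepoint the streaming job then starts from.
That addresses the "feed the state from batch first" requirement without an
external store.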
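Second, if you do stay with Cassandra, the async instinct is right, but in
Flink it is spelled AsyncDataStream with an AsyncFunction rather than an
"asyncMap". A rough sketch, reusing the Event type above and assuming a
hypothetical non-blocking CassandraClient (the driver's async execution
would sit behind it):

import java.util.concurrent.TimeUnit
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

// Hypothetical client; a real job would create it in
// RichAsyncFunction.open() rather than capture it in the closure.
trait CassandraClient extends Serializable {
  def lookup(uuid: String): Future[Option[String]]
  def insert(event: Event): Future[Unit]
}

class AsyncDeDuplicate(client: CassandraClient)
    extends AsyncFunction[Event, Event] {

  override def asyncInvoke(event: Event,
                           resultFuture: ResultFuture[Event]): Unit =
    client.lookup(event.uuid).onComplete {
      case Success(Some(_)) =>
        resultFuture.complete(Seq.empty)  // duplicate, emit nothing
      case Success(None) =>
        client.insert(event)              // fire-and-forget for the sketch
        resultFuture.complete(Seq(event)) // first occurrence, emit it
      case Failure(t) =>
        resultFuture.completeExceptionally(t)
    }
}

// Usage: up to 100 lookups in flight per subtask, 5 second timeout.
// val deduped = AsyncDataStream.unorderedWait(
//   stream, new AsyncDeDuplicate(client), 5, TimeUnit.SECONDS, 100)

One caveat: with many lookups in flight, two events carrying the same uuid
can race past each other and both be emitted. Keyed state doesn't have that
problem, which is one more argument for the first sketch.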
Regards,
David

On Fri, Apr 23, 2021 at 12:22 PM Oğuzhan Mangır <
sosyalmedya.oguz...@gmail.com> wrote:

> I'm trying to design a stream flow that *de-duplicates* events and sends
> them to a Kafka topic.
>
> Basically, the flow looks like this:
>
> kafka (multiple topics) => flink (de-duplication and event enrichment)
> => kafka (single topic)
>
> For de-duplication, I'm thinking of using Cassandra as an external state
> store. The details of my job:
>
> I have an event payload with a *uuid* field. If an event with the same
> uuid comes again, it should be discarded. In my case, I read from two
> kafka topics. The first topic has a lot of fields, but the second topic
> has only a *uuid* field, so I have to enrich the events coming from the
> second topic using the data stored under the same uuid.
>
> Stream1: Messages read from the first topic. Read state from Cassandra
> using the *uuid*. If state exists, ignore this event and *do not* emit it
> to Kafka. If state does not exist, save this event to Cassandra, then
> emit it to Kafka.
>
> Stream2: Messages read from the second topic. Read state from Cassandra
> using the *uuid*. If state exists, check a column that indicates whether
> an event for this uuid already came from topic2. If the value of that
> column is false, enrich the event using the state and update the column
> in Cassandra to true. If it is true, ignore this event because it is a
> duplicate.
>
> def checkDeDuplication(event: Event): Option[Event] = {
>   val state = readFromCassandra(event.uuid)
>   if (state.isDefined) None // duplicate, ignore this event
>   else {
>     saveEventToCassandra(event)
>     Some(event)
>   }
> }
>
> def checkDeDuplicationAndEnrichEvent(event: Event): Option[Event] = {
>   val state = readFromCassandra(event.uuid)
>   if (state.isEmpty) None // no state yet, ignore this event
>   else if (state.get.flag) None // already enriched, ignore this duplicate
>   else {
>     updateFlagAsTrueInCassandra(event)
>     Some(enrich(event, state.get)) // combine topic2 event with stored state
>   }
> }
>
> val stream1 = readKafkaTopic1().flatMap(checkDeDuplication)
> val stream2 = readKafkaTopic2().flatMap(checkDeDuplicationAndEnrichEvent)
> stream1.union(stream2).addSink(kafkaSink)
>
> 1- Is that a good approach?
>
> 2- Is Cassandra the right choice here? Note, the state size is very large
> and I have to feed the state from a batch flow first. Thus I can not use
> internal state like rocksdb.
>
> 3- Can I improve this logic?
>
> 4- Might there be any bottleneck in that flow? I'm thinking of using
> asyncMap functions for state read/write operations.