Hi Oğuzhan,

Take a look at Bloom filters. They might give you some better ideas.
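Here is a minimal Scala sketch of how a Bloom filter could sit in front of the Stream1 check from the quoted message. If the filter reports that a uuid was definitely never added, the Cassandra read can be skipped. If it reports "possibly added", the external store still makes the final decision. A false positive therefore costs only an extra read and never drops a new event.

All names here are hypothetical: `UuidBloomFilter`, `DedupStore` and `isFirstOccurrence`. `DedupStore` is an in-memory stand-in for the Cassandra table. The bit count and hash count are illustrative, not tuned.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.util.BitSet
import scala.collection.mutable

object BloomDedupSketch {

  // Minimal Bloom filter over String keys (e.g. event uuids).
  // numBits and numHashes are illustrative assumptions, not tuned values.
  final class UuidBloomFilter(numBits: Int, numHashes: Int) {
    require(numBits > 0 && numHashes > 0)
    private val bits = new BitSet(numBits)

    // Double hashing (Kirsch-Mitzenmacher): derive k bit positions
    // from two 32-bit values taken from an MD5 digest of the key.
    private def positions(key: String): Seq[Int] = {
      val digest = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8))
      val buf = ByteBuffer.wrap(digest)
      val h1 = buf.getInt(0)
      val h2 = buf.getInt(4)
      (0 until numHashes).map(i => java.lang.Math.floorMod(h1 + i * h2, numBits))
    }

    def add(key: String): Unit = positions(key).foreach(i => bits.set(i))

    // false => key was definitely never added; true => key was possibly added.
    def mightContain(key: String): Boolean = positions(key).forall(i => bits.get(i))
  }

  // Hypothetical in-memory stand-in for the Cassandra table keyed by uuid.
  final class DedupStore {
    private val seen = mutable.Set.empty[String]
    var reads = 0 // counts lookups that actually reached the "store"
    def exists(uuid: String): Boolean = { reads += 1; seen.contains(uuid) }
    def save(uuid: String): Unit = { seen += uuid; () }
  }

  // Stream1-style check with the Bloom filter in front of the store.
  // Returns true if the event should be emitted (first time this uuid is seen).
  def isFirstOccurrence(uuid: String, bloom: UuidBloomFilter, store: DedupStore): Boolean = {
    val firstTime =
      if (!bloom.mightContain(uuid)) true // definitely new: no store read needed
      else !store.exists(uuid)            // possibly seen: confirm against the store
    if (firstTime) {
      bloom.add(uuid)
      store.save(uuid)
    }
    firstTime
  }
}
```

In a real job, the filter would also need to be seeded with the uuids loaded by the batch flow. Its size would be chosen from the expected number of uuids and an acceptable false-positive rate. RedisBloom, linked below, is one way to keep such a filter outside the Flink job.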
Links:
https://en.wikipedia.org/wiki/Bloom_filter
https://stackoverflow.com/questions/4282375/what-is-the-advantage-to-using-bloom-filters
https://redislabs.com/modules/redis-bloom/

Thank you

On Fri, Apr 23, 2021 at 3:52 PM Oğuzhan Mangır <sosyalmedya.oguz...@gmail.com> wrote:
> I'm trying to design a stream flow that *de-duplicates* events and
> sends them to a Kafka topic.
>
> Basically, the flow looks like this:
>
> kafka (multiple topics) => flink (de-duplication and event
> enrichment) => kafka (single topic)
>
> For de-duplication, I'm thinking of using Cassandra as an external state
> store. The details of my job:
>
> I have an event payload with a *uuid* field. If an event with an
> already-seen uuid arrives, it should be discarded. In my case, two Kafka
> topics are read. The first topic has many fields, but the second topic
> has only a *uuid* field, so I have to enrich the events coming from the
> second topic using the state stored under the same uuid.
>
> Stream1: Messages read from the first topic. Read state from Cassandra
> using the *uuid*. If state exists, ignore this event and *do not* emit
> it to Kafka. If state does not exist, save this event to Cassandra,
> then emit it to Kafka.
>
> Stream2: Messages read from the second topic. Read state from Cassandra
> using the *uuid*. If state exists, check the column that records whether
> this event has already come from topic2. If the value of this column is
> false, enrich the event using the state and set the column to true in
> Cassandra. If it is true, ignore this event because it is a duplicate.
>
> def checkDeDuplication(event: Event): Option[Event] = {
>   val state = readFromCassandra(event.uuid)
>   if (state.isDefined) None // ignore this event
>   else {
>     saveEventToCassandra(event)
>     Some(event)
>   }
> }
>
> def checkDeDuplicationAndEnrichEvent(event: Event): Option[Event] = {
>   val state = readFromCassandra(event.uuid)
>   if (state.isEmpty) None // ignore this event
>   else if (state.get.flag) None // already seen from topic2: ignore this event
>   else {
>     updateFlagAsTrueInCassandra(event)
>     Some(event)
>   }
> }
>
> val stream1 = readKafkaTopic1().flatMap(checkDeDuplication)
> val stream2 = readKafkaTopic2().flatMap(checkDeDuplicationAndEnrichEvent)
> stream1.union(stream2).addSink(kafkaSink)
>
> 1- Is this a good approach?
>
> 2- Is Cassandra the right choice here? Note that the state size is very
> large and I have to bootstrap the state from a batch flow first, so I
> cannot use internal state like RocksDB.
>
> 3- Can I improve this logic?
>
> 4- Might there be any bottleneck in this flow? I'm thinking of using
> async I/O functions for the state read/write operations.

--
Raghavendar T S
www.teknosrc.com