Hi Oğuzhan

Take a look at bloom filter. You might get better ideas.

Links:
https://en.wikipedia.org/wiki/Bloom_filter
https://stackoverflow.com/questions/4282375/what-is-the-advantage-to-using-bloom-filters
https://redislabs.com/modules/redis-bloom/

Thank you

On Fri, Apr 23, 2021 at 3:52 PM Oğuzhan Mangır <
sosyalmedya.oguz...@gmail.com> wrote:

> I'm trying to design a stream flow that checks *de-duplicate* events and
> sends them to the Kafka topic.
>
> Basically, flow looks like that;
>
> kafka (multiple topics) =>  flink (checking de-duplication and event
> enrichment) => kafka (single topic)
>
> For de-duplication, I'm thinking of using Cassandra as an external state
> store. The details of my job;
>
> I have an event payload with *uuid* Field. If the event that has the same
> uuid will come, this event should be discarded. In my case, two kafka
> topics are reading. The first topic has a lot of fields, but other topics
> just have a *uuid* field, thus I have to enrich data using the same uuid
> for the events coming from the second topic.
>
> Stream1: Messages reading from the first topic. Read state from Cassandra
> using the *uuid*. If a state exists, ignore this event and *do not* emit
> to the Kafka. If state does not exist, save  this event to the Cassandra,
> then emit this event to the Kafka.
>
> Stream2: Messages reading from the second topic. Read state from Cassandra
> using the *uuid*. If state exists, check a column that represents this
> event came from topic2. If the value of this column is false, enrich the
> event using state and update the Cassandra column as true. If true, ignore
> this event because this event is a duplicate.
>
> def checkDeDuplication(event): Option[Event] = {
>   val state = readFromCassandra(state)
>   if (state exist) None //ignore this event
>   else {
>     saveEventToCassandra(event)
>     Some(event)
>   }
> }
>
> def checkDeDuplicationAndEnrichEvent(event): Option[Event] = {
>       val state = readFromCassandra(state)
>       if (state does not exist) None //ignore this event
>       else {
>         if (state.flag == true) None // ignore this event
>         else {
>            updateFlagAsTrueInCassandra(event)
>            Some(event)
>         }
>       }
>     }
>
>
> val stream1 = readKafkaTopic1().flatMap(checkDeDuplicationAndSaveToSatate())
> val stream2 = readKafkaTopic2().flatMap(checkDeDuplicationAndEnrichEvent())
> stream1.union(stream2).addSink(kafkaSink)
>
> 1- Is that a good approach?
>
> 2- Is Cassandra the right choice here? Note, the state size is very large
> and I have to feed the state from batch flow firstly. Thus I can not use
> the internal state like rocksdb.
>
> 3- Can i improve this logic?
>
> 4- May be any bottleneck in that flow? I think to use asyncMap functions
> for state read/write operations.
>


-- 
Raghavendar T S
www.teknosrc.com

Reply via email to