Newbie here. I am working with Kafka Streams on Java 1.8.


I want to use a KTable as a lookup table in a join with a KStream.  I had no 
issue implementing this.  However, I do not want the KTable to grow without 
bound; I want to limit it to the past 2 weeks of data, more of a 
'sliding' window KTable.  At any given time I need at least 2 weeks of data in 
the KTable, so I don’t think a solution like a tumbling window will work, since 
it starts over every time it advances.



A slightly simplified example:





    KStream<String, GenericRecord> txnStream = builder.stream("TXN_DATA");



    // Re-key the stream by the TXN field so it can be joined against the count table.
    KStream<String, GenericRecord> txnStreamFull = txnStream
        .map((key, record) -> new KeyValue<>(record.get("TXN").toString(), record))
        .through("RekeyedIntermediateTopic1");



    // Do not want this table to grow indefinitely.
    KTable<String, Long> countTableStream = txnStream
        .map((key, record) -> new KeyValue<>(record.get("TXN").toString(), record))
        .through("RekeyedIntermediateTopic2")
        .countByKey(stringSerdeKey, "DupCountKTable10");



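    KStream<String, GenericRecord> duplicatesStream =
        txnStreamFull.leftJoin(countTableStream, (vTxnStream, vCountTableStream) -> {
            // In a left join the table-side value is null when the key has no entry yet.
            long count = (vCountTableStream == null) ? 0L : vCountTableStream.longValue();
            vTxnStream.put("count", Long.toString(count));
            return vTxnStream;
        });

    duplicatesStream.to("DUP_TXNS");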



I thought perhaps I could schedule ktable.foreach to inspect and clean the 
table, but I am not sure keys can be removed this way.
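
To make the behavior I am after concrete, here is a rough plain-Java sketch. 
It does not use any Kafka Streams API; the class SlidingCountTable and its 
methods are names I made up for illustration. It assumes timestamps arrive in 
order per key, and it keeps a per-key count covering only the retention 
period, with a purge step that drops keys that have aged out entirely.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical helper (not part of Kafka Streams): per-key counts over a sliding time window. */
class SlidingCountTable {
    private final long retentionMs;
    // For each key, the timestamps (ms) of events still inside the window, oldest first.
    // Assumption: events for a key are added in non-decreasing timestamp order.
    private final Map<String, Deque<Long>> timestampsByKey = new HashMap<>();

    SlidingCountTable(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Records one event and returns the key's count within the window ending at timestampMs. */
    long add(String key, long timestampMs) {
        Deque<Long> timestamps = timestampsByKey.computeIfAbsent(key, k -> new ArrayDeque<>());
        timestamps.addLast(timestampMs);
        expire(timestamps, timestampMs);
        return timestamps.size();
    }

    /** Drops entries older than the retention period and removes keys left with no entries. */
    void purge(long nowMs) {
        Iterator<Map.Entry<String, Deque<Long>>> it = timestampsByKey.entrySet().iterator();
        while (it.hasNext()) {
            Deque<Long> timestamps = it.next().getValue();
            expire(timestamps, nowMs);
            if (timestamps.isEmpty()) {
                it.remove();
            }
        }
    }

    /** Number of keys currently held. */
    int size() {
        return timestampsByKey.size();
    }

    // Removes timestamps strictly older than (nowMs - retentionMs) from the front of the deque.
    private void expire(Deque<Long> timestamps, long nowMs) {
        long cutoff = nowMs - retentionMs;
        while (!timestamps.isEmpty() && timestamps.peekFirst() < cutoff) {
            timestamps.removeFirst();
        }
    }
}
```

With a 14-day retention, calling purge on a schedule is the "inspect and 
clean" step I had in mind; what I do not know is how to get the equivalent 
from a KTable.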



I may be missing a basic concept here.  I have spent some time searching but 
have not found a good answer.  Thanks for any tips.



Thanks,

John