Looks great! Thank you Patrik.

On Fri, Nov 2, 2018 at 2:38 AM Patrik Kleindl <pklei...@gmail.com> wrote:
> Hi Andrew
>
> Did you take a look at
> https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java
> ?
> We are using this for a case like you described.
> Growth should be limited with this approach.
>
> Best regards
> Patrik
>
> > On 02.11.2018, at 01:33, Andrew Wilcox <andrew.wil...@gmail.com> wrote:
> >
> > Suppose I have a producer which is ingesting a data stream with unique
> > keys from an external service and sending it to a Kafka topic. In my
> > producer I can set enable.idempotence and get exactly-once delivery in
> > the presence of broker crashes. However, my producer might crash after
> > it delivers a batch of messages to Kafka but before it records that the
> > batch was delivered. After restarting the crashed producer, it would
> > re-deliver the same batch, resulting in duplicate messages in the topic.
> >
> > With a Streams transformer I can deduplicate the topic by using a state
> > store to record previously seen keys and then only creating an output
> > record if the key hasn't been seen before. However, without a mechanism
> > to remove old keys, the state store will grow without bound.
> >
> > Say I only want to deduplicate over a time period such as one day. (I'm
> > confident that I'll be able to restart a crashed producer sooner.) Thus
> > I'd like keys older than a day to expire out of the state store, so the
> > store only needs to keep track of keys seen in the last day or so.
> >
> > Is there a way to do this with Kafka Streams? Or is there another
> > recommended mechanism to keep messages with unique keys free of
> > duplicates in the presence of producer crashes?
> >
> > Thanks!
> >
> > Andrew
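A minimal sketch of the approach taken in the linked
EventDeduplicationLambdaIntegrationTest: a Transformer backed by a
persistent windowed store whose retention period bounds how long
previously seen keys are remembered, so the store cannot grow without
bound. The topic names, the store name ("dedup-store"), and the one-day
window below are illustrative assumptions rather than values from the
linked example, and the code targets the pre-2.1 (Confluent 5.0) Streams
API that the example uses.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

import java.util.concurrent.TimeUnit;

public class DeduplicationExample {

  private static final String STORE_NAME = "dedup-store";            // hypothetical store name
  private static final long DEDUP_WINDOW_MS = TimeUnit.DAYS.toMillis(1);

  // Forwards a record only if its key has not been seen within the last
  // DEDUP_WINDOW_MS; the windowed store expires older entries.
  static class DeduplicationTransformer
      implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;
    private WindowStore<String, Long> seenKeys;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
      this.context = context;
      this.seenKeys = (WindowStore<String, Long>) context.getStateStore(STORE_NAME);
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
      final long now = context.timestamp();
      boolean duplicate;
      // Look for the key anywhere within the deduplication window.
      try (final WindowStoreIterator<Long> iterator =
               seenKeys.fetch(key, now - DEDUP_WINDOW_MS, now)) {
        duplicate = iterator.hasNext();
      }
      if (duplicate) {
        return null;                       // drop the duplicate
      }
      seenKeys.put(key, now, now);         // remember the key with its event time
      return KeyValue.pair(key, value);    // forward the first occurrence
    }

    @Override
    public void close() {}
  }

  public static StreamsBuilder buildTopology() {
    final StreamsBuilder builder = new StreamsBuilder();

    // Windowed store: the retention period bounds how long keys are kept,
    // so the store does not grow without bound.
    final StoreBuilder<WindowStore<String, Long>> dedupStore =
        Stores.windowStoreBuilder(
            Stores.persistentWindowStore(STORE_NAME,
                DEDUP_WINDOW_MS,   // retention period
                3,                 // number of segments
                DEDUP_WINDOW_MS,   // window size
                false),            // do not retain duplicates
            Serdes.String(),
            Serdes.Long());
    builder.addStateStore(dedupStore);

    final KStream<String, String> input =
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
    input
        .transform(DeduplicationTransformer::new, STORE_NAME)
        .to("deduplicated-topic", Produced.with(Serdes.String(), Serdes.String()));

    return builder;
  }
}

The windowed store provides the expiry asked about above: segments older
than the retention period are dropped, so only roughly the last day of
keys is kept, while the transformer forwards only records whose key has
not appeared within that window.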