Thanks for the reply. I really appreciate the insight. Again, newbie here. I want to expand on what I am struggling with. It may be that I just need to get my mind thinking more in a streaming mode. Please let me know your thoughts. I'm just having a problem 'getting it' on my own.
Below is a simple topic where I want to identify when 'lane' duplicates, and when it does, get the 'txnId' of the earlier duplicate record. The txnId is distinct and will never duplicate. A lane will seldom have a duplicate.

Topic payload: {txnId,lane,txnDate}. Notice lane 'c' appears 3 times:

{'01','wfasd','11/07/2016'}
{'02','bas','11/07/2016'}
{'03','c','11/07/2016'}
{'04','xxwq','11/07/2016'}
{'05','dasf','11/07/2016'}
{'06','drdd','11/07/2016'}
{'07','tasd','11/07/2016'}
{'08','ywq','11/07/2016'}
{'09','c','11/07/2016'}
{'10','jda','11/07/2016'}
{'11','c','11/08/2016'}
{'12','ozs','11/09/2016'}
. . .

Note that txnId and lane keep getting more distinct values over time.

My thought is to join the data to itself, one side as a KStream and the other as a KTable for lookups. The KStream is re-keyed as {lane:(txnId,txnDate)}, so I visualize it like:

('wfasd':('01','11/07/2016')),
('bas'  :('02','11/07/2016')),
('c'    :('03','11/07/2016')),
. . .

The KTable (lookup table) is an aggregate view I build to hold historic data by lane: (lane:{(txnId1,txnDate1),(txnId2,txnDate2), . . .}). I visualize the materialized view as below, 'c' being the important key/value for this example. Also note that this materialized view keeps growing without bound; there will always be new keys and txnIds.

('wfasd':{('01','11/07/2016')}),
('bas'  :{('02','11/07/2016')}),
('c'    :{('03','11/07/2016'),('09','11/07/2016'),('11','11/08/2016')})
. . .

Now I can join the KStream to the KTable on lane, and duplicates are easy to identify: I can traverse the list in the value found in the materialized view to get the previous txnId I need. So I can build a resulting stream/topic like {txnId,lane,txnDate,duplicateTxnId}; note that where 'c' duplicates there is a duplicateTxnId:

{'01','wfasd','11/07/2016',''}
{'02','bas','11/07/2016',''}
{'03','c','11/07/2016',''}
{'04','xxwq','11/07/2016',''}
{'05','dasf','11/07/2016',''}
{'06','drdd','11/07/2016',''}
{'07','tasd','11/07/2016',''}
{'08','ywq','11/07/2016',''}
{'09','c','11/07/2016','03'}
{'10','jda','11/07/2016',''}
{'11','c','11/08/2016','09'}
{'12','ozs','11/09/2016',''}

The issue is that the materialized view of the KTable keeps growing without bound, while by business rule I only need the past 2 weeks. So I think over time there is an unneeded performance impact: one, the size of the materialized view keeps growing, and two, the join traverses ever larger value lists. Ideally, if the topic is set to two weeks retention, then once a record is 'popped off' I would like to do an aggregate subtraction of its value. But I don't think this is how Kafka works. Is this possible? Any other feedback or suggestions? Perhaps a better approach?
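To make this concrete, below is roughly the topology I am picturing, using the same 0.10.0-era API as my earlier snippet. This is an untested sketch: to keep it self-contained I am using plain delimited strings ("txnId|lane|txnDate") instead of my Avro records, and the topic and store names are made up.

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

final Serde<String> stringSerde = Serdes.String();
KStreamBuilder builder = new KStreamBuilder();

// key: txnId, value: "txnId|lane|txnDate"
KStream<String, String> txns = builder.stream(stringSerde, stringSerde, "TXN_DATA");

// re-key each record by lane; the value becomes "txnId|txnDate"
KStream<String, String> byLane = txns
    .map((key, value) -> {
        String[] f = value.split("\\|");
        return new KeyValue<>(f[1], f[0] + "|" + f[2]);
    })
    .through(stringSerde, stringSerde, "TXN_BY_LANE");

// unbounded per-lane history: comma-separated "txnId|txnDate" entries
KTable<String, String> historyByLane = byLane.aggregateByKey(
    () -> "",
    (lane, txn, agg) -> agg.isEmpty() ? txn : agg + "," + txn,
    stringSerde, stringSerde, "LaneHistoryStore");

// join stream to table on lane; walk the history backwards to find
// the most recent *other* txnId for the same lane
KStream<String, String> withDup = byLane.leftJoin(historyByLane,
    (txn, history) -> {
        String dupId = "";
        if (history != null) {
            String myId = txn.split("\\|")[0];
            String[] entries = history.split(",");
            for (int i = entries.length - 1; i >= 0; i--) {
                String id = entries[i].split("\\|")[0];
                if (!id.equals(myId)) {
                    dupId = id;
                    break;
                }
            }
        }
        return txn + "|" + dupId;  // "txnId|txnDate|duplicateTxnId"
    });

withDup.to(stringSerde, stringSerde, "DUP_TXNS");

One part I am hand-waving: I am not sure whether the aggregate for a record is applied before or after the join sees that same record, so the 'previous txnId' lookup may be off by one record either way.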
Thanks,
John

-----Original Message-----
From: Matthias J. Sax [mailto:matth...@confluent.io]
Sent: Thursday, November 03, 2016 4:29 PM
To: users@kafka.apache.org
Subject: Re: sliding ktable?

Hi John,

first of all, a KTable is a (changelog) stream; thus, by definition it is infinite. However, I assume you are worried about the internal materialized view of the changelog stream (i.e., the table state). This view only contains the latest value for each key, i.e., a single entry per key. Thus, its size is bounded by the number of keys and does not change as long as your number of distinct keys does not change.

> At any given time I need at least 2 weeks data in my ktable

There is no such thing as "data of the last 2 weeks": using a KTable for a KStream-KTable join to do lookups, each lookup will be done on the current state of the KTable and will thus only return a single value for each key. There is no old data in the materialized view in this regard. Of course, if a key does not get any update for a long time, you can consider the corresponding value old, but it is still the latest (i.e., current) value for the key.

> ktable.foreach

#foreach() is applied to the changelog stream and not to the internally materialized view. Thus, it does not scan over the key space and is not applied to each currently stored key in the view. It is rather called for each update record in the changelog stream.

> not sure keys can be removed this way

The only way to delete a key-value entry in the materialized view is to send a so-called tombstone record of the format <key:null> (i.e., the value is null). By "send" I mean that this tombstone record must be in the input of the KTable.
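For illustration, such a tombstone could be produced into the topic that feeds the KTable like this (sketch only; the topic name, key, and serializer config are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
// a null value is a tombstone: the KTable removes the entry for this key
producer.send(new ProducerRecord<>("TXN_BY_LANE", "c", null));
producer.close();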
-Matthias


On 11/3/16 12:39 PM, John Hayles wrote:
> Newbie here, I am working with Kafka Streams with Java 1.8.
>
> I want to use the KTable as a lookup table in a join to a KStream.
> I had no issue implementing this. However, I do not want the KTable
> to grow without bound; I want to limit the KTable to the past 2 weeks
> of data, more of a 'sliding' window KTable. At any given time I need
> at least 2 weeks of data in my KTable, so I don't think a solution
> like a tumbling table will work, since it starts over every time it
> hops.
>
> A little simplified example . . .
>
> KStream<String, GenericRecord> txnStream = builder.stream("TXN_DATA");
>
> KStream<String, GenericRecord> txnStreamFull = txnStream
>     .map((key, record) -> new KeyValue<>(record.get("TXN").toString(), record))
>     .through("RekeyedIntermediateTopic1");
>
> // do not want this table to grow indefinitely
> KTable<String, Long> countTableStream = txnStream
>     .map((key, record) -> new KeyValue<>(record.get("TXN").toString(), record))
>     .through("RekeyedIntermediateTopic2")
>     .countByKey(stringSerdeKey, "DupCountKTable10");
>
> KStream<String, GenericRecord> duplicatesStream =
>     txnStreamFull.leftJoin(countTableStream, (vTxnStream, vCountTableStream) -> {
>         // guard against null: a left join emits null when the key is
>         // not (yet) in the table
>         long count = (vCountTableStream == null) ? 0L : vCountTableStream;
>         vTxnStream.put("count", Long.toString(count));
>         return vTxnStream;
>     });
>
> duplicatesStream.to("DUP_TXNS");
>
> I thought perhaps I could schedule ktable.foreach to inspect and
> clean, but I am not sure keys can be removed this way.
>
> I may be missing a basic concept here. I have spent some time
> searching but have not found a good answer. Thanks for any tips.
>
> Thanks,
> John