There is a problem with tombstoning old entries based on the arrival of a new entry: keys that never receive a new entry will remain in the store forever.
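To illustrate the gap, here is a minimal sketch in plain Java. It assumes an in-memory map as a stand-in for the state store, and that each lane's records arrive in timestamp order. `LaneRetentionStore`, `append`, and `sweep` are hypothetical names, not Kafka APIs.

- `append` prunes only the lane being updated, which is the prune-on-update idea from the quoted thread. A lane that never gets another record is therefore never cleaned.
- `sweep` is the extra periodic pass that handles such lanes.

In Kafka Streams, a pass like this would presumably run from a scheduled punctuation in a custom Processor that deletes expired keys from its key-value store. That wiring is not shown here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical in-memory stand-in for a per-lane state store (not a Kafka API).
// Assumes records for a lane arrive in timestamp order.
class LaneRetentionStore {
    // One (txnId, timestamp) pair per transaction seen for a lane.
    static final class Txn {
        final String txnId;
        final long timestampMs;

        Txn(String txnId, long timestampMs) {
            this.txnId = txnId;
            this.timestampMs = timestampMs;
        }
    }

    private final long retentionMs;
    private final Map<String, Deque<Txn>> lanes = new HashMap<>();

    LaneRetentionStore(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Append-time pruning: only the lane being updated is cleaned.
    // Returns the latest txnId still inside the retention window, or null.
    String append(String lane, String txnId, long timestampMs) {
        Deque<Txn> txns = lanes.computeIfAbsent(lane, k -> new ArrayDeque<>());
        pruneExpired(txns, timestampMs);
        String previous = txns.isEmpty() ? null : txns.peekLast().txnId;
        txns.addLast(new Txn(txnId, timestampMs));
        return previous;
    }

    // Periodic sweep: also cleans lanes that never receive a new entry.
    void sweep(long nowMs) {
        Iterator<Map.Entry<String, Deque<Txn>>> it = lanes.entrySet().iterator();
        while (it.hasNext()) {
            Deque<Txn> txns = it.next().getValue();
            pruneExpired(txns, nowMs);
            if (txns.isEmpty()) {
                it.remove(); // in a real store, this is where the key would be deleted
            }
        }
    }

    boolean containsLane(String lane) {
        return lanes.containsKey(lane);
    }

    int laneCount() {
        return lanes.size();
    }

    // Expired entries sit at the head because entries are appended in time order.
    private void pruneExpired(Deque<Txn> txns, long nowMs) {
        while (!txns.isEmpty() && nowMs - txns.peekFirst().timestampMs > retentionMs) {
            txns.pollFirst();
        }
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        LaneRetentionStore store = new LaneRetentionStore(14 * day);
        store.append("c", "03", 0);
        store.append("bas", "02", 0);
        store.append("c", "11", 20 * day);             // prunes 'c' only
        System.out.println(store.containsLane("bas")); // true: 'bas' was never touched
        store.sweep(20 * day);
        System.out.println(store.containsLane("bas")); // false
    }
}
```

The deque keeps pruning cheap under the in-order assumption. With out-of-order records you would need a sorted structure instead.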
On Mon, Nov 7, 2016 at 9:38 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> John,
>
> your thinking is on the right track!
>
> About the infinitely growing KTable: it seems you are extending each
> lane with a list of all txnIds, so your view needs infinite memory as
> you extend your values. A quick fix might be to delete older txnIds
> from this list each time you update it (as you mentioned, you only
> need data for the last two weeks; you might need to add a timestamp
> for each txnId in the list so you can do the pruning each time you
> append to or look up the list).
>
> > Ideally if the topic is set to two weeks retention, then once an
> > item is 'popped off' I would like to do an aggregate subtraction
> > for its value. But I don't think this is how Kafka works. Is
> > this possible? Any other feedback/suggestion? Perhaps a better
> > approach?
>
> There is no Kafka support for this. You would need to go with the
> suggestion described above. The only "delete" mechanism Kafka offers
> is for compacted topics, via tombstone messages (ie, a message in
> <key:null> format; value == null). However, a tombstone deletes the
> whole record for its key, so I doubt it is useful for your case.
>
> However, reading through your email, I am wondering why you need
> all the old txnIds at all. You mentioned that you want to get the
> previous txnId for each duplicate (and your example results verify
> this). Thus, it would be sufficient to store only the latest txnId
> for each "lane", IMHO. Furthermore, for this deduplication it seems
> sufficient to use only a KTable, without a join.
>
> The idea would be as follows: you consume your stream as a
> changelog (ie, a KTable). For each record, you check if there is an
> entry in the view. If not, just put the record itself as the result,
> because there is no duplicate. If you do find an entry, the current
> record is a duplicate of the record found.
> The record found contains its txnId, so you can use it as the
> "previous txnId". You then store the current record (with the previous
> txnId) as both the result and the new state. Your data format would be
> <lane:(txnId,txnDate)> (for input) and
> <lane:(txnId,txnDate,previousTxnId)> (for output).
>
> Your stream and view would look like:
>
> {'c',('03','11/07/2016')} plus state: EMPTY
>
> => {'c',('03','11/07/2016','')} // this is output and state update
>    at the same time
>
> {'c',('09','11/07/2016')} plus state: {'c',('03','11/07/2016','')}
>
> => {'c',('09','11/07/2016','03')} // this is output and state
>    update at the same time
>
> {'c',('11','11/08/2016')} plus state: {'c',('09','11/07/2016','03')}
>
> => {'c',('11','11/08/2016','09')} // this is output and state
>    update at the same time
>
> -Matthias
>
> On 11/7/16 8:22 AM, John Hayles wrote:
> > Thanks for the reply. I really appreciate the insight. Again,
> > newbie here. I want to expand on what I am struggling with. It
> > may be that I just need to get my mind thinking more in a streaming
> > mode. Please let me know your thoughts. I'm just having a problem
> > ‘getting it’ on my own.
> >
> > Below is a simple topic where I want to identify when the 'lane'
> > duplicates and, when it does, get the 'txnId' of the duplicate
> > record. The txnId is distinct and will never be duplicated. The
> > lane will seldom have a duplicate.
> >
> > Topic payload {txnId,lane,txnDate}. Notice lane 'c' is duplicated
> > 3 times.
> >
> > {'01','wfasd','11/07/2016'}
> > {'02','bas','11/07/2016'}
> > {'03','c','11/07/2016'}
> > {'04','xxwq','11/07/2016'}
> > {'05','dasf','11/07/2016'}
> > {'06','drdd','11/07/2016'}
> > {'07','tasd','11/07/2016'}
> > {'08','ywq','11/07/2016'}
> > {'09','c','11/07/2016'}
> > {'10','jda','11/07/2016'}
> > {'11','c','11/08/2016'}
> > {'12','ozs','11/09/2016'}
> > . . .
> >
> > Note txnId and lane keep getting more distinct values.
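Matthias's latest-txnId-per-lane idea can be sketched in plain Java as below. This is a minimal sketch under stated assumptions:

- A `HashMap` stands in for the KTable's materialized view.
- `LatestTxnDedup`, `process`, and `run` are hypothetical names, not Kafka Streams API.

Each record needs only one lookup and one update, with no list traversal.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: keep only the latest txnId per lane.
// The HashMap stands in for the KTable's materialized view (not the Kafka Streams API).
class LatestTxnDedup {
    // Output record {txnId,lane,txnDate,previousTxnId}; previousTxnId is "" if the lane is new.
    static final class Output {
        final String txnId;
        final String lane;
        final String txnDate;
        final String previousTxnId;

        Output(String txnId, String lane, String txnDate, String previousTxnId) {
            this.txnId = txnId;
            this.lane = lane;
            this.txnDate = txnDate;
            this.previousTxnId = previousTxnId;
        }

        @Override
        public String toString() {
            return "{'" + txnId + "','" + lane + "','" + txnDate + "','" + previousTxnId + "'}";
        }
    }

    private final Map<String, String> latestTxnByLane = new HashMap<>();

    // Map.put returns the previous txnId (or null) and stores the current one,
    // i.e. "output and state update at the same time".
    Output process(String txnId, String lane, String txnDate) {
        String previous = latestTxnByLane.put(lane, txnId);
        return new Output(txnId, lane, txnDate, previous == null ? "" : previous);
    }

    // Runs a batch of {txnId, lane, txnDate} records through a fresh view.
    static List<Output> run(String[][] records) {
        LatestTxnDedup dedup = new LatestTxnDedup();
        List<Output> results = new ArrayList<>();
        for (String[] r : records) {
            results.add(dedup.process(r[0], r[1], r[2]));
        }
        return results;
    }

    public static void main(String[] args) {
        String[][] topic = {
            {"01", "wfasd", "11/07/2016"}, {"02", "bas", "11/07/2016"},
            {"03", "c", "11/07/2016"}, {"04", "xxwq", "11/07/2016"},
            {"05", "dasf", "11/07/2016"}, {"06", "drdd", "11/07/2016"},
            {"07", "tasd", "11/07/2016"}, {"08", "ywq", "11/07/2016"},
            {"09", "c", "11/07/2016"}, {"10", "jda", "11/07/2016"},
            {"11", "c", "11/08/2016"}, {"12", "ozs", "11/09/2016"},
        };
        for (Output o : run(topic)) {
            System.out.println(o);
        }
    }
}
```

On the sample payload, this prints rows in the same {txnId,lane,txnDate,duplicateTxnId} shape as the result stream John lists, for example {'11','c','11/08/2016','09'}. The view is bounded by the number of distinct lanes, not by time. Since new lanes keep appearing, the stale-key concern raised at the top of this thread still applies.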
> >
> > My thought is to join the data to itself, one side as a KStream and
> > the other as a KTable for lookups.
> >
> > KStream as
> >
> > {lane:(txnId,txnDate)}
> >
> > so I visualize it like ...
> >
> > ('wfasd':('01','11/07/2016')),
> > ('bas'  :('02','11/07/2016')),
> > ('c'    :('03','11/07/2016')), ...
> >
> > The KTable (lookup table) is an aggregate view I built to hold
> > historic data by lane:
> >
> > (lane:{(txnId1,txnDate1),
> >        (txnId2,txnDate2),
> >        . . .})
> >
> > I visualize the materialized view as below,
> > 'c' being the important key/value for this example.
> > Also note this materialized view will keep growing without bound;
> > there will always be new keys and txnIds.
> >
> > ('wfasd':{('01','11/07/2016')}),
> > ('bas'  :{('02','11/07/2016')}),
> > ('c'    :{('03','11/07/2016'),
> >           ('09','11/07/2016'),
> >           ('11','11/08/2016')})
> > . . .
> >
> > Now I can join the KStream to the KTable on lane, and duplicates are
> > easy to identify. I can traverse the list in the value found in the
> > materialized view to get the previous txnId I need.
> >
> > So I can build a resulting stream / topic like…
> >
> > {txnId,lane,txnDate,duplicateTxnId}
> >
> > Note that where 'c' duplicates, there is a duplicate txnId...
> >
> > {'01','wfasd','11/07/2016',''}
> > {'02','bas','11/07/2016',''}
> > {'03','c','11/07/2016',''}
> > {'04','xxwq','11/07/2016',''}
> > {'05','dasf','11/07/2016',''}
> > {'06','drdd','11/07/2016',''}
> > {'07','tasd','11/07/2016',''}
> > {'08','ywq','11/07/2016',''}
> > {'09','c','11/07/2016','03'}
> > {'10','jda','11/07/2016',''}
> > {'11','c','11/08/2016','09'}
> > {'12','ozs','11/09/2016',''}
> >
> > The issue is that the materialized view of the KTable keeps growing
> > without bound. However, by business rule I only need the past 2
> > weeks, so I think over time there is an unneeded performance cost
> > from the materialized view: one, its size keeps growing, and two,
> > I have to traverse ever-larger value lists.
> >
> > Ideally if the topic is set to two weeks retention, then once an
> > item is 'popped off' I would like to do an aggregate subtraction
> > for its value. But I don't think this is how Kafka works. Is
> > this possible? Any other feedback/suggestion? Perhaps a better
> > approach?
> >
> > Thanks
> >
> > John
> >
> > -----Original Message-----
> > From: Matthias J. Sax [mailto:matth...@confluent.io]
> > Sent: Thursday, November 03, 2016 4:29 PM
> > To: users@kafka.apache.org
> > Subject: Re: sliding ktable?
> >
> > Hi John,
> >
> > first of all, a KTable is a (changelog) stream; thus, by definition
> > it is infinite.
> >
> > However, I assume you are worried about the internal materialized
> > view of the changelog stream (ie, a table state). This view only
> > contains the latest value for each key, ie, a single entry for each
> > key. Thus, its size is bounded by the number of keys and does not
> > change as long as your number of distinct keys does not change.
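Here is a small plain-Java model of the "latest value per key" behavior described above. It assumes a list of key/value updates as the changelog and treats a null value as a tombstone, as described elsewhere in this thread. `ChangelogView`, `Change`, and `materialize` are hypothetical names. The sketch models the semantics, not Kafka's store implementation.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of materializing a changelog stream into a table view.
// Illustrates the semantics only; it is not how Kafka Streams implements its stores.
class ChangelogView {
    // A changelog record; value == null marks a tombstone <key:null>.
    static final class Change {
        final String key;
        final String value;

        Change(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Applies updates in order: the view keeps only the latest value per key,
    // and a tombstone removes the key entirely.
    static Map<String, String> materialize(List<Change> changelog) {
        Map<String, String> view = new LinkedHashMap<>();
        for (Change change : changelog) {
            if (change.value == null) {
                view.remove(change.key);
            } else {
                view.put(change.key, change.value);
            }
        }
        return view;
    }

    public static void main(String[] args) {
        List<Change> changelog = Arrays.asList(
            new Change("c", "03"),
            new Change("bas", "02"),
            new Change("c", "09"),
            new Change("c", "11"),
            new Change("bas", null)); // tombstone for 'bas'
        System.out.println(materialize(changelog)); // prints {c=11}
    }
}
```

Five updates leave a single entry. Repeated updates to `c` overwrite each other, and the tombstone removes `bas` whole. That is also why a tombstone cannot expire only the old txnIds inside one lane's list.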
> >
> > >> At any given time I need at least 2 weeks data in my ktable
> >
> > There is no such thing as "data of the last 2 weeks":
> >
> > Using a KTable for a KStream-KTable join to do lookups, each lookup
> > will be done on the current state of the KTable and thus only
> > return a single value for each key. In this regard, there is no old
> > data in the materialized view. Of course, if a key does not get any
> > update for a long time, you can consider the corresponding value
> > old, but it is still the latest (ie, current) value for the key.
> >
> > >> ktable.foreach
> >
> > #foreach() is applied to the changelog stream and not to the
> > internally materialized view. Thus, it does not scan over the key
> > space, nor is it applied to each key currently stored in the view.
> > Rather, it is called for each update record in the changelog
> > stream.
> >
> > >> not sure keys can be removed this way
> >
> > The only way to delete a key-value entry in the materialized view
> > is to send a so-called tombstone record with format <key:null> (ie,
> > value is null). By "send" I mean that this tombstone record must be
> > in the input of the KTable.
> >
> > -Matthias
> >
> > On 11/3/16 12:39 PM, John Hayles wrote:
> > >> Newbie here, I am working with Kafka Streams with Java 1.8.
> > >>
> > >> I want to use the ktable as a lookup table in a join to a
> > >> kstream. I had no issue implementing this. However, I do not want
> > >> the ktable to grow without bound; I want to limit it to the past
> > >> 2 weeks of data, more of a 'sliding' window ktable. At any given
> > >> time I need at least 2 weeks of data in my ktable, so I don't
> > >> think a solution like a tumbling window will work, since it starts
> > >> over every time it hops.
> > >>
> > >> A little simplified example. . .
> > >>
> > >> KStream<String, GenericRecord> txnStream = builder.stream("TXN_DATA");
> > >>
> > >> KStream<String, GenericRecord> txnStreamFull = txnStream
> > >>     .map((key, record) -> {
> > >>         return new KeyValue<>(record.get("TXN").toString(), record);
> > >>     })
> > >>     .through("RekeyedIntermediateTopic1");
> > >>
> > >> KTable<String, Long> countTableStream = txnStream // do not want this table to grow indefinitely.
> > >>     .map((key, record) -> {
> > >>         return new KeyValue<>(record.get("TXN").toString(), record);
> > >>     })
> > >>     .through("RekeyedIntermediateTopic2")
> > >>     .countByKey(stringSerdeKey, "DupCountKTable10");
> > >>
> > >> KStream<String, GenericRecord> duplicatesStream =
> > >>     txnStreamFull.leftJoin(countTableStream,
> > >>         (vTxnStream, vCountTableStream) -> {
> > >>             // leftJoin passes null when the table has no count for this key yet;
> > >>             // treat that as 0 instead of calling longValue() on null.
> > >>             long count = (vCountTableStream == null) ? 0L : vCountTableStream.longValue();
> > >>             vTxnStream.put("count", Long.toString(count));
> > >>             return vTxnStream;
> > >>         });
> > >>
> > >> duplicatesStream.to("DUP_TXNS");
> > >>
> > >> I thought perhaps I could schedule ktable.foreach to inspect and
> > >> clean, but I'm not sure keys can be removed this way.
> > >>
> > >> I may be missing a basic concept here. I have spent some time
> > >> searching but have not found a good answer. Thanks for any tips.
> > >>
> > >> Thanks,
> > >> John

--
Radha Krishna, Proddaturi
253-234-5657