Yes and no.
Kafka allows you to set a retention time for compacted topics, too. Thus,
if a key does not get an update within this retention time, it will be
deleted, too. See here for details:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist

-Matthias

On 11/7/16 11:44 PM, R Krishna wrote:
> There is a problem with tombstoning old entries based on a new entry:
> keys which never receive a new entry will remain there forever.
>
> On Mon, Nov 7, 2016 at 9:38 AM, Matthias J. Sax
> <matth...@confluent.io> wrote:
>
> John,
>
> your thinking is on the right track!
>
> About the infinitely growing KTable: it seems you are extending each
> lane with a list of all txnIds, so your view needs infinite memory as
> your values grow. A quick fix might be to delete older txnIds from
> this list each time you update it. As you mentioned you only need
> data for the last two weeks, you might need to add a timestamp for
> each txnId in the list to do the pruning each time you append to or
> look up the list.
>
>>>> Ideally if the topic is set to two weeks retention, then once an
>>>> item is 'popped off' I would like to do an aggregate subtraction
>>>> for its value. But I don't think this is how Kafka works. Is this
>>>> possible? Any other feedback/suggestion? Perhaps a better
>>>> approach?
>
> There is no Kafka support for this. You would need to go with the
> suggestion described above. The only "delete" mechanism Kafka offers
> is for compacted topics via tombstone messages (ie, messages with
> <key:null> format; value == null). However, a tombstone deletes the
> whole record for its key, thus I doubt tombstones are useful for your
> case.
>
> However, reading through your email, I am wondering why you need all
> the old txnIds. You mentioned that you want to get the previous txnId
> for each duplicate (and your example results verify this).
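[Editor's note: the pruning idea above can be sketched in plain Java, with a
HashMap standing in for the state store; the class and method names and the
two-week cutoff are illustrative, not Kafka Streams API.]

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LanePruning {
    // txnId plus the timestamp we attach so old entries can be pruned
    static final class Txn {
        final String txnId;
        final long tsMillis;
        Txn(String txnId, long tsMillis) { this.txnId = txnId; this.tsMillis = tsMillis; }
    }

    static final long TWO_WEEKS_MS = 14L * 24 * 60 * 60 * 1000;

    // lane -> list of recent transactions (the "materialized view")
    final Map<String, List<Txn>> view = new HashMap<>();

    // Called for every update to a lane: append the new txnId, then drop
    // every entry older than two weeks relative to "now".
    void update(String lane, String txnId, long nowMillis) {
        List<Txn> txns = view.computeIfAbsent(lane, k -> new ArrayList<>());
        txns.add(new Txn(txnId, nowMillis));
        txns.removeIf(t -> nowMillis - t.tsMillis > TWO_WEEKS_MS);
    }
}
```

Doing the same removeIf on lookup keeps reads from returning stale entries
for lanes that stopped receiving updates.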
> Thus, it would be sufficient to only store the latest txnId for each
> "lane" IMHO. Furthermore, for this deduplication it seems sufficient
> to only use a KTable without a join.
>
> The idea would be as follows: you consume your stream as a changelog
> (ie, KTable). For each record, you check if there is an entry in the
> view. If not, just put the record itself as result because there is
> no duplicate. If you do find an entry, the current record is a
> duplicate of the record found. The record found contains its txnId,
> so you can use this as "previous txnId". As result, you store the
> current record. Your data format would be like <lane:(txnId,txnDate)>
> (for input) and <lane:(txnId,txnDate,previousTxnId)> (for output).
>
> Your stream and view would be like:
>
> {'c',('03','11/07/2016')} plus state: EMPTY
> => {'c',('03','11/07/2016',null)} // output and state update at the same time
>
> {'c',('09','11/07/2016')} plus state: {'c',('03','11/07/2016',null)}
> => {'c',('09','11/07/2016','03')} // output and state update at the same time
>
> {'c',('11','11/08/2016')} plus state: {'c',('09','11/07/2016','03')}
> => {'c',('11','11/08/2016','09')} // output and state update at the same time
>
> -Matthias
>
> On 11/7/16 8:22 AM, John Hayles wrote:
>>>> Thanks for the reply. I really appreciate the insight. Again,
>>>> newbie here. I want to expand on what I am struggling with. It may
>>>> be that I just need to get my mind thinking more in a streaming
>>>> mode. Please let me know your thoughts. Just having a problem
>>>> 'getting it' on my own.
>>>>
>>>> Below is a simple topic. I want to identify where the 'lane'
>>>> duplicates, and when it does, get the 'txnId' of the duplicated
>>>> record. The txnId is distinct and will never be duplicated. The
>>>> lane will seldom have a duplicate.
>>>>
>>>> Topic payload {txnId,lane,txnDate}. Notice lane 'c' is duplicated
>>>> 3 times.
>>>>
>>>> {'01','wfasd','11/07/2016'}
>>>> {'02','bas','11/07/2016'}
>>>> {'03','c','11/07/2016'}
>>>> {'04','xxwq','11/07/2016'}
>>>> {'05','dasf','11/07/2016'}
>>>> {'06','drdd','11/07/2016'}
>>>> {'07','tasd','11/07/2016'}
>>>> {'08','ywq','11/07/2016'}
>>>> {'09','c','11/07/2016'}
>>>> {'10','jda','11/07/2016'}
>>>> {'11','c','11/08/2016'}
>>>> {'12','ozs','11/09/2016'}
>>>> . . .
>>>>
>>>> Note txnId and lane keep getting more distinct values.
>>>>
>>>> My thought is to join the data to itself, one as a KStream, the
>>>> other as a KTable for lookups.
>>>>
>>>> KStream as
>>>>
>>>> {lane:(txnId,txnDate)}
>>>>
>>>> so I visualize it like ...
>>>>
>>>> ('wfasd':('01','11/07/2016')),
>>>> ('bas'  :('02','11/07/2016')),
>>>> ('c'    :('03','11/07/2016')), ...
>>>>
>>>> The KTable (lookup table) is an aggregate view I built to hold
>>>> historic data by lane:
>>>>
>>>> (lane:{(txnId1,txnDate1),
>>>>        (txnId2,txnDate2),
>>>>        . . .})
>>>>
>>>> I visualize the materialized view as below, 'c' being the
>>>> important key/value for this example. Also note this materialized
>>>> view will keep growing without bound. There will always be new
>>>> keys and txnIds.
>>>>
>>>> ('wfasd':{('01','11/07/2016')}),
>>>> ('bas'  :{('02','11/07/2016')}),
>>>> ('c'    :{('03','11/07/2016'),
>>>>           ('09','11/07/2016'),
>>>>           ('11','11/09/2016')})
>>>> . . .
>>>>
>>>> Now I can join the KStream to the KTable on lane, and duplicates
>>>> are easy to identify. I can traverse the list from the value found
>>>> in the materialized view to get the previous txnId I need.
>>>>
>>>> So I can build a resulting stream / topic like ...
>>>>
>>>> {txnId,lane,txnDate,duplicateTxnId}
>>>>
>>>> Note where 'c' duplicates there is a duplicate txnId ...
>>>>
>>>> {'01','wfasd','11/07/2016',''}
>>>> {'02','bas','11/07/2016',''}
>>>> {'03','c','11/07/2016',''}
>>>> {'04','xxwq','11/07/2016',''}
>>>> {'05','dasf','11/07/2016',''}
>>>> {'06','drdd','11/07/2016',''}
>>>> {'07','tasd','11/07/2016',''}
>>>> {'08','ywq','11/07/2016',''}
>>>> {'09','c','11/07/2016','03'}
>>>> {'10','jda','11/07/2016',''}
>>>> {'11','c','11/08/2016','09'}
>>>> {'12','ozs','11/09/2016',''}
>>>>
>>>> The issue is that the materialized view of the KTable keeps
>>>> growing without bound; however, by business rule I only need the
>>>> past 2 weeks. So I think over time there is an unneeded
>>>> performance impact regarding the materialized view: one, its size
>>>> keeps growing, and two, it means traversing ever larger value
>>>> lists.
>>>>
>>>> Ideally if the topic is set to two weeks retention, then once an
>>>> item is 'popped off' I would like to do an aggregate subtraction
>>>> for its value. But I don't think this is how Kafka works. Is this
>>>> possible? Any other feedback/suggestion? Perhaps a better
>>>> approach?
>>>>
>>>> Thanks
>>>> John
>>>>
>>>> -----Original Message-----
>>>> From: Matthias J. Sax [mailto:matth...@confluent.io]
>>>> Sent: Thursday, November 03, 2016 4:29 PM
>>>> To: users@kafka.apache.org
>>>> Subject: Re: sliding ktable?
>>>>
>>>> Hi John,
>>>>
>>>> first of all, a KTable is a (changelog) stream; thus, by
>>>> definition it is infinite.
>>>>
>>>> However, I assume you are worried about the internal materialized
>>>> view of the changelog stream (ie, the table state). This view only
>>>> contains the latest value for each key, ie, a single entry for
>>>> each key.
>>>> Thus, its size is bound by the number of keys and does not change
>>>> as long as your number of distinct keys does not change.
>>>>
>>>>> At any given time I need at least 2 weeks data in my ktable
>>>>
>>>> There is no such thing as "data of the last 2 weeks":
>>>>
>>>> Using a KTable for a KStream-KTable join to do lookups, each
>>>> lookup will be done on the current state of the KTable and thus
>>>> only return a single value for each key. There is no old data in
>>>> the materialized view in this regard. Of course, if a key does not
>>>> get any update for a long time, you can consider the corresponding
>>>> value as old, but it is still the latest (ie, current) value for
>>>> the key.
>>>>
>>>>> ktable.foreach
>>>>
>>>> #foreach() is applied to the changelog stream and not the
>>>> internally materialized view. Thus, it does not scan over the key
>>>> space, nor is it applied to each currently stored key in the view.
>>>> It is rather called for each update record that is in the
>>>> changelog stream.
>>>>
>>>>> not sure keys can be removed this way
>>>>
>>>> The only way to delete a key-value entry in the materialized view
>>>> is to send a so-called tombstone record with format <key:null>
>>>> (ie, value is null). By "send" I mean that this tombstone record
>>>> must be in the input of the KTable.
>>>>
>>>> -Matthias
>>>>
>>>> On 11/3/16 12:39 PM, John Hayles wrote:
>>>>> Newbie here, I am working with Kafka Streams with Java 1.8.
>>>>>
>>>>> I want to use the KTable as a lookup table in a join to a
>>>>> KStream. I had no issue implementing this. However, I do not want
>>>>> the KTable to grow without bounds; I want to limit the KTable to
>>>>> the past 2 weeks of data, more of a 'sliding' window KTable.
>>>>> At any given time I need at least 2 weeks of data in my KTable,
>>>>> so I don't think a solution like a tumbling table will work,
>>>>> since it starts over every time it hops.
>>>>>
>>>>> A little simplified example . . .
>>>>>
>>>>> KStream<String, GenericRecord> txnStream =
>>>>>     builder.stream("TXN_DATA");
>>>>>
>>>>> KStream<String, GenericRecord> txnStreamFull = txnStream
>>>>>     .map((key, record) ->
>>>>>         new KeyValue<>(record.get("TXN").toString(), record))
>>>>>     .through("RekeyedIntermediateTopic1");
>>>>>
>>>>> // do not want this table to grow indefinitely
>>>>> KTable<String, Long> countTableStream = txnStream
>>>>>     .map((key, record) ->
>>>>>         new KeyValue<>(record.get("TXN").toString(), record))
>>>>>     .through("RekeyedIntermediateTopic2")
>>>>>     .countByKey(stringSerdeKey, "DupCountKTable10");
>>>>>
>>>>> KStream<String, GenericRecord> duplicatesStream =
>>>>>     txnStreamFull.leftJoin(countTableStream,
>>>>>         (vTxnStream, vCountTableStream) -> {
>>>>>             vTxnStream.put("count",
>>>>>                 Long.toString(vCountTableStream.longValue()));
>>>>>             return vTxnStream;
>>>>>         });
>>>>>
>>>>> duplicatesStream.to("DUP_TXNS");
>>>>>
>>>>> I thought perhaps I can schedule ktable.foreach to inspect and
>>>>> clean, but I'm not sure keys can be removed this way.
>>>>>
>>>>> I may be missing a basic concept here. I have spent some time
>>>>> searching but am not finding a good answer. Thanks for any tips.
>>>>> Thanks,
>>>>> John
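[Editor's note: Matthias's suggestion in the thread (store only the latest
txnId per lane and emit it as the "previous txnId") can be sketched in plain
Java, with a HashMap standing in for the KTable's materialized view; the
class and method names are illustrative, not Kafka Streams API.]

```java
import java.util.HashMap;
import java.util.Map;

public class LaneDedup {
    // lane -> latest txnId seen for that lane (the "materialized view")
    final Map<String, String> latestTxnByLane = new HashMap<>();

    // For each input record <lane:(txnId,txnDate)>, emit
    // {txnId,lane,txnDate,previousTxnId} and update the view.
    String[] process(String lane, String txnId, String txnDate) {
        // Map.put returns the old value, ie, the previous txnId (null if first)
        String previous = latestTxnByLane.put(lane, txnId);
        return new String[]{txnId, lane, txnDate, previous == null ? "" : previous};
    }
}
```

Run against the thread's example, lane 'c' yields previous txnIds '', '03',
and '09' for txnIds '03', '09', and '11', matching the DUP_TXNS output above.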