There is a problem with tombstoning old entries only when a new entry arrives
for the same key: keys that never receive a new entry will remain in the store forever.
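To make this concrete, here is a minimal plain-Java model of the store. It uses no Kafka APIs, and the class `StaleKeySweep`, its methods, and the two-week constant are hypothetical. Pruning inside the update path only ever touches the key being updated, so a key that stops receiving records is never revisited. A separate periodic sweep over the whole store is what removes it. In Kafka Streams such a sweep would presumably run from a scheduled punctuation in a Processor API processor that iterates over its state store. That wiring is not shown here.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Plain-Java model of a key-value store whose entries remember when they were
// last updated. Hypothetical names; this is not a Kafka Streams API.
final class StaleKeySweep {
    static final long RETENTION_MS = 14L * 24 * 60 * 60 * 1000; // two weeks

    // lane -> timestamp (ms) of the most recent record seen for that lane
    final Map<String, Long> lastSeen = new HashMap<>();

    // Called for every incoming record: only the record's own key is touched.
    void onRecord(String lane, long timestampMs) {
        lastSeen.merge(lane, timestampMs, Long::max);
    }

    // Periodic sweep: removes every key whose newest record is older than the
    // retention period, including keys that never receive another record.
    int sweep(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue() > RETENTION_MS) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```

For example, if lane 'wfasd' gets one record on day 0 and never another, while lane 'c' keeps receiving records, only `sweep()` ever removes 'wfasd'.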

On Mon, Nov 7, 2016 at 9:38 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> John,
>
> your thinking is on the right track!
>
> About the infinitely growing KTable: it seems you are extending each
> lane with a list of all txnIds, so your view needs unbounded memory as
> you extend your values. A quick fix might be to delete older txnIds
> from this list each time you update the list. As you mentioned, you
> only need data for the last two weeks, so you might need to add a
> timestamp for each txnId in the list to do the pruning each time you
> append to or look up the list.
>
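A minimal plain-Java sketch of this quick fix, assuming each list entry carries its txnDate as epoch milliseconds and that records for a lane arrive in timestamp order. The `LaneHistory` class, its `Entry` type, and the two-week constant are hypothetical, not part of any Kafka API. In a Kafka Streams aggregation, the aggregator would call something like `append` when updating the per-lane value; pruning on lookup would reuse the same loop.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical per-lane value: txnIds kept in arrival order with their timestamps.
final class LaneHistory {
    static final long WINDOW_MS = 14L * 24 * 60 * 60 * 1000; // two weeks

    static final class Entry {
        final String txnId;
        final long timestampMs;

        Entry(String txnId, long timestampMs) {
            this.txnId = txnId;
            this.timestampMs = timestampMs;
        }
    }

    private final Deque<Entry> entries = new ArrayDeque<>();

    // Appends a txnId, then drops every entry more than two weeks older than
    // the newest one. Assumes timestamp order, so the oldest entries are
    // always at the head of the deque.
    void append(String txnId, long timestampMs) {
        entries.addLast(new Entry(txnId, timestampMs));
        while (!entries.isEmpty()
                && timestampMs - entries.peekFirst().timestampMs > WINDOW_MS) {
            entries.removeFirst();
        }
    }

    List<String> txnIds() {
        List<String> ids = new ArrayList<>();
        for (Entry e : entries) {
            ids.add(e.txnId);
        }
        return ids;
    }
}
```

With entries at days 0, 0, and 1, appending a new txnId at day 15 drops the two day-0 entries and keeps the day-1 entry, since it is exactly 14 days old and not older.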
> > Ideally, if the topic is set to two weeks' retention, then once an
> > item is 'popped off' I would like to do an aggregate subtraction
> > for its value. But I don't think this is how Kafka works. Is
> > this possible? Any other feedback/suggestion? Perhaps a better
> > approach?
>
> There is no Kafka support for this. You would need to go with the
> suggestion described above. The only "delete" mechanism Kafka offers
> is for compacted topics, via tombstone messages (i.e., messages in
> <key:null> format; value == null). However, a tombstone deletes the
> whole record with this key, so I doubt tombstones are useful for your
> case.
>
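A simplified plain-Java model of how a table view applies changelog records illustrates why. This is not Kafka code, and `TombstoneDemo` and its methods are hypothetical. A null value removes the key's entire entry, so a tombstone for lane 'c' would drop every stored txnId for 'c', not just the outdated ones.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of applying changelog records to a table's view.
final class TombstoneDemo {
    static void apply(Map<String, List<String>> view, String key, List<String> value) {
        if (value == null) {
            view.remove(key);     // tombstone: the entire entry for this key is gone
        } else {
            view.put(key, value); // regular update: replaces the previous value
        }
    }

    static Map<String, List<String>> demo() {
        Map<String, List<String>> view = new HashMap<>();
        apply(view, "c", Arrays.asList("03", "09", "11"));
        apply(view, "c", null); // intended to drop only '03', but removes all txnIds
        return view;
    }
}
```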
> However, reading through your email, I am wondering why you need
> all the old txnIds. You mentioned that you want to get the previous
> txnId for each duplicate (and your example results verify this).
> Thus, IMHO it would be sufficient to store only the latest txnId for
> each "lane". Furthermore, for this deduplication it seems sufficient
> to use only a KTable, without a join.
>
> The idea would be as follows: you consume your stream as a
> changelog (i.e., a KTable). For each record, you check whether there
> is an entry in the view. If not, just emit the record itself as the
> result, because there is no duplicate. If you do find an entry, the
> current record is a duplicate of the record found. The record found
> contains its txnId, so you can use this as the "previous txnId". As
> the result, you store the current record. Your data format would be
> <lane:(txnId,txnDate)> for input and
> <lane:(txnId,txnDate,previousTxnId)> for output.
>
> Your stream and view would look like this:
>
> {'c',('03','11/07/2016')} plus state: EMPTY
>   => {'c',('03','11/07/2016','')}     // output and state update at the same time
>
> {'c',('09','11/07/2016')} plus state: {'c',('03','11/07/2016','')}
>   => {'c',('09','11/07/2016','03')}   // output and state update at the same time
>
> {'c',('11','11/08/2016')} plus state: {'c',('09','11/07/2016','03')}
>   => {'c',('11','11/08/2016','09')}   // output and state update at the same time
>
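The same walkthrough as a plain-Java sketch. A `HashMap` stands in for the state store, and the `LaneDedup` class, its `Result` type, and the `process` method are hypothetical names. In Kafka Streams this logic would live in an aggregation or a processor backed by a state store, which is not shown.

```java
import java.util.HashMap;
import java.util.Map;

final class LaneDedup {
    // Output record: the input fields plus the txnId of the previous record
    // for the same lane ("" when there is no earlier record).
    static final class Result {
        final String lane, txnId, txnDate, previousTxnId;

        Result(String lane, String txnId, String txnDate, String previousTxnId) {
            this.lane = lane;
            this.txnId = txnId;
            this.txnDate = txnDate;
            this.previousTxnId = previousTxnId;
        }
    }

    // State: only the latest record per lane, so memory grows with the
    // number of distinct lanes, not with the number of transactions.
    private final Map<String, Result> latestByLane = new HashMap<>();

    Result process(String lane, String txnId, String txnDate) {
        Result previous = latestByLane.get(lane);
        String prevId = (previous == null) ? "" : previous.txnId;
        Result out = new Result(lane, txnId, txnDate, prevId);
        latestByLane.put(lane, out); // output and state update at the same time
        return out;
    }
}
```

Feeding the example records for lane 'c' ('03', '09', '11') through `process` yields previous txnIds '', '03', and '09', matching the walkthrough. The store still gains one entry per new lane, so lanes that stay idle for more than two weeks would need some separate cleanup.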
>
> -Matthias
>
> On 11/7/16 8:22 AM, John Hayles wrote:
> > Thanks for the reply. I really appreciate the insight. Again,
> > newbie here. I want to expand on what I am struggling with. It
> > may be that I just need to get my mind thinking more in a streaming
> > mode. Please let me know your thoughts. I'm just having a problem
> > ‘getting it’ on my own.
> >
> > Below is a simple topic. I want to identify where the 'lane' is
> > duplicated and, when it is, get the 'txnId' of the duplicate
> > record. The txnId is distinct and will never be duplicated. The
> > lane will seldom have a duplicate.
> >
> > Topic payload {txnId,lane,txnDate}. Notice lane 'c' is duplicated
> > 3 times.
> >
> > {'01','wfasd','11/07/2016'}
> > {'02','bas','11/07/2016'}
> > {'03','c','11/07/2016'}
> > {'04','xxwq','11/07/2016'}
> > {'05','dasf','11/07/2016'}
> > {'06','drdd','11/07/2016'}
> > {'07','tasd','11/07/2016'}
> > {'08','ywq','11/07/2016'}
> > {'09','c','11/07/2016'}
> > {'10','jda','11/07/2016'}
> > {'11','c','11/08/2016'}
> > {'12','ozs','11/09/2016'}
> > . . .
> >
> > Note that txnId and lane keep getting new distinct values.
> >
> > My thought is to join the data to itself, one side as a kstream and
> > the other as a ktable for lookups.
> >
> > The kstream as
> >
> > {lane:(txnId,txnDate)}
> >
> > so I visualize it like ...
> >
> > ('wfasd':('01','11/07/2016')),
> > ('bas'  :('02','11/07/2016')),
> > ('c'    :('03','11/07/2016')), ...
> >
> > The ktable (lookup table) is an aggregate view I built to hold
> > historic data by lane:
> >
> > (lane:{(txnId1,txnDate1),
> >        (txnId2,txnDate2),
> >        . . .})
> >
> > I visualize the materialized view as below, 'c' being the important
> > key/value for this example. Also note that this materialized view
> > will keep growing without bound; there will always be new keys and
> > txnIds.
> >
> > ('wfasd':{('01','11/07/2016')}),
> > ('bas'  :{('02','11/07/2016')}),
> > ('c'    :{('03','11/07/2016'),
> >           ('09','11/07/2016'),
> >           ('11','11/08/2016')})
> > . . .
> >
> > Now I can join the kstream to the ktable on lane, and duplicates are
> > easy to identify. I can traverse the list from the value found in the
> > materialized view to get the previous txnId I need.
> >
> > So I can build a resulting stream / topic like…
> >
> > {txnId,lane,txnDate,duplicateTxnId}
> >
> > Note that where 'c' duplicates, there is a duplicate txnId...
> >
> > {'01','wfasd','11/07/2016',''}
> > {'02','bas','11/07/2016',''}
> > {'03','c','11/07/2016',''}
> > {'04','xxwq','11/07/2016',''}
> > {'05','dasf','11/07/2016',''}
> > {'06','drdd','11/07/2016',''}
> > {'07','tasd','11/07/2016',''}
> > {'08','ywq','11/07/2016',''}
> > {'09','c','11/07/2016','03'}
> > {'10','jda','11/07/2016',''}
> > {'11','c','11/08/2016','09'}
> > {'12','ozs','11/09/2016',''}
> >
> > The issue is that the materialized view of the ktable keeps growing
> > without bound, yet by business rule I only need the past 2 weeks.
> > Over time I expect an unnecessary performance impact from the
> > materialized view: first, its size keeps growing, and second, the
> > value lists to traverse keep getting longer.
> >
> > Ideally, if the topic is set to two weeks' retention, then once an
> > item is 'popped off' I would like to do an aggregate subtraction
> > for its value. But I don't think this is how Kafka works. Is
> > this possible? Any other feedback/suggestion? Perhaps a better
> > approach?
> >
> > Thanks
> >
> > John
> >
> > -----Original Message-----
> > From: Matthias J. Sax [mailto:matth...@confluent.io]
> > Sent: Thursday, November 03, 2016 4:29 PM
> > To: users@kafka.apache.org
> > Subject: Re: sliding ktable?
> >
> > Hi John,
> >
> > first of all, a KTable is a (changelog) stream; thus, by definition,
> > it is infinite.
> >
> > However, I assume you are worried about the internal materialized
> > view of the changelog stream (i.e., the table state). This view only
> > contains the latest value for each key, i.e., a single entry per
> > key. Thus, its size is bounded by the number of keys and does not
> > change as long as your number of distinct keys does not change.
> >
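A plain-Java sketch of this point. A `HashMap` stands in for the materialized view, and `ChangelogView.materialize` is a hypothetical name, not a Kafka API. Folding a changelog into the view keeps one entry per key, so four updates over two keys leave two entries.

```java
import java.util.HashMap;
import java.util.Map;

final class ChangelogView {
    // Folds changelog records into a view holding only the latest value per
    // key. keys[i] and values[i] form the i-th changelog record.
    static Map<String, String> materialize(String[] keys, String[] values) {
        Map<String, String> view = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            view.put(keys[i], values[i]); // later updates overwrite earlier ones
        }
        return view;
    }
}
```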
> >> At any given time I need at least 2 weeks data in my ktable
> >
> > There is no such thing as "data of the last 2 weeks":
> >
> > Using a KTable for a KStream-KTable join to do lookups, each lookup
> > is done on the current state of the KTable and thus only returns a
> > single value for each key. In this regard, there is no old data in
> > the materialized view. Of course, if a key does not get any update
> > for a long time, you can consider the corresponding value as old,
> > but it is still the latest (i.e., current) value for the key.
> >
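A plain-Java sketch of these lookup semantics, assuming records are processed one at a time in arrival order (a simplification). The `StreamTableLookup` class and its `run` method are hypothetical. Table updates overwrite the view, and a stream record sees whatever value is current at the moment it is processed, or null if there is none yet, as with a left join.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StreamTableLookup {
    // Each event is {kind, key, value}; kind is "table" (an update to the
    // table's changelog) or "stream" (a record to enrich). Returns
    // "streamValue+tableValue" for each stream record; a missing table value
    // shows up as "null", mimicking a left join.
    static List<String> run(String[][] events) {
        Map<String, String> table = new HashMap<>();
        List<String> joined = new ArrayList<>();
        for (String[] e : events) {
            if (e[0].equals("table")) {
                table.put(e[1], e[2]);                    // only the latest value is kept
            } else {
                joined.add(e[2] + "+" + table.get(e[1])); // lookup on current state
            }
        }
        return joined;
    }
}
```

A stream record for 'c' that arrives before any table update joins with null; one that arrives after two updates joins only with the second value, never with the older one.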
> >> ktable.foreach
> >
> > #foreach() is applied to the changelog stream and not to the
> > internally materialized view. Thus, it does not scan over the key
> > space, nor is it applied to each key currently stored in the view.
> > Rather, it is called for each update record in the changelog
> > stream.
> >
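A plain-Java sketch of the difference, with hypothetical names. A `BiConsumer` stands in for the action passed to `foreach()`. Three updates to the same key invoke the action three times while the view holds a single entry, and at no point does the action iterate over the stored keys.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

final class ForeachModel {
    final Map<String, String> view = new HashMap<>();

    // Models a table update: the view is updated, and the foreach-style
    // action sees the update record itself, not a scan of the view.
    void update(String key, String value, BiConsumer<String, String> action) {
        view.put(key, value);
        action.accept(key, value);
    }
}
```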
> >> not sure keys can be removed this way
> >
> > The only way to delete a key-value entry in the materialized view
> > is to send a so-called tombstone record with format <key:null> (i.e.,
> > the value is null). By "send" I mean that this tombstone record must
> > be in the input of the KTable.
> >
> > -Matthias
> >
> > On 11/3/16 12:39 PM, John Hayles wrote:
> >
> >> Newbie here. I am working with Kafka Streams with Java 1.8.
> >>
> >> I want to use the ktable as a lookup table in a join to a kstream.
> >> I had no issue implementing this. However, I do not want the ktable
> >> to grow without bound; I want to limit the ktable to the past 2
> >> weeks' data, more of a 'sliding' window ktable. At any given time I
> >> need at least 2 weeks of data in my ktable, so I don't think a
> >> solution like a tumbling window will work, since it starts over
> >> every time it hops.
> >>
> >> A little simplified example. . .
> >>
> >> KStream<String, GenericRecord> txnStream = builder.stream("TXN_DATA");
> >>
> >> // Re-key by the TXN field and repartition through an intermediate topic.
> >> KStream<String, GenericRecord> txnStreamFull = txnStream
> >>     .map((key, record) -> new KeyValue<>(record.get("TXN").toString(), record))
> >>     .through("RekeyedIntermediateTopic1");
> >>
> >> KTable<String, Long> countTableStream = txnStream  // do not want this table to grow indefinitely.
> >>     .map((key, record) -> new KeyValue<>(record.get("TXN").toString(), record))
> >>     .through("RekeyedIntermediateTopic2")
> >>     .countByKey(stringSerdeKey, "DupCountKTable10");
> >>
> >> // leftJoin passes null as the table value when there is no entry for the key yet.
> >> KStream<String, GenericRecord> duplicatesStream =
> >>     txnStreamFull.leftJoin(countTableStream, (vTxnStream, vCountTableStream) -> {
> >>         vTxnStream.put("count",
> >>             vCountTableStream == null ? "0" : Long.toString(vCountTableStream));
> >>         return vTxnStream;
> >>     });
> >>
> >> duplicatesStream.to("DUP_TXNS");
> >>
> >> I thought perhaps I could schedule ktable.foreach to inspect and
> >> clean the table, but I'm not sure keys can be removed this way.
> >>
> >> I may be missing a basic concept here. I have spent some time
> >> searching but haven't found a good answer. Thanks for any tips.
> >>
> >> Thanks,
> >> John



-- 
Radha Krishna, Proddaturi
253-234-5657
