Thanks for sharing back your findings/code, Nicolas! -Michael
On Mon, Jan 16, 2017 at 11:15 PM, Nicolas Fouché <nfou...@onfocus.io> wrote:

> If anyone is interested, here is my custom timestamp extractor:
> https://gist.github.com/nfo/54d5830720e163d2e7e848b6e4baac20
>
> 2017-01-16 15:52 GMT+01:00 Nicolas Fouché <nfou...@onfocus.io>:
>
> > Hi Michael,
> >
> > Got it. I understand that it would be less error-prone to generate the final "altered" timestamp on the producer side, instead of trying to compute it each time the record is consumed.
> >
> > Thanks.
> > Nicolas.
> >
> > 2017-01-16 10:03 GMT+01:00 Michael Noll <mich...@confluent.io>:
> >
> > > Nicolas,
> > >
> > > quick feedback on timestamps:
> > >
> > > > In our system, clients send data to an HTTP API. This API produces the records in Kafka. I can't rely on the clock of the clients sending the original data (so the records' timestamps are set by the servers ingesting the records into Kafka), but I can rely on a time difference. The client only gives information about the time spent since the first version of the record was sent. Via a custom timestamp extractor, I just need to subtract the time spent from the record's timestamp to ensure that it will fall in the same window.
> > >
> > > Alternatively, you can also let the HTTP API handle the timestamp calculations, and then embed the "final" timestamp in the message payload (like the message value). Then, in your downstream application, you'd define a custom timestamp extractor that returns this embedded timestamp.
> > >
> > > One advantage of the approach I outlined above is that other consumers of the same data (who may or may not be aware of how you need to compute a timestamp diff to get the "real" timestamp) can simply re-use the timestamp embedded in the payload, without having to know or worry about the custom calculation. It might also be easier for Ops personnel to have access to a ready-to-use timestamp in case they need to debug or troubleshoot.
> > >
> > > -Michael
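
For illustration, here is a minimal sketch of the kind of extractor Michael describes: a custom TimestampExtractor that returns the "final" timestamp the producer embedded in the message value. It targets the 0.10.1-era interface (a single extract(ConsumerRecord) method; later releases add a previousTimestamp argument). "EventValue" and its "eventTimestampMs" field are hypothetical stand-ins for the actual payload type, and the fallback to record.timestamp() is likewise an assumption.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Sketch: return the timestamp embedded in the value by the HTTP API.
    // "EventValue" / "eventTimestampMs" are hypothetical placeholders.
    public class PayloadTimestampExtractor implements TimestampExtractor {

        @Override
        public long extract(final ConsumerRecord<Object, Object> record) {
            final Object value = record.value();
            if (value instanceof EventValue) {
                final long embedded = ((EventValue) value).eventTimestampMs;
                if (embedded > 0) {
                    return embedded;          // the "final" timestamp computed by the producer
                }
            }
            return record.timestamp();        // fall back to the record's own timestamp
        }
    }

Such an extractor would then be registered through the StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG property of the Streams application.
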
> > > On Sun, Jan 15, 2017 at 11:10 PM, Nicolas Fouché <nfou...@onfocus.io> wrote:
> > >
> > > > Hi Eno,
> > > >
> > > > 2. Well, records could arrive out of order. But it should happen rarely, and it's no big deal anyway. So let's forget about the version number if it makes things easier!
> > > >
> > > > 3. I completely missed out on KTable aggregations. Thanks a lot for the pointer, that opens new perspectives.
> > > >
> > > > ... a few hours pass ...
> > > >
> > > > OK, in my case, since my input is an infinite stream of new records, I would have to "window" my KTables, right?
> > > > With `KStream.groupBy().reduce()`, I can generate a windowed KTable of records, and even use the reducer function to compare the version numbers. Next, I use `KTable.groupBy().aggregate()` to benefit from the `adder` and `subtractor` mechanisms [1].
> > > >
> > > > The last problem is about the record timestamp. If I work on a one-hour window, and records are sent between, let's say, 00:59 and 01:01, they would live in two different KTables and this would create duplicates.
> > > > To deal with this, I could mess with the records' timestamps, so that each new record version is considered by Kafka Streams as having the same timestamp as the first version seen by the producer.
> > > > Here is my idea: in our system, clients send data to an HTTP API. This API produces the records in Kafka. I can't rely on the clock of the clients sending the original data (so the records' timestamps are set by the servers ingesting the records into Kafka), but I can rely on a time difference. The client only gives information about the time spent since the first version of the record was sent. Via a custom timestamp extractor, I just need to subtract the time spent from the record's timestamp to ensure that it will fall in the same window.
> > > > Long text, small code: https://gist.github.com/nfo/6df4d1076af9da5fd1c29b0ad4564f2a . What do you think?
> > > >
> > > > About the windowed KTables in the first step, I guess I should avoid making them too long, since they store the whole records. We usually aggregate with window sizes from 1 hour to 1 month. I should compute all the aggregates covering more than 1 hour from the 1-hour aggregates, right?
> > > >
> > > > [1] http://docs.confluent.io/3.1.1/streams/javadocs/org/apache/kafka/streams/kstream/KGroupedTable.html#aggregate(org.apache.kafka.streams.kstream.Initializer,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String)
> > > >
> > > > Thanks (a lot).
> > > > Nicolas
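
For illustration, here is a rough sketch of the two-step DSL pipeline described above: a windowed reduce that keeps the highest version per key, followed by a KGroupedTable aggregation whose adder/subtractor keep the sums consistent when a key's value is updated (the aggregate signature from [1]). It is written against the Confluent 3.1.x / Kafka 0.10.1-era API; "Record", "Aggregate" and "aggregateSerde" are hypothetical placeholders, and default serdes are assumed for the input topic.

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, Record> input = builder.stream("input-topic");

    // Step 1: per key and per 1-hour window, keep only the record with the
    // highest version seen so far. groupByKey() is used here since the id is
    // already in the key; groupBy(...) would re-key first.
    KTable<Windowed<String>, Record> latestPerWindow = input
        .groupByKey()
        .reduce(
            (current, incoming) -> incoming.version >= current.version ? incoming : current,
            TimeWindows.of(TimeUnit.HOURS.toMillis(1)),
            "latest-version-per-window");

    // Step 2: re-key to the plain id and aggregate. When the windowed KTable
    // updates a key, the subtractor removes the previous version's metrics and
    // the adder applies the new ones, so sums stay correct across versions.
    KTable<String, Aggregate> hourlyAggregates = latestPerWindow
        .groupBy((windowedKey, record) -> new KeyValue<>(windowedKey.key(), record))
        .aggregate(
            Aggregate::new,                              // initializer
            (id, record, agg) -> agg.add(record),        // adder
            (id, record, agg) -> agg.subtract(record),   // subtractor
            aggregateSerde,                              // Serde<Aggregate> (hypothetical)
            "hourly-aggregates");

Note that this only works if all versions of a record land in the same one-hour window, which is exactly what the timestamp adjustment described above is meant to guarantee.
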
> > > > 2017-01-13 17:32 GMT+01:00 Eno Thereska <eno.there...@gmail.com>:
> > > >
> > > > > Hi Nicolas,
> > > > >
> > > > > There is a lot here, so let's try to split the concerns around some themes:
> > > > >
> > > > > 1. The Processor API is flexible and can definitely do what you want, but, as you mentioned, at the cost of you having to manually craft the code.
> > > > > 2. Why are the versions used? I sense there is concern about records arriving out of order, so the versions give each record with the same ID an order. Is that correct?
> > > > > 3. If you didn't have the version and the count requirement, I'd say using a KTable to interpret the stream and then aggregating on that would be sufficient. There might be a way to do that with a mixture of the DSL and the Processor API.
> > > > >
> > > > > Another alternative might be to use the Interactive Query APIs (https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/) to first get all your data in KTables and then query it periodically (you can decide on the frequency manually).
> > > > >
> > > > > Thanks
> > > > > Eno
> > > > >
> > > > > > On 12 Jan 2017, at 22:19, Nicolas Fouché <nfou...@onfocus.io> wrote:
> > > > > >
> > > > > > Hi,
> > > > > >
> > > > > > long long technical story, sorry for that.
> > > > > >
> > > > > > I'm dealing with a special case. My input topic receives records containing an id in the key (and another field for partitioning), and a version number in the value, amongst other metrics. Records with the same id are sent every 5 seconds, and the version number increments.
> > > > > >
> > > > > > These metrics in the record value are used in aggregations to compute `sums` and `counts` (then stored in a DB to compute averages), and to compute a few other data structures like cumulative time buckets. If the aggregation receives the same record with updated metrics, I have to decrement `sum` by the metric value of the previous record, and increment `sum` by the new metric value. Also, the `count` would be incremented by 1 only if the record is seen for the first time (which is not the same as "version number = 1").
> > > > > >
> > > > > > To implement this, we would write a processor which would compute the diff of metrics by storing the last version of each record in its state. This diff is sent to the aggregation; the diff also tells whether the record was the first (so `count` is incremented). I think this can only be written with the low-level API.
> > > > > > That could work well, except we have a dozen types of records, with a few metrics each, and quite a few fields to compute in aggregations. Each time we deal with this type of "duplicate" records, we would have to write all the code to compute the diffs again, and the aggregation algorithm becomes way less trivial (we deal with cumulative time buckets, if one knows what I mean).
> > > > > >
> > > > > > So we got another idea, which does not feel right in a *streaming* environment, and seems quite inefficient:
> > > > > >
> > > > > > ====
> > > > > > The goal is to "buffer" records until we're quite sure no new version will be received. And if a new version is actually received, it's ignored.
> > > > > > A generic low-level processor would be used in topologies which receive the same records with updated metrics and an incremented version.
> > > > > >
> > > > > > One state store: it contains the records, and is used to know whether a record was already received and when, and whether it was already forwarded.
> > > > > >
> > > > > > Algorithm:
> > > > > >
> > > > > > On each new record:
> > > > > > - GET the previous record in the store by key
> > > > > > - ignore the new record if:
> > > > > > -- the record version is lower than the one in the store
> > > > > > -- the record timestamp is at least 5 minutes newer than the one in the store
> > > > > > - PUT (and thus replace) the record in the store
> > > > > >
> > > > > > Every 1 minute:
> > > > > > - for each record in the store
> > > > > > -- if the record's "forwarded" field is true
> > > > > > --- DELETE it from the store if the record is one hour old
> > > > > > -- else
> > > > > > --- if the timestamp is more than 5 minutes old
> > > > > > ---- PUT the record in the store with the field "forwarded" set to true
> > > > > > ---- forward the record
> > > > > > ====
> > > > > >
> > > > > > Caveats:
> > > > > > - low-level processors don't have access to the record's ingestion timestamp, so we would have to add it to the record value before producing the record.
> > > > > > - no secondary indexes, so we do complete iterations of the store on each `punctuate`
> > > > > > - it feels so wrong
> > > > > >
> > > > > > Any suggestions? It feels like a KStream of KTable records...
> > > > > >
> > > > > > Thanks.
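
For illustration, here is a minimal sketch of the buffering processor described in the original message above, against the 0.10.1-era low-level Processor API (stream-time `punctuate`). "VersionedValue" is a hypothetical value type carrying the version number, the ingestion timestamp added by the producer (see the first caveat), the metrics, and a "forwarded" flag; the store is assumed to be registered under the name "buffer-store".

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class BufferingProcessor implements Processor<String, VersionedValue> {

        private static final long FORWARD_AFTER_MS = TimeUnit.MINUTES.toMillis(5);
        private static final long EXPIRE_AFTER_MS = TimeUnit.HOURS.toMillis(1);

        private ProcessorContext context;
        private KeyValueStore<String, VersionedValue> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            this.context = context;
            this.store = (KeyValueStore<String, VersionedValue>) context.getStateStore("buffer-store");
            context.schedule(TimeUnit.MINUTES.toMillis(1));  // punctuate roughly every minute (stream time)
        }

        @Override
        public void process(final String key, final VersionedValue value) {
            final VersionedValue previous = store.get(key);
            if (previous != null
                && (value.version < previous.version
                    || value.ingestionTimeMs - previous.ingestionTimeMs >= FORWARD_AFTER_MS)) {
                return;  // ignore older versions and updates arriving 5+ minutes after the stored one
            }
            store.put(key, value);  // replace the stored record with the newer version
        }

        @Override
        public void punctuate(final long timestamp) {
            // No secondary index, so this is a full scan of the store on every punctuation.
            try (KeyValueIterator<String, VersionedValue> it = store.all()) {
                while (it.hasNext()) {
                    final KeyValue<String, VersionedValue> entry = it.next();
                    final VersionedValue v = entry.value;
                    if (v.forwarded) {
                        if (timestamp - v.ingestionTimeMs >= EXPIRE_AFTER_MS) {
                            store.delete(entry.key);   // forwarded and one hour old: evict
                        }
                    } else if (timestamp - v.ingestionTimeMs >= FORWARD_AFTER_MS) {
                        v.forwarded = true;            // considered stable: mark and emit downstream
                        store.put(entry.key, v);
                        context.forward(entry.key, v);
                    }
                }
            }
        }

        @Override
        public void close() { }
    }

Newer Kafka Streams releases replace punctuate() with a Punctuator callback registered via context.schedule(), but the shape of the logic would stay the same.
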