Nicolas, quick feedback on timestamps:
> In our system, clients send data to an HTTP API. This API produces the records in Kafka. I can't rely on the clock of the clients sending the original data (so the records' timestamps are set by the servers ingesting the records into Kafka), but I can rely on a time difference. The client only gives information about the time spent since the first version of the record was sent. Via a custom timestamp extractor, I just need to subtract that time spent from the record's timestamp to ensure that it will fall in the same window.

Alternatively, you can also let the HTTP API handle the timestamp calculations, and then embed the "final" timestamp in the message payload (i.e. in the message value). Then, in your downstream application, you'd define a custom timestamp extractor that returns this embedded timestamp.

One advantage of the approach I outlined above is that other consumers of the same data (who may or may not be aware of how you need to compute a timestamp diff to get the "real" timestamp) can simply re-use the timestamp embedded in the payload without having to know or worry about the custom calculation. It might also be easier for Ops personnel to have access to a ready-to-use timestamp in case they need to debug or troubleshoot.

-Michael

On Sun, Jan 15, 2017 at 11:10 PM, Nicolas Fouché <nfou...@onfocus.io> wrote:

> Hi Eno,
>
> 2. Well, records could arrive out of order. But it should happen rarely, and it's no big deal anyway. So let's forget about the version number if it makes things easier!
>
> 3. I completely missed out on KTable aggregations. Thanks a lot for the pointer, that opens new perspectives.
>
> ... a few hours pass ...
>
> Ok, in my case, since my input is an infinite stream of new records, I would have to "window" my KTables, right?
> With `KStream.groupBy().reduce()`, I can generate a windowed KTable of records, and even use the reducer function to compare the version numbers. Next, I use `KTable.groupBy().aggregate()` to benefit from the `adder` and `subtractor` mechanisms [1].
>
> The last problem is about the record timestamp. If I work on a one-hour window, and records are sent between, let's say, 00:59 and 01:01, they would live in two different KTables and this would create duplicates.
> To deal with this, I could mess with the records' timestamps, so that each new record version is considered by Kafka Streams as having the same timestamp as the first version seen by the producer.
> Here is my idea:
> In our system, clients send data to an HTTP API. This API produces the records in Kafka. I can't rely on the clock of the clients sending the original data (so the records' timestamps are set by the servers ingesting the records into Kafka), but I can rely on a time difference. The client only gives information about the time spent since the first version of the record was sent. Via a custom timestamp extractor, I just need to subtract that time spent from the record's timestamp to ensure that it will fall in the same window.
> Long text, small code: https://gist.github.com/nfo/6df4d1076af9da5fd1c29b0ad4564f2a . What do you think?
>
> About the windowed KTables in the first step, I guess I should avoid making them too long, since they store the whole records. We usually aggregate with window sizes from 1 hour to 1 month. I should compute all the aggregates covering more than 1 hour from the 1-hour aggregates, right?
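For illustration, here is a minimal sketch of the kind of custom timestamp extractor being discussed. It assumes the deserialized value exposes the client-reported elapsed time; the class and accessor names are made up (not taken from Nicolas's gist), and it uses the single-argument extract() of the Kafka 0.10.1 / Confluent 3.1 API (later versions add a second parameter):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class FirstVersionTimestampExtractor implements TimestampExtractor {

    /** Hypothetical shape of the deserialized record value. */
    public interface VersionedValue {
        long timeSpentMs(); // milliseconds elapsed since the first version was sent
    }

    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        long ingestionTs = record.timestamp(); // set by the servers ingesting into Kafka
        Object value = record.value();
        if (value instanceof VersionedValue) {
            // Shift back by the elapsed time reported by the client, so every
            // version of the same record falls into the same window.
            return ingestionTs - ((VersionedValue) value).timeSpentMs();
        }
        return ingestionTs; // fall back to the ingestion timestamp
    }
}
```

The extractor would be registered via the `timestamp.extractor` setting (`StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG`). Michael's alternative is the same shape of extractor, except extract() would simply return a "final" timestamp field already embedded in the value.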
>
> [1] http://docs.confluent.io/3.1.1/streams/javadocs/org/apache/kafka/streams/kstream/KGroupedTable.html#aggregate(org.apache.kafka.streams.kstream.Initializer,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String)
>
> Thanks (a lot).
> Nicolas
>
>
> 2017-01-13 17:32 GMT+01:00 Eno Thereska <eno.there...@gmail.com>:
>
> > Hi Nicolas,
> >
> > There is a lot here, so let's try to split the concerns into a few themes:
> >
> > 1. The Processor API is flexible and can definitely do what you want, but, as you mentioned, at the cost of you having to manually craft the code.
> > 2. Why are the versions used? I sense there is concern about records arriving out of order, so the versions give each record with the same ID an order. Is that correct?
> > 3. If you didn't have the version and the count requirement, I'd say using a KTable to interpret the stream and then aggregating on that would be sufficient. There might be a way to do that with a mixture of the DSL and the Processor API.
> >
> > Another alternative might be to use the Interactive Query APIs (https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/) to first get all your data into KTables and then query them periodically (you can decide on the frequency manually).
> >
> > Thanks
> > Eno
> >
> >
> > > On 12 Jan 2017, at 22:19, Nicolas Fouché <nfou...@onfocus.io> wrote:
> > >
> > > Hi,
> > >
> > > Long, long technical story, sorry for that.
> > >
> > > I'm dealing with a special case. My input topic receives records containing an id in the key (and another field for partitioning), and a version number in the value, amongst other metrics. Records with the same id are sent every 5 seconds, and the version number increments.
> > >
> > > These metrics in the record value are used in aggregations to compute `sums` and `counts` (then stored in a DB to compute averages), and to compute a few other data structures like cumulative time buckets. If the aggregation receives the same record with updated metrics, I have to decrement `sum` by the metric value of the previous record, and increment `sum` by the new metric value. Also, the `count` would be incremented by 1 only if the record is seen for the first time (which is not the same as "version number = 1").
> > >
> > > To implement this, we would write a processor which would compute the diff of metrics by storing the last version of each record in its state. This diff is sent to the aggregation; the diff also tells whether the record was the first (so `count` is incremented). I think this can only be written with the low-level API.
> > > That could work well, except we have a dozen types of records, with a few metrics each, and quite a few fields to compute in aggregations. Each time we deal with this type of "duplicate" records, we would have to write all the code to compute the diffs again, and the aggregation algorithm becomes way less trivial (we deal with cumulative time buckets, if one knows what I mean).
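For reference, a rough sketch of how the adder/subtractor mechanism from [1] covers this kind of sum maintenance. It assumes a `KStreamBuilder` named `builder`, a hypothetical `Record` type exposing `version()`, `dimension()` and `metric()`, default serdes, and the usual Kafka Streams / `Serdes` imports; windowing is omitted for brevity:

```java
KStream<String, Record> input = builder.stream("records");

// Keep only the latest version per record id; the reducer compares versions.
KTable<String, Record> latestById = input
    .groupByKey()
    .reduce((current, candidate) ->
            candidate.version() >= current.version() ? candidate : current,
        "latest-by-id");

// Re-key by the dimension being aggregated. When a record is updated, the
// subtractor removes the previous version's metric and the adder applies the
// new one, so the sum stays consistent without hand-written diff logic.
KTable<String, Long> sumByDimension = latestById
    .groupBy((id, record) -> KeyValue.pair(record.dimension(), record.metric()))
    .aggregate(
        () -> 0L,                            // initializer
        (dim, metric, sum) -> sum + metric,  // adder
        (dim, metric, sum) -> sum - metric,  // subtractor
        Serdes.Long(),
        "sum-by-dimension");
```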
> > >
> > > So we got another idea, which does not feel right in a *streaming* environment, and seems quite inefficient:
> > >
> > > ====
> > > The goal is to "buffer" records until we're quite sure no new version will be received. And if a new version is actually received, it's ignored.
> > > A generic low-level processor would be used in topologies which receive the same records with updated metrics and an incremented version.
> > >
> > > One state store: it contains the records, and is used to know whether a record was already received and when, and whether it was already forwarded.
> > >
> > > Algorithm:
> > >
> > > On each new record:
> > > - GET the previous record in the store by key
> > > - ignore the new record if:
> > > -- its version is lower than the one in the store
> > > -- its timestamp is at least 5 minutes newer than the one in the store
> > > - PUT (and thus replace) the record in the store
> > >
> > > Every 1 minute:
> > > - for each record in the store:
> > > -- if the record has the field "forwarded == true"
> > > --- DELETE it from the store if the record is one hour old
> > > -- else
> > > --- if the timestamp is more than 5 minutes old
> > > ---- PUT the record in the store with the field "forwarded" set to true
> > > ---- forward the record
> > > ====
> > >
> > > Caveats:
> > > - low-level processors don't have access to the record's ingestion timestamp, so we would have to add it to the record value before producing the record.
> > > - no secondary indexes, so we do complete iterations on each `punctuate`
> > > - it feels so wrong
> > >
> > > Any suggestions? It feels like a KStream of KTable records...
> > >
> > > Thanks.
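To make the buffering alternative concrete, here is a rough sketch of such a processor (a sketch only, not a recommendation). `Versioned` is a hypothetical value type carrying the version, the timestamp added to the value before producing (per the caveat above), and a "forwarded" flag; it uses the 0.10.1-era Processor API, where punctuate() is driven by context.schedule():

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

/** Hypothetical value type; real code would use the actual record class. */
interface Versioned {
    long version();
    long timestamp();   // timestamp embedded in the value before producing
    boolean forwarded();
    Versioned withForwarded(boolean forwarded);
}

public class BufferingProcessor implements Processor<String, Versioned> {

    private static final long FORWARD_AFTER_MS = 5 * 60 * 1000L;  // forward after 5 minutes
    private static final long EXPIRE_AFTER_MS  = 60 * 60 * 1000L; // delete after 1 hour

    private ProcessorContext context;
    private KeyValueStore<String, Versioned> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.store = (KeyValueStore<String, Versioned>) context.getStateStore("buffer");
        context.schedule(60 * 1000L); // punctuate roughly every minute
    }

    @Override
    public void process(String key, Versioned value) {
        Versioned previous = store.get(key);
        if (previous != null
                && (value.version() < previous.version()
                    || value.timestamp() >= previous.timestamp() + FORWARD_AFTER_MS)) {
            return; // ignore older versions or versions arriving too late
        }
        store.put(key, value);
    }

    @Override
    public void punctuate(long now) {
        // No secondary index: full scan of the store on every punctuation.
        try (KeyValueIterator<String, Versioned> it = store.all()) {
            while (it.hasNext()) {
                KeyValue<String, Versioned> entry = it.next();
                Versioned v = entry.value;
                if (v.forwarded()) {
                    if (now - v.timestamp() >= EXPIRE_AFTER_MS) {
                        store.delete(entry.key); // forwarded and one hour old: drop it
                    }
                } else if (now - v.timestamp() >= FORWARD_AFTER_MS) {
                    store.put(entry.key, v.withForwarded(true));
                    context.forward(entry.key, v); // quiet for 5 minutes: forward downstream
                }
            }
        }
    }

    @Override
    public void close() {}
}
```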