If anyone is interested, here is my custom timestamp extractor:
https://gist.github.com/nfo/54d5830720e163d2e7e848b6e4baac20 .

2017-01-16 15:52 GMT+01:00 Nicolas Fouché <nfou...@onfocus.io>:

> Hi Michael,
>
> got it. I understand that it would be less error-prone to generate the
> final "altered" timestamp on the Producer side, instead of trying to
> compute it each time the record is consumed.
>
> Thanks.
> Nicolas.
>
> 2017-01-16 10:03 GMT+01:00 Michael Noll <mich...@confluent.io>:
>
>> Nicolas,
>>
>> quick feedback on timestamps:
>>
>> > In our system, clients send data to an HTTP API. This API produces the
>> > records in Kafka. I can't rely on the clock of the clients sending the
>> > original data, (so the records' timestamps are set by the servers
>> ingesting
>> > the records in Kafka), but I can rely on a time difference. The client
>> only
>> > gives information about the time spent since the first version of the
>> > record was sent. Via a custom timestamp extractor, I just need to
>> subtract
>> > the time spent to the record's timestamp to ensure that it will fall in
>> > same window.
>>
>> Alternatively, you can also let the HTTP API handle the timestamp
>> calculations, and then embed the "final" timestamp in the message payload
>> (like the messave value).  Then, in your downstream application, you'd
>> define a custom timestamp extractor that returns this embedded timestamp.
>>
>> One advantage of the approach I outlined above is that other consumers of
>> the same data (who may or may not be aware of how you need to compute a
>> timestamp diff to get the "real" timestamp) can simply re-use the
>> timestamp
>> embedded in the payload without having to know/worry about the custom
>> calculation.  It might also be easier for Ops personnel to have access to
>> a
>> ready-to-use timestamp in case they need to debug or troubleshoot.
>>
>> -Michael
>>
>>
>>
>>
>> On Sun, Jan 15, 2017 at 11:10 PM, Nicolas Fouché <nfou...@onfocus.io>
>> wrote:
>>
>> > Hi Eno,
>> >
>> > 2. Well, records could arrive out of order. But it should happen rarely,
>> > and it's no big deal anyway. So let's forget about the version number
>> if it
>> > makes things easier !
>> >
>> > 3. I completely missed out on KTable aggregations. Thanks a lot for the
>> > pointer, that opens new perspectives.
>> >
>> > ... a few hours pass ...
>> >
>> > Ok, in my case, since my input is an infinite stream of new records, I
>> > would have to "window" my KTables, right ?
>> > With `KStream.groupBy().reduce()`, I can generate a windowed KTable of
>> > records, and even use the reducer function to compare the version
>> numbers.
>> > Next, I use `KTable.groupBy().aggregate()` to benefit from the `adder`
>> and
>> > `substractor` mechanisms [1].
>> >
>> > The last problem is about the record timestamp. If I work on a one-hour
>> > window, and records are sent between let's say 00:59 and 01:01, they
>> would
>> > live in two different KTables and this would create duplicates.
>> > To deal with this, I could mess with the records timestamps, so each new
>> > record version is considered by Kafka Streams having the same timestamp
>> > than the first version seen by the producer.
>> > Here is my idea:
>> > In our system, clients send data to an HTTP API. This API produces the
>> > records in Kafka. I can't rely on the clock of the clients sending the
>> > original data, (so the records' timestamps are set by the servers
>> ingesting
>> > the records in Kafka), but I can rely on a time difference. The client
>> only
>> > gives information about the time spent since the first version of the
>> > record was sent. Via a custom timestamp extractor, I just need to
>> subtract
>> > the time spent to the record's timestamp to ensure that it will fall in
>> > same window.
>> > Long text, small code:
>> > https://gist.github.com/nfo/6df4d1076af9da5fd1c29b0ad4564f2a .What do
>> you
>> > think ?
>> >
>> > About the windowed KTables in the first step, I guess I should avoid
>> making
>> > them too long, since they store the whole records. We usually aggregate
>> > with windows size from 1 hour to 1 month. I should compute all the
>> > aggregates covering more than 1 hour from the 1-hour aggregates, right ?
>> >
>> > [1]
>> > http://docs.confluent.io/3.1.1/streams/javadocs/org/apache/
>> > kafka/streams/kstream/KGroupedTable.html#aggregate(
>> > org.apache.kafka.streams.kstream.Initializer,%20org.
>> > apache.kafka.streams.kstream.Aggregator,%20org.apache.
>> > kafka.streams.kstream.Aggregator,%20org.apache.kafka.common.
>> serialization.
>> > Serde,%20java.lang.String)
>> >
>> > Thanks (a lot).
>> > Nicolas
>> >
>> >
>> > 2017-01-13 17:32 GMT+01:00 Eno Thereska <eno.there...@gmail.com>:
>> >
>> > > Hi Nicolas,
>> > >
>> > > There is a lot here, so let's try to split the concerns around some
>> > themes:
>> > >
>> > > 1. The Processor API is flexible and can definitely do what you want,
>> but
>> > > as you mentioned, at the cost of you having to manually craft the
>> code.
>> > > 2. Why are the versions used? I sense there is concern about records
>> > > arriving out of order so the versions give each record with the same
>> ID
>> > an
>> > > order. Is that correct?
>> > > 3. If you didn't have the version and the count requirement I'd say
>> using
>> > > a KTable to interpret the stream and then aggregating on that would be
>> > > sufficient. There might be a way to do that with a mixture of the DSL
>> and
>> > > the processor API.
>> > >
>> > > Another alternative might be to use the Interactive Query APIs (
>> > > https://www.confluent.io/blog/unifying-stream-processing-
>> > and-interactive-
>> > > queries-in-apache-kafka/ <https://www.confluent.io/blog
>> /unifying-stream-
>> > > processing-and-interactive-queries-in-apache-kafka/>) to first get
>> all
>> > > your data in KTables and then query it periodically (you can decide on
>> > the
>> > > frequency manually).
>> > >
>> > > Thanks
>> > > Eno
>> > >
>> > >
>> > > > On 12 Jan 2017, at 22:19, Nicolas Fouché <nfou...@onfocus.io>
>> wrote:
>> > > >
>> > > > Hi,
>> > > >
>> > > > long long technical story, sorry for that.
>> > > >
>> > > > I'm dealing with a special case. My input topic receives records
>> > > containing
>> > > > an id in the key (and another field for partitioning), and a version
>> > > number
>> > > > in the value, amongst other metrics. Records with the same id are
>> sent
>> > > > every 5 seconds, and the version number increments.
>> > > >
>> > > > These metrics in the record value are used in aggregations to
>> compute
>> > > > `sums` and `counts` (then stored in a DB to compute averages), and
>> to
>> > > > compute a few other data structures like cumulative time buckets. If
>> > the
>> > > > aggregation receives the same record with updated metrics, I have to
>> > > > decrement `sum` by the metric value of the previous record, and
>> > increment
>> > > > `sum` by the new metric value. Also, the `count` would be
>> incremented
>> > by
>> > > 1
>> > > > only if the record is seen for the first time (which is not the
>> same as
>> > > > "version number = 1").
>> > > >
>> > > > To implement this, we would write a processor which would compute
>> the
>> > > diff
>> > > > of metrics by storing the last version of each record in its state.
>> > This
>> > > > diff is sent to the aggregation, this diff also tells if the record
>> was
>> > > the
>> > > > first (so `count` is incremented). I think this can only written
>> with
>> > the
>> > > > low level API.
>> > > > That could work well, except we have a dozen type of records, with a
>> > few
>> > > > metrics each, and quite a few fields to compute in aggregations.
>> Each
>> > > time
>> > > > we deal with this type of "duplicate" records, we would have to
>> write
>> > all
>> > > > the code to compute the diffs again, and the aggregation algorithm
>> > > becomes
>> > > > way less trivial (we deal with cumulative time buckets, if one knows
>> > > what I
>> > > > mean).
>> > > >
>> > > > So we got another idea, which does not seem to feel right in a
>> > > *streaming*
>> > > > environment, and quite inefficient:
>> > > >
>> > > > ====
>> > > > The goal is to "buffer" records until we're quite sure no new
>> version
>> > > will
>> > > > be received. And if a new version is actually received, it's
>> ignored.
>> > > > A generic low level processor would be used in topologies which
>> receive
>> > > the
>> > > > same records with updated metrics and an incremented version.
>> > > >
>> > > > One state store: contains the records, used to know if a record was
>> > > already
>> > > > received and when, and if the record was already transferred.
>> > > >
>> > > > Algorithm:
>> > > >
>> > > > On each new record:
>> > > > - GET the previous record in the store by Key
>> > > > - ignore the new record if:
>> > > > -- the record version is lower than the one in the store
>> > > > -- the record timestamp is at least 5 minutes newer than the one in
>> > store
>> > > > - PUT (and thus replace) the record in the store
>> > > >
>> > > > Every 1 minute:
>> > > > - for each record in the store
>> > > > -- if the record has the field "forwarded == true"
>> > > > --- DELETE it from the store if the record is one hour old
>> > > > -- else
>> > > > --- if the timestamp is more that 5 minutes old
>> > > > ---- PUT the record in the store with the field "forwarded" set to
>> true
>> > > > ---- forward the record
>> > > > ===
>> > > >
>> > > > Caveats:
>> > > > - low-level processors don't have access to the record's ingestion
>> > > > timestamp. So we would have to add it to the record value before
>> > > producing
>> > > > the record.
>> > > > - no secondary indexes, so we do complete iterations on each
>> > `ponctuate`
>> > > > - it feels so wrong
>> > > >
>> > > > Any suggestions ? It feels like a KStream of KTable records...
>> > > >
>> > > > Thanks.
>> > >
>> > >
>> >
>>
>
>

Reply via email to