Hi Eno,

2. Well, records could arrive out of order, but that should happen rarely,
and it's no big deal anyway. So let's forget about the version number if it
makes things easier!

3. I completely missed out on KTable aggregations. Thanks a lot for the
pointer, that opens new perspectives.

... a few hours pass ...

OK, in my case, since my input is an infinite stream of new records, I
would have to "window" my KTables, right?
With `KStream.groupBy().reduce()`, I can generate a windowed KTable of
records, and even use the reducer function to compare the version numbers.
Next, I use `KTable.groupBy().aggregate()` to benefit from the `adder` and
`substractor` mechanisms [1].
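
To make the two steps above concrete, here is a minimal plain-Java sketch of the logic only. It does not use the Kafka Streams API: the class and field names (`VersionedAggregation`, `Rec`, `onRecord`) are hypothetical, and a single metric stands in for the real record value. It assumes the reducer keeps the record with the highest version, and that a changed row first has its old contribution removed and then its new one added.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch (no Kafka dependency). All names are hypothetical.
class VersionedAggregation {
    // A record value: a version number plus one metric.
    static final class Rec {
        final int version;
        final long metric;
        Rec(int version, long metric) {
            this.version = version;
            this.metric = metric;
        }
    }

    // Step 1: reducer that keeps whichever record has the higher version.
    static Rec reduce(Rec current, Rec incoming) {
        return incoming.version > current.version ? incoming : current;
    }

    private final Map<String, Rec> latestById = new HashMap<>();
    long sum = 0;
    long count = 0;

    // Step 2: the effect of an adder/subtractor pair when a table row changes.
    void onRecord(String id, Rec incoming) {
        Rec previous = latestById.get(id);
        Rec winner = previous == null ? incoming : reduce(previous, incoming);
        if (winner == previous) {
            return; // stale or repeated version: the row does not change
        }
        if (previous != null) {
            // subtractor: remove the contribution of the old row
            sum -= previous.metric;
            count -= 1;
        }
        // adder: apply the contribution of the new row
        sum += winner.metric;
        count += 1;
        latestById.put(id, winner);
    }
}
```

With this shape, `count` only grows when an id is seen for the first time, and `sum` always reflects the latest version of each record.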

The last problem is about the record timestamp. If I work on a one-hour
window, and records are sent between, let's say, 00:59 and 01:01, they would
live in two different KTables and this would create duplicates.
To deal with this, I could adjust the records' timestamps, so that Kafka
Streams treats each new record version as having the same timestamp as the
first version seen by the producer.
Here is my idea:
In our system, clients send data to an HTTP API. This API produces the
records in Kafka. I can't rely on the clock of the clients sending the
original data (so the records' timestamps are set by the servers ingesting
the records in Kafka), but I can rely on a time difference. The client only
gives information about the time elapsed since the first version of the
record was sent. Via a custom timestamp extractor, I just need to subtract
the elapsed time from the record's timestamp to ensure that it will fall in
the same window.
Long text, small code:
https://gist.github.com/nfo/6df4d1076af9da5fd1c29b0ad4564f2a
What do you think?
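
As a rough illustration of the arithmetic only (this is not the code in the gist, and it does not use the Kafka Streams `TimestampExtractor` interface), the following plain-Java sketch shows how subtracting the elapsed time puts a later version back into the window of the first one. The names `FirstVersionTimestamp`, `adjustedTimestampMs` and `windowStartMs` are hypothetical, and windows are assumed to be tumbling and aligned to the epoch.

```java
// Plain-Java sketch of the timestamp adjustment. All names are hypothetical.
class FirstVersionTimestamp {
    // Timestamp used for windowing: server ingestion time minus the
    // client-reported time elapsed since the first version was sent.
    static long adjustedTimestampMs(long ingestionTimestampMs, long elapsedSinceFirstVersionMs) {
        return ingestionTimestampMs - elapsedSinceFirstVersionMs;
    }

    // Start of the epoch-aligned tumbling window containing the timestamp.
    static long windowStartMs(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    public static void main(String[] args) {
        long hourMs = 3_600_000L;
        long firstVersion = 59 * 60_000L;  // ingested at 00:59
        long laterVersion = 61 * 60_000L;  // ingested at 01:01
        long elapsed = 2 * 60_000L;        // client reports 2 minutes elapsed

        long adjusted = adjustedTimestampMs(laterVersion, elapsed);
        System.out.println(windowStartMs(firstVersion, hourMs) == windowStartMs(adjusted, hourMs));
    }
}
```

One caveat of this approach: the adjusted timestamp equals the first version's ingestion time only up to variations in network latency, so a record whose first version arrived very close to a window boundary could still land on the other side.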

About the windowed KTables in the first step, I guess I should avoid making
them too long, since they store the whole records. We usually aggregate
with window sizes from 1 hour to 1 month. I should compute all the
aggregates covering more than 1 hour from the 1-hour aggregates, right?
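
A minimal plain-Java sketch of such a roll-up, assuming the aggregates are additive (`sum` and `count`, with averages derived afterwards) and keyed by the start of their window. The names `HourlyRollup`, `Agg` and `rollUpToDays` are hypothetical, and fixed 24-hour days are used here because months do not have a fixed length.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of rolling 1-hour aggregates up into 1-day aggregates.
// All names are hypothetical.
class HourlyRollup {
    static final long HOUR_MS = 3_600_000L;
    static final long DAY_MS = 24 * HOUR_MS;

    // An additive aggregate: two of them can be merged by adding the fields.
    static final class Agg {
        final long sum;
        final long count;
        Agg(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
        Agg plus(Agg other) {
            return new Agg(sum + other.sum, count + other.count);
        }
    }

    // Input: 1-hour aggregates keyed by window start (epoch ms).
    // Output: 1-day aggregates keyed by day start (epoch ms).
    static Map<Long, Agg> rollUpToDays(Map<Long, Agg> hourly) {
        Map<Long, Agg> daily = new TreeMap<>();
        for (Map.Entry<Long, Agg> entry : hourly.entrySet()) {
            long hourStart = entry.getKey();
            long dayStart = hourStart - Math.floorMod(hourStart, DAY_MS);
            daily.merge(dayStart, entry.getValue(), Agg::plus);
        }
        return daily;
    }
}
```

This only works for aggregates that can be merged without the original records; a structure that is not additive would need its own merge rule.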

[1]
http://docs.confluent.io/3.1.1/streams/javadocs/org/apache/kafka/streams/kstream/KGroupedTable.html#aggregate(org.apache.kafka.streams.kstream.Initializer,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.streams.kstream.Aggregator,%20org.apache.kafka.common.serialization.Serde,%20java.lang.String)

Thanks (a lot).
Nicolas


2017-01-13 17:32 GMT+01:00 Eno Thereska <eno.there...@gmail.com>:

> Hi Nicolas,
>
> There is a lot here, so let's try to split the concerns around some themes:
>
> 1. The Processor API is flexible and can definitely do what you want, but
> as you mentioned, at the cost of you having to manually craft the code.
> 2. Why are the versions used? I sense there is concern about records
> arriving out of order so the versions give each record with the same ID an
> order. Is that correct?
> 3. If you didn't have the version and the count requirement I'd say using
> a KTable to interpret the stream and then aggregating on that would be
> sufficient. There might be a way to do that with a mixture of the DSL and
> the processor API.
>
> Another alternative might be to use the Interactive Query APIs (
> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/)
> to first get all your data in KTables and then query it periodically (you
> can decide on the frequency manually).
>
> Thanks
> Eno
>
>
> > On 12 Jan 2017, at 22:19, Nicolas Fouché <nfou...@onfocus.io> wrote:
> >
> > Hi,
> >
> > long long technical story, sorry for that.
> >
> > I'm dealing with a special case. My input topic receives records containing
> > an id in the key (and another field for partitioning), and a version number
> > in the value, amongst other metrics. Records with the same id are sent
> > every 5 seconds, and the version number increments.
> >
> > These metrics in the record value are used in aggregations to compute
> > `sums` and `counts` (then stored in a DB to compute averages), and to
> > compute a few other data structures like cumulative time buckets. If the
> > aggregation receives the same record with updated metrics, I have to
> > decrement `sum` by the metric value of the previous record, and increment
> > `sum` by the new metric value. Also, the `count` would be incremented by 1
> > only if the record is seen for the first time (which is not the same as
> > "version number = 1").
> >
> > To implement this, we would write a processor which would compute the diff
> > of metrics by storing the last version of each record in its state. This
> > diff is sent to the aggregation, and it also tells whether the record was
> > the first (so `count` is incremented). I think this can only be written
> > with the low level API.
> > That could work well, except we have a dozen types of records, with a few
> > metrics each, and quite a few fields to compute in aggregations. Each time
> > we deal with this type of "duplicate" records, we would have to write all
> > the code to compute the diffs again, and the aggregation algorithm becomes
> > way less trivial (we deal with cumulative time buckets, if one knows what I
> > mean).
> >
> > So we got another idea, which does not feel right in a *streaming*
> > environment, and seems quite inefficient:
> >
> > ====
> > The goal is to "buffer" records until we're quite sure no new version will
> > be received. And if a new version is actually received, it's ignored.
> > A generic low level processor would be used in topologies which receive the
> > same records with updated metrics and an incremented version.
> >
> > One state store: contains the records, used to know if a record was already
> > received and when, and if the record was already transferred.
> >
> > Algorithm:
> >
> > On each new record:
> > - GET the previous record in the store by Key
> > - ignore the new record if:
> > -- the record version is lower than the one in the store
> > -- the record timestamp is at least 5 minutes newer than the one in the store
> > - PUT (and thus replace) the record in the store
> >
> > Every 1 minute:
> > - for each record in the store
> > -- if the record has the field "forwarded == true"
> > --- DELETE it from the store if the record is one hour old
> > -- else
> > --- if the timestamp is more than 5 minutes old
> > ---- PUT the record in the store with the field "forwarded" set to true
> > ---- forward the record
> > ====
> >
> > Caveats:
> > - low-level processors don't have access to the record's ingestion
> > timestamp. So we would have to add it to the record value before producing
> > the record.
> > - no secondary indexes, so we do complete iterations on each `punctuate`
> > - it feels so wrong
> >
> > Any suggestions ? It feels like a KStream of KTable records...
> >
> > Thanks.
>
>
