Hi All,

Just to give a bit of context: I have an application that SNMP-polls a
network. Each collector agent works on a one-minute schedule, polling its
device(s) and posting the results to a Kafka topic.
The time at which a given collector publishes data can vary within the
minute, but it should never overlap with the next minute's time bucket.

The topic produced to, for argument's sake 'device-results', has multiple
partitions. The data is keyed as 'device-id|location-id'.

I then had a requirement to aggregate the data by location: every device
result within the same location is summed, and an aggregate is output each
minute.
I'm aware the Streams DSL has groupByKey/windowedBy/suppress, which would
solve this problem (a sketch of that approach follows), but we found the
throughput was abysmal, probably due to the I/O performance of our virtual
machine infrastructure.
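
For reference, this is roughly the shape of the DSL topology we tried; the
type names, serdes and output topic below are placeholders, not our real
code:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;

// DeviceResult, LocationAggregate and their serdes are placeholders.
Topology buildTopology(Serde<DeviceResult> resultSerde,
                       Serde<LocationAggregate> aggSerde) {
  StreamsBuilder builder = new StreamsBuilder();
  builder.stream("device-results", Consumed.with(Serdes.String(), resultSerde))
      // Re-key from 'device-id|location-id' to just the location id;
      // this forces a repartition ahead of the aggregation.
      .selectKey((key, value) -> key.split("\\|")[1])
      .groupByKey(Grouped.with(Serdes.String(), resultSerde))
      // One-minute tumbling windows, with a grace period for late records.
      .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
      .aggregate(LocationAggregate::new,
                 (locationId, result, agg) -> agg.add(result),
                 Materialized.with(Serdes.String(), aggSerde))
      // Emit each window only once, after it closes.
      .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
      .toStream((windowedKey, agg) -> windowedKey.key())
      .to("location-aggregates");
  return builder.build();
}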

Instead we have hand-rolled something simplistic, which does the job 99%
of the time (a rough sketch follows this list):
 - We use a through() to re-partition the topic on just location-id.
 - We keep an object representing the current minute's aggregate in an
in-memory state store (with a changelog).
 - When a device result is transformed whose timestamp falls beyond our
current window, we output the aggregate and start a new one; otherwise we
update the running sum.

What I have noticed is that when I do a rolling restart of the
application, for example to push new code, data is dropped because
messages are processed out of order.
I changed the code to include the equivalent of an extra minute's grace
time, but in production I see messages arriving that are more than two
minutes behind the latest ones.

I came across the documentation at
https://kafka.apache.org/23/documentation/streams/core-concepts#streams_out_of_ordering
which alluded to a possible solution.
Could anyone advise whether there is a way, in code or in configuration
properties, to better guarantee that Streams prioritises the *oldest*
messages first, rather than going purely by offset?
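
For illustration, this is the kind of property I imagine might be
relevant, if my reading of KIP-353 is right that max.task.idle.ms makes a
task wait for data on all of its input partitions so that records can be
picked by timestamp; the application id and bootstrap servers below are
placeholders:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "device-aggregator");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
// Wait up to 60s for every input partition of a task to have buffered
// data before choosing the record with the smallest timestamp.
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 60_000L);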

Thanks in advance for any replies!

Marcus
