Hi All,

Just to give a bit of context: I have an application which is SNMP polling a network. Each collector agent works on a one-minute schedule, polling device(s) and posting the results to a Kafka topic. The time at which a given collector publishes data can vary within the minute, but it should never overlap into the next minute's time bucket.

The topic produced to, for argument's sake 'device-results', has multiple partitions, and the data is keyed as 'device-id|location-id'. I then had a requirement to aggregate the data by location: every device result within the same location is summed, and an aggregate is output each minute. I'm aware the Streams DSL has groupByKey/windowedBy/suppress as a solution to this problem, but we found the throughput abysmal, probably due to the I/O performance of our virtual machine infrastructure (a simplified sketch of what we benchmarked is in the P.S. below). Instead we have hand-rolled something simplistic, which does the job 99% well:

- We use a through() to re-partition the topic on just location-id.
- We keep an object representing the current minute's aggregate in an in-memory state store (with changelog).
- When a device result arrives with a timestamp beyond our current window, we emit that window's aggregate and start a new one; otherwise we update the running sum. (A cut-down sketch of this transformer is also in the P.S.)

What I have noticed is that when I do a rolling restart of the application, such as to push new code, data is dropped because messages are processed out of order. I changed the code to include the equivalent of an extra minute's grace time, but in production I still see messages arriving > 2 min behind the latest ones. I came across the documentation at https://kafka.apache.org/23/documentation/streams/core-concepts#streams_out_of_ordering, which seemed to allude to a solution. Could anyone advise whether there is a way, in code or configuration properties, to better guarantee that Streams prioritises the *oldest* messages first, rather than going purely by offset?

Thanks in advance for any replies!

Marcus
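P.S. For reference, the DSL version we benchmarked looked roughly like the following. This is simplified: DeviceResult, LocationAggregate, the two serdes and the topic names are stand-ins for our real types, not the exact production code.

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

StreamsBuilder builder = new StreamsBuilder();

builder.stream("device-results", Consumed.with(Serdes.String(), deviceResultSerde))
    // drop the device-id half of the key so we can aggregate per location
    .selectKey((deviceAndLocation, result) -> result.getLocationId())
    .groupByKey(Grouped.with(Serdes.String(), deviceResultSerde))
    // one-minute tumbling windows, with a minute of grace for stragglers
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
    .aggregate(LocationAggregate::new,
               (locationId, result, agg) -> agg.add(result),
               Materialized.with(Serdes.String(), aggregateSerde))
    // emit one final result per window instead of every intermediate update
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .map((windowedKey, agg) -> KeyValue.pair(windowedKey.key(), agg))
    .to("location-aggregates", Produced.with(Serdes.String(), aggregateSerde));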
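And a cut-down sketch of the hand-rolled replacement, using the same builder, types and serdes as above (again simplified, error handling stripped; the emit-on-window-advance check is the bit I described in the bullets):

import java.util.Collections;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.*;

// Changelog-backed in-memory store holding the current minute's aggregate per location.
StoreBuilder<KeyValueStore<String, LocationAggregate>> storeBuilder =
    Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("location-agg-store"),
            Serdes.String(), aggregateSerde)
        .withLoggingEnabled(Collections.emptyMap());
builder.addStateStore(storeBuilder);

builder.stream("device-results", Consumed.with(Serdes.String(), deviceResultSerde))
    .selectKey((deviceAndLocation, result) -> result.getLocationId())
    // through() routes via an intermediate topic, re-partitioning on location-id
    .through("device-results-by-location", Produced.with(Serdes.String(), deviceResultSerde))
    .transform(() -> new Transformer<String, DeviceResult, KeyValue<String, LocationAggregate>>() {
        private KeyValueStore<String, LocationAggregate> store;

        @SuppressWarnings("unchecked")
        @Override
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, LocationAggregate>) context.getStateStore("location-agg-store");
        }

        @Override
        public KeyValue<String, LocationAggregate> transform(String locationId, DeviceResult result) {
            long minute = result.getTimestamp() / 60_000L;  // the result's minute bucket (epoch millis)
            LocationAggregate agg = store.get(locationId);
            if (agg == null) {
                store.put(locationId, new LocationAggregate(minute).add(result));
                return null;
            }
            if (minute > agg.getMinute()) {
                // the window has advanced: emit the finished aggregate, start a new one
                // (our "extra minute's grace" tweak delays this close by one more minute)
                store.put(locationId, new LocationAggregate(minute).add(result));
                return KeyValue.pair(locationId, agg);
            }
            if (minute < agg.getMinute()) {
                return null;  // late arrival -- this is where data gets dropped
            }
            agg.add(result);  // same minute: update the running sum
            store.put(locationId, agg);
            return null;
        }

        @Override
        public void close() { }
    }, "location-agg-store")
    .to("location-aggregates", Produced.with(Serdes.String(), aggregateSerde));

It's that late-arrival branch that fires during rolling restarts, which is why I'm hoping to get Streams to favour the oldest records across partitions.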
The topic produced to, for arguments sake 'device-results' has multiple partitions. The data is keyed such as 'device-id|location-id'. I then had a requirement to aggregate the data by location; every device result within the same location is summed, and an aggregate is output each minute. I'm aware the streams DSL has groupByKey/WindowedBy/Suppress which is a solution to this problem - but we found the throughput was abysmal - probably due to the I/O performance of our virtual machine infrastructure. Instead we have hand-rolled something simplistic - which does the job 99% well. - We use a through() to re-partition the topic to just location-id - We keep an object representing the current minute's aggregate in an in-memory state store (with changelog) - When any device result is transformed, and has a timestamp that is older than our current window time - we output the aggregate, otherwise update the running sum. What I have noticed is that when I do a rolling restart of the application, such as to push new code, data is dropped because of messages processed out of order. I changed the code to include the equivalent of an extra minute's grace time, but in production I see messages arriving that are > 2min behind what the latest messages are. I came across the documentation https://kafka.apache.org/23/documentation/streams/core-concepts#streams_out_of_ordering which alluded to maybe a solution. Could anyone advise if there is a way in code/configuration properties that I could better guarantee that streams prioritises the *oldest* messages first, rather than caring about offset? Thanks in advance for any replies! Marcus