In general, Kafka Streams tries to process messages in timestamp order, i.e., oldest message first. However, Kafka Streams must always process messages in offset order per partition, and thus timestamp synchronization is only applied across records from different topics (e.g., if you join two topics).
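For reference, this cross-partition timestamp synchronization is tuned via the `max.task.idle.ms` Streams config; in a properties file it would look like the following (the 5-second value is purely illustrative):

```properties
# Wait up to 5s for data from all input partitions before picking
# the buffered record with the smallest timestamp (example value).
max.task.idle.ms=5000
```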
There is a config, `max.task.idle.ms`, to improve timestamp synchronization, but I am not sure it would help in your case, as it seems you have a single input topic. It seems there is already out-of-order data in your input topic. Also note that your repartition step may introduce out-of-order data.

As you are using a custom Processor, it is up to you to handle out-of-order data, and it seems you may need to introduce a larger grace period. In general, it's very hard (to impossible) to know how much out-of-order data is in a topic, due to the decoupled nature of Kafka and the interleaved writes of different producers into a topic.

Not sure if you could change the original partitioning to just use `location-id` to avoid the additional repartitioning step. This could help to reduce out-of-order data.

For completeness, check out these Kafka Summit talks:

- https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
- https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/

Hope this helps.

-Matthias

On 3/3/21 7:03 AM, Marcus Horsley-Rai wrote:
> Hi All,
>
> Just to give a bit of context; I have an application which is SNMP polling
> a network. Each collector agent works on a 1-minute schedule, polling
> device(s) and posting results to a Kafka topic.
> The time a given collector publishes data can vary within a minute, but it
> should never overlap with the next minute's time bucket.
>
> The topic produced to, for argument's sake 'device-results', has multiple
> partitions. The data is keyed such as 'device-id|location-id'.
>
> I then had a requirement to aggregate the data by location; every device
> result within the same location is summed, and an aggregate is output each
> minute.
> I'm aware the streams DSL has groupByKey()/windowedBy()/suppress(), which is
> a solution to this problem - but we found the throughput was abysmal -
> probably due to the I/O performance of our virtual machine infrastructure.
>
> Instead we have hand-rolled something simplistic - which does the job 99%
> well.
> - We use a through() to re-partition the topic to just location-id
> - We keep an object representing the current minute's aggregate in an
> in-memory state store (with changelog)
> - When any device result is transformed and has a timestamp that is older
> than our current window time, we output the aggregate; otherwise we update
> the running sum.
>
> What I have noticed is that when I do a rolling restart of the
> application, such as to push new code, data is dropped because messages are
> processed out of order.
> I changed the code to include the equivalent of an extra minute's grace
> time, but in production I see messages arriving that are >2 min behind
> the latest messages.
>
> I came across the documentation
> https://kafka.apache.org/23/documentation/streams/core-concepts#streams_out_of_ordering
> which alluded to a possible solution.
> Could anyone advise if there is a way, in code or configuration properties,
> that I could better guarantee that Streams prioritises the *oldest* messages
> first, rather than caring about offset?
>
> Thanks in advance for any replies!
>
> Marcus
>
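P.S.: the kind of grace-period bookkeeping I mean could look roughly like the following. This is only a standalone sketch of the idea, with no Kafka dependencies; all class, method, and variable names are made up, and a real Processor would additionally persist its state to a state store:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: per-location running sums in one-minute windows
// with a grace period. A window is emitted only once observed stream
// time has passed windowEnd + gracePeriod, so late records arriving
// within the grace period still land in the correct window.
public class GraceAggregator {
    private final long windowSizeMs;
    private final long gracePeriodMs;
    // windowStart -> (locationId -> running sum)
    private final Map<Long, Map<String, Double>> openWindows = new HashMap<>();
    private long observedStreamTime = Long.MIN_VALUE;

    public GraceAggregator(long windowSizeMs, long gracePeriodMs) {
        this.windowSizeMs = windowSizeMs;
        this.gracePeriodMs = gracePeriodMs;
    }

    /** Folds one record in; returns any windows that are now closed. */
    public Map<Long, Map<String, Double>> process(String locationId,
                                                  long timestampMs,
                                                  double value) {
        observedStreamTime = Math.max(observedStreamTime, timestampMs);
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        Map<Long, Map<String, Double>> closed = new HashMap<>();
        if (windowStart + windowSizeMs + gracePeriodMs <= observedStreamTime) {
            // Record is too late even for the grace period: drop it.
            return closed;
        }
        openWindows.computeIfAbsent(windowStart, w -> new HashMap<>())
                   .merge(locationId, value, Double::sum);
        // Emit every open window whose end plus grace has passed.
        openWindows.entrySet().removeIf(e -> {
            if (e.getKey() + windowSizeMs + gracePeriodMs <= observedStreamTime) {
                closed.put(e.getKey(), e.getValue());
                return true;
            }
            return false;
        });
        return closed;
    }
}
```

The larger you make the grace period, the more out-of-order data you tolerate, at the cost of correspondingly later results.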