Thanks for your reply Matthias, and really great talks :-)
You're right that I only have one input topic - though it does have 20 partitions. The pointer to `max.task.idle.ms` cleared something up for me; I had read the following line in the Kafka docs but couldn't find which configuration they were referring to:

> Within a stream task that may be processing multiple topic-partitions,
> if users configure the application to not wait for all partitions to contain
> some buffered data and pick from the partition with the smallest timestamp to
> process the next record, then later on when some records are fetched for
> other topic-partitions, their timestamps may be smaller than those processed
> records fetched from another topic-partition.

When Streams checks the head record of each partition to pick the lowest timestamp - will it consider a timestamp in the body of the message, if we have implemented a custom TimestampExtractor? Or - which I feel is more likely - does TimestampExtractor stream time only apply later on, once deserialisation has happened?

The reason I ask is that our producer code doesn't set the timestamp in ProducerRecord explicitly, only in the JSON body. That may be something we can look to change.

As you say, I fear adjusting grace time may be my only solution; however, because this is a real-time monitoring application, the accuracy of the aggregates may have to come second to throughput.

Many thanks,

Marcus

On 2021/03/09 08:21:22, "Matthias J. Sax" <m...@apache.org> wrote:
> In general, Kafka Streams tries to process messages in timestamp order,
> ie, oldest message first.
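For reference, the config Matthias points to is set like any other Streams property; a minimal fragment (the 2-second value is just an illustrative choice, not a recommendation):

```properties
# Wait up to 2 seconds for all assigned partitions to have buffered data
# before picking the record with the smallest timestamp.
# Default is 0, i.e. Streams never waits for lagging partitions.
max.task.idle.ms=2000
```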
> However, Kafka Streams always needs to process messages in offset order
> per partition, and thus timestamp synchronization is applied to records
> from different topics (eg, if you join two topics).
>
> There is config `max.task.idle.ms` to improve timestamp synchronization,
> but I am not sure if it would help in your case, as it seems you have a
> single input topic.
>
> It seems there is already out-of-order data in your input topic. Also
> note that your repartition step may introduce out-of-order data.
>
> As you are using a custom Processor, it is up to you to handle
> out-of-order data, and it seems that you may need to introduce a larger
> grace period. In general, it's very hard (to impossible) to know how
> much disorder is in a topic, due to the decoupled nature of Kafka and
> interleaved writes of different producers into a topic.
>
> Not sure if you could change the original partitioning to just use
> `location-id` to avoid the additional repartitioning step. This could
> help to reduce disorder.
>
> For completeness, check out these Kafka Summit talks:
> - https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
> - https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/
>
> Hope this helps.
>
> -Matthias
>
> On 3/3/21 7:03 AM, Marcus Horsley-Rai wrote:
> > Hi All,
> >
> > Just to give a bit of context; I have an application which is SNMP polling
> > a network. Each collector agent works on a 1-minute schedule, polling
> > device(s) and posting results to a Kafka topic.
> > The time a given collector publishes data can vary within a minute, but it
> > should never overlap with the next minute's time bucket.
> >
> > The topic produced to - for argument's sake 'device-results' - has multiple
> > partitions.
> > The data is keyed such as 'device-id|location-id'.
> >
> > I then had a requirement to aggregate the data by location; every device
> > result within the same location is summed, and an aggregate is output each
> > minute.
> > I'm aware the Streams DSL has groupByKey/windowedBy/suppress, which is a
> > solution to this problem - but we found the throughput was abysmal -
> > probably due to the I/O performance of our virtual machine infrastructure.
> >
> > Instead we have hand-rolled something simplistic, which does the job 99%
> > well:
> > - We use a through() to re-partition the topic to just location-id
> > - We keep an object representing the current minute's aggregate in an
> >   in-memory state store (with changelog)
> > - When any device result is transformed and has a timestamp older than
> >   our current window time, we output the aggregate; otherwise we update
> >   the running sum.
> >
> > What I have noticed is that when I do a rolling restart of the
> > application, such as to push new code, data is dropped because of messages
> > processed out of order.
> > I changed the code to include the equivalent of an extra minute's grace
> > time, but in production I see messages arriving that are > 2 min behind
> > what the latest messages are.
> >
> > I came across the documentation
> > https://kafka.apache.org/23/documentation/streams/core-concepts#streams_out_of_ordering
> > which alluded to a possible solution.
> > Could anyone advise if there is a way, in code or configuration
> > properties, that I could better guarantee that Streams prioritises the
> > *oldest* messages first, rather than caring about offset?
> >
> > Thanks in advance for any replies!
> >
> > Marcus
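On the TimestampExtractor question: if the timestamp only lives in the JSON body, a custom extractor can pull it out of the payload. Below is a sketch of just the parsing logic; in a real application it would sit inside the `extract(ConsumerRecord, partitionTime)` method of an `org.apache.kafka.streams.processor.TimestampExtractor` implementation, but it is shown as a plain class here so it stands alone. The field name "timestamp" and the epoch-millis format are assumptions to adjust to the actual payload schema.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Parsing logic a custom TimestampExtractor could use to read an
// epoch-millis timestamp out of a JSON payload. Field name "timestamp"
// is an assumed example, not taken from the real schema.
public class JsonTimestampExtractor {

    private static final Pattern TS_FIELD =
            Pattern.compile("\"timestamp\"\\s*:\\s*(\\d+)");

    // Returns the timestamp embedded in the JSON body, falling back to the
    // supplied partition time when the field is missing (similar in spirit
    // to Kafka's UsePartitionTimeOnInvalidTimestamp extractor).
    public static long extract(String jsonBody, long partitionTime) {
        Matcher m = TS_FIELD.matcher(jsonBody);
        if (m.find()) {
            return Long.parseLong(m.group(1));
        }
        return partitionTime;
    }
}
```

A full JSON parser (eg Jackson, which Streams apps usually already have on the classpath) would be more robust than a regex; the regex keeps the sketch dependency-free.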