> will it consider a timestamp in the body of the message, if we have
> implemented a custom TimestampExtractor?
Yes.

> Or, which I feel is more likely - does TimestampExtractor stream time only
> apply later on once deserialisation has happened?

Well, the extractor does apply after deserialization, but we deserialize each
partition's head record so that we can apply the timestamp extractor: ie,
deserialization happens when a record becomes the "head record". Cf.
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java

> the accuracy of the aggregates may have to come second to the throughput.

Increasing the grace period should not really affect throughput, but latency.
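If you do want stream time to come from the JSON body, a payload-based
extractor is usually all that is needed. A rough sketch (assuming the value
serde hands the extractor the raw JSON String and that the payload carries an
epoch-millis field named "timestamp"; both are assumptions, so adjust to your
actual schema):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch only: assumes the deserialized value is the raw JSON String and
// that it contains an epoch-millis field called "timestamp".
public class PayloadTimestampExtractor implements TimestampExtractor {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public long extract(final ConsumerRecord<Object, Object> record,
                        final long partitionTime) {
        try {
            final JsonNode payload = MAPPER.readTree((String) record.value());
            final JsonNode ts = payload.get("timestamp");
            if (ts != null && ts.isNumber()) {
                return ts.asLong();
            }
        } catch (final Exception e) {
            // fall through to the fallback below
        }
        // Fall back to the record (producer/broker) timestamp, or to the
        // current partition time if the record carries no valid timestamp.
        return record.timestamp() >= 0 ? record.timestamp() : partitionTime;
    }
}

It is registered via `default.timestamp.extractor`
(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG).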
-Matthias

On 3/10/21 3:37 PM, Marcus Horsley-Rai wrote:
>
> Thanks for your reply Matthias, and really great talks :-)
>
> You're right that I only have one input topic - though it does have 20
> partitions.
> The pointer to max.task.idle.ms cleared something up for me; I read the
> following line from the Kafka docs but couldn't find which configuration
> they were referring to.
>
>> Within a stream task that may be processing multiple topic-partitions,
>> if users configure the application to not wait for all partitions to
>> contain some buffered data and pick from the partition with the smallest
>> timestamp to process the next record, then later on when some records
>> are fetched for other topic-partitions, their timestamps may be smaller
>> than those processed records fetched from another topic-partition.
>
> When Streams is checking the head record of each partition to pick the
> lowest timestamp - will it consider a timestamp in the body of the
> message, if we have implemented a custom TimestampExtractor?
> Or, which I feel is more likely - does TimestampExtractor stream time only
> apply later on once deserialisation has happened?
> The reason I ask is because our producer code doesn't manually set the
> timestamp in ProducerRecord, only in the JSON body. That may be something
> we can look to change.
>
> As you say, I fear adjusting grace time may be my only solution; however,
> because this is a real-time monitoring application, the accuracy of the
> aggregates may have to come second to the throughput.
>
> Many thanks,
>
> Marcus
>
>
> On 2021/03/09 08:21:22, "Matthias J. Sax" <m...@apache.org> wrote:
>> In general, Kafka Streams tries to process messages in timestamp order,
>> ie, oldest message first. However, Kafka Streams always needs to process
>> messages in offset order per partition, and thus timestamp
>> synchronization is applied to records from different topics (eg, if you
>> join two topics).
>>
>> There is the config `max.task.idle.ms` to improve timestamp
>> synchronization, but I am not sure if it would help in your case, as it
>> seems you have a single input topic.
>>
>> It seems there is already out-of-order data in your input topic. Also
>> note that your repartition step may introduce out-of-order data.
>>
>> As you are using a custom Processor, it is up to you to handle
>> out-of-order data, and it seems that you may need to introduce a larger
>> grace period. In general, it's very hard (to impossible) to know how
>> much disorder is in a topic, due to the decoupled nature of Kafka and
>> the interleaved writes of different producers into a topic.
>>
>> Not sure if you could change the original partitioning to just use
>> `location-id` to avoid the additional repartitioning step. This could
>> help to reduce the disorder.
>>
>> For completeness, check out these Kafka Summit talks:
>> -
>> https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
>> -
>> https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/
>>
>> Hope this helps.
>>
>> -Matthias
>>
>> On 3/3/21 7:03 AM, Marcus Horsley-Rai wrote:
>>> Hi All,
>>>
>>> Just to give a bit of context; I have an application which is SNMP
>>> polling a network. Each collector agent works on a 1-minute schedule,
>>> polling device(s) and posting results to a Kafka topic.
>>> The time a given collector publishes data can vary within a minute, but
>>> it should never overlap with the next minute's time bucket.
>>>
>>> The topic produced to, for argument's sake 'device-results', has
>>> multiple partitions. The data is keyed such as 'device-id|location-id'.
>>>
>>> I then had a requirement to aggregate the data by location; every
>>> device result within the same location is summed, and an aggregate is
>>> output each minute.
>>> I'm aware the Streams DSL has groupByKey/windowedBy/suppress, which is
>>> a solution to this problem - but we found the throughput was abysmal -
>>> probably due to the I/O performance of our virtual machine
>>> infrastructure.
>>>
>>> Instead we have hand-rolled something simplistic - which does the job
>>> 99% well.
>>> - We use a through() to re-partition the topic to just location-id
>>> - We keep an object representing the current minute's aggregate in an
>>>   in-memory state store (with changelog)
>>> - When any device result is transformed and has a timestamp that is
>>>   older than our current window time, we output the aggregate;
>>>   otherwise we update the running sum.
>>>
>>> What I have noticed is that when I do a rolling restart of the
>>> application, such as to push new code, data is dropped because of
>>> messages processed out of order.
>>> I changed the code to include the equivalent of an extra minute's grace
>>> time, but in production I see messages arriving that are > 2 min behind
>>> what the latest messages are.
>>> I came across the documentation
>>> https://kafka.apache.org/23/documentation/streams/core-concepts#streams_out_of_ordering
>>> which alluded to a possible solution.
>>> Could anyone advise if there is a way in code/configuration properties
>>> that I could better guarantee that Streams prioritises the *oldest*
>>> messages first, rather than caring about offset?
>>>
>>> Thanks in advance for any replies!
>>>
>>> Marcus
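P.S. For reference, the DSL version of the windowed aggregation described
above, with an explicit grace period and final-result suppression, would look
roughly like the sketch below. The topic names, serdes, the Long value type,
and the 'device-id|location-id' key format are assumptions taken from the
thread, not actual application code.

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;

public class LocationAggregationTopology {

    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();

        builder.stream("device-results",
                       Consumed.with(Serdes.String(), Serdes.Long()))
            // re-key from 'device-id|location-id' to just location-id;
            // this is what triggers the repartition discussed above
            .selectKey((key, value) -> key.split("\\|")[1])
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            // 1-minute windows; grace() bounds how late a record may
            // arrive and still be folded into its window
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1))
                                   .grace(Duration.ofMinutes(2)))
            .reduce(Long::sum)
            // emit a single final result per window instead of every update
            .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
            .toStream((windowedKey, sum) -> windowedKey.key())
            .to("location-aggregates",
                Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}

Note that with suppress(), a larger grace period does not reduce throughput,
but it does delay the final result for each window by the same amount, which
is the latency trade-off mentioned above.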