Thanks for your reply, Matthias - and really great talks :-)

You're right that I only have one input topic - though it does have 20 
partitions.
The pointer to max.task.idle.ms cleared something up for me; I had read the 
following line from the Kafka docs but couldn't find which configuration it 
was referring to.

>       Within a stream task that may be processing multiple topic-partitions, 
> if users configure the application to not wait for all partitions to contain 
> some buffered data and pick from the partition with the smallest timestamp to 
> process the next record, then later on when some records are fetched for 
> other topic-partitions, their timestamps may be smaller than those processed 
> records fetched from another topic-partition.
> 
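For anyone who finds this thread later: my understanding is that it's set 
like any other Streams property. A minimal sketch, with a placeholder 
application id and bootstrap servers:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProps {
    public static Properties build() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "device-aggregator");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Wait up to 100 ms for empty partitions to receive data before
        // picking the lowest-timestamped head record from the non-empty ones.
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 100L);
        return props;
    }
}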
        
When streams is checking the head record of each partition to pick the lowest 
timestamp, will it consider a timestamp in the body of the message if we have 
implemented a custom TimestampExtractor?
Or - which I feel is more likely - does the TimestampExtractor only apply 
later on, once deserialisation has happened?
The reason I ask is that our producer code doesn't explicitly set the 
timestamp on the ProducerRecord, only in the JSON body. That may be something 
we can look to change.
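For reference, the kind of extractor I mean would look roughly like this - a 
sketch only. The "timestamp" field name is hypothetical, and I'm assuming a 
JSON serde that deserialises the value to a Jackson JsonNode:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
import com.fasterxml.jackson.databind.JsonNode;

// Sketch: read event time from the JSON body, falling back to the
// record's own timestamp. The "timestamp" field is a placeholder.
public class JsonBodyTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record,
                        final long partitionTime) {
        final Object value = record.value(); // already deserialised by the serde
        if (value instanceof JsonNode) {
            final JsonNode ts = ((JsonNode) value).get("timestamp");
            if (ts != null && ts.isNumber()) {
                return ts.asLong();
            }
        }
        // Fall back to the producer/broker timestamp set on the record.
        return record.timestamp();
    }
}

And if we do change the producer, I believe the timestamp can be set 
explicitly via the ProducerRecord constructor that takes one, i.e. 
new ProducerRecord<>(topic, partition, timestampMs, key, value).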

As you say, I fear adjusting grace time may be my only solution; however, 
because this is a real-time monitoring application, the accuracy of the 
aggregates may have to come second to the throughput.
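In case it helps anyone searching later, one shape the grace handling could 
take in our hand-rolled transformer - purely a sketch, with placeholder names 
and a hypothetical two-minute grace, not our production code:

import java.time.Duration;
import java.util.Map;
import java.util.TreeMap;

// Sketch: keep per-window sums, and only emit a window once observed
// stream time has passed the window end plus the grace period.
public class GracefulAggregator {

    private static final long WINDOW_MS = Duration.ofMinutes(1).toMillis();
    private static final long GRACE_MS = Duration.ofMinutes(2).toMillis();

    private final TreeMap<Long, Double> sumsByWindowStart = new TreeMap<>();
    private long observedStreamTime = 0L;

    public void accept(final long recordTsMs, final double value) {
        observedStreamTime = Math.max(observedStreamTime, recordTsMs);
        final long windowStart = recordTsMs - (recordTsMs % WINDOW_MS);

        // Drop records that arrive after their window's grace has expired.
        if (windowStart + WINDOW_MS + GRACE_MS <= observedStreamTime) {
            return;
        }
        sumsByWindowStart.merge(windowStart, value, Double::sum);

        // Emit and evict every window whose grace period has elapsed.
        while (!sumsByWindowStart.isEmpty()
                && sumsByWindowStart.firstKey() + WINDOW_MS + GRACE_MS <= observedStreamTime) {
            final Map.Entry<Long, Double> closed = sumsByWindowStart.pollFirstEntry();
            System.out.printf("window %d -> sum %.2f%n", closed.getKey(), closed.getValue());
        }
    }
}

The trade-off is exactly the one above: the longer the grace, the later each 
minute's aggregate is emitted.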

Many thanks,

Marcus


On 2021/03/09 08:21:22, "Matthias J. Sax" <m...@apache.org> wrote: 
> In general, Kafka Streams tries to process messages in timestamp order,
> ie, oldest message first. However, Kafka Streams always needs to process
> messages in offset order per partition, and thus timestamp
> synchronization is applied to records from different topics (eg, if you
> join two topics).
> 
> There is a config `max.task.idle.ms` to improve timestamp synchronization,
> but I am not sure if it would help in your case, as it seems you have a
> single input topic.
> 
> It seems there is already out-of-order data in your input topic. Also
> note that your repartition step may introduce out-of-order data.
> 
> As you are using a custom Processor, it is up to you to handle
> out-of-order data, and it seems that you may need to introduce a larger
> grace period. In general, it's very hard (if not impossible) to know how
> much disorder is in a topic, due to the decoupled nature of Kafka and
> the interleaved writes of different producers into a topic.
> 
> Not sure if you could change the original partitioning to just use
> `location-id` to avoid the additional repartitioning step. This could
> help to reduce disorder.
> 
> For completeness, check out those Kafka Summit talks:
>  - https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
>  - https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/
> 
> Hope this helps.
> 
> -Matthias
> 
> On 3/3/21 7:03 AM, Marcus Horsley-Rai wrote:
> > Hi All,
> > 
> > Just to give a bit of context: I have an application which is SNMP polling
> > a network. Each collector agent works on a one-minute schedule, polling
> > device(s) and posting results to a Kafka topic.
> > The time a given collector publishes data can vary within a minute, but it
> > should never overlap with the next minute's time bucket.
> > 
> > The topic produced to - for argument's sake 'device-results' - has multiple
> > partitions. The data is keyed such as 'device-id|location-id'.
> > 
> > I then had a requirement to aggregate the data by location; every device
> > result within the same location is summed, and an aggregate is output each
> > minute.
> > I'm aware the streams DSL has groupByKey/windowedBy/suppress, which is a
> > solution to this problem - but we found the throughput was abysmal -
> > probably due to the I/O performance of our virtual machine infrastructure.
> > 
> > Instead we have hand-rolled something simplistic, which does the job 99%
> > well:
> >  - We use a through() to re-partition the topic to just location-id
> >  - We keep an object representing the current minute's aggregate in an
> > in-memory state store (with changelog)
> >  - When any device result is transformed and has a timestamp that is older
> > than our current window time, we output the aggregate; otherwise we update
> > the running sum.
> > 
> > What I have noticed is that when I do a rolling restart of the
> > application, such as to push new code, data is dropped because of messages
> > processed out of order.
> > I changed the code to include the equivalent of an extra minute's grace
> > time, but in production I see messages arriving that are > 2 min behind what
> > the latest messages are.
> > 
> > I came across the documentation at
> > https://kafka.apache.org/23/documentation/streams/core-concepts#streams_out_of_ordering
> > which alluded to a possible solution.
> > Could anyone advise if there is a way, in code or configuration properties,
> > that I could better guarantee that streams prioritises the *oldest* messages
> > first, rather than caring about offset?
> > 
> > Thanks in advance for any replies!
> > 
> > Marcus
> > 
> 
