> will it consider a timestamp in the body of the message, if we have 
> implemented a custom TimestampExtractor?

Yes.


> Or, which I feel is more likely - does TimestampExtractor stream time only apply 
> later on once deserialisation has happened?

Well, the extractor does apply after deserialization, but Kafka Streams
deserializes each partition's head record precisely so that it can apply
the timestamp extractor; ie, deserialization happens as soon as a record
becomes the "head record" of its partition.

Cf.
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
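For illustration, a payload-based extractor boils down to something like the sketch below. It is deliberately self-contained plain Java, not the actual Streams interface; the `"timestamp"` field name and the naive string parsing are assumptions. In a real app this logic would sit inside a `TimestampExtractor#extract(record, partitionTime)` implementation, with `partitionTime` as the fallback for malformed payloads:

```java
public class BodyTimestampSketch {

    // Hypothetical payload field name; depends on your actual JSON schema.
    private static final String FIELD = "\"timestamp\":";

    /**
     * Pull an epoch-millis timestamp out of a JSON body. Returns the
     * fallback (eg, the partition's current stream time) when the field
     * is missing or unparsable, mirroring what a custom extractor's
     * extract(record, partitionTime) method would typically do.
     */
    static long extractTimestamp(String jsonBody, long fallback) {
        int i = jsonBody.indexOf(FIELD);
        if (i < 0) return fallback;
        int start = i + FIELD.length();
        int end = start;
        // Naive scan: accepts only an unquoted run of digits after the colon.
        while (end < jsonBody.length() && Character.isDigit(jsonBody.charAt(end))) {
            end++;
        }
        if (end == start) return fallback;
        try {
            return Long.parseLong(jsonBody.substring(start, end));
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        long ts = extractTimestamp("{\"deviceId\":\"d1\",\"timestamp\":1615400220000}", -1L);
        System.out.println(ts);                          // 1615400220000
        System.out.println(extractTimestamp("{}", -1L)); // -1
    }
}
```

In production you would of course use a proper JSON deserializer rather than string scanning; the point is only that whatever the extractor returns is what drives stream time, including the head-record comparison across partitions.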


> the accuracy of the aggregates may have to come second to the throughput.

Increasing the grace period should not really affect throughput, but
rather latency.
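Here is a self-contained sketch of why (plain Java, not the Streams API; the one-minute tumbling window and the flush rule are assumptions modeled on your hand-rolled approach): the per-record work is identical for any grace value, so throughput is unchanged; a larger grace only moves the stream-time point at which a window's aggregate is emitted, and lets late records be folded in instead of dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GraceSketch {

    static final long WINDOW_MS = 60_000L; // 1-minute tumbling windows (assumption)

    final long graceMs;
    long streamTime = Long.MIN_VALUE;      // max record timestamp seen so far
    final TreeMap<Long, Long> openWindows = new TreeMap<>(); // windowStart -> running sum
    final List<long[]> emitted = new ArrayList<>();          // {windowStart, sum}

    GraceSketch(long graceMs) { this.graceMs = graceMs; }

    /** Process one record; the per-record work is the same for any grace value. */
    void process(long timestampMs, long value) {
        streamTime = Math.max(streamTime, timestampMs);
        long windowStart = (timestampMs / WINDOW_MS) * WINDOW_MS;
        if (windowStart + WINDOW_MS + graceMs <= streamTime) {
            return; // window already closed: the record is late and dropped
        }
        openWindows.merge(windowStart, value, Long::sum);
        // Emit every window whose end + grace has passed. A larger grace
        // delays this emission (latency) but adds no extra work per record.
        while (!openWindows.isEmpty()
                && openWindows.firstKey() + WINDOW_MS + graceMs <= streamTime) {
            Map.Entry<Long, Long> e = openWindows.pollFirstEntry();
            emitted.add(new long[]{e.getKey(), e.getValue()});
        }
    }

    public static void main(String[] args) {
        GraceSketch noGrace = new GraceSketch(0L);
        noGrace.process(1_000L, 5L);
        noGrace.process(61_000L, 7L);  // closes window [0, 60000): emits sum 5
        noGrace.process(59_000L, 3L);  // too late: dropped

        GraceSketch oneMinGrace = new GraceSketch(60_000L);
        oneMinGrace.process(1_000L, 5L);
        oneMinGrace.process(61_000L, 7L);  // window [0, 60000) is held open (latency)
        oneMinGrace.process(59_000L, 3L);  // late record folded in thanks to grace
        oneMinGrace.process(121_000L, 2L); // now [0, 60000) is emitted with sum 8

        System.out.println(noGrace.emitted.get(0)[1]);     // 5
        System.out.println(oneMinGrace.emitted.get(0)[1]); // 8
    }
}
```

So the trade-off is accuracy vs. how long results are held back, not how many records per second you can push through.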



-Matthias


On 3/10/21 3:37 PM, Marcus Horsley-Rai wrote:
> 
> Thanks for your reply Matthias, and really great talks :-)
> 
> You’re right that I only have one input topic - though it does have 20 
> partitions.
> The pointer to max.task.idle.ms cleared something up for me; I read the 
> following line from Kafka docs but couldn’t find what configuration they were 
> referring to.
> 
>>      Within a stream task that may be processing multiple topic-partitions, 
>> if users configure the application to not wait for all partitions to contain 
>> some buffered data and pick from the partition with the smallest timestamp 
>> to process the next record, then later on when some records are fetched for 
>> other topic-partitions, their timestamps may be smaller than those processed 
>> records fetched from another topic-partition.
>>
>       
> When streams is checking the head record of each partition to pick the lowest 
> timestamp - will it consider a timestamp in the body of the message, if we 
> have implemented a custom TimestampExtractor?
> Or, which I feel is more likely - does TimestampExtractor stream time only apply 
> later on once deserialisation has happened?
> The reason I ask is because our producer code doesn’t manually set the 
> timestamp in ProducerRecord, only in the JSON body. That may be something we 
> can look to change.
> 
> As you say, I fear adjusting grace time may be my only solution; however, 
> because this is a real-time monitoring application, the accuracy of the 
> aggregates may have to come second to the throughput.
> 
> Many thanks,
> 
> Marcus
> 
> 
> On 2021/03/09 08:21:22, "Matthias J. Sax" <m...@apache.org> wrote: 
>> In general, Kafka Streams tries to process messages in timestamp order,
>> ie, oldest message first. However, Kafka Streams always needs to process
>> messages in offset order per partition, and thus timestamp
>> synchronization only applies to records from different topics (eg, if
>> you join two topics).
>>
>> There is a config, `max.task.idle.ms`, to improve timestamp
>> synchronization, but I am not sure it would help in your case, as it
>> seems you have a single input topic.
>>
>> It seems there is already out-of-order data in your input topic. Also
>> note that your repartition step may introduce out-of-order data.
>>
>> As you are using a custom Processor, it is up to you to handle
>> out-of-order data, and it seems that you may need to introduce a larger
>> grace period. In general, it's very hard (to impossible) to know how
>> much out-of-order data is in a topic, due to the decoupled nature of
>> Kafka and the interleaved writes of different producers into a topic.
>>
>> Not sure if you could change the original partitioning to just use
>> `location-id` to avoid the additional repartitioning step. This could
>> help to reduce out-of-order data.
>>
>> For completeness, check out these Kafka Summit talks:
>>  - https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
>>  - https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/
>>
>> Hope this helps.
>>
>> -Matthias
>>
>> On 3/3/21 7:03 AM, Marcus Horsley-Rai wrote:
>>> Hi All,
>>>
>>> Just to give a bit of context: I have an application which is SNMP-polling
>>> a network. Each collector agent works on a 1-minute schedule, polling
>>> device(s) and posting results to a Kafka topic.
>>> The time a given collector publishes data can vary within a minute, but it
>>> should never overlap with the next minute's time bucket.
>>>
>>> The topic produced to, for argument's sake 'device-results', has multiple
>>> partitions. The data is keyed such as 'device-id|location-id'.
>>>
>>> I then had a requirement to aggregate the data by location; every device
>>> result within the same location is summed, and an aggregate is output each
>>> minute.
>>> I'm aware the streams DSL has groupByKey/windowedBy/suppress, which is a
>>> solution to this problem - but we found the throughput was abysmal -
>>> probably due to the I/O performance of our virtual machine infrastructure.
>>>
>>> Instead we have hand-rolled something simplistic - which does the job 99%
>>> well.
>>>  - We use a through() to re-partition the topic to just location-id
>>>  - We keep an object representing the current minute's aggregate in an
>>> in-memory state store (with changelog)
>>>  - When any device result is transformed and has a timestamp that is older
>>> than our current window time, we output the aggregate; otherwise we update
>>> the running sum.
>>>
>>> What I have noticed is that when I do a rolling restart of the
>>> application, such as to push new code, data is dropped because of messages
>>> processed out of order.
>>> I changed the code to include the equivalent of an extra minute's grace
>>> time, but in production I see messages arriving that are > 2 min behind what
>>> the latest messages are.
>>>
>>> I came across the documentation
>>> https://kafka.apache.org/23/documentation/streams/core-concepts#streams_out_of_ordering
>>> which alluded to a possible solution.
>>> Could anyone advise if there is a way, in code or configuration properties,
>>> that I could better guarantee that streams prioritises the *oldest* messages
>>> first, rather than caring about offset?
>>>
>>> Thanks in advance for any replies!
>>>
>>> Marcus
>>>
