Thanks Matthias - that's great to know.

> Increasing the grace period should not really affect throughput, but
> latency.

Yes, a slip of the tongue on my part, you’re right :-)

One last question, if I may? I only see out-of-order data in my re-partitioned 
topic as a result of a rebalance happening.
My hypothesis is that when an instance of my Streams app dies, consumption of 
data from the partitions it was responsible for falls behind compared to the 
others.
I believe all stream threads across all app instances will pause consuming 
while the rebalance is worked through... but am I right in thinking that one 
Streams app (or at least some of its stream threads) will then have to wait 
for state to be synced from the changelog topic?
In other words, when a rebalance happens, I assume the consumer group doesn't 
wait for the slowest member to be ready to consume?

To illustrate with an example:
 - I have 3 partitions of a single topic and three Streams app instances 
(1 partition each).
 - A producer produces to each partition each minute, on the minute.
 - Normally the timestamp of the head record is roughly the same across all 
three partitions. This assumes no lag ever builds up on the consumer group, 
and also assumes data volume and message size are comparable.
 - Now I kill Streams app A. The rebalance protocol kicks in and gives 
instance B an extra partition to consume from.
 - Could there now be a bigger lag for one or both of the partitions app B is 
consuming from, because it had to sync state store state? (Assume B has 
enough idle stream processing threads and the machine is specced to cope with 
the extra load.)
 - ...whereas app C, unhindered by state syncing, has potentially now produced 
to the through() topic a record from a newer batch/time window.

If this is the case, do you think increasing standby replicas will lessen the 
issue (roughly along the lines of the config sketch below)? I obviously don't 
expect it to be a magic bullet, and a grace period is still required in general.
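
For what it's worth, this is the kind of change I have in mind - just a 
sketch, assuming num.standby.replicas is the right knob, and with placeholder 
values for the application id and brokers:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StandbyConfigSketch {

    public static Properties streamsConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "location-aggregator"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");    // placeholder
        // Keep a warm copy of each task's state on another instance, so that
        // after a rebalance the new owner does not have to replay the whole
        // changelog before it can resume processing.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        return props;
    }
}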


Best Regards,

Marcus




On Thu, Mar 11, 2021 at 1:40 AM Matthias J. Sax <mj...@apache.org> wrote:
> will it consider a timestamp in the body of the message, if we have
> implemented a custom TimestampExtractor?

Yes.


> Or, which I feel is more likely - does TimestampExtractor stream time only
> apply later on once deserialisation has happened?

Well, the extractor does apply after deserialization, but we deserialize
each partition head-record to be able to apply the timestamp extractor:
ie, deserialization happens when a record becomes the "head record".

Cf
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
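
For illustration, an extractor that reads the timestamp from the deserialized 
value could look roughly like the sketch below. (The Map value type and the 
"timestamp" field name are just assumptions about the JSON payload; adjust to 
whatever your deserializer actually produces.)

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class PayloadTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
        // record.value() is already deserialized at this point, because the
        // extractor runs when the record becomes the head record of its partition queue
        final Object value = record.value();
        if (value instanceof Map) {
            final Object ts = ((Map<?, ?>) value).get("timestamp"); // assumed field name
            if (ts instanceof Number) {
                return ((Number) ts).longValue();
            }
        }
        // fall back to the record's own timestamp if the payload has no usable field
        return record.timestamp();
    }
}

Such an extractor would be registered via the default.timestamp.extractor 
config (StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG).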


> the accuracy of the aggregates may have to come second to the throughput.

Increasing the grace period should not really affect throughput, but
latency.



-Matthias


On 3/10/21 3:37 PM, Marcus Horsley-Rai wrote:
> 
> Thanks for your reply Matthias, and really great talks :-)
> 
> You’re right that I only have one input topic - though it does have 20 
> partitions.
> The pointer to max.task.idle.ms cleared something up for me; I read the 
> following line from the Kafka docs but couldn't find what configuration they 
> were referring to.
> 
>>      Within a stream task that may be processing multiple topic-partitions, 
>> if users configure the application to not wait for all partitions to contain 
>> some buffered data and pick from the partition with the smallest timestamp 
>> to process the next record, then later on when some records are fetched for 
>> other topic-partitions, their timestamps may be smaller than those processed 
>> records fetched from another topic-partition.
>>
>       
> When Streams is checking the head record of each partition to pick the lowest 
> timestamp - will it consider a timestamp in the body of the message, if we 
> have implemented a custom TimestampExtractor?
> Or, which I feel is more likely - does TimestampExtractor stream time only 
> apply later on once deserialisation has happened?
> The reason I ask is because our producer code doesn’t manually set the 
> timestamp in ProducerRecord, only in the JSON body. That may be something we 
> can look to change.
> 
> As you say, I fear adjusting grace time may be my only solution; however 
> because this is a real-time monitoring application…the accuracy of the 
> aggregates may have to come second to the throughput.
> 
> Many thanks,
> 
> Marcus
> 
> 
On 2021/03/09 08:21:22, "Matthias J. Sax" <m...@apache.org> wrote:
>> In general, Kafka Streams tries to process messages in timestamp order,
>> ie, oldest message first. However, Kafka Streams always needs to process
>> messages in offset order per partition, and thus the timestamp
>> synchronization is applied to records from different topics (eg, if you
>> join two topics).
>>
>> There is the config `max.task.idle.ms` to improve timestamp
>> synchronization, but I am not sure if it would help in your case, as it
>> seems you have a single input topic.
>>
>> It seems there is already out-of-order data in your input topic. Also
>> note that your repartition step may introduce out-of-order data.
>>
>> As you are using a custom Processor, it is up to you to handle
>> out-of-order data, and it seems that you may need to introduce a larger
>> grace period. In general, it's very hard (to impossible) to know how much
>> out-of-order data is in a topic, due to the decoupled nature of Kafka and
>> the interleaved writes of different producers into a topic.
>>
>> Not sure if you could change the original partitioning to just use
>> `location-id` to avoid the additional repartitioning step. This could
>> help to reduce the out-of-order data.
>>
>> For completeness, check out those Kafka Summit talks:
>>  - https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
>>  - https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/
>>
>> Hope this helps.
>>
>> -Matthias
>>
>> On 3/3/21 7:03 AM, Marcus Horsley-Rai wrote:
>>> Hi All,
>>>
>>> Just to give a bit of context: I have an application which is SNMP polling
>>> a network. Each collector agent works on a 1-minute schedule, polling
>>> device(s) and posting results to a Kafka topic.
>>> The time a given collector publishes data can vary within a minute, but it
>>> should never overlap with the next minute time bucket.
>>>
>>> The topic produced to, for argument's sake 'device-results', has multiple
>>> partitions. The data is keyed such as 'device-id|location-id'.
>>>
>>> I then had a requirement to aggregate the data by location; every device
>>> result within the same location is summed, and an aggregate is output each
>>> minute.
>>> I'm aware the Streams DSL has groupByKey/windowedBy/suppress, which is a
>>> solution to this problem - but we found the throughput was abysmal -
>>> probably due to the I/O performance of our virtual machine infrastructure.
>>>
>>> Instead we have hand-rolled something simplistic - which does the job 99%
>>> well.
>>>  - We use a through() to re-partition the topic to just location-id
>>>  - We keep an object representing the current minute's aggregate in an
>>> in-memory state store (with changelog)
>>>  - When any device result is transformed and has a timestamp that is older
>>> than our current window time, we output the aggregate; otherwise we update
>>> the running sum.
>>>
>>> What I have noticed is that when I do a rolling restart of the
>>> application, such as to push new code, data is dropped because of messages
>>> processed out of order.
>>> I changed the code to include the equivalent of an extra minute's grace
>>> time, but in production I see messages arriving that are > 2 min behind
>>> what the latest messages are.
>>>
>>> I came across the documentation at
>>> https://kafka.apache.org/23/documentation/streams/core-concepts#streams_out_of_ordering
>>> which alluded to a possible solution.
>>> Could anyone advise if there is a way, in code or configuration properties,
>>> that I could better guarantee that Streams prioritises the *oldest*
>>> messages first, rather than caring about offset?
>>>
>>> Thanks in advance for any replies!
>>>
>>> Marcus
>>>
