By default, timestamps are set by the producer.

Thus, if different producers write to the same topic-partition, their
writes are interleaved and even if each individual client sends data
ordered, they might end up out-of-order in the topic.

As you are only interested in per-key ordering, it might be sufficient
to ensure that data for a specific key is only written by a single
producer? -- You might still have unorder across different keys, but a
per-key order should actually be sufficient.

Or, if you don't care about the race condition between two producers
updating the same key, you could also reconfigure the topic and let the
broker set the timestamp.

Another alternative is, to use a different timestamp extractor and
change the timestamps on read.

Shameless plug: I did a talk about this topic that might be helpful ->
https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/


-Matthias

On 7/10/20 7:00 AM, Richard Rossel wrote:
> Thanks Matthias, it makes sense, now I need to find out why the
> topic is not sorted by timestamp.
> 
> The topic I'm using to be loaded as globalKTable is partitioned by key,
> and the timestamp is being handled by kafka.
> I have multiple clients, different machines, pushing data to that topic,
>  maybe the problem is a time synchronization issue between machines.
> 
> Do you know if the timestamp associated with each record on topic is
> being set up by clients pushing the data,  or by the kafka broker leader of
> that partition?
> 
> 
> Thanks.-
> 
> 
> 
> On Thu, Jul 9, 2020 at 4:45 PM Matthias J. Sax <mj...@apache.org> wrote:
> 
>> If you load data into a KTable or GlobalKTable, it's expected that the
>> data is partitioned by key, and that records with the same key have
>> non-descending timestamps.
>>
>> If a record with let's say key A and timestamp 5 is put into the table,
>> and later a record with key A and timestamp 4 is put into the table, the
>> second record is an out-of-order record and a warning is logged for this
>> case, as it might indicate that your data is no ordered correctly what
>> might lead to incorrect/unexpected join results.
>>
>> If data is out-of-order your table content semantically goes (partially)
>> back in time what is usually undesired -- your table content should
>> usually go forward in time only.
>>
>> Does this help?
>>
>>
>> -Matthias
>>
>>
>>
>> On 7/8/20 10:54 AM, Richard Rossel wrote:
>>> Hi there,
>>>
>>> I'm getting lot of this type of warning:
>>>
>>>
>>>  WARN org.apache.kafka.streams.kstream.internals.KTableSource - Detected
>>> out-of-order KTable update for entity-STATE-STORE-0000000000 at offset
>>> 65806, partition 5.
>>>
>>> It looks like the warning is generated each time a new record goes into
>> the
>>> source topic which is being used to load a globalKTable, and only happens
>>> when the source topic is updated very frequently (each second).
>>>
>>> I would like to know what is the meaning of that warning, maybe I'm not
>>> loading the Ktable correctly.
>>>
>>> My process is basically a simple enrichment process (Using Kafka 2.4):
>>> ...
>>>
>>>  GlobalKTable<String, String> entity = builder.globalTable(entityTopic);
>>>
>>>  KStream<String, String> joined = dataStream.join(entity,
>>>
>>>                 (leftKey, leftValue) -> leftKey,
>>>
>>>                 joiner);
>>>
>>>  joined.to(enrichedTopic);
>>> ...
>>>
>>> The entityTopic and the topic used by dataStream, are sharing the same
>> key
>>> type (username),
>>> and the entityTopic is receiving entries from another process very
>>> frequently.
>>>
>>> Do you think those warning messages are because of the way I'm creating
>> the
>>> Ktable?
>>>
>>> Thanks.-
>>>
>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to