[ https://issues.apache.org/jira/browse/KAFKA-10875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17799859#comment-17799859 ]

Hugo Abreu edited comment on KAFKA-10875 at 12/22/23 3:21 PM:
--------------------------------------------------------------

We are seeing this behaviour with kafka-clients 3.6.1 and broker 2.8.1.

We can consistently reproduce the issue with the following snippet:
{code:java}
try (Consumer<?, ?> c = new KafkaConsumer<>(Map.of(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "",
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
))) {

    final Instant instant = Instant.now();
    final int times = 10;
    final List<Integer> minutes = List.of(
            1, 5, 10, 20, 25, 30, 60, 90
    );

    for (final Integer minute : minutes) {
        final Instant minus = instant.minus(minute, TimeUnit.MINUTES.toChronoUnit());
        // Retries, just in case the behaviour is transient.
        for (int i = 0; i < times; i++) {
            final List<PartitionInfo> partitionInfos = c.partitionsFor("topic");

            final Map<TopicPartition, OffsetAndTimestamp> offsetTimes = c.offsetsForTimes(
                    partitionInfos.stream()
                            .filter(info -> info.partition() == 0)
                            .map(info -> new TopicPartition(info.topic(), info.partition()))
                            .collect(Collectors.toMap(Function.identity(), v -> minus.toEpochMilli()))
            );
            LOGGER.info("Going back {}min -- got: {}", minute, offsetTimes);
            if (offsetTimes.values().stream().allMatch(Objects::nonNull)) {
                // We got a response for all the partitions.
                break;
            }
        }
        LOGGER.info("======================================================================");
    }
} {code}
Essentially, we have a topic with a ~2 day retention period and, during the day, some of its partitions receive different loads. The behaviour that we are seeing is:
 * The last message received in, e.g., partition 2 has the timestamp 1703250071130 (Friday, 22 December 2023 13:01:11.130)
 * We attempt to search using `offsetsForTimes` (with the code above)

The output is:
{code:java}
13:52:25.134 [main] INFO KafkaConsumerGroupManagerTest - Going back 1min -- got: {topic-2=null}
13:52:43.310 [main] INFO KafkaConsumerGroupManagerTest - ======================================================================
13:52:43.490 [main] INFO KafkaConsumerGroupManagerTest - Going back 5min -- got: {topic-2=null}
13:53:01.779 [main] INFO KafkaConsumerGroupManagerTest - ======================================================================
13:53:01.959 [main] INFO KafkaConsumerGroupManagerTest - Going back 10min -- got: {topic-2=null}
13:53:20.030 [main] INFO KafkaConsumerGroupManagerTest - ======================================================================
13:53:20.212 [main] INFO KafkaConsumerGroupManagerTest - Going back 20min -- got: {topic-2=null}
13:53:38.370 [main] INFO KafkaConsumerGroupManagerTest - ======================================================================
13:53:38.551 [main] INFO KafkaConsumerGroupManagerTest - Going back 25min -- got: {topic-2=null}
13:53:57.316 [main] INFO KafkaConsumerGroupManagerTest - ======================================================================
13:53:57.495 [main] INFO KafkaConsumerGroupManagerTest - Going back 30min -- got: {topic-2=null}
13:54:15.432 [main] INFO KafkaConsumerGroupManagerTest - ======================================================================
13:54:15.617 [main] INFO KafkaConsumerGroupManagerTest - Going back 60min -- got: {topic-2=(timestamp=1703250071130, leaderEpoch=12, offset=178105)}
13:54:15.617 [main] INFO KafkaConsumerGroupManagerTest - ======================================================================
13:54:15.794 [main] INFO KafkaConsumerGroupManagerTest - Going back 90min -- got: {topic-2=(timestamp=1703248360581, leaderEpoch=12, offset=178095)}
13:54:15.795 [main] INFO KafkaConsumerGroupManagerTest - ====================================================================== {code}
According to the documentation, all of these searches should return a result, since we should obtain the earliest offset whose timestamp is greater than or equal to the timestamp provided. It almost seems that it works the other way around. We'll try to find some time to look under the hood and see if we can help.
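
For reference, here is a minimal sketch (illustrative only; the class and method names are ours, not part of the Kafka client API) of the contract we are assuming from the `offsetsForTimes` documentation: every non-null entry should point at a record whose timestamp is greater than or equal to the timestamp we asked for.
{code:java}
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

// Illustrative helper, not part of the Kafka client API.
final class OffsetsForTimesContract {

    // Checks that every non-null entry returned by offsetsForTimes has a
    // timestamp greater than or equal to the timestamp that was requested
    // for that partition.
    static void check(Map<TopicPartition, Long> requested,
                      Map<TopicPartition, OffsetAndTimestamp> returned) {
        for (Map.Entry<TopicPartition, Long> e : requested.entrySet()) {
            final OffsetAndTimestamp oat = returned.get(e.getKey());
            if (oat == null) {
                continue; // no offset was returned for this partition
            }
            if (oat.timestamp() < e.getValue()) {
                throw new IllegalStateException("Contract violated for " + e.getKey()
                        + ": returned timestamp " + oat.timestamp()
                        + " < requested " + e.getValue());
            }
        }
    }
} {code}
On the output above, the 60min and 90min lookups do appear to satisfy this check (the returned timestamps are at or after the requested ones); the null results for the shorter windows are what we are questioning.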

 

 



> offsetsForTimes returns null for some partitions when it shouldn't?
> -------------------------------------------------------------------
>
>                 Key: KAFKA-10875
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10875
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Yifei Gong
>            Priority: Minor
>
> I use spring-boot 2.2.11, spring-kafka 2.4.11 and Apache kafka-clients 2.4.1.
> I have my consumer {{implements ConsumerAwareRebalanceListener}}, and I am trying to seek to offsets after a certain timestamp inside the {{onPartitionsAssigned}} method by calling {{offsetsForTimes}}.
> I found this strange behavior in the {{offsetsForTimes}} method:
> When I seek an earlier timestamp {{1607922415534L}} (GMT December 14, 2020 
> 5:06:55.534 AM) like below:
> {code:java}
> @Override
> public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
>     // calling assignment just to ensure my consumer is actually assigned the partitions
>     Set<TopicPartition> tps = consumer.assignment();
>     Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = new HashMap<>();
>     offsetsForTimes.putAll(consumer.offsetsForTimes(partitions.stream()
>         .collect(Collectors.toMap(tp -> tp, epoch -> 1607922415534L))));
> }
> {code}
> By setting a breakpoint, I can see I got the map below:
> {noformat}
> {TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} "(timestamp=1607922521082, leaderEpoch=282, offset=22475886)"
> {TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} "(timestamp=1607922523035, leaderEpoch=328, offset=25587551)"
> {TopicPartition@5498} "My.Data.Topic-5" -> null
> {TopicPartition@5500} "My.Data.Topic-4" -> {OffsetAndTimestamp@5501} "(timestamp=1607924819752, leaderEpoch=323, offset=24578937)"
> {TopicPartition@5503} "My.Data.Topic-3" -> {OffsetAndTimestamp@5504} "(timestamp=1607922522143, leaderEpoch=299, offset=23439914)"
> {TopicPartition@5506} "My.Data.Topic-2" -> {OffsetAndTimestamp@5507} "(timestamp=1607938218461, leaderEpoch=318, offset=23415078)"
> {TopicPartition@5509} "My.Data.Topic-9" -> {OffsetAndTimestamp@5510} "(timestamp=1607922521019, leaderEpoch=298, offset=22002124)"
> {TopicPartition@5512} "My.Data.Topic-8" -> {OffsetAndTimestamp@5513} "(timestamp=1607922520780, leaderEpoch=332, offset=23406692)"
> {TopicPartition@5515} "My.Data.Topic-7" -> {OffsetAndTimestamp@5516} "(timestamp=1607922522800, leaderEpoch=285, offset=22215781)"
> {TopicPartition@5518} "My.Data.Topic-6" -> null
> {noformat}
> As you can see, it returned null for some of the partitions (5 and 6).
> However, if I seek a more recent timestamp like {{1607941818423L}} (GMT December 14, 2020 10:30:18.423 AM), I get offsets for all partitions:
> {noformat}
> {TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} "(timestamp=1607942934371, leaderEpoch=282, offset=22568732)"
> {TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} "(timestamp=1607941818435, leaderEpoch=328, offset=25685999)"
> {TopicPartition@5498} "My.Data.Topic-5" -> {OffsetAndTimestamp@5499} "(timestamp=1607941818424, leaderEpoch=309, offset=24333860)"
> {TopicPartition@5501} "My.Data.Topic-4" -> {OffsetAndTimestamp@5502} "(timestamp=1607941818424, leaderEpoch=323, offset=24666385)"
> {TopicPartition@5504} "My.Data.Topic-3" -> {OffsetAndTimestamp@5505} "(timestamp=1607941818433, leaderEpoch=299, offset=23529597)"
> {TopicPartition@5507} "My.Data.Topic-2" -> {OffsetAndTimestamp@5508} "(timestamp=1607941818423, leaderEpoch=318, offset=23431817)"
> {TopicPartition@5510} "My.Data.Topic-9" -> {OffsetAndTimestamp@5511} "(timestamp=1607941818517, leaderEpoch=298, offset=22082849)"
> {TopicPartition@5513} "My.Data.Topic-8" -> {OffsetAndTimestamp@5514} "(timestamp=1607941818423, leaderEpoch=332, offset=23491462)"
> {TopicPartition@5516} "My.Data.Topic-7" -> {OffsetAndTimestamp@5517} "(timestamp=1607942934371, leaderEpoch=285, offset=22306422)"
> {TopicPartition@5519} "My.Data.Topic-6" -> {OffsetAndTimestamp@5520} "(timestamp=1607941818424, leaderEpoch=317, offset=24677423)"
> {noformat}
> So I am confused: why did seeking to an older timestamp give me nulls when there are indeed messages with later timestamps, as the second attempt shows?



