[ https://issues.apache.org/jira/browse/KAFKA-13002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17370618#comment-17370618 ]
Ismael Juma commented on KAFKA-13002:
-------------------------------------

[~vvcephei] If you are using the admin client, it seems that this is a regression. I mentioned this ticket in the original PR.

> dev branch Streams not able to fetch end offsets from pre-3.0 brokers
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-13002
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13002
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: John Roesler
>            Priority: Blocker
>             Fix For: 3.0.0
>
>         Attachments: soaks.png
>
>
> Note: this is not a report against a released version of AK. It seems to be a problem on the trunk development branch only.
> After deploying our soak test against `trunk/HEAD` on Friday, I noticed that Streams is no longer processing:
> !soaks.png!
> I found this stacktrace in the logs during startup:
> {code:java}
> 5075 [2021-06-25T16:50:44-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) [2021-06-25 21:50:44,499] WARN [i-0691913411e8c77c3-StreamThread-1] The listOffsets request failed. (org.apache.kafka.streams.processor.internals.ClientUtils)
> 5076 [2021-06-25T16:50:44-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [7,7]. The supported range is [0,6].
> 5077     at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> 5078     at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> 5079     at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> 5080     at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> 5081     at org.apache.kafka.streams.processor.internals.ClientUtils.getEndOffsets(ClientUtils.java:147)
> 5082     at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.populateClientStatesMap(StreamsPartitionAssignor.java:643)
> 5083     at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToClients(StreamsPartitionAssignor.java:579)
> 5084     at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:387)
> 5085     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
> 5086     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
> 5087     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111)
> 5088     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:593)
> 5089     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:556)
> 5090     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1178)
> 5091     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1153)
> 5092     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
> 5093     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
> 5094     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
> 5095     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
> 5096     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
> 5097     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
> 5098     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> 5099     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297)
> 5100     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
> 5101     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
> 5102     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:932)
> 5103     at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885)
> 5104     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
> 5105     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
> 5106     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
> {code}
> Just eyeballing the recent commits, I'm guessing it was due to [https://github.com/apache/kafka/commit/bd72ef1bf1e40feb3bc17349a385b479fa5fa530]. It looks like that code sets the initial "minimum version" to 7, but then should back off into compatibility mode. Therefore, maybe that stacktrace is expected (though it's not great UX regardless).
> However, it does not seem like Streams is actually able to back off.
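For context, the failure mode in the stacktrace can be sketched with a toy version-negotiation function (hypothetical names; this is not the actual Kafka client code): the commit above effectively raised the client's minimum LIST_OFFSETS version to 7, so against a broker whose supported range is [0,6] there is no overlapping version to fall back to.

{code:java}
// Hypothetical sketch of API-version negotiation, illustrating why a client
// that requires [7,7] cannot talk to a broker that supports [0,6].
public class VersionNegotiationSketch {

    /** Returns the highest mutually supported version, or -1 if there is none. */
    static short negotiate(short clientMin, short clientMax,
                           short brokerMin, short brokerMax) {
        // The usable version is the highest one both sides support.
        short candidate = (short) Math.min(clientMax, brokerMax);
        if (candidate < clientMin || candidate < brokerMin) {
            return -1; // no overlap -> UnsupportedVersionException in the real client
        }
        return candidate;
    }

    public static void main(String[] args) {
        // The case from the stacktrace: client requires [7,7], broker supports [0,6].
        System.out.println(negotiate((short) 7, (short) 7, (short) 0, (short) 6)); // -1
        // With the old minimum of 0, the client could have fallen back to version 6.
        System.out.println(negotiate((short) 0, (short) 7, (short) 0, (short) 6)); // 6
    }
}
{code}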
> The next thing I see is:
> {code:java}
> [2021-06-25T16:50:44-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) [2021-06-25 21:50:44,507] WARN [i-0691913411e8c77c3-StreamThread-1] Task 3_2 had endOffsetSum=-3 smaller than offsetSum=0 on member 24e46b47-0a01-4b57-9d15-771482869097. This probably means the task is corrupted, which in turn indicates that it will need to restore from scratch if it gets assigned. The assignor will de-prioritize returning this task to this member in the hopes that some other member may be able to re-use its state. (org.apache.kafka.streams.processor.internals.assignment.ClientState)
> {code}
> Which is itself a problem. It looks like there's a sentinel "-3" value returned as the end offset, but since that value is lower than any real endOffset Streams will have book-kept, Streams will assume that all of its local state is corrupt. As a result, Streams will delete all of its local state and rebuild it from the changelog. This isn't ideal behavior on restart.
> Finally, I never actually see Streams proceed with processing. The only thing it logs after this point (as far as I can tell) is:
> {code:java}
> [2021-06-25T16:50:54-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) [2021-06-25 21:50:52,463] INFO [i-0691913411e8c77c3-StreamThread-1] stream-thread [i-0691913411e8c77c3-StreamThread-1] End offset for changelog stream-soak-test-trunk-ccloud-alos--KSTREAM-AGGREGATE-STATE-STORE-0000000013-changelog-0 cannot be found; will retry in the next time. (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> {code}
> So, it seems the version backoff simply isn't working.
> Obviously, we'll need to fix these problems before we can release 3.0.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
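As a footnote to the report above, the lag check behind the "endOffsetSum=-3 smaller than offsetSum=0" warning can be sketched as follows (hypothetical names; this is not the actual ClientState code): because the sentinel -3 is smaller than any real book-kept offset sum, every task's local state looks stale.

{code:java}
// Illustrative sketch of the corruption heuristic described in the warning.
public class LagSketch {
    static final long SENTINEL_END_OFFSET_SUM = -3L; // hypothetical sentinel

    /**
     * If the broker-reported end-offset sum is below the locally book-kept
     * offset sum, the assignor treats the local state as corrupted.
     */
    static boolean looksCorrupted(long endOffsetSum, long offsetSum) {
        return endOffsetSum < offsetSum;
    }

    public static void main(String[] args) {
        // With the sentinel leaking through, even an empty store (offsetSum=0)
        // is flagged, matching the "endOffsetSum=-3 smaller than offsetSum=0" log.
        System.out.println(looksCorrupted(SENTINEL_END_OFFSET_SUM, 0L)); // prints "true"
        System.out.println(looksCorrupted(10L, 0L)); // prints "false"
    }
}
{code}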