[
https://issues.apache.org/jira/browse/KAFKA-10191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17142361#comment-17142361
]
Sophie Blee-Goldman commented on KAFKA-10191:
---------------------------------------------
Awesome, thanks Jun.
I think the hypothesis above fits. We actually are "losing" a task in this
system test – we start up with no optimizations, then restart with
optimizations on. The optimization does some shuffling of repartition topics to
reduce the total number of repartitions and we end up with fewer tasks. So the
ClientState will have tasks in its previousTasksForConsumer map that aren't in
taskLagTotals. We need to filter these tasks out.
Did you want to submit a fix for this [~chia7712]?
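For illustration, the filtering could look roughly like the sketch below. It is not the actual patch: task IDs are plain strings here instead of TaskId, and lagFor and previousTasksByLag are hypothetical stand-ins for ClientState.lagFor and StreamsPartitionAssignor.getPreviousTasksByLag. The idea is to drop any previous task that has no entry in taskLagTotals before it reaches the lag comparator.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

public class LagFilterSketch {

    // Hypothetical stand-in for ClientState.lagFor: throws if the task has no known lag.
    static long lagFor(final Map<String, Long> taskLagTotals, final String task) {
        final Long lag = taskLagTotals.get(task);
        if (lag == null) {
            throw new IllegalStateException("Tried to lookup lag for unknown task " + task);
        }
        return lag;
    }

    // Sorts a consumer's previous tasks by lag, skipping tasks that are absent
    // from taskLagTotals (e.g. tasks removed by a topology change).
    static SortedSet<String> previousTasksByLag(final Set<String> previousTasks,
                                                final Map<String, Long> taskLagTotals) {
        final SortedSet<String> sorted = new TreeSet<>(
            Comparator.<String>comparingLong(task -> lagFor(taskLagTotals, task))
                .thenComparing(Comparator.naturalOrder()));
        for (final String task : previousTasks) {
            // The filter: only tasks with a known lag reach the comparator.
            if (taskLagTotals.containsKey(task)) {
                sorted.add(task);
            }
        }
        return sorted;
    }

    public static void main(final String[] args) {
        // Task 2_0 existed before the restart but not in the optimized topology.
        final Set<String> previousTasks = new HashSet<>(Arrays.asList("0_0", "1_0", "2_0"));
        final Map<String, Long> taskLagTotals = new HashMap<>();
        taskLagTotals.put("0_0", 10L);
        taskLagTotals.put("1_0", 5L);

        System.out.println(previousTasksByLag(previousTasks, taskLagTotals)); // prints [1_0, 0_0]
    }
}
```

Without the containsKey check, adding 2_0 to the TreeSet invokes the comparator and fails with the same IllegalStateException as in the trace quoted below.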
> fix flaky StreamsOptimizedTest
> ------------------------------
>
> Key: KAFKA-10191
> URL: https://issues.apache.org/jira/browse/KAFKA-10191
> Project: Kafka
> Issue Type: Test
> Components: streams, unit tests
> Reporter: Chia-Ping Tsai
> Assignee: Chia-Ping Tsai
> Priority: Blocker
> Fix For: 2.6.0
>
>
> {quote}Exception in thread "StreamsOptimizedTest-53c7d3b1-12b2-4d02-90b1-15757dfd2735-StreamThread-1" java.lang.IllegalStateException: Tried to lookup lag for unknown task 2_0
>   at org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:306)
>   at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511)
>   at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216)
>   at java.util.TreeMap.put(TreeMap.java:552)
>   at java.util.TreeSet.add(TreeSet.java:255)
>   at java.util.AbstractCollection.addAll(AbstractCollection.java:344)
>   at java.util.TreeSet.addAll(TreeSet.java:312)
>   at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1250)
>   at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1164)
>   at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:920)
>   at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:391)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:583)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1400(AbstractCoordinator.java:111)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:602)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:575)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1132)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1107)
>   at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
>   at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
>   at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:419)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:506)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1263)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1229)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1204)
>   at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:762)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:622)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:549)
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:508)
> {quote}
>
> This issue may be related to
> [https://github.com/apache/kafka/commit/0f68dc7a640b26a8edea154ea4ea2b6d93b5104b],
> since the test passes if that commit is reverted.