Hello Murilo,

Could you elaborate a bit more on how you "trimmed" the topology? For example:
1) Did you change the code so that only 6 sub-topologies will be built, and their sub-topology ids stay the same? I.e., did you just trim the last 3 sub-topologies, with ids 6, 7, 8?
2) Did you delete the local state dirs for those sub-topologies?
3) Did you delete all the repartition/changelog topics for those sub-topologies?
4) For the remaining 6 sub-topologies, do their state store names and topic names all remain the same? I understand that they have static names, but do they have numerical suffixes that get changed?

On Tue, Jan 25, 2022 at 6:43 PM Murilo Tavares <murilo...@gmail.com> wrote:

> Hi
> I have a KafkaStreams application that is too heavyweight, with 9
> sub-topologies.
> I am trying to disable some unneeded parts of the topology that are
> completely independent of the rest of the topology. Since my state stores
> have fixed, predictable names, I compared the topologies and I believe it
> should be safe to trim some sub-topologies.
> After trimming the unused ones, it now has 6 sub-topologies.
> Nevertheless, the application won't start. It seems to be trying to
> recover previous tasks that shouldn't exist anymore.
> I have left the application down for 30 min so that any timeouts, like
> session or poll timeouts, could expire, but still, when the application
> starts, it reads the task states from somewhere and fails to recover.
> Here's the log (note the "unknown task 7_0", which makes sense since the
> number of sub-topologies fell from 9 to 6):
>
> 2022-01-26 02:28:17.552 [asdasdasd-StreamThread-1] INFO
> org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor
> - Decided on assignment:
> {1699fdc2-3121-4987-9fb2-26fc5bd4fb48=[activeTasks: ([0_0, 0_1, 1_0, 1_1,
> 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1]) standbyTasks: ([])
> prevActiveTasks: ([]) prevStandbyTasks: ([0_0, 0_1, 1_0, 1_1, 2_0, 2_1,
> 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1, 7_0, 7_1, 8_0, 8_1, 9_0, 9_1])
> changelogOffsetTotalsByTask: ([0_0=1244818, 0_1=625988, 1_0=15255,
> 1_1=64645, 2_0=670938, 2_1=100636, 3_0=6379662, 3_1=5600072, 4_0=2362,
> 4_1=15224, 5_0=19577, 5_1=113994, 6_0=7403980, 6_1=9195079, 7_0=226722,
> 7_1=76623, 8_0=7334, 8_1=66344, 9_0=0, 9_1=39]) taskLagTotals: ([0_0=3,
> 0_1=3, 1_0=1, 1_1=1, 2_0=1, 2_1=1, 3_0=7, 3_1=7, 4_0=1, 4_1=1, 5_0=1,
> 5_1=1, 6_0=1, 6_1=1]) capacity: 1 assigned: 14]} with no followup probing
> rebalance.
> 2022-01-26 02:28:17.558 [asdasdasd-StreamThread-1] INFO
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor -
> stream-thread [asdasdasd-StreamThread-1-consumer] Assigned tasks [0_0, 0_1,
> 1_0, 1_1, 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1] including
> stateful [0_0, 0_1, 1_0, 1_1, 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0,
> 6_1] to clients as:
> 1699fdc2-3121-4987-9fb2-26fc5bd4fb48=[activeTasks: ([0_0, 0_1, 1_0, 1_1,
> 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1]) standbyTasks: ([])].
> 2022-01-26 02:28:17.566 [asdasdasd-StreamThread-1] INFO
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator -
> [Consumer instanceId=asdasdasd-1,
> clientId=asdasdasd-StreamThread-1-consumer, groupId=inventory-assembler-4]
> Rebalance failed.
> java.lang.IllegalStateException: Tried to lookup lag for unknown task 7_0
>     at org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:318) ~[app.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor$$Lambda$534.00000000F9BC5F20.applyAsLong(Unknown Source) ~[?:?]
>     at java.util.Comparator.lambda$comparingLong$6043328a$1(Unknown Source) ~[?:?]
>     at java.util.Comparator$$Lambda$535.00000000F9B1D820.compare(Unknown Source) ~[?:?]
>     at java.util.Comparator.lambda$thenComparing$36697e65$1(Unknown Source) ~[?:?]
>     at java.util.Comparator$$Lambda$399.00000000FEBE6620.compare(Unknown Source) ~[?:?]
>     at java.util.TreeMap.put(Unknown Source) ~[?:?]
>     at java.util.TreeSet.add(Unknown Source) ~[?:?]
>     at java.util.AbstractCollection.addAll(Unknown Source) ~[?:?]
>     at java.util.TreeSet.addAll(Unknown Source) ~[?:?]
>     at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1205) ~[app.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1119) ~[app.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:845) ~[app.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:405) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589) [app.jar:?]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:693) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1182) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1157) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296) [app.jar:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) [app.jar:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) [app.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:925) [app.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885) [app.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720) [app.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) [app.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) [app.jar:?]
>
> Any idea how I could overcome this? I even tried to change
> ConsumerConfig.GROUP_INSTANCE_ID_CONFIG and StreamsConfig.CLIENT_ID_CONFIG,
> but that didn't seem to work...
>
> Thanks
> Murilo

-- 
Guozhang
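For readers following along: a Kafka Streams task id has the form <subTopologyId>_<partition>, and sub-topology ids are assigned positionally when the topology is built, which is why trimming sub-topologies makes previously-owned tasks like 7_0 unknown to the assignor. A minimal, self-contained Java sketch of that arithmetic (the helper `taskIds` is illustrative, not Kafka's internal code; the counts mirror the log above, not an exact model of the assignor):

```java
import java.util.Set;
import java.util.TreeSet;

public class TaskIdDemo {
    // Build the set of "<subTopologyId>_<partition>" task ids for a topology
    // with the given number of sub-topologies and partitions per sub-topology.
    static Set<String> taskIds(int subTopologies, int partitions) {
        Set<String> ids = new TreeSet<>();
        for (int s = 0; s < subTopologies; s++) {
            for (int p = 0; p < partitions; p++) {
                ids.add(s + "_" + p);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        // The log shows prev tasks 0_0..9_1 but a trimmed topology of 0_0..6_1.
        Set<String> previous = taskIds(10, 2); // tasks remembered from before
        Set<String> current = taskIds(7, 2);   // tasks the new topology defines

        // Tasks the old assignment references but the new topology cannot
        // resolve -- looking up lag for any of these fails.
        Set<String> unknown = new TreeSet<>(previous);
        unknown.removeAll(current);
        System.out.println(unknown); // [7_0, 7_1, 8_0, 8_1, 9_0, 9_1]
    }
}
```

The implication is that trimming is only safe if the surviving sub-topologies keep their old ids (i.e., only trailing sub-topologies are removed) and any remembered assignment metadata for the removed ids is cleared.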