Yang Ling created KAFKA-14106:
---------------------------------

             Summary: Fetcher thread was shutdown and all fetched partitions are lost.
                 Key: KAFKA-14106
                 URL: https://issues.apache.org/jira/browse/KAFKA-14106
             Project: Kafka
          Issue Type: Bug
          Components: replication
    Affects Versions: 3.0.0, 2.2.2
            Reporter: Yang Ling
Dynamically changing the listeners leads to out-of-sync replicas. Our steps were as follows:
# Start the broker listening on an IP address.
# Create some topics.
# Change the listener to a domain name via dynamic configuration (for our own reasons).
# Create some new topics.
# Produce messages to any of the older topics.
# Result: all topics produced to in step 5 are out of sync.

The relevant log lines are:
{panel}
[2022-07-23 15:30:53,282] INFO [ReplicaFetcherManager on broker 0] Added fetcher to broker BrokerEndPoint(id=2, host=168.1.3.88:9092) for partitions Map(test-11 -> (offset=0, leaderEpoch=0), test-5 -> (offset=0, leaderEpoch=0), test-8 -> (offset=0, leaderEpoch=0), test-2 -> (offset=0, leaderEpoch=0)) (kafka.server.ReplicaFetcherManager)
[2022-07-25 15:01:51,581] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2022-07-25 18:14:05,297] INFO [ReplicaFetcherManager on broker 0]Added fetcher to broker BrokerEndPoint(id=2, host=kafka-server-1:9092) for partitions Map(test2-6 -> (offset=0, leaderEpoch=0), test2-0 -> (offset=0, leaderEpoch=0), test2-3 -> (offset=0, leaderEpoch=0), test2-9 -> (offset=0, leaderEpoch=0)) (kafka.server.ReplicaFetcherManager)
{panel}

After reading the source code, we found the following in AbstractFetcherManager:
{code:scala}
def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState]) {
  ...
    for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {
      val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
      val fetcherThread = fetcherThreadMap.get(brokerIdAndFetcherId) match {
        case Some(currentFetcherThread) if currentFetcherThread.sourceBroker == brokerAndFetcherId.broker =>
          currentFetcherThread
        case Some(f) =>
          f.shutdown() // ----------------- marked
          addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
        case None =>
          addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
      }
    }
  }
  ...
}
{code}

As the marked line shows, if the sourceBroker changes (as it does in our case), the old fetcher thread is shut down and a new fetcher thread is created for the new sourceBroker. The new thread only receives the partitions passed in the current call, so all of the partitions the old fetcher thread was fetching are lost.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
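To make the failure mode concrete, here is a minimal, self-contained Scala model of the replace-on-endpoint-change logic. Everything in it (Endpoint, FetcherKey, FakeFetcher, FakeFetcherManager and the migrateOnReplace flag) is hypothetical and heavily simplified; it is not Kafka's actual AbstractFetcherManager. With migrateOnReplace = false it mimics the reported behaviour; with migrateOnReplace = true it sketches one possible remedy (carrying the old thread's partitions over to its replacement), which is an assumption on our part and not necessarily how the issue is fixed upstream.

{code:scala}
import scala.collection.mutable

// Hypothetical stand-ins for BrokerEndPoint / BrokerIdAndFetcherId; NOT the real Kafka classes.
final case class Endpoint(id: Int, host: String)
final case class FetcherKey(brokerId: Int, fetcherId: Int)

// Hypothetical fetcher "thread": just tracks its source broker and partitions.
final class FakeFetcher(val sourceBroker: Endpoint) {
  val partitions: mutable.Set[String] = mutable.Set.empty
  var running: Boolean = true
  def shutdown(): Unit = running = false
}

final class FakeFetcherManager(migrateOnReplace: Boolean) {
  // Keyed by broker id + fetcher id only, so an endpoint change maps to the same key.
  val fetchers: mutable.Map[FetcherKey, FakeFetcher] = mutable.Map.empty

  def addFetcherForPartitions(broker: Endpoint, fetcherId: Int, parts: Set[String]): Unit = {
    val key = FetcherKey(broker.id, fetcherId)
    val fetcher = fetchers.get(key) match {
      case Some(current) if current.sourceBroker == broker => current
      case Some(old) =>
        // Same key, different endpoint: the old fetcher is replaced.
        old.shutdown()
        val fresh = new FakeFetcher(broker)
        // Assumed remedy: move the old fetcher's partitions to the new one.
        if (migrateOnReplace) fresh.partitions ++= old.partitions
        fetchers(key) = fresh
        fresh
      case None =>
        val fresh = new FakeFetcher(broker)
        fetchers(key) = fresh
        fresh
    }
    // Only the partitions passed in this call are added.
    fetcher.partitions ++= parts
  }

  // Partitions currently being fetched by running fetchers.
  def fetchedPartitions: Set[String] =
    fetchers.values.filter(_.running).flatMap(_.partitions).toSet
}

object FetcherModel {
  // Replays the log: broker 2 is first reached by IP, later by host name.
  def run(migrateOnReplace: Boolean): Set[String] = {
    val mgr = new FakeFetcherManager(migrateOnReplace)
    mgr.addFetcherForPartitions(Endpoint(2, "168.1.3.88:9092"), 0, Set("test-2", "test-5"))
    mgr.addFetcherForPartitions(Endpoint(2, "kafka-server-1:9092"), 0, Set("test2-0", "test2-3"))
    mgr.fetchedPartitions
  }

  def main(args: Array[String]): Unit = {
    println(s"current behaviour: ${run(migrateOnReplace = false).toList.sorted}")
    println(s"with migration:    ${run(migrateOnReplace = true).toList.sorted}")
  }
}
{code}

Running main prints List(test2-0, test2-3) for the current behaviour, so the older test-* partitions are no longer fetched, and List(test-2, test-5, test2-0, test2-3) when partitions are migrated on replacement.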