Yang Ling created KAFKA-14106:
---------------------------------

             Summary: Fetcher thread was shut down and all fetched partitions 
are lost.
                 Key: KAFKA-14106
                 URL: https://issues.apache.org/jira/browse/KAFKA-14106
             Project: Kafka
          Issue Type: Bug
          Components: replication
    Affects Versions: 3.0.0, 2.2.2
            Reporter: Yang Ling


Dynamically changing a broker's listeners causes partitions to fall out of 
sync. We did the following:
 # Start a broker that listens on an IP address.
 # Create some topics.
 # Change the listener to a domain name via dynamic configuration.
 # Create some new topics.
 # Produce messages to any of the older topics.
 # All topics produced to in step 5 are now out of sync.

The relevant log entries are:
{panel}
[2022-07-23 15:30:53,282] INFO [ReplicaFetcherManager on broker 0] Added fetcher to broker BrokerEndPoint(id=2, host=168.1.3.88:9092) for partitions Map(test-11 -> (offset=0, leaderEpoch=0), test-5 -> (offset=0, leaderEpoch=0), test-8 -> (offset=0, leaderEpoch=0), test-2 -> (offset=0, leaderEpoch=0)) (kafka.server.ReplicaFetcherManager)
[2022-07-25 15:01:51,581] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2022-07-25 18:14:05,297] INFO [ReplicaFetcherManager on broker 0] Added fetcher to broker BrokerEndPoint(id=2, host=kafka-server-1:9092) for partitions Map(test2-6 -> (offset=0, leaderEpoch=0), test2-0 -> (offset=0, leaderEpoch=0), test2-3 -> (offset=0, leaderEpoch=0), test2-9 -> (offset=0, leaderEpoch=0)) (kafka.server.ReplicaFetcherManager)
{panel}

After reading the source code, we found the following in AbstractFetcherManager:
{code:scala}
def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState]) {
...
      for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {
        val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
        val fetcherThread = fetcherThreadMap.get(brokerIdAndFetcherId) match {
          case Some(currentFetcherThread) if currentFetcherThread.sourceBroker == brokerAndFetcherId.broker =>
            currentFetcherThread
          case Some(f) =>
            f.shutdown() // ----------------- marked
            addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
          case None =>
            addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
        }
      }
...
}
{code}

As the marked line shows, when sourceBroker changes, as it did in our case 
when the listener moved from an IP address to a domain name, the old fetcher 
thread is shut down and a new fetcher thread is created with the new 
sourceBroker. As a result, all partitions that the old fetcher thread was 
fetching are lost.
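
The behavior described above can be reproduced in isolation with a minimal 
sketch. This is not the Kafka implementation: ToyFetcher, ToyFetcherManager 
and FetcherLossDemo are hypothetical names, BrokerEndPoint and 
BrokerIdAndFetcherId are simplified stand-ins for the classes in the excerpt, 
and partitions are plain strings. It assumes, as the excerpt suggests, that 
nothing moves the old thread's partitions to its replacement.
{code:scala}
import scala.collection.mutable

// Hypothetical, simplified stand-ins for the Kafka classes in the excerpt.
final case class BrokerEndPoint(id: Int, host: String, port: Int)
final case class BrokerIdAndFetcherId(brokerId: Int, fetcherId: Int)

// Toy fetcher "thread": it only records which partitions it owns.
final class ToyFetcher(val sourceBroker: BrokerEndPoint) {
  val partitions: mutable.Set[String] = mutable.Set.empty[String]
  var running: Boolean = true
  def shutdown(): Unit = { running = false }
}

final class ToyFetcherManager {
  val fetcherThreadMap: mutable.Map[BrokerIdAndFetcherId, ToyFetcher] =
    mutable.Map.empty

  private def addAndStart(key: BrokerIdAndFetcherId, broker: BrokerEndPoint): ToyFetcher = {
    val fresh = new ToyFetcher(broker)
    fetcherThreadMap.put(key, fresh) // replaces any previous entry for this key
    fresh
  }

  def addFetcherForPartitions(broker: BrokerEndPoint, fetcherId: Int,
                              newPartitions: Set[String]): Unit = {
    val key = BrokerIdAndFetcherId(broker.id, fetcherId)
    val fetcher = fetcherThreadMap.get(key) match {
      case Some(current) if current.sourceBroker == broker =>
        current
      case Some(stale) =>
        // Same broker id, different endpoint: the old fetcher is shut down
        // and replaced. Nothing copies stale.partitions to the new fetcher.
        stale.shutdown()
        addAndStart(key, broker)
      case None =>
        addAndStart(key, broker)
    }
    fetcher.partitions ++= newPartitions
  }

  // Partitions still owned by a running fetcher.
  def fetchedPartitions: Set[String] =
    fetcherThreadMap.values.filter(_.running).flatMap(_.partitions).toSet
}

object FetcherLossDemo {
  def run(): Set[String] = {
    val manager = new ToyFetcherManager
    // Older topics are added while broker 2 is known by its IP address.
    manager.addFetcherForPartitions(BrokerEndPoint(2, "168.1.3.88", 9092), 0,
      Set("test-2", "test-5", "test-8", "test-11"))
    // Newer topics are added after the listener changed to a host name.
    manager.addFetcherForPartitions(BrokerEndPoint(2, "kafka-server-1", 9092), 0,
      Set("test2-0", "test2-3", "test2-6", "test2-9"))
    manager.fetchedPartitions
  }

  def main(args: Array[String]): Unit =
    println(run().toSeq.sorted.mkString(", "))
}
{code}
In this sketch only the test2-* partitions remain with a running fetcher after 
the second call. The test-* partitions belonged to the fetcher that was shut 
down, which matches the out-of-sync topics seen in step 6.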



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
