[ https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14278895#comment-14278895 ]
Sriharsha Chintalapani edited comment on KAFKA-1461 at 1/15/15 4:24 PM:
------------------------------------------------------------------------

[~guozhang] I have the following code in mind for backing off retries in case of any error. This code will be under ReplicaFetcherThread.handlePartitions. I am thinking of maintaining two maps in ReplicaFetcherThread, one for the offset and one for the timestamp:

  private val partitionsWithErrorStandbyMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset
  private val partitionsWithErrorMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> timestamp

The idea is to remove the failed partitions from AbstractFetcherThread.partitionsMap and add them back once currentTime > (the partition's timestamp in partitionsWithErrorMap) + replicaFetcherRetryBackoffMs. I am not quite sure about maintaining these two maps. If it looks OK to you, I'll send a patch; if you have another approach, please let me know.

{code}
def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]): Unit = {
  // Record the first error time and the offset to resume from for each newly failed partition.
  for (partition <- partitions) {
    if (!partitionsWithErrorMap.contains(partition)) {
      partitionsWithErrorMap.put(partition, System.currentTimeMillis())
      // foreach instead of a single-case match, which would throw MatchError when there is no offset.
      currentOffset(partition).foreach(offset => partitionsWithErrorStandbyMap.put(partition, offset))
    }
  }
  removePartitions(partitions.toSet)
  val partitionsToBeAdded = new mutable.HashMap[TopicAndPartition, Long]
  // Process partitionsWithErrorMap and add partitions back once the backoff time has elapsed.
  val now = System.currentTimeMillis()
  // Collect the expired partitions first; mutating a HashMap while iterating over it is unsafe.
  val expired = partitionsWithErrorMap.collect {
    case (topicAndPartition, timeMs) if now > timeMs + brokerConfig.replicaFetcherRetryBackoffMs => topicAndPartition
  }.toList
  for (topicAndPartition <- expired) {
    partitionsWithErrorStandbyMap.get(topicAndPartition).foreach(offset => partitionsToBeAdded.put(topicAndPartition, offset))
    // Clear both maps; otherwise the stale timestamp entry would match again on the next call.
    partitionsWithErrorStandbyMap.remove(topicAndPartition)
    partitionsWithErrorMap.remove(topicAndPartition)
  }
  addPartitions(partitionsToBeAdded)
}
{code}

> Replica fetcher thread does not implement any back-off behavior
> ---------------------------------------------------------------
>
>                 Key: KAFKA-1461
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1461
>             Project: Kafka
>          Issue Type: Improvement
>          Components: replication
>    Affects Versions: 0.8.1.1
>            Reporter: Sam Meder
>            Assignee: Sriharsha Chintalapani
>              Labels: newbie++
>             Fix For: 0.8.3
>
>
> The current replica fetcher thread will retry in a tight loop if any error
> occurs during the fetch call. For example, we've seen cases where the fetch
> continuously throws a connection refused exception, leading to several replica
> fetcher threads that spin in a pretty tight loop.
> To a much lesser degree this is also an issue in the consumer fetcher thread,
> although the fact that erroring partitions are removed so a leader can be
> re-discovered helps some.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
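On the question of maintaining two maps in the comment above: one possible alternative is a single map that stores the resume offset together with a retry deadline (error time + backoff) per partition. The standalone sketch below illustrates that idea under stated assumptions: `PartitionBackoff`, `markFailed`, `isDelayed`, `drainReady`, and the injected `clock` are hypothetical names, not Kafka APIs, and the partition key is left generic (`K`) instead of Kafka's `TopicAndPartition` so the code compiles on its own.

```scala
import scala.collection.mutable

// Hypothetical helper (not a Kafka API): per-partition retry backoff for a fetcher.
// K stands in for Kafka's TopicAndPartition so the sketch compiles on its own.
final class PartitionBackoff[K](backoffMs: Long, clock: () => Long) {
  // Single map instead of two: partition -> (offset to resume from, earliest retry time in ms).
  private val delayed = mutable.HashMap.empty[K, (Long, Long)]

  // Park a failed partition. If it is already parked, keep the original offset and
  // deadline so repeated errors do not keep pushing the retry further out.
  def markFailed(partition: K, offset: Long): Unit =
    if (!delayed.contains(partition))
      delayed.put(partition, (offset, clock() + backoffMs))

  def isDelayed(partition: K): Boolean = delayed.contains(partition)

  // Remove and return every partition whose backoff has elapsed, mapped to its resume offset.
  // The result is fully built before any removal, so the map is never mutated mid-iteration.
  def drainReady(): Map[K, Long] = {
    val now = clock()
    val ready = delayed.collect {
      case (partition, (offset, retryAtMs)) if now >= retryAtMs => partition -> offset
    }.toMap
    delayed --= ready.keys
    ready
  }
}
```

With a backoff of 1000 ms, a partition parked at time 0 is not returned by `drainReady()` at 500 ms but is returned (with its saved offset) at 1000 ms, after which it is no longer tracked. Wiring this into ReplicaFetcherThread is an assumption not shown here: handlePartitionsWithErrors might call `removePartitions` and then `markFailed` with each partition's current offset, and the fetch loop might pass `drainReady()` to `addPartitions`. Injecting the clock keeps the timing logic testable without sleeping.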