[ https://issues.apache.org/jira/browse/KAFKA-4951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
cuiyang updated KAFKA-4951:
---------------------------
    Description:
I found that KafkaProducer may sometimes send a duplicated message. It happens as follows.

In the Sender thread:

NetworkClient::poll()
  -> this.selector.poll()   // sends the message, e.g. "abc", and it reaches the broker successfully
  -> handleTimedOutRequests(responses, updatedNow);   // decides whether the request carrying "abc", sent above, has expired or timed out, based on this.requestTimeoutMs and updatedNow
  -> response.request().callback().onComplete()
  -> completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now);   // if the message was judged expired, it is re-enqueued and sent again on the next loop
  -> this.accumulator.reenqueue(batch, now);

The problem: the message "abc" has been sent to the broker successfully, but it may still be judged expired, so it is sent again on the next loop, which duplicates the message.

I can usually reproduce this scenario.

In my opinion, Send::handleTimedOutRequests() is not very useful, because the broker's response to the send request is successful and carries no error. This function is also what leads to the duplicated messages.
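The sequence in the description can be illustrated with a small self-contained model. This is only a sketch of the reported behaviour, not Kafka's code: the class `DuplicateSendSketch`, the `FakeBroker` type and the `send` method are hypothetical names, and the model assumes that every attempt reaches the broker and that the acknowledgement always takes `responseDelayMs` to come back.

```java
import java.util.ArrayList;
import java.util.List;

public class DuplicateSendSketch {

    /** Hypothetical stand-in for the broker log: every write that arrives is appended. */
    static final class FakeBroker {
        final List<String> log = new ArrayList<>();

        void append(String message) {
            log.add(message);
        }
    }

    /**
     * Models one batch going through the send loop.
     *
     * @param requestTimeoutMs client-side request timeout
     * @param responseDelayMs  assumed time the broker's acknowledgement needs to arrive
     * @param retries          how many times the client may re-enqueue the batch
     * @return what the broker ended up storing
     */
    static List<String> send(String message, long requestTimeoutMs,
                             long responseDelayMs, int retries) {
        FakeBroker broker = new FakeBroker();
        int attempts = 0;
        while (true) {
            attempts++;
            // Step 1 (selector.poll): the bytes reach the broker and are appended.
            broker.append(message);
            // Step 2 (timeout check): the client only compares elapsed time with
            // the timeout; it cannot know the broker already stored the message.
            boolean timedOut = responseDelayMs > requestTimeoutMs;
            if (!timedOut) {
                break; // acknowledgement arrived in time, batch is done
            }
            if (attempts > retries) {
                break; // retries exhausted, the batch fails on the client side
            }
            // Step 3 (re-enqueue): the batch goes back to the queue and the
            // loop sends it again, producing a second copy on the broker.
        }
        return broker.log;
    }

    public static void main(String[] args) {
        System.out.println(send("abc", 100, 20, 1));   // [abc]
        System.out.println(send("abc", 100, 500, 1));  // [abc, abc]
    }
}
```

In this model the duplicate appears only when the acknowledgement is slower than the timeout and at least one retry is allowed; with `retries` set to 0 the broker stores a single copy, but the client reports the batch as failed even though it was written.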
  was:
I found that KafkaProducer may sometimes send a duplicated message. It happens as follows.

In the Sender thread:

NetworkClient::poll()
  -> this.selector.poll()   // writes the message, e.g. "abc"
  -> handleTimedOutRequests(responses, updatedNow);   // decides whether the request carrying "abc", sent above, has expired or timed out, based on this.requestTimeoutMs and updatedNow
  -> response.request().callback().onComplete()
  -> completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now);   // if the message was judged expired, it is re-enqueued and sent again on the next loop
  -> this.accumulator.reenqueue(batch, now);

The problem: the message "abc" has been sent to the broker successfully, but it may still be judged expired, so it is sent again on the next loop, which duplicates the message.

I can usually reproduce this scenario.

In my opinion, Send::handleTimedOutRequests() is not very useful, because the broker's response to the send request is successful and carries no error. This function is also what leads to the duplicated messages.
> KafkaProducer may send duplicated message sometimes
> ---------------------------------------------------
>
>                 Key: KAFKA-4951
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4951
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.1
>            Reporter: cuiyang