[ https://issues.apache.org/jira/browse/KAFKA-4951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
cuiyang updated KAFKA-4951:
---------------------------
    Description:
I found that KafkaProducer may sometimes send duplicate messages. This happens as follows.

In the Sender thread:
    NetworkClient::poll()
        -> this.selector.poll()  // sends the message, e.g. "abc", and it reaches the broker successfully
        -> handleTimedOutRequests(responses, updatedNow);  // judges whether the request carrying "abc" has expired or timed out, based on this.requestTimeoutMs and updatedNow
        -> response.request().callback().onComplete()
        -> completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now);  // if the message was judged as expired, it is re-enqueued and sent again on the next loop
        -> this.accumulator.reenqueue(batch, now);

The problem: even if the message "abc" was sent to the broker successfully, it may still be judged as expired, so it is sent again on the next loop, which duplicates the message.
I can reproduce this scenario consistently.
In my opinion, Send::handleTimedOutRequests() is not very useful, because the broker's response to the send request is successful and carries no error, which means the brokers persisted the message successfully. This function also causes the duplicate message problem.
  was:
I found that KafkaProducer may sometimes send duplicate messages. This happens as follows.

In the Sender thread:
    NetworkClient::poll()
        -> this.selector.poll()  // sends the message, e.g. "abc", and it reaches the broker successfully
        -> handleTimedOutRequests(responses, updatedNow);  // judges whether the request carrying "abc" has expired or timed out, based on this.requestTimeoutMs and updatedNow
        -> response.request().callback().onComplete()
        -> completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now);  // if the message was judged as expired, it is re-enqueued and sent again on the next loop
        -> this.accumulator.reenqueue(batch, now);

The problem: even if the message "abc" was sent to the broker successfully, it may still be judged as expired, so it is sent again on the next loop, which duplicates the message.
I can reproduce this scenario consistently.
In my opinion, Send::handleTimedOutRequests() is not very useful, because the broker's response to the send request is successful and carries no error. This function also causes the duplicate message problem.
> KafkaProducer may send duplicated message sometimes
> ---------------------------------------------------
>
>                 Key: KAFKA-4951
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4951
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.1
>            Reporter: cuiyang
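
The duplicate path described in this report can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Kafka's code: the class `DuplicateOnTimeoutSketch`, the method `simulate`, the parameter `ackDelayMs`, and the 10 ms poll step are hypothetical names and values chosen for illustration. The model assumes the broker appends the record as soon as it is sent, and that the client gives up on the request once it has waited longer than `requestTimeoutMs` without seeing the acknowledgement.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only; it does not use or reproduce any Kafka classes. */
class DuplicateOnTimeoutSketch {

    /**
     * Sends one record and returns what the simulated broker ends up storing.
     * ackDelayMs is the (hypothetical) time the first acknowledgement needs
     * to reach the client.
     */
    static List<String> simulate(long requestTimeoutMs, long ackDelayMs) {
        List<String> brokerLog = new ArrayList<>();
        String record = "abc";

        // First attempt: assume the broker persists the record right away.
        long sendTimeMs = 0;
        brokerLog.add(record);

        // Simplified sender loop, polling in 10 ms steps.
        for (long now = 0; ; now += 10) {
            long waitedMs = now - sendTimeMs;
            if (waitedMs >= ackDelayMs) {
                // Acknowledgement arrived in time: the batch completes once.
                return brokerLog;
            }
            if (waitedMs > requestTimeoutMs) {
                // No acknowledgement yet: the client treats the request as
                // failed, re-enqueues the batch, and sends it again. The
                // broker already has the first copy, so this is a duplicate.
                brokerLog.add(record);
                return brokerLog;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate(100, 50));   // prints [abc]
        System.out.println(simulate(100, 500));  // prints [abc, abc]
    }
}
```

In this model the duplicate appears only when the acknowledgement is slower than the timeout, which matches the condition the report describes: the send succeeded at the broker, but the client judged the request as expired before learning that.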