[ https://issues.apache.org/jira/browse/KAFKA-4951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

cuiyang updated KAFKA-4951:
---------------------------
    Description: 
I found that KafkaProducer may sometimes send duplicate messages. This 
happens as follows:
     In the Sender thread:
     NetworkClient::poll()
         -> this.selector.poll()    // sends the message, e.g. "abc", and the 
            // broker receives it successfully
         -> handleTimedOutRequests(responses, updatedNow);  // judges whether 
            // the request carrying "abc" has timed out, based on 
            // this.requestTimeoutMs and updatedNow
         -> response.request().callback().onComplete()
             -> completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now);
                // if the request was judged to have timed out, the batch is 
                // re-enqueued and sent again on the next loop
                 -> this.accumulator.reenqueue(batch, now);
The problem: even when the message "abc" has been sent to the broker 
successfully, the request may still be judged as timed out, so the message is 
sent again on the next loop and ends up duplicated.

I can reproduce this scenario reliably.
In my opinion, Send::handleTimedOutRequests() is not very useful here, 
because the broker's response to the send request is successful and has no 
error, which means the broker has persisted the message. This function then 
causes the duplicate-message problem.


  was:
I found that KafkaProducer may sometimes send duplicate messages. This 
happens as follows:
     In the Sender thread:
     NetworkClient::poll()
         -> this.selector.poll()    // sends the message, e.g. "abc", and the 
            // broker receives it successfully
         -> handleTimedOutRequests(responses, updatedNow);  // judges whether 
            // the request carrying "abc" has timed out, based on 
            // this.requestTimeoutMs and updatedNow
         -> response.request().callback().onComplete()
             -> completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now);
                // if the request was judged to have timed out, the batch is 
                // re-enqueued and sent again on the next loop
                 -> this.accumulator.reenqueue(batch, now);
The problem: even when the message "abc" has been sent to the broker 
successfully, the request may still be judged as timed out, so the message is 
sent again on the next loop and ends up duplicated.

I can reproduce this scenario reliably.
In my opinion, Send::handleTimedOutRequests() is not very useful here, 
because the broker's response to the send request is successful and has no 
error. This function then causes the duplicate-message problem.



> KafkaProducer may send duplicated message sometimes
> ---------------------------------------------------
>
>                 Key: KAFKA-4951
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4951
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.1
>            Reporter: cuiyang
>
> I found that KafkaProducer may sometimes send duplicate messages. This 
> happens as follows:
>      In the Sender thread:
>      NetworkClient::poll()
>          -> this.selector.poll()    // sends the message, e.g. "abc", and the 
>             // broker receives it successfully
>          -> handleTimedOutRequests(responses, updatedNow);  // judges whether 
>             // the request carrying "abc" has timed out, based on 
>             // this.requestTimeoutMs and updatedNow
>          -> response.request().callback().onComplete()
>              -> completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now);
>                 // if the request was judged to have timed out, the batch is 
>                 // re-enqueued and sent again on the next loop
>                  -> this.accumulator.reenqueue(batch, now);
> The problem: even when the message "abc" has been sent to the broker 
> successfully, the request may still be judged as timed out, so the message is 
> sent again on the next loop and ends up duplicated.
> I can reproduce this scenario reliably.
> In my opinion, Send::handleTimedOutRequests() is not very useful here, 
> because the broker's response to the send request is successful and has no 
> error, which means the broker has persisted the message. This function then 
> causes the duplicate-message problem.
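
The sequence described above can be illustrated with a small single-threaded
simulation. This is only a sketch under stated assumptions, not Kafka's actual
code: FakeBroker, simulate(), and the 10 ms poll interval are hypothetical
names and values chosen for illustration. The broker is assumed to append every
request it receives, while its response reaches the client only after
responseDelayMs. Time is simulated, so no real network or threads are involved.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; none of these types come from the Kafka code base.
class DuplicateSendSketch {

    /** Hypothetical stand-in for a broker: every request that reaches it is appended. */
    static final class FakeBroker {
        final List<String> log = new ArrayList<>();

        void append(String message) {
            log.add(message);
        }
    }

    /**
     * Sends one message and returns how many copies end up in the broker log.
     * The broker appends the message at send time; only the response is delayed.
     */
    static int simulate(long requestTimeoutMs, long responseDelayMs, int retries) {
        FakeBroker broker = new FakeBroker();
        final long pollIntervalMs = 10; // assumed length of one sender loop
        long now = 0;
        int attempts = 0;

        long sentAt = now;
        broker.append("abc"); // the send itself succeeds

        while (true) {
            now += pollIntervalMs;

            // Responses are handled first, as in the poll() sequence in the report.
            if (now - sentAt >= responseDelayMs) {
                return broker.log.size(); // response arrived, batch completes normally
            }

            // Client-side timeout check, analogous to the reported judgement
            // based on requestTimeoutMs and the current time.
            if (now - sentAt > requestTimeoutMs) {
                if (attempts < retries) {
                    attempts++;
                    sentAt = now;
                    broker.append("abc"); // re-enqueued batch is sent again
                } else {
                    // No retries left: the client gives up, but the broker
                    // still holds every copy that was already sent.
                    return broker.log.size();
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("timeout=30ms, delay=100ms, retries=1 -> copies: "
                + simulate(30, 100, 1));
        System.out.println("timeout=30ms, delay=100ms, retries=0 -> copies: "
                + simulate(30, 100, 0));
        System.out.println("timeout=200ms, delay=100ms, retries=1 -> copies: "
                + simulate(200, 100, 1));
    }
}
```

In this sketch a duplicate appears only when the response takes longer than the
request timeout and at least one retry is allowed. With no retries, or with a
timeout longer than the response delay, the broker log holds a single copy.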



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
