[ https://issues.apache.org/jira/browse/KAFKA-156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13621433#comment-13621433 ]

Scott Carey commented on KAFKA-156:
-----------------------------------

I am positive that the producer wire protocol has to have built-in features to
prevent dropped messages when brokers are unavailable.  There is no way to
achieve 'optimal transmission' without two-phase commit or idempotence between
the producer and broker.  I define 'optimal transmission' as the guarantee
that data is neither duplicated nor lost after some well-known point has been
reached, as viewed by the producer.  Prior to this point (for example, while
the message sits in an in-memory queue), no system can make guarantees.

{quote}
"FWIW Clearspring has a pipeline with: ConcurrentQueue --> spill to disk queue 
with max size (then drops messages) --> SyncProducer with retry/backoff. "
{quote}
Such a system can get as close as losing or duplicating only one 'batch' of
messages, where that batch size is >= 1 message.  At best, when reading back
the data spilled to disk, a crash at either end between sending a batch and
receiving acknowledgement will leave that batch in limbo.  The batch needs an
identifier that both sides can persist or generate, so that either side can
identify the batch if it has to recover from a crash (two-phase commit).  Many
database systems have this (see
http://www.postgresql.org/docs/9.2/static/sql-prepare-transaction.html): as a
client you can name a transaction so that, after you get acknowledgement of
the prepared commit, you can log that it has been prepared and then send the
commit command; if the client crashes before getting the commit
acknowledgement, upon recovery it can look up the identifier of the in-flight
commit and ask the system whether it succeeded.
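
To make the PostgreSQL flow concrete, here is a rough JDBC sketch of it (the
table, connection URL, and 'batch-42' id are all made up, and the server must
have max_prepared_transactions > 0; this is the shape of the idea, not
production code):

{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PreparedTransactionSketch {
    public static void main(String[] args) throws Exception {
        String gid = "batch-42"; // client-chosen id, logged locally up front
        try (Connection c = DriverManager.getConnection("jdbc:postgresql:demo");
             Statement s = c.createStatement()) {
            s.execute("BEGIN");
            s.execute("INSERT INTO events(id, body) VALUES (1, 'payload')");
            s.execute("PREPARE TRANSACTION '" + gid + "'"); // phase 1: durable server-side
            // Log 'prepared' locally here; a crash after this point is recoverable.
            s.execute("COMMIT PREPARED '" + gid + "'");     // phase 2
        }
    }

    // Recovery: ask the server whether the named transaction is still pending.
    static boolean stillPrepared(Connection c, String gid) throws Exception {
        try (Statement s = c.createStatement();
             ResultSet r = s.executeQuery(
                 "SELECT 1 FROM pg_prepared_xacts WHERE gid = '" + gid + "'")) {
            return r.next(); // present: prepared, not committed; absent: resolved
        }
    }
}
{code}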


We have an internal system that we are attempting to replace with Kafka, but
Kafka does not guarantee delivery the way our system does.  We spool data on
our producers into batches (a file per batch), and then transfer these batches
to the downstream system.  That system stores the batches in a staging area,
so that if either side crashes before the batch transfer completes, recovery
is simple.  Upon validating that the batch (which is uniquely named) is
identical on both sides, the producer can remove it locally and promote it
from the staging area to the completed area (atomically).  This again is safe
if either side crashes, since an item in the staging area that no longer
exists on the producer indicates it has been transferred successfully.
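
To show the shape of that protocol, here is a minimal sketch; the directory
names are invented, and both "sides" share a filesystem here only to keep it
short (in our real system the copy is a network transfer):

{code:java}
import java.nio.file.*;
import java.security.MessageDigest;

public class StagedBatchTransfer {
    static final Path STAGING = Paths.get("staging");     // downstream side
    static final Path COMPLETED = Paths.get("completed"); // downstream side

    static void transfer(Path producerBatch) throws Exception {
        // The unique batch file name doubles as the batch identifier.
        Path staged = STAGING.resolve(producerBatch.getFileName());
        Files.copy(producerBatch, staged,
                   StandardCopyOption.REPLACE_EXISTING);  // safe to retry

        // Validate both copies are identical before discarding anything.
        if (!MessageDigest.isEqual(sha256(producerBatch), sha256(staged)))
            throw new IllegalStateException("staged copy differs: " + producerBatch);

        Files.delete(producerBatch);                      // remove locally...
        Files.move(staged, COMPLETED.resolve(staged.getFileName()),
                   StandardCopyOption.ATOMIC_MOVE);       // ...promote atomically

        // Recovery rule: a staged batch with no producer-side counterpart has
        // already been validated, so promoting it again is always safe.
    }

    static byte[] sha256(Path p) throws Exception {
        return MessageDigest.getInstance("SHA-256").digest(Files.readAllBytes(p));
    }
}
{code}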

Kafka will have to mimic this sort of safety at each stage.  On the consumer
side, the topic, partition, and batch offset together serve as a unique
identifier for a batch, which allows only-once semantics.  On the producer
side, is there something equivalent?
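
For illustration, the consumer-side dedup rule can be this small (a sketch
only; in practice the high-water marks must be persisted atomically with the
consumer's output, or redelivery can still slip through):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// (topic, partition, offset) is a natural batch identifier on the consumer
// side: a replayed batch is recognized by its offset and skipped.
public class OffsetDeduplicator {
    private final Map<String, Long> lastApplied = new ConcurrentHashMap<>();

    public boolean shouldApply(String topic, int partition, long offset) {
        Long seen = lastApplied.get(topic + "-" + partition);
        return seen == null || offset > seen;
    }

    public void markApplied(String topic, int partition, long offset) {
        lastApplied.put(topic + "-" + partition, offset);
    }
}
{code}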

Replication mitigates the problem significantly, but unless the broker hands
out a unique id for each batch (I am unsure whether it does), there is still
the possibility that an item is dropped or duplicated by a transient network
issue that TCP/IP does not handle.

If messages are spooled to disk when a broker is unavailable, the process of
reading items back from that log and sending them to the broker without loss
or duplication is tricky.  Each batch of messages needs an identifier shared
between the broker and producer, and the batch must be durably marked with
that identifier on disk before it is sent to the broker.  After
acknowledgement the producer can delete the batch or mark it complete.  If it
crashes between sending the batch and receiving a response (or otherwise
fails to get acknowledgement), it must be able to ask the broker whether the
batch with the given identifier was received; alternatively, it can send the
batch twice and the broker will ignore the duplicate based on the identifier.
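
A sketch of that producer-side discipline is below.  The sendBatch and
brokerHasBatch calls are hypothetical (sendBatch is assumed to block until
the broker acknowledges), since as far as I can tell the current produce API
carries no batch id at all, which is exactly the gap:

{code:java}
import java.io.IOException;
import java.nio.file.*;
import java.util.UUID;

public class SpoolingProducer {
    private final Path spoolDir;

    SpoolingProducer(Path spoolDir) { this.spoolDir = spoolDir; }

    void produce(byte[] batch) throws IOException {
        String id = UUID.randomUUID().toString();   // identifier shared with broker
        Path spooled = spoolDir.resolve(id);
        Files.write(spooled, batch, StandardOpenOption.CREATE_NEW,
                    StandardOpenOption.SYNC);       // durable BEFORE the send
        sendBatch(id, batch);                       // hypothetical: id rides along
        Files.delete(spooled);                      // only after acknowledgement
    }

    // On restart, anything still spooled was never acknowledged: either ask
    // the broker, or resend and rely on it to drop duplicates by id.
    void recover() throws IOException {
        try (DirectoryStream<Path> batches = Files.newDirectoryStream(spoolDir)) {
            for (Path p : batches) {
                String id = p.getFileName().toString();
                if (!brokerHasBatch(id)) sendBatch(id, Files.readAllBytes(p));
                Files.delete(p);
            }
        }
    }

    void sendBatch(String id, byte[] batch) { /* hypothetical wire call */ }
    boolean brokerHasBatch(String id)       { return false; /* hypothetical */ }
}
{code}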

Does the producer wire protocol include batch ids generated by the broker so
that this can be implemented?  That does not seem to be the case here:
https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html#AGuideToTheKafkaProtocol-ProduceAPI
The protocol does not appear to support 'only once' message semantics.

                
> Messages should not be dropped when brokers are unavailable
> -----------------------------------------------------------
>
>                 Key: KAFKA-156
>                 URL: https://issues.apache.org/jira/browse/KAFKA-156
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Sharad Agarwal
>             Fix For: 0.8
>
>
> When none of the brokers are available, the producer should spool messages
> to disk and keep retrying until brokers come back.
> This will also enable broker upgrades/maintenance without message loss.

