[ https://issues.apache.org/jira/browse/KAFKA-1863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14277973#comment-14277973 ]

Chris Riccomini edited comment on KAFKA-1863 at 1/15/15 12:39 AM:
------------------------------------------------------------------

Here's how Samza's KafkaSystemProducer currently works. We use a sync Kafka 
producer to send messages. We batch inside Samza's KafkaSystemProducer, so 
calling KafkaSystemProducer.send() adds your message to a buffer inside the 
KafkaSystemProducer (we don't send it to the underlying Kafka producer at this 
point). When the buffer reaches some max (say, 200), the send() call will 
trigger a sync KafkaSystemProducer.flush() call, which will call send() on the 
underlying producer. Since the underlying Kafka producer's send() call is 
blocking, any exception that occurs happens on the thread that's calling 
send(), so we simply wrap the send() call in a try/catch. Flush is also 
called before Samza checkpoints its offsets, to make sure that any outstanding 
output is sent (so we don't lose data if there's a failure).

Samza's KafkaSystemProducer is currently set up to retry forever, no matter 
what, when sends to Kafka fail. It waits 10s, and retries. This behavior has 
come in handy in scenarios where we have (un)expected downtime or cluster 
maintenance on a Kafka grid. When the grid's offline, we don't want 50 Samza 
jobs to kill themselves. We just want them to chill out until the grid's back.
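
In rough pseudo-Java, the current behavior is something like this (a simplified 
sketch of the old sync-producer path, not the actual Samza code; class names and 
constants here are made up):

{code}
import java.util.ArrayList;
import java.util.List;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;

// Simplified sketch of the behavior described above (not the real KafkaSystemProducer).
public class BufferedSyncProducer {
  private static final int MAX_BUFFER = 200;                // "some max (say, 200)"

  private final Producer<byte[], byte[]> producer;          // old, sync producer
  private final List<KeyedMessage<byte[], byte[]>> buffer = new ArrayList<>();

  public BufferedSyncProducer(Producer<byte[], byte[]> producer) {
    this.producer = producer;
  }

  public void send(KeyedMessage<byte[], byte[]> message) {
    buffer.add(message);                                    // buffer inside the system producer
    if (buffer.size() >= MAX_BUFFER) {
      flush();                                              // send() triggers a sync flush
    }
  }

  // Also called before Samza checkpoints its offsets.
  public void flush() {
    while (!buffer.isEmpty()) {
      try {
        producer.send(buffer);                              // blocking; exceptions hit this thread
        buffer.clear();
      } catch (Exception e) {
        // Retry forever: wait 10s and try again, e.g. while the grid is down.
        try {
          Thread.sleep(10000);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }
}
{code}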

This strategy has some drawbacks, though. 

# The KafkaSystemProducer.flush() call is blocking. This hurts throughput. 
Sometimes, quite severely.
# Some exceptions are never going to get better. Retrying doesn't make sense in 
this case.

When we upgrade to the new Producer, we want to address these issues. For (1), 
in the new producer everything is async, so that gets fixed automatically. For 
(2), we'd like to retry only on RetriableException, and fail for the rest.

The trick is, if the sends are no longer sync, we have to have a mechanism by 
which we can verify that all outstanding messages have been successfully sent 
before we return.

{code}
send()
send()
send()
flush() // when this returns, all messages must be in Kafka; in-flight must be 0 and buffer must be 0
{code}

The naive approach would be to hold on to the future for each send() call, and 
stuff it in a list. When flush is invoked, simply iterate over the list and 
call Future.get(). The confusing thing with this approach is what to do if 
Future.get() tells us that there was an exception when the message send was 
attempted. It seems that re-sending the message at this point would result in 
out-of-order messages. Is that correct?
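
Concretely, with the new producer API, the naive approach would look roughly 
like this (a sketch; the class name is made up):

{code}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Sketch of the naive approach: remember every future, then block on them in flush().
public class FutureTrackingProducer {
  private final KafkaProducer<byte[], byte[]> producer;
  private final List<Future<RecordMetadata>> pending = new ArrayList<>();

  public FutureTrackingProducer(KafkaProducer<byte[], byte[]> producer) {
    this.producer = producer;
  }

  public void send(ProducerRecord<byte[], byte[]> record) {
    pending.add(producer.send(record));   // async; failures only show up via the future
  }

  public void flush() throws Exception {
    for (Future<RecordMetadata> future : pending) {
      future.get();                       // throws if that send failed, but later sends
                                          // may already be in flight by then
    }
    pending.clear();
  }
}
{code}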

{code}
send() // success
send() // failed
send() // enqueued
flush()
{code}

In the above example, if we call Future.get() on the second send()'s future, 
and we see it's a failure, the third send() could have already happened. In 
this case, if we re-send the failed message, then we get the third message 
before the second, right?

Another oddity of this approach is that it seems we would have to double-buffer 
the messages in order to resend them, since RecordMetadata doesn't have the 
actual message that we sent. In order to re-send, we'd have to keep the message 
in a buffer in KafkaSystemProducer, along with its future, until the 
Future.get() returns successfully.
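
In other words, every send would have to leave behind something like this until 
its future completes (a hypothetical holder, just to illustrate the double-buffering):

{code}
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical: the record is kept around purely so it could be re-sent,
// since RecordMetadata doesn't carry the original message.
class PendingSend {
  final ProducerRecord<byte[], byte[]> record;   // double-buffered copy for re-sends
  final Future<RecordMetadata> future;

  PendingSend(ProducerRecord<byte[], byte[]> record, Future<RecordMetadata> future) {
    this.record = record;
    this.future = future;
  }
}
{code}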

We then thought that we would set:

{noformat}
retries=2000000000
{noformat}

At this point, we thought that the producer would re-send forever from the send 
thread, which is pretty much what we wanted in the case of a failure. But then 
we realized that if we had multiple in-flight requests, this could lead to out 
of order messages, so we forced:

{noformat}
max.in.flight.requests.per.connection=1
{noformat}

So at this point, we believe that any message that gets into the send thread 
will be guaranteed to be sent, and in the right order, always.
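
Putting the two settings together, the producer config we're converging on looks 
roughly like this (a sketch; the bootstrap.servers value and serializer boilerplate 
are illustrative):

{code}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;

public class ProducerConfigSketch {
  // Sketch of the combination of settings described above.
  public static KafkaProducer<byte[], byte[]> create() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker:9092");             // illustrative
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("retries", "2000000000");                        // effectively "retry forever" in the send thread
    props.put("max.in.flight.requests.per.connection", "1");   // keep retries from reordering messages
    return new KafkaProducer<>(props);
  }
}
{code}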

But, talking with [~guozhang], it sounds like there are some cases where 
ApiExceptions are thrown from the main thread, not the send thread. In this 
case, it seems that our {{retries}} setting has no effect, because the message 
hasn't even made it into the queue for the send thread yet. So, if an exception 
is thrown in the main thread, we seem to have to catch the RetriableException 
that's thrown, and re-send the message.
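
Roughly, that means the caller side has to do something like this (a sketch of 
the catch-and-resend idea; the method name and the 10s backoff are ours, not the 
actual implementation):

{code}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;

// Sketch: exceptions thrown synchronously from send() come from the main thread,
// before the record ever reaches the send thread, so the retries setting never
// applies to them and we have to loop ourselves.
public class MainThreadRetry {
  public static void sendWithRetry(KafkaProducer<byte[], byte[]> producer,
                                   ProducerRecord<byte[], byte[]> record) throws InterruptedException {
    while (true) {
      try {
        producer.send(record);        // async; only main-thread exceptions surface here
        return;
      } catch (RetriableException e) {
        Thread.sleep(10000);          // back off and re-send from the caller
      }                               // non-retriable exceptions propagate to the caller
    }
  }
}
{code}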

This is the current implementation that [~navina] is working on. Part of me is 
thinking, "This can't be right. We must be misunderstanding something." So, I 
1) want to confirm that we're not misunderstanding anything, and 2) see if 
there is a better way to accomplish what we want:

# Async sends.
# A "flush" mechanism that blocks until all messages have been sent to Kafka.
# In-order retries when a RetriableException occurs.
# Forward a non-RetriableException when it occurs.

I'm pretty confident that my mental model is broken, so any help correcting it 
would be appreciated, as would any tips on a better way to accomplish what we 
want.
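
To make the flush requirement above concrete, here's one rough sketch of a flush 
that waits on a counter driven by send callbacks instead of a list of futures 
(illustrative only, not a settled design, and the names are made up):

{code}
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Rough sketch: count outstanding sends and remember the first callback failure;
// flush() blocks until the count reaches zero, then rethrows any failure.
public class CountingFlushProducer {
  private final KafkaProducer<byte[], byte[]> producer;
  private final Object lock = new Object();
  private int outstanding = 0;
  private Exception firstError = null;

  public CountingFlushProducer(KafkaProducer<byte[], byte[]> producer) {
    this.producer = producer;
  }

  public void send(ProducerRecord<byte[], byte[]> record) {
    synchronized (lock) {
      outstanding++;
    }
    producer.send(record, new Callback() {
      @Override
      public void onCompletion(RecordMetadata metadata, Exception exception) {
        synchronized (lock) {
          if (exception != null && firstError == null) {
            firstError = exception;
          }
          outstanding--;
          lock.notifyAll();
        }
      }
    });
  }

  // Blocks until every outstanding send has completed, then surfaces the first
  // failure (if any) to the caller, e.g. at checkpoint time.
  public void flush() throws Exception {
    synchronized (lock) {
      while (outstanding > 0) {
        lock.wait();
      }
      if (firstError != null) {
        Exception e = firstError;
        firstError = null;
        throw e;
      }
    }
  }
}
{code}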



> Exception categories / hierarchy in clients
> -------------------------------------------
>
>                 Key: KAFKA-1863
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1863
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>             Fix For: 0.9.0
>
>
> In the new clients package we introduced a new set of exceptions, but the 
> hierarchy is not very clear as of today:
> {code}
> RuntimeException -> KafkaException -> BufferExhaustedException
>                                    -> ConfigException
>                                    -> SerializationException
>                                    -> QuotaViolationException
>                                    -> SchemaException
>                                    -> ApiException
> ApiException -> InvalidTopicException
>              -> OffsetMetadataTooLarge (probably needs to be renamed)
>              -> RecordBatchTooLargeException
>              -> RecordTooLargeException
>              -> UnknownServerException
>              -> RetriableException
> RetriableException -> CorruptRecordException
>                    -> InvalidMetadataException
>                    -> NotEnoughReplicasAfterAppendException
>                    -> NotEnoughReplicasException
>                    -> OffsetOutOfRangeException
>                    -> TimeoutException
>                    -> UnknownTopicOrPartitionException
> {code}
> KafkaProducer.send() may throw KafkaExceptions that are not ApiExceptions; 
> other exceptions will be set in the returned future metadata.
> We need to:
> 1. Re-examine the hierarchy. For example, for producers, only exceptions that 
> are thrown directly from the caller thread before the record is appended to the 
> batch buffer should be ApiExceptions; some exceptions could be renamed / merged.
> 2. Clearly document the exception category / hierarchy as part of the release.
> [~criccomini] may have some more feedback for this issue from Samza's usage 
> experience. [~jkreps]


