More debugging notes and questions:

In Samza 0.10, the `retries` parameter is honoured and passed to Kafka;
however, Samza itself also retries sending the failed message whenever the
exception is an instance of RetriableException:
```
class KafkaSystemProducer

(exception, loop) => {
  if (exception != null && !exception.isInstanceOf[RetriableException]) {
    // Exception is thrown & not retriable
    debug("Exception detail : ", exception)
    // Close producer
    stop()
    producer = null
    // Mark loop as done as we are not going to retry
    loop.done
    metrics.sendFailed.inc
    throw new SamzaException("Failed to send message. Exception:\n%s".format(exception))
  } else {
    warn("Retrying send messsage due to RetriableException - %s. Turn on debugging to get a full stack trace".format(exception))
    debug("Exception detail:", exception)
    metrics.retries.inc
  }
}
```

On checking the Kafka codebase, it appears that the Kafka client only retries
if the exception being thrown is an instance of RetriableException:
```
class Sender

private boolean canRetry(RecordBatch batch, Errors error) {
    return batch.attempts < this.retries && error.exception() instanceof RetriableException;
}
```
And when it gives up, it returns the exception as-is to the caller (i.e. back to
Samza's callback above).

So, if I understand correctly, Samza will keep retrying this message with Kafka
(whose `retries` is effectively Integer.MAX_VALUE here, judging by the ~2.14
billion attempts left in the log below), and unless the problem that caused the
failure in the first place has resolved itself by then, this will be an
effectively infinite attempt loop.
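
For context, the knob one would normally reach for is the producer pass-through
config (the quoted mail below describes trying exactly that). A minimal sketch
of what capping it might look like, assuming the standard
`systems.<system-name>.producer.*` pass-through; the values are purely
illustrative:

```
# Hypothetical sketch: properties under systems.<system-name>.producer.*
# are passed through to the Kafka producer; the values below are
# illustrative only, not recommendations.
systems.kafka.producer.retries=3
systems.kafka.producer.retry.backoff.ms=500
```

On 0.9 this override did not seem to take effect for `retries` (see the quoted
mail below), which is what prompted the question that follows.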

Have I understood this correctly? And if so, should the user be given a hook
point to control the behavior on underlying failures?
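
To make the question concrete, here is a rough sketch (not existing Samza code)
of what such a hook could look like around the send callback. The
`SendFailureHandler` trait, `BoundedRetryGuard` class and the attempt limit are
hypothetical names, purely to illustrate bounding the retries and handing the
final decision to user code:

```
// Hypothetical sketch only - none of these names exist in Samza today.
// Idea: count consecutive send failures and, past a configured limit,
// let a user-supplied handler decide whether to keep retrying or give up.

trait SendFailureHandler {
  // Return true to keep retrying, false to give up (e.g. fail the container).
  def onSendFailure(exception: Exception, attempts: Int): Boolean
}

class BoundedRetryGuard(maxAttempts: Int, handler: SendFailureHandler) {
  private var attempts = 0

  def onException(exception: Exception): Unit = {
    if (exception == null) {
      attempts = 0 // a successful send resets the counter
    } else {
      attempts += 1
      if (attempts >= maxAttempts && !handler.onSendFailure(exception, attempts)) {
        // Surface the failure to user land instead of retrying forever.
        throw new RuntimeException(s"Giving up after $attempts send attempts", exception)
      }
    }
  }
}
```

In our case the handler would simply return false, so the container dies and
the Samza app master can request a replacement from YARN.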

On Fri, Jul 29, 2016 at 5:07 PM, Gaurav Agarwal <gauravagarw...@gmail.com>
wrote:

> (correction: we are using samza 0.9.0)
>
> On Fri, Jul 29, 2016 at 12:09 PM, Gaurav Agarwal <gauravagarw...@gmail.com
> > wrote:
>
>> Hi All,
>>
>> We are using Samza (0.10.0) in our system and recently ran into a
>> problem where, because a Kafka broker was unstable for a few moments, our
>> Samza tasks got exceptions while trying to write messages to Kafka. After
>> that, they went into a very long retry loop (Integer.MAX times).
>>
>> The repeated warning lines we are getting in container logs are:
>> ...
>>
>> WARN [2016-05-23 06:41:36,645] [U:260,F:293,T:552,M:2,267]
>> producer.internals.Sender:[Sender:completeBatch:257] -
>> [kafka-producer-network-thread | samza_producer-job4-1-1463686278936-2] -
>> Got error produce response with correlation id 5888322 on topic-partition
>> Topic3-0, retrying (2144537752 attempts left). Error: CORRUPT_MESSAGE
>> ...
>>
>> We experimented with setting the Kafka producer 'retries' configuration
>> to a smaller number, but it appears that Samza does not permit overriding
>> this parameter. On top of that, there is some additional Samza-level retry
>> logic to re-send the message if Kafka errored with a 'RetriableException'.
>>
>> May I know what is the reason for disallowing this override?
>> Additionally, what is the recommended way to handle such situations?
>>
>> I would have thought a possible policy would be: if, after K Kafka
>> retries (K configured by the user), samza-kafka was still unable to send
>> the message, it could throw an exception out to user land and let the user
>> decide what to do - in our case we would have chosen to kill the container
>> and have the Samza YARN app master request a new one from YARN.
>>
>> There seem to be at least a couple of related bugs already open:
>>
>>
>>    1. https://issues.apache.org/jira/browse/SAMZA-610
>>    2. https://issues.apache.org/jira/browse/SAMZA-911
>>
>>
>> cheers,
>> gaurav
>>
>>
>
