Thanks for the feedback, Bruno. I updated the KIP (I thought the info was
there, but it seems it was too implicit).


1) Each time a client TimeoutException happens, we would log a WARN
message. If `task.timeout.ms` expires, we would rethrow the last client
`TimeoutException` to stop processing.


2) Yes, the first client `TimeoutException` will start the timer, and a
successful retry would reset/disable it.
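
To make the semantics of 1) and 2) concrete, here is a minimal sketch. It
is only an illustration and not the actual Streams implementation: the
class `TaskTimeoutTracker` and its methods are hypothetical names, a plain
`RuntimeException` stands in for the client `TimeoutException`, and the
caller is assumed to pass in the current time in milliseconds.

```java
// Hypothetical helper for illustration only; not part of Kafka Streams.
final class TaskTimeoutTracker {
    private final long taskTimeoutMs;   // the value of `task.timeout.ms`
    private boolean timerRunning = false;
    private long firstTimeoutMs = 0L;   // time of the first timeout since the last success

    TaskTimeoutTracker(final long taskTimeoutMs) {
        if (taskTimeoutMs < 0) {
            throw new IllegalArgumentException("taskTimeoutMs must be >= 0");
        }
        this.taskTimeoutMs = taskTimeoutMs;
    }

    // Called whenever a client call for this task timed out.
    // `lastTimeout` is a stand-in for the client TimeoutException.
    void onTimeout(final RuntimeException lastTimeout, final long nowMs) {
        if (!timerRunning) {
            // The first timeout starts the timer.
            timerRunning = true;
            firstTimeoutMs = nowMs;
        } else if (nowMs - firstTimeoutMs >= taskTimeoutMs) {
            // `task.timeout.ms` expired: rethrow the last timeout to stop processing.
            throw lastTimeout;
        }
        // Otherwise log a WARN and let the caller retry the task later.
        System.err.println("WARN: client timeout, will retry task: " + lastTimeout.getMessage());
    }

    // Called after a successful retry; resets/disables the timer.
    void onSuccess() {
        timerRunning = false;
    }

    boolean isTimerRunning() {
        return timerRunning;
    }
}
```

In this sketch the timer measures wall-clock time since the first
timeout, not the sum of the individual turns of the task.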


-Matthias

On 5/14/20 9:19 AM, John Roesler wrote:
> Thanks for the update, Matthias!
> 
> Other than Bruno’s good points, this proposal looks good to me. 
> 
> Thanks,
> John
> 
> On Thu, May 14, 2020, at 07:17, Bruno Cadonna wrote:
>> Hi Matthias,
>>
>> Thank you for the KIP. I like your KIP.
>>
>> Here is my feedback:
>>
>> 1. The KIP is not clear about what should happen when task.timeout.ms
>> expires. To facilitate the mapping from the error users might
>> encounter due to timeouts to this KIP, it would be good to state the
>> error that will be thrown when task.timeout.ms expires.
>>
>> 2. The KIP also does not clearly state how task.timeout.ms is
>> measured. Does the time start with the first timeout exception and
>> then run until either the timeout expires or the task receives a
>> successful reply? Or is it started each time the task is processed by
>> the stream thread and stopped when its turn is over, with an error
>> thrown once the sum of the individual times without a successful reply
>> reaches the timeout?
>>
>> Best,
>> Bruno
>>
>> On Tue, May 12, 2020 at 10:14 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>
>>> John, Guozhang,
>>>
>>> thanks a lot for your feedback. I updated the KIP from a slightly
>>> different angle: instead of using retries, we should switch to a timeout
>>> based approach. I also extended the KIP to deprecate the producer/admin
>>> `retries` config.
>>>
>>> I like the proposal to skip a task if a client TimeoutException occurs
>>> and just retry it later; I updated the KIP accordingly. However, I would
>>> not retry forever by default. In general, all points you raised are
>>> valid and the question is just what _default_ we want to have. Given
>>> the issue that tasks might get "out-of-sync" regarding their event-time
>>> progress and that inexperienced users might not do proper monitoring, I
>>> prefer to have a "reasonable" default timeout if a task does not make
>>> progress at all and fail for this case.
>>>
>>> I would also argue (following Guozhang) that we don't necessarily need
>>> new metrics. Monitoring the number of alive threads (recently added),
>>> consumer lag, processing rate, etc. should give an operator enough insight
>>> into the application. I don't see the need atm to add some specific "task
>>> timeout" metrics.
>>>
>>> For the issue of cascading failures, I would want to exclude it from
>>> this KIP to keep it focused.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 2/27/20 1:31 PM, Guozhang Wang wrote:
>>>> Hello John,
>>>>
>>>> I'll make note that you owe me a beer now :)
>>>>
>>>> I think I'm leaning towards your approach as well, based on my observations
>>>> of previously reported timeout exceptions. I once left some
>>>> thoughts on Matthias' PR here
>>>> https://github.com/apache/kafka/pull/8122#pullrequestreview-360749510
>>>> and I think I can better summarize my thoughts in this thread:
>>>>
>>>> 1) First of all, we need to think from the user's perspective about what
>>>> they'd really want to be notified of:
>>>>
>>>> a. "If my application cannot make progress temporarily due to various
>>>> transient issues (John had listed several examples already), just handle
>>>> that internally and I do not wanna be notified and worry about how to tune
>>>> my timeout configs at all".
>>>> b. "If my application cannot make progress for a long time, which is
>>>> likely due to a bad config, a human error, network issues, etc., such
>>>> that I should be involved in the loop of troubleshooting, let me know
>>>> sooner rather than later".
>>>>
>>>> and what they would not prefer but may happen today:
>>>>
>>>> c. "One transient error causes a thread to die, but then after tasks
>>>> migrate everything goes back to normal; so the application silently lost a
>>>> thread without letting me know"; in fact, in such cases even a cascading
>>>> exception that eventually kills all threads may be better, since at
>>>> least the users would be notified.
>>>>
>>>> Based on that, instead of retrying immediately at the granularity of each
>>>> blocking call, it should be sufficient to only consider the handling logic
>>>> at the thread level. That is, within an iteration of the thread, it would
>>>> try to:
>>>>
>>>> * initialize some created tasks;
>>>> * restore some restoring tasks;
>>>> * process some running tasks that have buffered records that are
>>>> processable;
>>>> * commit some tasks.
>>>>
>>>> In each of these steps, we may need to make some blocking calls in the
>>>> underlying embedded clients, and if any of them times out, we would be
>>>> able to make only partial progress, or no progress at all. If we still
>>>> want to set a configured value of "retries", I think a better idea would
>>>> be to say "if we cannot make progress for N consecutive iterations of a
>>>> thread, the user should be notified".
>>>>
>>>> ---------------
>>>>
>>>> 2) Second, let's consider what's a good way to notify the user. Today our
>>>> way of notification is simple: throw the exception all the way up to the
>>>> user's uncaught-exception-handler (if it's registered) and let the thread
>>>> die. I'm wondering if we could instead educate users to watch some key
>>>> metrics as "progress indicators" rather than relying on the exception
>>>> handler. Some candidates in mind:
>>>>
>>>> * consumer-lag: this is for both source topics and for repartition topics,
>>>> it indicates if one or more of the tasks within each sub-topology is
>>>> lagging or not; in the case where *some or all* of the threads cannot make
>>>> progress. E.g. if a downstream task's thread is blocked somehow while its
>>>> upstream task's thread is not, then the consumer-lag on the repartition
>>>> topic would keep growing.
>>>>
>>>> * *idle* state: this is an idea we discussed in
>>>> https://issues.apache.org/jira/browse/KAFKA-6520, i.e. to introduce an
>>>> instance-level new state, if all threads of the instance cannot make
>>>> progress (primarily for the reason that it cannot talk to the brokers).
>>>>
>>>> * process-rate: this is at thread-level. However, if some tasks cannot make
>>>> progress while others can still make progress within a thread, its
>>>> process-rate would not drop to zero, so it's a bit harder to use as an
>>>> indicator compared with consumer-lag.
>>>>
>>>> If we feel that relying on metrics is better than throwing the exception
>>>> and letting the thread die, then we would not need to have the "retry"
>>>> config either.
>>>>
>>>> ---------------
>>>>
>>>> 3) This may be only semi-related to the timeout itself, but as I mentioned today
>>>> one common issue we would need to resolve is losing a thread BUT not
>>>> losing the whole instance. In other words, we should consider, when we have
>>>> to throw an exception from a thread (not due to timeouts, but say due to
>>>> some fatal error), whether we should just kill the corresponding thread or
>>>> be more brutal and just kill the whole instance instead. I'm happy to
>>>> defer this to another discussion thread but just wanted to bring it up here.
>>>>
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Thu, Feb 27, 2020 at 10:40 AM John Roesler <vvcep...@apache.org> wrote:
>>>>
>>>>> Hi Matthias,
>>>>>
>>>>> Thanks for the proposal! I think this will be a wonderful improvement
>>>>> to Streams. In particular, thanks for the motivation. It would indeed
>>>>> be nice not to have to set long timeout configs and block individual
>>>>> client requests in order to cope with transient slow responses.
>>>>>
>>>>> I'm very well aware that this might make me sound like a crazy person,
>>>>> but one alternative I'd like to consider is not bounding the retries at
>>>>> all. Instead, Streams would just skip over timed-out tasks and try again
>>>>> on the next iteration, as you proposed, but would continue to do so
>>>>> indefinitely. Clearly, we shouldn't do such a thing silently, so I'd
>>>>> further propose to log a warning every time a task times out and also
>>>>> maintain a new metric indicating task timeouts.
>>>>>
>>>>> To see why this might be attractive, let me pose a hypothetical
>>>>> installation which has thousands of Streams instances, maybe as part
>>>>> of hundreds of applications belonging to dozens of teams. Let's also
>>>>> assume there is a single broker cluster serving all these instances.
>>>>> Such an environment has a number of transient failure modes:
>>>>> * A single broker instance may become slow to respond due to hardware
>>>>> failures (e.g., a bad NIC) or other environmental causes (CPU competition
>>>>> with co-resident processes, long JVM GC pauses, etc.). Single-broker
>>>>> unavailability could cause some tasks to time out while others can
>>>>> proceed in an individual Streams instance.
>>>>> * The entire broker cluster could become temporarily unavailable
>>>>> (consider: a faulty firewall configuration gets deployed, severing all
>>>>> Streams instances from the brokers).
>>>>> * A faulty security configuration may temporarily sever a whole
>>>>> application from the brokers.
>>>>> * Any number of causes could likewise sever a single instance in a single
>>>>> application from all brokers.
>>>>> * Finally, networking problems can disconnect arbitrary pairs of Streams
>>>>> instances and Broker instances.
>>>>>
>>>>> This is not an accounting of all possible failure modes, obviously, but
>>>>> the point is that, in a large, decentralized organization, you can
>>>>> experience lots of transient failures that have some features in common:
>>>>> F1. It's someone else's fault, and someone else must take action to fix it.
>>>>> F2. It will take "human time" to fix it. I.e., hours, not milliseconds.
>>>>> F3. A single failure can affect "everyone" (e.g., one broker with long GCs
>>>>> can cause timeouts in all thousands of instances over all dozens of teams).
>>>>>
>>>>> As an operator belonging to one team, whether we bound retries or not,
>>>>> I would need to be alerted when the app stops making progress, I'd need
>>>>> to investigate, and in the above cases, I'd need to escalate to the
>>>>> network and/or broker infrastructure teams.
>>>>>
>>>>> Clearly, I can be alerted either by threads dying or by non-progress
>>>>> metrics. As a responsible operator, I'd have alerts on _both_, so we
>>>>> shouldn't consider either signal to be "louder" or more reliable than
>>>>> the other.
>>>>>
>>>>> A side observation: in a lot of the failure modes, a specific task won't
>>>>> be able to make progress no matter which thread or instance it's on
>>>>> (e.g., if the transaction coordinator for one of its output partitions
>>>>> is slow or unresponsive). Therefore, killing the thread with a bounded
>>>>> retry config would only result in a cascade of thread deaths across all
>>>>> my instances until either I run out of threads or the incident is
>>>>> resolved.
>>>>>
>>>>> The key questions to me are:
>>>>> Q1. Do we want to continue trying to make what progress we can while
>>>>> the incident is being investigated and remediated?
>>>>> Q2. Should I (the operator for a single team) have to take any action
>>>>> once the infrastructure failures are resolved?
>>>>>
>>>>> We can paraphrase these as, "do you want your business to grind to a halt
>>>>> due to a single failure?", and "do you want everyone to stay up all night
>>>>> waiting for a fix so they can all restart their applications?"
>>>>>
>>>>> Just from the operator/business perspective, it seems like we want:
>>>>> Q1:yes and Q2:no, which in combination with F1,2,3 above indicates
>>>>> to me that it would be better for Streams to just keep retrying
>>>>> indefinitely.
>>>>>
>>>>> There is one point I think you've mentioned to me in the past that it
>>>>> may not be _safe_ to just quit working on one specific task while
>>>>> progressing on others. If we have a repartition topic sourced by
>>>>> two tasks T1 and T2, and feeding a windowed aggregation (for example),
>>>>> then failing to process T1 while continuing on T2 for a long time
>>>>> would cause a lot of timestamp skew, and could ultimately result in all
>>>>> those delayed records in T1 being out of grace period by the time they
>>>>> get processed. Arguably, this is a completely normal and expected
>>>>> situation in a distributed system, which is why we have grace period to
>>>>> begin with, but since the cause of this particular skew is inside of
>>>>> Streams, it would be possible and nice to detect and avoid the situation.
>>>>>
>>>>> However, we should note that killing a single thread that hosts T1 will
>>>>> _not_ deterministically halt processing on T2, nor will stopping the
>>>>> single instance that hosts T1, since T2 might be on another instance.
>>>>> We would need a cluster-wide broadcast of some kind to either halt
>>>>> all processing on all tasks, or (more sophisticated) to halt processing
>>>>> on T2 when we detect non-progress of T1.
>>>>>
>>>>> Depending on the failure mode, it's possible that just shuffling the
>>>>> tasks around could let us start making progress again, for example when
>>>>> only a single Streams instance can't reach one or more brokers. However,
>>>>> this objective is not accomplished by just stopping one thread; we would
>>>>> need to relocate all tasks off the affected instance to attempt this
>>>>> remediation.
>>>>>
>>>>> A separate thing to point out is that an instance being unavailable
>>>>> for processing does not imply that it is also unavailable for querying.
>>>>> Especially in light of KIP-535, where we started to support querying
>>>>> "stale" stores, it seems worthwhile to keep the threads and instances
>>>>> alive, even if they pause processing.
>>>>>
>>>>> Well, if you've made it this far, congratulations. I owe you a beer. I
>>>>> hope you don't interpret the length of this email as reflective of my
>>>>> zeal. I'm also ok with a bounded retries config of some kind, but I
>>>>> wanted to be sure we've considered the above effects.
>>>>>
>>>>> Thanks,
>>>>> -John
>>>>>
>>>>>
>>>>> On Sat, Feb 22, 2020, at 00:51, Matthias J. Sax wrote:
>>>> Hi,
>>>>
>>>> I would like to propose KIP-572 to make Kafka Streams more robust with
>>>> regard to timeout exception handling.
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams
>>>>
>>>> Note that there is a long list of rejected alternatives that can be
>>>> used as a starting point for the discussion. In fact, I am not sure if
>>>> one of those listed alternatives might be better than the current
>>>> proposal -- I just had to pick one design for now (the reason why I
>>>> picked the current design is that it's semantically fully backward
>>>> compatible and does not introduce any new configs).
>>>>
>>>> Looking forward to your feedback.
>>>>
>>>> -Matthias
