Thanks for the feedback, Bruno. I updated the KIP (I thought the info was there, but it seems it was too implicit).
1) Each time a client TimeoutException happens, we would log a WARN message. If `task.timeout.ms` expires, we would rethrow the last client `TimeoutException` to stop processing.

2) Yes, the first client `TimeoutException` will start the timer, and a successful retry would reset/disable it. (A rough sketch of these timer semantics is included at the very end of this mail.)

-Matthias

On 5/14/20 9:19 AM, John Roesler wrote:
> Thanks for the update, Matthias!
>
> Other than Bruno’s good points, this proposal looks good to me.
>
> Thanks,
> John
>
> On Thu, May 14, 2020, at 07:17, Bruno Cadonna wrote:
>> Hi Matthias,
>>
>> Thank you for the KIP. I like your KIP.
>>
>> Here is my feedback:
>>
>> 1. The KIP is not clear about what should happen when task.timeout.ms expires. To facilitate the mapping from the errors users might encounter due to timeouts to this KIP, it would be good to state the error that will be thrown when task.timeout.ms expires.
>>
>> 2. The KIP also does not clearly state how task.timeout.ms is measured. Does the time start with the first timeout exception and then run until either the timeout expires or the task receives a successful reply? Or is it started each time the task is processed by the stream thread and stopped when its turn is over, with an error thrown when the sum of the single times without a successful reply reaches the timeout?
>>
>> Best,
>> Bruno
>>
>> On Tue, May 12, 2020 at 10:14 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>
>>> John, Guozhang,
>>>
>>> thanks a lot for your feedback. I updated the KIP from a slightly different angle: instead of using retries, we should switch to a timeout-based approach. I also extended the KIP to deprecate the producer/admin `retries` config.
>>>
>>> I like the proposal to skip a task if a client TimeoutException occurs and just retry it later; I updated the KIP accordingly. However, I would not retry forever by default. In general, all points you raised are valid and the question is just what _default_ we want to have. Given the issue that tasks might get "out-of-sync" regarding their event-time progress and that inexperienced users might not do proper monitoring, I prefer to have a "reasonable" default timeout and to fail if a task does not make progress at all within it.
>>>
>>> I would also argue (following Guozhang) that we don't necessarily need new metrics. Monitoring the number of alive threads (recently added), consumer lag, processing rate, etc. should give an operator enough insight into the application. I don't see the need atm to add specific "task timeout" metrics.
>>>
>>> For the issue of cascading failures, I would want to exclude it from this KIP to keep it focused.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 2/27/20 1:31 PM, Guozhang Wang wrote:
>>>> Hello John,
>>>>
>>>> I'll make a note that you owe me a beer now :)
>>>>
>>>> I think I'm leaning towards your approach as well, based on my observations of previously reported timeout exceptions. I once left some thoughts on Matthias' PR here https://github.com/apache/kafka/pull/8122#pullrequestreview-360749510 and I think I can better summarize my thoughts in this thread:
>>>>
>>>> 1) First of all, we need to think from the user's perspective about what they'd really want to be notified of:
>>>>
>>>> a.
"If my application cannot make progress temporarily due to various >>>> transient issues (John had listed several examples already), just handle >>>> that internally and I do not wanna be notified and worry about how to tune >>>> my timeout configs at all". >>>> b. "If my application cannot make progress for a long time, which is >>> likely >>>> due to a bad config, a human error, network issues, etc such that I should >>>> be involved in the loop of trouble shooting, let me know sooner than >>> later". >>>> >>>> and what they'd not preferred but may happen today: >>>> >>>> c. "one transient error cause a thread to die, but then after tasks >>>> migrated everything goes to normal; so the application silently lost a >>>> thread without letting me know"; in fact, in such cases even a cascading >>>> exception that eventually kills all thread may be better since at >>> least the >>>> users would be notified. >>>> >>>> Based on that, instead of retrying immediately at the granularity each >>>> blocking call, it should be sufficient to only consider the handling logic >>>> at the thread level. That is, within an iteration of the thread, it would >>>> try to: >>>> >>>> * initialized some created tasks; >>>> * restored some restoring tasks; >>>> * processed some running tasks who have buffered records that are >>>> processable; >>>> * committed some tasks. >>>> >>>> In each of these steps, we may need to make some blocking calls in the >>>> underlying embedded clients, and if either of them timed out, we would not >>>> be able to make progress partially or not being able to make any progress >>>> at all. If we still want to set a configured value of "retries", I think a >>>> better idea would be to say "if we cannot make progress for consecutive N >>>> iterations of a thread, the user should be notified". >>>> >>>> --------------- >>>> >>>> 2) Second, let's consider what's a good way to notify the user. Today our >>>> way of notification is simple: throw the exception all the way up to >>> user's >>>> uncaught-exception-handler (if it's registered) and let the thread >>> die. I'm >>>> wondering if we could instead educate users to watch on some key metrics >>>> for "progress indicate" than relying on the exception handler though. Some >>>> candidates in mind: >>>> >>>> * consumer-lag: this is for both source topics and for repartition topics, >>>> it indicates if one or more of the tasks within each sub-topology is >>>> lagging or not; in the case where *some or all* of the threads cannot make >>>> progress. E.g. if a downstream task's thread is blocked somehow while its >>>> upstream task's thread is not, then the consumer-lag on the repartition >>>> topic would keep growing. >>>> >>>> * *idle* state: this is an idea we discussed in >>>> https://issues.apache.org/jira/browse/KAFKA-6520, i.e. to introduce an >>>> instance-level new state, if all threads of the instance cannot make >>>> progress (primarily for the reason that it cannot talk to the brokers). >>>> >>>> * process-rate: this is at thread-level. However if some tasks cannot make >>>> progress while others can still make progress within a thread, its >>>> process-rate would now drop to zero and it's a bit hard to indicate >>>> compared with comsumer-lag. >>>> >>>> If we feel that relying on metrics is better than throwing the exception >>>> and let the thread die, then we would not need to have the "retry" config >>>> as well. 
>>>>
>>>> ---------------
>>>>
>>>> 3) This may be semi-related to the timeout itself, but as I mentioned, one common issue we would need to resolve today is losing a thread BUT not losing the whole instance. In other words, we should consider, when we have to throw an exception from a thread (not due to timeouts, but say due to some fatal error), whether we should just kill the corresponding thread or be more brutal and kill the whole instance instead. I'm happy to defer this to another discussion thread, but wanted to bring it up here.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Thu, Feb 27, 2020 at 10:40 AM John Roesler <vvcep...@apache.org> wrote:
>>>>
>>>>> Hi Matthias,
>>>>>
>>>>> Thanks for the proposal! I think this will be a wonderful improvement to Streams. In particular, thanks for the motivation. It would indeed be nice not to have to set long timeout configs and block individual client requests in order to cope with transient slow responses.
>>>>>
>>>>> I'm very well aware that this might make me sound like a crazy person, but one alternative I'd like to consider is not bounding the retries at all. Instead, Streams would just skip over timed-out tasks and try again on the next iteration, as you proposed, but would continue to do so indefinitely. Clearly, we shouldn't do such a thing silently, so I'd further propose to log a warning every time a task times out and also maintain a new metric indicating task timeouts.
>>>>>
>>>>> To see why this might be attractive, let me pose a hypothetical installation which has thousands of Streams instances, maybe as part of hundreds of applications belonging to dozens of teams. Let's also assume there is a single broker cluster serving all these instances. Such an environment has a number of transient failure modes:
>>>>> * A single broker instance may become slow to respond due to hardware failures (e.g., a bad NIC) or other environmental causes (CPU competition with co-resident processes, long JVM GC pauses, etc.). Single-broker unavailability could cause some tasks to time out while others can proceed in an individual Streams instance.
>>>>> * The entire broker cluster could become temporarily unavailable (consider: a faulty firewall configuration gets deployed, severing all Streams instances from the brokers).
>>>>> * A faulty security configuration may temporarily sever a whole application from the brokers.
>>>>> * Any number of causes could likewise sever a single instance in a single application from all brokers.
>>>>> * Finally, networking problems can disconnect arbitrary pairs of Streams instances and broker instances.
>>>>>
>>>>> This is not an accounting of all possible failure modes, obviously, but the point is that, in a large, decentralized organization, you can experience lots of transient failures that have some features in common:
>>>>> F1. It's someone else's fault, and someone else must take action to fix it.
>>>>> F2. It will take "human time" to fix it, i.e., hours, not milliseconds.
>>>>> F3. A single failure can affect "everyone" (e.g., one broker with long GCs can cause timeouts in all thousands of instances over all dozens of teams).
>>>>>
>>>>> As an operator belonging to one team, whether we bound retries or not, I would need to be alerted when the app stops making progress, I'd need to investigate, and in the above cases, I'd need to escalate to the network and/or broker infrastructure teams.
>>>>>
>>>>> Clearly, I can be alerted either by threads dying or by non-progress metrics. As a responsible operator, I'd have alerts on _both_, so we shouldn't consider either signal to be "louder" or more reliable than the other.
>>>>>
>>>>> A side observation: in a lot of the failure modes, a specific task won't be able to make progress no matter which thread or instance it's on (e.g., if the transaction coordinator for one of its output partitions is slow or unresponsive). Therefore, killing the thread with a bounded retry config would only result in a cascade of thread deaths across all my instances until either I run out of threads or the incident is resolved.
>>>>>
>>>>> The key questions to me are:
>>>>> Q1. Do we want to continue trying to make what progress we can while the incident is being investigated and remediated?
>>>>> Q2. Should I (the operator for a single team) have to take any action once the infrastructure failures are resolved?
>>>>>
>>>>> We can paraphrase these as, "do you want your business to grind to a halt due to a single failure?", and "do you want everyone to stay up all night waiting for a fix so they can all restart their applications?"
>>>>>
>>>>> Just from the operator/business perspective, it seems like we want Q1: yes and Q2: no, which in combination with F1-F3 above indicates to me that it would be better for Streams to just keep retrying indefinitely.
>>>>>
>>>>> There is one point I think you've mentioned to me in the past: that it may not be _safe_ to just quit working on one specific task while progressing on others. If we have a repartition topic sourced by two tasks T1 and T2, and feeding a windowed aggregation (for example), then failing to process T1 while continuing on T2 for a long time would cause a lot of timestamp skew, and could ultimately result in all those delayed records in T1 being out of the grace period by the time they get processed. Arguably, this is a completely normal and expected situation in a distributed system, which is why we have a grace period to begin with, but since the cause of this particular skew is inside of Streams, it would be possible and nice to detect and avoid the situation.
>>>>>
>>>>> However, we should note that killing a single thread that hosts T1 will _not_ deterministically halt processing on T2, nor will stopping the single instance that hosts T1, since T2 might be on another instance. We would need a cluster-wide broadcast of some kind to either halt all processing on all tasks, or (more sophisticated) to halt processing on T2 when we detect non-progress of T1.
>>>>>
>>>>> Depending on the failure mode, it's possible that just shuffling the tasks around could let us start making progress again, for example when only a single Streams instance can't reach one or more brokers. However, this objective is not accomplished by just stopping one thread; we would need to relocate all tasks off the affected instance to attempt this remediation.
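[Side note, for readers less familiar with the grace-period mechanics referenced two paragraphs above: the grace period is configured on the windowed aggregation itself, and out-of-order records are only accepted until stream time has advanced past window end plus grace; anything later is dropped. Heavy timestamp skew between T1 and T2 is exactly what could push T1's delayed records past that bound. A minimal DSL sketch follows; topic names, key/value types, and durations are made up.]

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class GraceExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        // "input-topic" is a placeholder; think of it as the input that gets
        // repartitioned and fed by the two upstream tasks T1 and T2.
        final KStream<String, String> input = builder.stream("input-topic");

        // 5-minute windows; out-of-order records are accepted until stream time
        // has advanced 30 minutes past the window end, then they are dropped.
        // If one upstream task stalls while the other keeps advancing stream
        // time, the stalled task's records can miss this bound once they are
        // finally processed.
        final KTable<Windowed<String>, Long> counts = input
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(30)))
            .count();

        counts.toStream().to("output-topic"); // "output-topic" is also a placeholder
        System.out.println(builder.build().describe());
    }
}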
>>>>>
>>>>> A separate thing to point out is that an instance being unavailable for processing does not imply that it is also unavailable for querying. Especially in light of KIP-535, where we started to support querying "stale" stores, it seems worthwhile to keep the threads and instances alive, even if they pause processing.
>>>>>
>>>>> Well, if you've made it this far, congratulations. I owe you a beer. I hope you don't interpret the length of this email as reflective of my zeal. I'm also ok with a bounded retries config of some kind, but I wanted to be sure we've considered the above effects.
>>>>>
>>>>> Thanks,
>>>>> -John
>>>>>
>>>>>
>>>>> On Sat, Feb 22, 2020, at 00:51, Matthias J. Sax wrote:
>>>>>> Hi,
>>>>>>
>>>>>> I would like to propose KIP-572 to make Kafka Streams more robust with regard to timeout exception handling.
>>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams
>>>>>>
>>>>>> Note that there is a long list of rejected alternatives that can be used as a starting point for the discussion. In fact, I am not sure if one of those listed alternatives might be better than the current proposal -- I just had to pick one design for now (the reason why I picked the current design is that it's semantically fully backward compatible and does not introduce any new configs).
>>>>>>
>>>>>> Looking forward to your feedback.
>>>>>>
>>>>>> -Matthias
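As referenced at the top of this mail, here is a rough sketch of the `task.timeout.ms` semantics from points 1) and 2). This is NOT the actual Streams implementation; the class and method names are made up purely for illustration, and the real KIP-572 code may differ.

import org.apache.kafka.common.errors.TimeoutException;

// Sketch only: the timer starts with the first client TimeoutException,
// a successful retry resets/disables it, and once task.timeout.ms has
// elapsed the last client TimeoutException is rethrown to stop processing.
class TaskTimeoutTracker {
    private final long taskTimeoutMs;         // value of `task.timeout.ms`
    private long deadlineMs = -1L;            // -1 means "no timeout in progress"
    private TimeoutException lastError = null;

    TaskTimeoutTracker(final long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    // Called when a blocking client call for this task times out.
    void onTimeout(final TimeoutException error, final long nowMs) {
        if (deadlineMs == -1L) {
            deadlineMs = nowMs + taskTimeoutMs; // first timeout starts the timer
        }
        lastError = error;
        if (nowMs >= deadlineMs) {
            throw lastError; // task.timeout.ms expired: rethrow the last client error
        }
        // otherwise: log a WARN (omitted here) and let the thread retry the
        // task on a later iteration
    }

    // Called when a blocking client call for this task succeeds.
    void onSuccess() {
        deadlineMs = -1L; // successful retry resets/disables the timer
        lastError = null;
    }
}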