Hi Matthias,

Thanks for the proposal! I think this will be a wonderful improvement
to Streams. In particular, thanks for the motivation. It would indeed
be nice not to have to set long timeout configs and block individual 
client requests in order to cope with transient slow responses.

I'm very well aware that this might make me sound like a crazy person,
but one alternative I'd like to consider is not bounding the retries at all.
Instead, Streams would just skip over timed-out tasks and try again
on the next iteration, as you proposed, but would continue to do so
indefinitely. Clearly, we shouldn't do such a thing silently, so I'd further
propose to log a warning every time a task times out and also maintain
a new metric indicating task timeouts.
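
To make that concrete, here is a rough sketch of what the per-iteration
handling could look like. This is purely illustrative, not the actual
Streams internals: the Task interface and the plain counter are made up,
and a real implementation would register a sensor with the Streams
metrics registry instead.

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskLoopSketch {
    private static final Logger log = LoggerFactory.getLogger(TaskLoopSketch.class);

    // stand-in for a proper task-timeout sensor in the Streams metrics
    private final AtomicLong taskTimeouts = new AtomicLong();

    interface Task {
        String id();
        void process();
    }

    void runOnce(final List<Task> activeTasks) {
        for (final Task task : activeTasks) {
            try {
                task.process();
            } catch (final TimeoutException e) {
                // don't kill the thread: bump the metric, warn loudly,
                // and leave the task to be retried on the next iteration
                taskTimeouts.incrementAndGet();
                log.warn("Task {} timed out; will retry on the next iteration.", task.id(), e);
            }
        }
    }
}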

To see why this might be attractive, let me pose a hypothetical: an
installation with thousands of Streams instances, maybe as part of hundreds of
applications belonging to dozens of teams. Let's also assume there is a
single broker cluster serving all these instances. Such an environment has
a number of transient failure modes:
* A single broker instance may become slow to respond due to hardware
failures (e.g., a bad NIC) or other environmental causes (CPU competition
with co-resident processes, long JVM GC pauses, etc.). Single-broker
unavailability could cause some tasks in an individual Streams instance to
time out while others proceed.
* The entire broker cluster could become temporarily unavailable (consider:
a faulty firewall configuration gets deployed, severing all Streams instances
from the brokers).
* A faulty security configuration may temporarily sever a whole application
from the brokers.
* Any number of causes could likewise sever a single instance in a single
application from all brokers.
* Finally, networking problems can disconnect arbitrary pairs of Streams
instances and Broker instances.

This is not an accounting of all possible failure modes, obviously, but the
point is that, in a large, decentralized organization, you can experience
lots of transient failures that have some features in common:
F1. It's someone else's fault, and someone else must take action to fix it.
F2. It will take "human time" to fix it. I.e., hours, not milliseconds.
F3. A single failure can affect "everyone" (e.g., one broker with long GCs
can cause timeouts in all thousands of instances over all dozens of teams).

As an operator belonging to one team, whether we bound retries or not,
I would need to be alerted when the app stops making progress, I'd need
to investigate, and in the above cases, I'd need to escalate to the network
and/or broker infrastructure teams.

Clearly, I can be alerted either by threads dying or by non-progress metrics.
As a responsible operator, I'd have alerts on _both_, so we shouldn't consider
either signal to be "louder" or more reliable than the other.

A side observation: in a lot of the failure modes, a specific task won't be able
to make progress no matter which thread or instance it's on (e.g., if the
transaction coordinator for one of its output partitions is slow or
unresponsive).
Therefore, killing the thread with a bounded retry config would only result
in a cascade of thread deaths across all my instances until either I run out of
threads or the incident is resolved.

The key questions to me are:
Q1. Do we want to continue trying to make what progress we can while
the incident is being investigated and remediated?
Q2. Should I (the operator for a single team) have to take any action once
the infrastructure failures are resolved?

We can paraphrase these as, "do you want your business to grind to a halt
due to a single failure?", and "do you want everyone to stay up all night
waiting for a fix so they can all restart their applications?"

Just from the operator/business perspective, it seems like we want
Q1: yes and Q2: no, which in combination with F1-F3 above indicates
to me that it would be better for Streams to just keep retrying indefinitely.

There is one point I think you've mentioned to me in the past: it may
not be _safe_ to just quit working on one specific task while
progressing on others. If we have a repartition topic sourced by
two tasks T1 and T2, and feeding a windowed aggregation (for example),
then failing to process T1 while continuing on T2 for a long time
would cause a lot of timestamp skew, and could ultimately result in all
those delayed records in T1 being out of grace period by the time they
get processed. Arguably, this is a completely normal and expected
situation in a distributed system, which is why we have grace period to
begin with, but since the cause of this particular skew is inside of
Streams, it would be possible and nice to detect and avoid the situation.
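
For reference, the kind of topology I have in mind is something like the
following sketch (topic names, window size, and grace period are all made
up for illustration): the re-keying step creates a repartition topic fed
by the upstream tasks, and the downstream windowed aggregation drops any
record that shows up after the grace period has elapsed.

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.TimeWindows;

public class SkewSketch {
    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               // re-keying forces a repartition topic, fed by the upstream tasks (T1, T2)
               .selectKey((key, value) -> value)
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               // records arriving more than 10 minutes after the window closes are dropped;
               // records from a long-stalled T1 could easily land past this bound
               .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(10)))
               .count();
        return builder.build();
    }
}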

However, we should note that killing a single thread that hosts T1 will
_not_ deterministically halt processing on T2, nor will stopping the
single instance that hosts T1, since T2 might be on another instance.
We would need a cluster-wide broadcast of some kind to either halt
all processing on all tasks, or (more sophisticated) to halt processing
on T2 when we detect non-progress of T1.

Depending on the failure mode, it's possible that just shuffling the tasks
around could let us start making progress again, for example when only
a single Streams instance can't reach one or more brokers. However, this
objective is not accomplished by just stopping one thread; we would need
to relocate all tasks off the affected instance to attempt this remediation.

A separate thing to point out is that an instance being unavailable
for processing does not imply that it is also unavailable for querying.
Especially in light of KIP-535, where we started to support querying
"stale" stores, it seems worthwhile to keep the threads and instances
alive, even if they pause processing.
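
As a sketch of what that buys us (assuming the StoreQueryParameters API
that KIP-535 / KIP-562 introduce, and a made-up store name "counts"),
a live-but-not-processing instance could still answer lookups like this:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class StaleQuerySketch {
    static Long lookup(final KafkaStreams streams, final String key) {
        final ReadOnlyKeyValueStore<String, Long> store = streams.store(
            StoreQueryParameters
                .fromNameAndType("counts", QueryableStoreTypes.<String, Long>keyValueStore())
                // keep serving reads even if this instance is stalled or not caught up
                .enableStaleStores());
        return store.get(key);
    }
}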

Well, if you've made it this far, congratulations. I owe you a beer. I hope
you don't interpret the length of this email as reflective of my zeal. I'm
also ok with a bounded retries config of some kind, but I wanted to be
sure we've considered the above effects.

Thanks,
-John


On Sat, Feb 22, 2020, at 00:51, Matthias J. Sax wrote:
> 
> Hi,
> 
> I would like to propose KIP-572 to make Kafka Streams more robust with
> regard to timeout exception handling.
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams
> 
> Note, that there is a long list of rejected alternatives that can be
> used as starting point for the discussion. In fact, I am not sure if
> one of those listed alternative might be better than the current
> proposal -- I just had to pick one design for now (the reason why I
> picked the current design is that it's semantically fully backward
> compatible and does not introduce any new configs).
> 
> Looking forward to your feedback.
> 
> -Matthias
