[
https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168979#comment-17168979
]
John Roesler commented on KAFKA-10336:
--------------------------------------
I've upgraded this to a blocker for 2.7.0, so that we won't forget about it. It
was a regression in 2.3.0 when we changed the suppression buffer format the
first time, but we didn't detect it because of a testing gap.
However, it's a pretty serious issue, and will only become more impactful as
more people use suppression and as we make other internal topic format changes,
for example in the fix for https://issues.apache.org/jira/browse/KAFKA-10322
> Rolling upgrade with Suppression AND Standbys may throw exceptions
> ------------------------------------------------------------------
>
> Key: KAFKA-10336
> URL: https://issues.apache.org/jira/browse/KAFKA-10336
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
> Reporter: John Roesler
> Priority: Blocker
> Fix For: 2.7.0
>
>
> Tl;dr:
> If you have standbys AND use Suppress with changelogging enabled, you may
> experience exceptions leading to threads shutting down on the OLD instances
> during a rolling upgrade. No corruption is expected, and when the rolling
> upgrade completes, all threads will be running and processing correctly.
> Details:
> The Suppression changelog has had to change its internal data format several
> times to fix bugs. The binary schema of the changelog values is determined by
> a version header on the records, and new versions are able to decode all old
> versions' formats.
> The suppression changelog decoder is also configured to throw an exception if
> it encounters a version number that it doesn't recognize, causing the thread
> to stop processing and shut down.
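> To make the failure mode concrete, here is a minimal sketch (hypothetical
> code, not the actual Streams internals) of a decoder that dispatches on a
> per-record version header and rejects any version it does not recognize:
>
> import java.nio.ByteBuffer;
>
> // Hypothetical stand-in for the suppression changelog value decoder.
> final class SuppressionChangelogDecoder {
>     // Highest changelog format version this (older) Streams release understands.
>     static final int MAX_SUPPORTED_VERSION = 2;
>
>     byte[] decode(final int headerVersion, final ByteBuffer value) {
>         switch (headerVersion) {
>             case 1:
>             case 2:
>                 // Newer code can always decode older formats.
>                 final byte[] payload = new byte[value.remaining()];
>                 value.get(payload);
>                 return payload;
>             default:
>                 // A newer active wrote a format this node cannot parse; failing
>                 // fast protects the buffer from silent corruption, but it also
>                 // shuts the processing thread down.
>                 throw new IllegalArgumentException(
>                     "Unknown suppression changelog format version " + headerVersion
>                         + "; max supported is " + MAX_SUPPORTED_VERSION);
>         }
>     }
> }
>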
> When standbys are configured, there is one so-called "active" worker writing
> into the suppression buffer and sending the same messages into the changelog,
> while another "standby" worker reads those messages, decodes them, and
> maintains a hot-standby replica of the suppression buffer.
> If the standby worker is running an older version of Streams than the active
> worker, what can happen today is that the active worker may write changelog
> messages with a higher version number than the standby worker can understand.
> When the standby worker receives one of these messages, it will throw the
> exception and shut down its thread.
> Note, although the exceptions are undesired, at least this behavior protects
> the integrity of the application and prevents data corruption or loss.
> Workaround:
> Several workarounds are possible:
> This only affects clusters that do all of (A) rolling bounce, (B)
> suppression, (C) standby replicas, (D) changelogged suppression buffers.
> Changing any of those four variables will prevent the issue from occurring. I
> would NOT recommend disabling (D), and (B) is probably off the table, since
> the application logic presumably depends on it. Therefore, your practical
> choices are to disable standbys (C), or to do a full-cluster bounce (A).
> Personally, I think (A) is the best option.
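> As a config-level sketch of option (C), standby replicas could be turned off
> for the duration of the rolling upgrade and restored afterwards (the helper
> name below is made up for illustration):
>
> import java.util.Properties;
> import org.apache.kafka.streams.StreamsConfig;
>
> public class RollingUpgradeConfigs {
>     // Returns a copy of the base config with standby replicas disabled,
>     // i.e., workaround (C) from above.
>     public static Properties withoutStandbys(final Properties base) {
>         final Properties props = new Properties();
>         props.putAll(base);
>         props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 0);
>         return props;
>     }
> }
>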
> Also note, although the exceptions and threads shutting down are not ideal,
> they would only afflict the old-versioned nodes, i.e., the nodes you intend to
> replace anyway. So another "workaround" is simply to ignore the exceptions
> and proceed with the rolling bounce. As the old-versioned nodes are replaced
> with new-versioned nodes, the new nodes will again be able to decode their
> peers' changelog messages and be able to maintain the hot-standby replicas of
> the suppression buffers.
> Detection:
> Although I really should have anticipated this condition, I first detected it
> while expanding our system test coverage as part of KAFKA-10173. I added a
> rolling upgrade test with an application that uses both suppression and
> standby replicas, and observed that the rolling upgrades would occasionally
> cause the old nodes to crash. Accordingly, in KAFKA-10173, I disabled the
> rolling-upgrade configuration so that the test only does full-cluster
> upgrades. Resolving _this_ ticket will allow us to re-enable rolling upgrades.
> Proposed solution:
> Part 1:
> Since Streams can decode both current and past versions, but not future
> versions, we need to implement a mechanism to prevent new-versioned nodes
> from writing new-versioned messages, which would appear as future-versioned
> messages to the old-versioned nodes.
> We have an UPGRADE_FROM configuration that we could leverage to accomplish
> this. In that case, when upgrading from 2.3 to 2.4, you would set
> UPGRADE_FROM to "2.3", and then do a rolling upgrade to 2.4. The new (2.4)
> nodes would continue writing messages in the old (2.3) format, so the
> still-running old nodes would still be able to read them.
> Then, you would remove the UPGRADE_FROM config and do ANOTHER rolling bounce.
> As each node comes back up, it would start writing in the 2.4 format, which
> is ok because all the members are already running 2.4 at this point and can
> decode 2.4 messages, even the members that have not yet been re-bounced and
> are therefore still writing in the 2.3 format.
> After the second rolling bounce, the whole cluster is both running 2.4 and
> writing with the 2.4 format.
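> As a sketch, the two bounces would differ only in whether upgrade.from is set
> (the suppression-format handling described here is the proposal, not current
> behavior, and the helper names are made up):
>
> import java.util.Properties;
> import org.apache.kafka.streams.StreamsConfig;
>
> public class TwoBounceUpgrade {
>     // First rolling bounce: run the new (2.4) code, but keep writing the old
>     // (2.3) formats so not-yet-bounced members can still read them.
>     static Properties firstBounce(final Properties base) {
>         final Properties props = new Properties();
>         props.putAll(base);
>         props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.3");
>         return props;
>     }
>
>     // Second rolling bounce: drop upgrade.from so the members switch to the
>     // 2.4 formats once the whole cluster is on 2.4.
>     static Properties secondBounce(final Properties base) {
>         final Properties props = new Properties();
>         props.putAll(base);
>         props.remove(StreamsConfig.UPGRADE_FROM_CONFIG);
>         return props;
>     }
> }
>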
> Part 2:
> Managing two rolling bounces can be difficult, so it is also desirable to
> implement a mechanism for automatically negotiating the schema version
> internally.
> In fact, this is already present in Streams, and it is called "version
> probing". Right now, version probing is used to enable the exact same kind of
> transition from an old-message-format to a new-message-format when both old
> and new members are in the cluster, but it is only used for the assignment
> protocol messages (i.e., the formats of the subscription and assignment
> messages that group members send to each other).
> We can expand the version that "version probing" negotiates from an
> "assignment protocol version" to a "general protocol version". Then, when the
> cluster contains
> mixed-versioned members, the entire cluster will only write changelog (and
> repartition) messages with the protocol version of the oldest-versioned
> member.
> With that in place, you would never need to specify UPGRADE_FROM. You'd
> simply perform rolling upgrades, and Streams would internally negotiate the
> right protocol/schema versions to write such that all running members can
> decode them at all times.
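> Illustrative sketch only (the names are hypothetical): with generalized
> version probing, the effective write version would simply be the minimum of
> the versions supported by the live members:
>
> import java.util.Collection;
>
> final class GeneralProtocolVersion {
>     // This member's newest supported changelog/repartition format version.
>     static final int LATEST_SUPPORTED_VERSION = 4;
>
>     // The cluster writes with the oldest version any live member supports, so
>     // every member can always decode the changelog and repartition records.
>     static int effectiveWriteVersion(final Collection<Integer> membersMaxVersions) {
>         int effective = LATEST_SUPPORTED_VERSION;
>         for (final int memberMax : membersMaxVersions) {
>             effective = Math.min(effective, memberMax);
>         }
>         return effective;
>     }
> }
>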
> Part 3:
> Although Part 2 is sufficient to ensure rolling upgrades, it does not allow
> for downgrades. If you upgrade your whole cluster to 2.4, then later decide
> you want to go back to 2.3, you will find that the 2.3-versioned nodes crash
> when attempting to decode changelog messages that had previously been written
> by 2.4 nodes. Since the changelog messages are by design durable
> indefinitely, this effectively prevents ever downgrading.
> To solve this last problem, I propose that, although we don't require
> UPGRADE_FROM, we still allow it. Specifying UPGRADE_FROM=2.3 would cause
> new-versioned members to set their "max protocol version" in the assignment
> protocol to 2.3, so version probing would never let the members upgrade their
> message formats to 2.4. You could run 2.4 as long as you want with
> UPGRADE_FROM set to 2.3. If any issues arise, you could still downgrade the
> application to version 2.3.
> Once you're satisfied that 2.4 is working and you won't want to downgrade
> anymore, you would remove the UPGRADE_FROM config and bounce again. Now, the
> members will be free to start writing with the latest message format.
> Notes:
> * No KIP is required, since all the needed mechanisms are already present
> * As part of completing this work, we should re-enable the rolling-upgrade
> tests in streams_application_upgrade_test.py
--
This message was sent by Atlassian Jira
(v8.3.4#803005)