[ https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169038#comment-17169038 ]
Sophie Blee-Goldman commented on KAFKA-10336:
---------------------------------------------

Thanks for the detailed bug report. Quick question: are you sure this is only possible when standbys are enabled? It seems like you could end up in a situation where a suppression task lands on an upgraded instance, which then completes restoration and starts writing to the changelog with the newer protocol. Then, during a subsequent rebalance of the rolling upgrade, that task gets migrated back to an old instance, which tries to read the new protocol version and dies. There is some consolation here: this case is probably pretty rare, at least relative to the case with standbys enabled.

> Rolling upgrade with Suppression AND Standbys may throw exceptions
> ------------------------------------------------------------------
>
>                 Key: KAFKA-10336
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10336
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>            Reporter: John Roesler
>            Priority: Blocker
>             Fix For: 2.7.0
>
>
> Tl;dr:
> If you have standbys AND use Suppress with changelogging enabled, you may experience exceptions leading to threads shutting down on the OLD instances during a rolling upgrade. No corruption is expected, and when the rolling upgrade completes, all threads will be running and processing correctly.
>
> Details:
> The Suppression changelog has had to change its internal data format several times to fix bugs. The binary schema of the changelog values is determined by a version header on the records, and new versions are able to decode all old versions' formats.
> The suppression changelog decoder is also configured to throw an exception if it encounters a version number that it doesn't recognize, causing the thread to stop processing and shut down.
> When standbys are configured, there is one so-called "active" worker writing into the suppression buffer and sending the same messages into the changelog, while another "standby" worker reads those messages, decodes them, and maintains a hot-standby replica of the suppression buffer.
> If the standby worker is running an older version of Streams than the active worker, the active worker may write changelog messages with a higher version number than the standby worker can understand. When the standby worker receives one of these messages, it throws the exception and shuts down its thread.
> Note: although the exceptions are undesired, this behavior at least protects the integrity of the application and prevents data corruption or loss.
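The failure mode described under "Details" boils down to a version-header check that rejects anything newer than the reader understands. The sketch below is purely illustrative Java under that assumption; the class, constant, and method names are invented for the example and are not the actual Streams suppression-buffer internals.

{code:java}
// Illustrative sketch only -- not the real Kafka Streams suppression-buffer code.
// It shows the "version header + reject unknown versions" pattern described above.
import java.nio.ByteBuffer;

public class VersionedBufferValue {

    // Highest changelog value format this (old) reader understands.
    private static final byte MAX_SUPPORTED_VERSION = 2;

    public static VersionedBufferValue deserialize(final byte[] changelogValue) {
        final ByteBuffer buffer = ByteBuffer.wrap(changelogValue);
        final byte version = buffer.get(); // version header written by the active task

        if (version > MAX_SUPPORTED_VERSION) {
            // An old-versioned standby reading a new-versioned record lands here,
            // which is what shuts the stream thread down during the rolling upgrade.
            throw new IllegalArgumentException(
                "Unknown changelog value version " + version
                    + "; this instance only understands versions <= " + MAX_SUPPORTED_VERSION);
        }

        // Old versions remain decodable: dispatch on the version number.
        switch (version) {
            case 1:
                return decodeV1(buffer);
            case 2:
                return decodeV2(buffer);
            default:
                // e.g. a corrupted record carrying a version that was never written
                throw new IllegalArgumentException("Unrecognized changelog value version: " + version);
        }
    }

    private static VersionedBufferValue decodeV1(final ByteBuffer buffer) {
        return new VersionedBufferValue(); // decoding details elided
    }

    private static VersionedBufferValue decodeV2(final ByteBuffer buffer) {
        return new VersionedBufferValue(); // decoding details elided
    }
}
{code}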
> Workaround:
> Several workarounds are possible. This only affects clusters that do all of (A) rolling bounce, (B) suppression, (C) standby replicas, and (D) changelogged suppression buffers. Changing any of those four variables will prevent the issue from occurring. I would NOT recommend disabling (D), and (B) is probably off the table, since the application logic presumably depends on it. Therefore, your practical choices are to disable standbys (C) or to do a full-cluster bounce (A). Personally, I think (A) is the best option.
> Also note that although the exceptions and threads shutting down are not ideal, they only afflict the old-versioned nodes, i.e., the nodes you intend to replace anyway. So another "workaround" is simply to ignore the exceptions and proceed with the rolling bounce. As the old-versioned nodes are replaced with new-versioned nodes, the new nodes will again be able to decode their peers' changelog messages and maintain the hot-standby replicas of the suppression buffers.
>
> Detection:
> Although I really should have anticipated this condition, I first detected it while expanding our system test coverage as part of KAFKA-10173. I added a rolling upgrade test with an application that uses both suppression and standby replicas, and observed that the rolling upgrades would occasionally cause the old nodes to crash. Accordingly, in KAFKA-10173, I disabled the rolling-upgrade configuration and only do full-cluster upgrades. Resolving _this_ ticket will allow us to re-enable rolling upgrades.
>
> Proposed solution:
>
> Part 1:
> Since Streams can decode both current and past versions, but not future versions, we need a mechanism to prevent new-versioned nodes from writing new-versioned messages, which would appear as future-versioned messages to the old-versioned nodes.
> We have an UPGRADE_FROM configuration that we could leverage to accomplish this. When upgrading from 2.3 to 2.4, you would set UPGRADE_FROM to "2.3" and then do a rolling upgrade to 2.4. The new (2.4) nodes would continue writing messages in the old (2.3) format, so the still-running old nodes would still be able to read them.
> Then, you would remove the UPGRADE_FROM config and do ANOTHER rolling bounce. Post-bounce, the nodes would start writing in the 2.4 format, which is fine because all the members are running 2.4 at this point and can decode these messages, even if they are still configured to write with version 2.3. After the second rolling bounce, the whole cluster is both running 2.4 and writing with the 2.4 format.
>
> Part 2:
> Managing two rolling bounces can be difficult, so it is also desirable to implement a mechanism for automatically negotiating the schema version internally.
> In fact, such a mechanism is already present in Streams, and it is called "version probing". Right now, version probing enables exactly this kind of transition from an old message format to a new message format while both old and new members are in the cluster, but it is only applied to the assignment protocol messages (i.e., the formats of the subscription and assignment messages that group members send to each other).
> We can expand the version that version probing negotiates from an "assignment protocol version" to a general "protocol version". Then, when the cluster contains mixed-versioned members, the entire cluster will only write changelog (and repartition) messages with the protocol version of the oldest-versioned member.
> With that in place, you would never need to specify UPGRADE_FROM. You would simply perform rolling upgrades, and Streams would internally negotiate the right protocol/schema versions to write such that all running members can decode them at all times.
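To make Part 2 concrete: the negotiation amounts to every member advertising the newest format it supports and the group settling on the minimum across all members. The sketch below is hypothetical Java, not the actual version-probing implementation; the class and method names are invented. A member configured with UPGRADE_FROM (Parts 1 and 3) would simply advertise the older version, capping the result for the whole group.

{code:java}
// Hypothetical sketch of negotiating a "general protocol version" (Part 2).
// These classes and methods do not exist in Kafka Streams; they only illustrate the idea.
import java.util.Collection;
import java.util.List;

public class ProtocolVersionNegotiation {

    // The newest changelog/repartition format this binary is able to write.
    static final int LATEST_SUPPORTED_VERSION = 4;

    /**
     * Picks the format version the whole group will write with: the minimum of
     * what every member advertises. A member configured with UPGRADE_FROM would
     * advertise the older version here, which caps the result for the whole group.
     */
    static int negotiateCommonVersion(final Collection<Integer> advertisedVersions) {
        int common = LATEST_SUPPORTED_VERSION;
        for (final int advertised : advertisedVersions) {
            common = Math.min(common, advertised);
        }
        return common;
    }

    public static void main(final String[] args) {
        // Mixed cluster mid-rolling-upgrade: two upgraded members (4) and one old member (3).
        final int common = negotiateCommonVersion(List.of(4, 3, 4));
        // Everyone keeps writing the version-3 format until the old member is gone,
        // so the old member can always decode its peers' changelog records.
        System.out.println("write changelog records with format version " + common);
    }
}
{code}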
> Part 3:
> Although Part 2 is sufficient to ensure rolling upgrades, it does not allow for downgrades. If you upgrade your whole cluster to 2.4 and later decide you want to go back to 2.3, you will find that the 2.3-versioned nodes crash when attempting to decode changelog messages that had previously been written by 2.4 nodes. Since the changelog messages are by design durable indefinitely, this effectively prevents ever downgrading.
> To solve this last problem, I propose that, although we don't require UPGRADE_FROM, we still allow it. Specifying UPGRADE_FROM=2.3 would cause new-versioned members to set their "max protocol version" in the assignment protocol to 2.3, so version probing would never let the members upgrade their message formats to 2.4. You could run 2.4 as long as you want with UPGRADE_FROM set to 2.3, and if any issues arise, you could still downgrade the application to version 2.3.
> Once you're satisfied that 2.4 is working and you won't want to downgrade anymore, you would remove the UPGRADE_FROM config and bounce again. Now the members will be free to start writing with the latest message format.
>
> Notes:
> * No KIP is required, since all the needed mechanisms are already present.
> * As part of completing this work, we should enable rolling upgrade tests in streams_application_upgrade_test.py.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)