[ 
https://issues.apache.org/jira/browse/KAFKA-8017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-8017.
------------------------------------
    Resolution: Invalid

The streams-broker-upgrade test was delete at some point in the past. Closing 
this ticket.

> Narrow the scope of Streams' broker-upgrade-test
> ------------------------------------------------
>
>                 Key: KAFKA-8017
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8017
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Priority: Major
>
> We had a streams-broker-upgrade test in which we kept the streams client as 
> the dev version, and upgrade/downgrade brokers between arbitrary versions. 
> This has several issues:
> 1) not all upgrade / downgrade paths are supported due to message format 
> change.
> 2) even for those supported paths, we should consider the impact of 
> inter.broker.protocol and message.format. More specifically: when upgrade to 
> new version byte code, we should stick with the old protocol/version, when 
> down grade to old version byte code, we should start with the old 
> protocol/version.
> A good reference to look at is the broker's own upgrade path where they 
> listed all the possible path so far:
> {code}
>     @parametrize(from_kafka_version=str(LATEST_1_1), 
> to_message_format_version=None, compression_types=["none"])
>     @parametrize(from_kafka_version=str(LATEST_1_1), 
> to_message_format_version=None, compression_types=["lz4"])
>     @parametrize(from_kafka_version=str(LATEST_1_0), 
> to_message_format_version=None, compression_types=["none"])
>     @parametrize(from_kafka_version=str(LATEST_1_0), 
> to_message_format_version=None, compression_types=["snappy"])
>     @parametrize(from_kafka_version=str(LATEST_0_11_0), 
> to_message_format_version=None, compression_types=["gzip"])
>     @parametrize(from_kafka_version=str(LATEST_0_11_0), 
> to_message_format_version=None, compression_types=["lz4"])
>     @parametrize(from_kafka_version=str(LATEST_0_10_2), 
> to_message_format_version=str(LATEST_0_9), compression_types=["none"])
>     @parametrize(from_kafka_version=str(LATEST_0_10_2), 
> to_message_format_version=str(LATEST_0_10), compression_types=["snappy"])
>     @parametrize(from_kafka_version=str(LATEST_0_10_2), 
> to_message_format_version=None, compression_types=["none"])
>     @parametrize(from_kafka_version=str(LATEST_0_10_2), 
> to_message_format_version=None, compression_types=["lz4"])
>     @parametrize(from_kafka_version=str(LATEST_0_10_1), 
> to_message_format_version=None, compression_types=["lz4"])
>     @parametrize(from_kafka_version=str(LATEST_0_10_1), 
> to_message_format_version=None, compression_types=["snappy"])
>     @parametrize(from_kafka_version=str(LATEST_0_10_0), 
> to_message_format_version=None, compression_types=["snappy"])
>     @parametrize(from_kafka_version=str(LATEST_0_10_0), 
> to_message_format_version=None, compression_types=["lz4"])
>     @cluster(num_nodes=7)
>     @parametrize(from_kafka_version=str(LATEST_0_9), 
> to_message_format_version=None, compression_types=["none"], 
> security_protocol="SASL_SSL")
>     @cluster(num_nodes=6)
>     @parametrize(from_kafka_version=str(LATEST_0_9), 
> to_message_format_version=None, compression_types=["snappy"])
>     @parametrize(from_kafka_version=str(LATEST_0_9), 
> to_message_format_version=None, compression_types=["lz4"])
>     @parametrize(from_kafka_version=str(LATEST_0_9), 
> to_message_format_version=str(LATEST_0_9), compression_types=["none"])
>     @parametrize(from_kafka_version=str(LATEST_0_9), 
> to_message_format_version=str(LATEST_0_9), compression_types=["lz4"])
>     @cluster(num_nodes=7)
>     @parametrize(from_kafka_version=str(LATEST_0_8_2), 
> to_message_format_version=None, compression_types=["none"])
>     @parametrize(from_kafka_version=str(LATEST_0_8_2), 
> to_message_format_version=None, compression_types=["snappy"])
>     def test_upgrade(self, from_kafka_version, to_message_format_version, 
> compression_types,
>                      security_protocol="PLAINTEXT"):
> {code}
> And their upgrade code is:
> {code}
> def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
>         self.logger.info("First pass bounce - rolling upgrade")
>         for node in self.kafka.nodes:
>             self.kafka.stop_node(node)
>             node.version = DEV_BRANCH
>             node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = 
> from_kafka_version
>             node.config[config_property.MESSAGE_FORMAT_VERSION] = 
> from_kafka_version
>             self.kafka.start_node(node)
>         self.logger.info("Second pass bounce - remove 
> inter.broker.protocol.version config")
>         for node in self.kafka.nodes:
>             self.kafka.stop_node(node)
>             del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
>             if to_message_format_version is None:
>                 del node.config[config_property.MESSAGE_FORMAT_VERSION]
>             else:
>                 node.config[config_property.MESSAGE_FORMAT_VERSION] = 
> to_message_format_version
>             self.kafka.start_node(node)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to