[ https://issues.apache.org/jira/browse/KAFKA-8017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax resolved KAFKA-8017.
------------------------------------
    Resolution: Invalid

The streams-broker-upgrade test was deleted at some point in the past. Closing this ticket.

> Narrow the scope of Streams' broker-upgrade-test
> ------------------------------------------------
>
>                 Key: KAFKA-8017
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8017
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Priority: Major
>
> We had a streams-broker-upgrade test in which we kept the streams client at
> the dev version and upgraded/downgraded brokers between arbitrary versions.
> This has several issues:
> 1) Not all upgrade/downgrade paths are supported, due to message format
> changes.
> 2) Even for the supported paths, we should consider the impact of
> inter.broker.protocol and message.format. More specifically: when upgrading
> to the new version's byte code, we should stick with the old
> protocol/version; when downgrading to the old version's byte code, we should
> start with the old protocol/version.
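Point 2 above can be sketched as a small planning helper. This is only an illustration under stated assumptions: `upgrade_passes` and `safe_to_downgrade` are hypothetical names, not part of Kafka's system tests, and the two keys are assumed to be the broker properties the description abbreviates as inter.broker.protocol and message.format. The downgrade check is one reading of "start with the old protocol/version".

```python
# Hypothetical helpers for illustration; not part of Kafka's system tests.
# Assumed to be the broker properties the ticket refers to in short form.
IBP = "inter.broker.protocol.version"
MSG_FORMAT = "log.message.format.version"


def upgrade_passes(from_version, to_message_format_version=None):
    """Return the config overrides to apply on each rolling bounce of an upgrade."""
    # Pass 1: brokers run the new byte code but stay pinned to the old
    # protocol and message format.
    first = {IBP: from_version, MSG_FORMAT: from_version}
    # Pass 2: the protocol pin is dropped; a message format pin is kept
    # only if one was explicitly requested.
    second = {}
    if to_message_format_version is not None:
        second[MSG_FORMAT] = to_message_format_version
    return [first, second]


def safe_to_downgrade(current_overrides, to_version):
    """True if the brokers are already pinned to the old protocol and format."""
    return (current_overrides.get(IBP) == to_version
            and current_overrides.get(MSG_FORMAT) == to_version)
```

Under this reading, a cluster that has only completed the first pass can still be rolled back to the old byte code, while one that has completed the second pass cannot.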
> A good reference to look at is the broker's own upgrade test, which lists
> all the possible paths so far:
> {code}
> @parametrize(from_kafka_version=str(LATEST_1_1), to_message_format_version=None, compression_types=["none"])
> @parametrize(from_kafka_version=str(LATEST_1_1), to_message_format_version=None, compression_types=["lz4"])
> @parametrize(from_kafka_version=str(LATEST_1_0), to_message_format_version=None, compression_types=["none"])
> @parametrize(from_kafka_version=str(LATEST_1_0), to_message_format_version=None, compression_types=["snappy"])
> @parametrize(from_kafka_version=str(LATEST_0_11_0), to_message_format_version=None, compression_types=["gzip"])
> @parametrize(from_kafka_version=str(LATEST_0_11_0), to_message_format_version=None, compression_types=["lz4"])
> @parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=str(LATEST_0_9), compression_types=["none"])
> @parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=str(LATEST_0_10), compression_types=["snappy"])
> @parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=None, compression_types=["none"])
> @parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=None, compression_types=["lz4"])
> @parametrize(from_kafka_version=str(LATEST_0_10_1), to_message_format_version=None, compression_types=["lz4"])
> @parametrize(from_kafka_version=str(LATEST_0_10_1), to_message_format_version=None, compression_types=["snappy"])
> @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"])
> @parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["lz4"])
> @cluster(num_nodes=7)
> @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], security_protocol="SASL_SSL")
> @cluster(num_nodes=6)
> @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"])
> @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"])
> @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["none"])
> @parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"])
> @cluster(num_nodes=7)
> @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"])
> @parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"])
> def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types,
>                  security_protocol="PLAINTEXT"):
> {code}
> And their upgrade code is:
> {code}
> def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
>     self.logger.info("First pass bounce - rolling upgrade")
>     for node in self.kafka.nodes:
>         self.kafka.stop_node(node)
>         node.version = DEV_BRANCH
>         node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = from_kafka_version
>         node.config[config_property.MESSAGE_FORMAT_VERSION] = from_kafka_version
>         self.kafka.start_node(node)
>
>     self.logger.info("Second pass bounce - remove inter.broker.protocol.version config")
>     for node in self.kafka.nodes:
>         self.kafka.stop_node(node)
>         del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
>         if to_message_format_version is None:
>             del node.config[config_property.MESSAGE_FORMAT_VERSION]
>         else:
>             node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
>         self.kafka.start_node(node)
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)