rondagostino commented on code in PR #14206:
URL: https://github.com/apache/kafka/pull/14206#discussion_r1338985779
##########
metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java:
##########
@@ -102,14 +115,18 @@ public class ClusterImageTest {
         IMAGE1 = new ClusterImage(map1, cmap1);
         DELTA1_RECORDS = new ArrayList<>();
+        // unfence b0
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().
             setId(0).setEpoch(1000),
             UNFENCE_BROKER_RECORD.highestSupportedVersion()));
+        // fence b1
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new FenceBrokerRecord().
             setId(1).setEpoch(1001),
             FENCE_BROKER_RECORD.highestSupportedVersion()));
+        // mark b0 in controlled shutdown
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
             setBrokerId(0).setBrokerEpoch(1000).setInControlledShutdown(
-                BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()),
-            (short) 0));
+                BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()),
+            FENCE_BROKER_RECORD.highestSupportedVersion()));
Review Comment:
`s/FENCE_BROKER_RECORD/BROKER_REGISTRATION_CHANGE_RECORD/`
##########
metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java:
##########
@@ -61,26 +62,34 @@ public class ConfigurationsImageTest {
         IMAGE1 = new ConfigurationsImage(map1);
         DELTA1_RECORDS = new ArrayList<>();
+        // remove configs
         DELTA1_RECORDS.add(new ApiMessageAndVersion(new ConfigRecord().setResourceType(BROKER.id()).
             setResourceName("0").setName("foo").setValue(null),
             CONFIG_RECORD.highestSupportedVersion()));
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new ConfigRecord().setResourceType(BROKER.id()).
+            setResourceName("0").setName("baz").setValue(null),
+            CONFIG_RECORD.highestSupportedVersion()));
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new ConfigRecord().setResourceType(BROKER.id()).
+            setResourceName("1").setName("foobar").setValue(null),
+            CONFIG_RECORD.highestSupportedVersion()));
+        // add back config to b1
Review Comment:
`s#// add back config to b1#// add new config to b1#`
##########
metadata/src/test/java/org/apache/kafka/image/FeaturesImageTest.java:
##########
@@ -96,6 +121,20 @@ public void testImage2RoundTrip() {
         testToImage(IMAGE2);
     }
+    @Test
+    public void testImage3RoundTrip() {
+        testToImage(IMAGE3);
+    }
+
+    @Test
+    public void testApplyDelta2() {
+        assertEquals(IMAGE3, DELTA2.apply());
+        // check image2 + delta2 = image3, since records for image2 + delta2 might differ from records from image3
+        List<ApiMessageAndVersion> records = getImageRecords(IMAGE3);
Review Comment:
`s/IMAGE3/IMAGE2/`
I wonder why this test still passes with this mistake. Probably not a big
deal, but it might be worth satisfying yourself that it makes sense for it
to pass like this.
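For what it's worth, the reason the assertion can pass with either base image can be sketched with a toy model (plain Java maps, not Kafka's actual `FeaturesImage`/`FeaturesDelta` classes): the records emitted for an image describe every key in it, and replay simply overwrites whatever the base held. So as long as the base has no keys absent from the target, replaying the target's own records yields the target regardless of which base you start from.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (plain maps, NOT Kafka's FeaturesImage/FeaturesDelta) of why
// testApplyDelta2 can pass with either IMAGE2 or IMAGE3 as the base.
public class ImageReplayModel {
    // Replay "records" (key -> level) on top of "base" and return the result.
    static Map<String, Short> replay(Map<String, Short> base, Map<String, Short> records) {
        Map<String, Short> result = new HashMap<>(base);
        result.putAll(records); // each record simply overwrites the key
        return result;
    }

    public static void main(String[] args) {
        Map<String, Short> image2 = Map.of("metadata.version", (short) 1);
        Map<String, Short> image3 = Map.of("metadata.version", (short) 2);
        Map<String, Short> image3Records = image3; // records fully describing image3

        // Intended check: image2 + image3's records == image3.
        // Accidental check (the typo): image3 + image3's records == image3.
        System.out.println(replay(image2, image3Records).equals(image3)); // true
        System.out.println(replay(image3, image3Records).equals(image3)); // true, so the typo is invisible
    }
}
```

The typo would only be caught if IMAGE2 contained a key that IMAGE3 lacks, since replaying IMAGE3's records over IMAGE2 would then leave the stale key behind.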
##########
metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java:
##########
@@ -156,6 +173,66 @@ public class ClusterImageTest {
                 new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 19093))).
             setSupportedFeatures(Collections.emptyMap()).build());
         IMAGE2 = new ClusterImage(map2, cmap2);
+
+        DELTA2_RECORDS = new ArrayList<>(DELTA1_RECORDS);
+        // fence b0
+        DELTA2_RECORDS.add(new ApiMessageAndVersion(new FenceBrokerRecord().
+            setId(0).setEpoch(1000), FENCE_BROKER_RECORD.highestSupportedVersion()));
+        // unfence b1
+        DELTA2_RECORDS.add(new ApiMessageAndVersion(new UnfenceBrokerRecord().
+            setId(1).setEpoch(1001), UNFENCE_BROKER_RECORD.highestSupportedVersion()));
+        // mark b0 as not in controlled shutdown
+        DELTA2_RECORDS.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
+            setBrokerId(0).setBrokerEpoch(1000).setInControlledShutdown(
+                BrokerRegistrationInControlledShutdownChange.NONE.value()),
+            FENCE_BROKER_RECORD.highestSupportedVersion()));
Review Comment:
`s/FENCE_BROKER_RECORD/BROKER_REGISTRATION_CHANGE_RECORD/`
##########
metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java:
##########
@@ -176,22 +200,30 @@ private PartitionRegistration newPartition(int[] replicas) {
     public void testBasicLocalChanges() {
         int localId = 3;
         /* Changes already include in DELTA1_RECORDS and IMAGE1:
-         * foo - topic id deleted
+         * foo - topic id deleted then recreated with different topic id
          * bar-0 - stay as follower with different partition epoch
          * baz-0 - new topic to leader
+         * bam - topic id created then deleted
          */
         List<ApiMessageAndVersion> topicRecords = new ArrayList<>(DELTA1_RECORDS);
-        // Create a new foo topic with a different id
-        Uuid newFooId = Uuid.fromString("b66ybsWIQoygs01vdjH07A");
+        // Attempt to create a new foo topic with a different id and replica placement
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new TopicRecord().setName("foo").setTopicId(FOO_UUID3),
+                TOPIC_RECORD.highestSupportedVersion()
+            )
+        );
+        topicRecords.add(newPartitionRecord(FOO_UUID3, 0, Arrays.asList(0, 1, localId)));
Review Comment:
Could you explain this? We already have a record creating a topic `foo`
with `FOO_UUID2`. I would expect this new record to not be generated by the
controller since a topic with that name already exists. So while I agree that
`TopicsDelta` currently will replay it successfully, I'm not sure we should be
testing what happens when it does since we never expect this to happen. In
fact, it could be argued that the delta should not replay it. WDYT?
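The two replay policies being weighed here can be sketched in a hypothetical model (this is not Kafka's `TopicsDelta`; the class and method names are made up for illustration): when a `TopicRecord` names a topic that already exists, replay can either let the later record win, which is what the comment says `TopicsDelta` does today, or reject the record, on the theory that the controller never emits one and seeing it at replay time means something upstream went wrong.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (NOT Kafka's TopicsDelta) contrasting lenient vs strict
// handling of a TopicRecord whose topic name already exists.
public class TopicReplaySketch {
    private final Map<String, String> nameToId = new HashMap<>();
    private final boolean strict;

    public TopicReplaySketch(boolean strict) {
        this.strict = strict;
    }

    // Replay a (name, topicId) pair as if it were a TopicRecord.
    public void replayTopicRecord(String name, String topicId) {
        String existing = nameToId.get(name);
        if (strict && existing != null && !existing.equals(topicId)) {
            throw new IllegalStateException(
                "Topic " + name + " already exists with id " + existing);
        }
        nameToId.put(name, topicId); // lenient mode: the later record wins
    }

    public String idOf(String name) {
        return nameToId.get(name);
    }

    public static void main(String[] args) {
        TopicReplaySketch lenient = new TopicReplaySketch(false);
        lenient.replayTopicRecord("foo", "id-1");
        lenient.replayTopicRecord("foo", "id-2"); // silently accepted
        System.out.println(lenient.idOf("foo"));

        TopicReplaySketch strict = new TopicReplaySketch(true);
        strict.replayTopicRecord("foo", "id-1");
        try {
            strict.replayTopicRecord("foo", "id-2");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The strict variant surfaces a controller bug at replay time instead of silently masking it, which is the trade-off the comment is raising against testing a record sequence the controller should never produce.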
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]