clolov commented on code in PR #18712:
URL: https://github.com/apache/kafka/pull/18712#discussion_r1930629086


##########
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java:
##########
@@ -957,6 +961,144 @@ public void testReadStateLeaderEpochUpdateNoUpdate() {
         
verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
     }
 
+    @Test
+    public void testDeleteStateSuccess() {
+        ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
+
+        SharePartitionKey shareCoordinatorKey = 
SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+
+        DeleteShareGroupStateRequestData request = new 
DeleteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(Collections.singletonList(new 
DeleteShareGroupStateRequestData.DeleteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(Collections.singletonList(new 
DeleteShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)))));
+
+        CoordinatorResult<DeleteShareGroupStateResponseData, 
CoordinatorRecord> result = shard.deleteState(request);
+
+        // apply a record in to verify delete
+        CoordinatorRecord record = 
ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+            GROUP_ID,
+            TOPIC_ID,
+            PARTITION,
+            new ShareGroupOffset.Builder()
+                .setSnapshotEpoch(0)
+                .setStateEpoch(0)
+                .setLeaderEpoch(0)
+                .setStateBatches(List.of(
+                        new PersisterStateBatch(
+                            0,
+                            10,
+                            (byte) 0,
+                            (short) 1
+                        )
+                    )
+                )
+                .build()
+        );
+        shard.replay(0L, 0L, (short) 0, record);
+        assertNotNull(shard.getShareStateMapValue(shareCoordinatorKey));
+        assertNotNull(shard.getLeaderMapValue(shareCoordinatorKey));
+        assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+
+        // apply tombstone
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        DeleteShareGroupStateResponseData expectedData = 
DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+        List<CoordinatorRecord> expectedRecords = List.of(
+            ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
+                GROUP_ID, TOPIC_ID, PARTITION)
+        );
+
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+
+        assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
+        assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
+        assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+    }
+
+    @Test
+    public void testDeleteStateFirstRecordDeleteSuccess() {
+        ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
+
+        SharePartitionKey shareCoordinatorKey = 
SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+
+        DeleteShareGroupStateRequestData request = new 
DeleteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(Collections.singletonList(new 
DeleteShareGroupStateRequestData.DeleteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(Collections.singletonList(new 
DeleteShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)))));
+
+        CoordinatorResult<DeleteShareGroupStateResponseData, 
CoordinatorRecord> result = shard.deleteState(request);
+        
+        assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
+        assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
+        assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+
+        // apply tombstone
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        DeleteShareGroupStateResponseData expectedData = 
DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+        List<CoordinatorRecord> expectedRecords = List.of(
+            ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
+                GROUP_ID, TOPIC_ID, PARTITION)
+        );
+
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+
+        assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
+        assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
+        assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+    }
+
+    @Test
+    public void testDeleteStateInvalidRequestData() {

Review Comment:
   Is it easy for you to also test the condition `topicId == null`? Or is it 
tested elsewhere and I have missed it?



##########
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java:
##########
@@ -957,6 +961,144 @@ public void testReadStateLeaderEpochUpdateNoUpdate() {
         
verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
     }
 
+    @Test
+    public void testDeleteStateSuccess() {
+        ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
+
+        SharePartitionKey shareCoordinatorKey = 
SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+
+        DeleteShareGroupStateRequestData request = new 
DeleteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(Collections.singletonList(new 
DeleteShareGroupStateRequestData.DeleteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(Collections.singletonList(new 
DeleteShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)))));
+
+        CoordinatorResult<DeleteShareGroupStateResponseData, 
CoordinatorRecord> result = shard.deleteState(request);
+
+        // apply a record in to verify delete
+        CoordinatorRecord record = 
ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+            GROUP_ID,
+            TOPIC_ID,
+            PARTITION,
+            new ShareGroupOffset.Builder()
+                .setSnapshotEpoch(0)
+                .setStateEpoch(0)
+                .setLeaderEpoch(0)
+                .setStateBatches(List.of(
+                        new PersisterStateBatch(
+                            0,
+                            10,
+                            (byte) 0,
+                            (short) 1
+                        )
+                    )
+                )
+                .build()
+        );
+        shard.replay(0L, 0L, (short) 0, record);
+        assertNotNull(shard.getShareStateMapValue(shareCoordinatorKey));
+        assertNotNull(shard.getLeaderMapValue(shareCoordinatorKey));
+        assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+
+        // apply tombstone
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        DeleteShareGroupStateResponseData expectedData = 
DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+        List<CoordinatorRecord> expectedRecords = List.of(
+            ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
+                GROUP_ID, TOPIC_ID, PARTITION)
+        );
+
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+
+        assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
+        assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
+        assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+    }
+
+    @Test
+    public void testDeleteStateFirstRecordDeleteSuccess() {
+        ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
+
+        SharePartitionKey shareCoordinatorKey = 
SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+
+        DeleteShareGroupStateRequestData request = new 
DeleteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(Collections.singletonList(new 
DeleteShareGroupStateRequestData.DeleteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(Collections.singletonList(new 
DeleteShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)))));
+
+        CoordinatorResult<DeleteShareGroupStateResponseData, 
CoordinatorRecord> result = shard.deleteState(request);
+        
+        assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
+        assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
+        assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+
+        // apply tombstone
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        DeleteShareGroupStateResponseData expectedData = 
DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+        List<CoordinatorRecord> expectedRecords = List.of(
+            ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
+                GROUP_ID, TOPIC_ID, PARTITION)
+        );
+
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+
+        assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
+        assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
+        assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+    }
+
+    @Test
+    public void testDeleteStateInvalidRequestData() {
+        ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
+
+        int partition = -1;
+
+        DeleteShareGroupStateRequestData request = new 
DeleteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new 
DeleteShareGroupStateRequestData.DeleteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new 
DeleteShareGroupStateRequestData.PartitionData()
+                    .setPartition(partition)))));
+
+        CoordinatorResult<DeleteShareGroupStateResponseData, 
CoordinatorRecord> result = shard.deleteState(request);
+
+        DeleteShareGroupStateResponseData expectedData = 
DeleteShareGroupStateResponse.toErrorResponseData(
+            TOPIC_ID, partition, Errors.INVALID_REQUEST, 
ShareCoordinatorShard.NEGATIVE_PARTITION_ID.getMessage());
+        List<CoordinatorRecord> expectedRecords = List.of();
+
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testDeleteNullMetadataImage() {
+        ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
+        shard.onNewMetadataImage(null, null);

Review Comment:
   As far as I can tell, this tests the case where `metadataImage == null`. 
Is it easy for you to test the `metadataImage.topics().getTopic(topicId) == 
null || metadataImage.topics().getPartition(topicId, partitionId) == null` 
condition as well?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10513,6 +10513,96 @@ class KafkaApisTest extends Logging {
     })
   }
 
+  @Test

Review Comment:
   Is it easy to test the case where the share coordinator is not enabled? Or 
is this exercised somewhere else that I have missed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to