smjn commented on code in PR #18712:
URL: https://github.com/apache/kafka/pull/18712#discussion_r1930735955


##########
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java:
##########
@@ -957,6 +961,144 @@ public void testReadStateLeaderEpochUpdateNoUpdate() {
         
verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
     }
 
+    @Test
+    public void testDeleteStateSuccess() {
+        ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
+
+        SharePartitionKey shareCoordinatorKey = 
SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+
+        DeleteShareGroupStateRequestData request = new 
DeleteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(Collections.singletonList(new 
DeleteShareGroupStateRequestData.DeleteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(Collections.singletonList(new 
DeleteShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)))));
+
+        CoordinatorResult<DeleteShareGroupStateResponseData, 
CoordinatorRecord> result = shard.deleteState(request);
+
+        // apply a record in to verify delete
+        CoordinatorRecord record = 
ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+            GROUP_ID,
+            TOPIC_ID,
+            PARTITION,
+            new ShareGroupOffset.Builder()
+                .setSnapshotEpoch(0)
+                .setStateEpoch(0)
+                .setLeaderEpoch(0)
+                .setStateBatches(List.of(
+                        new PersisterStateBatch(
+                            0,
+                            10,
+                            (byte) 0,
+                            (short) 1
+                        )
+                    )
+                )
+                .build()
+        );
+        shard.replay(0L, 0L, (short) 0, record);
+        assertNotNull(shard.getShareStateMapValue(shareCoordinatorKey));
+        assertNotNull(shard.getLeaderMapValue(shareCoordinatorKey));
+        assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+
+        // apply tombstone
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        DeleteShareGroupStateResponseData expectedData = 
DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+        List<CoordinatorRecord> expectedRecords = List.of(
+            ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
+                GROUP_ID, TOPIC_ID, PARTITION)
+        );
+
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+
+        assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
+        assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
+        assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+    }
+
+    @Test
+    public void testDeleteStateFirstRecordDeleteSuccess() {
+        ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
+
+        SharePartitionKey shareCoordinatorKey = 
SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+
+        DeleteShareGroupStateRequestData request = new 
DeleteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(Collections.singletonList(new 
DeleteShareGroupStateRequestData.DeleteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(Collections.singletonList(new 
DeleteShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)))));
+
+        CoordinatorResult<DeleteShareGroupStateResponseData, 
CoordinatorRecord> result = shard.deleteState(request);
+        
+        assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
+        assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
+        assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+
+        // apply tombstone
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        DeleteShareGroupStateResponseData expectedData = 
DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+        List<CoordinatorRecord> expectedRecords = List.of(
+            ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
+                GROUP_ID, TOPIC_ID, PARTITION)
+        );
+
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+
+        assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
+        assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
+        assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+    }
+
+    @Test
+    public void testDeleteStateInvalidRequestData() {

Review Comment:
   At code level it is not possible to pass in the topicId as null, since the 
call flow is:
   DefaultStatePersister -> PersisterStateManager -> KafkaApis -> 
ShareCoordinatorService -> ShareCoordinatorShard.
   
   The DefaultStatePersister is an implementation of the Persister interface 
which is used to communicate with the share coordinator via kafkaApis. This way 
callers are completely oblivious of the share coordinator. It can be easily 
swapped for some other abstraction.
   
   Furthermore, this is an inter-broker RPC, which means external clients cannot 
directly invoke it via the kafkaAdminClient. The topicId is null-protected in 
PersisterStateManager.
   
   For this new DeleteShareGroupState - the PersisterStateManager code has not 
been added yet. But we will be creating a new inner class over there which will 
extend PersisterStateManager.PersisterStateManagerHandler whose constructor 
takes care of doing the null check and preventing this scenario. Hence, it is 
not explicitly tested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to