smjn commented on code in PR #18712: URL: https://github.com/apache/kafka/pull/18712#discussion_r1930735955
########## share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java: ########## @@ -957,6 +961,144 @@ public void testReadStateLeaderEpochUpdateNoUpdate() { verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME); } + @Test + public void testDeleteStateSuccess() { + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + + SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION); + + DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(Collections.singletonList(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(Collections.singletonList(new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(PARTITION))))); + + CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request); + + // apply a record in to verify delete + CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareSnapshotRecord( + GROUP_ID, + TOPIC_ID, + PARTITION, + new ShareGroupOffset.Builder() + .setSnapshotEpoch(0) + .setStateEpoch(0) + .setLeaderEpoch(0) + .setStateBatches(List.of( + new PersisterStateBatch( + 0, + 10, + (byte) 0, + (short) 1 + ) + ) + ) + .build() + ); + shard.replay(0L, 0L, (short) 0, record); + assertNotNull(shard.getShareStateMapValue(shareCoordinatorKey)); + assertNotNull(shard.getLeaderMapValue(shareCoordinatorKey)); + assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + + // apply tombstone + shard.replay(0L, 0L, (short) 0, result.records().get(0)); + + DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); + List<CoordinatorRecord> expectedRecords = List.of( + ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord( + GROUP_ID, TOPIC_ID, PARTITION) + ); + + 
assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + + assertNull(shard.getShareStateMapValue(shareCoordinatorKey)); + assertNull(shard.getLeaderMapValue(shareCoordinatorKey)); + assertNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + } + + @Test + public void testDeleteStateFirstRecordDeleteSuccess() { + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + + SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION); + + DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(Collections.singletonList(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(Collections.singletonList(new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(PARTITION))))); + + CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request); + + assertNull(shard.getShareStateMapValue(shareCoordinatorKey)); + assertNull(shard.getLeaderMapValue(shareCoordinatorKey)); + assertNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + + // apply tombstone + shard.replay(0L, 0L, (short) 0, result.records().get(0)); + + DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); + List<CoordinatorRecord> expectedRecords = List.of( + ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord( + GROUP_ID, TOPIC_ID, PARTITION) + ); + + assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + + assertNull(shard.getShareStateMapValue(shareCoordinatorKey)); + assertNull(shard.getLeaderMapValue(shareCoordinatorKey)); + assertNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + } + + @Test + public void testDeleteStateInvalidRequestData() { Review Comment: At code level it is not possible to pass in the topicId as null 
since the call flow is: DefaultStatePersister -> PersisterStateManager -> KafkaApis -> ShareCoordinatorService -> ShareCoordinatorShard. The DefaultStatePersister is an implementation of the Persister interface, which is used to communicate with the share coordinator via KafkaApis. This way, callers are completely oblivious of the share coordinator, and it can easily be swapped for some other abstraction. Furthermore, this is an inter-broker RPC, which means external clients cannot directly invoke it via KafkaAdminClient. The topicId is null-protected in PersisterStateManager. For the new DeleteShareGroupState, the PersisterStateManager code has not been added yet, but we will be creating a new inner class there which will extend PersisterStateManager.PersisterStateManagerHandler, whose constructor takes care of doing the null check and preventing this scenario. Hence, it is not explicitly tested. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org