clolov commented on code in PR #18712: URL: https://github.com/apache/kafka/pull/18712#discussion_r1930629086
########## share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java: ########## @@ -957,6 +961,144 @@ public void testReadStateLeaderEpochUpdateNoUpdate() { verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME); } + @Test + public void testDeleteStateSuccess() { + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + + SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION); + + DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(Collections.singletonList(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(Collections.singletonList(new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(PARTITION))))); + + CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request); + + // apply a record in to verify delete + CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareSnapshotRecord( + GROUP_ID, + TOPIC_ID, + PARTITION, + new ShareGroupOffset.Builder() + .setSnapshotEpoch(0) + .setStateEpoch(0) + .setLeaderEpoch(0) + .setStateBatches(List.of( + new PersisterStateBatch( + 0, + 10, + (byte) 0, + (short) 1 + ) + ) + ) + .build() + ); + shard.replay(0L, 0L, (short) 0, record); + assertNotNull(shard.getShareStateMapValue(shareCoordinatorKey)); + assertNotNull(shard.getLeaderMapValue(shareCoordinatorKey)); + assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + + // apply tombstone + shard.replay(0L, 0L, (short) 0, result.records().get(0)); + + DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); + List<CoordinatorRecord> expectedRecords = List.of( + ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord( + GROUP_ID, TOPIC_ID, PARTITION) + ); + + 
assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + + assertNull(shard.getShareStateMapValue(shareCoordinatorKey)); + assertNull(shard.getLeaderMapValue(shareCoordinatorKey)); + assertNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + } + + @Test + public void testDeleteStateFirstRecordDeleteSuccess() { + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + + SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION); + + DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(Collections.singletonList(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(Collections.singletonList(new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(PARTITION))))); + + CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request); + + assertNull(shard.getShareStateMapValue(shareCoordinatorKey)); + assertNull(shard.getLeaderMapValue(shareCoordinatorKey)); + assertNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + + // apply tombstone + shard.replay(0L, 0L, (short) 0, result.records().get(0)); + + DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); + List<CoordinatorRecord> expectedRecords = List.of( + ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord( + GROUP_ID, TOPIC_ID, PARTITION) + ); + + assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + + assertNull(shard.getShareStateMapValue(shareCoordinatorKey)); + assertNull(shard.getLeaderMapValue(shareCoordinatorKey)); + assertNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + } + + @Test + public void testDeleteStateInvalidRequestData() { Review Comment: Is it easy for you to also test the condition `topicId == null`? 
Or is it tested elsewhere and I have missed it? ########## share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java: ########## @@ -957,6 +961,144 @@ public void testReadStateLeaderEpochUpdateNoUpdate() { verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME); } + @Test + public void testDeleteStateSuccess() { + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + + SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION); + + DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(Collections.singletonList(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(Collections.singletonList(new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(PARTITION))))); + + CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request); + + // apply a record in to verify delete + CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareSnapshotRecord( + GROUP_ID, + TOPIC_ID, + PARTITION, + new ShareGroupOffset.Builder() + .setSnapshotEpoch(0) + .setStateEpoch(0) + .setLeaderEpoch(0) + .setStateBatches(List.of( + new PersisterStateBatch( + 0, + 10, + (byte) 0, + (short) 1 + ) + ) + ) + .build() + ); + shard.replay(0L, 0L, (short) 0, record); + assertNotNull(shard.getShareStateMapValue(shareCoordinatorKey)); + assertNotNull(shard.getLeaderMapValue(shareCoordinatorKey)); + assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + + // apply tombstone + shard.replay(0L, 0L, (short) 0, result.records().get(0)); + + DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); + List<CoordinatorRecord> expectedRecords = List.of( + ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord( + GROUP_ID, 
TOPIC_ID, PARTITION) + ); + + assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + + assertNull(shard.getShareStateMapValue(shareCoordinatorKey)); + assertNull(shard.getLeaderMapValue(shareCoordinatorKey)); + assertNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + } + + @Test + public void testDeleteStateFirstRecordDeleteSuccess() { + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + + SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION); + + DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(Collections.singletonList(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(Collections.singletonList(new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(PARTITION))))); + + CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request); + + assertNull(shard.getShareStateMapValue(shareCoordinatorKey)); + assertNull(shard.getLeaderMapValue(shareCoordinatorKey)); + assertNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + + // apply tombstone + shard.replay(0L, 0L, (short) 0, result.records().get(0)); + + DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); + List<CoordinatorRecord> expectedRecords = List.of( + ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord( + GROUP_ID, TOPIC_ID, PARTITION) + ); + + assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + + assertNull(shard.getShareStateMapValue(shareCoordinatorKey)); + assertNull(shard.getLeaderMapValue(shareCoordinatorKey)); + assertNull(shard.getStateEpochMapValue(shareCoordinatorKey)); + } + + @Test + public void testDeleteStateInvalidRequestData() { + ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build(); + + int partition = -1; + + DeleteShareGroupStateRequestData request = new DeleteShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(List.of(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new DeleteShareGroupStateRequestData.PartitionData() + .setPartition(partition))))); + + CoordinatorResult<DeleteShareGroupStateResponseData, CoordinatorRecord> result = shard.deleteState(request); + + DeleteShareGroupStateResponseData expectedData = DeleteShareGroupStateResponse.toErrorResponseData( + TOPIC_ID, partition, Errors.INVALID_REQUEST, ShareCoordinatorShard.NEGATIVE_PARTITION_ID.getMessage()); + List<CoordinatorRecord> expectedRecords = List.of(); + + assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + } + + @Test + public void testDeleteNullMetadataImage() { + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + shard.onNewMetadataImage(null, null); Review Comment: As far as I can tell, this tests the case where `metadataImage == null`. Is it easy for you to test the `metadataImage.topics().getTopic(topicId) == null || metadataImage.topics().getPartition(topicId, partitionId) == null` condition as well? ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -10513,6 +10513,96 @@ class KafkaApisTest extends Logging { }) } + @Test Review Comment: Is it easy to test the case where the share coordinator is not enabled? Or is this exercised somewhere else that I have missed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org