AndrewJSchofield commented on code in PR #19443:
URL: https://github.com/apache/kafka/pull/19443#discussion_r2048505136
##########
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java:
##########

@@ -1357,6 +1358,350 @@ public void testInitializePartitionIdNonExistentInMetadataImage() {
         verify(topicsImage, times(1)).getPartition(eq(TOPIC_ID), eq(0));
     }
 
+    @Test
+    public void testSnapshotColdPartitionsNoEligiblePartitions() {
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+        MetadataImage image = mock(MetadataImage.class);
+        shard.onNewMetadataImage(image, null);
+        int offset = 0;
+        int producerId = 0;
+        short producerEpoch = 0;
+        int leaderEpoch = 0;
+
+        long timestamp = TIME.milliseconds();
+
+        CoordinatorRecord record1 = CoordinatorRecord.record(
+            new ShareSnapshotKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(0),
+            new ApiMessageAndVersion(
+                new ShareSnapshotValue()
+                    .setSnapshotEpoch(0)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setCreateTimestamp(timestamp)
+                    .setWriteTimestamp(timestamp)
+                    .setStateBatches(List.of(
+                        new ShareSnapshotValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        CoordinatorRecord record2 = CoordinatorRecord.record(
+            new ShareSnapshotKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(1),
+            new ApiMessageAndVersion(
+                new ShareSnapshotValue()
+                    .setSnapshotEpoch(0)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setCreateTimestamp(timestamp)
+                    .setWriteTimestamp(timestamp)
+                    .setStateBatches(List.of(
+                        new ShareSnapshotValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        shard.replay(offset, producerId, producerEpoch, record1);
+        shard.replay(offset + 1, producerId, producerEpoch, record2);
+
+        assertNotNull(shard.getShareStateMapValue(SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 0)));
+        assertNotNull(shard.getShareStateMapValue(SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 1)));
+
+        TIME.sleep(5000); // Less than config.
+
+        assertEquals(0, shard.snapshotColdPartitions().records().size());
+    }
+
+    @Test
+    public void testSnapshotColdPartitionsSnapshotUpdateNotConsidered() {
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+        MetadataImage image = mock(MetadataImage.class);
+        shard.onNewMetadataImage(image, null);
+        int offset = 0;
+        int producerId = 0;
+        short producerEpoch = 0;
+        int leaderEpoch = 0;
+
+        long timestamp = TIME.milliseconds();
+
+        CoordinatorRecord record1 = CoordinatorRecord.record(
+            new ShareSnapshotKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(0),
+            new ApiMessageAndVersion(
+                new ShareSnapshotValue()
+                    .setSnapshotEpoch(0)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setCreateTimestamp(timestamp)
+                    .setWriteTimestamp(timestamp)
+                    .setStateBatches(List.of(
+                        new ShareSnapshotValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        SharePartitionKey key = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 0);
+
+        shard.replay(offset, producerId, producerEpoch, record1);
+        assertNotNull(shard.getShareStateMapValue(key));
+
+        long sleep = 12000;
+        TIME.sleep(sleep);
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            CoordinatorRecord.record(
+                new ShareSnapshotKey()
+                    .setGroupId(GROUP_ID)
+                    .setTopicId(TOPIC_ID)
+                    .setPartition(0),
+                new ApiMessageAndVersion(
+                    new ShareSnapshotValue()
+                        .setSnapshotEpoch(1)
+                        .setStateEpoch(0)
+                        .setLeaderEpoch(leaderEpoch)
+                        .setCreateTimestamp(timestamp)
+                        .setWriteTimestamp(timestamp + sleep)
+                        .setStateBatches(List.of(
+                            new ShareSnapshotValue.StateBatch()
+                                .setFirstOffset(0)
+                                .setLastOffset(10)
+                                .setDeliveryCount((short) 1)
+                                .setDeliveryState((byte) 0))),
+                    (short) 0
+                )
+            )
+        );
+
+        assertEquals(expectedRecords, shard.snapshotColdPartitions().records());
+
+        shard.replay(offset + 1, producerId, producerEpoch, expectedRecords.get(0));
+        assertNotNull(shard.getShareStateMapValue(key));
+
+        CoordinatorRecord record2 = CoordinatorRecord.record(
+            new ShareUpdateKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(0),
+            new ApiMessageAndVersion(
+                new ShareUpdateValue()
+                    .setSnapshotEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setStateBatches(List.of(
+                        new ShareUpdateValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        shard.replay(offset + 2, producerId, producerEpoch, record2);
+
+        TIME.sleep(sleep);
+
+        assertNotNull(shard.getShareStateMapValue(key));
+        assertEquals(timestamp + sleep, shard.getShareStateMapValue(key).writeTimestamp()); // No snapshot since update has no time info.
+    }
+
+    @Test
+    public void testSnapshotColdPartitionsDoesNotPerpetuallySnapshot() {
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+        MetadataImage image = mock(MetadataImage.class);
+        shard.onNewMetadataImage(image, null);
+        int offset = 0;
+        int producerId = 0;
+        short producerEpoch = 0;
+        int leaderEpoch = 0;
+
+        long timestamp = TIME.milliseconds();
+
+        CoordinatorRecord record1 = CoordinatorRecord.record(
+            new ShareSnapshotKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(0),
+            new ApiMessageAndVersion(
+                new ShareSnapshotValue()
+                    .setSnapshotEpoch(0)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setCreateTimestamp(timestamp)
+                    .setWriteTimestamp(timestamp)
+                    .setStateBatches(List.of(
+                        new ShareSnapshotValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        shard.replay(offset, producerId, producerEpoch, record1);
+        assertNotNull(shard.getShareStateMapValue(SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 0)));
+
+        long sleep = 12000;
+        TIME.sleep(sleep);
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            CoordinatorRecord.record(
+                new ShareSnapshotKey()
+                    .setGroupId(GROUP_ID)
+                    .setTopicId(TOPIC_ID)
+                    .setPartition(0),
+                new ApiMessageAndVersion(
+                    new ShareSnapshotValue()
+                        .setSnapshotEpoch(1)
+                        .setStateEpoch(0)
+                        .setLeaderEpoch(leaderEpoch)
+                        .setCreateTimestamp(timestamp)
+                        .setWriteTimestamp(timestamp + sleep)
+                        .setStateBatches(List.of(
+                            new ShareSnapshotValue.StateBatch()
+                                .setFirstOffset(0)
+                                .setLastOffset(10)
+                                .setDeliveryCount((short) 1)
+                                .setDeliveryState((byte) 0))),
+                    (short) 0
+                )
+            )
+        );
+
+        assertEquals(expectedRecords, shard.snapshotColdPartitions().records());
+
+        shard.replay(offset + 1, producerId, producerEpoch, expectedRecords.get(0));
+        assertNotNull(shard.getShareStateMapValue(SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 0)));
+
+        // Since all existing snapshots are already snapshotted, no new records will be created.
+        TIME.sleep(12000);
+
+        assertEquals(0, shard.snapshotColdPartitions().records().size());
+    }
+
+    @Test
+    public void testSnapshotColdPartitionsPartialEligiblePartitions() {
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+        MetadataImage image = mock(MetadataImage.class);
+        shard.onNewMetadataImage(image, null);
+        int offset = 0;
+        int producerId = 0;
+        short producerEpoch = 0;
+        int leaderEpoch = 0;
+        SharePartitionKey key0 = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 0);
+        SharePartitionKey key1 = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 1);
+
+        long timestamp = TIME.milliseconds();
+        int record1SnapshotEpoch = 0;
+
+        CoordinatorRecord record1 = CoordinatorRecord.record(
+            new ShareSnapshotKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(0),
+            new ApiMessageAndVersion(
+                new ShareSnapshotValue()
+                    .setSnapshotEpoch(record1SnapshotEpoch)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setCreateTimestamp(timestamp)
+                    .setWriteTimestamp(timestamp)
+                    .setStateBatches(List.of(
+                        new ShareSnapshotValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        long delta = 15000; // 15 seconds
+
+        CoordinatorRecord record2 = CoordinatorRecord.record(
+            new ShareSnapshotKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(1),
+            new ApiMessageAndVersion(
+                new ShareSnapshotValue()
+                    .setSnapshotEpoch(0)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setCreateTimestamp(timestamp + delta)
+                    .setWriteTimestamp(timestamp + delta)
+                    .setStateBatches(List.of(
+                        new ShareSnapshotValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        shard.replay(offset, producerId, producerEpoch, record1);
+        shard.replay(offset + 1, producerId, producerEpoch, record2);
+
+        assertNotNull(shard.getShareStateMapValue(key0));
+        assertNotNull(shard.getShareStateMapValue(key1));
+        assertEquals(timestamp, shard.getShareStateMapValue(key0).writeTimestamp());
+        assertEquals(timestamp + delta, shard.getShareStateMapValue(key1).writeTimestamp());
+
+        long sleep = 12000;
+        TIME.sleep(sleep); // Record 1 is eligible now.
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            CoordinatorRecord.record(
+                new ShareSnapshotKey()
+                    .setGroupId(GROUP_ID)
+                    .setTopicId(TOPIC_ID)
+                    .setPartition(0),
+                new ApiMessageAndVersion(
+                    new ShareSnapshotValue()
+                        .setSnapshotEpoch(record1SnapshotEpoch + 1)
+                        .setStateEpoch(0)
+                        .setLeaderEpoch(leaderEpoch)
+                        .setCreateTimestamp(timestamp)
+                        .setWriteTimestamp(timestamp + sleep)
+                        .setStateBatches(List.of(
+                            new ShareSnapshotValue.StateBatch()
+                                .setFirstOffset(0)
+                                .setLastOffset(10)
+                                .setDeliveryCount((short) 1)
+                                .setDeliveryState((byte) 0))),
+                    (short) 0
+                )
+            )
+        );
+
+        List<CoordinatorRecord> records = shard.snapshotColdPartitions().records();
+        assertEquals(expectedRecords, records);
+
+        shard.replay(offset + 2, producerId, producerEpoch, records.get(0));
+
+        assertEquals(timestamp + delta, shard.getShareStateMapValue(key1).writeTimestamp());
+    }

Review Comment:
   And I suppose you should `assertEquals(timestamp + sleep, shard.getShareStateMapValue(key0).writeTimestamp())` also.
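   For illustration, the tail of the test with that extra assertion might read as follows (a sketch only, reusing `key0`, `key1`, `sleep`, and `delta` exactly as they are defined in the test above):

       shard.replay(offset + 2, producerId, producerEpoch, records.get(0));

       // Partition 0 was just snapshotted, so its write timestamp should have
       // advanced to the time of the cold-partition snapshot.
       assertEquals(timestamp + sleep, shard.getShareStateMapValue(key0).writeTimestamp());

       // Partition 1 was never eligible, so its write timestamp is unchanged.
       assertEquals(timestamp + delta, shard.getShareStateMapValue(key1).writeTimestamp());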