AndrewJSchofield commented on code in PR #19443:
URL: https://github.com/apache/kafka/pull/19443#discussion_r2048505136


##########
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java:
##########
@@ -1357,6 +1358,350 @@ public void testInitializePartitionIdNonExistentInMetadataImage() {
         verify(topicsImage, times(1)).getPartition(eq(TOPIC_ID), eq(0));
     }
 
+    @Test
+    public void testSnapshotColdPartitionsNoEligiblePartitions() {
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+        MetadataImage image = mock(MetadataImage.class);
+        shard.onNewMetadataImage(image, null);
+        int offset = 0;
+        int producerId = 0;
+        short producerEpoch = 0;
+        int leaderEpoch = 0;
+
+        long timestamp = TIME.milliseconds();
+
+        CoordinatorRecord record1 = CoordinatorRecord.record(
+            new ShareSnapshotKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(0),
+            new ApiMessageAndVersion(
+                new ShareSnapshotValue()
+                    .setSnapshotEpoch(0)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setCreateTimestamp(timestamp)
+                    .setWriteTimestamp(timestamp)
+                    .setStateBatches(List.of(
+                        new ShareSnapshotValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        CoordinatorRecord record2 = CoordinatorRecord.record(
+            new ShareSnapshotKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(1),
+            new ApiMessageAndVersion(
+                new ShareSnapshotValue()
+                    .setSnapshotEpoch(0)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setCreateTimestamp(timestamp)
+                    .setWriteTimestamp(timestamp)
+                    .setStateBatches(List.of(
+                        new ShareSnapshotValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        shard.replay(offset, producerId, producerEpoch, record1);
+        shard.replay(offset + 1, producerId, producerEpoch, record2);
+
+        assertNotNull(shard.getShareStateMapValue(SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 0)));
+        assertNotNull(shard.getShareStateMapValue(SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 1)));
+
+        TIME.sleep(5000);   // Less than config.
+
+        assertEquals(0, shard.snapshotColdPartitions().records().size());
+    }
+
+    @Test
+    public void testSnapshotColdPartitionsSnapshotUpdateNotConsidered() {
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+        MetadataImage image = mock(MetadataImage.class);
+        shard.onNewMetadataImage(image, null);
+        int offset = 0;
+        int producerId = 0;
+        short producerEpoch = 0;
+        int leaderEpoch = 0;
+
+        long timestamp = TIME.milliseconds();
+
+        CoordinatorRecord record1 = CoordinatorRecord.record(
+            new ShareSnapshotKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(0),
+            new ApiMessageAndVersion(
+                new ShareSnapshotValue()
+                    .setSnapshotEpoch(0)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setCreateTimestamp(timestamp)
+                    .setWriteTimestamp(timestamp)
+                    .setStateBatches(List.of(
+                        new ShareSnapshotValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        SharePartitionKey key = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 0);
+
+        shard.replay(offset, producerId, producerEpoch, record1);
+        assertNotNull(shard.getShareStateMapValue(key));
+
+        long sleep = 12000;
+        TIME.sleep(sleep);
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            CoordinatorRecord.record(
+                new ShareSnapshotKey()
+                    .setGroupId(GROUP_ID)
+                    .setTopicId(TOPIC_ID)
+                    .setPartition(0),
+                new ApiMessageAndVersion(
+                    new ShareSnapshotValue()
+                        .setSnapshotEpoch(1)
+                        .setStateEpoch(0)
+                        .setLeaderEpoch(leaderEpoch)
+                        .setCreateTimestamp(timestamp)
+                        .setWriteTimestamp(timestamp + sleep)
+                        .setStateBatches(List.of(
+                            new ShareSnapshotValue.StateBatch()
+                                .setFirstOffset(0)
+                                .setLastOffset(10)
+                                .setDeliveryCount((short) 1)
+                                .setDeliveryState((byte) 0))),
+                    (short) 0
+                )
+            )
+        );
+
+        assertEquals(expectedRecords, shard.snapshotColdPartitions().records());
+
+        shard.replay(offset + 1, producerId, producerEpoch, expectedRecords.get(0));
+        assertNotNull(shard.getShareStateMapValue(key));
+
+        CoordinatorRecord record2 = CoordinatorRecord.record(
+            new ShareUpdateKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(0),
+            new ApiMessageAndVersion(
+                new ShareUpdateValue()
+                    .setSnapshotEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setStateBatches(List.of(
+                        new ShareUpdateValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        shard.replay(offset + 2, producerId, producerEpoch, record2);
+
+        TIME.sleep(sleep);
+
+        assertNotNull(shard.getShareStateMapValue(key));
+        assertEquals(timestamp + sleep, shard.getShareStateMapValue(key).writeTimestamp()); // No snapshot, since the update carries no time info.
+    }
+
+    @Test
+    public void testSnapshotColdPartitionsDoesNotPerpetuallySnapshot() {
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+        MetadataImage image = mock(MetadataImage.class);
+        shard.onNewMetadataImage(image, null);
+        int offset = 0;
+        int producerId = 0;
+        short producerEpoch = 0;
+        int leaderEpoch = 0;
+
+        long timestamp = TIME.milliseconds();
+
+        CoordinatorRecord record1 = CoordinatorRecord.record(
+            new ShareSnapshotKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(0),
+            new ApiMessageAndVersion(
+                new ShareSnapshotValue()
+                    .setSnapshotEpoch(0)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setCreateTimestamp(timestamp)
+                    .setWriteTimestamp(timestamp)
+                    .setStateBatches(List.of(
+                        new ShareSnapshotValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        shard.replay(offset, producerId, producerEpoch, record1);
+        assertNotNull(shard.getShareStateMapValue(SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 0)));
+
+        long sleep = 12000;
+        TIME.sleep(sleep);
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            CoordinatorRecord.record(
+                new ShareSnapshotKey()
+                    .setGroupId(GROUP_ID)
+                    .setTopicId(TOPIC_ID)
+                    .setPartition(0),
+                new ApiMessageAndVersion(
+                    new ShareSnapshotValue()
+                        .setSnapshotEpoch(1)
+                        .setStateEpoch(0)
+                        .setLeaderEpoch(leaderEpoch)
+                        .setCreateTimestamp(timestamp)
+                        .setWriteTimestamp(timestamp + sleep)
+                        .setStateBatches(List.of(
+                            new ShareSnapshotValue.StateBatch()
+                                .setFirstOffset(0)
+                                .setLastOffset(10)
+                                .setDeliveryCount((short) 1)
+                                .setDeliveryState((byte) 0))),
+                    (short) 0
+                )
+            )
+        );
+
+        assertEquals(expectedRecords, shard.snapshotColdPartitions().records());
+
+        shard.replay(offset + 1, producerId, producerEpoch, expectedRecords.get(0));
+        assertNotNull(shard.getShareStateMapValue(SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 0)));
+
+        // Since all existing state has already been snapshotted, no new records will be created.
+        TIME.sleep(12000);
+
+        assertEquals(0, shard.snapshotColdPartitions().records().size());
+    }
+
+    @Test
+    public void testSnapshotColdPartitionsPartialEligiblePartitions() {
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+        MetadataImage image = mock(MetadataImage.class);
+        shard.onNewMetadataImage(image, null);
+        int offset = 0;
+        int producerId = 0;
+        short producerEpoch = 0;
+        int leaderEpoch = 0;
+        SharePartitionKey key0 = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 0);
+        SharePartitionKey key1 = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, 1);
+
+        long timestamp = TIME.milliseconds();
+        int record1SnapshotEpoch = 0;
+
+        CoordinatorRecord record1 = CoordinatorRecord.record(
+            new ShareSnapshotKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(0),
+            new ApiMessageAndVersion(
+                new ShareSnapshotValue()
+                    .setSnapshotEpoch(record1SnapshotEpoch)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setCreateTimestamp(timestamp)
+                    .setWriteTimestamp(timestamp)
+                    .setStateBatches(List.of(
+                        new ShareSnapshotValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        long delta = 15000; // 15 seconds
+
+        CoordinatorRecord record2 = CoordinatorRecord.record(
+            new ShareSnapshotKey()
+                .setGroupId(GROUP_ID)
+                .setTopicId(TOPIC_ID)
+                .setPartition(1),
+            new ApiMessageAndVersion(
+                new ShareSnapshotValue()
+                    .setSnapshotEpoch(0)
+                    .setStateEpoch(0)
+                    .setLeaderEpoch(leaderEpoch)
+                    .setCreateTimestamp(timestamp + delta)
+                    .setWriteTimestamp(timestamp + delta)
+                    .setStateBatches(List.of(
+                        new ShareSnapshotValue.StateBatch()
+                            .setFirstOffset(0)
+                            .setLastOffset(10)
+                            .setDeliveryCount((short) 1)
+                            .setDeliveryState((byte) 0))),
+                (short) 0
+            )
+        );
+
+        shard.replay(offset, producerId, producerEpoch, record1);
+        shard.replay(offset + 1, producerId, producerEpoch, record2);
+
+        assertNotNull(shard.getShareStateMapValue(key0));
+        assertNotNull(shard.getShareStateMapValue(key1));
+        assertEquals(timestamp, shard.getShareStateMapValue(key0).writeTimestamp());
+        assertEquals(timestamp + delta, shard.getShareStateMapValue(key1).writeTimestamp());
+
+        long sleep = 12000;
+        TIME.sleep(sleep);  // Record 1 is eligible now.
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            CoordinatorRecord.record(
+                new ShareSnapshotKey()
+                    .setGroupId(GROUP_ID)
+                    .setTopicId(TOPIC_ID)
+                    .setPartition(0),
+                new ApiMessageAndVersion(
+                    new ShareSnapshotValue()
+                        .setSnapshotEpoch(record1SnapshotEpoch + 1)
+                        .setStateEpoch(0)
+                        .setLeaderEpoch(leaderEpoch)
+                        .setCreateTimestamp(timestamp)
+                        .setWriteTimestamp(timestamp + sleep)
+                        .setStateBatches(List.of(
+                            new ShareSnapshotValue.StateBatch()
+                                .setFirstOffset(0)
+                                .setLastOffset(10)
+                                .setDeliveryCount((short) 1)
+                                .setDeliveryState((byte) 0))),
+                    (short) 0
+                )
+            )
+        );
+
+        List<CoordinatorRecord> records = shard.snapshotColdPartitions().records();
+        assertEquals(expectedRecords, records);
+
+        shard.replay(offset + 2, producerId, producerEpoch, records.get(0));
+
+        assertEquals(timestamp + delta, shard.getShareStateMapValue(key1).writeTimestamp());
+    }

Review Comment:
   And I suppose you should `assertEquals(timestamp + sleep, shard.getShareStateMapValue(key0).writeTimestamp())` also.
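   For illustration, a minimal sketch of how the suggested assertion could close out `testSnapshotColdPartitionsPartialEligiblePartitions`, reusing the test's existing `key0`, `key1`, `timestamp`, `sleep`, and `delta` locals (placement and comments are mine, not the PR author's):

   ```java
   shard.replay(offset + 2, producerId, producerEpoch, records.get(0));

   // Partition 0 was cold and has just been re-snapshotted, so replaying the
   // new snapshot record should advance its write timestamp to the mock clock.
   assertEquals(timestamp + sleep, shard.getShareStateMapValue(key0).writeTimestamp());
   // Partition 1 was not eligible, so its write timestamp stays unchanged.
   assertEquals(timestamp + delta, shard.getShareStateMapValue(key1).writeTimestamp());
   ```

   Asserting on both keys pins down that `snapshotColdPartitions()` touched only the eligible partition.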


