dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1348548710
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java:
##########
@@ -623,4 +674,33 @@ public void testReplayGroupMetadataWithNullValue() {
verify(groupMetadataManager, times(1)).replay(key, null);
}
+
+ @Test
+ public void testScheduleCleanupGroupMetadata() {
Review Comment:
Do we also need a test which verifies that the implementation performs the right
steps?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -544,6 +579,100 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup
fetchAllOffsets(
.setTopics(topicResponses);
}
+ /**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit
tombstone records.
+ *
+ * @return True if no offsets exist or if all offsets expired, false
otherwise.
+ */
+ public boolean cleanupExpiredOffsets(String groupId, List<Record> records)
{
+ TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>
offsetsByTopic = offsetsByGroup.get(groupId);
+ if (offsetsByTopic == null) {
+ return true;
+ }
+
+ // We expect the group to exist.
+ Group group = groupMetadataManager.group(groupId);
+ Set<TopicPartition> expiredPartitions = new HashSet<>();
+ long currentTimestamp = time.milliseconds();
+ Optional<OffsetExpirationCondition> offsetExpirationCondition =
group.offsetExpirationCondition();
+
+ if (!offsetExpirationCondition.isPresent()) {
+ return false;
+ }
+
+ AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+ OffsetExpirationCondition condition = offsetExpirationCondition.get();
+
+ offsetsByTopic.forEach((topic, partitions) -> {
+ if (!group.isSubscribedToTopic(topic, false)) {
+ partitions.forEach((partition, offsetAndMetadata) -> {
+ if (condition.isOffsetExpired(offsetAndMetadata,
currentTimestamp, config.offsetsRetentionMs)) {
+
expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition,
records));
+ } else {
+ hasAllOffsetsExpired.set(false);
+ }
+ });
+ } else {
+ hasAllOffsetsExpired.set(false);
+ }
+ });
+
+ log.debug("[GroupId {}] Expiring offsets: {}", groupId,
expiredPartitions);
Review Comment:
I wonder if we should log this at the info level. What do you think? We should
also format `expiredPartitions` correctly here.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -417,6 +441,40 @@ public CoordinatorResult<OffsetDeleteResponseData, Record>
deleteOffsets(
return offsetMetadataManager.deleteOffsets(request);
}
+ /**
+ * For each group, remove all expired offsets. If all offsets for the
group is removed and the group is eligible
+ * for deletion, delete the group.
+ *
+ * @return The list of tombstones (offset commit and group metadata) to
append.
+ */
+ public CoordinatorResult<Void, Record> cleanupGroupMetadata() {
+ List<Record> records = new ArrayList<>();
+ groupMetadataManager.groupIds()
+ .forEach(groupId -> {
Review Comment:
nit: Let's move `forEach` onto the previous line.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -898,6 +900,52 @@ public void createGroupTombstoneRecords(List<Record>
records) {
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
}
+ @Override
+ public boolean isGroupEmpty() {
Review Comment:
nit: `isEmpty`?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -282,6 +311,17 @@ public void commitOffset(
int partition,
long offset,
int leaderEpoch
+ ) {
+ commitOffset(groupId, topic, partition, offset, leaderEpoch,
time.milliseconds());
+
+ }
+ public void commitOffset(
Review Comment:
nit: Let's add an empty line before the method declaration.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupTest.java:
##########
@@ -1085,6 +1093,101 @@ public void testValidateDeleteGroup() {
assertThrows(GroupIdNotFoundException.class,
group::validateDeleteGroup);
}
+ @Test
+ public void testOffsetExpirationCondition() {
+ MockedStatic<OffsetMetadataManager> offsetMetadataManager =
mockStatic(OffsetMetadataManager.class);
+ long currentTimestamp = 30000L;
+ long commitTimestamp = 20000L;
+ long offsetsRetentionMs = 10000L;
+ OptionalLong expireTimestamp = OptionalLong.of(35000);
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L,
OptionalInt.empty(), "", commitTimestamp, expireTimestamp);
+ MockTime time = new MockTime();
+ long currentStateTimestamp = time.milliseconds();
+ GenericGroup group = new GenericGroup(new LogContext(), "groupId",
EMPTY, time);
+
+ // 1. Test no protocol type. Simple consumer case, Base timestamp
based off of last commit timestamp.
+ Optional<OffsetMetadataManager.OffsetExpirationCondition> condition =
group.offsetExpirationCondition();
+ assertTrue(condition.isPresent());
+
+ condition.get().isOffsetExpired(
Review Comment:
Should we assert the returned value? This applies to others as well.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -265,6 +293,7 @@ public
List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchAllOffsets(
return response.topics();
}
+
Review Comment:
nit: This empty line could be removed.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1763,6 +1803,120 @@ public void testDeleteGroupAllOffsets(Group.GroupType
groupType) {
assertEquals(3, numDeleteOffsets);
}
+ @Test
+ public void testIsExpiredOffset() {
+ long currentTimestamp = 1000L;
+ long baseTimestamp = 500L;
+ OptionalLong expireTimestampMs = OptionalLong.of(1500);
+ long offsetsRetentionMs = 500L;
+
+ // Current timestamp >= expire timestamp => should expire
+ assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp,
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+ // Current timestamp < expire timestamp => should not expire
+ currentTimestamp = 499;
+ assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp,
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+ // Expire timestamp does not exist (current version with no per
partition retention)
+ // Current timestamp - base timestamp >= offsets retention => should
expire
+ expireTimestampMs = OptionalLong.empty();
+ currentTimestamp = 1000L;
+ assertTrue(OffsetMetadataManager.isExpiredOffset(currentTimestamp,
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+ // Current timestamp - base timestamp < offsets retention => should
not expire
+ currentTimestamp = 999L;
+ assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp,
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+ }
+
+ @Test
+ public void testCleanupExpiredOffsetsGroupDoesNotExist() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder()
+ .build();
+
+ List<Record> records = new ArrayList<>();
+ assertTrue(context.cleanupExpiredOffsets("unknown-group-id", records));
+ assertEquals(Collections.emptyList(), records);
+ }
+
+ @Test
+ public void testCleanupExpiredOffsetsGroupEmptyOffsetExpirationCondition()
{
Review Comment:
Is `GroupEmpty` correct here?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -70,47 +70,66 @@
import java.util.OptionalLong;
import static
org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET;
+import static
org.apache.kafka.coordinator.group.OffsetMetadataManager.OffsetExpirationCondition.DEFAULT_OFFSET_EXPIRATION_CONDITION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class OffsetMetadataManagerTest {
static class OffsetMetadataManagerTestContext {
public static class Builder {
- final private MockTime time = new MockTime();
- final private MockCoordinatorTimer<Void, Record> timer = new
MockCoordinatorTimer<>(time);
- final private LogContext logContext = new LogContext();
- final private SnapshotRegistry snapshotRegistry = new
SnapshotRegistry(logContext);
+ private final MockTime time = new MockTime();
+ private final MockCoordinatorTimer<Void, Record> timer = new
MockCoordinatorTimer<>(time);
+ private final LogContext logContext = new LogContext();
+ private final SnapshotRegistry snapshotRegistry = new
SnapshotRegistry(logContext);
+ private GroupMetadataManager groupMetadataManager = null;
private MetadataImage metadataImage = null;
- private int offsetMetadataMaxSize = 4096;
+ private GroupCoordinatorConfig config = null;
Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
- this.offsetMetadataMaxSize = offsetMetadataMaxSize;
+ config =
GroupCoordinatorConfigTest.createGroupCoordinatorConfig(offsetMetadataMaxSize,
60000L, 24 * 60 * 1000);
+ return this;
+ }
+
+ Builder withOffsetsRetentionMs(long offsetsRetentionMs) {
+ config =
GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L,
offsetsRetentionMs);
Review Comment:
I am not a big fan of this because it won't scale if we add more params.
It seems to me that we should have a config builder in the test package with
sane defaults for tests. Then we could pass the config object here or
instantiate the default one. What do you think?
I am fine with doing this as a follow-up though.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1763,6 +1803,120 @@ public void testDeleteGroupAllOffsets(Group.GroupType
groupType) {
assertEquals(3, numDeleteOffsets);
}
+ @Test
+ public void testIsExpiredOffset() {
+ long currentTimestamp = 1000L;
+ long baseTimestamp = 500L;
+ OptionalLong expireTimestampMs = OptionalLong.of(1500);
+ long offsetsRetentionMs = 500L;
+
+ // Current timestamp >= expire timestamp => should expire
+ assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp,
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+ // Current timestamp < expire timestamp => should not expire
+ currentTimestamp = 499;
+ assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp,
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+ // Expire timestamp does not exist (current version with no per
partition retention)
+ // Current timestamp - base timestamp >= offsets retention => should
expire
+ expireTimestampMs = OptionalLong.empty();
+ currentTimestamp = 1000L;
+ assertTrue(OffsetMetadataManager.isExpiredOffset(currentTimestamp,
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+ // Current timestamp - base timestamp < offsets retention => should
not expire
+ currentTimestamp = 999L;
+ assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp,
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+ }
+
+ @Test
+ public void testCleanupExpiredOffsetsGroupDoesNotExist() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder()
+ .build();
+
+ List<Record> records = new ArrayList<>();
+ assertTrue(context.cleanupExpiredOffsets("unknown-group-id", records));
Review Comment:
Shouldn't this throw a `GroupIdNotFoundException`?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java:
##########
@@ -898,6 +900,52 @@ public void createGroupTombstoneRecords(List<Record>
records) {
records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId()));
}
+ @Override
+ public boolean isGroupEmpty() {
+ return isInState(EMPTY);
+ }
+
+ /**
+ * Return the offset expiration condition to be used for this group. This
is based on several factors
+ * such as the group state, the protocol type, and the GroupMetadata
record version.
+ *
+ * See {@link
org.apache.kafka.coordinator.group.OffsetMetadataManager.OffsetExpirationCondition}
+ *
+ * @return The offset expiration condition for the group or Empty of no
such condition exists.
+ */
+ @Override
+ public Optional<OffsetMetadataManager.OffsetExpirationCondition>
offsetExpirationCondition() {
+ if (protocolType.isPresent()) {
+ if (isInState(EMPTY)) {
+ // No consumer exists in the group =>
+ // - If current state timestamp exists and retention period
has passed since group became Empty,
+ // expire all offsets with no pending offset commit;
+ // - If there is no current state timestamp (old group
metadata schema) and retention period has passed
+ // since the last commit timestamp, expire the offset
+ return Optional.of(
+ (offsetAndMetadata, currentTimestamp, offsetsRetentionMs)
-> OffsetMetadataManager.isExpiredOffset(
+ currentTimestamp,
+
currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs),
+ offsetAndMetadata.expireTimestampMs,
+ offsetsRetentionMs
+ )
Review Comment:
The dependency between the classes feels a bit weird here. The
OffsetMetadataManager calls offsetExpirationCondition which returns an
OffsetExpirationCondition which calls the OffsetMetadataManager. I was not
expecting this when I suggested it.
We could perhaps have an implementation of the OffsetExpirationCondition
which contains the logic of `OffsetMetadataManager.isExpiredOffset` and takes a
function to extract the timestamp from the record. Here we could just
instantiate the new class with the correct way to get the timestamp. Would it
make sense?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1763,6 +1803,120 @@ public void testDeleteGroupAllOffsets(Group.GroupType
groupType) {
assertEquals(3, numDeleteOffsets);
}
+ @Test
+ public void testIsExpiredOffset() {
+ long currentTimestamp = 1000L;
+ long baseTimestamp = 500L;
+ OptionalLong expireTimestampMs = OptionalLong.of(1500);
+ long offsetsRetentionMs = 500L;
+
+ // Current timestamp >= expire timestamp => should expire
+ assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp,
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
Review Comment:
Either the comment or the assertion is incorrect.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -1763,6 +1803,120 @@ public void testDeleteGroupAllOffsets(Group.GroupType
groupType) {
assertEquals(3, numDeleteOffsets);
}
+ @Test
+ public void testIsExpiredOffset() {
+ long currentTimestamp = 1000L;
+ long baseTimestamp = 500L;
+ OptionalLong expireTimestampMs = OptionalLong.of(1500);
+ long offsetsRetentionMs = 500L;
+
+ // Current timestamp >= expire timestamp => should expire
+ assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp,
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+ // Current timestamp < expire timestamp => should not expire
+ currentTimestamp = 499;
+ assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp,
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+ // Expire timestamp does not exist (current version with no per
partition retention)
+ // Current timestamp - base timestamp >= offsets retention => should
expire
+ expireTimestampMs = OptionalLong.empty();
+ currentTimestamp = 1000L;
+ assertTrue(OffsetMetadataManager.isExpiredOffset(currentTimestamp,
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+
+ // Current timestamp - base timestamp < offsets retention => should
not expire
+ currentTimestamp = 999L;
+ assertFalse(OffsetMetadataManager.isExpiredOffset(currentTimestamp,
baseTimestamp, expireTimestampMs, offsetsRetentionMs));
+ }
+
+ @Test
+ public void testCleanupExpiredOffsetsGroupDoesNotExist() {
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder()
+ .build();
+
+ List<Record> records = new ArrayList<>();
+ assertTrue(context.cleanupExpiredOffsets("unknown-group-id", records));
+ assertEquals(Collections.emptyList(), records);
+ }
+
+ @Test
+ public void testCleanupExpiredOffsetsGroupEmptyOffsetExpirationCondition()
{
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ Group group = mock(Group.class);
+
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder()
+ .withGroupMetadataManager(groupMetadataManager)
+ .build();
+
+ context.commitOffset("group-id", "topic", 0, 100L, 0);
+
+ when(groupMetadataManager.group("group-id")).thenReturn(group);
+ when(group.offsetExpirationCondition()).thenReturn(Optional.empty());
+
+ List<Record> records = new ArrayList<>();
+ assertFalse(context.cleanupExpiredOffsets("group-id", records));
+ assertEquals(Collections.emptyList(), records);
+ }
+
+ @Test
+ public void testCleanupExpiredOffsets() {
Review Comment:
Here we basically test the default condition, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]