dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1340783778
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -190,6 +191,11 @@ public GroupCoordinatorService build() {
*/
private final CoordinatorRuntime<GroupCoordinatorShard, Record> runtime;
+ /**
+ * The scheduler that periodically deletes expired group metadata.
+ */
+ private final KafkaScheduler scheduler = new KafkaScheduler(1, true, "group-metadata-expiration-");
Review Comment:
We already have a timer so we should rather use it. Thinking about this, I
think that the best would be to actually schedule the expiration for each
shard independently in CoordinatorShard::onLoaded. From there, we can access
the `CoordinatorTimer`. Would this work?
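The per-shard scheduling suggested above could look roughly like the sketch
below. The `CoordinatorTimer` interface here is a simplified stand-in, not the
actual runtime API; the point is only the pattern: onLoaded arms the timer,
each run re-arms it, and onUnloaded cancels it, so every shard expires its
groups independently and the schedule dies with the shard.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for the runtime's CoordinatorTimer; the real
// interface in the coordinator runtime differs. Only the pattern matters.
interface CoordinatorTimer {
    void schedule(String key, long delayMs, Runnable operation);
    void cancel(String key);
}

// Sketch of a shard that owns its own expiration schedule.
class ShardSketch {
    static final long INTERVAL_MS = 600_000L; // e.g. the retention check interval
    private final CoordinatorTimer timer;
    int cleanupRuns = 0;

    ShardSketch(CoordinatorTimer timer) {
        this.timer = timer;
    }

    void onLoaded() {
        scheduleCleanup();
    }

    void onUnloaded() {
        timer.cancel("group-metadata-expiration");
    }

    private void scheduleCleanup() {
        timer.schedule("group-metadata-expiration", INTERVAL_MS, () -> {
            cleanupRuns++;       // stand-in for cleanupGroupMetadata()
            scheduleCleanup();   // re-arm for the next cycle
        });
    }
}

// Minimal fake timer so the pattern can be exercised without real time.
class FakeTimer implements CoordinatorTimer {
    final Deque<Runnable> pending = new ArrayDeque<>();
    public void schedule(String key, long delayMs, Runnable operation) {
        pending.add(operation);
    }
    public void cancel(String key) {
        pending.clear();
    }
}
```

One nice property of this shape is that there is no shared scheduler to tear
down in GroupCoordinatorService: unloading the shard cancels its own task.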
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -400,6 +408,14 @@ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
return offsetMetadataManager.deleteOffsets(request);
}
+ public CoordinatorResult<Void, Record> cleanupGroupMetadata() {
+ List<Record> records = new ArrayList<>();
+ offsetMetadataManager.cleanupOffsetMetadata(records, config.offsetsRetentionMs);
+ groupMetadataManager.cleanupGroupMetadata(records);
Review Comment:
It is a tad annoying that we have to iterate on all the groups with offsets
in `cleanupOffsetMetadata` and then on all the groups again in
`cleanupGroupMetadata`. Would it be possible to iterate on all groups only
once?
Iterating on all the groups is also concerning because it won't scale well.
We would need to build indices to avoid this but I am not sure if it is worth
it. Have you already considered this?
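A single pass over the groups could fold both steps together: for each group,
first emit its expired-offset tombstones, then, if the group ends up with
neither members nor offsets, emit its group tombstone. The sketch below uses
simplified stand-in types and hypothetical method names (`expireOffsets`,
`isEligibleForDeletion`), not the actual manager APIs; records are plain
strings for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a group: a set of committed offsets with their commit
// timestamps, plus an "empty" flag standing in for having no members.
class GroupSketch {
    final String id;
    final Map<String, Long> commitTimestamps = new LinkedHashMap<>(); // partition -> commit time
    boolean empty = true;

    GroupSketch(String id) { this.id = id; }

    // Remove expired offsets and append a tombstone record for each.
    void expireOffsets(List<String> records, long retentionMs, long nowMs) {
        commitTimestamps.entrySet().removeIf(e -> {
            if (nowMs - e.getValue() >= retentionMs) {
                records.add("offset-tombstone:" + id + ":" + e.getKey());
                return true;
            }
            return false;
        });
    }

    boolean isEligibleForDeletion() {
        return empty && commitTimestamps.isEmpty();
    }
}

// One pass over all groups produces both kinds of tombstones in order, so the
// offset tombstones for a group always precede its group tombstone.
class SinglePassCleanup {
    static List<String> cleanup(Map<String, GroupSketch> groups, long retentionMs, long nowMs) {
        List<String> records = new ArrayList<>();
        groups.values().forEach(group -> {
            group.expireOffsets(records, retentionMs, nowMs);
            if (group.isEligibleForDeletion()) {
                records.add("group-tombstone:" + group.id);
            }
        });
        return records;
    }
}
```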
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -653,7 +676,17 @@ private void removeOffset(
if (partitionOffsets.isEmpty())
topicOffsets.remove(topic);
- if (topicOffsets.isEmpty())
+ if (topicOffsets.isEmpty()) {
offsetsByGroup.remove(groupId);
+ // A group in offset metadata manager should always exist in the group metadata manager.
+ // In the case the write operation fails, the generic group will remain Dead.
+ // This is okay since it follows the old coordinator behavior; delete expired offsets and
+ // transition group to Dead even if append fails. One caveat is that groups that just transitioned
+ // to Dead will be deleted in the next expiration cycle.
+ Group group = groupMetadataManager.group(groupId);
+ if (group.isEmpty()) {
+ group.transitionToDead();
Review Comment:
I actually wonder if we need this after all. If the deletion fails, the
group will be restored at its previous state when the operation is reverted in
the timeline data structure.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -400,6 +408,14 @@ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
return offsetMetadataManager.deleteOffsets(request);
}
+ public CoordinatorResult<Void, Record> cleanupGroupMetadata() {
+ List<Record> records = new ArrayList<>();
+ offsetMetadataManager.cleanupOffsetMetadata(records, config.offsetsRetentionMs);
+ groupMetadataManager.cleanupGroupMetadata(records);
+
+ return new CoordinatorResult<>(records, null);
Review Comment:
The number of records here is potentially unlimited. Therefore, we could hit
the max message size. If we do, the expiration process would basically fail and
never recover. Have you thought about how we could handle this?
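One way to handle this, sketched below with a plain list of placeholder
records, is to split the collected tombstones into bounded batches and write
each batch as its own operation, so no single write can exceed the max
message size; any batch that fails would simply be retried on the next
expiration cycle. The cap here (`maxRecordsPerRun`) is hypothetical, not an
existing coordinator config.

```java
import java.util.ArrayList;
import java.util.List;

// Split an unbounded list of cleanup records into bounded batches so that
// each write stays under the max message size. Leftover records are picked
// up by the next expiration run.
class BoundedCleanup {
    static <T> List<List<T>> chunk(List<T> records, int maxRecordsPerRun) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < records.size(); i += maxRecordsPerRun) {
            int end = Math.min(i + maxRecordsPerRun, records.size());
            batches.add(new ArrayList<>(records.subList(i, end)));
        }
        return batches;
    }
}
```

A record-count cap is only an approximation of a byte-size cap, but it keeps
the sketch simple; a real implementation would likely bound the serialized
batch size instead.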
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -641,6 +643,26 @@ public List<Record> createGroupTombstoneRecords() {
);
}
+ @Override
+ public boolean isEmpty() {
+ return state() == ConsumerGroupState.EMPTY;
+ }
+
+ @Override
+ public void transitionToDead() {
+ state.set(ConsumerGroupState.DEAD);
+ }
+
+ @Override
+ public boolean eligibleForExpiration() {
+ return state() == ConsumerGroupState.DEAD;
+ }
+
+ @Override
+ public Map<TopicPartition, OffsetAndMetadata> expiredOffsets(long offsetsRetentionMs, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsets) {
+ return null;
+ return null;
Review Comment:
Why is this null?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -536,6 +538,27 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
.setTopics(topicResponses);
}
+ public void cleanupOffsetMetadata(List<Record> records, long offsetsRetentionMs) {
+ // TODO: get only committed offset groups?
+ offsetsByGroup.forEach((groupId, offsetsByTopic) -> {
+ try {
+ Group group = groupMetadataManager.group(groupId);
+ Map<TopicPartition, OffsetAndMetadata> expiredOffsets = group.expiredOffsets(offsetsRetentionMs, offsetsByTopic);
Review Comment:
Is there a reason why we need to pass `offsetsByTopic` to the group? Would
it be possible to just ask the group if its offsets could be expired and then
expire them if possible? This would follow the pattern that we have
established in @dongnuo123's PR.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3100,6 +3100,15 @@ void validateDeleteGroup(String groupId) throws ApiException {
group.validateDeleteGroup();
}
+ public void cleanupGroupMetadata(List<Record> records) {
+ // TODO: committed offset?
+ groups.values().forEach(group -> {
+ if (group.eligibleForExpiration()) {
+ records.addAll(group.createGroupTombstoneRecords());
+ }
+ });
Review Comment:
It is a bit annoying that we have to iterate over all the groups to find the
ones eligible for expiration. Have you thought about alternatives?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -536,6 +538,27 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
.setTopics(topicResponses);
}
+ public void cleanupOffsetMetadata(List<Record> records, long offsetsRetentionMs) {
+ // TODO: get only committed offset groups?
+ offsetsByGroup.forEach((groupId, offsetsByTopic) -> {
+ try {
+ Group group = groupMetadataManager.group(groupId);
+ Map<TopicPartition, OffsetAndMetadata> expiredOffsets = group.expiredOffsets(offsetsRetentionMs, offsetsByTopic);
+ log.debug("[GroupId {}] Expiring offsets: {}", groupId, expiredOffsets.keySet());
+ expiredOffsets.forEach((topicPartition, offsetAndMetadata) -> {
Review Comment:
Is expiredOffsets really necessary? It seems that we could directly generate
the required records instead of building it, no?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]