jeffkbkim commented on code in PR #14067: URL: https://github.com/apache/kafka/pull/14067#discussion_r1274173286
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.generic.GenericGroup; +import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; + +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; + +/** + * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains + * a mapping from group id to topic-partition to offset. This class has two kinds of methods: + * 1) The request handlers which handle the requests and generate a response and records to + * mutate the hard state. Those records will be written by the runtime and applied to the + * hard state via the replay methods. + * 2) The replay methods which apply records to the hard state. Those are used in the request + * handling as well as during the initial loading of the records from the partitions. 
+ */ +public class OffsetMetadataManager { + public static class Builder { + private LogContext logContext = null; + private SnapshotRegistry snapshotRegistry = null; + private Time time = null; + private GroupMetadataManager groupMetadataManager = null; + private int offsetMetadataMaxSize = 4096; + private MetadataImage metadataImage = null; + + Builder withLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) { + this.snapshotRegistry = snapshotRegistry; + return this; + } + + Builder withTime(Time time) { + this.time = time; + return this; + } + + Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) { + this.groupMetadataManager = groupMetadataManager; + return this; + } + + Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) { + this.offsetMetadataMaxSize = offsetMetadataMaxSize; + return this; + } + + Builder withMetadataImage(MetadataImage metadataImage) { + this.metadataImage = metadataImage; + return this; + } + + public OffsetMetadataManager build() { + if (logContext == null) logContext = new LogContext(); + if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext); + if (metadataImage == null) metadataImage = MetadataImage.EMPTY; + if (time == null) time = Time.SYSTEM; + + if (groupMetadataManager == null) { + throw new IllegalArgumentException("GroupMetadataManager cannot be null"); + } + + return new OffsetMetadataManager( + snapshotRegistry, + logContext, + time, + metadataImage, + groupMetadataManager, + offsetMetadataMaxSize + ); + } + } + + /** + * The logger. + */ + private final Logger log; + + /** + * The snapshot registry. + */ + private final SnapshotRegistry snapshotRegistry; + + /** + * The system time. + */ + private final Time time; + + /** + * The metadata image. + */ + private MetadataImage metadataImage; + + /** + * The group metadata manager. + */ + private final GroupMetadataManager groupMetadataManager; + + /** + * The maximum allowed metadata for any offset commit. + */ + private final int offsetMetadataMaxSize; + + /** + * The offsets keyed by topic-partition and group id. + */ + private final TimelineHashMap<String, TimelineHashMap<TopicPartition, OffsetAndMetadata>> offsetsByGroup; + + OffsetMetadataManager( + SnapshotRegistry snapshotRegistry, + LogContext logContext, + Time time, + MetadataImage metadataImage, + GroupMetadataManager groupMetadataManager, + int offsetMetadataMaxSize + ) { + this.snapshotRegistry = snapshotRegistry; + this.log = logContext.logger(OffsetMetadataManager.class); + this.time = time; + this.metadataImage = metadataImage; + this.groupMetadataManager = groupMetadataManager; + this.offsetMetadataMaxSize = offsetMetadataMaxSize; + this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); + } + + /** + * Validates an OffsetCommit request. + * + * @param context The request context. + * @param request The actual request. + */ + private void validateOffsetCommit( + RequestContext context, + OffsetCommitRequestData request + ) throws ApiException { + Group group; + try { + group = groupMetadataManager.group(request.groupId()); + } catch (GroupIdNotFoundException ex) { + if (request.generationIdOrMemberEpoch() < 0) { + // If the group does not exist and generation id is -1, the request comes from + // either the admin client or a consumer which does not use the group management + // facility. 
In this case, a so-called simple group is created and the request + // is accepted. + group = groupMetadataManager.getOrMaybeCreateGenericGroup(request.groupId(), true); + } else { + // Maintain backward compatibility. This is a bit weird in the + // context of the new protocol though. + throw Errors.ILLEGAL_GENERATION.exception(); + } + } + + // Validate the request based on the group type. + switch (group.type()) { + case GENERIC: + validateOffsetCommitForGenericGroup( + (GenericGroup) group, + request + ); + break; + + case CONSUMER: + validateOffsetCommitForConsumerGroup( + (ConsumerGroup) group, + context, + request + ); + break; + } + } + + /** + * Validates an OffsetCommit request for a generic group. + * + * @param group The generic group. + * @param request The actual request. + */ + public void validateOffsetCommitForGenericGroup( + GenericGroup group, + OffsetCommitRequestData request + ) throws KafkaException { + if (group.isInState(DEAD)) { + throw Errors.COORDINATOR_NOT_AVAILABLE.exception(); + } + + if (request.generationIdOrMemberEpoch() < 0 && group.isInState(EMPTY)) { + // When the generation id is -1, the request comes from either the admin client + // or a consumer which does not use the group management facility. In this case, + // the request can commit offsets if the group is empty. + return; + } + + Optional<String> groupInstanceId = OffsetCommitRequest.groupInstanceId(request); + if (request.generationIdOrMemberEpoch() >= 0 || !request.memberId().isEmpty() || groupInstanceId.isPresent()) { + // We are validating three things: + // 1. If the `groupInstanceId` is present, then it exists and is mapped to `memberId`; + // 2. The `memberId` exists in the group; and + // 3. The `generationId` matches the current generation id. + if (groupInstanceId.isPresent()) { + String memberId = group.staticMemberId(groupInstanceId.get()); + if (memberId == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } else if (!request.memberId().equals(memberId)) { + throw Errors.FENCED_INSTANCE_ID.exception(); + } + } + + if (!group.hasMemberId(request.memberId())) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + if (request.generationIdOrMemberEpoch() != group.generationId()) { + throw Errors.ILLEGAL_GENERATION.exception(); + } + } else if (!group.isInState(EMPTY)) { + // If the request does not contain the member id and the generation + // id (version 0), offset commits are only accepted when the group + // is empty. + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + if (group.isInState(COMPLETING_REBALANCE)) { + // We should not receive a commit request if the group has not completed rebalance; + // but since the consumer's member.id and generation is valid, it means it has received + // the latest group generation information from the JoinResponse. + // So let's return a REBALANCE_IN_PROGRESS to let consumer handle it gracefully. + throw Errors.REBALANCE_IN_PROGRESS.exception(); + } + } + + /** + * Validates an OffsetCommit request for a consumer group. + * + * @param group The consumer group. + * @param context The request context. + * @param request The actual request. + */ + public void validateOffsetCommitForConsumerGroup( + ConsumerGroup group, + RequestContext context, + OffsetCommitRequestData request + ) throws KafkaException { + if (request.generationIdOrMemberEpoch() < 0 && group.members().isEmpty()) { + // When the generation id is -1, the request comes from either the admin client + // or a consumer which does not use the group management facility. 
In this case, + // the request can commit offsets if the group is empty. + return; + } + + if (!group.hasMember(request.memberId())) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + final int memberEpoch = group.getOrMaybeCreateMember(request.memberId(), false).memberEpoch(); + if (request.generationIdOrMemberEpoch() != memberEpoch) { + // Consumers using the new consumer group protocol (KIP-848) should be using the + // OffsetCommit API >= 9. As we don't support upgrading from the old to the new + // protocol yet, we return an UNSUPPORTED_VERSION error if an older version is + // used. We will revise this when the upgrade path is implemented. + if (context.header.apiVersion() >= 9) { + throw Errors.STALE_MEMBER_EPOCH.exception(); + } else { + throw Errors.UNSUPPORTED_VERSION.exception(); + } + } + } + + /** + * Handles an OffsetCommit request. + * + * @param context The request context. + * @param request The OffsetCommit request. + * + * @return A Result containing the OffsetCommitResponseData response and + * a list of records to update the state machine. + */ + public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( + RequestContext context, + OffsetCommitRequestData request + ) throws ApiException { + validateOffsetCommit(context, request); +
Review Comment: yes, don't we need to reschedule the heartbeat?
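For context on the question above: in the old coordinator, a valid OffsetCommit from a generic-group member counts as a liveness signal, and `GroupCoordinator` re-arms that member's session timer (roughly, `completeAndScheduleNextHeartbeatExpiration`). A minimal sketch of what an equivalent hook after validation could look like here; the `timer`, `heartbeatKey`, and `onSessionExpired` names and the member accessors are hypothetical, not APIs of this PR:

```
// Hypothetical sketch only: re-arm a generic-group member's session expiration
// after a valid offset commit, mirroring the old coordinator's behavior.
private void maybeRescheduleHeartbeat(GenericGroup group, String memberId) {
    if (group.hasMemberId(memberId)) {
        // The timer key is assumed to identify the (group, member) pair, so
        // scheduling replaces any previously pending expiration task.
        timer.schedule(
            heartbeatKey(group.groupId(), memberId),
            time.milliseconds() + group.member(memberId).sessionTimeoutMs(),
            () -> onSessionExpired(group.groupId(), memberId)
        );
    }
}
```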
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -0,0 +1,448 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.generic.GenericGroup; +import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; + +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; + +/** + * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains + * a mapping from group id to topic-partition to offset. This class has two kinds of methods: + * 1) The request handlers which handle the requests and generate a response and records to + * mutate the hard state. Those records will be written by the runtime and applied to the + * hard state via the replay methods. + * 2) The replay methods which apply records to the hard state. Those are used in the request + * handling as well as during the initial loading of the records from the partitions.
+ */ +public class OffsetMetadataManager { + public static class Builder { + private LogContext logContext = null; + private SnapshotRegistry snapshotRegistry = null; + private Time time = null; + private GroupMetadataManager groupMetadataManager = null; + private int offsetMetadataMaxSize = 4096; + private MetadataImage metadataImage = null; + + Builder withLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) { + this.snapshotRegistry = snapshotRegistry; + return this; + } + + Builder withTime(Time time) { + this.time = time; + return this; + } + + Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) { + this.groupMetadataManager = groupMetadataManager; + return this; + } + + Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) { + this.offsetMetadataMaxSize = offsetMetadataMaxSize; + return this; + } + + Builder withMetadataImage(MetadataImage metadataImage) { + this.metadataImage = metadataImage; + return this; + } + + public OffsetMetadataManager build() { + if (logContext == null) logContext = new LogContext(); + if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext); + if (metadataImage == null) metadataImage = MetadataImage.EMPTY; + if (time == null) time = Time.SYSTEM; + + if (groupMetadataManager == null) { + throw new IllegalArgumentException("GroupMetadataManager cannot be null"); + } + + return new OffsetMetadataManager( + snapshotRegistry, + logContext, + time, + metadataImage, + groupMetadataManager, + offsetMetadataMaxSize + ); + } + } + + /** + * The logger. + */ + private final Logger log; + + /** + * The snapshot registry. + */ + private final SnapshotRegistry snapshotRegistry; + + /** + * The system time. + */ + private final Time time; + + /** + * The metadata image. + */ + private MetadataImage metadataImage; + + /** + * The group metadata manager. + */ + private final GroupMetadataManager groupMetadataManager; + + /** + * The maximum allowed metadata for any offset commit. + */ + private final int offsetMetadataMaxSize; + + /** + * The offsets keyed by topic-partition and group id. + */ + private final TimelineHashMap<String, TimelineHashMap<TopicPartition, OffsetAndMetadata>> offsetsByGroup; + + OffsetMetadataManager( + SnapshotRegistry snapshotRegistry, + LogContext logContext, + Time time, + MetadataImage metadataImage, + GroupMetadataManager groupMetadataManager, + int offsetMetadataMaxSize + ) { + this.snapshotRegistry = snapshotRegistry; + this.log = logContext.logger(OffsetMetadataManager.class); + this.time = time; + this.metadataImage = metadataImage; + this.groupMetadataManager = groupMetadataManager; + this.offsetMetadataMaxSize = offsetMetadataMaxSize; + this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); + } + + /** + * Validates an OffsetCommit request. + * + * @param context The request context. + * @param request The actual request. + */ + private void validateOffsetCommit( + RequestContext context, + OffsetCommitRequestData request + ) throws ApiException { + Group group; + try { + // If the group does not exist and generation id is -1, the request comes from + // either the admin client or a consumer which does not use the group management + // facility. In this case, a so-called simple group is created and the request + // is accepted. 
+ group = groupMetadataManager.getOrMaybeCreateSimpleGroup( + request.groupId(), + request.generationIdOrMemberEpoch() < 0 + ); + } catch (GroupIdNotFoundException ex) { + // Maintain backward compatibility. This is a bit weird in the + // context of the new protocol though. + throw Errors.ILLEGAL_GENERATION.exception(); + } + + // Validate the request based on the group type. + switch (group.type()) { + case GENERIC: + validateOffsetCommitForGenericGroup( + (GenericGroup) group, + request + ); + break; + + case CONSUMER: + validateOffsetCommitForConsumerGroup( + (ConsumerGroup) group, + context, + request + ); + break; + } + } + + /** + * Validates an OffsetCommit request for a generic group. + * + * @param group The generic group. + * @param request The actual request. + */ + public void validateOffsetCommitForGenericGroup( + GenericGroup group, + OffsetCommitRequestData request + ) throws KafkaException { + if (group.isInState(DEAD)) { + throw Errors.COORDINATOR_NOT_AVAILABLE.exception(); + } + + if (request.generationIdOrMemberEpoch() < 0 && group.isInState(EMPTY)) { + // When the generation id is -1, the request comes from either the admin client + // or a consumer which does not use the group management facility. In this case, + // the request can commit offsets if the group is empty. + return; + } + + Optional<String> groupInstanceId = OffsetCommitRequest.groupInstanceId(request); + if (request.generationIdOrMemberEpoch() >= 0 || !request.memberId().isEmpty() || groupInstanceId.isPresent()) { + // We are validating three things: + // 1. If the `groupInstanceId` is present, then it exists and is mapped to `memberId`; + // 2. The `memberId` exists in the group; and + // 3. The `generationId` matches the current generation id. + if (groupInstanceId.isPresent()) { + String memberId = group.staticMemberId(groupInstanceId.get()); + if (memberId == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } else if (!request.memberId().equals(memberId)) { + throw Errors.FENCED_INSTANCE_ID.exception(); + } + } + + if (!group.hasMemberId(request.memberId())) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + if (request.generationIdOrMemberEpoch() != group.generationId()) { + throw Errors.ILLEGAL_GENERATION.exception(); + } + } else if (!group.isInState(EMPTY)) { + // If the request does not contain the member id and the generation + // id (version 0), offset commits are only accepted when the group + // is empty. + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + if (group.isInState(COMPLETING_REBALANCE)) { + // We should not receive a commit request if the group has not completed rebalance; + // but since the consumer's member.id and generation is valid, it means it has received + // the latest group generation information from the JoinResponse. + // So let's return a REBALANCE_IN_PROGRESS to let consumer handle it gracefully. + throw Errors.REBALANCE_IN_PROGRESS.exception(); + } + } + + /** + * Validates an OffsetCommit request for a consumer group. + * + * @param group The consumer group. + * @param context The request context. + * @param request The actual request. + */ + public void validateOffsetCommitForConsumerGroup( + ConsumerGroup group, + RequestContext context, + OffsetCommitRequestData request + ) throws KafkaException { + if (request.generationIdOrMemberEpoch() < 0 && group.members().isEmpty()) { + // When the generation id is -1, the request comes from either the admin client + // or a consumer which does not use the group management facility.
In this case, + // the request can commit offsets if the group is empty. + return; + } + + if (!group.hasMember(request.memberId())) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + final int memberEpoch = group.getOrMaybeCreateMember(request.memberId(), false).memberEpoch(); + if (request.generationIdOrMemberEpoch() != memberEpoch) { + // Consumers using the new consumer group protocol (KIP-848) should be using the + // OffsetCommit API >= 9. As we don't support upgrading from the old to the new + // protocol yet, we return an UNSUPPORTED_VERSION error if an older version is + // used. We will revise this when the upgrade path is implemented. + if (context.header.apiVersion() >= 9) { + throw Errors.STALE_MEMBER_EPOCH.exception(); + } else { + throw Errors.UNSUPPORTED_VERSION.exception(); + } + } + } + + /** + * Handles an OffsetCommit request. + * + * @param context The request context. + * @param request The OffsetCommit request. + * + * @return A Result containing the OffsetCommitResponseData response and + * a list of records to update the state machine. + */ + public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( + RequestContext context, + OffsetCommitRequestData request + ) throws ApiException { + validateOffsetCommit(context, request); + + final long currentTimeMs = time.milliseconds(); + // "default" expiration timestamp is defined as now + retention. The retention may be overridden + // in versions from v2 to v4. Otherwise, the retention defined on the broker is used. If an explicit + // commit timestamp is provided (v1 only), the expiration timestamp is computed based on that. + final OptionalLong expireTimestampMs = request.retentionTimeMs() == OffsetCommitRequest.DEFAULT_RETENTION_TIME ? + OptionalLong.empty() : OptionalLong.of(currentTimeMs + request.retentionTimeMs()); + final OffsetCommitResponseData response = new OffsetCommitResponseData(); + final List<Record> records = new ArrayList<>(); + + request.topics().forEach(topic -> { + final OffsetCommitResponseData.OffsetCommitResponseTopic topicResponse = + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName(topic.name()); + response.topics().add(topicResponse); + + topic.partitions().forEach(partition -> { + if (partition.committedMetadata() != null && partition.committedMetadata().length() > offsetMetadataMaxSize) { + topicResponse.partitions().add(new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code())); + } else { + log.debug("[GroupId {}] Committing offsets {} for partition {}-{} from member {} with leader epoch {}.", + request.groupId(), partition.committedOffset(), topic.name(), partition.partitionIndex(), + request.memberId(), partition.committedLeaderEpoch()); + + topicResponse.partitions().add(new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.NONE.code())); + + final OptionalInt leaderEpoch = partition.committedLeaderEpoch() == RecordBatch.NO_PARTITION_LEADER_EPOCH ? Review Comment: looks much better. thanks! ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.generic.GenericGroup; +import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; + +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; + +/** + * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains + * a mapping from group id to topic-partition to offset. This class has two kinds of methods: + * 1) The request handlers which handle the requests and generate a response and records to + * mutate the hard state. Those records will be written by the runtime and applied to the + * hard state via the replay methods. + * 2) The replay methods which apply records to the hard state. Those are used in the request + * handling as well as during the initial loading of the records from the partitions. 
+ */ +public class OffsetMetadataManager { + public static class Builder { + private LogContext logContext = null; + private SnapshotRegistry snapshotRegistry = null; + private Time time = null; + private GroupMetadataManager groupMetadataManager = null; + private int offsetMetadataMaxSize = 4096; + private MetadataImage metadataImage = null; + + Builder withLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) { + this.snapshotRegistry = snapshotRegistry; + return this; + } + + Builder withTime(Time time) { + this.time = time; + return this; + } + + Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) { + this.groupMetadataManager = groupMetadataManager; + return this; + } + + Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) { + this.offsetMetadataMaxSize = offsetMetadataMaxSize; + return this; + } + + Builder withMetadataImage(MetadataImage metadataImage) { + this.metadataImage = metadataImage; + return this; + } + + public OffsetMetadataManager build() { + if (logContext == null) logContext = new LogContext(); + if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext); + if (metadataImage == null) metadataImage = MetadataImage.EMPTY; + if (time == null) time = Time.SYSTEM; + + if (groupMetadataManager == null) { + throw new IllegalArgumentException("GroupMetadataManager cannot be null"); + } + + return new OffsetMetadataManager( + snapshotRegistry, + logContext, + time, + metadataImage, + groupMetadataManager, + offsetMetadataMaxSize + ); + } + } + + /** + * The logger. + */ + private final Logger log; + + /** + * The snapshot registry. + */ + private final SnapshotRegistry snapshotRegistry; + + /** + * The system time. + */ + private final Time time; + + /** + * The metadata image. + */ + private MetadataImage metadataImage; + + /** + * The group metadata manager. + */ + private final GroupMetadataManager groupMetadataManager; + + /** + * The maximum allowed metadata for any offset commit. + */ + private final int offsetMetadataMaxSize; + + /** + * The offsets keyed by topic-partition and group id. + */ + private final TimelineHashMap<String, TimelineHashMap<TopicPartition, OffsetAndMetadata>> offsetsByGroup; + + OffsetMetadataManager( + SnapshotRegistry snapshotRegistry, + LogContext logContext, + Time time, + MetadataImage metadataImage, + GroupMetadataManager groupMetadataManager, + int offsetMetadataMaxSize + ) { + this.snapshotRegistry = snapshotRegistry; + this.log = logContext.logger(OffsetMetadataManager.class); + this.time = time; + this.metadataImage = metadataImage; + this.groupMetadataManager = groupMetadataManager; + this.offsetMetadataMaxSize = offsetMetadataMaxSize; + this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); + } + + /** + * Validates an OffsetCommit request. + * + * @param context The request context. + * @param request The actual request. + */ + private void validateOffsetCommit( + RequestContext context, + OffsetCommitRequestData request + ) throws ApiException { + Group group; + try { + group = groupMetadataManager.group(request.groupId()); + } catch (GroupIdNotFoundException ex) { + if (request.generationIdOrMemberEpoch() < 0) { + // If the group does not exist and generation id is -1, the request comes from + // either the admin client or a consumer which does not use the group management + // facility. 
In this case, a so-called simple group is created and the request + // is accepted. + group = groupMetadataManager.getOrMaybeCreateGenericGroup(request.groupId(), true); + } else { + // Maintain backward compatibility. This is a bit weird in the + // context of the new protocol though. + throw Errors.ILLEGAL_GENERATION.exception(); + } + } + + // Validate the request based on the group type. + switch (group.type()) { + case GENERIC: + validateOffsetCommitForGenericGroup( + (GenericGroup) group, + request + ); + break; + + case CONSUMER: + validateOffsetCommitForConsumerGroup( + (ConsumerGroup) group, + context, + request + ); + break; + } + } + + /** + * Validates an OffsetCommit request for a generic group. + * + * @param group The generic group. + * @param request The actual request. + */ + public void validateOffsetCommitForGenericGroup( + GenericGroup group, + OffsetCommitRequestData request + ) throws KafkaException { + if (group.isInState(DEAD)) { + throw Errors.COORDINATOR_NOT_AVAILABLE.exception(); + } + + if (request.generationIdOrMemberEpoch() < 0 && group.isInState(EMPTY)) { + // When the generation id is -1, the request comes from either the admin client + // or a consumer which does not use the group management facility. In this case, + // the request can commit offsets if the group is empty. + return; + } + + Optional<String> groupInstanceId = OffsetCommitRequest.groupInstanceId(request); + if (request.generationIdOrMemberEpoch() >= 0 || !request.memberId().isEmpty() || groupInstanceId.isPresent()) { + // We are validating three things: + // 1. If the `groupInstanceId` is present, then it exists and is mapped to `memberId`; + // 2. The `memberId` exists in the group; and + // 3. The `generationId` matches the current generation id. + if (groupInstanceId.isPresent()) { + String memberId = group.staticMemberId(groupInstanceId.get()); + if (memberId == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } else if (!request.memberId().equals(memberId)) { + throw Errors.FENCED_INSTANCE_ID.exception(); + } + } + + if (!group.hasMemberId(request.memberId())) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + if (request.generationIdOrMemberEpoch() != group.generationId()) { + throw Errors.ILLEGAL_GENERATION.exception(); + } + } else if (!group.isInState(EMPTY)) { + // If the request does not contain the member id and the generation + // id (version 0), offset commits are only accepted when the group + // is empty. + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + if (group.isInState(COMPLETING_REBALANCE)) { + // We should not receive a commit request if the group has not completed rebalance; + // but since the consumer's member.id and generation is valid, it means it has received + // the latest group generation information from the JoinResponse. + // So let's return a REBALANCE_IN_PROGRESS to let consumer handle it gracefully. + throw Errors.REBALANCE_IN_PROGRESS.exception(); + } + } + + /** + * Validates an OffsetCommit request for a consumer group. + * + * @param group The consumer group. + * @param context The request context. + * @param request The actual request. + */ + public void validateOffsetCommitForConsumerGroup( + ConsumerGroup group, + RequestContext context, + OffsetCommitRequestData request + ) throws KafkaException { + if (request.generationIdOrMemberEpoch() < 0 && group.members().isEmpty()) { + // When the generation id is -1, the request comes from either the admin client + // or a consumer which does not use the group management facility. 
In this case, + // the request can commit offsets if the group is empty. + return; + } + + if (!group.hasMember(request.memberId())) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + final int memberEpoch = group.getOrMaybeCreateMember(request.memberId(), false).memberEpoch(); + if (request.generationIdOrMemberEpoch() != memberEpoch) { + // Consumers using the new consumer group protocol (KIP-848) should be using the + // OffsetCommit API >= 9. As we don't support upgrading from the old to the new + // protocol yet, we return an UNSUPPORTED_VERSION error if an older version is + // used. We will revise this when the upgrade path is implemented. + if (context.header.apiVersion() >= 9) { + throw Errors.STALE_MEMBER_EPOCH.exception(); + } else { + throw Errors.UNSUPPORTED_VERSION.exception(); + } + } + } + + /** + * Handles an OffsetCommit request. + * + * @param context The request context. + * @param request The OffsetCommit request. + * + * @return A Result containing the OffsetCommitResponseData response and + * a list of records to update the state machine. + */ + public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( + RequestContext context, + OffsetCommitRequestData request + ) throws ApiException { + validateOffsetCommit(context, request); + + final long currentTimeMs = time.milliseconds(); + // "default" expiration timestamp is defined as now + retention. The retention may be overridden + // in versions from v2 to v4. Otherwise, the retention defined on the broker is used. If an explicit + // commit timestamp is provided (v1 only), the expiration timestamp is computed based on that. + final OptionalLong expireTimestampMs = request.retentionTimeMs() == OffsetCommitRequest.DEFAULT_RETENTION_TIME ? + OptionalLong.empty() : OptionalLong.of(currentTimeMs + request.retentionTimeMs()); + final OffsetCommitResponseData response = new OffsetCommitResponseData(); + final List<Record> records = new ArrayList<>(); + + request.topics().forEach(topic -> { + final OffsetCommitResponseData.OffsetCommitResponseTopic topicResponse = + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName(topic.name()); + response.topics().add(topicResponse); + + topic.partitions().forEach(partition -> { + if (partition.committedMetadata() != null && partition.committedMetadata().length() > offsetMetadataMaxSize) { + topicResponse.partitions().add(new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code())); + } else { + log.debug("[GroupId {}] Committing offsets {} for partition {}-{} from member {} with leader epoch {}.", + request.groupId(), partition.committedOffset(), topic.name(), partition.partitionIndex(), + request.memberId(), partition.committedLeaderEpoch()); + + topicResponse.partitions().add(new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.NONE.code())); + + final OptionalInt leaderEpoch = partition.committedLeaderEpoch() == RecordBatch.NO_PARTITION_LEADER_EPOCH ? + OptionalInt.empty() : OptionalInt.of(partition.committedLeaderEpoch()); + final String metadata = partition.committedMetadata() == null ? + OffsetAndMetadata.NO_METADATA : partition.committedMetadata(); + final long commitTimestampMs = partition.commitTimestamp() == OffsetCommitRequest.DEFAULT_TIMESTAMP ? 
+ currentTimeMs : partition.commitTimestamp(); + + final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata( + partition.committedOffset(), + leaderEpoch, + metadata, + commitTimestampMs, + expireTimestampMs + ); + + records.add(RecordHelpers.newOffsetCommitRecord( + request.groupId(), + topic.name(), + partition.partitionIndex(), + offsetAndMetadata, + metadataImage.features().metadataVersion() + )); + } + }); + }); + + return new CoordinatorResult<>(records, response);
Review Comment: for instance, the existing behavior only updates if the group is not dead but we don't do that here. and in `onOffsetCommitAppend()`:
```
if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(offsetWithCommitRecordMetadata))
  offsets.put(topicPartition, offsetWithCommitRecordMetadata)
```
we only append if the commit time is newer whereas we don't do these checks here. i guess checking the response status doesn't apply here since we only handle one __consumer_offsets partition per coordinator. for pending offset commits, you're saying their use is to check whether an append is in-flight or not which is already taken care of by the timeline map right?
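To make the quoted `onOffsetCommitAppend()` guard concrete against this class: transposed to `replay()`, it would only overwrite an in-memory offset when the replayed commit is at least as new. A sketch, assuming an `olderThan()` comparison like the one `CommitRecordMetadataAndOffset` provides in the old Scala coordinator (this PR's `OffsetAndMetadata` has no such method):

```
// Hypothetical variant of the replay() update with the old "only keep the
// newest commit" guard. olderThan() is assumed to compare the commit record's
// log position/timestamp, as in the existing Scala coordinator.
OffsetAndMetadata current = offsets.get(tp);
if (current == null || current.olderThan(offsetAndMetadata)) {
    offsets.put(tp, offsetAndMetadata);
}
```

Since the runtime replays records into the timeline map in log order, a plain `put()` should converge to the same end state, which is presumably why the guard is dropped here.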
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.generic.GenericGroup; +import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; + +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; + +/** + * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains + * a mapping from group id to topic-partition to offset. This class has two kinds of methods: + * 1) The request handlers which handle the requests and generate a response and records to + * mutate the hard state. Those records will be written by the runtime and applied to the + * hard state via the replay methods. + * 2) The replay methods which apply records to the hard state. Those are used in the request + * handling as well as during the initial loading of the records from the partitions.
+ */ +public class OffsetMetadataManager { + public static class Builder { + private LogContext logContext = null; + private SnapshotRegistry snapshotRegistry = null; + private Time time = null; + private GroupMetadataManager groupMetadataManager = null; + private int offsetMetadataMaxSize = 4096; + private MetadataImage metadataImage = null; + + Builder withLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) { + this.snapshotRegistry = snapshotRegistry; + return this; + } + + Builder withTime(Time time) { + this.time = time; + return this; + } + + Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) { + this.groupMetadataManager = groupMetadataManager; + return this; + } + + Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) { + this.offsetMetadataMaxSize = offsetMetadataMaxSize; + return this; + } + + Builder withMetadataImage(MetadataImage metadataImage) { + this.metadataImage = metadataImage; + return this; + } + + public OffsetMetadataManager build() { + if (logContext == null) logContext = new LogContext(); + if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext); + if (metadataImage == null) metadataImage = MetadataImage.EMPTY; + if (time == null) time = Time.SYSTEM; + + if (groupMetadataManager == null) { + throw new IllegalArgumentException("GroupMetadataManager cannot be null"); + } + + return new OffsetMetadataManager( + snapshotRegistry, + logContext, + time, + metadataImage, + groupMetadataManager, + offsetMetadataMaxSize + ); + } + } + + /** + * The logger. + */ + private final Logger log; + + /** + * The snapshot registry. + */ + private final SnapshotRegistry snapshotRegistry; + + /** + * The system time. + */ + private final Time time; + + /** + * The metadata image. + */ + private MetadataImage metadataImage; + + /** + * The group metadata manager. + */ + private final GroupMetadataManager groupMetadataManager; + + /** + * The maximum allowed metadata for any offset commit. + */ + private final int offsetMetadataMaxSize; + + /** + * The offsets keyed by topic-partition and group id. + */ + private final TimelineHashMap<String, TimelineHashMap<TopicPartition, OffsetAndMetadata>> offsetsByGroup; + + OffsetMetadataManager( + SnapshotRegistry snapshotRegistry, + LogContext logContext, + Time time, + MetadataImage metadataImage, + GroupMetadataManager groupMetadataManager, + int offsetMetadataMaxSize + ) { + this.snapshotRegistry = snapshotRegistry; + this.log = logContext.logger(OffsetMetadataManager.class); + this.time = time; + this.metadataImage = metadataImage; + this.groupMetadataManager = groupMetadataManager; + this.offsetMetadataMaxSize = offsetMetadataMaxSize; + this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); + } + + /** + * Validates an OffsetCommit request. + * + * @param context The request context. + * @param request The actual request. + */ + private void validateOffsetCommit( + RequestContext context, + OffsetCommitRequestData request + ) throws ApiException { + Group group; + try { + group = groupMetadataManager.group(request.groupId()); + } catch (GroupIdNotFoundException ex) { + if (request.generationIdOrMemberEpoch() < 0) { + // If the group does not exist and generation id is -1, the request comes from + // either the admin client or a consumer which does not use the group management + // facility. 
In this case, a so-called simple group is created and the request + // is accepted. + group = groupMetadataManager.getOrMaybeCreateGenericGroup(request.groupId(), true); + } else { + // Maintain backward compatibility. This is a bit weird in the + // context of the new protocol though. + throw Errors.ILLEGAL_GENERATION.exception(); + } + } + + // Validate the request based on the group type. + switch (group.type()) { + case GENERIC: + validateOffsetCommitForGenericGroup( + (GenericGroup) group, + request + ); + break; + + case CONSUMER: + validateOffsetCommitForConsumerGroup( + (ConsumerGroup) group, + context, + request + ); + break; + } + } + + /** + * Validates an OffsetCommit request for a generic group. + * + * @param group The generic group. + * @param request The actual request. + */ + public void validateOffsetCommitForGenericGroup( + GenericGroup group, + OffsetCommitRequestData request + ) throws KafkaException { + if (group.isInState(DEAD)) { + throw Errors.COORDINATOR_NOT_AVAILABLE.exception(); + } + + if (request.generationIdOrMemberEpoch() < 0 && group.isInState(EMPTY)) { + // When the generation id is -1, the request comes from either the admin client + // or a consumer which does not use the group management facility. In this case, + // the request can commit offsets if the group is empty. + return; + } + + Optional<String> groupInstanceId = OffsetCommitRequest.groupInstanceId(request); + if (request.generationIdOrMemberEpoch() >= 0 || !request.memberId().isEmpty() || groupInstanceId.isPresent()) { + // We are validating three things: + // 1. If the `groupInstanceId` is present, then it exists and is mapped to `memberId`; + // 2. The `memberId` exists in the group; and + // 3. The `generationId` matches the current generation id. + if (groupInstanceId.isPresent()) { + String memberId = group.staticMemberId(groupInstanceId.get()); + if (memberId == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } else if (!request.memberId().equals(memberId)) { + throw Errors.FENCED_INSTANCE_ID.exception(); + } + } + + if (!group.hasMemberId(request.memberId())) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + if (request.generationIdOrMemberEpoch() != group.generationId()) { + throw Errors.ILLEGAL_GENERATION.exception(); + } + } else if (!group.isInState(EMPTY)) { + // If the request does not contain the member id and the generation + // id (version 0), offset commits are only accepted when the group + // is empty. + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + if (group.isInState(COMPLETING_REBALANCE)) { + // We should not receive a commit request if the group has not completed rebalance; + // but since the consumer's member.id and generation is valid, it means it has received + // the latest group generation information from the JoinResponse. + // So let's return a REBALANCE_IN_PROGRESS to let consumer handle it gracefully. + throw Errors.REBALANCE_IN_PROGRESS.exception(); + } + } + + /** + * Validates an OffsetCommit request for a consumer group. + * + * @param group The consumer group. + * @param context The request context. + * @param request The actual request. + */ + public void validateOffsetCommitForConsumerGroup( + ConsumerGroup group, + RequestContext context, + OffsetCommitRequestData request + ) throws KafkaException { + if (request.generationIdOrMemberEpoch() < 0 && group.members().isEmpty()) { + // When the generation id is -1, the request comes from either the admin client + // or a consumer which does not use the group management facility. 
In this case, + // the request can commit offsets if the group is empty. + return; + } + + if (!group.hasMember(request.memberId())) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + final int memberEpoch = group.getOrMaybeCreateMember(request.memberId(), false).memberEpoch(); + if (request.generationIdOrMemberEpoch() != memberEpoch) { + // Consumers using the new consumer group protocol (KIP-848) should be using the + // OffsetCommit API >= 9. As we don't support upgrading from the old to the new + // protocol yet, we return an UNSUPPORTED_VERSION error if an older version is + // used. We will revise this when the upgrade path is implemented. + if (context.header.apiVersion() >= 9) { + throw Errors.STALE_MEMBER_EPOCH.exception(); + } else { + throw Errors.UNSUPPORTED_VERSION.exception(); + } + } + } + + /** + * Handles an OffsetCommit request. + * + * @param context The request context. + * @param request The OffsetCommit request. + * + * @return A Result containing the OffsetCommitResponseData response and + * a list of records to update the state machine. + */ + public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( + RequestContext context, + OffsetCommitRequestData request + ) throws ApiException { + validateOffsetCommit(context, request); + + final long currentTimeMs = time.milliseconds(); + // "default" expiration timestamp is defined as now + retention. The retention may be overridden + // in versions from v2 to v4. Otherwise, the retention defined on the broker is used. If an explicit + // commit timestamp is provided (v1 only), the expiration timestamp is computed based on that. + final OptionalLong expireTimestampMs = request.retentionTimeMs() == OffsetCommitRequest.DEFAULT_RETENTION_TIME ? + OptionalLong.empty() : OptionalLong.of(currentTimeMs + request.retentionTimeMs()); + final OffsetCommitResponseData response = new OffsetCommitResponseData(); + final List<Record> records = new ArrayList<>(); + + request.topics().forEach(topic -> { + final OffsetCommitResponseData.OffsetCommitResponseTopic topicResponse = + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName(topic.name()); + response.topics().add(topicResponse); + + topic.partitions().forEach(partition -> { + if (partition.committedMetadata() != null && partition.committedMetadata().length() > offsetMetadataMaxSize) { + topicResponse.partitions().add(new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code())); + } else { + log.debug("[GroupId {}] Committing offsets {} for partition {}-{} from member {} with leader epoch {}.", + request.groupId(), partition.committedOffset(), topic.name(), partition.partitionIndex(), + request.memberId(), partition.committedLeaderEpoch()); + + topicResponse.partitions().add(new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.NONE.code())); + + final OptionalInt leaderEpoch = partition.committedLeaderEpoch() == RecordBatch.NO_PARTITION_LEADER_EPOCH ? + OptionalInt.empty() : OptionalInt.of(partition.committedLeaderEpoch()); + final String metadata = partition.committedMetadata() == null ? + OffsetAndMetadata.NO_METADATA : partition.committedMetadata(); + final long commitTimestampMs = partition.commitTimestamp() == OffsetCommitRequest.DEFAULT_TIMESTAMP ? 
+ currentTimeMs : partition.commitTimestamp(); + + final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata( + partition.committedOffset(), + leaderEpoch, + metadata, + commitTimestampMs, + expireTimestampMs + ); + + records.add(RecordHelpers.newOffsetCommitRecord( + request.groupId(), + topic.name(), + partition.partitionIndex(), + offsetAndMetadata, + metadataImage.features().metadataVersion() + )); + } + }); + }); + + return new CoordinatorResult<>(records, response); + } + + /** + * Replays OffsetCommitKey/Value to update or delete the corresponding offsets. + * + * @param key A OffsetCommitKey key. + * @param value A OffsetCommitValue value. + */ + public void replay( + OffsetCommitKey key, + OffsetCommitValue value + ) { + final String groupId = key.group(); + final TopicPartition tp = new TopicPartition(key.topic(), key.partition()); + + if (value != null) { + // Ensures that there is a corresponding group for the offsets. If a group does + // not exist, a generic group is created as a so-called "simple group". + try { + groupMetadataManager.group(groupId); + } catch (GroupIdNotFoundException ex) { + groupMetadataManager.getOrMaybeCreateGenericGroup(groupId, true); + } + + final OffsetAndMetadata offsetAndMetadata = OffsetAndMetadata.fromRecord(value); + TimelineHashMap<TopicPartition, OffsetAndMetadata> offsets = offsetsByGroup.get(groupId); + if (offsets == null) { + offsets = new TimelineHashMap<>(snapshotRegistry, 0); + offsetsByGroup.put(groupId, offsets); + } + + offsets.put(tp, offsetAndMetadata); + } else { + TimelineHashMap<TopicPartition, OffsetAndMetadata> offsets = offsetsByGroup.get(groupId);
Review Comment: i did it for GroupMetadata record 😅 i'll remove it.
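For the tombstone branch cut off above (`value == null`), the remainder would presumably remove the offset and clean up the per-group map once it is empty. A sketch only; whether the group-level entry is actually dropped is an assumption, not something the quoted diff confirms:

```
// Hypothetical completion of the tombstone branch: delete the offset and, if
// that leaves the group's map empty, drop the per-group entry as well so
// offsetsByGroup does not accumulate empty TimelineHashMaps.
if (offsets != null) {
    offsets.remove(tp);
    if (offsets.isEmpty()) {
        offsetsByGroup.remove(groupId);
    }
}
```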
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.generic.GenericGroup; +import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; + +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; + +/** + * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains + * a mapping from group id to topic-partition to offset. This class has two kinds of methods: + * 1) The request handlers which handle the requests and generate a response and records to + * mutate the hard state. Those records will be written by the runtime and applied to the + * hard state via the replay methods. + * 2) The replay methods which apply records to the hard state. Those are used in the request + * handling as well as during the initial loading of the records from the partitions.
+ */ +public class OffsetMetadataManager { + public static class Builder { + private LogContext logContext = null; + private SnapshotRegistry snapshotRegistry = null; + private Time time = null; + private GroupMetadataManager groupMetadataManager = null; + private int offsetMetadataMaxSize = 4096; + private MetadataImage metadataImage = null; + + Builder withLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) { + this.snapshotRegistry = snapshotRegistry; + return this; + } + + Builder withTime(Time time) { + this.time = time; + return this; + } + + Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) { + this.groupMetadataManager = groupMetadataManager; + return this; + } + + Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) { + this.offsetMetadataMaxSize = offsetMetadataMaxSize; + return this; + } + + Builder withMetadataImage(MetadataImage metadataImage) { + this.metadataImage = metadataImage; + return this; + } + + public OffsetMetadataManager build() { + if (logContext == null) logContext = new LogContext(); + if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext); + if (metadataImage == null) metadataImage = MetadataImage.EMPTY; + if (time == null) time = Time.SYSTEM; + + if (groupMetadataManager == null) { + throw new IllegalArgumentException("GroupMetadataManager cannot be null"); + } + + return new OffsetMetadataManager( + snapshotRegistry, + logContext, + time, + metadataImage, + groupMetadataManager, + offsetMetadataMaxSize + ); + } + } + + /** + * The logger. + */ + private final Logger log; + + /** + * The snapshot registry. + */ + private final SnapshotRegistry snapshotRegistry; + + /** + * The system time. + */ + private final Time time; + + /** + * The metadata image. + */ + private MetadataImage metadataImage; + + /** + * The group metadata manager. + */ + private final GroupMetadataManager groupMetadataManager; + + /** + * The maximum allowed metadata for any offset commit. + */ + private final int offsetMetadataMaxSize; + + /** + * The offsets keyed by topic-partition and group id. + */ + private final TimelineHashMap<String, TimelineHashMap<TopicPartition, OffsetAndMetadata>> offsetsByGroup; + + OffsetMetadataManager( + SnapshotRegistry snapshotRegistry, + LogContext logContext, + Time time, + MetadataImage metadataImage, + GroupMetadataManager groupMetadataManager, + int offsetMetadataMaxSize + ) { + this.snapshotRegistry = snapshotRegistry; + this.log = logContext.logger(OffsetMetadataManager.class); + this.time = time; + this.metadataImage = metadataImage; + this.groupMetadataManager = groupMetadataManager; + this.offsetMetadataMaxSize = offsetMetadataMaxSize; + this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); + } + + /** + * Validates an OffsetCommit request. + * + * @param context The request context. + * @param request The actual request. + */ + private void validateOffsetCommit( + RequestContext context, + OffsetCommitRequestData request + ) throws ApiException { + Group group; + try { + group = groupMetadataManager.group(request.groupId()); + } catch (GroupIdNotFoundException ex) { + if (request.generationIdOrMemberEpoch() < 0) { + // If the group does not exist and generation id is -1, the request comes from + // either the admin client or a consumer which does not use the group management + // facility. 
In this case, a so-called simple group is created and the request + // is accepted. + group = groupMetadataManager.getOrMaybeCreateGenericGroup(request.groupId(), true); + } else { + // Maintain backward compatibility. This is a bit weird in the + // context of the new protocol though. + throw Errors.ILLEGAL_GENERATION.exception(); + } + } + + // Validate the request based on the group type. + switch (group.type()) { + case GENERIC: + validateOffsetCommitForGenericGroup( + (GenericGroup) group, + request + ); + break; + + case CONSUMER: + validateOffsetCommitForConsumerGroup( + (ConsumerGroup) group, + context, + request + ); + break; + } + } + + /** + * Validates an OffsetCommit request for a generic group. + * + * @param group The generic group. + * @param request The actual request. + */ + public void validateOffsetCommitForGenericGroup( + GenericGroup group, + OffsetCommitRequestData request + ) throws KafkaException { + if (group.isInState(DEAD)) { + throw Errors.COORDINATOR_NOT_AVAILABLE.exception(); + } + + if (request.generationIdOrMemberEpoch() < 0 && group.isInState(EMPTY)) { + // When the generation id is -1, the request comes from either the admin client + // or a consumer which does not use the group management facility. In this case, + // the request can commit offsets if the group is empty. + return; + } + + Optional<String> groupInstanceId = OffsetCommitRequest.groupInstanceId(request); + if (request.generationIdOrMemberEpoch() >= 0 || !request.memberId().isEmpty() || groupInstanceId.isPresent()) { + // We are validating three things: + // 1. If the `groupInstanceId` is present, then it exists and is mapped to `memberId`; + // 2. The `memberId` exists in the group; and + // 3. The `generationId` matches the current generation id. + if (groupInstanceId.isPresent()) { + String memberId = group.staticMemberId(groupInstanceId.get()); + if (memberId == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } else if (!request.memberId().equals(memberId)) { + throw Errors.FENCED_INSTANCE_ID.exception(); + } + } + + if (!group.hasMemberId(request.memberId())) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + if (request.generationIdOrMemberEpoch() != group.generationId()) { + throw Errors.ILLEGAL_GENERATION.exception(); + } + } else if (!group.isInState(EMPTY)) { + // If the request does not contain the member id and the generation + // id (version 0), offset commits are only accepted when the group + // is empty. + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + if (group.isInState(COMPLETING_REBALANCE)) { + // We should not receive a commit request if the group has not completed rebalance; + // but since the consumer's member.id and generation is valid, it means it has received + // the latest group generation information from the JoinResponse. + // So let's return a REBALANCE_IN_PROGRESS to let consumer handle it gracefully. + throw Errors.REBALANCE_IN_PROGRESS.exception(); + } + } + + /** + * Validates an OffsetCommit request for a consumer group. + * + * @param group The consumer group. + * @param context The request context. + * @param request The actual request. + */ + public void validateOffsetCommitForConsumerGroup( + ConsumerGroup group, + RequestContext context, + OffsetCommitRequestData request + ) throws KafkaException { + if (request.generationIdOrMemberEpoch() < 0 && group.members().isEmpty()) { + // When the generation id is -1, the request comes from either the admin client + // or a consumer which does not use the group management facility. 
In this case, + // the request can commit offsets if the group is empty. + return; + } + + if (!group.hasMember(request.memberId())) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + final int memberEpoch = group.getOrMaybeCreateMember(request.memberId(), false).memberEpoch(); + if (request.generationIdOrMemberEpoch() != memberEpoch) { + // Consumers using the new consumer group protocol (KIP-848) should be using the + // OffsetCommit API >= 9. As we don't support upgrading from the old to the new + // protocol yet, we return an UNSUPPORTED_VERSION error if an older version is + // used. We will revise this when the upgrade path is implemented. + if (context.header.apiVersion() >= 9) { + throw Errors.STALE_MEMBER_EPOCH.exception(); + } else { + throw Errors.UNSUPPORTED_VERSION.exception(); + } + } + } + + /** + * Handles an OffsetCommit request. + * + * @param context The request context. + * @param request The OffsetCommit request. + * + * @return A Result containing the OffsetCommitResponseData response and + * a list of records to update the state machine. + */ + public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( + RequestContext context, + OffsetCommitRequestData request + ) throws ApiException { + validateOffsetCommit(context, request); + + final long currentTimeMs = time.milliseconds(); + // "default" expiration timestamp is defined as now + retention. The retention may be overridden + // in versions from v2 to v4. Otherwise, the retention defined on the broker is used. If an explicit + // commit timestamp is provided (v1 only), the expiration timestamp is computed based on that. + final OptionalLong expireTimestampMs = request.retentionTimeMs() == OffsetCommitRequest.DEFAULT_RETENTION_TIME ? + OptionalLong.empty() : OptionalLong.of(currentTimeMs + request.retentionTimeMs()); + final OffsetCommitResponseData response = new OffsetCommitResponseData(); + final List<Record> records = new ArrayList<>(); + + request.topics().forEach(topic -> { + final OffsetCommitResponseData.OffsetCommitResponseTopic topicResponse = + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName(topic.name()); + response.topics().add(topicResponse); + + topic.partitions().forEach(partition -> { + if (partition.committedMetadata() != null && partition.committedMetadata().length() > offsetMetadataMaxSize) { + topicResponse.partitions().add(new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code())); + } else { + log.debug("[GroupId {}] Committing offsets {} for partition {}-{} from member {} with leader epoch {}.", + request.groupId(), partition.committedOffset(), topic.name(), partition.partitionIndex(), + request.memberId(), partition.committedLeaderEpoch()); + + topicResponse.partitions().add(new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.NONE.code())); + + final OptionalInt leaderEpoch = partition.committedLeaderEpoch() == RecordBatch.NO_PARTITION_LEADER_EPOCH ? + OptionalInt.empty() : OptionalInt.of(partition.committedLeaderEpoch()); + final String metadata = partition.committedMetadata() == null ? + OffsetAndMetadata.NO_METADATA : partition.committedMetadata(); + final long commitTimestampMs = partition.commitTimestamp() == OffsetCommitRequest.DEFAULT_TIMESTAMP ? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.generic.GenericGroup; +import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; + +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.COMPLETING_REBALANCE; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.DEAD; +import static org.apache.kafka.coordinator.group.generic.GenericGroupState.EMPTY; + +/** + * The OffsetMetadataManager manages the offsets of all the groups. It basically maintains + * a mapping from group id to topic-partition to offset. This class has two kinds of methods: + * 1) The request handlers which handle the requests and generate a response and records to + * mutate the hard state. Those records will be written by the runtime and applied to the + * hard state via the replay methods. + * 2) The replay methods which apply records to the hard state. Those are used in the request + * handling as well as during the initial loading of the records from the partitions. 
+ */ +public class OffsetMetadataManager { + public static class Builder { + private LogContext logContext = null; + private SnapshotRegistry snapshotRegistry = null; + private Time time = null; + private GroupMetadataManager groupMetadataManager = null; + private int offsetMetadataMaxSize = 4096; + private MetadataImage metadataImage = null; + + Builder withLogContext(LogContext logContext) { + this.logContext = logContext; + return this; + } + + Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) { + this.snapshotRegistry = snapshotRegistry; + return this; + } + + Builder withTime(Time time) { + this.time = time; + return this; + } + + Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) { + this.groupMetadataManager = groupMetadataManager; + return this; + } + + Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) { + this.offsetMetadataMaxSize = offsetMetadataMaxSize; + return this; + } + + Builder withMetadataImage(MetadataImage metadataImage) { + this.metadataImage = metadataImage; + return this; + } + + public OffsetMetadataManager build() { + if (logContext == null) logContext = new LogContext(); + if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext); + if (metadataImage == null) metadataImage = MetadataImage.EMPTY; + if (time == null) time = Time.SYSTEM; + + if (groupMetadataManager == null) { + throw new IllegalArgumentException("GroupMetadataManager cannot be null"); + } + + return new OffsetMetadataManager( + snapshotRegistry, + logContext, + time, + metadataImage, + groupMetadataManager, + offsetMetadataMaxSize + ); + } + } + + /** + * The logger. + */ + private final Logger log; + + /** + * The snapshot registry. + */ + private final SnapshotRegistry snapshotRegistry; + + /** + * The system time. + */ + private final Time time; + + /** + * The metadata image. + */ + private MetadataImage metadataImage; + + /** + * The group metadata manager. + */ + private final GroupMetadataManager groupMetadataManager; + + /** + * The maximum allowed metadata for any offset commit. + */ + private final int offsetMetadataMaxSize; + + /** + * The offsets keyed by topic-partition and group id. + */ + private final TimelineHashMap<String, TimelineHashMap<TopicPartition, OffsetAndMetadata>> offsetsByGroup; + + OffsetMetadataManager( + SnapshotRegistry snapshotRegistry, + LogContext logContext, + Time time, + MetadataImage metadataImage, + GroupMetadataManager groupMetadataManager, + int offsetMetadataMaxSize + ) { + this.snapshotRegistry = snapshotRegistry; + this.log = logContext.logger(OffsetMetadataManager.class); + this.time = time; + this.metadataImage = metadataImage; + this.groupMetadataManager = groupMetadataManager; + this.offsetMetadataMaxSize = offsetMetadataMaxSize; + this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); + } + + /** + * Validates an OffsetCommit request. + * + * @param context The request context. + * @param request The actual request. + */ + private void validateOffsetCommit( + RequestContext context, + OffsetCommitRequestData request + ) throws ApiException { + Group group; + try { + group = groupMetadataManager.group(request.groupId()); + } catch (GroupIdNotFoundException ex) { + if (request.generationIdOrMemberEpoch() < 0) { + // If the group does not exist and generation id is -1, the request comes from + // either the admin client or a consumer which does not use the group management + // facility. 
In this case, a so-called simple group is created and the request + // is accepted. + group = groupMetadataManager.getOrMaybeCreateGenericGroup(request.groupId(), true); + } else { + // Maintain backward compatibility. This is a bit weird in the + // context of the new protocol though. + throw Errors.ILLEGAL_GENERATION.exception(); + } + } + + // Validate the request based on the group type. + switch (group.type()) { + case GENERIC: + validateOffsetCommitForGenericGroup( + (GenericGroup) group, + request + ); + break; + + case CONSUMER: + validateOffsetCommitForConsumerGroup( + (ConsumerGroup) group, + context, + request + ); + break; + } + } + + /** + * Validates an OffsetCommit request for a generic group. + * + * @param group The generic group. + * @param request The actual request. + */ + public void validateOffsetCommitForGenericGroup( + GenericGroup group, + OffsetCommitRequestData request + ) throws KafkaException { + if (group.isInState(DEAD)) { + throw Errors.COORDINATOR_NOT_AVAILABLE.exception(); + } + + if (request.generationIdOrMemberEpoch() < 0 && group.isInState(EMPTY)) { + // When the generation id is -1, the request comes from either the admin client + // or a consumer which does not use the group management facility. In this case, + // the request can commit offsets if the group is empty. + return; + } + + Optional<String> groupInstanceId = OffsetCommitRequest.groupInstanceId(request); + if (request.generationIdOrMemberEpoch() >= 0 || !request.memberId().isEmpty() || groupInstanceId.isPresent()) { + // We are validating three things: + // 1. If the `groupInstanceId` is present, then it exists and is mapped to `memberId`; + // 2. The `memberId` exists in the group; and + // 3. The `generationId` matches the current generation id. + if (groupInstanceId.isPresent()) { + String memberId = group.staticMemberId(groupInstanceId.get()); + if (memberId == null) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } else if (!request.memberId().equals(memberId)) { + throw Errors.FENCED_INSTANCE_ID.exception(); + } + } + + if (!group.hasMemberId(request.memberId())) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + if (request.generationIdOrMemberEpoch() != group.generationId()) { + throw Errors.ILLEGAL_GENERATION.exception(); + } + } else if (!group.isInState(EMPTY)) { + // If the request does not contain the member id and the generation + // id (version 0), offset commits are only accepted when the group + // is empty. + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + if (group.isInState(COMPLETING_REBALANCE)) { + // We should not receive a commit request if the group has not completed rebalance; + // but since the consumer's member.id and generation is valid, it means it has received + // the latest group generation information from the JoinResponse. + // So let's return a REBALANCE_IN_PROGRESS to let consumer handle it gracefully. + throw Errors.REBALANCE_IN_PROGRESS.exception(); + } + } + + /** + * Validates an OffsetCommit request for a consumer group. + * + * @param group The consumer group. + * @param context The request context. + * @param request The actual request. + */ + public void validateOffsetCommitForConsumerGroup( + ConsumerGroup group, + RequestContext context, + OffsetCommitRequestData request + ) throws KafkaException { + if (request.generationIdOrMemberEpoch() < 0 && group.members().isEmpty()) { + // When the generation id is -1, the request comes from either the admin client + // or a consumer which does not use the group management facility. 
In this case, + // the request can commit offsets if the group is empty. + return; + } + + if (!group.hasMember(request.memberId())) { + throw Errors.UNKNOWN_MEMBER_ID.exception(); + } + + final int memberEpoch = group.getOrMaybeCreateMember(request.memberId(), false).memberEpoch(); + if (request.generationIdOrMemberEpoch() != memberEpoch) { + // Consumers using the new consumer group protocol (KIP-848) should be using the + // OffsetCommit API >= 9. As we don't support upgrading from the old to the new + // protocol yet, we return an UNSUPPORTED_VERSION error if an older version is + // used. We will revise this when the upgrade path is implemented. + if (context.header.apiVersion() >= 9) { + throw Errors.STALE_MEMBER_EPOCH.exception(); + } else { + throw Errors.UNSUPPORTED_VERSION.exception(); + } + } + } + + /** + * Handles an OffsetCommit request. + * + * @param context The request context. + * @param request The OffsetCommit request. + * + * @return A Result containing the OffsetCommitResponseData response and + * a list of records to update the state machine. + */ + public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( + RequestContext context, + OffsetCommitRequestData request + ) throws ApiException { + validateOffsetCommit(context, request); + + final long currentTimeMs = time.milliseconds(); + // "default" expiration timestamp is defined as now + retention. The retention may be overridden + // in versions from v2 to v4. Otherwise, the retention defined on the broker is used. If an explicit + // commit timestamp is provided (v1 only), the expiration timestamp is computed based on that. + final OptionalLong expireTimestampMs = request.retentionTimeMs() == OffsetCommitRequest.DEFAULT_RETENTION_TIME ? + OptionalLong.empty() : OptionalLong.of(currentTimeMs + request.retentionTimeMs()); + final OffsetCommitResponseData response = new OffsetCommitResponseData(); + final List<Record> records = new ArrayList<>(); + + request.topics().forEach(topic -> { + final OffsetCommitResponseData.OffsetCommitResponseTopic topicResponse = + new OffsetCommitResponseData.OffsetCommitResponseTopic() + .setName(topic.name()); + response.topics().add(topicResponse); + + topic.partitions().forEach(partition -> { + if (partition.committedMetadata() != null && partition.committedMetadata().length() > offsetMetadataMaxSize) { + topicResponse.partitions().add(new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code())); + } else { + log.debug("[GroupId {}] Committing offsets {} for partition {}-{} from member {} with leader epoch {}.", + request.groupId(), partition.committedOffset(), topic.name(), partition.partitionIndex(), + request.memberId(), partition.committedLeaderEpoch()); + + topicResponse.partitions().add(new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(Errors.NONE.code())); + + final OptionalInt leaderEpoch = partition.committedLeaderEpoch() == RecordBatch.NO_PARTITION_LEADER_EPOCH ? + OptionalInt.empty() : OptionalInt.of(partition.committedLeaderEpoch()); + final String metadata = partition.committedMetadata() == null ? + OffsetAndMetadata.NO_METADATA : partition.committedMetadata(); + final long commitTimestampMs = partition.commitTimestamp() == OffsetCommitRequest.DEFAULT_TIMESTAMP ? 
+ currentTimeMs : partition.commitTimestamp(); + + final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata( + partition.committedOffset(), + leaderEpoch, + metadata, + commitTimestampMs, + expireTimestampMs + ); + + records.add(RecordHelpers.newOffsetCommitRecord( Review Comment: ah right. thanks for the clarification. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ########## @@ -0,0 +1,1113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.CoordinatorNotAvailableException; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.IllegalGenerationException; +import org.apache.kafka.common.errors.RebalanceInProgressException; +import org.apache.kafka.common.errors.StaleMemberEpochException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.JoinGroupRequestData; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.message.OffsetCommitResponseData; +import org.apache.kafka.common.network.ClientInformation; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; +import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.generic.GenericGroup; +import org.apache.kafka.coordinator.group.generic.GenericGroupMember; +import org.apache.kafka.coordinator.group.generic.GenericGroupState; +import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Test; +import 
org.junit.jupiter.params.ParameterizedTest; + +import java.net.InetAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class OffsetMetadataManagerTest { + static class OffsetMetadataManagerTestContext { + public static class Builder { + final private MockTime time = new MockTime(); + final private LogContext logContext = new LogContext(); + final private SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + private MetadataImage metadataImage = null; + private int offsetMetadataMaxSize = 4096; + + Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) { + this.offsetMetadataMaxSize = offsetMetadataMaxSize; + return this; + } + + OffsetMetadataManagerTestContext build() { + if (metadataImage == null) metadataImage = MetadataImage.EMPTY; + + GroupMetadataManager groupMetadataManager = new GroupMetadataManager.Builder() + .withTime(time) + .withTimer(new MockCoordinatorTimer<>(time)) + .withSnapshotRegistry(snapshotRegistry) + .withLogContext(logContext) + .withMetadataImage(metadataImage) + .withTopicPartition(new TopicPartition("__consumer_offsets", 0)) + .withAssignors(Collections.singletonList(new RangeAssignor())) + .build(); + + OffsetMetadataManager offsetMetadataManager = new OffsetMetadataManager.Builder() + .withTime(time) + .withLogContext(logContext) + .withSnapshotRegistry(snapshotRegistry) + .withMetadataImage(metadataImage) + .withGroupMetadataManager(groupMetadataManager) + .withOffsetMetadataMaxSize(offsetMetadataMaxSize) + .build(); + + return new OffsetMetadataManagerTestContext( + time, + snapshotRegistry, + groupMetadataManager, + offsetMetadataManager + ); + } + } + + final MockTime time; + final SnapshotRegistry snapshotRegistry; + final GroupMetadataManager groupMetadataManager; + final OffsetMetadataManager offsetMetadataManager; + + long lastCommittedOffset = 0L; + long lastWrittenOffset = 0L; + + OffsetMetadataManagerTestContext( + MockTime time, + SnapshotRegistry snapshotRegistry, + GroupMetadataManager groupMetadataManager, + OffsetMetadataManager offsetMetadataManager + ) { + this.time = time; + this.snapshotRegistry = snapshotRegistry; + this.groupMetadataManager = groupMetadataManager; + this.offsetMetadataManager = offsetMetadataManager; + } + + public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( + OffsetCommitRequestData request + ) { + return commitOffset(ApiKeys.OFFSET_COMMIT.latestVersion(), request); + } + + public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset( + short version, + OffsetCommitRequestData request + ) { + snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset); + + RequestContext context = new RequestContext( + new RequestHeader( + ApiKeys.OFFSET_COMMIT, + version, + "client", + 0 + ), + "1", + InetAddress.getLoopbackAddress(), + KafkaPrincipal.ANONYMOUS, + ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), + SecurityProtocol.PLAINTEXT, + ClientInformation.EMPTY, + false + ); + + CoordinatorResult<OffsetCommitResponseData, Record> result = offsetMetadataManager.commitOffset( + context, + request + ); + + result.records().forEach(this::replay); + return result; + } + + private ApiMessage 
messageOrNull(ApiMessageAndVersion apiMessageAndVersion) { + if (apiMessageAndVersion == null) { + return null; + } else { + return apiMessageAndVersion.message(); + } + } + + private void replay( + Record record + ) { + ApiMessageAndVersion key = record.key(); + ApiMessageAndVersion value = record.value(); + + if (key == null) { + throw new IllegalStateException("Received a null key in " + record); + } + + switch (key.version()) { + case OffsetCommitKey.HIGHEST_SUPPORTED_VERSION: + offsetMetadataManager.replay( + (OffsetCommitKey) key.message(), + (OffsetCommitValue) messageOrNull(value) + ); + break; + + default: + throw new IllegalStateException("Received an unknown record type " + key.version() + + " in " + record); + } + + lastWrittenOffset++; + } + } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + public void testOffsetCommitWithUnknownGroup(short version) { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + Class<? extends Throwable> expectedType; + if (version >= 9) { + expectedType = GroupIdNotFoundException.class; + } else { + expectedType = IllegalGenerationException.class; + } + + // Verify that the request is rejected with the correct exception. + assertThrows(expectedType, () -> context.commitOffset( + version, + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(10) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ) + ); + } + + @Test + public void testGenericGroupOffsetCommitWithDeadGroup() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create a dead group. + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "foo", + true + ); + group.transitionTo(GenericGroupState.DEAD); + + // Verify that the request is rejected with the correct exception. + assertThrows(CoordinatorNotAvailableException.class, () -> context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(10) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ) + ); + } + + @Test + public void testGenericGroupOffsetCommitWithUnknownMemberId() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "foo", + true + ); + + // Verify that the request is rejected with the correct exception. 
+ assertThrows(UnknownMemberIdException.class, () -> context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(10) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ) + ); + } + + @Test + public void testGenericGroupOffsetCommitWithIllegalGeneration() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "foo", + true + ); + + // Add member. + group.add(new GenericGroupMember( + "member", + Optional.of("new-instance-id"), + "client-id", + "host", + 5000, + 5000, + "consumer", + new JoinGroupRequestData.JoinGroupRequestProtocolCollection( + Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol() + .setName("range") + .setMetadata(new byte[0]) + ).iterator() + ) + )); + + // Transition to next generation. + group.transitionTo(GenericGroupState.PREPARING_REBALANCE); + group.initNextGeneration(); + assertEquals(1, group.generationId()); + + // Verify that the request is rejected with the correct exception. + assertThrows(IllegalGenerationException.class, () -> context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(10) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ) + ); + } + + @Test + public void testGenericGroupOffsetCommitWithUnknownInstanceId() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "foo", + true + ); + + // Verify that the request is rejected with the correct exception. + assertThrows(UnknownMemberIdException.class, () -> context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGroupInstanceId("instanceid") + .setGenerationIdOrMemberEpoch(10) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ) + ); + } + + @Test + public void testGenericGroupOffsetCommitWithFencedInstanceId() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "foo", + true + ); + + // Add member with static id. + group.add(new GenericGroupMember( + "member", + Optional.of("new-instance-id"), + "client-id", + "host", + 5000, + 5000, + "consumer", + new JoinGroupRequestData.JoinGroupRequestProtocolCollection( + Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol() + .setName("range") + .setMetadata(new byte[0]) + ).iterator() + ) + )); + + // Verify that the request is rejected with the correct exception. 
+ assertThrows(UnknownMemberIdException.class, () -> context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGroupInstanceId("old-instance-id") + .setGenerationIdOrMemberEpoch(10) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ) + ); + } + + @Test + public void testGenericGroupOffsetCommitWhileInCompletingRebalanceState() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "foo", + true + ); + + // Add member. + group.add(new GenericGroupMember( + "member", + Optional.of("new-instance-id"), + "client-id", + "host", + 5000, + 5000, + "consumer", + new JoinGroupRequestData.JoinGroupRequestProtocolCollection( + Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol() + .setName("range") + .setMetadata(new byte[0]) + ).iterator() + ) + )); + + // Transition to next generation. + group.transitionTo(GenericGroupState.PREPARING_REBALANCE); + group.initNextGeneration(); + assertEquals(1, group.generationId()); + + // Verify that the request is rejected with the correct exception. + assertThrows(RebalanceInProgressException.class, () -> context.commitOffset( + new OffsetCommitRequestData() + .setGroupId("foo") + .setMemberId("member") + .setGenerationIdOrMemberEpoch(1) + .setTopics(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestTopic() + .setName("bar") + .setPartitions(Collections.singletonList( + new OffsetCommitRequestData.OffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100L) + )) + )) + ) + ); + } + + @Test + public void testGenericGroupOffsetCommitWithoutMemberIdAndGeneration() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Create an empty group. + GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup( + "foo", + true + ); + + // Add member. + group.add(new GenericGroupMember( + "member", + Optional.of("new-instance-id"), + "client-id", + "host", + 5000, + 5000, + "consumer", + new JoinGroupRequestData.JoinGroupRequestProtocolCollection( + Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol() + .setName("range") + .setMetadata(new byte[0]) + ).iterator() + ) + )); Review Comment: nit: should we add a helper method? i think we only care about the member id and group instance id in these tests right ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java: ########## @@ -239,6 +283,14 @@ public void replay(Record record) throws RuntimeException { ApiMessageAndVersion value = record.value(); switch (key.version()) { + case 0: Review Comment: yeah, i assume that we were going to use the value record version only for offset commits then we started adding other record types -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
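On the helper-method nit in the test review above, a hypothetical version of such a helper could look like the following. It assumes the imports already present in the quoted test file; only the member id and group instance id vary, while the remaining arguments mirror the constructor values used throughout the quoted tests:

```java
// Hypothetical helper for OffsetMetadataManagerTest (the name is illustrative,
// not from the PR). Only memberId and groupInstanceId vary across these tests;
// the fixed timeouts and protocol mirror the values in the quoted test code.
private static GenericGroupMember mkGenericGroupMember(
    String memberId,
    Optional<String> groupInstanceId
) {
    return new GenericGroupMember(
        memberId,
        groupInstanceId,
        "client-id",
        "host",
        5000,
        5000,
        "consumer",
        new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
            Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol()
                .setName("range")
                .setMetadata(new byte[0])
            ).iterator()
        )
    );
}
```

Each test body would then shrink to a single line such as `group.add(mkGenericGroupMember("member", Optional.of("new-instance-id")))`, which makes the parameters the tests actually care about stand out.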