C0urante commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1653168584
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ########## @@ -16,6 +16,9 @@ */ package org.apache.kafka.connect.mirror; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; Review Comment: We've started enforcing an official import order with https://issues.apache.org/jira/browse/KAFKA-10787 (the `:connect:mirror` module was touched on in https://github.com/apache/kafka/pull/16299). I think changes like this are causing the CI build to fail; can you sort out the import ordering on this PR? IIRC you can do it automatically with `./gradlew :connect:mirror:spotlessApply`. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java: ########## @@ -166,6 +169,34 @@ Duration consumerPollTimeout() { return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS)); } + public List<ConfigValue> validate() { + Boolean emitCheckpointsValue = this.getBoolean(EMIT_CHECKPOINTS_ENABLED); + Boolean syncGroupOffsetsValue = this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED); + + List<ConfigValue> invalidConfigs = new ArrayList<>(); + if (!emitCheckpointsValue && !syncGroupOffsetsValue) { + ConfigValue syncGroupOffsets = new ConfigValue(SYNC_GROUP_OFFSETS_ENABLED); + ConfigValue emitCheckpoints = new ConfigValue(EMIT_CHECKPOINTS_ENABLED); + + String errorMessage = "MirrorCheckpointConnector can't run without both" + SYNC_GROUP_OFFSETS_ENABLED + ", " + Review Comment: IMO we should keep these as errors. Preflight connector validation checks are super cheap and quick; as long as we give users actionable steps on how to correct their configs, this would (IMO) save more time than it costs. 
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ########## @@ -197,79 +183,18 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) { long latency = System.currentTimeMillis() - record.timestamp(); metrics.countRecord(topicPartition); metrics.replicationLatency(topicPartition, latency); - TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition()); - long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset()); - long downstreamOffset = metadata.offset(); - maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset); - // We may be able to immediately publish an offset sync that we've queued up here - firePendingOffsetSyncs(); - } - - // updates partition state and queues up OffsetSync if necessary - private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset, - long downstreamOffset) { - PartitionState partitionState = - partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag)); - OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset); - if (partitionState.update(upstreamOffset, downstreamOffset)) { - // Queue this sync for an immediate send, as downstream state is sufficiently stale - synchronized (this) { - delayedOffsetSyncs.remove(topicPartition); - pendingOffsetSyncs.put(topicPartition, offsetSync); - } - partitionState.reset(); - } else { - // Queue this sync to be delayed until the next periodic offset commit - synchronized (this) { - delayedOffsetSyncs.put(topicPartition, offsetSync); - } + // Queue offset syncs only when offsetWriter is available + if (offsetSyncWriter != null) { + TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition()); + long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset()); + long downstreamOffset = metadata.offset(); + MirrorSourceTask.PartitionState partitionState = + 
partitionStates.computeIfAbsent(topicPartition, x -> new MirrorSourceTask.PartitionState(offsetSyncWriter.maxOffsetLag())); Review Comment: Nit: ```suggestion PartitionState partitionState = partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(offsetSyncWriter.maxOffsetLag())); ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncWriter.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.connect.mirror; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.Semaphore; + +/** + * Used internally by MirrorMaker to write, queued, and promote the new translated offsets into offset-syncs topic. 
+ */ +class OffsetSyncWriter implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(OffsetSyncWriter.class); + private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10; + + private final Map<TopicPartition, OffsetSync> delayedOffsetSyncs = new LinkedHashMap<>(); + private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new LinkedHashMap<>(); + private final Semaphore outstandingOffsetSyncs; + private final KafkaProducer<byte[], byte[]> offsetProducer; + private final String offsetSyncsTopic; + protected final long maxOffsetLag; Review Comment: Nit: why upgrade the visibility to `protected`? It was `private` when used in the `MirrorSourceTask` class. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncWriter.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.connect.mirror; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.Semaphore; + +/** + * Used internally by MirrorMaker to write, queued, and promote the new translated offsets into offset-syncs topic. + */ +class OffsetSyncWriter implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(OffsetSyncWriter.class); + private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10; + + private final Map<TopicPartition, OffsetSync> delayedOffsetSyncs = new LinkedHashMap<>(); + private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new LinkedHashMap<>(); + private final Semaphore outstandingOffsetSyncs; + private final KafkaProducer<byte[], byte[]> offsetProducer; + private final String offsetSyncsTopic; + protected final long maxOffsetLag; + + public OffsetSyncWriter(MirrorSourceTaskConfig config) { + outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS); + offsetSyncsTopic = config.offsetSyncsTopic(); + offsetProducer = MirrorUtils.newProducer(config.offsetSyncsTopicProducerConfig()); + maxOffsetLag = config.maxOffsetLag(); + } + + public OffsetSyncWriter(KafkaProducer<byte[], byte[]> producer, Review Comment: ```suggestion // Visible for testing OffsetSyncWriter(KafkaProducer<byte[], byte[]> producer, ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ########## @@ -197,79 +183,18 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) { long latency = System.currentTimeMillis() - record.timestamp(); metrics.countRecord(topicPartition); metrics.replicationLatency(topicPartition, latency); - 
TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition()); - long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset()); - long downstreamOffset = metadata.offset(); - maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset); - // We may be able to immediately publish an offset sync that we've queued up here - firePendingOffsetSyncs(); - } - - // updates partition state and queues up OffsetSync if necessary - private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset, - long downstreamOffset) { - PartitionState partitionState = - partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag)); - OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset); - if (partitionState.update(upstreamOffset, downstreamOffset)) { - // Queue this sync for an immediate send, as downstream state is sufficiently stale - synchronized (this) { - delayedOffsetSyncs.remove(topicPartition); - pendingOffsetSyncs.put(topicPartition, offsetSync); - } - partitionState.reset(); - } else { - // Queue this sync to be delayed until the next periodic offset commit - synchronized (this) { - delayedOffsetSyncs.put(topicPartition, offsetSync); - } + // Queue offset syncs only when offsetWriter is available + if (offsetSyncWriter != null) { + TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition()); + long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset()); + long downstreamOffset = metadata.offset(); + MirrorSourceTask.PartitionState partitionState = + partitionStates.computeIfAbsent(topicPartition, x -> new MirrorSourceTask.PartitionState(offsetSyncWriter.maxOffsetLag())); Review Comment: Actually, if it's not too much of a hassle, could we move the `partitionStates` field and the `PartitionState` class into the `OffsetSyncWriter` class? 
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java: ########## @@ -290,6 +290,11 @@ protected static ConfigDef config() { .define(CLUSTERS_CONFIG, Type.LIST, Importance.HIGH, CLUSTERS_DOC) .define(ENABLE_INTERNAL_REST_CONFIG, Type.BOOLEAN, false, Importance.HIGH, ENABLE_INTERNAL_REST_DOC) .define(CONFIG_PROVIDERS_CONFIG, Type.LIST, Collections.emptyList(), Importance.LOW, CONFIG_PROVIDERS_DOC) + .define(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED, Review Comment: The `MirrorMakerConfig` class is for properties related to dedicated mode. If we want to establish properties that are recognized and shared by multiple MM2 connectors, it's enough to do that in the `MirrorConnectorConfig` base class (which IIUC is already done, so this part can be removed safely). ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncWriter.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.connect.mirror; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.Semaphore; + +/** + * Used internally by MirrorMaker to write, queued, and promote the new translated offsets into offset-syncs topic. Review Comment: ```suggestion * Used internally by MirrorMaker to write translated offsets into offset-syncs topic, with some buffering logic to limit the number of in-flight records. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org