C0urante commented on code in PR #15999:
URL: https://github.com/apache/kafka/pull/15999#discussion_r1653168584


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##########
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

Review Comment:
   We've started enforcing an official import order with 
https://issues.apache.org/jira/browse/KAFKA-10787 (the `:connect:mirror` module 
was touched on in https://github.com/apache/kafka/pull/16299). I think changes 
like this are causing the CI build to fail; can you sort out the import 
ordering on this PR? IIRC you can do it automatically with 
`./gradlew :connect:mirror:spotlessApply`.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java:
##########
@@ -166,6 +169,34 @@ Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
 
+    public List<ConfigValue> validate() {
+        Boolean emitCheckpointsValue = 
this.getBoolean(EMIT_CHECKPOINTS_ENABLED);
+        Boolean syncGroupOffsetsValue = 
this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED);
+
+        List<ConfigValue> invalidConfigs = new ArrayList<>();
+        if (!emitCheckpointsValue && !syncGroupOffsetsValue) {
+            ConfigValue syncGroupOffsets = new 
ConfigValue(SYNC_GROUP_OFFSETS_ENABLED);
+            ConfigValue emitCheckpoints = new 
ConfigValue(EMIT_CHECKPOINTS_ENABLED);
+
+            String errorMessage = "MirrorCheckpointConnector can't run without 
both" + SYNC_GROUP_OFFSETS_ENABLED + ", " +

Review Comment:
   IMO we should keep these as errors. Preflight connector validation checks 
are cheap and quick; as long as the error tells users exactly how to correct 
their configs, failing early should save more time than it costs.
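
To make "actionable" concrete, here is a rough, dependency-free sketch of the kind of preflight check I have in mind. Everything in it is a stand-in: `CheckpointPreflightSketch` and `PropertyError` are hypothetical names (the real method would return Kafka `ConfigValue` objects), and the property names and the defaults (`true` for emitting checkpoints, `false` for syncing group offsets) are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, dependency-free sketch. The real check returns Kafka ConfigValue objects.
class CheckpointPreflightSketch {
    static final String EMIT_CHECKPOINTS_ENABLED = "emit.checkpoints.enabled";
    static final String SYNC_GROUP_OFFSETS_ENABLED = "sync.group.offsets.enabled";

    // Minimal stand-in for a per-property validation result.
    static final class PropertyError {
        final String property;
        final String message;

        PropertyError(String property, String message) {
            this.property = property;
            this.message = message;
        }
    }

    // The defaults used here (true / false) are assumptions made for the sketch.
    static List<PropertyError> validate(Map<String, String> props) {
        boolean emitCheckpoints =
                Boolean.parseBoolean(props.getOrDefault(EMIT_CHECKPOINTS_ENABLED, "true"));
        boolean syncGroupOffsets =
                Boolean.parseBoolean(props.getOrDefault(SYNC_GROUP_OFFSETS_ENABLED, "false"));

        List<PropertyError> errors = new ArrayList<>();
        if (!emitCheckpoints && !syncGroupOffsets) {
            // Say what is wrong and what the user can do about it.
            String message = "MirrorCheckpointConnector has nothing to do when both "
                    + EMIT_CHECKPOINTS_ENABLED + " and " + SYNC_GROUP_OFFSETS_ENABLED
                    + " are false. Set at least one of them to true, or remove the connector.";
            // Attach the error to both properties so either one surfaces it.
            errors.add(new PropertyError(EMIT_CHECKPOINTS_ENABLED, message));
            errors.add(new PropertyError(SYNC_GROUP_OFFSETS_ENABLED, message));
        }
        return errors;
    }
}
```

The point is only that the message names both properties and the fix, so a rejected config costs the user seconds rather than a debugging session.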



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -197,79 +183,18 @@ public void commitRecord(SourceRecord record, 
RecordMetadata metadata) {
         long latency = System.currentTimeMillis() - record.timestamp();
         metrics.countRecord(topicPartition);
         metrics.replicationLatency(topicPartition, latency);
-        TopicPartition sourceTopicPartition = 
MirrorUtils.unwrapPartition(record.sourcePartition());
-        long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
-        long downstreamOffset = metadata.offset();
-        maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, 
downstreamOffset);
-        // We may be able to immediately publish an offset sync that we've 
queued up here
-        firePendingOffsetSyncs();
-    }
-
-    // updates partition state and queues up OffsetSync if necessary
-    private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long 
upstreamOffset,
-                                       long downstreamOffset) {
-        PartitionState partitionState =
-            partitionStates.computeIfAbsent(topicPartition, x -> new 
PartitionState(maxOffsetLag));
-        OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, 
downstreamOffset);
-        if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            // Queue this sync for an immediate send, as downstream state is 
sufficiently stale
-            synchronized (this) {
-                delayedOffsetSyncs.remove(topicPartition);
-                pendingOffsetSyncs.put(topicPartition, offsetSync);
-            }
-            partitionState.reset();
-        } else {
-            // Queue this sync to be delayed until the next periodic offset 
commit
-            synchronized (this) {
-                delayedOffsetSyncs.put(topicPartition, offsetSync);
-            }
+        // Queue offset syncs only when offsetWriter is available
+        if (offsetSyncWriter != null) {
+            TopicPartition sourceTopicPartition = 
MirrorUtils.unwrapPartition(record.sourcePartition());
+            long upstreamOffset = 
MirrorUtils.unwrapOffset(record.sourceOffset());
+            long downstreamOffset = metadata.offset();
+            MirrorSourceTask.PartitionState partitionState =
+                    partitionStates.computeIfAbsent(topicPartition, x -> new 
MirrorSourceTask.PartitionState(offsetSyncWriter.maxOffsetLag()));

Review Comment:
   Nit:
   ```suggestion
               PartitionState partitionState =
                       partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(offsetSyncWriter.maxOffsetLag()));
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncWriter.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.mirror;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Used internally by MirrorMaker to write, queued, and promote the new 
translated offsets into offset-syncs topic.
+ */
+class OffsetSyncWriter implements AutoCloseable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(OffsetSyncWriter.class);
+    private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;
+
+    private final Map<TopicPartition, OffsetSync> delayedOffsetSyncs = new 
LinkedHashMap<>();
+    private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new 
LinkedHashMap<>();
+    private final Semaphore outstandingOffsetSyncs;
+    private final KafkaProducer<byte[], byte[]> offsetProducer;
+    private final String offsetSyncsTopic;
+    protected final long maxOffsetLag;

Review Comment:
   Nit: why widen the visibility to `protected`? The field was `private` when 
it lived in the `MirrorSourceTask` class.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncWriter.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.mirror;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Used internally by MirrorMaker to write, queued, and promote the new 
translated offsets into offset-syncs topic.
+ */
+class OffsetSyncWriter implements AutoCloseable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(OffsetSyncWriter.class);
+    private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;
+
+    private final Map<TopicPartition, OffsetSync> delayedOffsetSyncs = new 
LinkedHashMap<>();
+    private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new 
LinkedHashMap<>();
+    private final Semaphore outstandingOffsetSyncs;
+    private final KafkaProducer<byte[], byte[]> offsetProducer;
+    private final String offsetSyncsTopic;
+    protected final long maxOffsetLag;
+
+    public OffsetSyncWriter(MirrorSourceTaskConfig config) {
+        outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
+        offsetSyncsTopic = config.offsetSyncsTopic();
+        offsetProducer = 
MirrorUtils.newProducer(config.offsetSyncsTopicProducerConfig());
+        maxOffsetLag = config.maxOffsetLag();
+    }
+
+    public OffsetSyncWriter(KafkaProducer<byte[], byte[]> producer,

Review Comment:
   ```suggestion
       // Visible for testing
       OffsetSyncWriter(KafkaProducer<byte[], byte[]> producer,
   ```
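
For reference, the general shape being suggested looks roughly like the sketch below: a constructor for production use, a package-private one marked "Visible for testing" that accepts the collaborator directly, and a `private` field exposed through an accessor. `WriterVisibilitySketch` and its `Consumer<String>` sink are hypothetical stand-ins, not the actual `OffsetSyncWriter` or `KafkaProducer`.

```java
import java.util.function.Consumer;

// Hypothetical stand-in class; the sink plays the role of the producer.
class WriterVisibilitySketch {
    private final Consumer<String> sink;
    private final long maxOffsetLag; // stays private; callers use the accessor

    // Production-style constructor: builds its own collaborator.
    public WriterVisibilitySketch(long maxOffsetLag) {
        this(System.out::println, maxOffsetLag);
    }

    // Visible for testing
    WriterVisibilitySketch(Consumer<String> sink, long maxOffsetLag) {
        this.sink = sink;
        this.maxOffsetLag = maxOffsetLag;
    }

    // Package-private accessor, enough for callers in the same package.
    long maxOffsetLag() {
        return maxOffsetLag;
    }

    void write(String record) {
        sink.accept(record);
    }
}
```

A test in the same package can then pass in a recording sink (or a mock producer) without the class exposing anything beyond the package.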



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -197,79 +183,18 @@ public void commitRecord(SourceRecord record, 
RecordMetadata metadata) {
         long latency = System.currentTimeMillis() - record.timestamp();
         metrics.countRecord(topicPartition);
         metrics.replicationLatency(topicPartition, latency);
-        TopicPartition sourceTopicPartition = 
MirrorUtils.unwrapPartition(record.sourcePartition());
-        long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
-        long downstreamOffset = metadata.offset();
-        maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, 
downstreamOffset);
-        // We may be able to immediately publish an offset sync that we've 
queued up here
-        firePendingOffsetSyncs();
-    }
-
-    // updates partition state and queues up OffsetSync if necessary
-    private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long 
upstreamOffset,
-                                       long downstreamOffset) {
-        PartitionState partitionState =
-            partitionStates.computeIfAbsent(topicPartition, x -> new 
PartitionState(maxOffsetLag));
-        OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, 
downstreamOffset);
-        if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            // Queue this sync for an immediate send, as downstream state is 
sufficiently stale
-            synchronized (this) {
-                delayedOffsetSyncs.remove(topicPartition);
-                pendingOffsetSyncs.put(topicPartition, offsetSync);
-            }
-            partitionState.reset();
-        } else {
-            // Queue this sync to be delayed until the next periodic offset 
commit
-            synchronized (this) {
-                delayedOffsetSyncs.put(topicPartition, offsetSync);
-            }
+        // Queue offset syncs only when offsetWriter is available
+        if (offsetSyncWriter != null) {
+            TopicPartition sourceTopicPartition = 
MirrorUtils.unwrapPartition(record.sourcePartition());
+            long upstreamOffset = 
MirrorUtils.unwrapOffset(record.sourceOffset());
+            long downstreamOffset = metadata.offset();
+            MirrorSourceTask.PartitionState partitionState =
+                    partitionStates.computeIfAbsent(topicPartition, x -> new 
MirrorSourceTask.PartitionState(offsetSyncWriter.maxOffsetLag()));

Review Comment:
   Actually, if it's not too much of a PITA, could we move the 
`partitionStates` field and the `PartitionState` class into the 
`OffsetSyncWriter` class?
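
Roughly what I mean, as a simplified sketch rather than a drop-in patch: the writer owns both the per-partition state and the queues, and the task only hands it offsets. `OffsetSyncWriterSketch` is a hypothetical stand-in; topic-partitions are plain strings, syncs are `long[]` pairs, and the `PartitionState` logic is deliberately reduced to "sync on the first record, then whenever the downstream offset has advanced by at least `maxOffsetLag`", which is not the real decision logic.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for OffsetSyncWriter.
class OffsetSyncWriterSketch {
    // Simplified stand-in for PartitionState (NOT the real update rules).
    static final class PartitionState {
        private final long maxOffsetLag;
        private long lastSyncDownstreamOffset = -1L;
        private boolean shouldSync;

        PartitionState(long maxOffsetLag) {
            this.maxOffsetLag = maxOffsetLag;
        }

        // Returns true when a sync should be sent immediately.
        boolean update(long upstreamOffset, long downstreamOffset) {
            if (lastSyncDownstreamOffset == -1L
                    || downstreamOffset - lastSyncDownstreamOffset >= maxOffsetLag) {
                lastSyncDownstreamOffset = downstreamOffset;
                shouldSync = true;
            }
            return shouldSync;
        }

        void reset() {
            shouldSync = false;
        }
    }

    private final Map<String, PartitionState> partitionStates = new HashMap<>();
    private final Map<String, long[]> delayedOffsetSyncs = new LinkedHashMap<>();
    private final Map<String, long[]> pendingOffsetSyncs = new LinkedHashMap<>();
    private final long maxOffsetLag;

    OffsetSyncWriterSketch(long maxOffsetLag) {
        this.maxOffsetLag = maxOffsetLag;
    }

    // Updates partition state and queues up an offset sync if necessary.
    // The whole method is synchronized here for brevity.
    synchronized void maybeQueueOffsetSyncs(String topicPartition, long upstreamOffset,
                                            long downstreamOffset) {
        PartitionState partitionState =
                partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
        long[] offsetSync = {upstreamOffset, downstreamOffset};
        if (partitionState.update(upstreamOffset, downstreamOffset)) {
            // Queue for an immediate send; drop any older delayed sync.
            delayedOffsetSyncs.remove(topicPartition);
            pendingOffsetSyncs.put(topicPartition, offsetSync);
            partitionState.reset();
        } else {
            // Hold until the next periodic offset commit.
            delayedOffsetSyncs.put(topicPartition, offsetSync);
        }
    }

    synchronized int pendingCount() {
        return pendingOffsetSyncs.size();
    }

    synchronized int delayedCount() {
        return delayedOffsetSyncs.size();
    }
}
```

With that in place, `commitRecord` should only need the null check plus a call into the writer, and `MirrorSourceTask` no longer has to know that `PartitionState` exists.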



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java:
##########
@@ -290,6 +290,11 @@ protected static ConfigDef config() {
                 .define(CLUSTERS_CONFIG, Type.LIST, Importance.HIGH, 
CLUSTERS_DOC)
                 .define(ENABLE_INTERNAL_REST_CONFIG, Type.BOOLEAN, false, 
Importance.HIGH, ENABLE_INTERNAL_REST_DOC)
                 .define(CONFIG_PROVIDERS_CONFIG, Type.LIST, 
Collections.emptyList(), Importance.LOW, CONFIG_PROVIDERS_DOC)
+                .define(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED,

Review Comment:
   The `MirrorMakerConfig` class is for properties related to dedicated mode. 
If we want to establish properties that are recognized and shared by multiple 
MM2 connectors, it's enough to do that in the `MirrorConnectorConfig` base 
class (which IIUC is already done, so this part can be removed safely).



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncWriter.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.mirror;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Used internally by MirrorMaker to write, queued, and promote the new 
translated offsets into offset-syncs topic.

Review Comment:
   ```suggestion
    * Used internally by MirrorMaker to write translated offsets into the offset-syncs topic, with some buffering logic to limit the number of in-flight records.
   ```
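
By "buffering logic to limit the number of in-flight records" I mean the semaphore pattern, sketched below under some assumptions: `BoundedSyncSenderSketch` is a hypothetical class, the `BiConsumer` transport stands in for an asynchronous producer send with a completion callback, and partitions are plain strings. A permit is taken before each send and released when the send completes; anything that cannot get a permit stays queued.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;

// Hypothetical sketch of semaphore-bounded asynchronous sends.
class BoundedSyncSenderSketch {
    private static final int MAX_OUTSTANDING = 10;

    private final Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
    private final Map<String, Long> pending = new LinkedHashMap<>();
    // Stand-in for an async send: (record, callback to run on completion).
    private final BiConsumer<Map.Entry<String, Long>, Runnable> transport;

    BoundedSyncSenderSketch(BiConsumer<Map.Entry<String, Long>, Runnable> transport) {
        this.transport = transport;
    }

    // A newer sync for the same partition overwrites the queued one.
    synchronized void queue(String partition, long offset) {
        pending.put(partition, offset);
    }

    // Sends queued syncs until the queue is empty or no permits remain.
    void firePending() {
        while (true) {
            Map.Entry<String, Long> next;
            synchronized (this) {
                Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
                if (!it.hasNext()) {
                    return; // nothing left to send
                }
                if (!outstanding.tryAcquire()) {
                    return; // too many sends in flight; retry later
                }
                Map.Entry<String, Long> entry = it.next();
                next = Map.entry(entry.getKey(), entry.getValue()); // copy before removal
                it.remove();
            }
            // The permit is released only when the send completes.
            transport.accept(next, outstanding::release);
        }
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    int inFlight() {
        return MAX_OUTSTANDING - outstanding.availablePermits();
    }
}
```

Because the queue is keyed by partition, a backlog never grows past one entry per partition, and the semaphore caps how many of those are on the wire at once.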


