mimaison commented on code in PR #15999: URL: https://github.com/apache/kafka/pull/15999#discussion_r1639833636
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java: ########## @@ -166,6 +169,34 @@ Duration consumerPollTimeout() { return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS)); } + public List<ConfigValue> validate() { + Boolean emitCheckpointsValue = this.getBoolean(EMIT_CHECKPOINTS_ENABLED); + Boolean syncGroupOffsetsValue = this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED); + + List<ConfigValue> invalidConfigs = new ArrayList<>(); + if (!emitCheckpointsValue && !syncGroupOffsetsValue) { + ConfigValue syncGroupOffsets = new ConfigValue(SYNC_GROUP_OFFSETS_ENABLED); + ConfigValue emitCheckpoints = new ConfigValue(EMIT_CHECKPOINTS_ENABLED); + + String errorMessage = "MirrorCheckpointConnector can't run without both" + SYNC_GROUP_OFFSETS_ENABLED + ", " + + EMIT_CHECKPOINTS_ENABLED + "set to false"; + syncGroupOffsets.addErrorMessage(errorMessage); + emitCheckpoints.addErrorMessage(errorMessage); + invalidConfigs.add(syncGroupOffsets); + invalidConfigs.add(emitCheckpoints); + } + + boolean configuredWithDependincesOnOffsetSyncs = emitCheckpointsValue || syncGroupOffsetsValue; Review Comment: There's a typo in `configuredWithDependincesOnOffsetSyncs`. I propose `requireOffsetSyncs` as an alternative name. 
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java: ########## @@ -166,6 +169,34 @@ Duration consumerPollTimeout() { return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS)); } + public List<ConfigValue> validate() { + Boolean emitCheckpointsValue = this.getBoolean(EMIT_CHECKPOINTS_ENABLED); + Boolean syncGroupOffsetsValue = this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED); + + List<ConfigValue> invalidConfigs = new ArrayList<>(); + if (!emitCheckpointsValue && !syncGroupOffsetsValue) { + ConfigValue syncGroupOffsets = new ConfigValue(SYNC_GROUP_OFFSETS_ENABLED); + ConfigValue emitCheckpoints = new ConfigValue(EMIT_CHECKPOINTS_ENABLED); + + String errorMessage = "MirrorCheckpointConnector can't run without both" + SYNC_GROUP_OFFSETS_ENABLED + ", " + Review Comment: I guess running the connector with both of these features disabled does not make much sense, but is it an error? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java: ########## @@ -166,6 +169,34 @@ Duration consumerPollTimeout() { return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS)); } + public List<ConfigValue> validate() { + Boolean emitCheckpointsValue = this.getBoolean(EMIT_CHECKPOINTS_ENABLED); Review Comment: This can be boolean. 
Same below ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java: ########## @@ -166,6 +169,34 @@ Duration consumerPollTimeout() { return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS)); } + public List<ConfigValue> validate() { + Boolean emitCheckpointsValue = this.getBoolean(EMIT_CHECKPOINTS_ENABLED); + Boolean syncGroupOffsetsValue = this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED); + + List<ConfigValue> invalidConfigs = new ArrayList<>(); + if (!emitCheckpointsValue && !syncGroupOffsetsValue) { + ConfigValue syncGroupOffsets = new ConfigValue(SYNC_GROUP_OFFSETS_ENABLED); + ConfigValue emitCheckpoints = new ConfigValue(EMIT_CHECKPOINTS_ENABLED); + + String errorMessage = "MirrorCheckpointConnector can't run without both" + SYNC_GROUP_OFFSETS_ENABLED + ", " + + EMIT_CHECKPOINTS_ENABLED + "set to false"; + syncGroupOffsets.addErrorMessage(errorMessage); + emitCheckpoints.addErrorMessage(errorMessage); + invalidConfigs.add(syncGroupOffsets); + invalidConfigs.add(emitCheckpoints); + } + + boolean configuredWithDependincesOnOffsetSyncs = emitCheckpointsValue || syncGroupOffsetsValue; + if (!"true".equals(Optional.ofNullable(this.originals().get(EMIT_OFFSET_SYNCS_ENABLED)).orElse("true")) & configuredWithDependincesOnOffsetSyncs) { + ConfigValue emitOffsetSync = new ConfigValue(EMIT_OFFSET_SYNCS_ENABLED); + emitOffsetSync.addErrorMessage("MirrorCheckpointConnector can't run with " + EMIT_OFFSET_SYNCS_ENABLED + " set to false while, " + + EMIT_CHECKPOINTS_ENABLED + "and/or" + SYNC_GROUP_OFFSETS_ENABLED + "set to true"); Review Comment: We need spaces before and after `and/or` and before `set` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -430,11 +453,16 @@ void syncTopicConfigs() } private void createOffsetSyncsTopic() { - MirrorUtils.createSinglePartitionCompactedTopic( - config.offsetSyncsTopic(), - config.offsetSyncsTopicReplicationFactor(), - 
offsetSyncsAdminClient - ); + if (config.emitOffsetSyncEnabled()) { + Admin offsetSyncsAdminClient = config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()); Review Comment: `Admin` is `AutoCloseable` so we can use a try-with-resources block instead of explicitly closing the client below. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ########## @@ -131,7 +120,7 @@ public void stop() { log.warn("Interrupted waiting for access to consumer. Will try closing anyway."); } Utils.closeQuietly(consumer, "source consumer"); - Utils.closeQuietly(offsetProducer, "offset producer"); + Utils.closeQuietly(offsetSyncWriter, "offset producer"); Review Comment: Can we adjust the name too? ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ########## @@ -534,6 +537,121 @@ private void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws Inter assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal"); } + @Test + public void testReplicationWithoutOffsetSync() throws Exception { Review Comment: That's a pretty big test just to check the offset-syncs and checkpoints topics don't get created. Could we simplify it? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java: ########## @@ -290,6 +290,11 @@ protected static ConfigDef config() { .define(CLUSTERS_CONFIG, Type.LIST, Importance.HIGH, CLUSTERS_DOC) .define(ENABLE_INTERNAL_REST_CONFIG, Type.BOOLEAN, false, Importance.HIGH, ENABLE_INTERNAL_REST_DOC) .define(CONFIG_PROVIDERS_CONFIG, Type.LIST, Collections.emptyList(), Importance.LOW, CONFIG_PROVIDERS_DOC) + .define(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED, Review Comment: Why are we defining the config here? Having it in MirrorSourceConfig should be enough. 
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java: ########## @@ -166,6 +169,34 @@ Duration consumerPollTimeout() { return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS)); } + public List<ConfigValue> validate() { + Boolean emitCheckpointsValue = this.getBoolean(EMIT_CHECKPOINTS_ENABLED); + Boolean syncGroupOffsetsValue = this.getBoolean(SYNC_GROUP_OFFSETS_ENABLED); + + List<ConfigValue> invalidConfigs = new ArrayList<>(); + if (!emitCheckpointsValue && !syncGroupOffsetsValue) { + ConfigValue syncGroupOffsets = new ConfigValue(SYNC_GROUP_OFFSETS_ENABLED); + ConfigValue emitCheckpoints = new ConfigValue(EMIT_CHECKPOINTS_ENABLED); + + String errorMessage = "MirrorCheckpointConnector can't run without both" + SYNC_GROUP_OFFSETS_ENABLED + ", " + + EMIT_CHECKPOINTS_ENABLED + "set to false"; Review Comment: We need spaces after `both` and before `set` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java: ########## @@ -166,6 +169,34 @@ Duration consumerPollTimeout() { return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS)); } + public List<ConfigValue> validate() { Review Comment: Can we add a test for this new method? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java: ########## @@ -220,6 +220,10 @@ boolean addSourceAliasToMetrics() { return getBoolean(ADD_SOURCE_ALIAS_TO_METRICS); } + boolean emitOffsetSyncEnabled() { Review Comment: Should we name it `emitOffsetSyncsEnabled()` to match the configuration? 
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ########## @@ -52,57 +48,47 @@ public class MirrorSourceTask extends SourceTask { private static final Logger log = LoggerFactory.getLogger(MirrorSourceTask.class); - private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10; - private KafkaConsumer<byte[], byte[]> consumer; - private KafkaProducer<byte[], byte[]> offsetProducer; private String sourceClusterAlias; - private String offsetSyncsTopic; private Duration pollTimeout; - private long maxOffsetLag; private Map<TopicPartition, PartitionState> partitionStates; private ReplicationPolicy replicationPolicy; private MirrorSourceMetrics metrics; private boolean stopping = false; - private final Map<TopicPartition, OffsetSync> delayedOffsetSyncs = new LinkedHashMap<>(); - private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new LinkedHashMap<>(); - private Semaphore outstandingOffsetSyncs; private Semaphore consumerAccess; + private OffsetSyncWriter offsetSyncWriter; public MirrorSourceTask() {} // for testing MirrorSourceTask(KafkaConsumer<byte[], byte[]> consumer, MirrorSourceMetrics metrics, String sourceClusterAlias, ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer<byte[], byte[]> producer, Semaphore outstandingOffsetSyncs, Map<TopicPartition, PartitionState> partitionStates, - String offsetSyncsTopic) { + String offsetSyncsTopic, boolean emitOffsetSyncEnabled) { Review Comment: Would it make sense to provide an `OffsetSyncWriter` instance instead of `KafkaProducer<byte[], byte[]>` to this constructor? 
########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java: ########## @@ -51,6 +51,30 @@ public class MirrorCheckpointConnectorTest { private static final String CONSUMER_GROUP = "consumer-group-1"; private static final Map<String, ?> SOURCE_OFFSET = MirrorUtils.wrapOffset(0); + @Test + public void testEmitCheckpointsAndSyncGroupOffsetsBothDisabled() { + // disable the checkpoint emission + MirrorCheckpointConfig config = new MirrorCheckpointConfig( + makeProps("emit.checkpoints.enabled", "false", + "sync.group.offsets.enabled", "false")); + + Set<String> knownConsumerGroups = new HashSet<>(); + knownConsumerGroups.add(CONSUMER_GROUP); + assertMirrorCheckpointConnectorDisabled(new MirrorCheckpointConnector(knownConsumerGroups, config)); + } + + @Test + public void testEmitOffsetSyncsDisabled() { Review Comment: The value of this test is not clear. It's practically the same as the one just below. Basically, all it does is assert that setting `sync.group.offsets.enabled=false` does not have any impact. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -234,6 +234,30 @@ public ConfigDef config() { @Override public org.apache.kafka.common.config.Config validate(Map<String, String> props) { List<ConfigValue> configValues = super.validate(props).configValues(); + validateExactlyOnceConfigs(props, configValues); + validateEmitOffsetSyncConfigs(props, configValues); + + return new org.apache.kafka.common.config.Config(configValues); + } + + private static void validateEmitOffsetSyncConfigs(Map<String, String> props, List<ConfigValue> configValues) { Review Comment: I'm not sure I understand why we have this method. Today you don't need to enable exactly once support for the connector to emit offset-syncs. I see no mention of this in the KIP; why are we making it a requirement? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org