rhauch commented on a change in pull request #10907: URL: https://github.com/apache/kafka/pull/10907#discussion_r779947401
########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java ########## @@ -38,4 +38,29 @@ * Get the OffsetStorageReader for this SourceTask. */ OffsetStorageReader offsetStorageReader(); + + /** + * Get a {@link TransactionContext} that can be used to define producer transaction boundaries + * when exactly-once support is enabled for the connector. + * + * <p>This method was added in Apache Kafka 3.0. Source tasks that use this method but want to + * maintain backward compatibility so they can also be deployed to older Connect runtimes + * should guard the call to this method with a try-catch block, since calling this method will result in a + * {@link NoSuchMethodException} or {@link NoClassDefFoundError} when the source connector is deployed to + * Connect runtimes older than Kafka 3.0. For example: + * <pre> + * TransactionContext transactionContext; + * try { + * transactionContext = context.transactionContext(); + * } catch (NoSuchMethodError | NoClassDefFoundError e) { + * transactionContext = null; + * } + * </pre> + * + * @return the transaction context, or null if the connector was not configured to specify transaction boundaries + * @since 3.0 Review comment: Please change all of these `@since 3.0` to `@since 3.1`, plus any other `Kafka 3.0` references in JavaDoc. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ########## @@ -125,28 +130,28 @@ public KafkaBasedLog(String topic, * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until * {@link #start()} is invoked. * - * @param topic the topic to treat as a log - * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must + * @param topic the topic to treat as a log + * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must * contain compatible serializer settings for the generic types used on this class. Some * setting, such as the number of acks, will be overridden to ensure correct behavior of this * class. - * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must + * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must * contain compatible serializer settings for the generic types used on this class. Some * setting, such as the auto offset reset policy, will be overridden to ensure correct * behavior of this class. 
- * @param topicAdminSupplier supplier function for an admin client, the lifecycle of which is expected to be controlled + * by the calling component; may not be null - * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log - * @param time Time interface - * @param initializer the function that should be run when this log is {@link #start() started}; may be null + * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log + * @param time Time interface + * @param initializer the function that should be run when this log is {@link #start() started}; may be null */ public KafkaBasedLog(String topic, - Map<String, Object> producerConfigs, - Map<String, Object> consumerConfigs, - Supplier<TopicAdmin> topicAdminSupplier, - Callback<ConsumerRecord<K, V>> consumedCallback, - Time time, - java.util.function.Consumer<TopicAdmin> initializer) { + Map<String, Object> producerConfigs, + Map<String, Object> consumerConfigs, + Supplier<TopicAdmin> topicAdminSupplier, + Callback<ConsumerRecord<K, V>> consumedCallback, + Time time, + java.util.function.Consumer<TopicAdmin> initializer) { Review comment: Why make these unnecessary changes? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java ########## @@ -74,15 +75,15 @@ private volatile boolean cancelled; // indicates whether the Worker has cancelled the connector (e.g. because of slow shutdown) private State state; - private final OffsetStorageReader offsetStorageReader; + private final CloseableOffsetStorageReader offsetStorageReader; public WorkerConnector(String connName, Connector connector, ConnectorConfig connectorConfig, CloseableConnectorContext ctx, ConnectMetrics metrics, ConnectorStatus.Listener statusListener, - OffsetStorageReader offsetStorageReader, + CloseableOffsetStorageReader offsetStorageReader, Review comment: As mentioned above, IIUC this PR will sometimes pass a null `offsetStorageReader` (IIRC for sink connectors), but this class currently expects that not to be null. Might be worth adding an `Objects.requireNonNull(...)` call here to help catch that situation. ########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java ########## @@ -28,4 +30,39 @@ protected SourceConnectorContext context() { return (SourceConnectorContext) context; } + + /** + * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration. + * Developers can assume that worker-level exactly-once support is enabled when this method is invoked. + * For backwards compatibility, the default implementation will return {@code null}, but connector developers are Review comment: Nit: using a new paragraph makes this stand out more. ```suggestion * <p>For backwards compatibility, the default implementation will return {@code null}, but connector developers are ``` ########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.source; + +/** + * Provided to source tasks to allow them to define their own producer transaction boundaries when + * exactly-once support is enabled. + */ +public interface TransactionContext { Review comment: IIUC, the `WorkerTransactionContext` is the only implementation of this. That means that if a connector is configured with `transaction.boundary=poll` or `transaction.boundary=interval`, a poorly-implemented connector could still call these methods and they'd unnecessarily accumulate records. WDYT about in such cases the `SourceTaskContext#transactionContext()` method returning a no-op implementation of this interface, so no harm is done if a connector implementation still calls these methods when `transaction.boundary` is _not_ set to `connector`? Maybe we could consider a warning log message if these methods are called by a connector inappropriately. But we have to be careful. While such log messages might be useful for the **developer** of a connector plugin, I would argue that _prolific_ warnings are actually harmful for a **user** trying to _use_ a connector plugin they didn't develop with a connector configuration that includes `transaction.boundary=poll` or `transaction.boundary=interval`. So maybe it's worthwhile for the "no-op" implementation to only log each warning once per method per instance. 
########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java ########## @@ -294,4 +298,41 @@ public void shouldRemoveCompactionFromStatusTopicSettings() { assertEquals(expectedTopicSettings, actual); assertNotEquals(topicSettings, actual); } + + @Test + public void shouldIdentifyNeedForTransactionalLeader() { + Map<String, String> workerProps = configs(); + + workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "disabled"); + assertFalse(new DistributedConfig(workerProps).transactionalLeaderEnabled()); + + workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing"); + assertTrue(new DistributedConfig(workerProps).transactionalLeaderEnabled()); + + workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled"); + assertTrue(new DistributedConfig(workerProps).transactionalLeaderEnabled()); + } + + @Test + public void shouldConstructExpectedTransactionalId() { + Map<String, String> workerProps = configs(); + + workerProps.put(GROUP_ID_CONFIG, "why did i stay up all night writing unit tests"); + assertEquals( + "connect-cluster-why did i stay up all night writing unit tests", + new DistributedConfig(workerProps).transactionalProducerId() + ); + + workerProps.put(GROUP_ID_CONFIG, "connect-cluster"); + assertEquals( + "connect-cluster-connect-cluster", + new DistributedConfig(workerProps).transactionalProducerId() + ); + + workerProps.put(GROUP_ID_CONFIG, "\u2603"); + assertEquals( + "connect-cluster-\u2603", + new DistributedConfig(workerProps).transactionalProducerId() + ); + } Review comment: Let's add some negative tests that verify that invalid values for the new worker configuration properties are properly handled/identified by the validators. We should also add positive and negative tests for the new connector-level config properties in `SourceConnectorConfigTest` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ########## @@ -365,6 +413,10 @@ private void readToLogEnd() { // This may happen with really old brokers that don't support the auto topic creation // field in metadata requests log.debug("Reading to end of log offsets with consumer since admin client is unsupported: {}", e.getMessage()); + if (requireAdminForOffsets) { + // Should be handled by the caller during log startup + throw e; + } Review comment: It's true that the `DistributedHerder.run()` is ultimately catching and handling this exception. But I feel like many users might not understand the significance of such an error nor how to correct their configuration. Rather than just re-throw that exception, we should probably wrap that exception with one that has a more instructive message, such as something like: > Enabling exactly once for source connectors requires a Kafka broker version that allows admin clients to read consumer offsets. Disable the worker's exactly once support for source connectors, or use a newer Kafka broker version. Plus, should this if block be before the `log.debug(...)` on the previous line? Seems like that log message might just confuse the situation since the worker will not read "to the end of log offsets with consumer". 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -264,12 +279,62 @@ public void startConnector( log.info("Creating connector {} of type {}", connName, connClass); final Connector connector = plugins.newConnector(connClass); - final ConnectorConfig connConfig = ConnectUtils.isSinkConnector(connector) - ? new SinkConnectorConfig(plugins, connProps) - : new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable()); + final ConnectorConfig connConfig; + final CloseableOffsetStorageReader offsetReader; + if (ConnectUtils.isSinkConnector(connector)) { + connConfig = new SinkConnectorConfig(plugins, connProps); + offsetReader = null; Review comment: IIUC, this changes the behavior of the `WorkerConnector` created below. Prior to this PR, the `WorkerConnector` was always created with the `Worker.offsetBackingStore`, even for sink connectors. However, with this PR, the `WorkerConnector` will be instantiated with a null `offsetReader` parameter, which will cause a NPE in `WorkerConnector#doShutdown()` and `WorkerConnector#cancel()` since `WorkerConnector` does not check for a null parameter there. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.LoggingContext; +import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +public class ConnectorOffsetBackingStore implements OffsetBackingStore { Review comment: Please add JavaDoc that explains the purpose of this class, and in particular, what the two modes are that correspond to the two static factory methods. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.LoggingContext; +import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +public class ConnectorOffsetBackingStore implements OffsetBackingStore { + + private static final Logger log = LoggerFactory.getLogger(ConnectorOffsetBackingStore.class); + + private final Time time; + private final Supplier<LoggingContext> loggingContext; + private final String primaryOffsetsTopic; + private final OffsetBackingStore workerStore; + private final Optional<OffsetBackingStore> connectorStore; + private final Optional<TopicAdmin> connectorStoreAdmin; + + public static ConnectorOffsetBackingStore withConnectorOffsetStore( Review comment: Nit: move these two static factory methods above the non-static member variables, so all static and non-static members are together. 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java ########## @@ -192,6 +199,44 @@ public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A list of permitted algorithms for verifying internal requests"; public static final List<String> INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT); + private enum ExactlyOnceSourceSupport { + DISABLED(false), + PREPARING(true), + ENABLED(true); + + public final boolean usesTransactionalLeader; + + ExactlyOnceSourceSupport(boolean usesTransactionalLeader) { + this.usesTransactionalLeader = usesTransactionalLeader; + } + + public static List<String> options() { + return Stream.of(values()).map(ExactlyOnceSourceSupport::toString).collect(Collectors.toList()); + } + + public static ExactlyOnceSourceSupport fromProperty(String property) { + return ExactlyOnceSourceSupport.valueOf(property.toUpperCase(Locale.ROOT)); + } + + @Override + public String toString() { + return name().toLowerCase(Locale.ROOT); + } + } + + public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = "exactly.once.source.support"; + public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to enable exactly-once support for source connectors in the cluster " + + "by writing source records and their offsets in a Kafka transaction, and by proactively fencing out old task generations before bringing up new ones. " Review comment: Nit, to improve readability and precision, especially around how many Kafka transactions would be used: > Whether to enable exactly-once support for source connectors in the cluster by using transactions to write source records and their source offsets, and by proactively fencing out old task generations before bringing up new ones. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java ########## @@ -72,6 +74,45 @@ static String lookupKafkaClusterId(Admin adminClient) { } } + /** + * Log a warning when the user attempts to override a property that cannot be overridden. + * @param props the configuration properties provided by the user + * @param key the name of the property to check on + * @param expectedValue the expected value for the property + * @param justification the reason the property cannot be overridden. + * Will follow the phrase "The value... for the... property will be ignored as it cannot be overridden ". + * For example, one might supply the message "in connectors with the DLQ feature enabled" for this parameter. + * @param caseSensitive whether the value should match case-insensitively + */ + public static void warnOnOverriddenProperty( + Map<String, ?> props, + String key, + String expectedValue, + String justification, + boolean caseSensitive) { + overriddenPropertyWarning(props, key, expectedValue, justification, caseSensitive).ifPresent(log::warn); + } + + // Visible for testing + static Optional<String> overriddenPropertyWarning( + Map<String, ?> props, + String key, + String expectedValue, + String justification, + boolean caseSensitive) { + Predicate<String> matchesExpectedValue = caseSensitive ? expectedValue::equals : expectedValue::equalsIgnoreCase; + String value = Optional.ofNullable(props.get(key)).map(Object::toString).orElse(null); + if (value != null && !matchesExpectedValue.test(value)) { + return Optional.of(String.format( + "The value '%s' for the '%s' property will be ignored as it cannot be overridden%s. 
" + + "The value '%s' will be used instead.", Review comment: Is this really true that all of these incorrectly-overridden property values are ignored? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -264,12 +279,62 @@ public void startConnector( log.info("Creating connector {} of type {}", connName, connClass); final Connector connector = plugins.newConnector(connClass); - final ConnectorConfig connConfig = ConnectUtils.isSinkConnector(connector) - ? new SinkConnectorConfig(plugins, connProps) - : new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable()); + final ConnectorConfig connConfig; + final CloseableOffsetStorageReader offsetReader; + if (ConnectUtils.isSinkConnector(connector)) { + connConfig = new SinkConnectorConfig(plugins, connProps); + offsetReader = null; + } else { + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable()); + connConfig = sourceConfig; + + String connectorOffsetsTopic = null; + if (sourceConfig.offsetsTopic() != null) { + connectorOffsetsTopic = sourceConfig.offsetsTopic(); + } else if (config.exactlyOnceSourceEnabled()) { + connectorOffsetsTopic = config.offsetsTopic(); + } + Review comment: Suggestion: ```suggestion // Set up the offset backing store for this connector instance ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -678,10 +810,9 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState, // and through to the task Map<String, Object> consumerProps = new HashMap<>(); - consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, SinkUtils.consumerGroupId(id.connector())); - consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "connector-consumer-" + id); - consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, SinkUtils.consumerGroupId(connName)); + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, defaultClientId); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers()); Review comment: Thanks! This should have been done quite some time ago. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -476,22 +541,95 @@ public boolean isRunning(String connName) { } /** - * Start a task managed by this worker. + * Start a sink task managed by this worker. + * + * @param id the task ID. + * @param configState the most recent {@link ClusterConfigState} known to the worker + * @param connProps the connector properties. + * @param taskProps the tasks properties. + * @param statusListener a listener for the runtime status transitions of the task. + * @param initialState the initial state of the connector. + * @return true if the task started successfully. + */ + public boolean startSinkTask( + ConnectorTaskId id, + ClusterConfigState configState, + Map<String, String> connProps, + Map<String, String> taskProps, + TaskStatus.Listener statusListener, + TargetState initialState + ) { + return startTask(id, connProps, taskProps, statusListener, + new SinkTaskBuilder(id, configState, statusListener, initialState)); + } + + /** + * Start a source task managed by this worker. Review comment: It's probably worth while to mention that this method starts the task for a source connector with older behavior (without exactly once support). 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -264,12 +279,62 @@ public void startConnector( log.info("Creating connector {} of type {}", connName, connClass); final Connector connector = plugins.newConnector(connClass); - final ConnectorConfig connConfig = ConnectUtils.isSinkConnector(connector) - ? new SinkConnectorConfig(plugins, connProps) - : new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable()); + final ConnectorConfig connConfig; + final CloseableOffsetStorageReader offsetReader; + if (ConnectUtils.isSinkConnector(connector)) { + connConfig = new SinkConnectorConfig(plugins, connProps); + offsetReader = null; + } else { + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable()); + connConfig = sourceConfig; + Review comment: Suggestion: ```suggestion // Use the desired topic for offsets ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -980,6 +1119,329 @@ WorkerMetricsGroup workerMetricsGroup() { return workerMetricsGroup; } + abstract class TaskBuilder { + + private final ConnectorTaskId id; + private final ClusterConfigState configState; + private final TaskStatus.Listener statusListener; + private final TargetState initialState; + + private Task task; + private ConnectorConfig connectorConfig = null; + private Converter keyConverter = null; + private Converter valueConverter = null; + private HeaderConverter headerConverter = null; + private ClassLoader classLoader = null; + + public TaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + this.id = id; + this.configState = configState; + this.statusListener = statusListener; + this.initialState = initialState; + } + + public TaskBuilder withTask(Task task) { + this.task = task; + return this; + } + + public TaskBuilder withConnectorConfig(ConnectorConfig connectorConfig) { + this.connectorConfig = connectorConfig; + return this; + } + + public TaskBuilder withKeyConverter(Converter keyConverter) { + this.keyConverter = keyConverter; + return this; + } + + public TaskBuilder withValueConverter(Converter valueConverter) { + this.valueConverter = valueConverter; + return this; + } + + public TaskBuilder withHeaderConverter(HeaderConverter headerConverter) { + this.headerConverter = headerConverter; + return this; + } + + public TaskBuilder withClassloader(ClassLoader classLoader) { + this.classLoader = classLoader; + return this; + } + + public WorkerTask build() { + Objects.requireNonNull(task, "Task cannot be null"); + Objects.requireNonNull(connectorConfig, "Connector config used by task cannot be null"); + Objects.requireNonNull(keyConverter, "Key converter used by task cannot be null"); + Objects.requireNonNull(valueConverter, "Value converter used by task cannot be null"); + Objects.requireNonNull(headerConverter, "Header converter used by task cannot be null"); + Objects.requireNonNull(classLoader, "Classloader used by task cannot be null"); + + ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id); + final Class<? 
extends Connector> connectorClass = plugins.connectorClass( + connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); + RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(), + connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM); + retryWithToleranceOperator.metrics(errorHandlingMetrics); + + return doBuild(task, id, configState, statusListener, initialState, + connectorConfig, keyConverter, valueConverter, headerConverter, classLoader, + errorHandlingMetrics, connectorClass, retryWithToleranceOperator); + } + + abstract WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator); + + } + + class SinkTaskBuilder extends TaskBuilder { + public SinkTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + super(id, configState, statusListener, initialState); + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings()); + retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass)); + WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator, + keyConverter, valueConverter, headerConverter); + + Map<String, Object> consumerProps = consumerConfigs(id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId); + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps); + + return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter, + valueConverter, headerConverter, transformationChain, consumer, classLoader, time, + retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore()); + } + } + + class SourceTaskBuilder extends TaskBuilder { + public SourceTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + super(id, configState, statusListener, initialState); + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + 
HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, + connectorConfig.originalsStrings(), config.topicCreationEnable()); + retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); + TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + + Map<String, Object> producerProps = producerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); + + ConnectorOffsetBackingStore offsetStore = ConnectorOffsetBackingStore.withoutConnectorOffsetStore( + () -> LoggingContext.forTask(id), + globalOffsetBackingStore, + config.offsetsTopic() + ); + final TopicAdmin admin; + Map<String, TopicCreationGroup> topicCreationGroups = null; + if (sourceConfig.offsetsTopic() != null || (config.topicCreationEnable() && sourceConfig.usesTopicCreation())) { Review comment: I think adding some comments here will help future developers. There are enough subtleties here (e.g., the `||` in this line) that makes this more challenging than it should be to understand the behavior. Or, it might be worth adding some final booleans here to make these conditions a bit more clear and readable, especially since this method is a series of relatively independent parts/sections. For example: ``` final boolean customOffsetTopic = sourceConfig.offsetTopic(); final boolean createTopicsEnabled = config.topicCreationEnable() && sourceConfig.usesTopicCreation(); if (customOffsetTopic || createTopicsEnabled) { // Create an admin client ... if (createTopicsEnabled) { topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig); } if (customOffsetTopic) { // Build custom offset store ... } } else { // No need for admin admin = null; } ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -692,21 +823,22 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState, ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId); // Connector-specified overrides Map<String, Object> consumerOverrides = - connectorClientConfigOverrides(id, connConfig, connectorClass, ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX, + connectorClientConfigOverrides(connName, connConfig, connectorClass, ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX, ConnectorType.SINK, ConnectorClientConfigRequest.ClientType.CONSUMER, connectorClientConfigOverridePolicy); consumerProps.putAll(consumerOverrides); return consumerProps; } - static Map<String, Object> adminConfigs(ConnectorTaskId id, + static Map<String, Object> adminConfigs(String connName, Review comment: Do we need to change these signatures from `ConnectorTaskId` to `String`? The `ConnectorTaskId` gives us the ability to define tasks-specific client configuration properties if necessary/desired. I'm afraid that switching to `String` will make it harder and more invasive to add that back in. Plus, if there's not a good reason to remove these, let's leave that for smaller PRs. 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ########## @@ -342,6 +343,10 @@ private void logPluginPathConfigProviderWarning(Map<String, String> rawOriginals } } + public String bootstrapServers() { + return String.join(",", getList(BOOTSTRAP_SERVERS_CONFIG)); + } Review comment: Very nice. For this and the other new getter methods, can you add some JavaDoc just so that it's easier to follow in an IDE where these methods are _used_? I wanted that several times as I was reviewing this code. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ########## @@ -16,95 +16,46 @@ */ package org.apache.kafka.connect.runtime; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.header.internals.RecordHeaders; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.metrics.stats.Avg; -import org.apache.kafka.common.metrics.stats.CumulativeSum; -import org.apache.kafka.common.metrics.stats.Max; -import org.apache.kafka.common.metrics.stats.Rate; -import org.apache.kafka.common.metrics.stats.Value; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.errors.RetriableException; -import org.apache.kafka.connect.header.Header; -import org.apache.kafka.connect.header.Headers; -import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup; -import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; -import org.apache.kafka.connect.runtime.errors.Stage; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.storage.CloseableOffsetStorageReader; +import org.apache.kafka.connect.storage.ClusterConfigState; +import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.storage.OffsetStorageWriter; import org.apache.kafka.connect.storage.StatusBackingStore; -import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.TopicAdmin; -import org.apache.kafka.connect.util.TopicCreation; import org.apache.kafka.connect.util.TopicCreationGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.Duration; import java.util.IdentityHashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; - -import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG; /** * WorkerTask that uses a SourceTask to ingest data into Kafka. 
*/ -class WorkerSourceTask extends WorkerTask { +class WorkerSourceTask extends AbstractWorkerSourceTask { Review comment: Note to future me: I didn't get this far in the PR. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -980,6 +1119,329 @@ WorkerMetricsGroup workerMetricsGroup() { return workerMetricsGroup; } + abstract class TaskBuilder { + + private final ConnectorTaskId id; + private final ClusterConfigState configState; + private final TaskStatus.Listener statusListener; + private final TargetState initialState; + + private Task task; + private ConnectorConfig connectorConfig = null; + private Converter keyConverter = null; + private Converter valueConverter = null; + private HeaderConverter headerConverter = null; + private ClassLoader classLoader = null; + + public TaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + this.id = id; + this.configState = configState; + this.statusListener = statusListener; + this.initialState = initialState; + } + + public TaskBuilder withTask(Task task) { + this.task = task; + return this; + } + + public TaskBuilder withConnectorConfig(ConnectorConfig connectorConfig) { + this.connectorConfig = connectorConfig; + return this; + } + + public TaskBuilder withKeyConverter(Converter keyConverter) { + this.keyConverter = keyConverter; + return this; + } + + public TaskBuilder withValueConverter(Converter valueConverter) { + this.valueConverter = valueConverter; + return this; + } + + public TaskBuilder withHeaderConverter(HeaderConverter headerConverter) { + this.headerConverter = headerConverter; + return this; + } + + public TaskBuilder withClassloader(ClassLoader classLoader) { + this.classLoader = classLoader; + return this; + } + + public WorkerTask build() { + Objects.requireNonNull(task, "Task cannot be null"); + Objects.requireNonNull(connectorConfig, "Connector config used by task cannot be null"); + Objects.requireNonNull(keyConverter, "Key converter used by task cannot be null"); + Objects.requireNonNull(valueConverter, "Value converter used by task cannot be null"); + Objects.requireNonNull(headerConverter, "Header converter used by task cannot be null"); + Objects.requireNonNull(classLoader, "Classloader used by task cannot be null"); + + ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id); + final Class<? extends Connector> connectorClass = plugins.connectorClass( + connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); + RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(), + connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM); + retryWithToleranceOperator.metrics(errorHandlingMetrics); + + return doBuild(task, id, configState, statusListener, initialState, + connectorConfig, keyConverter, valueConverter, headerConverter, classLoader, + errorHandlingMetrics, connectorClass, retryWithToleranceOperator); + } + + abstract WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? 
extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator); + + } + + class SinkTaskBuilder extends TaskBuilder { + public SinkTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + super(id, configState, statusListener, initialState); + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings()); + retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass)); + WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator, + keyConverter, valueConverter, headerConverter); + + Map<String, Object> consumerProps = consumerConfigs(id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId); + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps); + + return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter, + valueConverter, headerConverter, transformationChain, consumer, classLoader, time, + retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore()); + } + } + + class SourceTaskBuilder extends TaskBuilder { + public SourceTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + super(id, configState, statusListener, initialState); + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? 
extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, + connectorConfig.originalsStrings(), config.topicCreationEnable()); + retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); + TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + + Map<String, Object> producerProps = producerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); + + ConnectorOffsetBackingStore offsetStore = ConnectorOffsetBackingStore.withoutConnectorOffsetStore( + () -> LoggingContext.forTask(id), + globalOffsetBackingStore, + config.offsetsTopic() + ); + final TopicAdmin admin; + Map<String, TopicCreationGroup> topicCreationGroups = null; + if (sourceConfig.offsetsTopic() != null || (config.topicCreationEnable() && sourceConfig.usesTopicCreation())) { + Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config, + sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE); + Admin adminClient = Admin.create(adminOverrides); + admin = new TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient); + + if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) { + topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig); + } + + if (sourceConfig.offsetsTopic() != null && config.connectorOffsetsTopicsPermitted()) { + Map<String, Object> consumerProps = consumerConfigs(id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + // Users can disable this if they want to; it won't affect delivery guarantees since the task isn't exactly-once anyways + consumerProps.putIfAbsent(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps); + + offsetStore = ConnectorOffsetBackingStore.withConnectorOffsetStore( + () -> LoggingContext.forTask(id), + globalOffsetBackingStore, + sourceConfig.offsetsTopic(), + producer, + consumer, + admin + ); + } + } else { + admin = null; + } + offsetStore.configure(config); + + CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, id.connector(), internalKeyConverter, internalValueConverter); + OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, id.connector(), internalKeyConverter, internalValueConverter); + + // Note we pass the configState as it performs dynamic transformations under the covers + return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, + headerConverter, transformationChain, producer, admin, topicCreationGroups, + offsetReader, offsetWriter, offsetStore, config, configState, metrics, classLoader, time, + retryWithToleranceOperator, herder.statusBackingStore(), executor); + } + } + + class ExactlyOnceSourceTaskBuilder extends TaskBuilder { + private final Runnable preProducerCheck; + private final Runnable postProducerCheck; + + public 
ExactlyOnceSourceTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + Runnable preProducerCheck, + Runnable postProducerCheck) { + super(id, configState, statusListener, initialState); + this.preProducerCheck = preProducerCheck; + this.postProducerCheck = postProducerCheck; + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, + connectorConfig.originalsStrings(), config.topicCreationEnable()); + retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); + TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + + Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config, Review comment: In my comment about `SourceTaskBuilder#doBuild`, I suggested adding a few comments to help the different parts/sections of this method stand out more clearly. The same would be true here, such as: ```suggestion // Create an admin client Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config, ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -980,6 +1119,329 @@ WorkerMetricsGroup workerMetricsGroup() { return workerMetricsGroup; } + abstract class TaskBuilder { + + private final ConnectorTaskId id; + private final ClusterConfigState configState; + private final TaskStatus.Listener statusListener; + private final TargetState initialState; + + private Task task; + private ConnectorConfig connectorConfig = null; + private Converter keyConverter = null; + private Converter valueConverter = null; + private HeaderConverter headerConverter = null; + private ClassLoader classLoader = null; + + public TaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + this.id = id; + this.configState = configState; + this.statusListener = statusListener; + this.initialState = initialState; + } + + public TaskBuilder withTask(Task task) { + this.task = task; + return this; + } + + public TaskBuilder withConnectorConfig(ConnectorConfig connectorConfig) { + this.connectorConfig = connectorConfig; + return this; + } + + public TaskBuilder withKeyConverter(Converter keyConverter) { + this.keyConverter = keyConverter; + return this; + } + + public TaskBuilder withValueConverter(Converter valueConverter) { + this.valueConverter = valueConverter; + return this; + } + + public TaskBuilder withHeaderConverter(HeaderConverter headerConverter) { + this.headerConverter = headerConverter; + return this; + } + + public TaskBuilder withClassloader(ClassLoader classLoader) { + this.classLoader = classLoader; + return this; + } + + public WorkerTask build() { + Objects.requireNonNull(task, "Task cannot be null"); + 
Objects.requireNonNull(connectorConfig, "Connector config used by task cannot be null"); + Objects.requireNonNull(keyConverter, "Key converter used by task cannot be null"); + Objects.requireNonNull(valueConverter, "Value converter used by task cannot be null"); + Objects.requireNonNull(headerConverter, "Header converter used by task cannot be null"); + Objects.requireNonNull(classLoader, "Classloader used by task cannot be null"); + + ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id); + final Class<? extends Connector> connectorClass = plugins.connectorClass( + connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); + RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(), + connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM); + retryWithToleranceOperator.metrics(errorHandlingMetrics); + + return doBuild(task, id, configState, statusListener, initialState, + connectorConfig, keyConverter, valueConverter, headerConverter, classLoader, + errorHandlingMetrics, connectorClass, retryWithToleranceOperator); + } + + abstract WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator); + + } + + class SinkTaskBuilder extends TaskBuilder { + public SinkTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + super(id, configState, statusListener, initialState); + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? 
extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings()); + retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass)); + WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator, + keyConverter, valueConverter, headerConverter); + + Map<String, Object> consumerProps = consumerConfigs(id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId); + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps); + + return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter, + valueConverter, headerConverter, transformationChain, consumer, classLoader, time, + retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore()); + } + } + + class SourceTaskBuilder extends TaskBuilder { + public SourceTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + super(id, configState, statusListener, initialState); + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? 
extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, + connectorConfig.originalsStrings(), config.topicCreationEnable()); + retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); + TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + + Map<String, Object> producerProps = producerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); + + ConnectorOffsetBackingStore offsetStore = ConnectorOffsetBackingStore.withoutConnectorOffsetStore( + () -> LoggingContext.forTask(id), + globalOffsetBackingStore, + config.offsetsTopic() + ); + final TopicAdmin admin; + Map<String, TopicCreationGroup> topicCreationGroups = null; + if (sourceConfig.offsetsTopic() != null || (config.topicCreationEnable() && sourceConfig.usesTopicCreation())) { + Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config, + sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE); + Admin adminClient = Admin.create(adminOverrides); + admin = new TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient); + + if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) { + topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig); + } + + if (sourceConfig.offsetsTopic() != null && config.connectorOffsetsTopicsPermitted()) { + Map<String, Object> consumerProps = consumerConfigs(id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + // Users can disable this if they want to; it won't affect delivery guarantees since the task isn't exactly-once anyways + consumerProps.putIfAbsent(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps); + + offsetStore = ConnectorOffsetBackingStore.withConnectorOffsetStore( + () -> LoggingContext.forTask(id), + globalOffsetBackingStore, + sourceConfig.offsetsTopic(), + producer, + consumer, + admin + ); + } + } else { + admin = null; + } + offsetStore.configure(config); + + CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, id.connector(), internalKeyConverter, internalValueConverter); + OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, id.connector(), internalKeyConverter, internalValueConverter); + + // Note we pass the configState as it performs dynamic transformations under the covers + return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, + headerConverter, transformationChain, producer, admin, topicCreationGroups, + offsetReader, offsetWriter, offsetStore, config, configState, metrics, classLoader, time, + retryWithToleranceOperator, herder.statusBackingStore(), executor); + } + } + + class ExactlyOnceSourceTaskBuilder extends TaskBuilder { + private final Runnable preProducerCheck; + private final Runnable postProducerCheck; + + public 
ExactlyOnceSourceTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + Runnable preProducerCheck, + Runnable postProducerCheck) { + super(id, configState, statusListener, initialState); + this.preProducerCheck = preProducerCheck; + this.postProducerCheck = postProducerCheck; + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, + connectorConfig.originalsStrings(), config.topicCreationEnable()); + retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); + TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + + Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config, + sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE); + Admin adminClient = Admin.create(adminOverrides); + TopicAdmin topicAdmin = new TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient); + Map<String, TopicCreationGroup> topicCreationGroups = null; + if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) { + topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig); + } + + String transactionalId = transactionalId(id); + Map<String, Object> producerProps = producerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + ConnectUtils.warnOnOverriddenProperty( + producerProps, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true", + "for connectors when exactly-once source support is enabled", + false + ); + ConnectUtils.warnOnOverriddenProperty( + producerProps, ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId, + "for connectors when exactly-once source support is enabled", + true + ); + producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); + KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); + + Map<String, Object> consumerProps = consumerConfigs(id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + ConnectUtils.warnOnOverriddenProperty( + consumerProps, ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT), + "for connectors when exactly-once source support is enabled", + false + ); + consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps); + + String offsetsTopic = Optional.ofNullable(sourceConfig.offsetsTopic()).orElse(config.offsetsTopic()); + + ConnectorOffsetBackingStore offsetStore; + // No need to do secondary writes to 
the global offsets topic if we're certain that the task's local offset store + // is going to be targeting it anyway + // Note that this may lead to a false positive if the user provides an overridden bootstrap servers value for their + // producer that resolves to the same Kafka cluster; we might consider looking up the Kafka cluster ID in the future + // to prevent these false positives but at the moment this is probably adequate, especially since we probably don't + // want to put a ping to a remote Kafka cluster inside the herder's tick thread (which is where this logic takes place + // right now) in case that takes a while. Review comment: This is more of a nit-type comment. It seems like this comment tries to describe why we don't need to create a connector-specific offset store **AND** then explain why the check might not always be accurate, but I found the comment a bit hard to follow. Also, the `ConnectorOffsetBackingStore offsetStore;` line should move under the multi-line comment. In such cases, it might be worth changing the code to help make things clearer. For example:
```
// We can simply reuse the worker's offset store when the connector-specific offset topic
// is the same as the worker's. We can check the offset topic name and the Kafka cluster's
// bootstrap servers, although this isn't exact and can lead to some false positives if the user
// provides an overridden bootstrap servers value for their producer that is different than
// the worker's but still resolves to the same Kafka cluster used by the worker.
// At the moment this is probably adequate, especially since we probably don't want to put
// a network ping to a remote Kafka cluster inside the herder's tick thread (which is where this
// logic takes place right now) in case that takes a while.
ConnectorOffsetBackingStore offsetStore;
final boolean sameOffsetTopicAsWorker = offsetsTopic.equals(config.offsetsTopic())
        && producerProps.get(BOOTSTRAP_SERVERS_CONFIG).equals(config.bootstrapServers());
if (sameOffsetTopicAsWorker) {
    offsetStore = ConnectorOffsetBackingStore.withoutConnectorOffsetStore(
        ...
} else {
    offsetStore = ConnectorOffsetBackingStore.withConnectorOffsetStore(
        ...
}
```
########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java ########## @@ -20,13 +20,45 @@ import org.apache.kafka.clients.producer.RecordMetadata; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * SourceTask is a Task that pulls records from another system for storage in Kafka. */ public abstract class SourceTask implements Task { + /** + * <p> + * The configuration key that determines how source tasks will define transaction boundaries + * when exactly-once support is enabled. + * </p> + */ + public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary"; + + public enum TransactionBoundary { + POLL, + INTERVAL, + CONNECTOR; + + public static final TransactionBoundary DEFAULT = POLL; + + public static List<String> options() { Review comment: I think the risk of introducing `options()` is that some developers might accidentally use `values()`. The pattern used in `ConnectorType` is far better, as it overrides the `toString()` method. That doesn't handle the case-independence for parsing, though `ConverterType` is a better pattern to follow if that's required. Let's be consistent with the new enums, and have each follow one of those two patterns, depending upon whether case-insensitive parsing is required.
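As a concrete illustration of the two patterns referenced above, here is a rough sketch of a `TransactionBoundary` enum that overrides `toString()` in the style of `ConnectorType` and parses values case-insensitively along the lines of `ConverterType`. The method name `fromProperty` and the exact shape are illustrative only, not the code in this PR:

```java
import java.util.Locale;

public enum TransactionBoundary {
    POLL,
    INTERVAL,
    CONNECTOR;

    public static final TransactionBoundary DEFAULT = POLL;

    /**
     * Case-insensitive parsing of a configured value; the method name is illustrative only.
     */
    public static TransactionBoundary fromProperty(String property) {
        return valueOf(property.trim().toUpperCase(Locale.ROOT));
    }

    /**
     * Display the lowercase form, so valid values can be listed with Arrays.toString(values())
     * instead of a separate options() helper.
     */
    @Override
    public String toString() {
        return name().toLowerCase(Locale.ROOT);
    }
}
```

With this shape, a config validator could accept `poll`, `POLL`, or `Poll` while still displaying the canonical lowercase names in error messages and documentation.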
########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java ########## @@ -28,4 +30,31 @@ protected SourceConnectorContext context() { return (SourceConnectorContext) context; } + + /** + * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration. + * Developers can assume that worker-level exactly-once support is enabled when this method is invoked. + * The default implementation will return {@code null}. + * @param connectorConfig the configuration that will be used for the connector. + * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support, + * and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If {@code null}, it is assumed that the + * connector cannot. + */ + public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) { + return null; + } + + /** + * Signals whether the connector can define its own transaction boundaries with the proposed + * configuration. Developers must override this method if they wish to add connector-defined + * transaction boundary support; if they do not, users will be unable to create instances of + * this connector that use connector-defined transaction boundaries. The default implementation + * will return {@code UNSUPPORTED}. Review comment: WDYT about something like this:
```
/**
 * Signals whether the connector implementation is capable of defining the transaction boundaries for a
 * connector with the given configuration. This method is called before {@link #start(Map)}, only when the
 * runtime supports exactly-once and the connector configuration includes {@code transaction.boundary=connector}.
 *
 * <p>This method need not be implemented if the connector implementation does not support defining
 * transaction boundaries.
 *
 * @param connectorConfig the configuration that will be used for the connector
 * @return {@link ConnectorTransactionBoundaries#SUPPORTED} if the connector will define its own transaction boundaries,
 * or {@link ConnectorTransactionBoundaries#UNSUPPORTED} otherwise.
* @see TransactionContext */ ``` ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -980,6 +1119,329 @@ WorkerMetricsGroup workerMetricsGroup() { return workerMetricsGroup; } + abstract class TaskBuilder { + + private final ConnectorTaskId id; + private final ClusterConfigState configState; + private final TaskStatus.Listener statusListener; + private final TargetState initialState; + + private Task task; + private ConnectorConfig connectorConfig = null; + private Converter keyConverter = null; + private Converter valueConverter = null; + private HeaderConverter headerConverter = null; + private ClassLoader classLoader = null; + + public TaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + this.id = id; + this.configState = configState; + this.statusListener = statusListener; + this.initialState = initialState; + } + + public TaskBuilder withTask(Task task) { + this.task = task; + return this; + } + + public TaskBuilder withConnectorConfig(ConnectorConfig connectorConfig) { + this.connectorConfig = connectorConfig; + return this; + } + + public TaskBuilder withKeyConverter(Converter keyConverter) { + this.keyConverter = keyConverter; + return this; + } + + public TaskBuilder withValueConverter(Converter valueConverter) { + this.valueConverter = valueConverter; + return this; + } + + public TaskBuilder withHeaderConverter(HeaderConverter headerConverter) { + this.headerConverter = headerConverter; + return this; + } + + public TaskBuilder withClassloader(ClassLoader classLoader) { + this.classLoader = classLoader; + return this; + } + + public WorkerTask build() { + Objects.requireNonNull(task, "Task cannot be null"); + Objects.requireNonNull(connectorConfig, "Connector config used by task cannot be null"); + Objects.requireNonNull(keyConverter, "Key converter used by task cannot be null"); + Objects.requireNonNull(valueConverter, "Value converter used by task cannot be null"); + Objects.requireNonNull(headerConverter, "Header converter used by task cannot be null"); + Objects.requireNonNull(classLoader, "Classloader used by task cannot be null"); + + ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id); + final Class<? extends Connector> connectorClass = plugins.connectorClass( + connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); + RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(), + connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM); + retryWithToleranceOperator.metrics(errorHandlingMetrics); + + return doBuild(task, id, configState, statusListener, initialState, + connectorConfig, keyConverter, valueConverter, headerConverter, classLoader, + errorHandlingMetrics, connectorClass, retryWithToleranceOperator); + } + + abstract WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? 
extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator); + + } + + class SinkTaskBuilder extends TaskBuilder { + public SinkTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + super(id, configState, statusListener, initialState); + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings()); + retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass)); + WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator, + keyConverter, valueConverter, headerConverter); + + Map<String, Object> consumerProps = consumerConfigs(id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId); + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps); + + return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter, + valueConverter, headerConverter, transformationChain, consumer, classLoader, time, + retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore()); + } + } + + class SourceTaskBuilder extends TaskBuilder { + public SourceTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + super(id, configState, statusListener, initialState); + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? 
extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, + connectorConfig.originalsStrings(), config.topicCreationEnable()); + retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); + TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + + Map<String, Object> producerProps = producerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); + + ConnectorOffsetBackingStore offsetStore = ConnectorOffsetBackingStore.withoutConnectorOffsetStore( + () -> LoggingContext.forTask(id), + globalOffsetBackingStore, + config.offsetsTopic() + ); + final TopicAdmin admin; + Map<String, TopicCreationGroup> topicCreationGroups = null; + if (sourceConfig.offsetsTopic() != null || (config.topicCreationEnable() && sourceConfig.usesTopicCreation())) { + Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config, + sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE); + Admin adminClient = Admin.create(adminOverrides); + admin = new TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient); + + if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) { + topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig); + } + + if (sourceConfig.offsetsTopic() != null && config.connectorOffsetsTopicsPermitted()) { + Map<String, Object> consumerProps = consumerConfigs(id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + // Users can disable this if they want to; it won't affect delivery guarantees since the task isn't exactly-once anyways + consumerProps.putIfAbsent(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps); + + offsetStore = ConnectorOffsetBackingStore.withConnectorOffsetStore( + () -> LoggingContext.forTask(id), + globalOffsetBackingStore, + sourceConfig.offsetsTopic(), + producer, + consumer, + admin + ); + } + } else { + admin = null; + } + offsetStore.configure(config); + + CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, id.connector(), internalKeyConverter, internalValueConverter); + OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, id.connector(), internalKeyConverter, internalValueConverter); + + // Note we pass the configState as it performs dynamic transformations under the covers + return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, + headerConverter, transformationChain, producer, admin, topicCreationGroups, + offsetReader, offsetWriter, offsetStore, config, configState, metrics, classLoader, time, + retryWithToleranceOperator, herder.statusBackingStore(), executor); + } + } + + class ExactlyOnceSourceTaskBuilder extends TaskBuilder { + private final Runnable preProducerCheck; + private final Runnable postProducerCheck; + + public 
ExactlyOnceSourceTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + Runnable preProducerCheck, + Runnable postProducerCheck) { + super(id, configState, statusListener, initialState); + this.preProducerCheck = preProducerCheck; + this.postProducerCheck = postProducerCheck; + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, + connectorConfig.originalsStrings(), config.topicCreationEnable()); + retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); + TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + + Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config, + sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE); + Admin adminClient = Admin.create(adminOverrides); + TopicAdmin topicAdmin = new TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient); + Map<String, TopicCreationGroup> topicCreationGroups = null; + if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) { + topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig); + } + + String transactionalId = transactionalId(id); + Map<String, Object> producerProps = producerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + ConnectUtils.warnOnOverriddenProperty( + producerProps, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true", + "for connectors when exactly-once source support is enabled", + false + ); + ConnectUtils.warnOnOverriddenProperty( + producerProps, ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId, + "for connectors when exactly-once source support is enabled", + true + ); + producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); Review comment: Ah, so here's one example of how these properties cannot be overridden by the connector config. But that's not quite so obvious for some other calls to `ConnectUtils.warnOnOverriddenProperty`. Is it not feasible to reset the expected value on the props within the `ConnectUtils#warnOnOverriddenProperty` method? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
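To illustrate the question above about folding the override into `ConnectUtils#warnOnOverriddenProperty`, here is a minimal, hypothetical sketch of a combined warn-and-override helper. The class, the name `warnAndOverrideProperty`, its signature, and the log message are illustrative only; they are not part of this PR or of the existing `ConnectUtils` API:

```java
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class WarnAndOverrideExample {

    private static final Logger log = LoggerFactory.getLogger(WarnAndOverrideExample.class);

    /**
     * Warn if the user-supplied config conflicts with the value the runtime requires,
     * then force the required value so call sites no longer need a separate put(...).
     */
    public static void warnAndOverrideProperty(Map<String, Object> props,
                                               String key,
                                               Object requiredValue,
                                               String justification) {
        Object userValue = props.get(key);
        if (userValue != null && !requiredValue.toString().equals(userValue.toString())) {
            log.warn("Ignoring the value '{}' for property '{}' in the connector configuration; "
                    + "using '{}' instead {}", userValue, key, requiredValue, justification);
        }
        props.put(key, requiredValue);
    }

    private WarnAndOverrideExample() {
    }
}
```

With a helper along these lines, the separate `producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)` and `producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)` calls would collapse into the warning calls themselves, making it obvious at every call site that the property cannot be overridden by the connector config.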