rhauch commented on a change in pull request #10907:
URL: https://github.com/apache/kafka/pull/10907#discussion_r779947401



##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
##########
@@ -38,4 +38,29 @@
      * Get the OffsetStorageReader for this SourceTask.
      */
     OffsetStorageReader offsetStorageReader();
+
+    /**
+     * Get a {@link TransactionContext} that can be used to define producer 
transaction boundaries
+     * when exactly-once support is enabled for the connector.
+     *
+     * <p>This method was added in Apache Kafka 3.0. Source tasks that use 
this method but want to
+     * maintain backward compatibility so they can also be deployed to older 
Connect runtimes
+     * should guard the call to this method with a try-catch block, since 
calling this method will result in a
+     * {@link NoSuchMethodException} or {@link NoClassDefFoundError} when the 
source connector is deployed to
+     * Connect runtimes older than Kafka 3.0. For example:
+     * <pre>
+     *     TransactionContext transactionContext;
+     *     try {
+     *         transactionContext = context.transactionContext();
+     *     } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *         transactionContext = null;
+     *     }
+     * </pre>
+     *
+     * @return the transaction context, or null if the connector was not 
configured to specify transaction boundaries
+     * @since 3.0

Review comment:
       Please change all of these `@since 3.0` to `@since 3.1`, plus any other 
`Kafka 3.0` references in JavaDoc.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -125,28 +130,28 @@ public KafkaBasedLog(String topic,
      * Create a new KafkaBasedLog object. This does not start reading the log 
and writing is not permitted until
      * {@link #start()} is invoked.
      *
-     * @param topic              the topic to treat as a log
-     * @param producerConfigs    configuration options to use when creating 
the internal producer. At a minimum this must
+     * @param topic                     the topic to treat as a log
+     * @param producerConfigs           configuration options to use when 
creating the internal producer. At a minimum this must
      *                           contain compatible serializer settings for 
the generic types used on this class. Some
      *                           setting, such as the number of acks, will be 
overridden to ensure correct behavior of this
      *                           class.
-     * @param consumerConfigs    configuration options to use when creating 
the internal consumer. At a minimum this must
+     * @param consumerConfigs           configuration options to use when 
creating the internal consumer. At a minimum this must
      *                           contain compatible serializer settings for 
the generic types used on this class. Some
      *                           setting, such as the auto offset reset 
policy, will be overridden to ensure correct
      *                           behavior of this class.
-     * @param topicAdminSupplier supplier function for an admin client, the 
lifecycle of which is expected to be controlled
+     * @param topicAdminSupplier        supplier function for an admin client, 
the lifecycle of which is expected to be controlled
      *                           by the calling component; may not be null
-     * @param consumedCallback   callback to invoke for each {@link 
ConsumerRecord} consumed when tailing the log
-     * @param time               Time interface
-     * @param initializer        the function that should be run when this log 
is {@link #start() started}; may be null
+     * @param consumedCallback          callback to invoke for each {@link 
ConsumerRecord} consumed when tailing the log
+     * @param time                      Time interface
+     * @param initializer               the function that should be run when 
this log is {@link #start() started}; may be null
      */
     public KafkaBasedLog(String topic,
-            Map<String, Object> producerConfigs,
-            Map<String, Object> consumerConfigs,
-            Supplier<TopicAdmin> topicAdminSupplier,
-            Callback<ConsumerRecord<K, V>> consumedCallback,
-            Time time,
-            java.util.function.Consumer<TopicAdmin> initializer) {
+                         Map<String, Object> producerConfigs,
+                         Map<String, Object> consumerConfigs,
+                         Supplier<TopicAdmin> topicAdminSupplier,
+                         Callback<ConsumerRecord<K, V>> consumedCallback,
+                         Time time,
+                         java.util.function.Consumer<TopicAdmin> initializer) {

Review comment:
       Why make these unnecessary changes?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
##########
@@ -74,15 +75,15 @@
     private volatile boolean cancelled; // indicates whether the Worker has 
cancelled the connector (e.g. because of slow shutdown)
 
     private State state;
-    private final OffsetStorageReader offsetStorageReader;
+    private final CloseableOffsetStorageReader offsetStorageReader;
 
     public WorkerConnector(String connName,
                            Connector connector,
                            ConnectorConfig connectorConfig,
                            CloseableConnectorContext ctx,
                            ConnectMetrics metrics,
                            ConnectorStatus.Listener statusListener,
-                           OffsetStorageReader offsetStorageReader,
+                           CloseableOffsetStorageReader offsetStorageReader,

Review comment:
       As mentioned above, IIUC this PR will sometimes pass a null
`offsetStorageReader` (IIRC for sink connectors), but this class currently
expects that to be non-null. Might be worth adding an
`Objects.requireNonNull(...)` call here to help catch that situation.
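   
   A tiny sketch of what I mean (just the relevant constructor line; assuming
`java.util.Objects` is imported):
   ```
   this.offsetStorageReader = Objects.requireNonNull(offsetStorageReader,
           "offsetStorageReader may not be null for connector " + connName);
   ```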

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,39 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees 
with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled 
when this method is invoked.
+     * For backwards compatibility, the default implementation will return 
{@code null}, but connector developers are

Review comment:
       Nit: using a new paragraph makes this stand out more.
   ```suggestion
        * <p>For backwards compatibility, the default implementation will 
return {@code null}, but connector developers are
   ```

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.source;
+
+/**
+ * Provided to source tasks to allow them to define their own producer 
transaction boundaries when
+ * exactly-once support is enabled.
+ */
+public interface TransactionContext {

Review comment:
       IIUC, the `WorkerTransactionContext` is the only implementation of this. 
That means that if a connector is configured with `transaction.boundary=poll` 
or `transaction.boundary=interval`, a poorly-implemented connector could still 
call these methods and they'd unnecessarily accumulate records.
   
   WDYT about having the `SourceTaskContext#transactionContext()` method return
a no-op implementation of this interface in such cases, so that no harm is done
if a connector implementation still calls these methods when
`transaction.boundary` is _not_ set to `connector`?
   
   Maybe we could consider a warning log message if these methods are called by 
a connector inappropriately. But we have to be careful. While such log messages 
might be useful for the **developer** of a connector plugin, I would argue that 
_prolific_ warnings are actually harmful for a **user** trying to _use_ a 
connector plugin they didn't develop with a connector configuration that 
includes `transaction.boundary=poll` or `transaction.boundary=interval`. So 
maybe it's worthwhile for the "no-op" implementation to only log each warning 
once per method per instance.
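   
   For the warn-once idea, something roughly like the following might work. (This
is only a sketch; the method names are assumed from the KIP, so adjust to
whatever the interface actually ends up exposing.)
   ```
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;

   import org.apache.kafka.connect.source.SourceRecord;
   import org.apache.kafka.connect.source.TransactionContext;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   class LoggingDisabledTransactionContext implements TransactionContext {
       private static final Logger log = LoggerFactory.getLogger(LoggingDisabledTransactionContext.class);

       // Remember which methods have already been warned about, so each warning
       // is logged at most once per method per task instance
       private final Set<String> warned = ConcurrentHashMap.newKeySet();

       @Override
       public void commitTransaction() {
           warnOnce("commitTransaction()");
       }

       @Override
       public void commitTransaction(SourceRecord record) {
           warnOnce("commitTransaction(SourceRecord)");
       }

       @Override
       public void abortTransaction() {
           warnOnce("abortTransaction()");
       }

       @Override
       public void abortTransaction(SourceRecord record) {
           warnOnce("abortTransaction(SourceRecord)");
       }

       private void warnOnce(String method) {
           if (warned.add(method)) {
               log.warn("{} was called, but this connector is not configured with transaction.boundary=connector; ignoring", method);
           }
       }
   }
   ```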

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
##########
@@ -294,4 +298,41 @@ public void 
shouldRemoveCompactionFromStatusTopicSettings() {
         assertEquals(expectedTopicSettings, actual);
         assertNotEquals(topicSettings, actual);
     }
+
+    @Test
+    public void shouldIdentifyNeedForTransactionalLeader() {
+        Map<String, String> workerProps = configs();
+
+        workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "disabled");
+        assertFalse(new 
DistributedConfig(workerProps).transactionalLeaderEnabled());
+
+        workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+        assertTrue(new 
DistributedConfig(workerProps).transactionalLeaderEnabled());
+
+        workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+        assertTrue(new 
DistributedConfig(workerProps).transactionalLeaderEnabled());
+    }
+
+    @Test
+    public void shouldConstructExpectedTransactionalId() {
+        Map<String, String> workerProps = configs();
+
+        workerProps.put(GROUP_ID_CONFIG, "why did i stay up all night writing 
unit tests");
+        assertEquals(
+                "connect-cluster-why did i stay up all night writing unit 
tests",
+                new DistributedConfig(workerProps).transactionalProducerId()
+        );
+
+        workerProps.put(GROUP_ID_CONFIG, "connect-cluster");
+        assertEquals(
+                "connect-cluster-connect-cluster",
+                new DistributedConfig(workerProps).transactionalProducerId()
+        );
+
+        workerProps.put(GROUP_ID_CONFIG, "\u2603");
+        assertEquals(
+                "connect-cluster-\u2603",
+                new DistributedConfig(workerProps).transactionalProducerId()
+        );
+    }

Review comment:
       Let's add some negative tests that verify that invalid values for the 
new worker configuration properties are properly handled/identified by the 
validators.
   
   We should also add positive and negative tests for the new connector-level
config properties in `SourceConnectorConfigTest`.
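   
   For example, something like this (assuming the validators reject bad values at
construction time, so that `DistributedConfig` throws a `ConfigException`; the
import of `assertThrows` is assumed):
   ```
   @Test
   public void shouldFailWithInvalidExactlyOnceSourceSupport() {
       Map<String, String> workerProps = configs();
       // Only disabled/preparing/enabled should be accepted for the new property
       workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "not-a-valid-value");
       assertThrows(ConfigException.class, () -> new DistributedConfig(workerProps));
   }
   ```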

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -365,6 +413,10 @@ private void readToLogEnd() {
                 // This may happen with really old brokers that don't support 
the auto topic creation
                 // field in metadata requests
                 log.debug("Reading to end of log offsets with consumer since 
admin client is unsupported: {}", e.getMessage());
+                if (requireAdminForOffsets) {
+                    // Should be handled by the caller during log startup
+                    throw e;
+                }

Review comment:
       It's true that `DistributedHerder.run()` ultimately catches and handles
this exception. But I feel like many users might not understand the
significance of such an error nor how to correct their configuration. Rather
than just re-throwing that exception, we should probably wrap it in one that
has a more instructive message, such as:
   > Enabling exactly once for source connectors requires a Kafka broker 
version that allows admin clients to read consumer offsets. Disable the 
worker's exactly once support for source connectors, or use a newer Kafka 
broker version.
   
   Plus, should this if block be before the `log.debug(...)` on the previous 
line? Seems like that log message might just confuse the situation since the 
worker will not read "to the end of log offsets with consumer".
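   
   Putting those two suggestions together, the body of the catch block might look
something like this (just a sketch; the exact exception type and wording are up
to you):
   ```
   if (requireAdminForOffsets) {
       // Should be handled by the caller during log startup
       throw new ConnectException("Enabling exactly-once support for source connectors requires a Kafka broker "
               + "version that allows admin clients to read consumer offsets. Please either disable the worker's "
               + "exactly-once support for source connectors, or use a newer Kafka broker version.", e);
   }
   // This may happen with really old brokers that don't support the auto topic creation
   // field in metadata requests
   log.debug("Reading to end of log offsets with consumer since admin client is unsupported: {}", e.getMessage());
   ```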

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -264,12 +279,62 @@ public void startConnector(
 
                 log.info("Creating connector {} of type {}", connName, 
connClass);
                 final Connector connector = plugins.newConnector(connClass);
-                final ConnectorConfig connConfig = 
ConnectUtils.isSinkConnector(connector)
-                        ? new SinkConnectorConfig(plugins, connProps)
-                        : new SourceConnectorConfig(plugins, connProps, 
config.topicCreationEnable());
+                final ConnectorConfig connConfig;
+                final CloseableOffsetStorageReader offsetReader;
+                if (ConnectUtils.isSinkConnector(connector)) {
+                    connConfig = new SinkConnectorConfig(plugins, connProps);
+                    offsetReader = null;

Review comment:
       IIUC, this changes the behavior of the `WorkerConnector` created below. 
Prior to this PR, the `WorkerConnector` was always created with the 
`Worker.offsetBackingStore`, even for sink connectors. 
   
   However, with this PR, the `WorkerConnector` will be instantiated with a
null `offsetReader` parameter, which will cause an NPE in
`WorkerConnector#doShutdown()` and `WorkerConnector#cancel()`, since
`WorkerConnector` does not check for a null parameter there.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+public class ConnectorOffsetBackingStore implements OffsetBackingStore {

Review comment:
       Please add JavaDoc that explains the purpose of this class, and in 
particular, what the two modes are that correspond to the two static factory 
methods.
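   
   Something along these lines might be a starting point (based purely on my read
of the two static factory methods, so please correct the details):
   ```
   /**
    * An {@link OffsetBackingStore} scoped to a single connector, which reads from and writes to
    * the worker's global offset store and, optionally, a connector-specific offsets topic.
    *
    * <p>There are two modes, corresponding to the two static factory methods:
    * <ul>
    *     <li>{@code withoutConnectorOffsetStore}: only the worker's global offset store is used</li>
    *     <li>{@code withConnectorOffsetStore}: a connector-specific offset store, backed by its own
    *     topic (and producer, consumer, and admin client), is used in addition to the worker's
    *     global store</li>
    * </ul>
    */
   ```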

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+public class ConnectorOffsetBackingStore implements OffsetBackingStore {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ConnectorOffsetBackingStore.class);
+
+    private final Time time;
+    private final Supplier<LoggingContext> loggingContext;
+    private final String primaryOffsetsTopic;
+    private final OffsetBackingStore workerStore;
+    private final Optional<OffsetBackingStore> connectorStore;
+    private final Optional<TopicAdmin> connectorStoreAdmin;
+
+    public static ConnectorOffsetBackingStore withConnectorOffsetStore(

Review comment:
       Nit: move these two static factory methods above the non-static member 
variables, so all static and non-static members are together.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
##########
@@ -192,6 +199,44 @@
     public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A 
list of permitted algorithms for verifying internal requests";
     public static final List<String> 
INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = 
Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
 
+    private enum ExactlyOnceSourceSupport {
+        DISABLED(false),
+        PREPARING(true),
+        ENABLED(true);
+
+        public final boolean usesTransactionalLeader;
+
+        ExactlyOnceSourceSupport(boolean usesTransactionalLeader) {
+            this.usesTransactionalLeader = usesTransactionalLeader;
+        }
+
+        public static List<String> options() {
+            return 
Stream.of(values()).map(ExactlyOnceSourceSupport::toString).collect(Collectors.toList());
+        }
+
+        public static ExactlyOnceSourceSupport fromProperty(String property) {
+            return 
ExactlyOnceSourceSupport.valueOf(property.toUpperCase(Locale.ROOT));
+        }
+
+        @Override
+        public String toString() {
+            return name().toLowerCase(Locale.ROOT);
+        }
+    }
+
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = 
"exactly.once.source.support";
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to 
enable exactly-once support for source connectors in the cluster "
+            + "by writing source records and their offsets in a Kafka 
transaction, and by proactively fencing out old task generations before 
bringing up new ones. "

Review comment:
       Nit, to improve readability and precision, especially around how many 
Kafka transactions would be used:
   > Whether to enable exactly-once support for source connectors in the 
cluster by using transactions to write source records and their source offsets, 
and by proactively fencing out old task generations before bringing up new ones.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
##########
@@ -72,6 +74,45 @@ static String lookupKafkaClusterId(Admin adminClient) {
         }
     }
 
+    /**
+     * Log a warning when the user attempts to override a property that cannot 
be overridden.
+     * @param props the configuration properties provided by the user
+     * @param key the name of the property to check on
+     * @param expectedValue the expected value for the property
+     * @param justification the reason the property cannot be overridden.
+     *                      Will follow the phrase "The value... for the... 
property will be ignored as it cannot be overridden ".
+     *                      For example, one might supply the message "in 
connectors with the DLQ feature enabled" for this parameter.
+     * @param caseSensitive whether the value should match case-insensitively
+     */
+    public static void warnOnOverriddenProperty(
+            Map<String, ?> props,
+            String key,
+            String expectedValue,
+            String justification,
+            boolean caseSensitive) {
+        overriddenPropertyWarning(props, key, expectedValue, justification, 
caseSensitive).ifPresent(log::warn);
+    }
+
+    // Visible for testing
+    static Optional<String> overriddenPropertyWarning(
+            Map<String, ?> props,
+            String key,
+            String expectedValue,
+            String justification,
+            boolean caseSensitive) {
+        Predicate<String> matchesExpectedValue = caseSensitive ? 
expectedValue::equals : expectedValue::equalsIgnoreCase;
+        String value = 
Optional.ofNullable(props.get(key)).map(Object::toString).orElse(null);
+        if (value != null && !matchesExpectedValue.test(value)) {
+            return Optional.of(String.format(
+                    "The value '%s' for the '%s' property will be ignored as 
it cannot be overridden%s. "
+                            + "The value '%s' will be used instead.",

Review comment:
       Is it really true that all of these incorrectly-overridden property
values are ignored?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -264,12 +279,62 @@ public void startConnector(
 
                 log.info("Creating connector {} of type {}", connName, 
connClass);
                 final Connector connector = plugins.newConnector(connClass);
-                final ConnectorConfig connConfig = 
ConnectUtils.isSinkConnector(connector)
-                        ? new SinkConnectorConfig(plugins, connProps)
-                        : new SourceConnectorConfig(plugins, connProps, 
config.topicCreationEnable());
+                final ConnectorConfig connConfig;
+                final CloseableOffsetStorageReader offsetReader;
+                if (ConnectUtils.isSinkConnector(connector)) {
+                    connConfig = new SinkConnectorConfig(plugins, connProps);
+                    offsetReader = null;
+                } else {
+                    SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
+                    connConfig = sourceConfig;
+
+                    String connectorOffsetsTopic = null;
+                    if (sourceConfig.offsetsTopic() != null) {
+                        connectorOffsetsTopic = sourceConfig.offsetsTopic();
+                    } else if (config.exactlyOnceSourceEnabled()) {
+                        connectorOffsetsTopic = config.offsetsTopic();
+                    }
+

Review comment:
       Suggestion:
   ```suggestion
   
                       // Set up the offset backing store for this connector 
instance
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -678,10 +810,9 @@ private WorkerTask buildWorkerTask(ClusterConfigState 
configState,
         // and through to the task
         Map<String, Object> consumerProps = new HashMap<>();
 
-        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
SinkUtils.consumerGroupId(id.connector()));
-        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, 
"connector-consumer-" + id);
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                  
Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
SinkUtils.consumerGroupId(connName));
+        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, defaultClientId);
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
config.bootstrapServers());

Review comment:
       Thanks! This should have been done quite some time ago.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -476,22 +541,95 @@ public boolean isRunning(String connName) {
     }
 
     /**
-     * Start a task managed by this worker.
+     * Start a sink task managed by this worker.
+     *
+     * @param id the task ID.
+     * @param configState the most recent {@link ClusterConfigState} known to 
the worker
+     * @param connProps the connector properties.
+     * @param taskProps the tasks properties.
+     * @param statusListener a listener for the runtime status transitions of 
the task.
+     * @param initialState the initial state of the connector.
+     * @return true if the task started successfully.
+     */
+    public boolean startSinkTask(
+            ConnectorTaskId id,
+            ClusterConfigState configState,
+            Map<String, String> connProps,
+            Map<String, String> taskProps,
+            TaskStatus.Listener statusListener,
+            TargetState initialState
+    ) {
+        return startTask(id, connProps, taskProps, statusListener,
+                new SinkTaskBuilder(id, configState, statusListener, 
initialState));
+    }
+
+    /**
+     * Start a source task managed by this worker.

Review comment:
       It's probably worthwhile to mention that this method starts the task for
a source connector with the older behavior (without exactly-once support).
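   
   e.g., the first sentence of the JavaDoc could read something like:
   > Start a source task managed by this worker, using the older behavior that
does not provide exactly-once support.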

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -264,12 +279,62 @@ public void startConnector(
 
                 log.info("Creating connector {} of type {}", connName, 
connClass);
                 final Connector connector = plugins.newConnector(connClass);
-                final ConnectorConfig connConfig = 
ConnectUtils.isSinkConnector(connector)
-                        ? new SinkConnectorConfig(plugins, connProps)
-                        : new SourceConnectorConfig(plugins, connProps, 
config.topicCreationEnable());
+                final ConnectorConfig connConfig;
+                final CloseableOffsetStorageReader offsetReader;
+                if (ConnectUtils.isSinkConnector(connector)) {
+                    connConfig = new SinkConnectorConfig(plugins, connProps);
+                    offsetReader = null;
+                } else {
+                    SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
+                    connConfig = sourceConfig;
+

Review comment:
       Suggestion:
   ```suggestion
   
                       // Use the desired topic for offsets
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -980,6 +1119,329 @@ WorkerMetricsGroup workerMetricsGroup() {
         return workerMetricsGroup;
     }
 
+    abstract class TaskBuilder {
+
+        private final ConnectorTaskId id;
+        private final ClusterConfigState configState;
+        private final TaskStatus.Listener statusListener;
+        private final TargetState initialState;
+
+        private Task task;
+        private ConnectorConfig connectorConfig = null;
+        private Converter keyConverter = null;
+        private Converter valueConverter = null;
+        private HeaderConverter headerConverter = null;
+        private ClassLoader classLoader = null;
+
+        public TaskBuilder(ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState) {
+            this.id = id;
+            this.configState = configState;
+            this.statusListener = statusListener;
+            this.initialState = initialState;
+        }
+
+        public TaskBuilder withTask(Task task) {
+            this.task = task;
+            return this;
+        }
+
+        public TaskBuilder withConnectorConfig(ConnectorConfig 
connectorConfig) {
+            this.connectorConfig = connectorConfig;
+            return this;
+        }
+
+        public TaskBuilder withKeyConverter(Converter keyConverter) {
+            this.keyConverter = keyConverter;
+            return this;
+        }
+
+        public TaskBuilder withValueConverter(Converter valueConverter) {
+            this.valueConverter = valueConverter;
+            return this;
+        }
+
+        public TaskBuilder withHeaderConverter(HeaderConverter 
headerConverter) {
+            this.headerConverter = headerConverter;
+            return this;
+        }
+
+        public TaskBuilder withClassloader(ClassLoader classLoader) {
+            this.classLoader = classLoader;
+            return this;
+        }
+
+        public WorkerTask build() {
+            Objects.requireNonNull(task, "Task cannot be null");
+            Objects.requireNonNull(connectorConfig, "Connector config used by 
task cannot be null");
+            Objects.requireNonNull(keyConverter, "Key converter used by task 
cannot be null");
+            Objects.requireNonNull(valueConverter, "Value converter used by 
task cannot be null");
+            Objects.requireNonNull(headerConverter, "Header converter used by 
task cannot be null");
+            Objects.requireNonNull(classLoader, "Classloader used by task 
cannot be null");
+
+            ErrorHandlingMetrics errorHandlingMetrics = 
errorHandlingMetrics(id);
+            final Class<? extends Connector> connectorClass = 
plugins.connectorClass(
+                    
connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+            RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
+                    connectorConfig.errorMaxDelayInMillis(), 
connectorConfig.errorToleranceType(), Time.SYSTEM);
+            retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+            return doBuild(task, id, configState, statusListener, initialState,
+                    connectorConfig, keyConverter, valueConverter, 
headerConverter, classLoader,
+                    errorHandlingMetrics, connectorClass, 
retryWithToleranceOperator);
+        }
+
+        abstract WorkerTask doBuild(Task task,
+                                    ConnectorTaskId id,
+                                    ClusterConfigState configState,
+                                    TaskStatus.Listener statusListener,
+                                    TargetState initialState,
+                                    ConnectorConfig connectorConfig,
+                                    Converter keyConverter,
+                                    Converter valueConverter,
+                                    HeaderConverter headerConverter,
+                                    ClassLoader classLoader,
+                                    ErrorHandlingMetrics errorHandlingMetrics,
+                                    Class<? extends Connector> connectorClass,
+                                    RetryWithToleranceOperator 
retryWithToleranceOperator);
+
+    }
+
+    class SinkTaskBuilder extends TaskBuilder {
+        public SinkTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            TransformationChain<SinkRecord> transformationChain = new 
TransformationChain<>(connectorConfig.<SinkRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, 
connectorConfig.originalsStrings());
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, 
sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter = 
createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
+                    keyConverter, valueConverter, headerConverter);
+
+            Map<String, Object> consumerProps = 
consumerConfigs(id.connector(),  "connector-consumer-" + id, config, 
connectorConfig, connectorClass, connectorClientConfigOverridePolicy, 
kafkaClusterId);
+            KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+
+            return new WorkerSinkTask(id, (SinkTask) task, statusListener, 
initialState, config, configState, metrics, keyConverter,
+                    valueConverter, headerConverter, transformationChain, 
consumer, classLoader, time,
+                    retryWithToleranceOperator, workerErrantRecordReporter, 
herder.statusBackingStore());
+        }
+    }
+
+    class SourceTaskBuilder extends TaskBuilder {
+        public SourceTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), 
config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> producerProps = 
producerConfigs(id.connector(), "connector-producer-" + id, config, 
sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
+
+            ConnectorOffsetBackingStore offsetStore = 
ConnectorOffsetBackingStore.withoutConnectorOffsetStore(
+                    () -> LoggingContext.forTask(id),
+                    globalOffsetBackingStore,
+                    config.offsetsTopic()
+            );
+            final TopicAdmin admin;
+            Map<String, TopicCreationGroup> topicCreationGroups = null;
+            if (sourceConfig.offsetsTopic() != null || 
(config.topicCreationEnable() && sourceConfig.usesTopicCreation())) {

Review comment:
       I think adding some comments here will help future developers. There are
enough subtleties here (e.g., the `||` in this line) that make this more
challenging than it should be to understand the behavior.
   
   Or, it might be worth adding some final booleans here to make these 
conditions a bit more clear and readable, especially since this method is a 
series of relatively independent parts/sections. For example:
   ```
   final boolean customOffsetTopic = sourceConfig.offsetsTopic() != null;
   final boolean createTopicsEnabled = config.topicCreationEnable() && sourceConfig.usesTopicCreation();
   if (customOffsetTopic || createTopicsEnabled) {
     // Create an admin client
     ...
     if (createTopicsEnabled) {
       topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig);
     }
     if (customOffsetTopic) {
       // Build custom offset store
       ...
     }
   } else {
     // No need for admin
     admin = null;
   }
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -692,21 +823,22 @@ private WorkerTask buildWorkerTask(ClusterConfigState 
configState,
         ConnectUtils.addMetricsContextProperties(consumerProps, config, 
clusterId);
         // Connector-specified overrides
         Map<String, Object> consumerOverrides =
-            connectorClientConfigOverrides(id, connConfig, connectorClass, 
ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
+            connectorClientConfigOverrides(connName, connConfig, 
connectorClass, ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
                                            ConnectorType.SINK, 
ConnectorClientConfigRequest.ClientType.CONSUMER,
                                            
connectorClientConfigOverridePolicy);
         consumerProps.putAll(consumerOverrides);
 
         return consumerProps;
     }
 
-    static Map<String, Object> adminConfigs(ConnectorTaskId id,
+    static Map<String, Object> adminConfigs(String connName,

Review comment:
       Do we need to change these signatures from `ConnectorTaskId` to
`String`? The `ConnectorTaskId` gives us the ability to define task-specific
client configuration properties if necessary/desired. I'm afraid that switching
to `String` will make it harder and more invasive to add that back in. Plus, if
there's not a good reason to remove these, let's leave that for smaller PRs.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -342,6 +343,10 @@ private void 
logPluginPathConfigProviderWarning(Map<String, String> rawOriginals
         }
     }
 
+    public String bootstrapServers() {
+        return String.join(",", getList(BOOTSTRAP_SERVERS_CONFIG));
+    }

Review comment:
       Very nice. For this and the other new getter methods, can you add some 
JavaDoc just so that it's easier to follow in an IDE where these methods are 
_used_? I wanted that several times as I was reviewing this code.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -16,95 +16,46 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.header.internals.RecordHeaders;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.CumulativeSum;
-import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.errors.RetriableException;
-import org.apache.kafka.connect.header.Header;
-import org.apache.kafka.connect.header.Headers;
-import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
-import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
-import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.ClusterConfigState;
+import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.storage.StatusBackingStore;
-import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.TopicAdmin;
-import org.apache.kafka.connect.util.TopicCreation;
 import org.apache.kafka.connect.util.TopicCreationGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.time.Duration;
 import java.util.IdentityHashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static 
org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
 
 /**
  * WorkerTask that uses a SourceTask to ingest data into Kafka.
  */
-class WorkerSourceTask extends WorkerTask {
+class WorkerSourceTask extends AbstractWorkerSourceTask {

Review comment:
       Note to future me: I didn't get this far in the PR.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -980,6 +1119,329 @@ WorkerMetricsGroup workerMetricsGroup() {
         return workerMetricsGroup;
     }
 
+    abstract class TaskBuilder {
+
+        private final ConnectorTaskId id;
+        private final ClusterConfigState configState;
+        private final TaskStatus.Listener statusListener;
+        private final TargetState initialState;
+
+        private Task task;
+        private ConnectorConfig connectorConfig = null;
+        private Converter keyConverter = null;
+        private Converter valueConverter = null;
+        private HeaderConverter headerConverter = null;
+        private ClassLoader classLoader = null;
+
+        public TaskBuilder(ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState) {
+            this.id = id;
+            this.configState = configState;
+            this.statusListener = statusListener;
+            this.initialState = initialState;
+        }
+
+        public TaskBuilder withTask(Task task) {
+            this.task = task;
+            return this;
+        }
+
+        public TaskBuilder withConnectorConfig(ConnectorConfig 
connectorConfig) {
+            this.connectorConfig = connectorConfig;
+            return this;
+        }
+
+        public TaskBuilder withKeyConverter(Converter keyConverter) {
+            this.keyConverter = keyConverter;
+            return this;
+        }
+
+        public TaskBuilder withValueConverter(Converter valueConverter) {
+            this.valueConverter = valueConverter;
+            return this;
+        }
+
+        public TaskBuilder withHeaderConverter(HeaderConverter 
headerConverter) {
+            this.headerConverter = headerConverter;
+            return this;
+        }
+
+        public TaskBuilder withClassloader(ClassLoader classLoader) {
+            this.classLoader = classLoader;
+            return this;
+        }
+
+        public WorkerTask build() {
+            Objects.requireNonNull(task, "Task cannot be null");
+            Objects.requireNonNull(connectorConfig, "Connector config used by 
task cannot be null");
+            Objects.requireNonNull(keyConverter, "Key converter used by task 
cannot be null");
+            Objects.requireNonNull(valueConverter, "Value converter used by 
task cannot be null");
+            Objects.requireNonNull(headerConverter, "Header converter used by 
task cannot be null");
+            Objects.requireNonNull(classLoader, "Classloader used by task 
cannot be null");
+
+            ErrorHandlingMetrics errorHandlingMetrics = 
errorHandlingMetrics(id);
+            final Class<? extends Connector> connectorClass = 
plugins.connectorClass(
+                    
connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+            RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
+                    connectorConfig.errorMaxDelayInMillis(), 
connectorConfig.errorToleranceType(), Time.SYSTEM);
+            retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+            return doBuild(task, id, configState, statusListener, initialState,
+                    connectorConfig, keyConverter, valueConverter, 
headerConverter, classLoader,
+                    errorHandlingMetrics, connectorClass, 
retryWithToleranceOperator);
+        }
+
+        abstract WorkerTask doBuild(Task task,
+                                    ConnectorTaskId id,
+                                    ClusterConfigState configState,
+                                    TaskStatus.Listener statusListener,
+                                    TargetState initialState,
+                                    ConnectorConfig connectorConfig,
+                                    Converter keyConverter,
+                                    Converter valueConverter,
+                                    HeaderConverter headerConverter,
+                                    ClassLoader classLoader,
+                                    ErrorHandlingMetrics errorHandlingMetrics,
+                                    Class<? extends Connector> connectorClass,
+                                    RetryWithToleranceOperator 
retryWithToleranceOperator);
+
+    }
+
+    class SinkTaskBuilder extends TaskBuilder {
+        public SinkTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            TransformationChain<SinkRecord> transformationChain = new 
TransformationChain<>(connectorConfig.<SinkRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, 
connectorConfig.originalsStrings());
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, 
sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter = 
createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
+                    keyConverter, valueConverter, headerConverter);
+
+            Map<String, Object> consumerProps = 
consumerConfigs(id.connector(),  "connector-consumer-" + id, config, 
connectorConfig, connectorClass, connectorClientConfigOverridePolicy, 
kafkaClusterId);
+            KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+
+            return new WorkerSinkTask(id, (SinkTask) task, statusListener, 
initialState, config, configState, metrics, keyConverter,
+                    valueConverter, headerConverter, transformationChain, 
consumer, classLoader, time,
+                    retryWithToleranceOperator, workerErrantRecordReporter, 
herder.statusBackingStore());
+        }
+    }
+
+    class SourceTaskBuilder extends TaskBuilder {
+        public SourceTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), 
config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> producerProps = 
producerConfigs(id.connector(), "connector-producer-" + id, config, 
sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
+
+            ConnectorOffsetBackingStore offsetStore = 
ConnectorOffsetBackingStore.withoutConnectorOffsetStore(
+                    () -> LoggingContext.forTask(id),
+                    globalOffsetBackingStore,
+                    config.offsetsTopic()
+            );
+            final TopicAdmin admin;
+            Map<String, TopicCreationGroup> topicCreationGroups = null;
+            if (sourceConfig.offsetsTopic() != null || 
(config.topicCreationEnable() && sourceConfig.usesTopicCreation())) {
+                Map<String, Object> adminOverrides = 
adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+                        sourceConfig, connectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+                Admin adminClient = Admin.create(adminOverrides);
+                admin = new 
TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient);
+
+                if (config.topicCreationEnable() && 
sourceConfig.usesTopicCreation()) {
+                    topicCreationGroups = 
TopicCreationGroup.configuredGroups(sourceConfig);
+                }
+
+                if (sourceConfig.offsetsTopic() != null && 
config.connectorOffsetsTopicsPermitted()) {
+                    Map<String, Object> consumerProps = 
consumerConfigs(id.connector(), "connector-consumer-" + id, config, 
connectorConfig, connectorClass,
+                            connectorClientConfigOverridePolicy, 
kafkaClusterId);
+                    // Users can disable this if they want to; it won't affect 
delivery guarantees since the task isn't exactly-once anyways
+                    
consumerProps.putIfAbsent(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+                    KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+
+                    offsetStore = 
ConnectorOffsetBackingStore.withConnectorOffsetStore(
+                            () -> LoggingContext.forTask(id),
+                            globalOffsetBackingStore,
+                            sourceConfig.offsetsTopic(),
+                            producer,
+                            consumer,
+                            admin
+                    );
+                }
+            } else {
+                admin = null;
+            }
+            offsetStore.configure(config);
+
+            CloseableOffsetStorageReader offsetReader = new 
OffsetStorageReaderImpl(offsetStore, id.connector(), internalKeyConverter, 
internalValueConverter);
+            OffsetStorageWriter offsetWriter = new 
OffsetStorageWriter(offsetStore, id.connector(), internalKeyConverter, 
internalValueConverter);
+
+            // Note we pass the configState as it performs dynamic 
transformations under the covers
+            return new WorkerSourceTask(id, (SourceTask) task, statusListener, 
initialState, keyConverter, valueConverter,
+                    headerConverter, transformationChain, producer, admin, 
topicCreationGroups,
+                    offsetReader, offsetWriter, offsetStore, config, 
configState, metrics, classLoader, time,
+                    retryWithToleranceOperator, herder.statusBackingStore(), 
executor);
+        }
+    }
+
+    class ExactlyOnceSourceTaskBuilder extends TaskBuilder {
+        private final Runnable preProducerCheck;
+        private final Runnable postProducerCheck;
+
+        public ExactlyOnceSourceTaskBuilder(ConnectorTaskId id,
+                                            ClusterConfigState configState,
+                                            TaskStatus.Listener statusListener,
+                                            TargetState initialState,
+                                            Runnable preProducerCheck,
+                                            Runnable postProducerCheck) {
+            super(id, configState, statusListener, initialState);
+            this.preProducerCheck = preProducerCheck;
+            this.postProducerCheck = postProducerCheck;
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                                  ConnectorTaskId id,
+                                  ClusterConfigState configState,
+                                  TaskStatus.Listener statusListener,
+                                  TargetState initialState,
+                                  ConnectorConfig connectorConfig,
+                                  Converter keyConverter,
+                                  Converter valueConverter,
+                                  HeaderConverter headerConverter,
+                                  ClassLoader classLoader,
+                                  ErrorHandlingMetrics errorHandlingMetrics,
+                                  Class<? extends Connector> connectorClass,
+                                  RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), 
config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> adminOverrides = adminConfigs(id.connector(), 
"connector-adminclient-" + id, config,

Review comment:
       In my comment about `SourceTaskBuilder#doBuild`, I suggested adding a few comments to help the different parts/sections of that method stand out more clearly. The same applies here, for example:
   ```suggestion
               // Create an admin client
               Map<String, Object> adminOverrides = 
adminConfigs(id.connector(), "connector-adminclient-" + id, config,
   ```

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -980,6 +1119,329 @@ WorkerMetricsGroup workerMetricsGroup() {
         return workerMetricsGroup;
     }
 
+    abstract class TaskBuilder {
+
+        private final ConnectorTaskId id;
+        private final ClusterConfigState configState;
+        private final TaskStatus.Listener statusListener;
+        private final TargetState initialState;
+
+        private Task task;
+        private ConnectorConfig connectorConfig = null;
+        private Converter keyConverter = null;
+        private Converter valueConverter = null;
+        private HeaderConverter headerConverter = null;
+        private ClassLoader classLoader = null;
+
+        public TaskBuilder(ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState) {
+            this.id = id;
+            this.configState = configState;
+            this.statusListener = statusListener;
+            this.initialState = initialState;
+        }
+
+        public TaskBuilder withTask(Task task) {
+            this.task = task;
+            return this;
+        }
+
+        public TaskBuilder withConnectorConfig(ConnectorConfig 
connectorConfig) {
+            this.connectorConfig = connectorConfig;
+            return this;
+        }
+
+        public TaskBuilder withKeyConverter(Converter keyConverter) {
+            this.keyConverter = keyConverter;
+            return this;
+        }
+
+        public TaskBuilder withValueConverter(Converter valueConverter) {
+            this.valueConverter = valueConverter;
+            return this;
+        }
+
+        public TaskBuilder withHeaderConverter(HeaderConverter 
headerConverter) {
+            this.headerConverter = headerConverter;
+            return this;
+        }
+
+        public TaskBuilder withClassloader(ClassLoader classLoader) {
+            this.classLoader = classLoader;
+            return this;
+        }
+
+        public WorkerTask build() {
+            Objects.requireNonNull(task, "Task cannot be null");
+            Objects.requireNonNull(connectorConfig, "Connector config used by 
task cannot be null");
+            Objects.requireNonNull(keyConverter, "Key converter used by task 
cannot be null");
+            Objects.requireNonNull(valueConverter, "Value converter used by 
task cannot be null");
+            Objects.requireNonNull(headerConverter, "Header converter used by 
task cannot be null");
+            Objects.requireNonNull(classLoader, "Classloader used by task 
cannot be null");
+
+            ErrorHandlingMetrics errorHandlingMetrics = 
errorHandlingMetrics(id);
+            final Class<? extends Connector> connectorClass = 
plugins.connectorClass(
+                    
connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+            RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
+                    connectorConfig.errorMaxDelayInMillis(), 
connectorConfig.errorToleranceType(), Time.SYSTEM);
+            retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+            return doBuild(task, id, configState, statusListener, initialState,
+                    connectorConfig, keyConverter, valueConverter, 
headerConverter, classLoader,
+                    errorHandlingMetrics, connectorClass, 
retryWithToleranceOperator);
+        }
+
+        abstract WorkerTask doBuild(Task task,
+                                    ConnectorTaskId id,
+                                    ClusterConfigState configState,
+                                    TaskStatus.Listener statusListener,
+                                    TargetState initialState,
+                                    ConnectorConfig connectorConfig,
+                                    Converter keyConverter,
+                                    Converter valueConverter,
+                                    HeaderConverter headerConverter,
+                                    ClassLoader classLoader,
+                                    ErrorHandlingMetrics errorHandlingMetrics,
+                                    Class<? extends Connector> connectorClass,
+                                    RetryWithToleranceOperator 
retryWithToleranceOperator);
+
+    }
+
+    class SinkTaskBuilder extends TaskBuilder {
+        public SinkTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            TransformationChain<SinkRecord> transformationChain = new 
TransformationChain<>(connectorConfig.<SinkRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, 
connectorConfig.originalsStrings());
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, 
sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter = 
createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
+                    keyConverter, valueConverter, headerConverter);
+
+            Map<String, Object> consumerProps = 
consumerConfigs(id.connector(),  "connector-consumer-" + id, config, 
connectorConfig, connectorClass, connectorClientConfigOverridePolicy, 
kafkaClusterId);
+            KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+
+            return new WorkerSinkTask(id, (SinkTask) task, statusListener, 
initialState, config, configState, metrics, keyConverter,
+                    valueConverter, headerConverter, transformationChain, 
consumer, classLoader, time,
+                    retryWithToleranceOperator, workerErrantRecordReporter, 
herder.statusBackingStore());
+        }
+    }
+
+    class SourceTaskBuilder extends TaskBuilder {
+        public SourceTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), 
config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> producerProps = 
producerConfigs(id.connector(), "connector-producer-" + id, config, 
sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
+
+            ConnectorOffsetBackingStore offsetStore = 
ConnectorOffsetBackingStore.withoutConnectorOffsetStore(
+                    () -> LoggingContext.forTask(id),
+                    globalOffsetBackingStore,
+                    config.offsetsTopic()
+            );
+            final TopicAdmin admin;
+            Map<String, TopicCreationGroup> topicCreationGroups = null;
+            if (sourceConfig.offsetsTopic() != null || 
(config.topicCreationEnable() && sourceConfig.usesTopicCreation())) {
+                Map<String, Object> adminOverrides = 
adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+                        sourceConfig, connectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+                Admin adminClient = Admin.create(adminOverrides);
+                admin = new 
TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient);
+
+                if (config.topicCreationEnable() && 
sourceConfig.usesTopicCreation()) {
+                    topicCreationGroups = 
TopicCreationGroup.configuredGroups(sourceConfig);
+                }
+
+                if (sourceConfig.offsetsTopic() != null && 
config.connectorOffsetsTopicsPermitted()) {
+                    Map<String, Object> consumerProps = 
consumerConfigs(id.connector(), "connector-consumer-" + id, config, 
connectorConfig, connectorClass,
+                            connectorClientConfigOverridePolicy, 
kafkaClusterId);
+                    // Users can disable this if they want to; it won't affect 
delivery guarantees since the task isn't exactly-once anyways
+                    
consumerProps.putIfAbsent(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+                    KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+
+                    offsetStore = 
ConnectorOffsetBackingStore.withConnectorOffsetStore(
+                            () -> LoggingContext.forTask(id),
+                            globalOffsetBackingStore,
+                            sourceConfig.offsetsTopic(),
+                            producer,
+                            consumer,
+                            admin
+                    );
+                }
+            } else {
+                admin = null;
+            }
+            offsetStore.configure(config);
+
+            CloseableOffsetStorageReader offsetReader = new 
OffsetStorageReaderImpl(offsetStore, id.connector(), internalKeyConverter, 
internalValueConverter);
+            OffsetStorageWriter offsetWriter = new 
OffsetStorageWriter(offsetStore, id.connector(), internalKeyConverter, 
internalValueConverter);
+
+            // Note we pass the configState as it performs dynamic 
transformations under the covers
+            return new WorkerSourceTask(id, (SourceTask) task, statusListener, 
initialState, keyConverter, valueConverter,
+                    headerConverter, transformationChain, producer, admin, 
topicCreationGroups,
+                    offsetReader, offsetWriter, offsetStore, config, 
configState, metrics, classLoader, time,
+                    retryWithToleranceOperator, herder.statusBackingStore(), 
executor);
+        }
+    }
+
+    class ExactlyOnceSourceTaskBuilder extends TaskBuilder {
+        private final Runnable preProducerCheck;
+        private final Runnable postProducerCheck;
+
+        public ExactlyOnceSourceTaskBuilder(ConnectorTaskId id,
+                                            ClusterConfigState configState,
+                                            TaskStatus.Listener statusListener,
+                                            TargetState initialState,
+                                            Runnable preProducerCheck,
+                                            Runnable postProducerCheck) {
+            super(id, configState, statusListener, initialState);
+            this.preProducerCheck = preProducerCheck;
+            this.postProducerCheck = postProducerCheck;
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                                  ConnectorTaskId id,
+                                  ClusterConfigState configState,
+                                  TaskStatus.Listener statusListener,
+                                  TargetState initialState,
+                                  ConnectorConfig connectorConfig,
+                                  Converter keyConverter,
+                                  Converter valueConverter,
+                                  HeaderConverter headerConverter,
+                                  ClassLoader classLoader,
+                                  ErrorHandlingMetrics errorHandlingMetrics,
+                                  Class<? extends Connector> connectorClass,
+                                  RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), 
config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> adminOverrides = adminConfigs(id.connector(), 
"connector-adminclient-" + id, config,
+                    sourceConfig, connectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+            Admin adminClient = Admin.create(adminOverrides);
+            TopicAdmin topicAdmin = new 
TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient);
+            Map<String, TopicCreationGroup> topicCreationGroups = null;
+            if (config.topicCreationEnable() && 
sourceConfig.usesTopicCreation()) {
+                topicCreationGroups = 
TopicCreationGroup.configuredGroups(sourceConfig);
+            }
+
+            String transactionalId = transactionalId(id);
+            Map<String, Object> producerProps = 
producerConfigs(id.connector(), "connector-producer-" + id, config, 
sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            ConnectUtils.warnOnOverriddenProperty(
+                    producerProps, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
"true",
+                    "for connectors when exactly-once source support is 
enabled",
+                    false
+            );
+            ConnectUtils.warnOnOverriddenProperty(
+                    producerProps, ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
transactionalId,
+                    "for connectors when exactly-once source support is 
enabled",
+                    true
+            );
+            producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+            producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
transactionalId);
+            KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
+
+            Map<String, Object> consumerProps = 
consumerConfigs(id.connector(), "connector-consumer-" + id, config, 
connectorConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            ConnectUtils.warnOnOverriddenProperty(
+                    consumerProps, ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT),
+                    "for connectors when exactly-once source support is 
enabled",
+                    false
+            );
+            consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+            KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+
+            String offsetsTopic = 
Optional.ofNullable(sourceConfig.offsetsTopic()).orElse(config.offsetsTopic());
+
+            ConnectorOffsetBackingStore offsetStore;
+            // No need to do secondary writes to the global offsets topic if 
we're certain that the task's local offset store
+            // is going to be targeting it anyway
+            // Note that this may lead to a false positive if the user 
provides an overridden bootstrap servers value for their
+            // producer that resolves to the same Kafka cluster; we might 
consider looking up the Kafka cluster ID in the future
+            // to prevent these false positives but at the moment this is 
probably adequate, especially since we probably don't
+            // want to put a ping to a remote Kafka cluster inside the 
herder's tick thread (which is where this logic takes place
+            // right now) in case that takes a while.

Review comment:
       This is more of a nit-type comment.
   
   It seems like this comment tries to describe why we don't need to create a connector-specific offset store **AND** then explain how the check might not always be accurate. But I found the comment a bit hard to follow.
   
   Also, the `ConnectorOffsetBackingStore offsetStore;` line should move under 
the multi-line comment.
   
   In such cases, it might be worth changing the code to help make things clearer. For example:
   ```
   // We can simply reuse the worker's offset store when the connector-specific 
offset topic
   // is the same as the worker's. We can check the offset topic name and the 
Kafka cluster's
   // bootstrap servers, although this isn't exact and can lead to some false 
positives if the user
   // provides an overridden bootstrap servers value for their producer that is 
different than
   // the worker's but still resolves to the same Kafka cluster used by the 
worker.
   // At the moment this is probably adequate, especially since we probably 
don't want to put
   // a network ping to a remote Kafka cluster inside the herder's tick thread 
(which is where this
   // logic takes place right now) in case that takes a while.
   ConnectorOffsetBackingStore offsetStore;
   final boolean sameOffsetTopicAsWorker = offsetsTopic.equals(config.offsetsTopic())
                       && producerProps.get(BOOTSTRAP_SERVERS_CONFIG).equals(config.bootstrapServers());
   if (sameOffsetTopicAsWorker) {
       offsetStore = ConnectorOffsetBackingStore.withoutConnectorOffsetStore(
       ...
   } else {
       offsetStore = ConnectorOffsetBackingStore.withConnectorOffsetStore(
       ...
   }
   ```

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##########
@@ -20,13 +20,45 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in 
Kafka.
  */
 public abstract class SourceTask implements Task {
 
+    /**
+     * <p>
+     * The configuration key that determines how source tasks will define 
transaction boundaries
+     * when exactly-once support is enabled.
+     * </p>
+     */
+    public static final String TRANSACTION_BOUNDARY_CONFIG = 
"transaction.boundary";
+
+    public enum TransactionBoundary {
+        POLL,
+        INTERVAL,
+        CONNECTOR;
+
+        public static final TransactionBoundary DEFAULT = POLL;
+
+        public static List<String> options() {

Review comment:
       I think the risk of introducing `options()` is that some developers 
might accidentally use `values()`. 
   
   The pattern used in `ConnectorType` is far better, as it overrides the `toString()` method. That doesn't handle case-insensitive parsing, though; `ConverterType` is a better pattern to follow if that's required.
   
   Let's be consistent across the new enums, and have each follow one of those two patterns, depending on whether case-insensitive parsing is required.
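   
   For illustration, the `ConverterType`-style approach applied to `TransactionBoundary` might look roughly like this (just a sketch; the `fromProperty` name and the alias map are illustrative, not code from this PR):
   ```
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Locale;
   import java.util.Map;

   public enum TransactionBoundary {
       POLL,
       INTERVAL,
       CONNECTOR;

       public static final TransactionBoundary DEFAULT = POLL;

       // Lowercase lookup table so parsing is case-insensitive, mirroring ConverterType
       private static final Map<String, TransactionBoundary> ALIASES;
       static {
           Map<String, TransactionBoundary> aliases = new HashMap<>();
           for (TransactionBoundary boundary : values()) {
               aliases.put(boundary.toString(), boundary);
           }
           ALIASES = Collections.unmodifiableMap(aliases);
       }

       // Case-insensitive parse of a user-supplied configuration value
       public static TransactionBoundary fromProperty(String name) {
           return ALIASES.get(name.trim().toLowerCase(Locale.ROOT));
       }

       // Overriding toString() avoids exposing a separate options() method that could
       // be confused with the built-in values()
       @Override
       public String toString() {
           return name().toLowerCase(Locale.ROOT);
       }
   }
   ```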

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,31 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees 
with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled 
when this method is invoked.
+     * The default implementation will return {@code null}.
+     * @param connectorConfig the configuration that will be used for the 
connector.
+     * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can 
provide exactly-once support,
+     * and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If {@code 
null}, it is assumed that the
+     * connector cannot.
+     */
+    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> 
connectorConfig) {
+        return null;
+    }
+
+    /**
+     * Signals whether the connector can define its own transaction boundaries 
with the proposed
+     * configuration. Developers must override this method if they wish to add 
connector-defined
+     * transaction boundary support; if they do not, users will be unable to 
create instances of
+     * this connector that use connector-defined transaction boundaries. The 
default implementation
+     * will return {@code UNSUPPORTED}.

Review comment:
       WDYT about something like this:
   ```
   /**
    * Signals whether the connector implementation is capable of defining the 
transaction boundaries for a
    * connector with the given configuration. This method is called before 
{@link #start(Map)}, only when the
    * runtime supports exactly-once and the connector configuration includes 
{@code transaction.boundary=connector}.
    *
    * <p>This method need not be implemented if the connector implementation does not support defining
    * transaction boundaries.
    *
    * @param connectorConfig the configuration that will be used for the 
connector
    * @return {@link ConnectorTransactionBoundaries#SUPPORTED} if the connector will define its own transaction boundaries,
    *         or {@link ConnectorTransactionBoundaries#UNSUPPORTED} otherwise.
    * @see TransactionContext
    */
   ```
   

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -980,6 +1119,329 @@ WorkerMetricsGroup workerMetricsGroup() {
         return workerMetricsGroup;
     }
 
+    abstract class TaskBuilder {
+
+        private final ConnectorTaskId id;
+        private final ClusterConfigState configState;
+        private final TaskStatus.Listener statusListener;
+        private final TargetState initialState;
+
+        private Task task;
+        private ConnectorConfig connectorConfig = null;
+        private Converter keyConverter = null;
+        private Converter valueConverter = null;
+        private HeaderConverter headerConverter = null;
+        private ClassLoader classLoader = null;
+
+        public TaskBuilder(ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState) {
+            this.id = id;
+            this.configState = configState;
+            this.statusListener = statusListener;
+            this.initialState = initialState;
+        }
+
+        public TaskBuilder withTask(Task task) {
+            this.task = task;
+            return this;
+        }
+
+        public TaskBuilder withConnectorConfig(ConnectorConfig 
connectorConfig) {
+            this.connectorConfig = connectorConfig;
+            return this;
+        }
+
+        public TaskBuilder withKeyConverter(Converter keyConverter) {
+            this.keyConverter = keyConverter;
+            return this;
+        }
+
+        public TaskBuilder withValueConverter(Converter valueConverter) {
+            this.valueConverter = valueConverter;
+            return this;
+        }
+
+        public TaskBuilder withHeaderConverter(HeaderConverter 
headerConverter) {
+            this.headerConverter = headerConverter;
+            return this;
+        }
+
+        public TaskBuilder withClassloader(ClassLoader classLoader) {
+            this.classLoader = classLoader;
+            return this;
+        }
+
+        public WorkerTask build() {
+            Objects.requireNonNull(task, "Task cannot be null");
+            Objects.requireNonNull(connectorConfig, "Connector config used by 
task cannot be null");
+            Objects.requireNonNull(keyConverter, "Key converter used by task 
cannot be null");
+            Objects.requireNonNull(valueConverter, "Value converter used by 
task cannot be null");
+            Objects.requireNonNull(headerConverter, "Header converter used by 
task cannot be null");
+            Objects.requireNonNull(classLoader, "Classloader used by task 
cannot be null");
+
+            ErrorHandlingMetrics errorHandlingMetrics = 
errorHandlingMetrics(id);
+            final Class<? extends Connector> connectorClass = 
plugins.connectorClass(
+                    
connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+            RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
+                    connectorConfig.errorMaxDelayInMillis(), 
connectorConfig.errorToleranceType(), Time.SYSTEM);
+            retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+            return doBuild(task, id, configState, statusListener, initialState,
+                    connectorConfig, keyConverter, valueConverter, 
headerConverter, classLoader,
+                    errorHandlingMetrics, connectorClass, 
retryWithToleranceOperator);
+        }
+
+        abstract WorkerTask doBuild(Task task,
+                                    ConnectorTaskId id,
+                                    ClusterConfigState configState,
+                                    TaskStatus.Listener statusListener,
+                                    TargetState initialState,
+                                    ConnectorConfig connectorConfig,
+                                    Converter keyConverter,
+                                    Converter valueConverter,
+                                    HeaderConverter headerConverter,
+                                    ClassLoader classLoader,
+                                    ErrorHandlingMetrics errorHandlingMetrics,
+                                    Class<? extends Connector> connectorClass,
+                                    RetryWithToleranceOperator 
retryWithToleranceOperator);
+
+    }
+
+    class SinkTaskBuilder extends TaskBuilder {
+        public SinkTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            TransformationChain<SinkRecord> transformationChain = new 
TransformationChain<>(connectorConfig.<SinkRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, 
connectorConfig.originalsStrings());
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, 
sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter = 
createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
+                    keyConverter, valueConverter, headerConverter);
+
+            Map<String, Object> consumerProps = 
consumerConfigs(id.connector(),  "connector-consumer-" + id, config, 
connectorConfig, connectorClass, connectorClientConfigOverridePolicy, 
kafkaClusterId);
+            KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+
+            return new WorkerSinkTask(id, (SinkTask) task, statusListener, 
initialState, config, configState, metrics, keyConverter,
+                    valueConverter, headerConverter, transformationChain, 
consumer, classLoader, time,
+                    retryWithToleranceOperator, workerErrantRecordReporter, 
herder.statusBackingStore());
+        }
+    }
+
+    class SourceTaskBuilder extends TaskBuilder {
+        public SourceTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), 
config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> producerProps = 
producerConfigs(id.connector(), "connector-producer-" + id, config, 
sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
+
+            ConnectorOffsetBackingStore offsetStore = 
ConnectorOffsetBackingStore.withoutConnectorOffsetStore(
+                    () -> LoggingContext.forTask(id),
+                    globalOffsetBackingStore,
+                    config.offsetsTopic()
+            );
+            final TopicAdmin admin;
+            Map<String, TopicCreationGroup> topicCreationGroups = null;
+            if (sourceConfig.offsetsTopic() != null || 
(config.topicCreationEnable() && sourceConfig.usesTopicCreation())) {
+                Map<String, Object> adminOverrides = 
adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+                        sourceConfig, connectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+                Admin adminClient = Admin.create(adminOverrides);
+                admin = new 
TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient);
+
+                if (config.topicCreationEnable() && 
sourceConfig.usesTopicCreation()) {
+                    topicCreationGroups = 
TopicCreationGroup.configuredGroups(sourceConfig);
+                }
+
+                if (sourceConfig.offsetsTopic() != null && 
config.connectorOffsetsTopicsPermitted()) {
+                    Map<String, Object> consumerProps = 
consumerConfigs(id.connector(), "connector-consumer-" + id, config, 
connectorConfig, connectorClass,
+                            connectorClientConfigOverridePolicy, 
kafkaClusterId);
+                    // Users can disable this if they want to; it won't affect 
delivery guarantees since the task isn't exactly-once anyways
+                    
consumerProps.putIfAbsent(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+                    KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+
+                    offsetStore = 
ConnectorOffsetBackingStore.withConnectorOffsetStore(
+                            () -> LoggingContext.forTask(id),
+                            globalOffsetBackingStore,
+                            sourceConfig.offsetsTopic(),
+                            producer,
+                            consumer,
+                            admin
+                    );
+                }
+            } else {
+                admin = null;
+            }
+            offsetStore.configure(config);
+
+            CloseableOffsetStorageReader offsetReader = new 
OffsetStorageReaderImpl(offsetStore, id.connector(), internalKeyConverter, 
internalValueConverter);
+            OffsetStorageWriter offsetWriter = new 
OffsetStorageWriter(offsetStore, id.connector(), internalKeyConverter, 
internalValueConverter);
+
+            // Note we pass the configState as it performs dynamic 
transformations under the covers
+            return new WorkerSourceTask(id, (SourceTask) task, statusListener, 
initialState, keyConverter, valueConverter,
+                    headerConverter, transformationChain, producer, admin, 
topicCreationGroups,
+                    offsetReader, offsetWriter, offsetStore, config, 
configState, metrics, classLoader, time,
+                    retryWithToleranceOperator, herder.statusBackingStore(), 
executor);
+        }
+    }
+
+    class ExactlyOnceSourceTaskBuilder extends TaskBuilder {
+        private final Runnable preProducerCheck;
+        private final Runnable postProducerCheck;
+
+        public ExactlyOnceSourceTaskBuilder(ConnectorTaskId id,
+                                            ClusterConfigState configState,
+                                            TaskStatus.Listener statusListener,
+                                            TargetState initialState,
+                                            Runnable preProducerCheck,
+                                            Runnable postProducerCheck) {
+            super(id, configState, statusListener, initialState);
+            this.preProducerCheck = preProducerCheck;
+            this.postProducerCheck = postProducerCheck;
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                                  ConnectorTaskId id,
+                                  ClusterConfigState configState,
+                                  TaskStatus.Listener statusListener,
+                                  TargetState initialState,
+                                  ConnectorConfig connectorConfig,
+                                  Converter keyConverter,
+                                  Converter valueConverter,
+                                  HeaderConverter headerConverter,
+                                  ClassLoader classLoader,
+                                  ErrorHandlingMetrics errorHandlingMetrics,
+                                  Class<? extends Connector> connectorClass,
+                                  RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), 
config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> adminOverrides = adminConfigs(id.connector(), 
"connector-adminclient-" + id, config,
+                    sourceConfig, connectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+            Admin adminClient = Admin.create(adminOverrides);
+            TopicAdmin topicAdmin = new 
TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient);
+            Map<String, TopicCreationGroup> topicCreationGroups = null;
+            if (config.topicCreationEnable() && 
sourceConfig.usesTopicCreation()) {
+                topicCreationGroups = 
TopicCreationGroup.configuredGroups(sourceConfig);
+            }
+
+            String transactionalId = transactionalId(id);
+            Map<String, Object> producerProps = 
producerConfigs(id.connector(), "connector-producer-" + id, config, 
sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            ConnectUtils.warnOnOverriddenProperty(
+                    producerProps, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
"true",
+                    "for connectors when exactly-once source support is 
enabled",
+                    false
+            );
+            ConnectUtils.warnOnOverriddenProperty(
+                    producerProps, ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
transactionalId,
+                    "for connectors when exactly-once source support is 
enabled",
+                    true
+            );
+            producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+            producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
transactionalId);

Review comment:
       Ah, so here's one example of how these properties cannot be overridden 
by the connector config. But that's not quite so obvious for some other calls 
to `ConnectUtils.warnOnOverriddenProperty`.
   
   Is it not feasible to reset the expected value on the props within the 
`ConnectUtils#warnOnOverriddenProperty` method?
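   
   Something along these lines, for example (a sketch only; `warnAndOverrideProperty` is a hypothetical name rather than an existing `ConnectUtils` method, and `log` is assumed to be the class's SLF4J logger):
   ```
   /**
    * Warn if the caller-supplied properties override the given property with a
    * conflicting value, then force the expected value so callers don't need a
    * separate put() afterwards.
    */
   public static void warnAndOverrideProperty(Map<String, Object> props,
                                              String key,
                                              Object expectedValue,
                                              String justification) {
       Object userValue = props.get(key);
       if (userValue != null && !expectedValue.toString().equals(userValue.toString())) {
           log.warn("Ignoring override '{}' for property '{}' {}; using '{}' instead",
                   userValue, key, justification, expectedValue);
       }
       // Always apply the expected value, regardless of what (if anything) the user supplied
       props.put(key, expectedValue);
   }
   ```
   That way the two call sites above wouldn't need the explicit `producerProps.put(...)` calls after the warnings.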




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

