C0urante commented on a change in pull request #10907:
URL: https://github.com/apache/kafka/pull/10907#discussion_r786311430



##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,31 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees 
with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled 
when this method is invoked.
+     * The default implementation will return {@code null}.
+     * @param connectorConfig the configuration that will be used for the 
connector.
+     * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can 
provide exactly-once support,
+     * and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If {@code 
null}, it is assumed that the
+     * connector cannot.
+     */
+    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> 
connectorConfig) {
+        return null;
+    }
+
+    /**
+     * Signals whether the connector can define its own transaction boundaries 
with the proposed
+     * configuration. Developers must override this method if they wish to add 
connector-defined
+     * transaction boundary support; if they do not, users will be unable to 
create instances of
+     * this connector that use connector-defined transaction boundaries. The 
default implementation
+     * will return {@code UNSUPPORTED}.

Review comment:
       LGTM. I added a small note on not returning `null` with the same 
language as in `SourceConnector::exactlyOnceSupport` but otherwise added this 
verbatim. LMKWYT
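   
   For reference, a connector might override this hook roughly as in the sketch below. The class name and the `resumable.offsets` key are hypothetical, chosen only to show the shape of the override; the remaining `Connector` methods are stubbed out for brevity.

```java
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;

public class MySourceConnector extends SourceConnector {

    @Override
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        // Hypothetical config key: only advertise support when the proposed
        // configuration lets the connector produce resumable source offsets.
        boolean resumable = Boolean.parseBoolean(
                connectorConfig.getOrDefault("resumable.offsets", "true"));
        return resumable ? ExactlyOnceSupport.SUPPORTED : ExactlyOnceSupport.UNSUPPORTED;
    }

    // Remaining Connector methods are minimal stubs; a real connector implements these fully.
    @Override public String version() { return "1.0"; }
    @Override public void start(Map<String, String> props) { }
    @Override public Class<? extends Task> taskClass() { return null; }
    @Override public List<Map<String, String>> taskConfigs(int maxTasks) { return null; }
    @Override public void stop() { }
    @Override public ConfigDef config() { return new ConfigDef(); }
}
```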

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##########
@@ -20,13 +20,45 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in 
Kafka.
  */
 public abstract class SourceTask implements Task {
 
+    /**
+     * <p>
+     * The configuration key that determines how source tasks will define 
transaction boundaries
+     * when exactly-once support is enabled.
+     * </p>
+     */
+    public static final String TRANSACTION_BOUNDARY_CONFIG = 
"transaction.boundary";
+
+    public enum TransactionBoundary {

Review comment:
       Just making sure: is there anything left to address for this comment?

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##########
@@ -20,13 +20,45 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in 
Kafka.
  */
 public abstract class SourceTask implements Task {
 
+    /**
+     * <p>
+     * The configuration key that determines how source tasks will define 
transaction boundaries
+     * when exactly-once support is enabled.
+     * </p>
+     */
+    public static final String TRANSACTION_BOUNDARY_CONFIG = 
"transaction.boundary";
+
+    public enum TransactionBoundary {
+        POLL,
+        INTERVAL,
+        CONNECTOR;
+
+        public static final TransactionBoundary DEFAULT = POLL;
+
+        public static List<String> options() {

Review comment:
       Neither of these patterns plays nicely with the validators offered by 
`ConfigDef`, though. Both `ValidString` and `CaseInsensitiveValidString` 
require an array (or really, a varargs list) of strings in their constructors, 
which prevents us from using `Enum::values` directly to create one of these 
validators.
   
   I've removed `options()` (from both `TransactionBoundary` and 
`ExactlyOnceSupportLevel`) in favor of a reusable `Utils::enumOptions` method, 
which returns a `String[]` containing the names (retrieved via `toString`) of 
each value for an enum class. This prevents us from having to duplicate this 
logic and allows us to use it without adding new methods to public API.
   
   The actual implementations for both enums now follow the `ConnectorType` 
pattern, with an overridden `toString` method that returns the lowercase name 
of the enum and a static `fromProperty` method (which mirrors the 
`ConnectorType::fromValue` method) that parses a string case-insensitively into 
a value for the enum. I opted for a different method name for the latter since 
the values given to `fromProperty` should always be user-supplied values and 
the motivation for this separate method (instead of just invoking 
`Enum::valueOf` directly) is directly related to parsing user-supplied values.
   
   We could also add a general-purpose `EnumValidator` class to `ConfigDef` 
(for public use) or in a section of the Connect code base reserved for private 
API, but that would still leave us with the trouble of having to re-implement 
logic to follow the phrase "Permitted values are" in property docstrings, and 
it's probably best to hold off on implementing a public-facing validator like 
that until its exact behavior can be agreed on with a KIP.
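   
   To make the pattern concrete, here is a minimal sketch. The `enumOptions` helper mirrors the intent of the new `Utils::enumOptions` method rather than quoting it verbatim, and the `main` method only shows how the resulting `String[]` feeds into `ConfigDef`'s existing validators.

```java
import java.util.Arrays;
import java.util.Locale;

import org.apache.kafka.common.config.ConfigDef;

public final class EnumOptionsSketch {

    /** Returns the string form of every constant of an enum class, suitable for ConfigDef validators. */
    public static <E extends Enum<E>> String[] enumOptions(Class<E> enumClass) {
        return Arrays.stream(enumClass.getEnumConstants())
                .map(Object::toString)
                .toArray(String[]::new);
    }

    /** Follows the ConnectorType pattern: lowercase toString, case-insensitive parsing of user input. */
    public enum TransactionBoundary {
        POLL, INTERVAL, CONNECTOR;

        public static final TransactionBoundary DEFAULT = POLL;

        public static TransactionBoundary fromProperty(String property) {
            return valueOf(property.trim().toUpperCase(Locale.ROOT));
        }

        @Override
        public String toString() {
            return name().toLowerCase(Locale.ROOT);
        }
    }

    public static void main(String[] args) {
        // The String[] can be handed straight to ConfigDef's built-in validators.
        ConfigDef.Validator validator =
                ConfigDef.CaseInsensitiveValidString.in(enumOptions(TransactionBoundary.class));
        validator.ensureValid("transaction.boundary", "poll");
        System.out.println(String.join(", ", enumOptions(TransactionBoundary.class)));
    }
}
```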

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
##########
@@ -38,4 +38,29 @@
      * Get the OffsetStorageReader for this SourceTask.
      */
     OffsetStorageReader offsetStorageReader();
+
+    /**
+     * Get a {@link TransactionContext} that can be used to define producer 
transaction boundaries
+     * when exactly-once support is enabled for the connector.
+     *
+     * <p>This method was added in Apache Kafka 3.0. Source tasks that use 
this method but want to
+     * maintain backward compatibility so they can also be deployed to older 
Connect runtimes
+     * should guard the call to this method with a try-catch block, since 
calling this method will result in a
{@link NoSuchMethodError} or {@link NoClassDefFoundError} when the 
source connector is deployed to
+     * Connect runtimes older than Kafka 3.0. For example:
+     * <pre>
+     *     TransactionContext transactionContext;
+     *     try {
+     *         transactionContext = context.transactionContext();
+     *     } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *         transactionContext = null;
+     *     }
+     * </pre>
+     *
+     * @return the transaction context, or null if the connector was not 
configured to specify transaction boundaries
+     * @since 3.0

Review comment:
       I believe 3.2 is the correct version now, right? Will update to that.

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,39 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees 
with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled 
when this method is invoked.
+     * For backwards compatibility, the default implementation will return 
{@code null}, but connector developers are

Review comment:
       Ack, done. (And added a few more paragraph breaks in other places too).

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.source;
+
+/**
+ * Provided to source tasks to allow them to define their own producer 
transaction boundaries when
+ * exactly-once support is enabled.
+ */
+public interface TransactionContext {

Review comment:
       The behavior specified in the KIP is for 
`SourceTaskContext::transactionContext` to return "the transaction context, or 
null if the connector was not configured to specify transaction boundaries" 
(see the Javadocs for that method in the code snippets in [the relevant KIP 
section](https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors#KIP618:ExactlyOnceSupportforSourceConnectors-ConnectorAPIexpansions)).
   
   This is implemented in the 
[`ExactlyOnceWorkerSourceTask`](https://github.com/C0urante/kafka/blob/c06207d64a76286316b862e16143f909181501c3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L113-L117)
 class and should prevent connectors from invoking `TransactionContext` methods 
and accumulating records when the user has not configured the connector to 
define its own transaction boundaries.
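   
   For what it's worth, the null return means that task code which opts into connector-defined boundaries ends up with a guard along the lines of the sketch below. The helper and class names are illustrative; the `commitTransaction(SourceRecord)` overload is the one described in KIP-618.

```java
import java.util.List;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.source.TransactionContext;

public class TransactionBoundarySketch {

    /**
     * Request a transaction commit after the last record of a batch, but only when the
     * user has configured the connector to define its own transaction boundaries
     * (i.e., the transaction context is non-null).
     */
    public static void maybeCommitAfterBatch(SourceTaskContext context, List<SourceRecord> batch) {
        TransactionContext transactionContext = context.transactionContext();
        if (transactionContext == null || batch.isEmpty()) {
            // The user did not opt into connector-defined boundaries; the runtime decides instead.
            return;
        }
        // Assumes the commitTransaction(SourceRecord) overload from KIP-618.
        transactionContext.commitTransaction(batch.get(batch.size() - 1));
    }
}
```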

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
##########
@@ -192,6 +199,44 @@
     public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A 
list of permitted algorithms for verifying internal requests";
     public static final List<String> 
INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = 
Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
 
+    private enum ExactlyOnceSourceSupport {
+        DISABLED(false),
+        PREPARING(true),
+        ENABLED(true);
+
+        public final boolean usesTransactionalLeader;
+
+        ExactlyOnceSourceSupport(boolean usesTransactionalLeader) {
+            this.usesTransactionalLeader = usesTransactionalLeader;
+        }
+
+        public static List<String> options() {
+            return 
Stream.of(values()).map(ExactlyOnceSourceSupport::toString).collect(Collectors.toList());
+        }
+
+        public static ExactlyOnceSourceSupport fromProperty(String property) {
+            return 
ExactlyOnceSourceSupport.valueOf(property.toUpperCase(Locale.ROOT));
+        }
+
+        @Override
+        public String toString() {
+            return name().toLowerCase(Locale.ROOT);
+        }
+    }
+
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = 
"exactly.once.source.support";
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to 
enable exactly-once support for source connectors in the cluster "
+            + "by writing source records and their offsets in a Kafka 
transaction, and by proactively fencing out old task generations before 
bringing up new ones. "

Review comment:
       Ack, done.

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java
##########
@@ -294,4 +298,41 @@ public void 
shouldRemoveCompactionFromStatusTopicSettings() {
         assertEquals(expectedTopicSettings, actual);
         assertNotEquals(topicSettings, actual);
     }
+
+    @Test
+    public void shouldIdentifyNeedForTransactionalLeader() {
+        Map<String, String> workerProps = configs();
+
+        workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "disabled");
+        assertFalse(new 
DistributedConfig(workerProps).transactionalLeaderEnabled());
+
+        workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing");
+        assertTrue(new 
DistributedConfig(workerProps).transactionalLeaderEnabled());
+
+        workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+        assertTrue(new 
DistributedConfig(workerProps).transactionalLeaderEnabled());
+    }
+
+    @Test
+    public void shouldConstructExpectedTransactionalId() {
+        Map<String, String> workerProps = configs();
+
+        workerProps.put(GROUP_ID_CONFIG, "why did i stay up all night writing 
unit tests");
+        assertEquals(
+                "connect-cluster-why did i stay up all night writing unit 
tests",
+                new DistributedConfig(workerProps).transactionalProducerId()
+        );
+
+        workerProps.put(GROUP_ID_CONFIG, "connect-cluster");
+        assertEquals(
+                "connect-cluster-connect-cluster",
+                new DistributedConfig(workerProps).transactionalProducerId()
+        );
+
+        workerProps.put(GROUP_ID_CONFIG, "\u2603");
+        assertEquals(
+                "connect-cluster-\u2603",
+                new DistributedConfig(workerProps).transactionalProducerId()
+        );
+    }

Review comment:
       Ack, done. The tests are located elsewhere since the logic is 
implemented in a different class, but there are new cases for this in the 
`KafkaConfigBackingStoreTest`, `KafkaOffsetBackingStoreTest`, and `WorkerTest` 
test suites.
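   
   For context, the derivation those cases exercise boils down to prefixing the configured `group.id` with a fixed string, roughly as below (class and method names here are a sketch, matching the expectations in the test above):

```java
public class TransactionalIdSketch {
    // The worker's transactional ID is the fixed "connect-cluster-" prefix followed by
    // the configured group.id verbatim.
    static String transactionalProducerId(String groupId) {
        return "connect-cluster-" + groupId;
    }
}
```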

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -125,28 +130,28 @@ public KafkaBasedLog(String topic,
      * Create a new KafkaBasedLog object. This does not start reading the log 
and writing is not permitted until
      * {@link #start()} is invoked.
      *
-     * @param topic              the topic to treat as a log
-     * @param producerConfigs    configuration options to use when creating 
the internal producer. At a minimum this must
+     * @param topic                     the topic to treat as a log
+     * @param producerConfigs           configuration options to use when 
creating the internal producer. At a minimum this must
      *                           contain compatible serializer settings for 
the generic types used on this class. Some
      *                           setting, such as the number of acks, will be 
overridden to ensure correct behavior of this
      *                           class.
-     * @param consumerConfigs    configuration options to use when creating 
the internal consumer. At a minimum this must
+     * @param consumerConfigs           configuration options to use when 
creating the internal consumer. At a minimum this must
      *                           contain compatible serializer settings for 
the generic types used on this class. Some
      *                           setting, such as the auto offset reset 
policy, will be overridden to ensure correct
      *                           behavior of this class.
-     * @param topicAdminSupplier supplier function for an admin client, the 
lifecycle of which is expected to be controlled
+     * @param topicAdminSupplier        supplier function for an admin client, 
the lifecycle of which is expected to be controlled
      *                           by the calling component; may not be null
-     * @param consumedCallback   callback to invoke for each {@link 
ConsumerRecord} consumed when tailing the log
-     * @param time               Time interface
-     * @param initializer        the function that should be run when this log 
is {@link #start() started}; may be null
+     * @param consumedCallback          callback to invoke for each {@link 
ConsumerRecord} consumed when tailing the log
+     * @param time                      Time interface
+     * @param initializer               the function that should be run when 
this log is {@link #start() started}; may be null
      */
     public KafkaBasedLog(String topic,
-            Map<String, Object> producerConfigs,
-            Map<String, Object> consumerConfigs,
-            Supplier<TopicAdmin> topicAdminSupplier,
-            Callback<ConsumerRecord<K, V>> consumedCallback,
-            Time time,
-            java.util.function.Consumer<TopicAdmin> initializer) {
+                         Map<String, Object> producerConfigs,
+                         Map<String, Object> consumerConfigs,
+                         Supplier<TopicAdmin> topicAdminSupplier,
+                         Callback<ConsumerRecord<K, V>> consumedCallback,
+                         Time time,
+                         java.util.function.Consumer<TopicAdmin> initializer) {

Review comment:
       Apologies, will revert. I believe this was left in from an older approach 
that involved more invasive changes to this class.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -365,6 +413,10 @@ private void readToLogEnd() {
                 // This may happen with really old brokers that don't support 
the auto topic creation
                 // field in metadata requests
                 log.debug("Reading to end of log offsets with consumer since 
admin client is unsupported: {}", e.getMessage());
+                if (requireAdminForOffsets) {
+                    // Should be handled by the caller during log startup
+                    throw e;
+                }

Review comment:
       Ack on both points; done 👍 

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -264,12 +279,62 @@ public void startConnector(
 
                 log.info("Creating connector {} of type {}", connName, 
connClass);
                 final Connector connector = plugins.newConnector(connClass);
-                final ConnectorConfig connConfig = 
ConnectUtils.isSinkConnector(connector)
-                        ? new SinkConnectorConfig(plugins, connProps)
-                        : new SourceConnectorConfig(plugins, connProps, 
config.topicCreationEnable());
+                final ConnectorConfig connConfig;
+                final CloseableOffsetStorageReader offsetReader;
+                if (ConnectUtils.isSinkConnector(connector)) {
+                    connConfig = new SinkConnectorConfig(plugins, connProps);
+                    offsetReader = null;

Review comment:
       Ah, good catch! I think this is one more place where 
`Utils::closeQuietly` comes in handy since it does the null check for us, won't 
interrupt shutdown if an exception is thrown, and further standardizes the 
cleanup logic for the code base.
   
   Alternatively, we could retain the current behavior, but I don't see much 
use in constructing an offset reader for sink connectors that won't be used now 
and is unlikely to be used in the future.
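   
   Concretely, the cleanup I have in mind is a one-liner along these lines (the surrounding variable names are illustrative):

```java
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;

public class OffsetReaderCleanupSketch {
    // offsetReader is null for sink connectors; Utils.closeQuietly handles the null check
    // and keeps an exception during close from interrupting the rest of connector shutdown.
    static void closeOffsetReader(CloseableOffsetStorageReader offsetReader, String connName) {
        Utils.closeQuietly(offsetReader, "offset reader for connector " + connName);
    }
}
```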

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+public class ConnectorOffsetBackingStore implements OffsetBackingStore {

Review comment:
       Ack, done. Added docs to the (now-three) static factory methods and to 
each of the overridden `OffsetBackingStore` methods.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+public class ConnectorOffsetBackingStore implements OffsetBackingStore {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ConnectorOffsetBackingStore.class);
+
+    private final Time time;
+    private final Supplier<LoggingContext> loggingContext;
+    private final String primaryOffsetsTopic;
+    private final OffsetBackingStore workerStore;
+    private final Optional<OffsetBackingStore> connectorStore;
+    private final Optional<TopicAdmin> connectorStoreAdmin;
+
+    public static ConnectorOffsetBackingStore withConnectorOffsetStore(

Review comment:
       Ack, done.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
##########
@@ -72,6 +74,45 @@ static String lookupKafkaClusterId(Admin adminClient) {
         }
     }
 
+    /**
+     * Log a warning when the user attempts to override a property that cannot 
be overridden.
+     * @param props the configuration properties provided by the user
+     * @param key the name of the property to check on
+     * @param expectedValue the expected value for the property
+     * @param justification the reason the property cannot be overridden.
+     *                      Will follow the phrase "The value... for the... 
property will be ignored as it cannot be overridden ".
+     *                      For example, one might supply the message "in 
connectors with the DLQ feature enabled" for this parameter.
+     * @param caseSensitive whether the value must match the expected value with case sensitivity
+     */
+    public static void warnOnOverriddenProperty(
+            Map<String, ?> props,
+            String key,
+            String expectedValue,
+            String justification,
+            boolean caseSensitive) {
+        overriddenPropertyWarning(props, key, expectedValue, justification, 
caseSensitive).ifPresent(log::warn);
+    }
+
+    // Visible for testing
+    static Optional<String> overriddenPropertyWarning(
+            Map<String, ?> props,
+            String key,
+            String expectedValue,
+            String justification,
+            boolean caseSensitive) {
+        Predicate<String> matchesExpectedValue = caseSensitive ? 
expectedValue::equals : expectedValue::equalsIgnoreCase;
+        String value = 
Optional.ofNullable(props.get(key)).map(Object::toString).orElse(null);
+        if (value != null && !matchesExpectedValue.test(value)) {
+            return Optional.of(String.format(
+                    "The value '%s' for the '%s' property will be ignored as 
it cannot be overridden%s. "
+                            + "The value '%s' will be used instead.",

Review comment:
       I suppose not! There are two classes that currently leverage this 
utility method: `Worker` and `DistributedConfig`.
   
   In the former, each invocation of `warnOnOverriddenProperty` is followed 
closely (if not immediately) by a line that overrides any user-supplied values 
that don't match the expected value.
   
   However, in `DistributedConfig`, warnings are emitted for values that are 
ignored and overridden when constructing Kafka clients for some but not all 
`KafkaBasedLog` instances: specifically, the ones used for the config and 
offset backing stores. It's possible that a user might specify 
`enable.idempotence=false` in their worker config with the intention of 
disabling idempotent writes for the producers used for the status backing store 
(since the default for this property was updated from `false` to `true` in 
[KIP-679](https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default)).
 With the current behavior for this PR, that change would have an effect (and 
the values logged for the producer for the status backing store would confirm 
this), but the warning message would still be logged, which might lead to some 
confusion.
   
   At the bare minimum we can and should update this message to be more 
specific about where exactly these values will be ignored, but in addition to 
that, I've taken a stab at restructuring the reusable utility function to 
actually perform the override in addition to just warning about it, like you 
suggested in a comment below. This makes it harder to log warnings about 
overrides without actually ensuring that the overrides have taken place.
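   
   The restructured helper ends up with roughly this shape (a sketch only; the class and method names here are illustrative rather than the exact utility added in the PR):

```java
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class PropertyOverrideSketch {

    private static final Logger log = LoggerFactory.getLogger(PropertyOverrideSketch.class);

    /**
     * Force a client property to its expected value and warn if the user supplied something
     * different, so the warning can never be logged without the override actually taking place.
     */
    public static void ensurePropertyAndWarn(
            Map<String, Object> props,
            String key,
            String expectedValue,
            String justification,
            boolean caseSensitive) {
        Object previous = props.put(key, expectedValue);
        String previousValue = previous == null ? null : previous.toString();
        boolean matches = previousValue != null
                && (caseSensitive ? expectedValue.equals(previousValue) : expectedValue.equalsIgnoreCase(previousValue));
        if (previousValue != null && !matches) {
            log.warn("The value '{}' for the '{}' property will be ignored as it cannot be overridden {}. "
                    + "The value '{}' will be used instead.", previousValue, key, justification, expectedValue);
        }
    }
}
```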

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -264,12 +279,62 @@ public void startConnector(
 
                 log.info("Creating connector {} of type {}", connName, 
connClass);
                 final Connector connector = plugins.newConnector(connClass);
-                final ConnectorConfig connConfig = 
ConnectUtils.isSinkConnector(connector)
-                        ? new SinkConnectorConfig(plugins, connProps)
-                        : new SourceConnectorConfig(plugins, connProps, 
config.topicCreationEnable());
+                final ConnectorConfig connConfig;
+                final CloseableOffsetStorageReader offsetReader;
+                if (ConnectUtils.isSinkConnector(connector)) {
+                    connConfig = new SinkConnectorConfig(plugins, connProps);
+                    offsetReader = null;
+                } else {
+                    SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
+                    connConfig = sourceConfig;
+
+                    String connectorOffsetsTopic = null;
+                    if (sourceConfig.offsetsTopic() != null) {
+                        connectorOffsetsTopic = sourceConfig.offsetsTopic();
+                    } else if (config.exactlyOnceSourceEnabled()) {
+                        connectorOffsetsTopic = config.offsetsTopic();
+                    }
+

Review comment:
       Ack, done.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -476,22 +541,95 @@ public boolean isRunning(String connName) {
     }
 
     /**
-     * Start a task managed by this worker.
+     * Start a sink task managed by this worker.
+     *
+     * @param id the task ID.
+     * @param configState the most recent {@link ClusterConfigState} known to 
the worker
+     * @param connProps the connector properties.
+     * @param taskProps the tasks properties.
+     * @param statusListener a listener for the runtime status transitions of 
the task.
+     * @param initialState the initial state of the connector.
+     * @return true if the task started successfully.
+     */
+    public boolean startSinkTask(
+            ConnectorTaskId id,
+            ClusterConfigState configState,
+            Map<String, String> connProps,
+            Map<String, String> taskProps,
+            TaskStatus.Listener statusListener,
+            TargetState initialState
+    ) {
+        return startTask(id, connProps, taskProps, statusListener,
+                new SinkTaskBuilder(id, configState, statusListener, 
initialState));
+    }
+
+    /**
+     * Start a source task managed by this worker.

Review comment:
       Ack, done.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -692,21 +823,22 @@ private WorkerTask buildWorkerTask(ClusterConfigState 
configState,
         ConnectUtils.addMetricsContextProperties(consumerProps, config, 
clusterId);
         // Connector-specified overrides
         Map<String, Object> consumerOverrides =
-            connectorClientConfigOverrides(id, connConfig, connectorClass, 
ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
+            connectorClientConfigOverrides(connName, connConfig, 
connectorClass, ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX,
                                            ConnectorType.SINK, 
ConnectorClientConfigRequest.ClientType.CONSUMER,
                                            
connectorClientConfigOverridePolicy);
         consumerProps.putAll(consumerOverrides);
 
         return consumerProps;
     }
 
-    static Map<String, Object> adminConfigs(ConnectorTaskId id,
+    static Map<String, Object> adminConfigs(String connName,

Review comment:
       We have to be able to construct admin clients for `Connector` instances 
now, whereas before we only used them for `Task` instances.
   
   I do sympathize with the desire for task-specific tweaks, though. Right now 
the pattern is for the `adminConfigs` method to define a new parameter for 
anything that should be derived from the task ID/connector name and place the 
onus on the caller to derive that value (for example, this is done with the 
`defaultClientId` parameter). I think this works for now but one pattern we 
could consider is to add separate wrapper `adminConfigs` methods for clients 
constructed on behalf of connectors and on behalf of tasks, similar to the new 
wrapper methods I've added for (regular and exactly-once) offset consumers and 
exactly-once task producers.
   
   Thoughts?
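   
   To illustrate what I mean by wrapper methods, something along these lines (a sketch only; the real `adminConfigs` also takes the worker config, connector config, override policy, cluster ID, and connector type):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.util.ConnectorTaskId;

public class AdminConfigWrapperSketch {

    // Client constructed on behalf of a Connector instance.
    Map<String, Object> adminConfigsForConnector(String connName) {
        return adminConfigs(connName, "connector-adminclient-" + connName);
    }

    // Client constructed on behalf of a Task instance.
    Map<String, Object> adminConfigsForTask(ConnectorTaskId id) {
        return adminConfigs(id.connector(), "connector-adminclient-" + id);
    }

    // Stand-in for the shared Worker.adminConfigs logic; only the client ID derivation is shown.
    private Map<String, Object> adminConfigs(String connName, String defaultClientId) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("client.id", defaultClientId);
        return configs;
    }
}
```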

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -980,6 +1119,329 @@ WorkerMetricsGroup workerMetricsGroup() {
         return workerMetricsGroup;
     }
 
+    abstract class TaskBuilder {
+
+        private final ConnectorTaskId id;
+        private final ClusterConfigState configState;
+        private final TaskStatus.Listener statusListener;
+        private final TargetState initialState;
+
+        private Task task;
+        private ConnectorConfig connectorConfig = null;
+        private Converter keyConverter = null;
+        private Converter valueConverter = null;
+        private HeaderConverter headerConverter = null;
+        private ClassLoader classLoader = null;
+
+        public TaskBuilder(ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState) {
+            this.id = id;
+            this.configState = configState;
+            this.statusListener = statusListener;
+            this.initialState = initialState;
+        }
+
+        public TaskBuilder withTask(Task task) {
+            this.task = task;
+            return this;
+        }
+
+        public TaskBuilder withConnectorConfig(ConnectorConfig 
connectorConfig) {
+            this.connectorConfig = connectorConfig;
+            return this;
+        }
+
+        public TaskBuilder withKeyConverter(Converter keyConverter) {
+            this.keyConverter = keyConverter;
+            return this;
+        }
+
+        public TaskBuilder withValueConverter(Converter valueConverter) {
+            this.valueConverter = valueConverter;
+            return this;
+        }
+
+        public TaskBuilder withHeaderConverter(HeaderConverter 
headerConverter) {
+            this.headerConverter = headerConverter;
+            return this;
+        }
+
+        public TaskBuilder withClassloader(ClassLoader classLoader) {
+            this.classLoader = classLoader;
+            return this;
+        }
+
+        public WorkerTask build() {
+            Objects.requireNonNull(task, "Task cannot be null");
+            Objects.requireNonNull(connectorConfig, "Connector config used by 
task cannot be null");
+            Objects.requireNonNull(keyConverter, "Key converter used by task 
cannot be null");
+            Objects.requireNonNull(valueConverter, "Value converter used by 
task cannot be null");
+            Objects.requireNonNull(headerConverter, "Header converter used by 
task cannot be null");
+            Objects.requireNonNull(classLoader, "Classloader used by task 
cannot be null");
+
+            ErrorHandlingMetrics errorHandlingMetrics = 
errorHandlingMetrics(id);
+            final Class<? extends Connector> connectorClass = 
plugins.connectorClass(
+                    
connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+            RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
+                    connectorConfig.errorMaxDelayInMillis(), 
connectorConfig.errorToleranceType(), Time.SYSTEM);
+            retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+            return doBuild(task, id, configState, statusListener, initialState,
+                    connectorConfig, keyConverter, valueConverter, 
headerConverter, classLoader,
+                    errorHandlingMetrics, connectorClass, 
retryWithToleranceOperator);
+        }
+
+        abstract WorkerTask doBuild(Task task,
+                                    ConnectorTaskId id,
+                                    ClusterConfigState configState,
+                                    TaskStatus.Listener statusListener,
+                                    TargetState initialState,
+                                    ConnectorConfig connectorConfig,
+                                    Converter keyConverter,
+                                    Converter valueConverter,
+                                    HeaderConverter headerConverter,
+                                    ClassLoader classLoader,
+                                    ErrorHandlingMetrics errorHandlingMetrics,
+                                    Class<? extends Connector> connectorClass,
+                                    RetryWithToleranceOperator 
retryWithToleranceOperator);
+
+    }
+
+    class SinkTaskBuilder extends TaskBuilder {
+        public SinkTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            TransformationChain<SinkRecord> transformationChain = new 
TransformationChain<>(connectorConfig.<SinkRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, 
connectorConfig.originalsStrings());
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, 
sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter = 
createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
+                    keyConverter, valueConverter, headerConverter);
+
+            Map<String, Object> consumerProps = 
consumerConfigs(id.connector(),  "connector-consumer-" + id, config, 
connectorConfig, connectorClass, connectorClientConfigOverridePolicy, 
kafkaClusterId);
+            KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+
+            return new WorkerSinkTask(id, (SinkTask) task, statusListener, 
initialState, config, configState, metrics, keyConverter,
+                    valueConverter, headerConverter, transformationChain, 
consumer, classLoader, time,
+                    retryWithToleranceOperator, workerErrantRecordReporter, 
herder.statusBackingStore());
+        }
+    }
+
+    class SourceTaskBuilder extends TaskBuilder {
+        public SourceTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), 
config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> producerProps = 
producerConfigs(id.connector(), "connector-producer-" + id, config, 
sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
+
+            ConnectorOffsetBackingStore offsetStore = 
ConnectorOffsetBackingStore.withoutConnectorOffsetStore(
+                    () -> LoggingContext.forTask(id),
+                    globalOffsetBackingStore,
+                    config.offsetsTopic()
+            );
+            final TopicAdmin admin;
+            Map<String, TopicCreationGroup> topicCreationGroups = null;
+            if (sourceConfig.offsetsTopic() != null || 
(config.topicCreationEnable() && sourceConfig.usesTopicCreation())) {

Review comment:
       I've extracted a lot of this logic into separate methods for testability 
and readability, but tried to stick with the general spirit of this comment 
while doing so, leaving a few explanatory comments and adding some `final 
boolean` local variables in the newly-isolated methods.
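   
   As an example of what those isolated methods look like, one of them boils down to something like the fragment below (the method name is illustrative; it assumes it lives inside `Worker`, so `config` is the worker config field, and the two conditions are the ones already combined in the snippet above):

```java
// Sketch of one of the newly-isolated helpers: decides whether the source task needs a
// topic admin, i.e. for a connector-specific offsets topic or for topic creation.
private boolean taskUsesTopicAdmin(SourceConnectorConfig sourceConfig) {
    final boolean usesConnectorOffsetsTopic = sourceConfig.offsetsTopic() != null;
    final boolean usesTopicCreation = config.topicCreationEnable() && sourceConfig.usesTopicCreation();
    return usesConnectorOffsetsTopic || usesTopicCreation;
}
```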

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -980,6 +1119,329 @@ WorkerMetricsGroup workerMetricsGroup() {
         return workerMetricsGroup;
     }
 
+    abstract class TaskBuilder {
+
+        private final ConnectorTaskId id;
+        private final ClusterConfigState configState;
+        private final TaskStatus.Listener statusListener;
+        private final TargetState initialState;
+
+        private Task task;
+        private ConnectorConfig connectorConfig = null;
+        private Converter keyConverter = null;
+        private Converter valueConverter = null;
+        private HeaderConverter headerConverter = null;
+        private ClassLoader classLoader = null;
+
+        public TaskBuilder(ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState) {
+            this.id = id;
+            this.configState = configState;
+            this.statusListener = statusListener;
+            this.initialState = initialState;
+        }
+
+        public TaskBuilder withTask(Task task) {
+            this.task = task;
+            return this;
+        }
+
+        public TaskBuilder withConnectorConfig(ConnectorConfig 
connectorConfig) {
+            this.connectorConfig = connectorConfig;
+            return this;
+        }
+
+        public TaskBuilder withKeyConverter(Converter keyConverter) {
+            this.keyConverter = keyConverter;
+            return this;
+        }
+
+        public TaskBuilder withValueConverter(Converter valueConverter) {
+            this.valueConverter = valueConverter;
+            return this;
+        }
+
+        public TaskBuilder withHeaderConverter(HeaderConverter 
headerConverter) {
+            this.headerConverter = headerConverter;
+            return this;
+        }
+
+        public TaskBuilder withClassloader(ClassLoader classLoader) {
+            this.classLoader = classLoader;
+            return this;
+        }
+
+        public WorkerTask build() {
+            Objects.requireNonNull(task, "Task cannot be null");
+            Objects.requireNonNull(connectorConfig, "Connector config used by 
task cannot be null");
+            Objects.requireNonNull(keyConverter, "Key converter used by task 
cannot be null");
+            Objects.requireNonNull(valueConverter, "Value converter used by 
task cannot be null");
+            Objects.requireNonNull(headerConverter, "Header converter used by 
task cannot be null");
+            Objects.requireNonNull(classLoader, "Classloader used by task 
cannot be null");
+
+            ErrorHandlingMetrics errorHandlingMetrics = 
errorHandlingMetrics(id);
+            final Class<? extends Connector> connectorClass = 
plugins.connectorClass(
+                    
connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+            RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
+                    connectorConfig.errorMaxDelayInMillis(), 
connectorConfig.errorToleranceType(), Time.SYSTEM);
+            retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+            return doBuild(task, id, configState, statusListener, initialState,
+                    connectorConfig, keyConverter, valueConverter, 
headerConverter, classLoader,
+                    errorHandlingMetrics, connectorClass, 
retryWithToleranceOperator);
+        }
+
+        abstract WorkerTask doBuild(Task task,
+                                    ConnectorTaskId id,
+                                    ClusterConfigState configState,
+                                    TaskStatus.Listener statusListener,
+                                    TargetState initialState,
+                                    ConnectorConfig connectorConfig,
+                                    Converter keyConverter,
+                                    Converter valueConverter,
+                                    HeaderConverter headerConverter,
+                                    ClassLoader classLoader,
+                                    ErrorHandlingMetrics errorHandlingMetrics,
+                                    Class<? extends Connector> connectorClass,
+                                    RetryWithToleranceOperator 
retryWithToleranceOperator);
+
+    }
+
+    class SinkTaskBuilder extends TaskBuilder {
+        public SinkTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            TransformationChain<SinkRecord> transformationChain = new 
TransformationChain<>(connectorConfig.<SinkRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, 
connectorConfig.originalsStrings());
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, 
sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter = 
createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
+                    keyConverter, valueConverter, headerConverter);
+
+            Map<String, Object> consumerProps = 
consumerConfigs(id.connector(),  "connector-consumer-" + id, config, 
connectorConfig, connectorClass, connectorClientConfigOverridePolicy, 
kafkaClusterId);
+            KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+
+            return new WorkerSinkTask(id, (SinkTask) task, statusListener, 
initialState, config, configState, metrics, keyConverter,
+                    valueConverter, headerConverter, transformationChain, 
consumer, classLoader, time,
+                    retryWithToleranceOperator, workerErrantRecordReporter, 
herder.statusBackingStore());
+        }
+    }
+
+    class SourceTaskBuilder extends TaskBuilder {
+        public SourceTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), 
config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> producerProps = 
producerConfigs(id.connector(), "connector-producer-" + id, config, 
sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
+
+            ConnectorOffsetBackingStore offsetStore = 
ConnectorOffsetBackingStore.withoutConnectorOffsetStore(
+                    () -> LoggingContext.forTask(id),
+                    globalOffsetBackingStore,
+                    config.offsetsTopic()
+            );
+            final TopicAdmin admin;
+            Map<String, TopicCreationGroup> topicCreationGroups = null;
+            if (sourceConfig.offsetsTopic() != null || 
(config.topicCreationEnable() && sourceConfig.usesTopicCreation())) {
+                Map<String, Object> adminOverrides = 
adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+                        sourceConfig, connectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+                Admin adminClient = Admin.create(adminOverrides);
+                admin = new 
TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient);
+
+                if (config.topicCreationEnable() && 
sourceConfig.usesTopicCreation()) {
+                    topicCreationGroups = 
TopicCreationGroup.configuredGroups(sourceConfig);
+                }
+
+                if (sourceConfig.offsetsTopic() != null && 
config.connectorOffsetsTopicsPermitted()) {
+                    Map<String, Object> consumerProps = 
consumerConfigs(id.connector(), "connector-consumer-" + id, config, 
connectorConfig, connectorClass,
+                            connectorClientConfigOverridePolicy, 
kafkaClusterId);
+                    // Users can disable this if they want to; it won't affect 
delivery guarantees since the task isn't exactly-once anyways
+                    
consumerProps.putIfAbsent(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+                    KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+
+                    offsetStore = 
ConnectorOffsetBackingStore.withConnectorOffsetStore(
+                            () -> LoggingContext.forTask(id),
+                            globalOffsetBackingStore,
+                            sourceConfig.offsetsTopic(),
+                            producer,
+                            consumer,
+                            admin
+                    );
+                }
+            } else {
+                admin = null;
+            }
+            offsetStore.configure(config);
+
+            CloseableOffsetStorageReader offsetReader = new 
OffsetStorageReaderImpl(offsetStore, id.connector(), internalKeyConverter, 
internalValueConverter);
+            OffsetStorageWriter offsetWriter = new 
OffsetStorageWriter(offsetStore, id.connector(), internalKeyConverter, 
internalValueConverter);
+
+            // Note we pass the configState as it performs dynamic 
transformations under the covers
+            return new WorkerSourceTask(id, (SourceTask) task, statusListener, 
initialState, keyConverter, valueConverter,
+                    headerConverter, transformationChain, producer, admin, 
topicCreationGroups,
+                    offsetReader, offsetWriter, offsetStore, config, 
configState, metrics, classLoader, time,
+                    retryWithToleranceOperator, herder.statusBackingStore(), 
executor);
+        }
+    }
+
+    class ExactlyOnceSourceTaskBuilder extends TaskBuilder {
+        private final Runnable preProducerCheck;
+        private final Runnable postProducerCheck;
+
+        public ExactlyOnceSourceTaskBuilder(ConnectorTaskId id,
+                                            ClusterConfigState configState,
+                                            TaskStatus.Listener statusListener,
+                                            TargetState initialState,
+                                            Runnable preProducerCheck,
+                                            Runnable postProducerCheck) {
+            super(id, configState, statusListener, initialState);
+            this.preProducerCheck = preProducerCheck;
+            this.postProducerCheck = postProducerCheck;
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                                  ConnectorTaskId id,
+                                  ClusterConfigState configState,
+                                  TaskStatus.Listener statusListener,
+                                  TargetState initialState,
+                                  ConnectorConfig connectorConfig,
+                                  Converter keyConverter,
+                                  Converter valueConverter,
+                                  HeaderConverter headerConverter,
+                                  ClassLoader classLoader,
+                                  ErrorHandlingMetrics errorHandlingMetrics,
+                                  Class<? extends Connector> connectorClass,
+                                  RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), 
config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> adminOverrides = adminConfigs(id.connector(), 
"connector-adminclient-" + id, config,

Review comment:
       This section has been pretty heavily refactored; I hope the current 
state of things honors the intent of this comment but if not LMK.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -980,6 +1119,329 @@ WorkerMetricsGroup workerMetricsGroup() {
         return workerMetricsGroup;
     }
 
+    abstract class TaskBuilder {
+
+        private final ConnectorTaskId id;
+        private final ClusterConfigState configState;
+        private final TaskStatus.Listener statusListener;
+        private final TargetState initialState;
+
+        private Task task;
+        private ConnectorConfig connectorConfig = null;
+        private Converter keyConverter = null;
+        private Converter valueConverter = null;
+        private HeaderConverter headerConverter = null;
+        private ClassLoader classLoader = null;
+
+        public TaskBuilder(ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState) {
+            this.id = id;
+            this.configState = configState;
+            this.statusListener = statusListener;
+            this.initialState = initialState;
+        }
+
+        public TaskBuilder withTask(Task task) {
+            this.task = task;
+            return this;
+        }
+
+        public TaskBuilder withConnectorConfig(ConnectorConfig 
connectorConfig) {
+            this.connectorConfig = connectorConfig;
+            return this;
+        }
+
+        public TaskBuilder withKeyConverter(Converter keyConverter) {
+            this.keyConverter = keyConverter;
+            return this;
+        }
+
+        public TaskBuilder withValueConverter(Converter valueConverter) {
+            this.valueConverter = valueConverter;
+            return this;
+        }
+
+        public TaskBuilder withHeaderConverter(HeaderConverter 
headerConverter) {
+            this.headerConverter = headerConverter;
+            return this;
+        }
+
+        public TaskBuilder withClassloader(ClassLoader classLoader) {
+            this.classLoader = classLoader;
+            return this;
+        }
+
+        public WorkerTask build() {
+            Objects.requireNonNull(task, "Task cannot be null");
+            Objects.requireNonNull(connectorConfig, "Connector config used by 
task cannot be null");
+            Objects.requireNonNull(keyConverter, "Key converter used by task 
cannot be null");
+            Objects.requireNonNull(valueConverter, "Value converter used by 
task cannot be null");
+            Objects.requireNonNull(headerConverter, "Header converter used by 
task cannot be null");
+            Objects.requireNonNull(classLoader, "Classloader used by task 
cannot be null");
+
+            ErrorHandlingMetrics errorHandlingMetrics = 
errorHandlingMetrics(id);
+            final Class<? extends Connector> connectorClass = 
plugins.connectorClass(
+                    
connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+            RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
+                    connectorConfig.errorMaxDelayInMillis(), 
connectorConfig.errorToleranceType(), Time.SYSTEM);
+            retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+            return doBuild(task, id, configState, statusListener, initialState,
+                    connectorConfig, keyConverter, valueConverter, 
headerConverter, classLoader,
+                    errorHandlingMetrics, connectorClass, 
retryWithToleranceOperator);
+        }
+
+        abstract WorkerTask doBuild(Task task,
+                                    ConnectorTaskId id,
+                                    ClusterConfigState configState,
+                                    TaskStatus.Listener statusListener,
+                                    TargetState initialState,
+                                    ConnectorConfig connectorConfig,
+                                    Converter keyConverter,
+                                    Converter valueConverter,
+                                    HeaderConverter headerConverter,
+                                    ClassLoader classLoader,
+                                    ErrorHandlingMetrics errorHandlingMetrics,
+                                    Class<? extends Connector> connectorClass,
+                                    RetryWithToleranceOperator 
retryWithToleranceOperator);
+
+    }
+
+    class SinkTaskBuilder extends TaskBuilder {
+        public SinkTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            TransformationChain<SinkRecord> transformationChain = new 
TransformationChain<>(connectorConfig.<SinkRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, 
connectorConfig.originalsStrings());
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, 
sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter = 
createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
+                    keyConverter, valueConverter, headerConverter);
+
+            Map<String, Object> consumerProps = 
consumerConfigs(id.connector(),  "connector-consumer-" + id, config, 
connectorConfig, connectorClass, connectorClientConfigOverridePolicy, 
kafkaClusterId);
+            KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+
+            return new WorkerSinkTask(id, (SinkTask) task, statusListener, 
initialState, config, configState, metrics, keyConverter,
+                    valueConverter, headerConverter, transformationChain, 
consumer, classLoader, time,
+                    retryWithToleranceOperator, workerErrantRecordReporter, 
herder.statusBackingStore());
+        }
+    }
+
+    class SourceTaskBuilder extends TaskBuilder {
+        public SourceTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), 
config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> producerProps = 
producerConfigs(id.connector(), "connector-producer-" + id, config, 
sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
+
+            ConnectorOffsetBackingStore offsetStore = 
ConnectorOffsetBackingStore.withoutConnectorOffsetStore(
+                    () -> LoggingContext.forTask(id),
+                    globalOffsetBackingStore,
+                    config.offsetsTopic()
+            );
+            final TopicAdmin admin;
+            Map<String, TopicCreationGroup> topicCreationGroups = null;
+            if (sourceConfig.offsetsTopic() != null || 
(config.topicCreationEnable() && sourceConfig.usesTopicCreation())) {
+                Map<String, Object> adminOverrides = 
adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+                        sourceConfig, connectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+                Admin adminClient = Admin.create(adminOverrides);
+                admin = new 
TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient);
+
+                if (config.topicCreationEnable() && 
sourceConfig.usesTopicCreation()) {
+                    topicCreationGroups = 
TopicCreationGroup.configuredGroups(sourceConfig);
+                }
+
+                if (sourceConfig.offsetsTopic() != null && 
config.connectorOffsetsTopicsPermitted()) {
+                    Map<String, Object> consumerProps = 
consumerConfigs(id.connector(), "connector-consumer-" + id, config, 
connectorConfig, connectorClass,
+                            connectorClientConfigOverridePolicy, 
kafkaClusterId);
+                    // Users can disable this if they want to; it won't affect delivery guarantees since the task isn't exactly-once anyway
+                    
consumerProps.putIfAbsent(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+                    KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+
+                    offsetStore = 
ConnectorOffsetBackingStore.withConnectorOffsetStore(
+                            () -> LoggingContext.forTask(id),
+                            globalOffsetBackingStore,
+                            sourceConfig.offsetsTopic(),
+                            producer,
+                            consumer,
+                            admin
+                    );
+                }
+            } else {
+                admin = null;
+            }
+            offsetStore.configure(config);
+
+            CloseableOffsetStorageReader offsetReader = new 
OffsetStorageReaderImpl(offsetStore, id.connector(), internalKeyConverter, 
internalValueConverter);
+            OffsetStorageWriter offsetWriter = new 
OffsetStorageWriter(offsetStore, id.connector(), internalKeyConverter, 
internalValueConverter);
+
+            // Note we pass the configState as it performs dynamic transformations under the covers
+            return new WorkerSourceTask(id, (SourceTask) task, statusListener, 
initialState, keyConverter, valueConverter,
+                    headerConverter, transformationChain, producer, admin, 
topicCreationGroups,
+                    offsetReader, offsetWriter, offsetStore, config, 
configState, metrics, classLoader, time,
+                    retryWithToleranceOperator, herder.statusBackingStore(), 
executor);
+        }
+    }
+
+    class ExactlyOnceSourceTaskBuilder extends TaskBuilder {
+        private final Runnable preProducerCheck;
+        private final Runnable postProducerCheck;
+
+        public ExactlyOnceSourceTaskBuilder(ConnectorTaskId id,
+                                            ClusterConfigState configState,
+                                            TaskStatus.Listener statusListener,
+                                            TargetState initialState,
+                                            Runnable preProducerCheck,
+                                            Runnable postProducerCheck) {
+            super(id, configState, statusListener, initialState);
+            this.preProducerCheck = preProducerCheck;
+            this.postProducerCheck = postProducerCheck;
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                                  ConnectorTaskId id,
+                                  ClusterConfigState configState,
+                                  TaskStatus.Listener statusListener,
+                                  TargetState initialState,
+                                  ConnectorConfig connectorConfig,
+                                  Converter keyConverter,
+                                  Converter valueConverter,
+                                  HeaderConverter headerConverter,
+                                  ClassLoader classLoader,
+                                  ErrorHandlingMetrics errorHandlingMetrics,
+                                  Class<? extends Connector> connectorClass,
+                                  RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), 
config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> adminOverrides = adminConfigs(id.connector(), 
"connector-adminclient-" + id, config,
+                    sourceConfig, connectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+            Admin adminClient = Admin.create(adminOverrides);
+            TopicAdmin topicAdmin = new 
TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient);
+            Map<String, TopicCreationGroup> topicCreationGroups = null;
+            if (config.topicCreationEnable() && 
sourceConfig.usesTopicCreation()) {
+                topicCreationGroups = 
TopicCreationGroup.configuredGroups(sourceConfig);
+            }
+
+            String transactionalId = transactionalId(id);
+            Map<String, Object> producerProps = 
producerConfigs(id.connector(), "connector-producer-" + id, config, 
sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            ConnectUtils.warnOnOverriddenProperty(
+                    producerProps, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
"true",
+                    "for connectors when exactly-once source support is 
enabled",
+                    false
+            );
+            ConnectUtils.warnOnOverriddenProperty(
+                    producerProps, ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
transactionalId,
+                    "for connectors when exactly-once source support is 
enabled",
+                    true
+            );
+            producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+            producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
transactionalId);

Review comment:
       Yep, done.
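
       For anyone reviewing this without the producer docs handy, here is a minimal, self-contained sketch (not code from this PR; the bootstrap address, topic name, and transactional ID are placeholders) of how a producer configured with these two forced settings is driven through the standard Kafka client API:

           import org.apache.kafka.clients.producer.KafkaProducer;
           import org.apache.kafka.clients.producer.ProducerConfig;
           import org.apache.kafka.clients.producer.ProducerRecord;
           import org.apache.kafka.common.serialization.ByteArraySerializer;

           import java.util.HashMap;
           import java.util.Map;

           public class TransactionalProducerSketch {
               public static void main(String[] args) {
                   Map<String, Object> props = new HashMap<>();
                   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // placeholder
                   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
                   props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
                   // The two properties the worker forces for exactly-once source tasks:
                   props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
                   props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-connector-0");       // hypothetical transactional ID

                   try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                       producer.initTransactions();  // fences any zombie producer using the same transactional ID
                       producer.beginTransaction();
                       producer.send(new ProducerRecord<>("example-topic", "key".getBytes(), "value".getBytes()));
                       producer.commitTransaction(); // records only become visible to read_committed consumers here
                   }
               }
           }

       The fixed, per-task transactional ID is what makes the fencing performed by initTransactions() meaningful across task restarts.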

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -980,6 +1119,329 @@ WorkerMetricsGroup workerMetricsGroup() {
         return workerMetricsGroup;
     }
 
+    abstract class TaskBuilder {
+
+        private final ConnectorTaskId id;
+        private final ClusterConfigState configState;
+        private final TaskStatus.Listener statusListener;
+        private final TargetState initialState;
+
+        private Task task;
+        private ConnectorConfig connectorConfig = null;
+        private Converter keyConverter = null;
+        private Converter valueConverter = null;
+        private HeaderConverter headerConverter = null;
+        private ClassLoader classLoader = null;
+
+        public TaskBuilder(ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState) {
+            this.id = id;
+            this.configState = configState;
+            this.statusListener = statusListener;
+            this.initialState = initialState;
+        }
+
+        public TaskBuilder withTask(Task task) {
+            this.task = task;
+            return this;
+        }
+
+        public TaskBuilder withConnectorConfig(ConnectorConfig 
connectorConfig) {
+            this.connectorConfig = connectorConfig;
+            return this;
+        }
+
+        public TaskBuilder withKeyConverter(Converter keyConverter) {
+            this.keyConverter = keyConverter;
+            return this;
+        }
+
+        public TaskBuilder withValueConverter(Converter valueConverter) {
+            this.valueConverter = valueConverter;
+            return this;
+        }
+
+        public TaskBuilder withHeaderConverter(HeaderConverter 
headerConverter) {
+            this.headerConverter = headerConverter;
+            return this;
+        }
+
+        public TaskBuilder withClassloader(ClassLoader classLoader) {
+            this.classLoader = classLoader;
+            return this;
+        }
+
+        public WorkerTask build() {
+            Objects.requireNonNull(task, "Task cannot be null");
+            Objects.requireNonNull(connectorConfig, "Connector config used by 
task cannot be null");
+            Objects.requireNonNull(keyConverter, "Key converter used by task 
cannot be null");
+            Objects.requireNonNull(valueConverter, "Value converter used by 
task cannot be null");
+            Objects.requireNonNull(headerConverter, "Header converter used by 
task cannot be null");
+            Objects.requireNonNull(classLoader, "Classloader used by task 
cannot be null");
+
+            ErrorHandlingMetrics errorHandlingMetrics = 
errorHandlingMetrics(id);
+            final Class<? extends Connector> connectorClass = 
plugins.connectorClass(
+                    
connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+            RetryWithToleranceOperator retryWithToleranceOperator = new 
RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
+                    connectorConfig.errorMaxDelayInMillis(), 
connectorConfig.errorToleranceType(), Time.SYSTEM);
+            retryWithToleranceOperator.metrics(errorHandlingMetrics);
+
+            return doBuild(task, id, configState, statusListener, initialState,
+                    connectorConfig, keyConverter, valueConverter, 
headerConverter, classLoader,
+                    errorHandlingMetrics, connectorClass, 
retryWithToleranceOperator);
+        }
+
+        abstract WorkerTask doBuild(Task task,
+                                    ConnectorTaskId id,
+                                    ClusterConfigState configState,
+                                    TaskStatus.Listener statusListener,
+                                    TargetState initialState,
+                                    ConnectorConfig connectorConfig,
+                                    Converter keyConverter,
+                                    Converter valueConverter,
+                                    HeaderConverter headerConverter,
+                                    ClassLoader classLoader,
+                                    ErrorHandlingMetrics errorHandlingMetrics,
+                                    Class<? extends Connector> connectorClass,
+                                    RetryWithToleranceOperator 
retryWithToleranceOperator);
+
+    }
+
+    class SinkTaskBuilder extends TaskBuilder {
+        public SinkTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            TransformationChain<SinkRecord> transformationChain = new 
TransformationChain<>(connectorConfig.<SinkRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+            SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, 
connectorConfig.originalsStrings());
+            retryWithToleranceOperator.reporters(sinkTaskReporters(id, 
sinkConfig, errorHandlingMetrics, connectorClass));
+            WorkerErrantRecordReporter workerErrantRecordReporter = 
createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
+                    keyConverter, valueConverter, headerConverter);
+
+            Map<String, Object> consumerProps = 
consumerConfigs(id.connector(),  "connector-consumer-" + id, config, 
connectorConfig, connectorClass, connectorClientConfigOverridePolicy, 
kafkaClusterId);
+            KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+
+            return new WorkerSinkTask(id, (SinkTask) task, statusListener, 
initialState, config, configState, metrics, keyConverter,
+                    valueConverter, headerConverter, transformationChain, 
consumer, classLoader, time,
+                    retryWithToleranceOperator, workerErrantRecordReporter, 
herder.statusBackingStore());
+        }
+    }
+
+    class SourceTaskBuilder extends TaskBuilder {
+        public SourceTaskBuilder(ConnectorTaskId id,
+                               ClusterConfigState configState,
+                               TaskStatus.Listener statusListener,
+                               TargetState initialState) {
+            super(id, configState, statusListener, initialState);
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                           ConnectorTaskId id,
+                           ClusterConfigState configState,
+                           TaskStatus.Listener statusListener,
+                           TargetState initialState,
+                           ConnectorConfig connectorConfig,
+                           Converter keyConverter,
+                           Converter valueConverter,
+                           HeaderConverter headerConverter,
+                           ClassLoader classLoader,
+                           ErrorHandlingMetrics errorHandlingMetrics,
+                           Class<? extends Connector> connectorClass,
+                           RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), 
config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> producerProps = 
producerConfigs(id.connector(), "connector-producer-" + id, config, 
sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
+
+            ConnectorOffsetBackingStore offsetStore = 
ConnectorOffsetBackingStore.withoutConnectorOffsetStore(
+                    () -> LoggingContext.forTask(id),
+                    globalOffsetBackingStore,
+                    config.offsetsTopic()
+            );
+            final TopicAdmin admin;
+            Map<String, TopicCreationGroup> topicCreationGroups = null;
+            if (sourceConfig.offsetsTopic() != null || 
(config.topicCreationEnable() && sourceConfig.usesTopicCreation())) {
+                Map<String, Object> adminOverrides = 
adminConfigs(id.connector(), "connector-adminclient-" + id, config,
+                        sourceConfig, connectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+                Admin adminClient = Admin.create(adminOverrides);
+                admin = new 
TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient);
+
+                if (config.topicCreationEnable() && 
sourceConfig.usesTopicCreation()) {
+                    topicCreationGroups = 
TopicCreationGroup.configuredGroups(sourceConfig);
+                }
+
+                if (sourceConfig.offsetsTopic() != null && 
config.connectorOffsetsTopicsPermitted()) {
+                    Map<String, Object> consumerProps = 
consumerConfigs(id.connector(), "connector-consumer-" + id, config, 
connectorConfig, connectorClass,
+                            connectorClientConfigOverridePolicy, 
kafkaClusterId);
+                    // Users can disable this if they want to; it won't affect delivery guarantees since the task isn't exactly-once anyway
+                    
consumerProps.putIfAbsent(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+                    KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+
+                    offsetStore = 
ConnectorOffsetBackingStore.withConnectorOffsetStore(
+                            () -> LoggingContext.forTask(id),
+                            globalOffsetBackingStore,
+                            sourceConfig.offsetsTopic(),
+                            producer,
+                            consumer,
+                            admin
+                    );
+                }
+            } else {
+                admin = null;
+            }
+            offsetStore.configure(config);
+
+            CloseableOffsetStorageReader offsetReader = new 
OffsetStorageReaderImpl(offsetStore, id.connector(), internalKeyConverter, 
internalValueConverter);
+            OffsetStorageWriter offsetWriter = new 
OffsetStorageWriter(offsetStore, id.connector(), internalKeyConverter, 
internalValueConverter);
+
+            // Note we pass the configState as it performs dynamic transformations under the covers
+            return new WorkerSourceTask(id, (SourceTask) task, statusListener, 
initialState, keyConverter, valueConverter,
+                    headerConverter, transformationChain, producer, admin, 
topicCreationGroups,
+                    offsetReader, offsetWriter, offsetStore, config, 
configState, metrics, classLoader, time,
+                    retryWithToleranceOperator, herder.statusBackingStore(), 
executor);
+        }
+    }
+
+    class ExactlyOnceSourceTaskBuilder extends TaskBuilder {
+        private final Runnable preProducerCheck;
+        private final Runnable postProducerCheck;
+
+        public ExactlyOnceSourceTaskBuilder(ConnectorTaskId id,
+                                            ClusterConfigState configState,
+                                            TaskStatus.Listener statusListener,
+                                            TargetState initialState,
+                                            Runnable preProducerCheck,
+                                            Runnable postProducerCheck) {
+            super(id, configState, statusListener, initialState);
+            this.preProducerCheck = preProducerCheck;
+            this.postProducerCheck = postProducerCheck;
+        }
+
+        @Override
+        public WorkerTask doBuild(Task task,
+                                  ConnectorTaskId id,
+                                  ClusterConfigState configState,
+                                  TaskStatus.Listener statusListener,
+                                  TargetState initialState,
+                                  ConnectorConfig connectorConfig,
+                                  Converter keyConverter,
+                                  Converter valueConverter,
+                                  HeaderConverter headerConverter,
+                                  ClassLoader classLoader,
+                                  ErrorHandlingMetrics errorHandlingMetrics,
+                                  Class<? extends Connector> connectorClass,
+                                  RetryWithToleranceOperator 
retryWithToleranceOperator) {
+
+            SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
+                    connectorConfig.originalsStrings(), 
config.topicCreationEnable());
+            retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            log.info("Initializing: {}", transformationChain);
+
+            Map<String, Object> adminOverrides = adminConfigs(id.connector(), 
"connector-adminclient-" + id, config,
+                    sourceConfig, connectorClass, 
connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+            Admin adminClient = Admin.create(adminOverrides);
+            TopicAdmin topicAdmin = new 
TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient);
+            Map<String, TopicCreationGroup> topicCreationGroups = null;
+            if (config.topicCreationEnable() && 
sourceConfig.usesTopicCreation()) {
+                topicCreationGroups = 
TopicCreationGroup.configuredGroups(sourceConfig);
+            }
+
+            String transactionalId = transactionalId(id);
+            Map<String, Object> producerProps = 
producerConfigs(id.connector(), "connector-producer-" + id, config, 
sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            ConnectUtils.warnOnOverriddenProperty(
+                    producerProps, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
"true",
+                    "for connectors when exactly-once source support is 
enabled",
+                    false
+            );
+            ConnectUtils.warnOnOverriddenProperty(
+                    producerProps, ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
transactionalId,
+                    "for connectors when exactly-once source support is 
enabled",
+                    true
+            );
+            producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+            producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
transactionalId);
+            KafkaProducer<byte[], byte[]> producer = new 
KafkaProducer<>(producerProps);
+
+            Map<String, Object> consumerProps = 
consumerConfigs(id.connector(), "connector-consumer-" + id, config, 
connectorConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            ConnectUtils.warnOnOverriddenProperty(
+                    consumerProps, ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT),
+                    "for connectors when exactly-once source support is 
enabled",
+                    false
+            );
+            consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
+            KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerProps);
+
+            String offsetsTopic = 
Optional.ofNullable(sourceConfig.offsetsTopic()).orElse(config.offsetsTopic());
+
+            ConnectorOffsetBackingStore offsetStore;
+            // No need to do secondary writes to the global offsets topic if we're certain that the task's local offset store
+            // is going to be targeting it anyway.
+            // Note that this may lead to a false positive if the user provides an overridden bootstrap servers value for their
+            // producer that resolves to the same Kafka cluster; we might consider looking up the Kafka cluster ID in the future
+            // to prevent these false positives, but at the moment this is probably adequate, especially since we probably don't
+            // want to put a ping to a remote Kafka cluster inside the herder's tick thread (which is where this logic takes place
+            // right now) in case that takes a while.

Review comment:
       Ack, done. This logic has been moved into an isolated method and I've 
moved the relevant portions of the comment with it.
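
       Purely to make the intent of that isolated method concrete, a hypothetical sketch of the check (the name, signature, and placement are invented here, assuming it lives in Worker where config and BOOTSTRAP_SERVERS_CONFIG are in scope; it is not the method actually added in the PR) could read:

           // Hypothetical illustration only.
           // Decide whether the task can write solely to the global offsets topic because its
           // connector-specific offset store would target that same topic on the same cluster anyway.
           private boolean connectorStoreTargetsGlobalStore(SourceConnectorConfig sourceConfig, Map<String, Object> producerProps) {
               String connectorOffsetsTopic = sourceConfig.offsetsTopic();
               boolean sameTopic = connectorOffsetsTopic == null || connectorOffsetsTopic.equals(config.offsetsTopic());
               // Comparing bootstrap.servers strings can wrongly conclude the clusters differ when an
               // overridden value resolves to the same cluster; comparing cluster IDs would be more
               // precise, but would require a remote call from the herder's tick thread.
               boolean sameCluster = config.bootstrapServers().equals(producerProps.get(BOOTSTRAP_SERVERS_CONFIG));
               return sameTopic && sameCluster;
           }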

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -342,6 +343,10 @@ private void 
logPluginPathConfigProviderWarning(Map<String, String> rawOriginals
         }
     }
 
+    public String bootstrapServers() {
+        return String.join(",", getList(BOOTSTRAP_SERVERS_CONFIG));
+    }

Review comment:
       Sorry, I'm a little unclear on what the benefits of Javadocs here are. 
I've added tags for each of the relevant methods but would you mind 
elaborating? In IntelliJ, I can see where this method is invoked by 
cmd+clicking the `bootstrapServers()` declaration, and wherever it's invoked, I 
can go to the declaration by cmd+clicking the invocation.
   
   I'm aware that not everyone uses my IDE and certainly don't expect them to 
start doing so; just wondering if there's a different IDE out there with 
behavior that makes Javadocs like these significantly more powerful.
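
   For what it's worth, the kind of Javadoc being discussed is small; on the getter quoted above it would look roughly like this (wording illustrative, not lifted from the PR):

       /**
        * @return the worker's {@code bootstrap.servers} value, joined into a single
        *         comma-separated string suitable for passing to Kafka client configs
        */
       public String bootstrapServers() {
           return String.join(",", getList(BOOTSTRAP_SERVERS_CONFIG));
       }

   The practical benefit is mostly for readers hovering over call sites or browsing generated docs, rather than for code navigation.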

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
##########
@@ -74,15 +75,15 @@
     private volatile boolean cancelled; // indicates whether the Worker has 
cancelled the connector (e.g. because of slow shutdown)
 
     private State state;
-    private final OffsetStorageReader offsetStorageReader;
+    private final CloseableOffsetStorageReader offsetStorageReader;
 
     public WorkerConnector(String connName,
                            Connector connector,
                            ConnectorConfig connectorConfig,
                            CloseableConnectorContext ctx,
                            ConnectMetrics metrics,
                            ConnectorStatus.Listener statusListener,
-                           OffsetStorageReader offsetStorageReader,
+                           CloseableOffsetStorageReader offsetStorageReader,

Review comment:
       I opted for an alternative approach where the offset reader is allowed 
to be `null`, but I can also see the benefit of a conditional call to 
`Objects::requireNonNull`. I've added one in `WorkerConnector::initialize`; let 
me know what you think.
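
       To illustrate the shape of that conditional check (a hypothetical sketch only; the actual condition and message in WorkerConnector::initialize may differ), something along these lines:

           public void initialize() {
               // Hypothetical: only require an offset reader for source connectors, since sink
               // connectors never read source offsets and may legitimately pass null.
               if (connector instanceof SourceConnector) {
                   Objects.requireNonNull(offsetStorageReader,
                           "Offset reader cannot be null for source connectors");
               }
               // ... remaining initialization unchanged ...
           }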



