kkonstantine commented on a change in pull request #8722:
URL: https://github.com/apache/kafka/pull/8722#discussion_r430089754



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##########
@@ -678,7 +701,8 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId 
id) {
         if (topic != null && !topic.isEmpty()) {
             Map<String, Object> producerProps = producerConfigs(id, 
"connector-dlq-producer-" + id, config, connConfig, connectorClass,
                                                                 
connectorClientConfigOverridePolicy);
-            Map<String, Object> adminProps = adminConfigs(id, config, 
connConfig, connectorClass, connectorClientConfigOverridePolicy);
+            // Leaving default client id empty means that the admin client 
will set the default at instantiation time
+            Map<String, Object> adminProps = adminConfigs(id, "", config, 
connConfig, connectorClass, connectorClientConfigOverridePolicy);

Review comment:
       Definitely. I'm in favor too if we don't mind the change. Added 
`connector-dlq-adminclient-`

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -178,6 +192,76 @@ public NewTopic build() {
         }
     }
 
+    public static class NewTopicCreationGroup {
+        private final String name;
+        private final Pattern inclusionPattern;
+        private final Pattern exclusionPattern;
+        private final int numPartitions;
+        private final short replicationFactor;
+        private final Map<String, Object> otherConfigs;
+
+        protected NewTopicCreationGroup(String group, SourceConnectorConfig 
config) {
+            this.name = group;
+            this.inclusionPattern = Pattern.compile(String.join("|", 
config.topicCreationInclude(group)));
+            this.exclusionPattern = Pattern.compile(String.join("|", 
config.topicCreationExclude(group)));
+            this.numPartitions = config.topicCreationPartitions(group);
+            this.replicationFactor = 
config.topicCreationReplicationFactor(group);
+            this.otherConfigs = config.topicCreationOtherConfigs(group);
+        }
+
+        public String name() {
+            return name;
+        }
+
+        public boolean matches(String topic) {
+            return !exclusionPattern.matcher(topic).matches() && 
inclusionPattern.matcher(topic).matches();
+        }
+
+        public NewTopic newTopic(String topic) {
+            NewTopicBuilder builder = new NewTopicBuilder(topic);
+            return builder.partitions(numPartitions)
+                    .replicationFactor(replicationFactor)
+                    .config(otherConfigs)
+                    .build();
+        }
+
+        public static Map<String, NewTopicCreationGroup> 
configuredGroups(SourceConnectorConfig config) {
+            List<String> groupNames = 
config.getList(TOPIC_CREATION_GROUPS_CONFIG);
+            Map<String, NewTopicCreationGroup> groups = new LinkedHashMap<>();
+            for (String group : groupNames) {
+                groups.put(group, new NewTopicCreationGroup(group, config));
+            }
+            // Even if there was a group called 'default' in the config, it 
will be overridden here.
+            // Order matters for all the topic groups besides the default, 
since it will be
+            // removed from this collection by the Worker
+            groups.put(DEFAULT_TOPIC_CREATION_GROUP, new 
NewTopicCreationGroup(DEFAULT_TOPIC_CREATION_GROUP, config));
+            return groups;
+        }

Review comment:
       I have to admit two things:
   a) I tried it when I wrote it, because I remembered that auto-formatting in
IntelliJ applies a similar type of ordering (although I can't reproduce it with
my current settings).
   b) I'm not a huge fan. I prefer the ordering that says: static member fields
first, followed by non-static member fields, then constructors, then some
logical ordering of the rest of the methods. I can see factory methods
following constructors, but other than that I think a logical ordering of
methods gives us good flexibility.
   
   I checked in case I had missed a recently updated guideline in the
[Google Java Style Guide](https://google.github.io/styleguide/javaguide.html#s3.4.2-ordering-class-contents),
which we loosely follow, or elsewhere. I found another old but reasonable
[recommendation](https://www.oracle.com/technetwork/java/codeconventions-150003.pdf),
but I didn't find any clear guideline for placing static methods before
constructors, member fields and other methods. Given that this imposes an
ordering other than a logical one on methods, I feel we could skip that one.
wdyt?
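
   To make the ordering I describe above concrete, here is a minimal sketch. The class `OrderingSketch` and its members are hypothetical and exist only for illustration; they are not part of this PR.

```java
import java.util.regex.Pattern;

// Hypothetical class, used only to illustrate the member ordering discussed above.
class OrderingSketch {
    // 1. Static member fields
    private static final String DEFAULT_NAME = "default";

    // 2. Non-static member fields
    private final String name;
    private final Pattern pattern;

    // 3. Constructors
    OrderingSketch(String name, String regex) {
        this.name = name;
        this.pattern = Pattern.compile(regex);
    }

    // 4. Factory methods, placed right after the constructors
    static OrderingSketch defaultGroup() {
        return new OrderingSketch(DEFAULT_NAME, ".*");
    }

    // 5. Remaining methods, in whatever order reads most logically
    String name() {
        return name;
    }

    boolean matches(String topic) {
        return pattern.matcher(topic).matches();
    }
}
```

   With this layout a reader meets the state first, then the ways to construct an instance, then the behavior, and the static factory does not have to sit at the top of the class.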

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, 
Exception e) {
         return true;
     }
 
+    // Due to transformations that may change the destination topic of a 
record (such as
+    // RegexRouter) topic creation can not be batched for multiple topics
+    private void maybeCreateTopic(String topic) {
+        if (!topicCreation.isTopicCreationEnabled() || 
topicCreation.topicCache().contains(topic)) {
+            return;
+        }
+        log.info("The task will send records to topic '{}' for the first time. 
Checking "
+                + "whether topic exists", topic);
+        Map<String, TopicDescription> existing = admin.describeTopics(topic);
+        if (!existing.isEmpty()) {
+            log.info("Topic '{}' already exists.", topic);
+            topicCreation.topicCache().add(topic);
+            return;
+        }
+
+        log.info("Creating topic '{}'", topic);
+        NewTopicCreationGroup topicGroup = 
topicCreation.topicGroups().values().stream()
+                .filter(group -> group.matches(topic))
+                .findFirst()
+                .orElse(topicCreation.defaultTopicGroup());

Review comment:
       Good observation. Done. 
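
       For anyone following the hunk above, here is a minimal, self-contained sketch of the selection logic: groups are tried in insertion order, an exclusion match always wins over an inclusion match, and the default group is used when nothing matches. `GroupSelectionSketch`, `Group`, `select` and `demo` are hypothetical names, and the regexes are made up for illustration; this is not the code in the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical stand-in for the group matching shown in the diff above.
class GroupSelectionSketch {
    static final class Group {
        final String name;
        final Pattern include;
        final Pattern exclude;

        Group(String name, String includeRegex, String excludeRegex) {
            this.name = name;
            this.include = Pattern.compile(includeRegex);
            this.exclude = Pattern.compile(excludeRegex);
        }

        // Exclusion takes precedence; both checks require a full-string match.
        boolean matches(String topic) {
            return !exclude.matcher(topic).matches() && include.matcher(topic).matches();
        }
    }

    // First matching group in iteration order wins; otherwise use the fallback.
    static String select(Map<String, Group> groups, Group fallback, String topic) {
        return groups.values().stream()
                .filter(group -> group.matches(topic))
                .findFirst()
                .orElse(fallback)
                .name;
    }

    static String demo(String topic) {
        // LinkedHashMap keeps the configured order, which decides ties.
        Map<String, Group> groups = new LinkedHashMap<>();
        groups.put("foo", new Group("foo", "topic", ""));
        groups.put("bar", new Group("bar", "other-.*", "other-secret"));
        Group fallback = new Group("default", ".*", "");
        return select(groups, fallback, topic);
    }
}
```

       In this sketch `demo("topic")` selects `foo`, `demo("other-topic")` selects `bar`, and `demo("other-secret")` falls through to `default` because `bar` excludes it. An empty exclusion regex only matches the empty string, so it excludes nothing in practice.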

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -384,6 +452,40 @@ public void onCompletion(RecordMetadata recordMetadata, 
Exception e) {
         return true;
     }
 
+    // Due to transformations that may change the destination topic of a 
record (such as
+    // RegexRouter) topic creation can not be batched for multiple topics
+    private void maybeCreateTopic(String topic) {
+        if (!topicCreation.isTopicCreationEnabled() || 
topicCreation.topicCache().contains(topic)) {
+            return;
+        }
+        log.info("The task will send records to topic '{}' for the first time. 
Checking "
+                + "whether topic exists", topic);
+        Map<String, TopicDescription> existing = admin.describeTopics(topic);
+        if (!existing.isEmpty()) {
+            log.info("Topic '{}' already exists.", topic);
+            topicCreation.topicCache().add(topic);
+            return;
+        }
+
+        log.info("Creating topic '{}'", topic);
+        NewTopicCreationGroup topicGroup = 
topicCreation.topicGroups().values().stream()
+                .filter(group -> group.matches(topic))
+                .findFirst()
+                .orElse(topicCreation.defaultTopicGroup());
+        log.debug("Topic '{}' matched topic creation group: {}", topic, 
topicGroup);
+        NewTopic newTopic = topicGroup.newTopic(topic);
+
+        if (admin.createTopic(newTopic)) {
+            topicCreation.topicCache().add(topic);
+            log.info("Created topic '{}' using creation group {}", newTopic, 
topicGroup);

Review comment:
       👍 Good catch, for the second time. I added it, then removed it after I
added the `name` field, and never added it back. It should be there now.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -178,6 +191,47 @@ public NewTopic build() {
         }
     }
 
+    public static class NewTopicCreationGroup {
+        private final Pattern inclusionPattern;
+        private final Pattern exclusionPattern;
+        private final int numPartitions;
+        private final short replicationFactor;
+        private final Map<String, Object> otherConfigs;
+
+        protected NewTopicCreationGroup(String group, SourceConnectorConfig 
config) {
+            inclusionPattern = Pattern.compile(String.join("|", 
config.topicCreationInclude(group)));
+            exclusionPattern = Pattern.compile(String.join("|", 
config.topicCreationExclude(group)));
+            numPartitions = config.topicCreationPartitions(group);
+            replicationFactor = config.topicCreationReplicationFactor(group);
+            otherConfigs = config.topicCreationOtherConfigs(group);
+        }
+
+        public boolean matches(String topic) {
+            return !exclusionPattern.matcher(topic).matches() && 
inclusionPattern.matcher(topic).matches();
+        }
+
+        public NewTopic newTopic(String topic) {
+            NewTopicBuilder builder = new NewTopicBuilder(topic);
+            return builder.partitions(numPartitions)
+                    .replicationFactor(replicationFactor)
+                    .config(otherConfigs)
+                    .build();
+        }

Review comment:
       Added for real this time :)

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -142,6 +155,57 @@ public WorkerSourceTask(ConnectorTaskId id,
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, 
connectMetrics);
         this.producerSendException = new AtomicReference<>();
         this.isTopicTrackingEnabled = 
workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
+        this.topicCreation = TopicCreation.newTopicCreation(workerConfig, 
topicGroups);
+    }
+
+    public static class TopicCreation {
+        private static final TopicCreation EMPTY =
+                new TopicCreation(false, null, Collections.emptyMap(), 
Collections.emptySet());
+
+        private final boolean isTopicCreationEnabled;
+        private final NewTopicCreationGroup defaultTopicGroup;
+        private final Map<String, NewTopicCreationGroup> topicGroups;
+        private final Set<String> topicCache;
+
+        protected TopicCreation(boolean isTopicCreationEnabled,
+                                NewTopicCreationGroup defaultTopicGroup,
+                                Map<String, NewTopicCreationGroup> topicGroups,
+                                Set<String> topicCache) {
+            this.isTopicCreationEnabled = isTopicCreationEnabled;
+            this.defaultTopicGroup = defaultTopicGroup;
+            this.topicGroups = topicGroups;
+            this.topicCache = topicCache;
+        }
+
+        public static TopicCreation newTopicCreation(WorkerConfig workerConfig,
+                                                     Map<String, 
NewTopicCreationGroup> topicGroups) {
+            if (!workerConfig.topicCreationEnable() || topicGroups == null) {
+                return EMPTY;
+            }
+            Map<String, NewTopicCreationGroup> groups = new 
LinkedHashMap<>(topicGroups);
+            groups.remove(DEFAULT_TOPIC_CREATION_GROUP);
+            return new TopicCreation(true, 
topicGroups.get(DEFAULT_TOPIC_CREATION_GROUP), groups, new HashSet<>());
+        }

Review comment:
       See my reply to the similar comment on this subject. Here, the factory
method is right below the constructor.
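
       As a rough illustration of the factory pattern in the hunk above (a shared disabled instance, plus a copy of the groups with the default entry split out), here is a minimal sketch. `FactorySketch` and its members are hypothetical, group values are plain strings instead of `NewTopicCreationGroup`, and the `"default"` key is an assumption about the value of `DEFAULT_TOPIC_CREATION_GROUP`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of the factory shown in the diff above.
class FactorySketch {
    // Assumed value of the default group key.
    static final String DEFAULT_GROUP = "default";

    // Shared instance returned whenever the feature is off or unconfigured.
    private static final FactorySketch EMPTY =
            new FactorySketch(false, null, Collections.emptyMap());

    private final boolean enabled;
    private final String defaultGroup;
    private final Map<String, String> groups;

    private FactorySketch(boolean enabled, String defaultGroup, Map<String, String> groups) {
        this.enabled = enabled;
        this.defaultGroup = defaultGroup;
        this.groups = groups;
    }

    // Factory method directly below the constructor.
    static FactorySketch create(boolean enabled, Map<String, String> configured) {
        if (!enabled || configured == null) {
            return EMPTY;
        }
        // Copy first so the caller's map is untouched, then drop the default entry.
        Map<String, String> rest = new LinkedHashMap<>(configured);
        rest.remove(DEFAULT_GROUP);
        return new FactorySketch(true, configured.get(DEFAULT_GROUP), rest);
    }

    boolean enabled() {
        return enabled;
    }

    String defaultGroup() {
        return defaultGroup;
    }

    // Names of the non-default groups, in their configured order.
    List<String> groupNames() {
        return new ArrayList<>(groups.keySet());
    }
}
```

       Copying into a `LinkedHashMap` preserves the configured order of the remaining groups, which matters because the first matching group wins.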

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskWithTopicCreationTest.java
##########
@@ -0,0 +1,1490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.integration.MonitorableSourceConnector;
+import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
+import 
org.apache.kafka.connect.runtime.WorkerSourceTask.SourceTaskMetricsGroup;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.ThreadedTest;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.apache.kafka.connect.util.TopicCreation;
+import org.apache.kafka.connect.util.TopicCreationGroup;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.api.easymock.annotation.MockStrict;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.SourceConnectorConfig.TOPIC_CREATION_GROUPS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
+import static 
org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@PowerMockIgnore({"javax.management.*",
+                  "org.apache.log4j.*"})
+@RunWith(PowerMockRunner.class)
+public class WorkerSourceTaskWithTopicCreationTest extends ThreadedTest {
+    private static final String TOPIC = "topic";
+    private static final String OTHER_TOPIC = "other-topic";
+    private static final Map<String, byte[]> PARTITION = 
Collections.singletonMap("key", "partition".getBytes());
+    private static final Map<String, Integer> OFFSET = 
Collections.singletonMap("key", 12);
+
+    // Connect-format data
+    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
+    private static final Integer KEY = -1;
+    private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
+    private static final Long RECORD = 12L;
+    // Serialized data. The actual format of this data doesn't matter -- we 
just want to see that the right version
+    // is used in the right place.
+    private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
+    private static final byte[] SERIALIZED_RECORD = 
"converted-record".getBytes();
+
+    private ExecutorService executor = Executors.newSingleThreadExecutor();
+    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1);
+    private WorkerConfig config;
+    private SourceConnectorConfig sourceConfig;
+    private Plugins plugins;
+    private MockConnectMetrics metrics;
+    @Mock private SourceTask sourceTask;
+    @Mock private Converter keyConverter;
+    @Mock private Converter valueConverter;
+    @Mock private HeaderConverter headerConverter;
+    @Mock private TransformationChain<SourceRecord> transformationChain;
+    @Mock private KafkaProducer<byte[], byte[]> producer;
+    @Mock private TopicAdmin admin;
+    @Mock private CloseableOffsetStorageReader offsetReader;
+    @Mock private OffsetStorageWriter offsetWriter;
+    @Mock private ClusterConfigState clusterConfigState;
+    private WorkerSourceTask workerTask;
+    @Mock private Future<RecordMetadata> sendFuture;
+    @MockStrict private TaskStatus.Listener statusListener;
+    @Mock private StatusBackingStore statusBackingStore;
+
+    private Capture<org.apache.kafka.clients.producer.Callback> 
producerCallbacks;
+
+    private static final Map<String, String> TASK_PROPS = new HashMap<>();
+    static {
+        TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, 
TestSourceTask.class.getName());
+    }
+    private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
+
+    private static final List<SourceRecord> RECORDS = Arrays.asList(
+            new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, KEY, 
RECORD_SCHEMA, RECORD)
+    );
+
+    // when this test becomes parameterized, this variable will be a test 
parameter
+    public boolean enableTopicCreation = true;
+
+    @Override
+    public void setup() {
+        super.setup();
+        Map<String, String> workerProps = workerProps();
+        plugins = new Plugins(workerProps);
+        config = new StandaloneConfig(workerProps);
+        sourceConfig = new SourceConnectorConfig(plugins, 
sourceConnectorPropsWithGroups(TOPIC), true);
+        producerCallbacks = EasyMock.newCapture();
+        metrics = new MockConnectMetrics();
+    }
+
+    private Map<String, String> workerProps() {
+        Map<String, String> props = new HashMap<>();
+        props.put("key.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        props.put("value.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        props.put("internal.key.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        props.put("internal.value.converter", 
"org.apache.kafka.connect.json.JsonConverter");
+        props.put("internal.key.converter.schemas.enable", "false");
+        props.put("internal.value.converter.schemas.enable", "false");
+        props.put("offset.storage.file.filename", "/tmp/connect.offsets");
+        props.put(TOPIC_CREATION_ENABLE_CONFIG, 
String.valueOf(enableTopicCreation));
+        return props;
+    }
+
+    private Map<String, String> sourceConnectorPropsWithGroups(String topic) {
+        // set up props for the source connector
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "foo-connector");
+        props.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSourceConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(1));
+        props.put(TOPIC_CONFIG, topic);
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, 
StringConverter.class.getName());
+        props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", "foo", 
"bar"));
+        props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, 
String.valueOf(1));
+        props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, 
String.valueOf(1));
+        props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "foo" + "." + 
INCLUDE_REGEX_CONFIG, topic);
+        props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + 
INCLUDE_REGEX_CONFIG, ".*");
+        props.put(SourceConnectorConfig.TOPIC_CREATION_PREFIX + "bar" + "." + 
EXCLUDE_REGEX_CONFIG, topic);
+        return props;
+    }
+
+    @After
+    public void tearDown() {
+        if (metrics != null) metrics.stop();
+    }
+
+    private void createWorkerTask() {
+        createWorkerTask(TargetState.STARTED);
+    }
+
+    private void createWorkerTask(TargetState initialState) {
+        createWorkerTask(initialState, keyConverter, valueConverter, 
headerConverter);
+    }
+
+    private void createWorkerTask(TargetState initialState, Converter 
keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
+        workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, 
initialState, keyConverter, valueConverter, headerConverter,
+                transformationChain, producer, admin, 
TopicCreationGroup.configuredGroups(sourceConfig),
+                offsetReader, offsetWriter, config, clusterConfigState, 
metrics, plugins.delegatingLoader(), Time.SYSTEM,
+                RetryWithToleranceOperatorTest.NOOP_OPERATOR, 
statusBackingStore);
+    }
+
+    @Test
+    public void testStartPaused() throws Exception {
+        final CountDownLatch pauseLatch = new CountDownLatch(1);
+
+        createWorkerTask(TargetState.PAUSED);
+
+        statusListener.onPause(taskId);
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+            @Override
+            public Void answer() throws Throwable {
+                pauseLatch.countDown();
+                return null;
+            }
+        });
+
+        expectClose();
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(pauseLatch.await(5, TimeUnit.SECONDS));
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPause() throws Exception {
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        AtomicInteger count = new AtomicInteger(0);
+        CountDownLatch pollLatch = expectPolls(10, count);
+        // In this test, we don't flush, so nothing goes any further than the 
offset writer
+
+        expectTopicCreation(TOPIC);
+
+        statusListener.onPause(taskId);
+        EasyMock.expectLastCall();
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+        assertTrue(awaitLatch(pollLatch));
+
+        workerTask.transitionTo(TargetState.PAUSED);
+
+        int priorCount = count.get();
+        Thread.sleep(100);
+
+        // since the transition is observed asynchronously, the count could be 
off by one loop iteration
+        assertTrue(count.get() - priorCount <= 1);
+
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPollsInBackground() throws Exception {
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        final CountDownLatch pollLatch = expectPolls(10);
+        // In this test, we don't flush, so nothing goes any further than the 
offset writer
+
+        expectTopicCreation(TOPIC);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(10);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testFailureInPoll() throws Exception {
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        final CountDownLatch pollLatch = new CountDownLatch(1);
+        final RuntimeException exception = new RuntimeException();
+        EasyMock.expect(sourceTask.poll()).andAnswer(new 
IAnswer<List<SourceRecord>>() {
+            @Override
+            public List<SourceRecord> answer() throws Throwable {
+                pollLatch.countDown();
+                throw exception;
+            }
+        });
+
+        statusListener.onFailure(taskId, exception);
+        EasyMock.expectLastCall();
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(0);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPollReturnsNoRecords() throws Exception {
+        // Test that the task handles an empty list of records
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        // We'll wait for some data, then trigger a flush
+        final CountDownLatch pollLatch = expectEmptyPolls(1, new 
AtomicInteger());
+        expectOffsetFlush(true);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        assertTrue(workerTask.commitOffsets());
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(0);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommit() throws Exception {
+        // Test that the task commits properly when prompted
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        // We'll wait for some data, then trigger a flush
+        final CountDownLatch pollLatch = expectPolls(1);
+        expectOffsetFlush(true);
+
+        expectTopicCreation(TOPIC);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        assertTrue(workerTask.commitOffsets());
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(1);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommitFailure() throws Exception {
+        // Test that the task commits properly when prompted
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        // We'll wait for some data, then trigger a flush
+        final CountDownLatch pollLatch = expectPolls(1);
+        expectOffsetFlush(true);
+
+        expectTopicCreation(TOPIC);
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(false);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
+        assertTrue(workerTask.commitOffsets());
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        taskFuture.get();
+        assertPollMetrics(1);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsConvertsData() throws Exception {
+        createWorkerTask();
+
+        List<SourceRecord> records = new ArrayList<>();
+        // Can just use the same record for key and value
+        records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = 
expectSendRecordAnyTimes();
+
+        expectTopicCreation(TOPIC);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(SERIALIZED_KEY, sent.getValue().key());
+        assertEquals(SERIALIZED_RECORD, sent.getValue().value());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsPropagatesTimestamp() throws Exception {
+        final Long timestamp = System.currentTimeMillis();
+
+        createWorkerTask();
+
+        List<SourceRecord> records = Collections.singletonList(
+                new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD, timestamp)
+        );
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = 
expectSendRecordAnyTimes();
+
+        expectTopicCreation(TOPIC);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(timestamp, sent.getValue().timestamp());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = InvalidRecordException.class)
+    public void testSendRecordsCorruptTimestamp() throws Exception {
+        final Long timestamp = -3L;
+        createWorkerTask();
+
+        List<SourceRecord> records = Collections.singletonList(
+                new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD, timestamp)
+        );
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = 
expectSendRecordAnyTimes();
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(null, sent.getValue().timestamp());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsNoTimestamp() throws Exception {
+        final Long timestamp = -1L;
+        createWorkerTask();
+
+        List<SourceRecord> records = Collections.singletonList(
+                new SourceRecord(PARTITION, OFFSET, TOPIC, null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD, timestamp)
+        );
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = 
expectSendRecordAnyTimes();
+
+        expectTopicCreation(TOPIC);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(null, sent.getValue().timestamp());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsRetries() throws Exception {
+        createWorkerTask();
+
+        // Differentiate only by Kafka partition so we can reuse conversion expectations
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectTopicCreation(TOPIC);
+
+        // First round
+        expectSendRecordOnce(false);
+        // Any Producer retriable exception should work here
+        expectSendRecordSyncFailure(new 
org.apache.kafka.common.errors.TimeoutException("retriable sync failure"));
+
+        // Second round
+        expectSendRecordOnce(true);
+        expectSendRecordOnce(false);
+
+        PowerMock.replayAll();
+
+        // Try to send 3, make first pass, second fail. Should save last two
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2, record3));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
+        assertEquals(Arrays.asList(record2, record3), 
Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testSendRecordsProducerCallbackFail() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectTopicCreation(TOPIC);
+
+        expectSendRecordProducerCallbackFail();
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testSendRecordsProducerSendFailsImmediately() throws Exception 
{
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        expectTopicCreation(TOPIC);
+
+        EasyMock.expect(producer.send(EasyMock.anyObject(), 
EasyMock.anyObject()))
+                .andThrow(new KafkaException("Producer closed while send in progress", new InvalidTopicException(TOPIC)));
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
+    @Test
+    public void testSendRecordsTaskCommitRecordFail() throws Exception {
+        createWorkerTask();
+
+        // Differentiate only by Kafka partition so we can reuse conversion expectations
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, TOPIC, 3, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectTopicCreation(TOPIC);
+
+        // Source task commit record failure will not cause the task to abort
+        expectSendRecordOnce(false);
+        expectSendRecordTaskCommitRecordFail(false, false);
+        expectSendRecordOnce(false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2, record3));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSlowTaskStart() throws Exception {
+        final CountDownLatch startupLatch = new CountDownLatch(1);
+        final CountDownLatch finishStartupLatch = new CountDownLatch(1);
+
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                startupLatch.countDown();
+                assertTrue(awaitLatch(finishStartupLatch));
+                return null;
+            }
+        });
+
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        expectClose();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        Future<?> workerTaskFuture = executor.submit(workerTask);
+
+        // Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
+        // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it
+        // cannot be invoked immediately in the thread trying to stop the task.
+        assertTrue(awaitLatch(startupLatch));
+        workerTask.stop();
+        finishStartupLatch.countDown();
+        assertTrue(workerTask.awaitStop(1000));
+
+        workerTaskFuture.get();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCancel() {
+        createWorkerTask();
+
+        offsetReader.close();
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.cancel();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testMetricsGroup() {
+        SourceTaskMetricsGroup group = new SourceTaskMetricsGroup(taskId, 
metrics);
+        SourceTaskMetricsGroup group1 = new SourceTaskMetricsGroup(taskId1, 
metrics);
+        for (int i = 0; i != 10; ++i) {
+            group.recordPoll(100, 1000 + i * 100);
+            group.recordWrite(10);
+        }
+        for (int i = 0; i != 20; ++i) {
+            group1.recordPoll(100, 1000 + i * 100);
+            group1.recordWrite(10);
+        }
+        assertEquals(1900.0, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"poll-batch-max-time-ms"), 0.001d);
+        assertEquals(1450.0, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"poll-batch-avg-time-ms"), 0.001d);
+        assertEquals(33.333, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"source-record-poll-rate"), 0.001d);
+        assertEquals(1000, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"source-record-poll-total"), 0.001d);
+        assertEquals(3.3333, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"source-record-write-rate"), 0.001d);
+        assertEquals(100, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"source-record-write-total"), 0.001d);
+        assertEquals(900.0, 
metrics.currentMetricValueAsDouble(group.metricGroup(), 
"source-record-active-count"), 0.001d);
+
+        // Close the group
+        group.close();
+
+        for (MetricName metricName : 
group.metricGroup().metrics().metrics().keySet()) {
+            // Metrics for this group should no longer exist
+            assertFalse(group.metricGroup().groupId().includes(metricName));
+        }
+        // Sensors for this group should no longer exist
+        
assertNull(group.metricGroup().metrics().getSensor("sink-record-read"));
+        
assertNull(group.metricGroup().metrics().getSensor("sink-record-send"));
+        
assertNull(group.metricGroup().metrics().getSensor("sink-record-active-count"));
+        assertNull(group.metricGroup().metrics().getSensor("partition-count"));
+        
assertNull(group.metricGroup().metrics().getSensor("offset-seq-number"));
+        
assertNull(group.metricGroup().metrics().getSensor("offset-commit-completion"));
+        
assertNull(group.metricGroup().metrics().getSensor("offset-commit-completion-skip"));
+        assertNull(group.metricGroup().metrics().getSensor("put-batch-time"));
+
+        assertEquals(2900.0, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"poll-batch-max-time-ms"), 0.001d);
+        assertEquals(1950.0, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"poll-batch-avg-time-ms"), 0.001d);
+        assertEquals(66.667, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"source-record-poll-rate"), 0.001d);
+        assertEquals(2000, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"source-record-poll-total"), 0.001d);
+        assertEquals(6.667, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"source-record-write-rate"), 0.001d);
+        assertEquals(200, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"source-record-write-total"), 0.001d);
+        assertEquals(1800.0, 
metrics.currentMetricValueAsDouble(group1.metricGroup(), 
"source-record-active-count"), 0.001d);
+    }
+
+    @Test
+    public void testHeaders() throws Exception {
+        Headers headers = new RecordHeaders();
+        headers.add("header_key", "header_value".getBytes());
+
+        org.apache.kafka.connect.header.Headers connectHeaders = new 
ConnectHeaders();
+        connectHeaders.add("header_key", new 
SchemaAndValue(Schema.STRING_SCHEMA, "header_value"));
+
+        createWorkerTask();
+
+        List<SourceRecord> records = new ArrayList<>();
+        records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, null, connectHeaders));
+
+        expectTopicCreation(TOPIC);
+
+        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord(TOPIC, 
true, false, true, true, true, headers);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(SERIALIZED_KEY, sent.getValue().key());
+        assertEquals(SERIALIZED_RECORD, sent.getValue().value());
+        assertEquals(headers, sent.getValue().headers());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testHeadersWithCustomConverter() throws Exception {
+        StringConverter stringConverter = new StringConverter();
+        TestConverterWithHeaders testConverter = new 
TestConverterWithHeaders();
+
+        createWorkerTask(TargetState.STARTED, stringConverter, testConverter, 
stringConverter);
+
+        List<SourceRecord> records = new ArrayList<>();
+
+        String stringA = "Árvíztűrő tükörfúrógép";
+        org.apache.kafka.connect.header.Headers headersA = new 
ConnectHeaders();
+        String encodingA = "latin2";
+        headersA.addString("encoding", encodingA);
+
+        records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, 
Schema.STRING_SCHEMA, "a", Schema.STRING_SCHEMA, stringA, null, headersA));
+
+        String stringB = "Тестовое сообщение";
+        org.apache.kafka.connect.header.Headers headersB = new 
ConnectHeaders();
+        String encodingB = "koi8_r";
+        headersB.addString("encoding", encodingB);
+
+        records.add(new SourceRecord(PARTITION, OFFSET, TOPIC, null, 
Schema.STRING_SCHEMA, "b", Schema.STRING_SCHEMA, stringB, null, headersB));
+
+        expectTopicCreation(TOPIC);
+
+        Capture<ProducerRecord<byte[], byte[]>> sentRecordA = 
expectSendRecord(TOPIC, false, false, true, true, false, null);
+        Capture<ProducerRecord<byte[], byte[]>> sentRecordB = 
expectSendRecord(TOPIC, false, false, true, true, false, null);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", records);
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+
+        assertEquals(ByteBuffer.wrap("a".getBytes()), 
ByteBuffer.wrap(sentRecordA.getValue().key()));
+        assertEquals(
+            ByteBuffer.wrap(stringA.getBytes(encodingA)),
+            ByteBuffer.wrap(sentRecordA.getValue().value())
+        );
+        assertEquals(encodingA, new 
String(sentRecordA.getValue().headers().lastHeader("encoding").value()));
+
+        assertEquals(ByteBuffer.wrap("b".getBytes()), 
ByteBuffer.wrap(sentRecordB.getValue().key()));
+        assertEquals(
+            ByteBuffer.wrap(stringB.getBytes(encodingB)),
+            ByteBuffer.wrap(sentRecordB.getValue().value())
+        );
+        assertEquals(encodingB, new 
String(sentRecordB.getValue().headers().lastHeader("encoding").value()));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testTopicCreateWhenTopicExists() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, 
null, Collections.emptyList(), Collections.emptyList());
+        TopicDescription topicDesc = new TopicDescription(TOPIC, false, 
Collections.singletonList(topicPartitionInfo));
+        
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.singletonMap(TOPIC,
 topicDesc));
+
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
+    @Test
+    public void testSendRecordsTopicDescribeRetries() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        // First round - call to describe the topic times out
+        EasyMock.expect(admin.describeTopics(TOPIC))
+                .andThrow(new RetriableException(new 
TimeoutException("timeout")));
+
+        // Second round - calls to describe and create succeed
+        expectTopicCreation(TOPIC);
+        // Exactly two records are sent
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
+        assertEquals(Arrays.asList(record1, record2), 
Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+    }
+
+    @Test
+    public void testSendRecordsTopicCreateRetries() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        // First round - describe finds no topic and the call to create it times out
+        expectPreliminaryCalls();
+        
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture)))
+                .andThrow(new RetriableException(new 
TimeoutException("timeout")));
+
+        // Second round
+        expectTopicCreation(TOPIC);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
+        assertEquals(Arrays.asList(record1, record2), 
Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+    }
+
+    @Test
+    public void testSendRecordsTopicDescribeRetriesMidway() throws Exception {
+        createWorkerTask();
+
+        // Differentiate only by Kafka partition so we can reuse conversion expectations
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, 
OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        // First round
+        expectPreliminaryCalls(OTHER_TOPIC);
+        expectTopicCreation(TOPIC);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        // First call to describe the topic times out
+        EasyMock.expect(admin.describeTopics(OTHER_TOPIC))
+                .andThrow(new RetriableException(new 
TimeoutException("timeout")));
+
+        // Second round
+        expectTopicCreation(OTHER_TOPIC);
+        expectSendRecord(OTHER_TOPIC, false, true, true, true, true, 
emptyHeaders());
+
+        PowerMock.replayAll();
+
+        // Try to send 3, make first two pass, third fail. Should save the last one
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2, record3));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
+        assertEquals(Arrays.asList(record3), 
Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendRecordsTopicCreateRetriesMidway() throws Exception {
+        createWorkerTask();
+
+        // Differentiate only by Kafka partition so we can reuse conversion expectations
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, 
OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        // First round
+        expectPreliminaryCalls(OTHER_TOPIC);
+        expectTopicCreation(TOPIC);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false, false);
+
+        
EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap());
+        // First call to create the topic times out
+        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture)))
+                .andThrow(new RetriableException(new 
TimeoutException("timeout")));
+
+        // Second round
+        expectTopicCreation(OTHER_TOPIC);
+        expectSendRecord(OTHER_TOPIC, false, true, true, true, true, 
emptyHeaders());
+
+        PowerMock.replayAll();
+
+        // Try to send 3, make first two pass, third fail. Should save the last one
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2, record3));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(true, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
+        assertEquals(Arrays.asList(record3), 
Whitebox.getInternalState(workerTask, "toSend"));
+
+        // Next they all succeed
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertEquals(false, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
+        assertNull(Whitebox.getInternalState(workerTask, "toSend"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testTopicDescribeFails() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        EasyMock.expect(admin.describeTopics(TOPIC))
+                .andThrow(new ConnectException(new 
TopicAuthorizationException("unauthorized")));
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testTopicCreateFails() throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+
+        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+        EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture)))
+                .andThrow(new ConnectException(new 
TopicAuthorizationException("unauthorized")));
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertNotNull(newTopicCapture.getValue());
+    }
+
+    @Test(expected = ConnectException.class)
+    public void testTopicCreateFailsWithExceptionWhenCreateReturnsFalse() 
throws Exception {
+        createWorkerTask();
+
+        SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+        SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
+
+        expectPreliminaryCalls();
+        
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+
+        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+        
EasyMock.expect(admin.createTopic(EasyMock.capture(newTopicCapture))).andReturn(false);
+
+        PowerMock.replayAll();
+
+        Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2));
+        Whitebox.invokeMethod(workerTask, "sendRecords");
+        assertNotNull(newTopicCapture.getValue());
+    }
+
+    @Test
+    public void testTopicCreationClassWhenTopicCreationIsEnabled() {
+        TopicCreationGroup expectedDefaultGroup =
+                
TopicCreationGroup.configuredGroups(sourceConfig).get(DEFAULT_TOPIC_CREATION_GROUP);
+
+        TopicCreation topicCreation = TopicCreation.newTopicCreation(config,
+                TopicCreationGroup.configuredGroups(sourceConfig));
+
+        assertTrue(topicCreation.isTopicCreationEnabled());
+        assertTrue(topicCreation.isTopicCreationRequired(TOPIC));
+        assertThat(topicCreation.defaultTopicGroup(), 
is(expectedDefaultGroup));
+        assertEquals(2, topicCreation.topicGroups().size());
+        assertThat(topicCreation.topicGroups().keySet(), hasItems("foo", 
"bar"));
+        topicCreation.addTopic(TOPIC);
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+    }
+
+    @Test
+    public void testTopicCreationClassWhenTopicCreationIsDisabled() {
+        Map<String, String> workerProps = workerProps();
+        workerProps.put(TOPIC_CREATION_ENABLE_CONFIG, String.valueOf(false));
+        config = new StandaloneConfig(workerProps);
+
+        TopicCreation topicCreation = TopicCreation.newTopicCreation(config,
+                TopicCreationGroup.configuredGroups(sourceConfig));
+
+        assertFalse(topicCreation.isTopicCreationEnabled());
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+        assertNull(topicCreation.defaultTopicGroup());
+        assertEquals(0, topicCreation.topicGroups().size());
+        assertThat(topicCreation.topicGroups(), is(Collections.emptyMap()));
+        topicCreation.addTopic(TOPIC);
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+    }
+
+    @Test
+    public void testEmptyTopicCreationClass() {
+        TopicCreation topicCreation = TopicCreation.newTopicCreation(config, 
null);
+
+        assertFalse(topicCreation.isTopicCreationEnabled());
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+        assertNull(topicCreation.defaultTopicGroup());
+        assertEquals(0, topicCreation.topicGroups().size());
+        assertThat(topicCreation.topicGroups(), is(Collections.emptyMap()));
+        topicCreation.addTopic(TOPIC);
+        assertFalse(topicCreation.isTopicCreationRequired(TOPIC));
+    }

Review comment:
       Indeed. That's in my latest commit. 

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
##########
@@ -16,21 +16,157 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 
 public class SourceConnectorConfig extends ConnectorConfig {
 
-    private static ConfigDef config = ConnectorConfig.configDef();
+    protected static final String TOPIC_CREATION_GROUP = "Topic Creation";
+
+    public static final String TOPIC_CREATION_PREFIX = "topic.creation.";
+
+    public static final String TOPIC_CREATION_GROUPS_CONFIG = 
TOPIC_CREATION_PREFIX + "groups";
+    private static final String TOPIC_CREATION_GROUPS_DOC = "Groups of configurations for topics "
+            + "created by source connectors";
+    private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic Creation Groups";
+
+    private static class EnrichedSourceConnectorConfig extends AbstractConfig {
+        EnrichedSourceConnectorConfig(ConfigDef configDef, Map<String, String> 
props) {
+            super(configDef, props);
+        }
+
+        @Override
+        public Object get(String key) {
+            return super.get(key);
+        }
+    }
+
+    private static ConfigDef config = SourceConnectorConfig.configDef();
+    private final EnrichedSourceConnectorConfig enrichedSourceConfig;
 
     public static ConfigDef configDef() {
-        return config;
+        int orderInGroup = 0;
+        return new ConfigDef(ConnectorConfig.configDef())
+                .define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST, 
Collections.emptyList(),
+                        ConfigDef.CompositeValidator.of(new 
ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with(
+                            (name, value) -> {
+                                List<?> groupAliases = (List<?>) value;
+                                if (groupAliases.size() > new 
HashSet<>(groupAliases).size()) {
+                                    throw new ConfigException(name, value, 
"Duplicate alias provided.");
+                                }
+                            },
+                            () -> "unique topic creation groups")),
+                        ConfigDef.Importance.LOW, TOPIC_CREATION_GROUPS_DOC, 
TOPIC_CREATION_GROUP,
+                        ++orderInGroup, ConfigDef.Width.LONG, 
TOPIC_CREATION_GROUPS_DISPLAY);
+    }
+
+    public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) {
+        String defaultGroup = "default";
+        ConfigDef newDefaultDef = new ConfigDef(baseConfigDef);
+        newDefaultDef.embed(DEFAULT_TOPIC_CREATION_PREFIX, defaultGroup, 0, 
TopicCreationConfig.defaultGroupConfigDef());
+        return newDefaultDef;
+    }
+
+    /**
+     * Returns an enriched {@link ConfigDef} building upon the {@code 
ConfigDef}, using the current configuration specified in {@code props} as an 
input.
+     *
+     * @param baseConfigDef the base configuration definition to be enriched
+     * @param props the non parsed configuration properties
+     * @return the enriched configuration definition
+     */
+    public static ConfigDef enrich(ConfigDef baseConfigDef, Map<String, 
String> props, AbstractConfig defaultGroupConfig) {
+        List<Object> topicCreationGroups = new ArrayList<>();
+        Object aliases = ConfigDef.parseType(TOPIC_CREATION_GROUPS_CONFIG, 
props.get(TOPIC_CREATION_GROUPS_CONFIG), ConfigDef.Type.LIST);
+        if (aliases instanceof List) {
+            topicCreationGroups.addAll((List<?>) aliases);
+        }
+
+        ConfigDef newDef = new ConfigDef(baseConfigDef);
+        String defaultGroupPrefix = TOPIC_CREATION_PREFIX + 
DEFAULT_TOPIC_CREATION_GROUP + ".";
+        short defaultGroupReplicationFactor = 
defaultGroupConfig.getShort(defaultGroupPrefix + REPLICATION_FACTOR_CONFIG);
+        int defaultGroupPartitions = 
defaultGroupConfig.getInt(defaultGroupPrefix + PARTITIONS_CONFIG);
+        topicCreationGroups.stream().distinct().forEach(group -> {
+            if (!(group instanceof String)) {
+                throw new ConfigException("Item in " + 
TOPIC_CREATION_GROUPS_CONFIG + " property is not of type String");
+            }
+            String alias = (String) group;
+            String prefix = TOPIC_CREATION_PREFIX + alias + ".";
+            String configGroup = TOPIC_CREATION_GROUP + ": " + alias;
+            newDef.embed(prefix, configGroup, 0,
+                    TopicCreationConfig.configDef(configGroup, 
defaultGroupReplicationFactor, defaultGroupPartitions));
+        });
+        return newDef;
+    }
+
+    @Override
+    public Object get(String key) {

Review comment:
       Good catch on that one. I'll move it below. The configs are the classes 
that often have the most random layout.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = 
"topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
include topics that "
+            + "match their values and apply this group's specific 
configuration to the topics "
+            + "that match this inclusion list.";

Review comment:
       Accepting, and I will fix the typo in a follow-up commit.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = 
"topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
include topics that "
+            + "match their values and apply this group's specific 
configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
exclude topics that "
+            + "match their values and refrain from applying this group's 
specific configuration "
+            + "to the topics that match this exclusion list. Note that 
exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";

Review comment:
       Accepting, and I will fix the typo in a follow-up commit.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -344,37 +357,38 @@ private boolean sendRecords() {
                 }
             }
             try {
+                maybeCreateTopic(record.topic());
                 final String topic = producerRecord.topic();
                 producer.send(
-                        producerRecord,
-                        new Callback() {
-                            @Override
-                            public void onCompletion(RecordMetadata 
recordMetadata, Exception e) {
-                                if (e != null) {
-                                    log.error("{} failed to send record to 
{}:", WorkerSourceTask.this, topic, e);
-                                    log.debug("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
-                                    producerSendException.compareAndSet(null, 
e);
-                                } else {
-                                    recordSent(producerRecord);
-                                    counter.completeRecord();
-                                    log.trace("{} Wrote record successfully: 
topic {} partition {} offset {}",
-                                            WorkerSourceTask.this,
-                                            recordMetadata.topic(), 
recordMetadata.partition(),
-                                            recordMetadata.offset());
-                                    commitTaskRecord(preTransformRecord, 
recordMetadata);
-                                    if (isTopicTrackingEnabled) {
-                                        
recordActiveTopic(producerRecord.topic());
-                                    }
-                                }
+                    producerRecord,
+                    (recordMetadata, e) -> {
+                        if (e != null) {
+                            log.error("{} failed to send record to {}: ", 
WorkerSourceTask.this, topic, e);
+                            log.debug("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
+                            producerSendException.compareAndSet(null, e);
+                        } else {
+                            recordSent(producerRecord);
+                            counter.completeRecord();
+                            log.trace("{} Wrote record successfully: topic {} 
partition {} offset {}",
+                                    WorkerSourceTask.this,
+                                    recordMetadata.topic(), 
recordMetadata.partition(),
+                                    recordMetadata.offset());
+                            commitTaskRecord(preTransformRecord, 
recordMetadata);
+                            if (isTopicTrackingEnabled) {
+                                recordActiveTopic(producerRecord.topic());
                             }
-                        });
+                        }
+                    });
                 lastSendFailed = false;
-            } catch (org.apache.kafka.common.errors.RetriableException e) {
-                log.warn("{} Failed to send {}, backing off before retrying:", 
this, producerRecord, e);
+            } catch (RetriableException | 
org.apache.kafka.common.errors.RetriableException e) {
+                log.warn("{} Failed to send {}, backing off before retrying: 
", this, producerRecord, e);

Review comment:
       Let me know if it looks OK now.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = 
"topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
include topics that "
+            + "match their values and apply this group's specific 
configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
exclude topics that "
+            + "match their values and refrain from applying this group's 
specific configuration "
+            + "to the topics that match this exclusion list. Note that 
exclusion rules have "
+            + "precedent and override any inclusion rules for topics. ";
+
+    public static final String REPLICATION_FACTOR_CONFIG = 
"replication.factor";
+    private static final String REPLICATION_FACTOR_DOC = "The replication 
factor for new topics "
+            + "created for this connector. This value must not be larger than 
the number of "
+            + "brokers in the Kafka cluster, or otherwise an error will be 
thrown when the "
+            + "connector will attempt to create a topic. For the default group 
this configuration"
+            + " is required. For any other group defined in 
topic.creation.groups this config is "
+            + "optional and if it's missing it gets the value the default 
group";
+
+    public static final String PARTITIONS_CONFIG = "partitions";
+    private static final String PARTITIONS_DOC = "The number of partitions new 
topics created for"
+            + " this connector. For the default group this configuration is 
required. For any "
+            + "other group defined in topic.creation.groups this config is 
optional and if it's "
+            + "missing it gets the value the default group";
+
+    public static final ConfigDef.Validator REPLICATION_FACTOR_VALIDATOR = 
ConfigDef.LambdaValidator.with(
+        (name, value) -> validateReplicationFactor(name, (short) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    public static final ConfigDef.Validator PARTITIONS_VALIDATOR = 
ConfigDef.LambdaValidator.with(
+        (name, value) -> validatePartitions(name, (int) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    @SuppressWarnings("unchecked")
+    public static final ConfigDef.Validator REGEX_VALIDATOR = 
ConfigDef.LambdaValidator.with(
+        (name, value) -> {
+            try {
+                ((List<String>) value).forEach(Pattern::compile);
+            } catch (PatternSyntaxException e) {
+                throw new ConfigException(name, value, "Syntax error in 
regular expression");

Review comment:
       Yeah, sure. I missed that in other cases we add the exception message. 
This exception does not accept a Throwable as a cause. 
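
For illustration, here is a minimal sketch of carrying the regex error detail in the message text when the exception type has no cause-accepting constructor. `InvalidConfigException` and `validateRegexList` are hypothetical stand-ins written for this example, not the Kafka classes from the diff.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

public class RegexValidationSketch {

    // Hypothetical stand-in for an exception type that, like the one discussed
    // above, offers no constructor taking a Throwable cause.
    public static class InvalidConfigException extends RuntimeException {
        public InvalidConfigException(String name, Object value, String message) {
            super("Invalid value " + value + " for configuration " + name + ": " + message);
        }
    }

    // Compiles every entry; on a syntax error the detail from the
    // PatternSyntaxException is appended to the message, since it cannot be
    // attached as a cause.
    public static void validateRegexList(String name, List<String> patterns) {
        try {
            patterns.forEach(Pattern::compile);
        } catch (PatternSyntaxException e) {
            throw new InvalidConfigException(name, patterns,
                    "Syntax error in regular expression: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        // Valid patterns pass silently.
        validateRegexList("topic.creation.foo.include", Arrays.asList("orders-.*", "payments"));
        try {
            // An unclosed group is a syntax error.
            validateRegexList("topic.creation.foo.include", Arrays.asList("orders-(.*"));
        } catch (InvalidConfigException e) {
            System.out.println(e.getMessage());
        }
    }
}
```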

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##########
@@ -250,9 +251,17 @@
             + "user requests to reset the set of active topics per connector.";
     protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true;
 
+    public static final String TOPIC_CREATION_ENABLE_CONFIG = 
"topic.creation.enable";
+    protected static final String TOPIC_CREATION_ENABLE_DOC = "If set to true, 
it allows "
+            + "source connectors to create topics by specifying topic creation 
properties "
+            + "with the prefix `" + TOPIC_CREATION_PREFIX + "`. Each task will 
use an "
+            + "admin client to create its topics and will not depend on the 
Kafka brokers "
+            + "to create topics automatically.";

Review comment:
       Accepted the suggestion here, and I'll return to fix the typo in `source 
connector`.
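
As a rough sketch of how the worker-level flag and the connector-level properties combine: the `SourceConnectorConfig` constructor in this PR only builds the topic creation configuration when the flag is on and at least one property starts with `topic.creation.`. The class and method names below (`TopicCreationGateSketch`, `usesTopicCreation`) are hypothetical and exist only for this example.

```java
import java.util.HashMap;
import java.util.Map;

public class TopicCreationGateSketch {

    // Same prefix string as TOPIC_CREATION_PREFIX in the diff.
    static final String TOPIC_CREATION_PREFIX = "topic.creation.";

    // True only when the worker allows topic creation AND the connector
    // supplies at least one property under the topic creation prefix.
    public static boolean usesTopicCreation(boolean workerEnabled, Map<String, String> connectorProps) {
        return workerEnabled
                && connectorProps.keySet().stream().anyMatch(k -> k.startsWith(TOPIC_CREATION_PREFIX));
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("name", "my-source");
        System.out.println(usesTopicCreation(true, props));

        props.put("topic.creation.default.replication.factor", "3");
        props.put("topic.creation.default.partitions", "5");
        System.out.println(usesTopicCreation(true, props));
        System.out.println(usesTopicCreation(false, props));
    }
}
```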

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = 
"topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
include topics that "
+            + "match their values and apply this group's specific 
configuration to the topics "
+            + "that match this inclusion list.";

Review comment:
       I assume you mean `match the topic names`.
   I added what I had in the KIP here. I agree it's a bit convoluted. 
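
To make the include/exclude wording concrete, here is a small sketch that assumes the semantics described in the doc strings: each list is joined with `|` into one pattern (as `NewTopicCreationGroup` does in this PR), and exclusion takes precedence over inclusion. `TopicGroupMatchSketch` and its `matches` method are hypothetical names for this example and may differ from the final implementation.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

public class TopicGroupMatchSketch {
    private final Pattern inclusion;
    private final Pattern exclusion;

    public TopicGroupMatchSketch(List<String> include, List<String> exclude) {
        // An empty list yields the empty pattern, which matches only the
        // empty string, so it never matches a real (non-empty) topic name.
        this.inclusion = Pattern.compile(String.join("|", include));
        this.exclusion = Pattern.compile(String.join("|", exclude));
    }

    // Assumed rule: a topic belongs to the group if it is not excluded
    // and it matches at least one inclusion regex in full.
    public boolean matches(String topic) {
        return !exclusion.matcher(topic).matches()
                && inclusion.matcher(topic).matches();
    }

    public static void main(String[] args) {
        TopicGroupMatchSketch group = new TopicGroupMatchSketch(
                Arrays.asList("orders-.*", "payments"),
                Collections.singletonList("orders-internal"));
        for (String topic : Arrays.asList("orders-eu", "orders-internal", "payments", "audit")) {
            System.out.println(topic + " -> " + group.matches(topic));
        }
    }
}
```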

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -344,37 +357,38 @@ private boolean sendRecords() {
                 }
             }
             try {
+                maybeCreateTopic(record.topic());
                 final String topic = producerRecord.topic();
                 producer.send(
-                        producerRecord,
-                        new Callback() {
-                            @Override
-                            public void onCompletion(RecordMetadata 
recordMetadata, Exception e) {
-                                if (e != null) {
-                                    log.error("{} failed to send record to 
{}:", WorkerSourceTask.this, topic, e);
-                                    log.debug("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
-                                    producerSendException.compareAndSet(null, 
e);
-                                } else {
-                                    recordSent(producerRecord);
-                                    counter.completeRecord();
-                                    log.trace("{} Wrote record successfully: 
topic {} partition {} offset {}",
-                                            WorkerSourceTask.this,
-                                            recordMetadata.topic(), 
recordMetadata.partition(),
-                                            recordMetadata.offset());
-                                    commitTaskRecord(preTransformRecord, 
recordMetadata);
-                                    if (isTopicTrackingEnabled) {
-                                        
recordActiveTopic(producerRecord.topic());
-                                    }
-                                }
+                    producerRecord,
+                    (recordMetadata, e) -> {
+                        if (e != null) {
+                            log.error("{} failed to send record to {}: ", 
WorkerSourceTask.this, topic, e);
+                            log.debug("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
+                            producerSendException.compareAndSet(null, e);
+                        } else {
+                            recordSent(producerRecord);
+                            counter.completeRecord();
+                            log.trace("{} Wrote record successfully: topic {} 
partition {} offset {}",
+                                    WorkerSourceTask.this,
+                                    recordMetadata.topic(), 
recordMetadata.partition(),
+                                    recordMetadata.offset());
+                            commitTaskRecord(preTransformRecord, 
recordMetadata);
+                            if (isTopicTrackingEnabled) {
+                                recordActiveTopic(producerRecord.topic());
                             }
-                        });
+                        }
+                    });
                 lastSendFailed = false;
-            } catch (org.apache.kafka.common.errors.RetriableException e) {
-                log.warn("{} Failed to send {}, backing off before retrying:", 
this, producerRecord, e);
+            } catch (RetriableException | 
org.apache.kafka.common.errors.RetriableException e) {
+                log.warn("{} Failed to send {}, backing off before retrying: 
", this, producerRecord, e);

Review comment:
       True. I kept what we were doing. Backports won't be easy due to 
conflicts from `trunk` to `2.5` anyway. I'll merge here and add it to the list 
of things we should consider backporting to `2.5` and earlier. 

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -344,37 +357,38 @@ private boolean sendRecords() {
                 }
             }
             try {
+                maybeCreateTopic(record.topic());
                 final String topic = producerRecord.topic();
                 producer.send(
-                        producerRecord,
-                        new Callback() {
-                            @Override
-                            public void onCompletion(RecordMetadata 
recordMetadata, Exception e) {
-                                if (e != null) {
-                                    log.error("{} failed to send record to 
{}:", WorkerSourceTask.this, topic, e);
-                                    log.debug("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
-                                    producerSendException.compareAndSet(null, 
e);
-                                } else {
-                                    recordSent(producerRecord);
-                                    counter.completeRecord();
-                                    log.trace("{} Wrote record successfully: 
topic {} partition {} offset {}",
-                                            WorkerSourceTask.this,
-                                            recordMetadata.topic(), 
recordMetadata.partition(),
-                                            recordMetadata.offset());
-                                    commitTaskRecord(preTransformRecord, 
recordMetadata);
-                                    if (isTopicTrackingEnabled) {
-                                        
recordActiveTopic(producerRecord.topic());
-                                    }
-                                }
+                    producerRecord,
+                    (recordMetadata, e) -> {
+                        if (e != null) {
+                            log.error("{} failed to send record to {}: ", 
WorkerSourceTask.this, topic, e);
+                            log.debug("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
+                            producerSendException.compareAndSet(null, e);
+                        } else {
+                            recordSent(producerRecord);
+                            counter.completeRecord();
+                            log.trace("{} Wrote record successfully: topic {} 
partition {} offset {}",
+                                    WorkerSourceTask.this,
+                                    recordMetadata.topic(), 
recordMetadata.partition(),
+                                    recordMetadata.offset());
+                            commitTaskRecord(preTransformRecord, 
recordMetadata);
+                            if (isTopicTrackingEnabled) {
+                                recordActiveTopic(producerRecord.topic());
                             }
-                        });
+                        }
+                    });
                 lastSendFailed = false;
-            } catch (org.apache.kafka.common.errors.RetriableException e) {
-                log.warn("{} Failed to send {}, backing off before retrying:", 
this, producerRecord, e);
+            } catch (RetriableException | 
org.apache.kafka.common.errors.RetriableException e) {
+                log.warn("{} Failed to send {}, backing off before retrying: 
", this, producerRecord, e);
                 toSend = toSend.subList(processed, toSend.size());
                 lastSendFailed = true;
                 counter.retryRemaining();
                 return false;
+            } catch (ConnectException e) {
+                log.warn("{} Failed to send {} with unrecoverable exception: 
", this, producerRecord, e);

Review comment:
       Changed as above. 

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -166,11 +176,14 @@ protected void close() {
                 log.warn("Could not close producer", t);
             }
         }
-        try {
-            transformationChain.close();
-        } catch (Throwable t) {
-            log.warn("Could not close transformation chain", t);
+        if (admin != null) {
+            try {
+                admin.close(Duration.ofSeconds(30));
+            } catch (Throwable t) {
+                log.warn("Failed to close admin client on time", t);
+            }

Review comment:
       We don't do that with other clients or in `closeQuietly`.
   I'd say we don't need to because, although not required by `AutoCloseable`, 
the admin implementation of `close` seems idempotent. 
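
A minimal sketch of what idempotent `close` means in practice: a second call is a no-op, so callers need no extra bookkeeping after closing. `FakeAdmin` and the local `closeQuietly` helper are hypothetical and written only for this example; they are not the Kafka admin client or the Kafka utility method.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class IdempotentCloseSketch {

    // Hypothetical resource whose close() releases its resources at most once.
    public static class FakeAdmin implements AutoCloseable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private int releases = 0;

        @Override
        public void close() {
            // Only the first call flips the flag and releases resources.
            if (closed.compareAndSet(false, true)) {
                releases++;
            }
        }

        public int releases() {
            return releases;
        }
    }

    // Local helper for this sketch: tolerates null and logs instead of
    // rethrowing, mirroring the null check and try/catch in the diff above.
    public static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Throwable t) {
            System.err.println("Failed to close " + name + ": " + t);
        }
    }

    public static void main(String[] args) {
        FakeAdmin admin = new FakeAdmin();
        closeQuietly(admin, "admin client");
        closeQuietly(admin, "admin client"); // harmless second call
        closeQuietly(null, "missing client"); // null is ignored
        System.out.println("releases = " + admin.releases());
    }
}
```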

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
##########
@@ -16,21 +16,167 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_GROUP;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.EXCLUDE_REGEX_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 
 public class SourceConnectorConfig extends ConnectorConfig {
 
-    private static ConfigDef config = ConnectorConfig.configDef();
+    protected static final String TOPIC_CREATION_GROUP = "Topic Creation";
+
+    public static final String TOPIC_CREATION_PREFIX = "topic.creation.";
+
+    public static final String TOPIC_CREATION_GROUPS_CONFIG = 
TOPIC_CREATION_PREFIX + "groups";
+    private static final String TOPIC_CREATION_GROUPS_DOC = "Groups of 
configurations for topics "
+            + "created by source connectors";
+    private static final String TOPIC_CREATION_GROUPS_DISPLAY = "Topic 
Creation Groups";
+
+    private static class EnrichedSourceConnectorConfig extends AbstractConfig {
+        EnrichedSourceConnectorConfig(ConfigDef configDef, Map<String, String> 
props) {
+            super(configDef, props);
+        }
+
+        @Override
+        public Object get(String key) {
+            return super.get(key);
+        }
+    }
+
+    private static ConfigDef config = SourceConnectorConfig.configDef();
+    private final EnrichedSourceConnectorConfig enrichedSourceConfig;
 
     public static ConfigDef configDef() {
-        return config;
+        int orderInGroup = 0;
+        return new ConfigDef(ConnectorConfig.configDef())
+                .define(TOPIC_CREATION_GROUPS_CONFIG, ConfigDef.Type.LIST, 
Collections.emptyList(),
+                        ConfigDef.CompositeValidator.of(new 
ConfigDef.NonNullValidator(), ConfigDef.LambdaValidator.with(
+                            (name, value) -> {
+                                List<?> groupAliases = (List<?>) value;
+                                if (groupAliases.size() > new 
HashSet<>(groupAliases).size()) {
+                                    throw new ConfigException(name, value, 
"Duplicate alias provided.");
+                                }
+                            },
+                            () -> "unique topic creation groups")),
+                        ConfigDef.Importance.LOW, TOPIC_CREATION_GROUPS_DOC, 
TOPIC_CREATION_GROUP,
+                        ++orderInGroup, ConfigDef.Width.LONG, 
TOPIC_CREATION_GROUPS_DISPLAY);
     }
 
-    public SourceConnectorConfig(Plugins plugins, Map<String, String> props) {
+    public static ConfigDef embedDefaultGroup(ConfigDef baseConfigDef) {
+        String defaultGroup = "default";
+        ConfigDef newDefaultDef = new ConfigDef(baseConfigDef);
+        newDefaultDef.embed(DEFAULT_TOPIC_CREATION_PREFIX, defaultGroup, 0, 
TopicCreationConfig.defaultGroupConfigDef());
+        return newDefaultDef;
+    }
+
+    /**
+     * Returns an enriched {@link ConfigDef} building upon the {@code 
ConfigDef}, using the current configuration specified in {@code props} as an 
input.
+     *
+     * @param baseConfigDef the base configuration definition to be enriched
+     * @param props the non parsed configuration properties
+     * @return the enriched configuration definition
+     */
+    public static ConfigDef enrich(ConfigDef baseConfigDef, Map<String, 
String> props, AbstractConfig defaultGroupConfig) {
+        List<Object> topicCreationGroups = new ArrayList<>();
+        Object aliases = ConfigDef.parseType(TOPIC_CREATION_GROUPS_CONFIG, 
props.get(TOPIC_CREATION_GROUPS_CONFIG), ConfigDef.Type.LIST);
+        if (aliases instanceof List) {
+            topicCreationGroups.addAll((List<?>) aliases);
+        }
+
+        ConfigDef newDef = new ConfigDef(baseConfigDef);
+        String defaultGroupPrefix = TOPIC_CREATION_PREFIX + 
DEFAULT_TOPIC_CREATION_GROUP + ".";
+        short defaultGroupReplicationFactor = 
defaultGroupConfig.getShort(defaultGroupPrefix + REPLICATION_FACTOR_CONFIG);
+        int defaultGroupPartitions = 
defaultGroupConfig.getInt(defaultGroupPrefix + PARTITIONS_CONFIG);
+        topicCreationGroups.stream().distinct().forEach(group -> {
+            if (!(group instanceof String)) {
+                throw new ConfigException("Item in " + 
TOPIC_CREATION_GROUPS_CONFIG + " property is not of type String");
+            }
+            String alias = (String) group;
+            String prefix = TOPIC_CREATION_PREFIX + alias + ".";
+            String configGroup = TOPIC_CREATION_GROUP + ": " + alias;
+            newDef.embed(prefix, configGroup, 0,
+                    TopicCreationConfig.configDef(configGroup, 
defaultGroupReplicationFactor, defaultGroupPartitions));
+        });
+        return newDef;
+    }
+
+    public SourceConnectorConfig(Plugins plugins, Map<String, String> props, 
boolean createTopics) {
         super(plugins, config, props);
+        if (createTopics && props.entrySet().stream().anyMatch(e -> 
e.getKey().startsWith(TOPIC_CREATION_PREFIX))) {
+            ConfigDef defaultConfigDef = embedDefaultGroup(config);
+            // This config is only used to set default values for partitions 
and replication
+            // factor from the default group and otherwise it remains unused
+            AbstractConfig defaultGroup = new AbstractConfig(defaultConfigDef, 
props, false);
+
+            // If the user has added regex of include or exclude patterns in 
the default group,
+            // they should be ignored.

Review comment:
       TDD :) 

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = 
"topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of regular 
expression literals "
+            + "used to match the names topics used by the source connector. 
This list is used "
+            + "to include topics that should be created using the topic 
settings defined by this group.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String INCLUDE_REGEX_DOC = "A list of regular 
expression literals "

Review comment:
       NP. Fixed. 

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.util.TopicAdmin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+public class TopicCreationConfig {
+
+    public static final String DEFAULT_TOPIC_CREATION_PREFIX = 
"topic.creation.default.";
+    public static final String DEFAULT_TOPIC_CREATION_GROUP = "default";
+
+    public static final String INCLUDE_REGEX_CONFIG = "include";
+    private static final String INCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
include topics that "
+            + "match their values and apply this group's specific 
configuration to the topics "
+            + "that match this inclusion list.";
+
+    public static final String EXCLUDE_REGEX_CONFIG = "exclude";
+    private static final String EXCLUDE_REGEX_DOC = "A list of strings that 
represent regular "
+            + "expressions that may match topic names. This list is used to 
exclude topics that "
+            + "match their values and refrain from applying this group's 
specific configuration "
+            + "to the topics that match this exclusion list. Note that 
exclusion rules have "
+            + "precedence and override any inclusion rules for topics.";
+
+    public static final String REPLICATION_FACTOR_CONFIG = 
"replication.factor";
+    private static final String REPLICATION_FACTOR_DOC = "The replication 
factor for new topics "
+            + "created for this connector. This value must not be larger than 
the number of "
+            + "brokers in the Kafka cluster, or otherwise an error will be 
thrown when the "
+            + "connector will attempt to create a topic. For the default group 
this configuration"
+            + " is required. For any other group defined in 
topic.creation.groups this config is "
+            + "optional and if it's missing it gets the value of the default 
group";
+
+    public static final String PARTITIONS_CONFIG = "partitions";
+    private static final String PARTITIONS_DOC = "The number of partitions for 
new topics created for"
+            + " this connector. For the default group this configuration is 
required. For any "
+            + "other group defined in topic.creation.groups this config is 
optional and if it's "
+            + "missing it gets the value of the default group";
+
+    public static final ConfigDef.Validator REPLICATION_FACTOR_VALIDATOR = 
ConfigDef.LambdaValidator.with(
+        (name, value) -> validateReplicationFactor(name, (short) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    public static final ConfigDef.Validator PARTITIONS_VALIDATOR = 
ConfigDef.LambdaValidator.with(
+        (name, value) -> validatePartitions(name, (int) value),
+        () -> "Positive number, or -1 to use the broker's default"
+    );
+    @SuppressWarnings("unchecked")
+    public static final ConfigDef.Validator REGEX_VALIDATOR = 
ConfigDef.LambdaValidator.with(
+        (name, value) -> {
+            try {
+                ((List<String>) value).forEach(Pattern::compile);
+            } catch (PatternSyntaxException e) {
+                throw new ConfigException(name, value, "Syntax error in 
regular expression");
+            }
+        },
+        () -> "List of valid regular expressions"
+    );
+
+    private static void validatePartitions(String configName, int factor) {
+        if (factor != TopicAdmin.NO_PARTITIONS && factor < 1) {
+            throw new ConfigException(configName, factor,
+                    "Number of partitions must be positive, or -1 to use the 
broker's default");
+        }
+    }
+
+    private static void validateReplicationFactor(String configName, short 
factor) {
+        if (factor != TopicAdmin.NO_REPLICATION_FACTOR && factor < 1) {
+            throw new ConfigException(configName, factor,
+                    "Replication factor must be positive, or -1 to use the 
broker's default");
+        }
+    }
+
+    public static ConfigDef configDef(String group, short 
defaultReplicationFactor, int defaultParitionCount) {
+        int orderInGroup = 0;
+        ConfigDef configDef = new ConfigDef();
+        configDef
+                .define(INCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, 
Collections.emptyList(),
+                        REGEX_VALIDATOR, ConfigDef.Importance.LOW,
+                        INCLUDE_REGEX_DOC, group, ++orderInGroup, 
ConfigDef.Width.LONG,
+                        "Inclusion Topic Pattern for " + group)
+                .define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, 
Collections.emptyList(),
+                        REGEX_VALIDATOR, ConfigDef.Importance.LOW,
+                        EXCLUDE_REGEX_DOC, group, ++orderInGroup, 
ConfigDef.Width.LONG,
+                        "Exclusion Topic Pattern for " + group)
+                .define(REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT,
+                        defaultReplicationFactor, REPLICATION_FACTOR_VALIDATOR,
+                        ConfigDef.Importance.LOW, REPLICATION_FACTOR_DOC, 
group, ++orderInGroup,
+                        ConfigDef.Width.LONG, "Replication Factor for Topics 
in " + group)
+                .define(PARTITIONS_CONFIG, ConfigDef.Type.INT,
+                        defaultParitionCount, PARTITIONS_VALIDATOR,
+                        ConfigDef.Importance.LOW, PARTITIONS_DOC, group, 
++orderInGroup,
+                        ConfigDef.Width.LONG, "Partition Count for Topics in " 
+ group);
+        return configDef;
+    }
+
+    public static ConfigDef defaultGroupConfigDef() {
+        int orderInGroup = 0;
+        ConfigDef configDef = new ConfigDef();
+        configDef
+                .define(INCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, ".*",
+                        new ConfigDef.NonNullValidator(), 
ConfigDef.Importance.LOW,
+                        INCLUDE_REGEX_DOC, DEFAULT_TOPIC_CREATION_GROUP, 
++orderInGroup, ConfigDef.Width.LONG,
+                        "Inclusion Topic Pattern for " + 
DEFAULT_TOPIC_CREATION_GROUP)
+                .define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, 
Collections.emptyList(),
+                        new ConfigDef.NonNullValidator(), 
ConfigDef.Importance.LOW,
+                        EXCLUDE_REGEX_DOC, DEFAULT_TOPIC_CREATION_GROUP, 
++orderInGroup, ConfigDef.Width.LONG,
+                        "Exclusion Topic Pattern for " + 
DEFAULT_TOPIC_CREATION_GROUP)
+                .define(REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT,
+                        ConfigDef.NO_DEFAULT_VALUE, 
REPLICATION_FACTOR_VALIDATOR,
+                        ConfigDef.Importance.LOW, REPLICATION_FACTOR_DOC, 
DEFAULT_TOPIC_CREATION_GROUP, ++orderInGroup,
+                        ConfigDef.Width.LONG, "Replication Factor for Topics 
in " + DEFAULT_TOPIC_CREATION_GROUP)
+                .define(PARTITIONS_CONFIG, ConfigDef.Type.INT,
+                        ConfigDef.NO_DEFAULT_VALUE, PARTITIONS_VALIDATOR,
+                        ConfigDef.Importance.LOW, PARTITIONS_DOC, 
DEFAULT_TOPIC_CREATION_GROUP, ++orderInGroup,
+                        ConfigDef.Width.LONG, "Partition Count for Topics in " 
+ DEFAULT_TOPIC_CREATION_GROUP);
+        return configDef;
+    }

Review comment:
       I'm in favor of including since it's well tested. We could reconsider a 
consolidation in the future. 
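
       For readers following the thread, the include/exclude semantics documented in `EXCLUDE_REGEX_DOC` above (exclusion takes precedence over inclusion) can be sketched in isolation. This is only a minimal illustration and not the code from this PR: the class `TopicGroupMatchSketch` and its `applies` method are hypothetical names, and joining each list with `|` merely mirrors how `NewTopicCreationGroup` compiles its patterns elsewhere in this diff. It also assumes that a topic name has to match a pattern in full.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only; not part of the PR.
final class TopicGroupMatchSketch {

    // Returns true when the group's settings would apply to the topic:
    // the topic must not match the exclusion list and must match the inclusion list.
    static boolean applies(List<String> include, List<String> exclude, String topic) {
        // Each list is collapsed into one alternation. An empty list yields the
        // empty pattern, which matches no non-empty topic name.
        Pattern inclusion = Pattern.compile(String.join("|", include));
        Pattern exclusion = Pattern.compile(String.join("|", exclude));
        // Exclusion is checked first, so it overrides any inclusion match.
        return !exclusion.matcher(topic).matches()
                && inclusion.matcher(topic).matches();
    }

    public static void main(String[] args) {
        // Sample lists, made up for this example.
        List<String> include = Arrays.asList("orders\\..*", "payments");
        List<String> exclude = Arrays.asList("orders\\.internal\\..*");
        for (String topic : Arrays.asList("orders.eu", "orders.internal.audit", "inventory")) {
            System.out.println(topic + " -> " + applies(include, exclude, topic));
        }
    }
}
```

       With the sample lists in `main`, a topic that matches both lists is rejected, because the exclusion check runs before the inclusion check.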




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

