C0urante commented on a change in pull request #10907: URL: https://github.com/apache/kafka/pull/10907#discussion_r786311430
########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java ########## @@ -28,4 +30,31 @@ protected SourceConnectorContext context() { return (SourceConnectorContext) context; } + + /** + * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration. + * Developers can assume that worker-level exactly-once support is enabled when this method is invoked. + * The default implementation will return {@code null}. + * @param connectorConfig the configuration that will be used for the connector. + * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support, + * and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If {@code null}, it is assumed that the + * connector cannot. + */ + public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) { + return null; + } + + /** + * Signals whether the connector can define its own transaction boundaries with the proposed + * configuration. Developers must override this method if they wish to add connector-defined + * transaction boundary support; if they do not, users will be unable to create instances of + * this connector that use connector-defined transaction boundaries. The default implementation + * will return {@code UNSUPPORTED}. Review comment: LGTM. I added a small note on not returning `null` with the same language as in `SourceConnector::exactlyOnceSupport` but otherwise added this verbatim. LMKWYT ########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java ########## @@ -20,13 +20,45 @@ import org.apache.kafka.clients.producer.RecordMetadata; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * SourceTask is a Task that pulls records from another system for storage in Kafka. */ public abstract class SourceTask implements Task { + /** + * <p> + * The configuration key that determines how source tasks will define transaction boundaries + * when exactly-once support is enabled. + * </p> + */ + public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary"; + + public enum TransactionBoundary { Review comment: Just making sure: is there anything left to address for this comment? ########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java ########## @@ -20,13 +20,45 @@ import org.apache.kafka.clients.producer.RecordMetadata; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * SourceTask is a Task that pulls records from another system for storage in Kafka. */ public abstract class SourceTask implements Task { + /** + * <p> + * The configuration key that determines how source tasks will define transaction boundaries + * when exactly-once support is enabled. + * </p> + */ + public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary"; + + public enum TransactionBoundary { + POLL, + INTERVAL, + CONNECTOR; + + public static final TransactionBoundary DEFAULT = POLL; + + public static List<String> options() { Review comment: Neither of these patterns play nicely with the validators offered by `ConfigDef`, though. 
Both `ValidString` and `CaseInsensitiveValidString` require an array (or really, a varargs list) of strings in their constructors, which prevents us from using `Enum::values` directly to create one of these validators. I've removed `options()` (from both `TransactionBoundary` and `ExactlyOnceSupportLevel`) in favor of a reusable `Utils::enumOptions` method, which returns a `String[]` containing the names (retrieved via `toString`) of each value for an enum class. This prevents us from having to duplicate this logic and allows us to use it without adding new methods to public API. The actual implementations for both enums now follow the `ConnectorType` pattern, with an overridden `toString` method that returns the lowercase name of the enum and a static `fromProperty` method (which mirrors the `ConnectorType::fromValue` method) that parses a string case-insensitively into a value for the enum. I opted for a different method name for the latter since the values given to `fromProperty` should always be user-supplied values and the motivation for this separate method (instead of just invoking `Enum::valueOf` directly) is directly related to parsing user-supplied values. We could also add a general-purpose `EnumValidator` class to `ConfigDef` (for public use) or in a section of the Connect code base reserved for private API, but that would still leave us with the trouble of having to re-implement logic to follow the phrase "Permitted values are" in property docstrings, and it's probably best to hold off on implementing a public-facing validator like that until its exact behavior can be agreed on with a KIP. ########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java ########## @@ -38,4 +38,29 @@ * Get the OffsetStorageReader for this SourceTask. */ OffsetStorageReader offsetStorageReader(); + + /** + * Get a {@link TransactionContext} that can be used to define producer transaction boundaries + * when exactly-once support is enabled for the connector. + * + * <p>This method was added in Apache Kafka 3.0. Source tasks that use this method but want to + * maintain backward compatibility so they can also be deployed to older Connect runtimes + * should guard the call to this method with a try-catch block, since calling this method will result in a + * {@link NoSuchMethodException} or {@link NoClassDefFoundError} when the source connector is deployed to + * Connect runtimes older than Kafka 3.0. For example: + * <pre> + * TransactionContext transactionContext; + * try { + * transactionContext = context.transactionContext(); + * } catch (NoSuchMethodError | NoClassDefFoundError e) { + * transactionContext = null; + * } + * </pre> + * + * @return the transaction context, or null if the connector was not configured to specify transaction boundaries + * @since 3.0 Review comment: I believe 3.2 is the correct version now, right? Will update to that. ########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java ########## @@ -28,4 +30,39 @@ protected SourceConnectorContext context() { return (SourceConnectorContext) context; } + + /** + * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration. + * Developers can assume that worker-level exactly-once support is enabled when this method is invoked. + * For backwards compatibility, the default implementation will return {@code null}, but connector developers are Review comment: Ack, done. 
(And added a few more paragraph breaks in other places too). ########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.source; + +/** + * Provided to source tasks to allow them to define their own producer transaction boundaries when + * exactly-once support is enabled. + */ +public interface TransactionContext { Review comment: The behavior specified in the KIP is for `SourceTaskContext::transactionContext` to return "the transaction context, or null if the connector was not configured to specify transaction boundaries" (see the Javadocs for that method in the code snippets in [the relevant KIP section](https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors#KIP618:ExactlyOnceSupportforSourceConnectors-ConnectorAPIexpansions)). This is implemented in the [`ExactlyOnceWorkerSourceTask`](https://github.com/C0urante/kafka/blob/c06207d64a76286316b862e16143f909181501c3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java#L113-L117) class and should prevent connectors from invoking `TransactionContext` methods and accumulating records when the user has not configured the connector to define its own transaction boundaries. 
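To illustrate how that null contract and the compatibility guard from the `SourceTaskContext` Javadoc above fit together on the connector side, here is a minimal sketch (not code from this PR; the `commitTransaction(SourceRecord)` call is assumed from the KIP-618 interface):

```java
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.TransactionContext;

import java.util.Map;

/**
 * Hypothetical task that defines its own transaction boundaries but degrades gracefully
 * on pre-KIP-618 runtimes and when the user has not opted in to connector-defined boundaries.
 */
public abstract class BoundaryAwareSourceTask extends SourceTask {

    private TransactionContext transactionContext;

    @Override
    public void start(Map<String, String> props) {
        try {
            // Null unless the connector is configured to define its own transaction boundaries
            transactionContext = context.transactionContext();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            // Deployed on an older Connect runtime without the KIP-618 API
            transactionContext = null;
        }
    }

    protected void maybeCommitTransaction(SourceRecord lastRecordInBatch) {
        if (transactionContext != null) {
            // Ask the runtime to commit the open transaction once this record is dispatched
            transactionContext.commitTransaction(lastRecordInBatch);
        }
        // Otherwise, any transaction boundaries are defined by the runtime instead
    }
}
```

Because the task only touches `TransactionContext` when the context is non-null, it never accumulates records for a transaction the runtime will not commit.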
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java ########## @@ -192,6 +199,44 @@ public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A list of permitted algorithms for verifying internal requests"; public static final List<String> INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT); + private enum ExactlyOnceSourceSupport { + DISABLED(false), + PREPARING(true), + ENABLED(true); + + public final boolean usesTransactionalLeader; + + ExactlyOnceSourceSupport(boolean usesTransactionalLeader) { + this.usesTransactionalLeader = usesTransactionalLeader; + } + + public static List<String> options() { + return Stream.of(values()).map(ExactlyOnceSourceSupport::toString).collect(Collectors.toList()); + } + + public static ExactlyOnceSourceSupport fromProperty(String property) { + return ExactlyOnceSourceSupport.valueOf(property.toUpperCase(Locale.ROOT)); + } + + @Override + public String toString() { + return name().toLowerCase(Locale.ROOT); + } + } + + public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = "exactly.once.source.support"; + public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to enable exactly-once support for source connectors in the cluster " + + "by writing source records and their offsets in a Kafka transaction, and by proactively fencing out old task generations before bringing up new ones. " Review comment: Ack, done. ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedConfigTest.java ########## @@ -294,4 +298,41 @@ public void shouldRemoveCompactionFromStatusTopicSettings() { assertEquals(expectedTopicSettings, actual); assertNotEquals(topicSettings, actual); } + + @Test + public void shouldIdentifyNeedForTransactionalLeader() { + Map<String, String> workerProps = configs(); + + workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "disabled"); + assertFalse(new DistributedConfig(workerProps).transactionalLeaderEnabled()); + + workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "preparing"); + assertTrue(new DistributedConfig(workerProps).transactionalLeaderEnabled()); + + workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled"); + assertTrue(new DistributedConfig(workerProps).transactionalLeaderEnabled()); + } + + @Test + public void shouldConstructExpectedTransactionalId() { + Map<String, String> workerProps = configs(); + + workerProps.put(GROUP_ID_CONFIG, "why did i stay up all night writing unit tests"); + assertEquals( + "connect-cluster-why did i stay up all night writing unit tests", + new DistributedConfig(workerProps).transactionalProducerId() + ); + + workerProps.put(GROUP_ID_CONFIG, "connect-cluster"); + assertEquals( + "connect-cluster-connect-cluster", + new DistributedConfig(workerProps).transactionalProducerId() + ); + + workerProps.put(GROUP_ID_CONFIG, "\u2603"); + assertEquals( + "connect-cluster-\u2603", + new DistributedConfig(workerProps).transactionalProducerId() + ); + } Review comment: Ack, done. The tests are located elsewhere since the logic is implemented in a different class, but there are new cases for this in the `KafkaConfigBackingStoreTest`, `KafkaOffsetBackingStoreTest`, and `WorkerTest` test suites. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ########## @@ -125,28 +130,28 @@ public KafkaBasedLog(String topic, * Create a new KafkaBasedLog object. 
This does not start reading the log and writing is not permitted until * {@link #start()} is invoked. * - * @param topic the topic to treat as a log - * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must + * @param topic the topic to treat as a log + * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must * contain compatible serializer settings for the generic types used on this class. Some * setting, such as the number of acks, will be overridden to ensure correct behavior of this * class. - * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must + * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must * contain compatible serializer settings for the generic types used on this class. Some * setting, such as the auto offset reset policy, will be overridden to ensure correct * behavior of this class. - * @param topicAdminSupplier supplier function for an admin client, the lifecycle of which is expected to be controlled + * @param topicAdminSupplier supplier function for an admin client, the lifecycle of which is expected to be controlled * by the calling component; may not be null - * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log - * @param time Time interface - * @param initializer the function that should be run when this log is {@link #start() started}; may be null + * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log + * @param time Time interface + * @param initializer the function that should be run when this log is {@link #start() started}; may be null */ public KafkaBasedLog(String topic, - Map<String, Object> producerConfigs, - Map<String, Object> consumerConfigs, - Supplier<TopicAdmin> topicAdminSupplier, - Callback<ConsumerRecord<K, V>> consumedCallback, - Time time, - java.util.function.Consumer<TopicAdmin> initializer) { + Map<String, Object> producerConfigs, + Map<String, Object> consumerConfigs, + Supplier<TopicAdmin> topicAdminSupplier, + Callback<ConsumerRecord<K, V>> consumedCallback, + Time time, + java.util.function.Consumer<TopicAdmin> initializer) { Review comment: Apologies, will revert. Believe this was left in from an older approach that involved more invasive changes to this class. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ########## @@ -365,6 +413,10 @@ private void readToLogEnd() { // This may happen with really old brokers that don't support the auto topic creation // field in metadata requests log.debug("Reading to end of log offsets with consumer since admin client is unsupported: {}", e.getMessage()); + if (requireAdminForOffsets) { + // Should be handled by the caller during log startup + throw e; + } Review comment: Ack on both points; done 👍 ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -264,12 +279,62 @@ public void startConnector( log.info("Creating connector {} of type {}", connName, connClass); final Connector connector = plugins.newConnector(connClass); - final ConnectorConfig connConfig = ConnectUtils.isSinkConnector(connector) - ? 
new SinkConnectorConfig(plugins, connProps) - : new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable()); + final ConnectorConfig connConfig; + final CloseableOffsetStorageReader offsetReader; + if (ConnectUtils.isSinkConnector(connector)) { + connConfig = new SinkConnectorConfig(plugins, connProps); + offsetReader = null; Review comment: Ah, good catch! I think this is one more place where `Utils::closeQuietly` comes in handy since it does the null check for us, won't interrupt shutdown if an exception is thrown, and further standardizes the cleanup logic for the code base. Alternatively, we could retain the current behavior, but I don't see much use in constructing an offset reader for sink connectors that won't be used now and is unlikely to be used in the future. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.LoggingContext; +import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +public class ConnectorOffsetBackingStore implements OffsetBackingStore { Review comment: Ack, done. Added docs to the (now-three) static factory methods and to each of the overridden `OffsetBackingStore` methods. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.LoggingContext; +import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +public class ConnectorOffsetBackingStore implements OffsetBackingStore { + + private static final Logger log = LoggerFactory.getLogger(ConnectorOffsetBackingStore.class); + + private final Time time; + private final Supplier<LoggingContext> loggingContext; + private final String primaryOffsetsTopic; + private final OffsetBackingStore workerStore; + private final Optional<OffsetBackingStore> connectorStore; + private final Optional<TopicAdmin> connectorStoreAdmin; + + public static ConnectorOffsetBackingStore withConnectorOffsetStore( Review comment: Ack, done. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java ########## @@ -72,6 +74,45 @@ static String lookupKafkaClusterId(Admin adminClient) { } } + /** + * Log a warning when the user attempts to override a property that cannot be overridden. + * @param props the configuration properties provided by the user + * @param key the name of the property to check on + * @param expectedValue the expected value for the property + * @param justification the reason the property cannot be overridden. + * Will follow the phrase "The value... for the... property will be ignored as it cannot be overridden ". + * For example, one might supply the message "in connectors with the DLQ feature enabled" for this parameter. + * @param caseSensitive whether the value should match case-insensitively + */ + public static void warnOnOverriddenProperty( + Map<String, ?> props, + String key, + String expectedValue, + String justification, + boolean caseSensitive) { + overriddenPropertyWarning(props, key, expectedValue, justification, caseSensitive).ifPresent(log::warn); + } + + // Visible for testing + static Optional<String> overriddenPropertyWarning( + Map<String, ?> props, + String key, + String expectedValue, + String justification, + boolean caseSensitive) { + Predicate<String> matchesExpectedValue = caseSensitive ? expectedValue::equals : expectedValue::equalsIgnoreCase; + String value = Optional.ofNullable(props.get(key)).map(Object::toString).orElse(null); + if (value != null && !matchesExpectedValue.test(value)) { + return Optional.of(String.format( + "The value '%s' for the '%s' property will be ignored as it cannot be overridden%s. 
" + + "The value '%s' will be used instead.", Review comment: I suppose not! There are two classes that currently leverage this utility method: `Worker` and `DistributedConfig`. In the former, each invocation of `warnOnOverriddenProperty` is followed closely (if not immediately) by a line that overrides any user-supplied values that don't match the expected value. However, in `DistributedConfig`, warnings are emitted for values that are ignored and overridden when constructing Kafka clients for some but not all `KafkaBasedLog` instances: specifically, the ones used for the config and offset backing stores. It's possible that a user might specify `enable.idempotence=false` in their worker config with the intention of disabling idempotent writes for the producers used for the status backing store (since the default for this property was updated from `false` to `true` in [KIP-679](https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default)). With the current behavior for this PR, that change would have an effect (and the values logged for the producer for the status backing store would confirm this), but the warning message would still be logged, which might lead to some confusion. At the bare minimum we can and should update this message to be more specific about where exactly these values will be ignored, but in addition to that, I've taken a stab at restructuring the reusable utility function to actually perform the override in addition to just warning about it, like you suggested in a comment below. This makes it harder to log warnings about overrides without actually ensuring that the overrides have taken place. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -264,12 +279,62 @@ public void startConnector( log.info("Creating connector {} of type {}", connName, connClass); final Connector connector = plugins.newConnector(connClass); - final ConnectorConfig connConfig = ConnectUtils.isSinkConnector(connector) - ? new SinkConnectorConfig(plugins, connProps) - : new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable()); + final ConnectorConfig connConfig; + final CloseableOffsetStorageReader offsetReader; + if (ConnectUtils.isSinkConnector(connector)) { + connConfig = new SinkConnectorConfig(plugins, connProps); + offsetReader = null; + } else { + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable()); + connConfig = sourceConfig; + + String connectorOffsetsTopic = null; + if (sourceConfig.offsetsTopic() != null) { + connectorOffsetsTopic = sourceConfig.offsetsTopic(); + } else if (config.exactlyOnceSourceEnabled()) { + connectorOffsetsTopic = config.offsetsTopic(); + } + Review comment: Ack, done. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -476,22 +541,95 @@ public boolean isRunning(String connName) { } /** - * Start a task managed by this worker. + * Start a sink task managed by this worker. + * + * @param id the task ID. + * @param configState the most recent {@link ClusterConfigState} known to the worker + * @param connProps the connector properties. + * @param taskProps the tasks properties. + * @param statusListener a listener for the runtime status transitions of the task. + * @param initialState the initial state of the connector. + * @return true if the task started successfully. 
+ */ + public boolean startSinkTask( + ConnectorTaskId id, + ClusterConfigState configState, + Map<String, String> connProps, + Map<String, String> taskProps, + TaskStatus.Listener statusListener, + TargetState initialState + ) { + return startTask(id, connProps, taskProps, statusListener, + new SinkTaskBuilder(id, configState, statusListener, initialState)); + } + + /** + * Start a source task managed by this worker. Review comment: Ack, done. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -692,21 +823,22 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState, ConnectUtils.addMetricsContextProperties(consumerProps, config, clusterId); // Connector-specified overrides Map<String, Object> consumerOverrides = - connectorClientConfigOverrides(id, connConfig, connectorClass, ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX, + connectorClientConfigOverrides(connName, connConfig, connectorClass, ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX, ConnectorType.SINK, ConnectorClientConfigRequest.ClientType.CONSUMER, connectorClientConfigOverridePolicy); consumerProps.putAll(consumerOverrides); return consumerProps; } - static Map<String, Object> adminConfigs(ConnectorTaskId id, + static Map<String, Object> adminConfigs(String connName, Review comment: We have to be able to construct admin clients for `Connector` instances now, whereas before we only used them for `Task` instances. I do sympathize with the desire for task-specific tweaks, though. Right now the pattern is for the `adminConfigs` method to define a new parameter for anything that should be derived from the task ID/connector name and place the onus on the caller to derive that value (for example, this is done with the `defaultClientId` parameter). I think this works for now but one pattern we could consider is to add separate wrapper `adminConfigs` methods for clients constructed on behalf of connectors and on behalf of tasks, similar to the new wrapper methods I've added for (regular and exactly-once) offset consumers and exactly-once task producers. Thoughts? 
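As a rough illustration of that wrapper idea (everything below is hypothetical: the holder class, method names, and heavily trimmed parameter lists do not match the real `adminConfigs`), the connector- and task-scoped variants would just derive the default client ID and delegate to one shared method:

```java
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.ConnectorTaskId;

import java.util.HashMap;
import java.util.Map;

// Sketch only; not the actual Worker code.
class AdminConfigSketch {

    static Map<String, Object> adminConfigsForConnector(String connName, WorkerConfig config, ConnectorConfig connConfig) {
        return baseAdminConfigs(connName, "connector-adminclient-" + connName, config, connConfig);
    }

    static Map<String, Object> adminConfigsForTask(ConnectorTaskId id, WorkerConfig config, ConnectorConfig connConfig) {
        return baseAdminConfigs(id.connector(), "connector-adminclient-" + id, config, connConfig);
    }

    private static Map<String, Object> baseAdminConfigs(String connName, String defaultClientId, WorkerConfig config, ConnectorConfig connConfig) {
        Map<String, Object> adminProps = new HashMap<>();
        adminProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, defaultClientId);
        // Bootstrap servers, security settings, metrics context properties, and the connector's
        // "admin." client overrides would be layered on here, as the existing adminConfigs does;
        // connName would feed the client config override policy request.
        return adminProps;
    }
}
```

That keeps the onus of deriving the client ID out of the call sites while leaving the shared logic in one place.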
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -980,6 +1119,329 @@ WorkerMetricsGroup workerMetricsGroup() { return workerMetricsGroup; } + abstract class TaskBuilder { + + private final ConnectorTaskId id; + private final ClusterConfigState configState; + private final TaskStatus.Listener statusListener; + private final TargetState initialState; + + private Task task; + private ConnectorConfig connectorConfig = null; + private Converter keyConverter = null; + private Converter valueConverter = null; + private HeaderConverter headerConverter = null; + private ClassLoader classLoader = null; + + public TaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + this.id = id; + this.configState = configState; + this.statusListener = statusListener; + this.initialState = initialState; + } + + public TaskBuilder withTask(Task task) { + this.task = task; + return this; + } + + public TaskBuilder withConnectorConfig(ConnectorConfig connectorConfig) { + this.connectorConfig = connectorConfig; + return this; + } + + public TaskBuilder withKeyConverter(Converter keyConverter) { + this.keyConverter = keyConverter; + return this; + } + + public TaskBuilder withValueConverter(Converter valueConverter) { + this.valueConverter = valueConverter; + return this; + } + + public TaskBuilder withHeaderConverter(HeaderConverter headerConverter) { + this.headerConverter = headerConverter; + return this; + } + + public TaskBuilder withClassloader(ClassLoader classLoader) { + this.classLoader = classLoader; + return this; + } + + public WorkerTask build() { + Objects.requireNonNull(task, "Task cannot be null"); + Objects.requireNonNull(connectorConfig, "Connector config used by task cannot be null"); + Objects.requireNonNull(keyConverter, "Key converter used by task cannot be null"); + Objects.requireNonNull(valueConverter, "Value converter used by task cannot be null"); + Objects.requireNonNull(headerConverter, "Header converter used by task cannot be null"); + Objects.requireNonNull(classLoader, "Classloader used by task cannot be null"); + + ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id); + final Class<? extends Connector> connectorClass = plugins.connectorClass( + connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); + RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(), + connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM); + retryWithToleranceOperator.metrics(errorHandlingMetrics); + + return doBuild(task, id, configState, statusListener, initialState, + connectorConfig, keyConverter, valueConverter, headerConverter, classLoader, + errorHandlingMetrics, connectorClass, retryWithToleranceOperator); + } + + abstract WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? 
extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator); + + } + + class SinkTaskBuilder extends TaskBuilder { + public SinkTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + super(id, configState, statusListener, initialState); + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings()); + retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass)); + WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator, + keyConverter, valueConverter, headerConverter); + + Map<String, Object> consumerProps = consumerConfigs(id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId); + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps); + + return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter, + valueConverter, headerConverter, transformationChain, consumer, classLoader, time, + retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore()); + } + } + + class SourceTaskBuilder extends TaskBuilder { + public SourceTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + super(id, configState, statusListener, initialState); + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? 
extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, + connectorConfig.originalsStrings(), config.topicCreationEnable()); + retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); + TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + + Map<String, Object> producerProps = producerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); + + ConnectorOffsetBackingStore offsetStore = ConnectorOffsetBackingStore.withoutConnectorOffsetStore( + () -> LoggingContext.forTask(id), + globalOffsetBackingStore, + config.offsetsTopic() + ); + final TopicAdmin admin; + Map<String, TopicCreationGroup> topicCreationGroups = null; + if (sourceConfig.offsetsTopic() != null || (config.topicCreationEnable() && sourceConfig.usesTopicCreation())) { Review comment: I've extracted a lot of this logic into separate methods for testability and readability, but tried to stick with the general spirit of this comment while doing so and left a few comments and added some `final boolean` local variables in the newly-isolated methods. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -980,6 +1119,329 @@ WorkerMetricsGroup workerMetricsGroup() { return workerMetricsGroup; } + abstract class TaskBuilder { + + private final ConnectorTaskId id; + private final ClusterConfigState configState; + private final TaskStatus.Listener statusListener; + private final TargetState initialState; + + private Task task; + private ConnectorConfig connectorConfig = null; + private Converter keyConverter = null; + private Converter valueConverter = null; + private HeaderConverter headerConverter = null; + private ClassLoader classLoader = null; + + public TaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + this.id = id; + this.configState = configState; + this.statusListener = statusListener; + this.initialState = initialState; + } + + public TaskBuilder withTask(Task task) { + this.task = task; + return this; + } + + public TaskBuilder withConnectorConfig(ConnectorConfig connectorConfig) { + this.connectorConfig = connectorConfig; + return this; + } + + public TaskBuilder withKeyConverter(Converter keyConverter) { + this.keyConverter = keyConverter; + return this; + } + + public TaskBuilder withValueConverter(Converter valueConverter) { + this.valueConverter = valueConverter; + return this; + } + + public TaskBuilder withHeaderConverter(HeaderConverter headerConverter) { + this.headerConverter = headerConverter; + return this; + } + + public TaskBuilder withClassloader(ClassLoader classLoader) { + this.classLoader = classLoader; + return this; + } + + public WorkerTask build() { + Objects.requireNonNull(task, "Task cannot be null"); + Objects.requireNonNull(connectorConfig, "Connector config used by task cannot be null"); + Objects.requireNonNull(keyConverter, "Key converter used by task cannot be null"); + Objects.requireNonNull(valueConverter, "Value converter used by task cannot be null"); + 
Objects.requireNonNull(headerConverter, "Header converter used by task cannot be null"); + Objects.requireNonNull(classLoader, "Classloader used by task cannot be null"); + + ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id); + final Class<? extends Connector> connectorClass = plugins.connectorClass( + connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); + RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(), + connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM); + retryWithToleranceOperator.metrics(errorHandlingMetrics); + + return doBuild(task, id, configState, statusListener, initialState, + connectorConfig, keyConverter, valueConverter, headerConverter, classLoader, + errorHandlingMetrics, connectorClass, retryWithToleranceOperator); + } + + abstract WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator); + + } + + class SinkTaskBuilder extends TaskBuilder { + public SinkTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + super(id, configState, statusListener, initialState); + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? 
extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings()); + retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass)); + WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator, + keyConverter, valueConverter, headerConverter); + + Map<String, Object> consumerProps = consumerConfigs(id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId); + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps); + + return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter, + valueConverter, headerConverter, transformationChain, consumer, classLoader, time, + retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore()); + } + } + + class SourceTaskBuilder extends TaskBuilder { + public SourceTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + super(id, configState, statusListener, initialState); + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? 
extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, + connectorConfig.originalsStrings(), config.topicCreationEnable()); + retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); + TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + + Map<String, Object> producerProps = producerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); + + ConnectorOffsetBackingStore offsetStore = ConnectorOffsetBackingStore.withoutConnectorOffsetStore( + () -> LoggingContext.forTask(id), + globalOffsetBackingStore, + config.offsetsTopic() + ); + final TopicAdmin admin; + Map<String, TopicCreationGroup> topicCreationGroups = null; + if (sourceConfig.offsetsTopic() != null || (config.topicCreationEnable() && sourceConfig.usesTopicCreation())) { + Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config, + sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE); + Admin adminClient = Admin.create(adminOverrides); + admin = new TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient); + + if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) { + topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig); + } + + if (sourceConfig.offsetsTopic() != null && config.connectorOffsetsTopicsPermitted()) { + Map<String, Object> consumerProps = consumerConfigs(id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + // Users can disable this if they want to; it won't affect delivery guarantees since the task isn't exactly-once anyways + consumerProps.putIfAbsent(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps); + + offsetStore = ConnectorOffsetBackingStore.withConnectorOffsetStore( + () -> LoggingContext.forTask(id), + globalOffsetBackingStore, + sourceConfig.offsetsTopic(), + producer, + consumer, + admin + ); + } + } else { + admin = null; + } + offsetStore.configure(config); + + CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, id.connector(), internalKeyConverter, internalValueConverter); + OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, id.connector(), internalKeyConverter, internalValueConverter); + + // Note we pass the configState as it performs dynamic transformations under the covers + return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, + headerConverter, transformationChain, producer, admin, topicCreationGroups, + offsetReader, offsetWriter, offsetStore, config, configState, metrics, classLoader, time, + retryWithToleranceOperator, herder.statusBackingStore(), executor); + } + } + + class ExactlyOnceSourceTaskBuilder extends TaskBuilder { + private final Runnable preProducerCheck; + private final Runnable postProducerCheck; + + public 
ExactlyOnceSourceTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + Runnable preProducerCheck, + Runnable postProducerCheck) { + super(id, configState, statusListener, initialState); + this.preProducerCheck = preProducerCheck; + this.postProducerCheck = postProducerCheck; + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, + connectorConfig.originalsStrings(), config.topicCreationEnable()); + retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); + TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + + Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config, Review comment: This section has been pretty heavily refactored; I hope the current state of things honors the intent of this comment but if not LMK. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -980,6 +1119,329 @@ WorkerMetricsGroup workerMetricsGroup() { return workerMetricsGroup; } + abstract class TaskBuilder { + + private final ConnectorTaskId id; + private final ClusterConfigState configState; + private final TaskStatus.Listener statusListener; + private final TargetState initialState; + + private Task task; + private ConnectorConfig connectorConfig = null; + private Converter keyConverter = null; + private Converter valueConverter = null; + private HeaderConverter headerConverter = null; + private ClassLoader classLoader = null; + + public TaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + this.id = id; + this.configState = configState; + this.statusListener = statusListener; + this.initialState = initialState; + } + + public TaskBuilder withTask(Task task) { + this.task = task; + return this; + } + + public TaskBuilder withConnectorConfig(ConnectorConfig connectorConfig) { + this.connectorConfig = connectorConfig; + return this; + } + + public TaskBuilder withKeyConverter(Converter keyConverter) { + this.keyConverter = keyConverter; + return this; + } + + public TaskBuilder withValueConverter(Converter valueConverter) { + this.valueConverter = valueConverter; + return this; + } + + public TaskBuilder withHeaderConverter(HeaderConverter headerConverter) { + this.headerConverter = headerConverter; + return this; + } + + public TaskBuilder withClassloader(ClassLoader classLoader) { + this.classLoader = classLoader; + return this; + } + + public WorkerTask build() { + Objects.requireNonNull(task, "Task cannot be null"); + Objects.requireNonNull(connectorConfig, "Connector config used by task cannot be null"); + Objects.requireNonNull(keyConverter, "Key converter used by task cannot be null"); + Objects.requireNonNull(valueConverter, "Value converter used by task 
cannot be null"); + Objects.requireNonNull(headerConverter, "Header converter used by task cannot be null"); + Objects.requireNonNull(classLoader, "Classloader used by task cannot be null"); + + ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id); + final Class<? extends Connector> connectorClass = plugins.connectorClass( + connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); + RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(), + connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM); + retryWithToleranceOperator.metrics(errorHandlingMetrics); + + return doBuild(task, id, configState, statusListener, initialState, + connectorConfig, keyConverter, valueConverter, headerConverter, classLoader, + errorHandlingMetrics, connectorClass, retryWithToleranceOperator); + } + + abstract WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator); + + } + + class SinkTaskBuilder extends TaskBuilder { + public SinkTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + super(id, configState, statusListener, initialState); + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? 
extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings()); + retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass)); + WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator, + keyConverter, valueConverter, headerConverter); + + Map<String, Object> consumerProps = consumerConfigs(id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId); + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps); + + return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter, + valueConverter, headerConverter, transformationChain, consumer, classLoader, time, + retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore()); + } + } + + class SourceTaskBuilder extends TaskBuilder { + public SourceTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + super(id, configState, statusListener, initialState); + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? 
extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, + connectorConfig.originalsStrings(), config.topicCreationEnable()); + retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); + TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + + Map<String, Object> producerProps = producerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); + + ConnectorOffsetBackingStore offsetStore = ConnectorOffsetBackingStore.withoutConnectorOffsetStore( + () -> LoggingContext.forTask(id), + globalOffsetBackingStore, + config.offsetsTopic() + ); + final TopicAdmin admin; + Map<String, TopicCreationGroup> topicCreationGroups = null; + if (sourceConfig.offsetsTopic() != null || (config.topicCreationEnable() && sourceConfig.usesTopicCreation())) { + Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config, + sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE); + Admin adminClient = Admin.create(adminOverrides); + admin = new TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient); + + if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) { + topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig); + } + + if (sourceConfig.offsetsTopic() != null && config.connectorOffsetsTopicsPermitted()) { + Map<String, Object> consumerProps = consumerConfigs(id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + // Users can disable this if they want to; it won't affect delivery guarantees since the task isn't exactly-once anyways + consumerProps.putIfAbsent(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps); + + offsetStore = ConnectorOffsetBackingStore.withConnectorOffsetStore( + () -> LoggingContext.forTask(id), + globalOffsetBackingStore, + sourceConfig.offsetsTopic(), + producer, + consumer, + admin + ); + } + } else { + admin = null; + } + offsetStore.configure(config); + + CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, id.connector(), internalKeyConverter, internalValueConverter); + OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, id.connector(), internalKeyConverter, internalValueConverter); + + // Note we pass the configState as it performs dynamic transformations under the covers + return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, + headerConverter, transformationChain, producer, admin, topicCreationGroups, + offsetReader, offsetWriter, offsetStore, config, configState, metrics, classLoader, time, + retryWithToleranceOperator, herder.statusBackingStore(), executor); + } + } + + class ExactlyOnceSourceTaskBuilder extends TaskBuilder { + private final Runnable preProducerCheck; + private final Runnable postProducerCheck; + + public 
ExactlyOnceSourceTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + Runnable preProducerCheck, + Runnable postProducerCheck) { + super(id, configState, statusListener, initialState); + this.preProducerCheck = preProducerCheck; + this.postProducerCheck = postProducerCheck; + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, + connectorConfig.originalsStrings(), config.topicCreationEnable()); + retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); + TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + + Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config, + sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE); + Admin adminClient = Admin.create(adminOverrides); + TopicAdmin topicAdmin = new TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient); + Map<String, TopicCreationGroup> topicCreationGroups = null; + if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) { + topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig); + } + + String transactionalId = transactionalId(id); + Map<String, Object> producerProps = producerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + ConnectUtils.warnOnOverriddenProperty( + producerProps, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true", + "for connectors when exactly-once source support is enabled", + false + ); + ConnectUtils.warnOnOverriddenProperty( + producerProps, ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId, + "for connectors when exactly-once source support is enabled", + true + ); + producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); Review comment: Yep, done. 
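For reference, a minimal sketch of the "warn and actually perform the override" shape mentioned in the `ConnectUtils` discussion above, which is the pattern this hunk relies on for `enable.idempotence` and `transactional.id` (the method name and signature are assumptions, and case-insensitive matching is omitted for brevity):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Objects;

// Sketch only; not the helper actually added in the PR.
class OverrideSketch {

    private static final Logger log = LoggerFactory.getLogger(OverrideSketch.class);

    static void ensurePropertyAndWarn(Map<String, Object> props, String key, Object expectedValue, String justification) {
        Object userValue = props.get(key);
        if (userValue != null && !Objects.equals(userValue.toString(), expectedValue.toString())) {
            log.warn("The value '{}' for the '{}' property will be ignored as it cannot be overridden {}. "
                    + "The value '{}' will be used instead.", userValue, key, justification, expectedValue);
        }
        // The override always happens, so a warning can never be logged without the value actually changing
        props.put(key, expectedValue);
    }
}
```

A call site then collapses to a single line, e.g. `ensurePropertyAndWarn(producerProps, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true, "for connectors when exactly-once source support is enabled")`, instead of a warn call followed by a separate put.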
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ########## @@ -980,6 +1119,329 @@ WorkerMetricsGroup workerMetricsGroup() { return workerMetricsGroup; } + abstract class TaskBuilder { + + private final ConnectorTaskId id; + private final ClusterConfigState configState; + private final TaskStatus.Listener statusListener; + private final TargetState initialState; + + private Task task; + private ConnectorConfig connectorConfig = null; + private Converter keyConverter = null; + private Converter valueConverter = null; + private HeaderConverter headerConverter = null; + private ClassLoader classLoader = null; + + public TaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + this.id = id; + this.configState = configState; + this.statusListener = statusListener; + this.initialState = initialState; + } + + public TaskBuilder withTask(Task task) { + this.task = task; + return this; + } + + public TaskBuilder withConnectorConfig(ConnectorConfig connectorConfig) { + this.connectorConfig = connectorConfig; + return this; + } + + public TaskBuilder withKeyConverter(Converter keyConverter) { + this.keyConverter = keyConverter; + return this; + } + + public TaskBuilder withValueConverter(Converter valueConverter) { + this.valueConverter = valueConverter; + return this; + } + + public TaskBuilder withHeaderConverter(HeaderConverter headerConverter) { + this.headerConverter = headerConverter; + return this; + } + + public TaskBuilder withClassloader(ClassLoader classLoader) { + this.classLoader = classLoader; + return this; + } + + public WorkerTask build() { + Objects.requireNonNull(task, "Task cannot be null"); + Objects.requireNonNull(connectorConfig, "Connector config used by task cannot be null"); + Objects.requireNonNull(keyConverter, "Key converter used by task cannot be null"); + Objects.requireNonNull(valueConverter, "Value converter used by task cannot be null"); + Objects.requireNonNull(headerConverter, "Header converter used by task cannot be null"); + Objects.requireNonNull(classLoader, "Classloader used by task cannot be null"); + + ErrorHandlingMetrics errorHandlingMetrics = errorHandlingMetrics(id); + final Class<? extends Connector> connectorClass = plugins.connectorClass( + connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG)); + RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(), + connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM); + retryWithToleranceOperator.metrics(errorHandlingMetrics); + + return doBuild(task, id, configState, statusListener, initialState, + connectorConfig, keyConverter, valueConverter, headerConverter, classLoader, + errorHandlingMetrics, connectorClass, retryWithToleranceOperator); + } + + abstract WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? 
extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator); + + } + + class SinkTaskBuilder extends TaskBuilder { + public SinkTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + super(id, configState, statusListener, initialState); + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings()); + retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass)); + WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator, + keyConverter, valueConverter, headerConverter); + + Map<String, Object> consumerProps = consumerConfigs(id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId); + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps); + + return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter, + valueConverter, headerConverter, transformationChain, consumer, classLoader, time, + retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore()); + } + } + + class SourceTaskBuilder extends TaskBuilder { + public SourceTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState) { + super(id, configState, statusListener, initialState); + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? 
extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, + connectorConfig.originalsStrings(), config.topicCreationEnable()); + retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); + TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + + Map<String, Object> producerProps = producerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); + + ConnectorOffsetBackingStore offsetStore = ConnectorOffsetBackingStore.withoutConnectorOffsetStore( + () -> LoggingContext.forTask(id), + globalOffsetBackingStore, + config.offsetsTopic() + ); + final TopicAdmin admin; + Map<String, TopicCreationGroup> topicCreationGroups = null; + if (sourceConfig.offsetsTopic() != null || (config.topicCreationEnable() && sourceConfig.usesTopicCreation())) { + Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config, + sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE); + Admin adminClient = Admin.create(adminOverrides); + admin = new TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient); + + if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) { + topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig); + } + + if (sourceConfig.offsetsTopic() != null && config.connectorOffsetsTopicsPermitted()) { + Map<String, Object> consumerProps = consumerConfigs(id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + // Users can disable this if they want to; it won't affect delivery guarantees since the task isn't exactly-once anyways + consumerProps.putIfAbsent(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps); + + offsetStore = ConnectorOffsetBackingStore.withConnectorOffsetStore( + () -> LoggingContext.forTask(id), + globalOffsetBackingStore, + sourceConfig.offsetsTopic(), + producer, + consumer, + admin + ); + } + } else { + admin = null; + } + offsetStore.configure(config); + + CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, id.connector(), internalKeyConverter, internalValueConverter); + OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, id.connector(), internalKeyConverter, internalValueConverter); + + // Note we pass the configState as it performs dynamic transformations under the covers + return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, + headerConverter, transformationChain, producer, admin, topicCreationGroups, + offsetReader, offsetWriter, offsetStore, config, configState, metrics, classLoader, time, + retryWithToleranceOperator, herder.statusBackingStore(), executor); + } + } + + class ExactlyOnceSourceTaskBuilder extends TaskBuilder { + private final Runnable preProducerCheck; + private final Runnable postProducerCheck; + + public 
ExactlyOnceSourceTaskBuilder(ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + Runnable preProducerCheck, + Runnable postProducerCheck) { + super(id, configState, statusListener, initialState); + this.preProducerCheck = preProducerCheck; + this.postProducerCheck = postProducerCheck; + } + + @Override + public WorkerTask doBuild(Task task, + ConnectorTaskId id, + ClusterConfigState configState, + TaskStatus.Listener statusListener, + TargetState initialState, + ConnectorConfig connectorConfig, + Converter keyConverter, + Converter valueConverter, + HeaderConverter headerConverter, + ClassLoader classLoader, + ErrorHandlingMetrics errorHandlingMetrics, + Class<? extends Connector> connectorClass, + RetryWithToleranceOperator retryWithToleranceOperator) { + + SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, + connectorConfig.originalsStrings(), config.topicCreationEnable()); + retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); + TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); + + Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config, + sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE); + Admin adminClient = Admin.create(adminOverrides); + TopicAdmin topicAdmin = new TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient); + Map<String, TopicCreationGroup> topicCreationGroups = null; + if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) { + topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig); + } + + String transactionalId = transactionalId(id); + Map<String, Object> producerProps = producerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + ConnectUtils.warnOnOverriddenProperty( + producerProps, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true", + "for connectors when exactly-once source support is enabled", + false + ); + ConnectUtils.warnOnOverriddenProperty( + producerProps, ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId, + "for connectors when exactly-once source support is enabled", + true + ); + producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); + KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); + + Map<String, Object> consumerProps = consumerConfigs(id.connector(), "connector-consumer-" + id, config, connectorConfig, connectorClass, + connectorClientConfigOverridePolicy, kafkaClusterId); + ConnectUtils.warnOnOverriddenProperty( + consumerProps, ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT), + "for connectors when exactly-once source support is enabled", + false + ); + consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT)); + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps); + + String offsetsTopic = Optional.ofNullable(sourceConfig.offsetsTopic()).orElse(config.offsetsTopic()); + + ConnectorOffsetBackingStore offsetStore; + // No need to do secondary writes to 
the global offsets topic if we're certain that the task's local offset store + // is going to be targeting it anyway + // Note that this may lead to a false positive if the user provides an overridden bootstrap servers value for their + // producer that resolves to the same Kafka cluster; we might consider looking up the Kafka cluster ID in the future + // to prevent these false positives but at the moment this is probably adequate, especially since we probably don't + // want to put a ping to a remote Kafka cluster inside the herder's tick thread (which is where this logic takes place + // right now) in case that takes a while. Review comment: Ack, done. This logic has been moved into an isolated method and I've moved the relevant portions of the comment with it. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ########## @@ -342,6 +343,10 @@ private void logPluginPathConfigProviderWarning(Map<String, String> rawOriginals } } + public String bootstrapServers() { + return String.join(",", getList(BOOTSTRAP_SERVERS_CONFIG)); + } Review comment: Sorry, I'm a little unclear on what the benefits of Javadocs here are. I've added tags for each of the relevant methods but would you mind elaborating? In IntelliJ, I can see where this method is invoked by cmd+clicking the `bootstrapServers()` declaration, and wherever it's invoked, I can go to the declaration by cmd+clicking the invocation. I'm aware that not everyone uses my IDE and certainly don't expect them to start doing so; just wondering if there's a different IDE out there with behavior that makes Javadocs like these significantly more powerful. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java ########## @@ -74,15 +75,15 @@ private volatile boolean cancelled; // indicates whether the Worker has cancelled the connector (e.g. because of slow shutdown) private State state; - private final OffsetStorageReader offsetStorageReader; + private final CloseableOffsetStorageReader offsetStorageReader; public WorkerConnector(String connName, Connector connector, ConnectorConfig connectorConfig, CloseableConnectorContext ctx, ConnectMetrics metrics, ConnectorStatus.Listener statusListener, - OffsetStorageReader offsetStorageReader, + CloseableOffsetStorageReader offsetStorageReader, Review comment: I opted for an alternative approach where the offset reader is allowed to be `null`, but I can also see the benefit of a conditional call to `Objects::requireNonNull`. I've added one in `WorkerConnector::initialize`; let me know what you think.
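As a minimal sketch of the conditional `Objects::requireNonNull` mentioned above (the actual check lives in `WorkerConnector::initialize`, which isn't shown in this hunk), the idea is simply to require a non-null offset reader only for connector types that actually read offsets. The class and method names below are illustrative, not the real WorkerConnector code:

```java
import java.util.Objects;

// Minimal illustration, not the actual WorkerConnector code: the offset reader may be
// null for sink connectors, so the non-null check only applies to source connectors.
class OffsetReaderCheck {

    static void validateOffsetReader(boolean isSourceConnector, Object offsetStorageReader) {
        if (isSourceConnector) {
            // Source connectors need to read offsets, so a missing reader is a programming error
            Objects.requireNonNull(offsetStorageReader, "Offset reader used by source connectors may not be null");
        }
        // Sink connectors never use the offset reader, so null is acceptable there
    }
}
```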