rhauch commented on a change in pull request #10907: URL: https://github.com/apache/kafka/pull/10907#discussion_r688675632
########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java ########## @@ -38,4 +38,30 @@ * Get the OffsetStorageReader for this SourceTask. */ OffsetStorageReader offsetStorageReader(); + + /** + * Get a {@link TransactionContext} that can be used to define producer transaction boundaries + * when exactly-once support is enabled for the connector. + * + * <p>This method was added in Apache Kafka 3.0. Source tasks that use this method but want to + * maintain backward compatibility so they can also be deployed to older Connect runtimes + * should guard the call to this method with a try-catch block, since calling this method will result in a + * {@link NoSuchMethodError} or {@link NoClassDefFoundError} when the source connector is deployed to + * Connect runtimes older than Kafka 3.0. For example: + * <pre> + * TransactionContext transactionContext; + * try { + * transactionContext = context.transactionContext(); + * } catch (NoSuchMethodError | NoClassDefFoundError e) { + * transactionContext = null; + * } + * </pre> + * + * @return the transaction context, or null if the user does not want the connector to define + * its own transaction boundaries Review comment: Nit: what does "user" mean here? Should we instead refer to the connector configuration, perhaps something like: "or null if the connector was configured to not specify transaction boundaries"? (I know this is taken directly from the KIP, but I think we should improve this JavaDoc during the implementation.) ########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java ########## @@ -28,4 +30,31 @@ protected SourceConnectorContext context() { return (SourceConnectorContext) context; } + + /** + * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration. + * Developers can assume that worker-level exactly-once support is enabled when this method is invoked. + * The default implementation will return {@code null}. + * @param connectorConfig the configuration that will be used for the connector. + * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support, + * and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If {@code null}, it is assumed that the + * connector cannot. + */ + public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) { + return null; + } + + /** + * Signals whether the connector can define its own transaction boundaries with the proposed + * configuration. Developers must override this method if they wish to add connector-defined + * transaction boundary support; if they do not, users will be unable to create instances of + * this connector that use connector-defined transaction boundaries. The default implementation + * will return {@code UNSUPPORTED}. Review comment: I think this JavaDoc should more clearly specify when an implementation may expect this method is called relative to other methods. We don't want to box ourselves in with respect to the implementation, but we also need a clear contract that Connector developers can rely upon. For example, maybe add something like: > This method may be called by the runtime before the {@link #start} method when the connector is being run with exactly-once support and when the connector is to determine the transaction boundaries.
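To make that contract concrete for connector developers, here is a rough sketch of what an override might look like once the JavaDoc spells this out. The connector class and the `snapshot.mode` key are invented for illustration, and the sketch assumes the `ExactlyOnceSupport` enum and method signature land as proposed in this PR:

```java
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;

public class ExampleSourceConnector extends SourceConnector {

    // Hypothetical connector-specific setting, used only to show a configuration-dependent answer.
    private static final String SNAPSHOT_MODE_CONFIG = "snapshot.mode";

    @Override
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        // Per the contract discussed above, the runtime may invoke this before start(),
        // so the answer must depend only on the proposed configuration, not on connector state.
        return "incremental".equals(connectorConfig.get(SNAPSHOT_MODE_CONFIG))
                ? ExactlyOnceSupport.SUPPORTED
                : ExactlyOnceSupport.UNSUPPORTED;
    }

    // Remaining Connector methods are stubbed out; a real connector would implement them fully.
    @Override
    public void start(Map<String, String> props) { }
    @Override
    public void stop() { }
    @Override
    public Class<? extends Task> taskClass() { return null; }
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) { return null; }
    @Override
    public ConfigDef config() { return new ConfigDef(); }
    @Override
    public String version() { return "0.0.1"; }
}
```

The key property of the contract is that the answer depends only on the proposed configuration, since the runtime may ask before `start(...)` has ever been called.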
########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java ########## @@ -28,4 +30,31 @@ protected SourceConnectorContext context() { return (SourceConnectorContext) context; } + + /** + * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration. + * Developers can assume that worker-level exactly-once support is enabled when this method is invoked. + * The default implementation will return {@code null}. + * @param connectorConfig the configuration that will be used for the connector. + * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support, + * and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If {@code null}, it is assumed that the + * connector cannot. + */ + public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) { + return null; Review comment: It's difficult to discern in the KIP, but IIUC there is a difference between this method returning `null` versus `ExactlyOnceSupport#UNSUPPORTED`: * Returning `null` really seems to equate to `UNKNOWN`, and a Connect user can still effectively force the use of EOS by setting `exactly.once.support=requested` in the connector configuration. * Returning `UNSUPPORTED` means that a Connect user is not allowed to force the use of EOS. Should we just have another enum value for `UNKNOWN` and make this more explicit than "null"? Also, it seems like it would make sense to document that this method should be overridden by Connector developers, but has a default for backward compatibility. And it should state more clearly what should be returned for the various options. ########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java ########## @@ -28,4 +30,31 @@ protected SourceConnectorContext context() { return (SourceConnectorContext) context; } + + /** + * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration. + * Developers can assume that worker-level exactly-once support is enabled when this method is invoked. + * The default implementation will return {@code null}. Review comment: I think this JavaDoc should more clearly specify when an implementation may expect this method is called relative to other methods. We don't want to box ourselves in with respect to the implementation, but we also need a clear contract that Connector developers can rely upon. For example, maybe add something like: > This method may be called by the runtime before the {@link #start} method when the connector is being run with exactly-once support. ########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java ########## @@ -20,13 +20,45 @@ import org.apache.kafka.clients.producer.RecordMetadata; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * SourceTask is a Task that pulls records from another system for storage in Kafka. */ public abstract class SourceTask implements Task { + /** + * <p> + * The configuration key that determines how source tasks will define transaction boundaries + * when exactly-once support is enabled. 
+ * </p> + */ + public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary"; + + public enum TransactionBoundary { + POLL, + INTERVAL, + CONNECTOR; + + public static final TransactionBoundary DEFAULT = POLL; + + public static List<String> options() { Review comment: Do we really need this method when `values()` is already available and more standard? It looks like this is used in only two places: one expects an array (where `values()` would work better), and the other just uses it to build a documentation string (where `Arrays.toString(...)` might work just as well). ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java ########## @@ -401,10 +429,46 @@ public Integer getRebalanceTimeout() { return getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG); } + @Override + public boolean exactlyOnceSourceEnabled() { + return EXACTLY_ONCE_SOURCE_SUPPORT_ENABLED.equalsIgnoreCase( + getString(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG) + ); + } + + public boolean transactionalLeaderEnabled() { + return Arrays.asList(EXACTLY_ONCE_SOURCE_SUPPORT_ENABLED, EXACTLY_ONCE_SOURCE_SUPPORT_PREPARING) + .contains(getString(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG).toLowerCase(Locale.ROOT)); + } Review comment: Should we have an enum for the `enabled`, `preparing` and `disabled` literals, and should these make use of them? Also, it might be useful to have JavaDoc on these methods, simply to help future developers understand the intent. ########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.source; + +/** + * Provided to source tasks to allow them to define their own producer transaction boundaries when + * exactly-once support is enabled. + */ +public interface TransactionContext { + + /** + * Request a transaction commit after the next batch of records from {@link SourceTask#poll()} + * is processed. + */ + void commitTransaction(); + + /** + * Request a transaction commit after a source record is processed. The source record will be the + * last record in the committed transaction. + * @param record the record to commit the transaction after. Review comment: Is the SourceTask implementation calling this method allowed to pass a null value here? If so, what happens? If not, please add that to the JavaDoc. 
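For reference, a minimal sketch of how a task-side helper might call this with a concrete record rather than null, assuming the `TransactionContext` interface lands as shown above (the helper class and the batching shape are illustrative only):

```java
import java.util.List;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.TransactionContext;

public class TransactionBoundaryHelper {

    /**
     * Ends the current transaction after the last record of a batch. Passing a null record is
     * exactly the edge case the review asks the JavaDoc to spell out, so this sketch never does it.
     */
    public static void commitAfterBatch(TransactionContext transactionContext, List<SourceRecord> batch) {
        if (transactionContext == null || batch == null || batch.isEmpty()) {
            return; // connector-defined boundaries are not in use, or there is nothing to commit after
        }
        SourceRecord lastRecord = batch.get(batch.size() - 1);
        transactionContext.commitTransaction(lastRecord);
    }
}
```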
########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java ########## @@ -20,13 +20,45 @@ import org.apache.kafka.clients.producer.RecordMetadata; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * SourceTask is a Task that pulls records from another system for storage in Kafka. */ public abstract class SourceTask implements Task { + /** + * <p> Review comment: We don't need this `<p>` tag. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java ########## @@ -163,13 +181,24 @@ public synchronized boolean beginFlush() { } // And submit the data - log.debug("Submitting {} entries to backing store. The offsets are: {}", offsetsSerialized.size(), toFlush); + log.debug("Submitting {} entries to backing store. The offsets are: {}", offsetsSerialized.size(), flushed); } - return backingStore.set(offsetsSerialized, (error, result) -> { - boolean isCurrent = handleFinishWrite(flushId, error, result); - if (isCurrent && callback != null) { - callback.onCompletion(error, result); + return primaryBackingStore.set(offsetsSerialized, (primaryError, primaryResult) -> { + boolean isCurrent = handleFinishWrite(flushId, primaryError, primaryResult); + if (isCurrent) { + if (callback != null) { + callback.onCompletion(primaryError, primaryResult); + } + if (secondaryBackingStore != null && primaryError == null) { + secondaryBackingStore.set(offsetsSerialized, (secondaryError, secondaryResult) -> { + if (secondaryError != null) { + log.warn("Failed to write offsets ({}) to secondary backing store", flushed, secondaryError); + } else { + log.debug("Successfully flushed offsets ({}) to secondary backing store", flushed); Review comment: Do these log messages include the Connector context for EOS-enabled source connectors? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java ########## @@ -43,6 +44,13 @@ * storage to achieve exactly once semantics). * </p> * <p> + * In order to support per-connector offsets topics but continue to back up progress to a + * cluster-global offsets topic, the writer accepts an optional <i>secondary backing store</i>. + * After successful flushes to the primary backing store, the writer will copy the flushed offsets + * over to the secondary backing store on a best-effort basis. Failures to write to the secondary + * store are logged but otherwise swallowed silently. + * </p> + * <p> Review comment: Did you consider introducing a new `OffsetBackingStore` implementation that writes to two other `OffsetBackingStore` implementations? That might simplify the logic in this class and better encapsulate the double write behavior. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ########## @@ -337,15 +353,29 @@ private void readToLogEnd() { } else { log.trace("Behind end offset {} for {}; last-read offset is {}", endOffset, topicPartition, lastConsumedOffset); - poll(Integer.MAX_VALUE); + if (topicContainsTransactions) { + // The consumer won't return from its poll method if a transaction is aborted, even though + // its position will advance. So, we poll for at most one second, then give ourselves another + // chance to check whether we've reached the end of the topic. 
+ poll(1000); Review comment: And for reference, here is @C0urante's PR for that fix: #11046 I agree, it would be good to avoid this hack if possible. ########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.source; + +/** + * Provided to source tasks to allow them to define their own producer transaction boundaries when + * exactly-once support is enabled. + */ +public interface TransactionContext { + + /** + * Request a transaction commit after the next batch of records from {@link SourceTask#poll()} + * is processed. + */ + void commitTransaction(); + + /** + * Request a transaction commit after a source record is processed. The source record will be the + * last record in the committed transaction. + * @param record the record to commit the transaction after. + */ + void commitTransaction(SourceRecord record); + + /** + * Requests a transaction abort the next batch of records from {@link SourceTask#poll()}. All of + * the records in that transaction will be discarded and will not appear in a committed transaction. + * However, offsets for that transaction will still be committed. If the data should be reprocessed, + * the task should not invoke this method and should instead throw an exception. + */ + void abortTransaction(); + + /** + * Requests a transaction abort after a source record is processed. The source record will be the + * last record in the aborted transaction. All of the records in that transaction will be discarded + * and will not appear in a committed transaction. However, offsets for that transaction will still + * be committed. If the data should be reprocessed, the task should not invoke this method and + * should instead throw an exception. + * @param record the record to abort the transaction after. Review comment: Is the SourceTask implementation calling this method allowed to pass a null value here? If so, what happens? If not, please add that to the JavaDoc. ########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java ########## @@ -20,13 +20,45 @@ import org.apache.kafka.clients.producer.RecordMetadata; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * SourceTask is a Task that pulls records from another system for storage in Kafka. */ public abstract class SourceTask implements Task { + /** + * <p> + * The configuration key that determines how source tasks will define transaction boundaries + * when exactly-once support is enabled. 
+ * </p> + */ + public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary"; + + public enum TransactionBoundary { Review comment: The `SourceTask` class is part of the public API for Connect, and so we should have JavaDoc on this enum and its literals and methods. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java ########## @@ -72,6 +74,45 @@ static String lookupKafkaClusterId(Admin adminClient) { } } + /** + * Log a warning when the user attempts to override a property that cannot be overridden. + * @param props the configuration properties provided by the user + * @param key the name of the property to check on + * @param expectedValue the expected value for the property + * @param justification the reason the property cannot be overridden. + * Will follow the phrase "The value... for the... property will be ignored as it cannot be overridden ". + * For example, one might supply the message "in connectors with the DLQ feature enabled" for this parameter. + * @param caseSensitive whether the value should match case-insensitively + */ + public static void warnOnOverriddenProperty( + Map<String, ?> props, + String key, + String expectedValue, + String justification, + boolean caseSensitive) { + overriddenPropertyWarning(props, key, expectedValue, justification, caseSensitive).ifPresent(log::warn); + } + + // Visible for testing + static Optional<String> overriddenPropertyWarning( + Map<String, ?> props, + String key, + String expectedValue, + String justification, + boolean caseSensitive) { + Predicate<String> matchesExpectedValue = caseSensitive ? expectedValue::equals : expectedValue::equalsIgnoreCase; + String value = Optional.ofNullable(props.get(key)).map(Object::toString).orElse(null); + if (value != null && !matchesExpectedValue.test(value)) { + return Optional.of(String.format( + "The value '%s' for the '%s' property will be ignored as it cannot be overridden %s. " + + "The value '%s' will be used instead.", + value, key, justification, expectedValue + )); + } else { + return Optional.empty(); + } Review comment: Nit: what do you think about tolerating an empty or null `justification` string, since this method does not ensure that one is provided? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java ########## @@ -192,6 +198,19 @@ public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A list of permitted algorithms for verifying internal requests"; public static final List<String> INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT); + public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = "exactly.once.source.support"; + public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to enable exactly-once support for source connectors in the cluster " + + "by writing source records and their offsets in a Kafka transaction, and by proactively fencing out old task generations before bringing up new ones. " + + "Note that this must be enabled on every worker in a cluster in order for exactly-once delivery to be guaranteed, " + + "and that some source connectors may still not be able to provide exactly-once delivery guarantees even with this support enabled. " + + "Permitted values are \"disabled\", \"preparing\", and \"enabled\". 
In order to safely enable exactly-once support for source connectors, " + + "all workers in the cluster must first be updated to use the \"preparing\" value for this property. " + + "Once this has been done, a second update of all of the workers in the cluster should be performed to change the value of this property to \"enabled\"."; Review comment: We should mention in the public docs that, when enabling exactly-once support for source connectors, consumers of the topics to which the EOS source connectors write should be configured with `isolation.level=read_committed`. After all, the default for consumers is `isolation.level=read_uncommitted` (see [ConsumerConfig.java](https://github.com/apache/kafka/blob/db1f581da7f3440cfd5be93800b4a9a2d7327a35/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L302)). It's clear to me that we should mention this, but it's not clear where we should do so. The user documentation generated from this doc string might be one spot that users will see routinely, so maybe it's a candidate. Another would be in the Kafka Connect docs about EOS for source connectors. BTW, can you add to this PR changes to the Kafka docs that describe this feature? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ########## @@ -118,35 +123,39 @@ public KafkaBasedLog(String topic, Callback<ConsumerRecord<K, V>> consumedCallback, Time time, Runnable initializer) { - this(topic, producerConfigs, consumerConfigs, () -> null, consumedCallback, time, initializer != null ? admin -> initializer.run() : null); + this(topic, producerConfigs, consumerConfigs, () -> null, consumedCallback, time, initializer != null ? admin -> initializer.run() : null, false); } /** * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until * {@link #start()} is invoked. * - * @param topic the topic to treat as a log - * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must + * @param topic the topic to treat as a log + * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must * contain compatible serializer settings for the generic types used on this class. Some * setting, such as the number of acks, will be overridden to ensure correct behavior of this * class. - * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must + * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must * contain compatible serializer settings for the generic types used on this class. Some * setting, such as the auto offset reset policy, will be overridden to ensure correct * behavior of this class. 
- * @param topicAdminSupplier supplier function for an admin client, the lifecycle of which is expected to be controlled + * @param topicAdminSupplier supplier function for an admin client, the lifecycle of which is expected to be controlled * by the calling component; may not be null - * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log - * @param time Time interface - * @param initializer the function that should be run when this log is {@link #start() started}; may be null + * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log + * @param time Time interface + * @param initializer the function that should be run when this log is {@link #start() started}; may be null + * @param topicContainsTransactions whether the topic being consumed contains (or is expected to contain) transactions; + * if this is {@code false} and the topic does contain transactions, reads to the end of the log may block + * indefinitely Review comment: There are projects outside of Apache Kafka that do use this class. While this class is not a public API and we technically don't have to avoid breaking compatibility, it's fairly straightforward to maintain API compatibility and so we should do that. ########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java ########## @@ -20,13 +20,45 @@ import org.apache.kafka.clients.producer.RecordMetadata; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * SourceTask is a Task that pulls records from another system for storage in Kafka. */ public abstract class SourceTask implements Task { + /** + * <p> + * The configuration key that determines how source tasks will define transaction boundaries + * when exactly-once support is enabled. + * </p> Review comment: The `</p>` has no meaning in JavaDoc, so we should remove these. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ########## @@ -210,26 +227,21 @@ public void stop() { synchronized (this) { stopRequested = true; } - consumer.wakeup(); - - try { - thread.join(); - } catch (InterruptedException e) { - throw new ConnectException("Failed to stop KafkaBasedLog. Exiting without cleanly shutting " + - "down it's producer and consumer.", e); + if (consumer != null) { + consumer.wakeup(); } - try { - producer.close(); - } catch (KafkaException e) { - log.error("Failed to stop KafkaBasedLog producer", e); + if (thread != null) { + try { + thread.join(); + } catch (InterruptedException e) { + throw new ConnectException("Failed to stop KafkaBasedLog. Exiting without cleanly shutting " + + "down it's producer and consumer.", e); + } } - try { - consumer.close(); - } catch (KafkaException e) { - log.error("Failed to stop KafkaBasedLog consumer", e); - } + Utils.closeQuietly(producer, "KafkaBasedLog producer"); + Utils.closeQuietly(consumer, "KafkaBasedLog consumer"); Review comment: This change does alter the level at which these problems are logged. Is that intentional, and if so why? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ########## @@ -156,6 +165,14 @@ public KafkaBasedLog(String topic, this.readLogEndOffsetCallbacks = new ArrayDeque<>(); this.time = time; this.initializer = initializer != null ? 
initializer : admin -> { }; + this.topicContainsTransactions = topicContainsTransactions; + + // If the consumer is configured with isolation.level = read_committed, then its end offsets method cannot be relied on + // as it will not take records from currently-open transactions into account. We want to err on the side of caution in that + // case: when users request a read to the end of the log, we will read up to the point where the latest offsets visible to the + // consumer are at least as high as the (possibly-part-of-a-transaction) end offsets of the topic. + this.requireAdminForOffsets = IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT) + .equals(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG)); Review comment: Why not `equalsIgnoreCase(...)` here instead of lowercasing and then calling `equals(...)`? ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java ########## @@ -114,12 +124,18 @@ public synchronized boolean beginFlush() { if (data.isEmpty()) return false; - assert !flushing(); Review comment: Why remove this assertion?
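Returning to the `equalsIgnoreCase(...)` comment above, a rough sketch of that simplification (the class and helper below are illustrative rather than the PR's actual code; note the consumer config value is an `Object`, so it is converted to a `String` before the case-insensitive comparison):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.IsolationLevel;

public class IsolationLevelCheckSketch {

    // Compares case-insensitively instead of lowercasing first, as suggested in the review.
    static boolean requireAdminForOffsets(Map<String, Object> consumerConfigs) {
        Object isolationLevel = consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG);
        return isolationLevel != null
                && IsolationLevel.READ_COMMITTED.name().equalsIgnoreCase(isolationLevel.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        System.out.println(requireAdminForOffsets(configs)); // prints "true"
    }
}
```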