C0urante commented on a change in pull request #10907:
URL: https://github.com/apache/kafka/pull/10907#discussion_r732231502
########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java ##########
@@ -28,4 +30,31 @@ protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     * The default implementation will return {@code null}.
+     * @param connectorConfig the configuration that will be used for the connector.
+     * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support,
+     * and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If {@code null}, it is assumed that the
+     * connector cannot.
+     */
+    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
+        return null;

Review comment:
> Should we just have another enum for UNKNOWN and make this more explicit than "null"?

This was actually [suggested in the discussion thread](https://mail-archives.apache.org/mod_mbox/kafka-dev/202105.mbox/%3cCAMdOrUX4CvPsb+yjfTenHyRTtE=2aaw-_-_b2vbd+pvqzy7...@mail.gmail.com%3e):

> what do you think about a new "exactlyOnce()" method to the SourceConnector class that can return a new ExactlyOnce enum with options of "SUPPORTED", "UNSUPPORTED", and "UNKNOWN", with a default implementation that returns "UNKNOWN"?

And [decided against](https://mail-archives.apache.org/mod_mbox/kafka-dev/202105.mbox/%3ccadxunmbsypos0lej8kxw9eapcxc7wbtgxqdqhrpu6qrbjwi...@mail.gmail.com%3e):

> The problem with having an explicit UNKNOWN case is we really want connector developers to _not_ use it. That could mean it's deprecated from the start. Alternatively we could omit it from the enum and use null to mean unknown (we'd have to check for a null result anyway), with the contract for the method being that it should return non-null. Of course, this doesn't remove the ambiguous case, but avoids the need to eventually remove UNKNOWN in the future.

(What I found especially convincing in the snippet above were the points that 1) we don't want people to return `UNKNOWN` from this method, and 2) no matter what, we're going to have to check for `null` anyway.)

> Also, it seems like it would make sense to document that this method should be overridden by Connector developers, but has a default for backward compatibility.

Ack, can do.

> And it should state more clearly what should be returned for the various options.

I've taken a shot at this; not sure how much clearer it can get, but if you have thoughts let me know. (A hypothetical override is sketched below for concreteness.)

########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java ##########
@@ -20,13 +20,45 @@
 import org.apache.kafka.clients.producer.RecordMetadata;

 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

 /**
  * SourceTask is a Task that pulls records from another system for storage in Kafka.
  */
 public abstract class SourceTask implements Task {

+    /**
+     * <p>
+     * The configuration key that determines how source tasks will define transaction boundaries
+     * when exactly-once support is enabled.
+     * </p>
+     */
+    public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary";
+
+    public enum TransactionBoundary {

Review comment:
Ack, done.
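Circling back to the `exactlyOnceSupport` discussion above: for concreteness, here is a minimal sketch of how a connector developer might override the new method. Everything here is hypothetical (the connector class, its `allow.rewinds` property, and the decision logic; it also assumes `ExactlyOnceSupport` lands in `org.apache.kafka.connect.source` alongside `SourceConnector`). The point is just that implementations should return `SUPPORTED` or `UNSUPPORTED` explicitly, leaving `null` to unmodified legacy connectors:

```java
import java.util.Map;

import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;

// Hypothetical connector, for illustration only; abstract so the remaining
// Connector methods (version(), start(), taskClass(), etc.) can be elided.
public abstract class ExampleSourceConnector extends SourceConnector {
    @Override
    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        // This imaginary connector can only uphold exactly-once delivery when
        // rewinds are disabled ("allow.rewinds" is a made-up property here).
        return "false".equals(connectorConfig.get("allow.rewinds"))
                ? ExactlyOnceSupport.SUPPORTED
                : ExactlyOnceSupport.UNSUPPORTED;
    }
}
```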
########## File path: connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java ##########
@@ -20,13 +20,45 @@
 import org.apache.kafka.clients.producer.RecordMetadata;

 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

 /**
  * SourceTask is a Task that pulls records from another system for storage in Kafka.
  */
 public abstract class SourceTask implements Task {

+    /**
+     * <p>
+     * The configuration key that determines how source tasks will define transaction boundaries
+     * when exactly-once support is enabled.
+     * </p>
+     */
+    public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary";
+
+    public enum TransactionBoundary {
+        POLL,
+        INTERVAL,
+        CONNECTOR;
+
+        public static final TransactionBoundary DEFAULT = POLL;
+
+        public static List<String> options() {

Review comment:
I wanted a convenient way to bring everything to lowercase, which is more standard for properties like this (see how [values for the consumer `isolation.level` property are rendered](https://github.com/apache/kafka/blob/da38a1df273ec9d3a077435b2a63d75053edd308/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L557), for example) and, IMO, more readable. We could remove this method and replace it with inline calls to `values()` followed by some streams magic to lowercase at the call site, but that seemed less clean than this approach. Alternatively, we could introduce a new utility method (such as `Utils::enumNames`) that does this for us in a centralized, reusable location and obviates the need for public-API methods that may cause headaches down the road. Thoughts?
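For concreteness, a `Utils::enumNames`-style helper could look something like the sketch below. The name, location, and signature are assumptions; no such utility exists yet:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public final class EnumUtilSketch {
    // Renders every constant of an enum in lowercase, for use in config docs
    // and validators where lowercase values are the convention.
    public static List<String> enumNames(Class<? extends Enum<?>> enumClass) {
        return Arrays.stream(enumClass.getEnumConstants())
                .map(constant -> constant.name().toLowerCase(Locale.ROOT))
                .collect(Collectors.toList());
    }
}
```

Call sites like `TransactionBoundary.options()` could then become `enumNames(TransactionBoundary.class)`, yielding `["poll", "interval", "connector"]` without adding a method to the public API.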
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java ##########
@@ -192,6 +198,19 @@
     public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A list of permitted algorithms for verifying internal requests";
     public static final List<String> INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = "exactly.once.source.support";
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to enable exactly-once support for source connectors in the cluster "
+            + "by writing source records and their offsets in a Kafka transaction, and by proactively fencing out old task generations before bringing up new ones. "
+            + "Note that this must be enabled on every worker in a cluster in order for exactly-once delivery to be guaranteed, "
+            + "and that some source connectors may still not be able to provide exactly-once delivery guarantees even with this support enabled. "
+            + "Permitted values are \"disabled\", \"preparing\", and \"enabled\". In order to safely enable exactly-once support for source connectors, "
+            + "all workers in the cluster must first be updated to use the \"preparing\" value for this property. "
+            + "Once this has been done, a second update of all of the workers in the cluster should be performed to change the value of this property to \"enabled\".";

Review comment:
> It's clear to me that we should mention this, but it's not clear where we should do so. The user documentation generated from this doc string might be one spot that users will see routinely, so maybe it's a candidate. Another would be in the Kafka Connect docs about EOS for source connectors.

I think both are acceptable. I'll add this to the docstring now and include it in the high-level docs when I write those as well.

> BTW, can you add to this PR changes to the Kafka docs that describe this feature?

Given how massive this PR is already, I'd like to do this in a follow-up, with the understanding that Kafka docs changes are a requisite for including this feature in a release.

########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java ##########
@@ -401,10 +429,46 @@ public Integer getRebalanceTimeout() {
         return getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG);
     }

+    @Override
+    public boolean exactlyOnceSourceEnabled() {
+        return EXACTLY_ONCE_SOURCE_SUPPORT_ENABLED.equalsIgnoreCase(
+                getString(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG)
+        );
+    }
+
+    public boolean transactionalLeaderEnabled() {
+        return Arrays.asList(EXACTLY_ONCE_SOURCE_SUPPORT_ENABLED, EXACTLY_ONCE_SOURCE_SUPPORT_PREPARING)
+                .contains(getString(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG).toLowerCase(Locale.ROOT));
+    }

Review comment:
> Should we have an enum for the enabled, preparing and disabled literals, and should these make use of them?

Sure, gave that a shot, LMK what you think.

> Also, it might be useful to have JavaDoc on these methods, simply to help future developers understand the intent.

Gave this a try too.

########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ##########
@@ -156,6 +165,14 @@
         this.readLogEndOffsetCallbacks = new ArrayDeque<>();
         this.time = time;
         this.initializer = initializer != null ? initializer : admin -> { };
+        this.topicContainsTransactions = topicContainsTransactions;
+
+        // If the consumer is configured with isolation.level = read_committed, then its end offsets method cannot be relied on
+        // as it will not take records from currently-open transactions into account. We want to err on the side of caution in that
+        // case: when users request a read to the end of the log, we will read up to the point where the latest offsets visible to the
+        // consumer are at least as high as the (possibly-part-of-a-transaction) end offsets of the topic.
+        this.requireAdminForOffsets = IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)
+                .equals(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG));

Review comment:
This was my first thought, but the `isolation.level` property is case-sensitive: the consumer only accepts the lowercase forms, so an `equalsIgnoreCase` comparison here would treat values as valid that the consumer itself would reject.
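As background for the comment embedded in the diff above, here is a sketch against the public client APIs (not code from this PR) of the two views of "the end of the log" in question:

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public final class EndOffsetsSketch {
    static void compareEndOffsets(Consumer<?, ?> consumer, Admin admin,
                                  Collection<TopicPartition> partitions)
            throws ExecutionException, InterruptedException {
        // Under isolation.level=read_committed, this is the last stable offset
        // (LSO): it stops short of any currently-open transaction.
        Map<TopicPartition, Long> consumerView = consumer.endOffsets(partitions);

        // The admin client reports the actual end of the log regardless of open
        // transactions, which is why a read_committed consumer alone cannot be
        // trusted for read-to-end semantics.
        Map<TopicPartition, ListOffsetsResultInfo> adminView = admin.listOffsets(
                partitions.stream()
                        .collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())))
                .all().get();
    }
}
```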
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ##########
@@ -118,35 +123,39 @@ public KafkaBasedLog(String topic,
                          Callback<ConsumerRecord<K, V>> consumedCallback,
                          Time time,
                          Runnable initializer) {
-        this(topic, producerConfigs, consumerConfigs, () -> null, consumedCallback, time, initializer != null ? admin -> initializer.run() : null);
+        this(topic, producerConfigs, consumerConfigs, () -> null, consumedCallback, time, initializer != null ? admin -> initializer.run() : null, false);
     }

     /**
      * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until
      * {@link #start()} is invoked.
      *
-     * @param topic the topic to treat as a log
-     * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must
+     * @param topic                     the topic to treat as a log
+     * @param producerConfigs           configuration options to use when creating the internal producer. At a minimum this must
      *                           contain compatible serializer settings for the generic types used on this class. Some
      *                           settings, such as the number of acks, will be overridden to ensure correct behavior of this
      *                           class.
-     * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must
+     * @param consumerConfigs           configuration options to use when creating the internal consumer. At a minimum this must
      *                           contain compatible serializer settings for the generic types used on this class. Some
      *                           settings, such as the auto offset reset policy, will be overridden to ensure correct
      *                           behavior of this class.
-     * @param topicAdminSupplier supplier function for an admin client, the lifecycle of which is expected to be controlled
+     * @param topicAdminSupplier        supplier function for an admin client, the lifecycle of which is expected to be controlled
      *                           by the calling component; may not be null
-     * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
-     * @param time Time interface
-     * @param initializer the function that should be run when this log is {@link #start() started}; may be null
+     * @param consumedCallback          callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
+     * @param time                      Time interface
+     * @param initializer               the function that should be run when this log is {@link #start() started}; may be null
+     * @param topicContainsTransactions whether the topic being consumed contains (or is expected to contain) transactions;
+     *                                  if this is {@code false} and the topic does contain transactions, reads to the end
+     *                                  of the log may block indefinitely

Review comment:
I'll rebase this PR on top of https://github.com/apache/kafka/pull/11046 and, after removing any changes to this class that become unnecessary as a result, will address this. I do believe a KIP is called for here, though. As you've noted, this class is not public API, and it's frustrating that development efforts against Connect are hampered by this unofficial and somewhat arbitrary restriction.

########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java ##########
@@ -210,26 +227,21 @@ public void stop() {
         synchronized (this) {
             stopRequested = true;
         }
-        consumer.wakeup();
-
-        try {
-            thread.join();
-        } catch (InterruptedException e) {
-            throw new ConnectException("Failed to stop KafkaBasedLog. Exiting without cleanly shutting " +
-                    "down its producer and consumer.", e);
+        if (consumer != null) {
+            consumer.wakeup();
         }

-        try {
-            producer.close();
-        } catch (KafkaException e) {
-            log.error("Failed to stop KafkaBasedLog producer", e);
+        if (thread != null) {
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                throw new ConnectException("Failed to stop KafkaBasedLog. Exiting without cleanly shutting " +
+                        "down its producer and consumer.", e);
+            }
         }

-        try {
-            consumer.close();
-        } catch (KafkaException e) {
-            log.error("Failed to stop KafkaBasedLog consumer", e);
-        }
+        Utils.closeQuietly(producer, "KafkaBasedLog producer");
+        Utils.closeQuietly(consumer, "KafkaBasedLog consumer");

Review comment:
Fewer lines, more standardized, and logging these at `ERROR` level is incorrect, IMO.
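For context, `Utils.closeQuietly` behaves roughly like the paraphrase below (not the verbatim implementation; consult `org.apache.kafka.common.utils.Utils` for the authoritative version): it null-checks, swallows any exception, and logs at `WARN` rather than `ERROR`, which suits cleanup on the shutdown path:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class CloseQuietlySketch {
    private static final Logger log = LoggerFactory.getLogger(CloseQuietlySketch.class);

    // Paraphrase of Utils#closeQuietly(AutoCloseable, String): any failure to
    // close is logged and suppressed so shutdown can proceed.
    public static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (Throwable t) {
                log.warn("Failed to close {}", name, t);
            }
        }
    }
}
```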
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java ##########
@@ -72,6 +74,45 @@ static String lookupKafkaClusterId(Admin adminClient) {
         }
     }

+    /**
+     * Log a warning when the user attempts to override a property that cannot be overridden.
+     * @param props the configuration properties provided by the user
+     * @param key the name of the property to check on
+     * @param expectedValue the expected value for the property
+     * @param justification the reason the property cannot be overridden.
+     *                      Will follow the phrase "The value... for the... property will be ignored as it cannot be overridden ".
+     *                      For example, one might supply the message "in connectors with the DLQ feature enabled" for this parameter.
+     * @param caseSensitive whether the value should be compared case-sensitively
+     */
+    public static void warnOnOverriddenProperty(
+            Map<String, ?> props,
+            String key,
+            String expectedValue,
+            String justification,
+            boolean caseSensitive) {
+        overriddenPropertyWarning(props, key, expectedValue, justification, caseSensitive).ifPresent(log::warn);
+    }
+
+    // Visible for testing
+    static Optional<String> overriddenPropertyWarning(
+            Map<String, ?> props,
+            String key,
+            String expectedValue,
+            String justification,
+            boolean caseSensitive) {
+        Predicate<String> matchesExpectedValue = caseSensitive ? expectedValue::equals : expectedValue::equalsIgnoreCase;
+        String value = Optional.ofNullable(props.get(key)).map(Object::toString).orElse(null);
+        if (value != null && !matchesExpectedValue.test(value)) {
+            return Optional.of(String.format(
+                    "The value '%s' for the '%s' property will be ignored as it cannot be overridden %s. " +
+                    "The value '%s' will be used instead.",
+                    value, key, justification, expectedValue
+            ));
+        } else {
+            return Optional.empty();
+        }

Review comment:
👍 SGTM

########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java ##########
@@ -43,6 +44,13 @@
  * storage to achieve exactly once semantics).
  * </p>
  * <p>
+ * In order to support per-connector offsets topics but continue to back up progress to a
+ * cluster-global offsets topic, the writer accepts an optional <i>secondary backing store</i>.
+ * After successful flushes to the primary backing store, the writer will copy the flushed offsets
+ * over to the secondary backing store on a best-effort basis. Failures to write to the secondary
+ * store are logged but otherwise swallowed silently.
+ * </p>
+ * <p>

Review comment:
This is a neat idea. It'd also simplify the offset read logic. I've taken a stab at this; LMK what you think.

########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java ##########
@@ -114,12 +124,18 @@ public synchronized boolean beginFlush() {
         if (data.isEmpty())
            return false;

-        assert !flushing();

Review comment:
It's incorrect; this is a known issue where a race condition involving task failure during offset commit can cause this assertion to fail. The assertion itself has provided no value in catching, reproducing, or testing against this issue, and at this point is more misleading than useful.
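For what it's worth, one possible non-assert guard could look like the sketch below. This is an assumption, not a claim about what this PR ends up doing, and it presumes the class's existing `flushing()` method and `data` map; unlike the removed `assert`, it runs regardless of whether the JVM was started with `-ea` and fails with a descriptive, catchable exception:

```java
import org.apache.kafka.connect.errors.ConnectException;

// Sketch only; the PR's actual handling of concurrent flushes may differ.
public synchronized boolean beginFlush() {
    if (flushing()) {
        // Fail fast and name the misuse explicitly instead of relying on an
        // assertion that is disabled by default.
        throw new ConnectException("OffsetStorageWriter is already flushing");
    }
    if (data.isEmpty())
        return false;
    // ... remainder of the original method unchanged ...
    return true;
}
```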