C0urante commented on a change in pull request #10907:
URL: https://github.com/apache/kafka/pull/10907#discussion_r732231502



##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,31 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees 
with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled 
when this method is invoked.
+     * The default implementation will return {@code null}.
+     * @param connectorConfig the configuration that will be used for the 
connector.
+     * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can 
provide exactly-once support,
+     * and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If {@code 
null}, it is assumed that the
+     * connector cannot.
+     */
+    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> 
connectorConfig) {
+        return null;

Review comment:
       > Should we just have another enum for UNKNOWN and make this more 
explicit than "null"?
   
   This was actually [suggested in the discussion 
thread](https://mail-archives.apache.org/mod_mbox/kafka-dev/202105.mbox/%3cCAMdOrUX4CvPsb+yjfTenHyRTtE=2aaw-_-_b2vbd+pvqzy7...@mail.gmail.com%3e):
   
   > what do you think about a new "exactlyOnce()" method to the 
SourceConnector class that can return a new ExactlyOnce enum with options of 
"SUPPORTED", "UNSUPPORTED", and "UNKNOWN", with a default implementation that 
returns "UNKNOWN"?
   
   And [decided 
against](https://mail-archives.apache.org/mod_mbox/kafka-dev/202105.mbox/%3ccadxunmbsypos0lej8kxw9eapcxc7wbtgxqdqhrpu6qrbjwi...@mail.gmail.com%3e):
   
   > The problem with having an explicit UNKNOWN case is we really want 
connector developers to _not_ use it. That could mean it's deprecated from the 
start. Alternatively we could omit it from the enum and use null to mean 
unknown (we'd have to check for a null result anyway), with the contract for 
the method being that it should return non-null. Of course, this doesn't remove 
the ambiguous case, but avoids the need to eventually remove UNKNOWN in the 
future.
   
   (What I found especially convincing in the snippet above were the points 
that 1) we don't want people to return `UNKNOWN` from this method, and 2) no 
matter what, we're going to have to check for `null` anyway.)
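
A minimal sketch of the caller-side check this implies, assuming the framework treats `null` the same as `UNSUPPORTED`, as the Javadoc in the hunk above states. `ExampleSourceConnector`, `ExactlyOnceCapableConnector`, and `ExactlyOnceCheck` are hypothetical stand-ins written for this illustration, and the enum is redeclared locally so the snippet compiles on its own. None of this is the actual Connect runtime code.

```java
import java.util.Map;

// Local stand-in for the ExactlyOnceSupport enum referenced in the diff (no UNKNOWN constant).
enum ExactlyOnceSupport { SUPPORTED, UNSUPPORTED }

// Hypothetical connector that keeps the default behaviour from the diff and returns null.
class ExampleSourceConnector {
    ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        return null;
    }
}

// Hypothetical connector that overrides the method, as connector developers are encouraged to do.
class ExactlyOnceCapableConnector extends ExampleSourceConnector {
    @Override
    ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
        return ExactlyOnceSupport.SUPPORTED;
    }
}

// Hypothetical caller-side check (assumption: not taken from the PR).
final class ExactlyOnceCheck {
    private ExactlyOnceCheck() { }

    static boolean canProvideExactlyOnce(ExampleSourceConnector connector, Map<String, String> config) {
        ExactlyOnceSupport support = connector.exactlyOnceSupport(config);
        // A null result has to be handled whether or not the enum has an UNKNOWN constant,
        // because any implementation can still return null. Here null counts as "cannot".
        return support == ExactlyOnceSupport.SUPPORTED;
    }
}
```

With this shape, a connector that never overrides the method is handled by the same branch as one that returns `UNSUPPORTED`, so an `UNKNOWN` constant would not remove any code from the caller.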
   
   
   > Also, it seems like it would make sense to document that this method 
should be overridden by Connector developers, but has a default for backward 
compatibility.
   
   Ack, can do.
   
   > And it should state more clearly what should be returned for the various 
options.
   
   I've taken a shot at this. I'm not sure how much clearer it can get, but if 
you have thoughts, let me know.

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##########
@@ -20,13 +20,45 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in 
Kafka.
  */
 public abstract class SourceTask implements Task {
 
+    /**
+     * <p>
+     * The configuration key that determines how source tasks will define 
transaction boundaries
+     * when exactly-once support is enabled.
+     * </p>
+     */
+    public static final String TRANSACTION_BOUNDARY_CONFIG = 
"transaction.boundary";
+
+    public enum TransactionBoundary {

Review comment:
       Ack, done.

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##########
@@ -20,13 +20,45 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in 
Kafka.
  */
 public abstract class SourceTask implements Task {
 
+    /**
+     * <p>
+     * The configuration key that determines how source tasks will define 
transaction boundaries
+     * when exactly-once support is enabled.
+     * </p>
+     */
+    public static final String TRANSACTION_BOUNDARY_CONFIG = 
"transaction.boundary";
+
+    public enum TransactionBoundary {
+        POLL,
+        INTERVAL,
+        CONNECTOR;
+
+        public static final TransactionBoundary DEFAULT = POLL;
+
+        public static List<String> options() {

Review comment:
       I wanted a convenient way to bring everything to lowercase, which is 
more standard for properties like this (see how [values for the consumer 
`isolation.level` property are 
rendered](https://github.com/apache/kafka/blob/da38a1df273ec9d3a077435b2a63d75053edd308/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L557),
 for example) and, IMO, more readable.
   
   We could remove this method and replace it with inline calls to `values()` 
followed by some streams magic to lowercase at the call site, but that seemed 
less clean than this approach.
   
   Alternatively, we could introduce a new utility method (such as 
`Utils::enumNames`) that does this for us in a centralized, reusable location 
and obviates the need for public-API methods that may cause headaches down the 
road.
   
   Thoughts?
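
A rough sketch of what the centralized utility option could look like. `EnumNames.enumNames` is a hypothetical helper named after the `Utils::enumNames` idea above and is not an existing Kafka method. The `TransactionBoundary` enum is redeclared locally, without `options()`, so the snippet compiles on its own.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

final class EnumNames {
    private EnumNames() { }

    // Hypothetical helper: returns the lowercased names of all constants of an enum type,
    // in declaration order. Locale.ROOT keeps the result independent of the JVM's default locale.
    static <E extends Enum<E>> List<String> enumNames(Class<E> enumClass) {
        return Arrays.stream(enumClass.getEnumConstants())
                .map(constant -> constant.name().toLowerCase(Locale.ROOT))
                .collect(Collectors.toList());
    }
}

// Mirrors the constants from the diff; no public options() method is needed on the enum itself.
enum TransactionBoundary { POLL, INTERVAL, CONNECTOR }
```

A call such as `EnumNames.enumNames(TransactionBoundary.class)` would then produce the lowercase values for a config validator or doc string without adding a method to the public API enum.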

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
##########
@@ -192,6 +198,19 @@
     public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A 
list of permitted algorithms for verifying internal requests";
     public static final List<String> 
INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = 
Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
 
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = 
"exactly.once.source.support";
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to 
enable exactly-once support for source connectors in the cluster "
+            + "by writing source records and their offsets in a Kafka 
transaction, and by proactively fencing out old task generations before 
bringing up new ones. "
+            + "Note that this must be enabled on every worker in a cluster in 
order for exactly-once delivery to be guaranteed, "
+            + "and that some source connectors may still not be able to 
provide exactly-once delivery guarantees even with this support enabled. "
+            + "Permitted values are \"disabled\", \"preparing\", and 
\"enabled\". In order to safely enable exactly-once support for source 
connectors, "
+            + "all workers in the cluster must first be updated to use the 
\"preparing\" value for this property. "
+            + "Once this has been done, a second update of all of the workers 
in the cluster should be performed to change the value of this property to 
\"enabled\".";

Review comment:
       > It's clear to me that we should mention this, but it's not clear where 
we should do so. The user documentation generated from this doc string might be 
one spot that users will see routinely, so maybe it's a candidate. Another 
would be in the Kafka Connect docs about EOS for source connectors.
   
   I think both are acceptable. I'll add this to the docstring now and include 
it in the high-level docs when I write those as well.
   
   > BTW, can you add to this PR changes to the Kafka docs that describe this 
feature?
   
   Given how massive this PR is already, I'd like to do this in a follow-up, 
with the understanding that Kafka docs changes are a requisite for including 
this feature in a release.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
##########
@@ -401,10 +429,46 @@ public Integer getRebalanceTimeout() {
         return getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG);
     }
 
+    @Override
+    public boolean exactlyOnceSourceEnabled() {
+        return EXACTLY_ONCE_SOURCE_SUPPORT_ENABLED.equalsIgnoreCase(
+                getString(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG)
+        );
+    }
+
+    public boolean transactionalLeaderEnabled() {
+        return Arrays.asList(EXACTLY_ONCE_SOURCE_SUPPORT_ENABLED, 
EXACTLY_ONCE_SOURCE_SUPPORT_PREPARING)
+                
.contains(getString(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG).toLowerCase(Locale.ROOT));
+    }

Review comment:
       > Should we have an enum for the enabled, preparing and disabled 
literals, and should these make use of them?
   
   Sure, gave that a shot, LMK what you think.
   
   > Also, it might be useful to have JavaDoc on these methods, simply to help 
future developers understand the intent.
   
   Gave this a try too.
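
For illustration only, one possible shape for such an enum, assuming the semantics shown in the hunk above: `exactlyOnceSourceEnabled()` is true only for "enabled", and `transactionalLeaderEnabled()` is true for both "enabled" and "preparing". The name `ExactlyOnceSourceSupportMode` and the `fromProperty` method are hypothetical and may differ from what the PR ends up with.

```java
import java.util.Locale;

// Hypothetical enum for the three permitted values of exactly.once.source.support.
enum ExactlyOnceSourceSupportMode {
    DISABLED(false, false),
    PREPARING(true, false),
    ENABLED(true, true);

    private final boolean transactionalLeader;
    private final boolean exactlyOnceSource;

    ExactlyOnceSourceSupportMode(boolean transactionalLeader, boolean exactlyOnceSource) {
        this.transactionalLeader = transactionalLeader;
        this.exactlyOnceSource = exactlyOnceSource;
    }

    // Parses a property value case-insensitively; throws IllegalArgumentException for unknown values.
    static ExactlyOnceSourceSupportMode fromProperty(String value) {
        return valueOf(value.toUpperCase(Locale.ROOT));
    }

    // True once the worker is at least "preparing", so the leader uses a transactional producer.
    boolean transactionalLeaderEnabled() {
        return transactionalLeader;
    }

    // True only when exactly-once source support is fully enabled on the worker.
    boolean exactlyOnceSourceEnabled() {
        return exactlyOnceSource;
    }

    // Renders the lowercase form used in configuration files and documentation.
    @Override
    public String toString() {
        return name().toLowerCase(Locale.ROOT);
    }
}
```

Keeping the two flags on the enum constants means the config class no longer has to compare string literals in each accessor.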

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -156,6 +165,14 @@ public KafkaBasedLog(String topic,
         this.readLogEndOffsetCallbacks = new ArrayDeque<>();
         this.time = time;
         this.initializer = initializer != null ? initializer : admin -> { };
+        this.topicContainsTransactions = topicContainsTransactions;
+
+        // If the consumer is configured with isolation.level = 
read_committed, then its end offsets method cannot be relied on
+        // as it will not take records from currently-open transactions into 
account. We want to err on the side of caution in that
+        // case: when users request a read to the end of the log, we will read 
up to the point where the latest offsets visible to the
+        // consumer are at least as high as the 
(possibly-part-of-a-transaction) end offsets of the topic.
+        this.requireAdminForOffsets = 
IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)
+                
.equals(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG));

Review comment:
       This was my first thought, but the `isolation.level` property is 
case-sensitive.
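
A small sketch of why the comparison in the hunk above is an exact match. It assumes, as stated, that only the lowercase form of the `isolation.level` value is accepted. `IsolationLevelCheck` is a hypothetical class; the enum and the property key are local stand-ins for `IsolationLevel` and `ConsumerConfig.ISOLATION_LEVEL_CONFIG` so the snippet has no Kafka dependency.

```java
import java.util.Locale;
import java.util.Map;

final class IsolationLevelCheck {
    private IsolationLevelCheck() { }

    // Literal property key, written out here in place of ConsumerConfig.ISOLATION_LEVEL_CONFIG.
    static final String ISOLATION_LEVEL_CONFIG = "isolation.level";

    // Local stand-in for the client library's IsolationLevel enum.
    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    static boolean isReadCommitted(Map<String, ?> consumerConfigs) {
        String readCommitted = IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT);
        // Exact, case-sensitive match on purpose: a value such as "READ_COMMITTED" is not a
        // valid setting under the assumption above, so it must not be treated as a match here.
        return readCommitted.equals(consumerConfigs.get(ISOLATION_LEVEL_CONFIG));
    }
}
```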

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -118,35 +123,39 @@ public KafkaBasedLog(String topic,
                          Callback<ConsumerRecord<K, V>> consumedCallback,
                          Time time,
                          Runnable initializer) {
-        this(topic, producerConfigs, consumerConfigs, () -> null, 
consumedCallback, time, initializer != null ? admin -> initializer.run() : 
null);
+        this(topic, producerConfigs, consumerConfigs, () -> null, 
consumedCallback, time, initializer != null ? admin -> initializer.run() : 
null, false);
     }
 
     /**
      * Create a new KafkaBasedLog object. This does not start reading the log 
and writing is not permitted until
      * {@link #start()} is invoked.
      *
-     * @param topic              the topic to treat as a log
-     * @param producerConfigs    configuration options to use when creating 
the internal producer. At a minimum this must
+     * @param topic                     the topic to treat as a log
+     * @param producerConfigs           configuration options to use when 
creating the internal producer. At a minimum this must
      *                           contain compatible serializer settings for 
the generic types used on this class. Some
      *                           setting, such as the number of acks, will be 
overridden to ensure correct behavior of this
      *                           class.
-     * @param consumerConfigs    configuration options to use when creating 
the internal consumer. At a minimum this must
+     * @param consumerConfigs           configuration options to use when 
creating the internal consumer. At a minimum this must
      *                           contain compatible serializer settings for 
the generic types used on this class. Some
      *                           setting, such as the auto offset reset 
policy, will be overridden to ensure correct
      *                           behavior of this class.
-     * @param topicAdminSupplier supplier function for an admin client, the 
lifecycle of which is expected to be controlled
+     * @param topicAdminSupplier        supplier function for an admin client, 
the lifecycle of which is expected to be controlled
      *                           by the calling component; may not be null
-     * @param consumedCallback   callback to invoke for each {@link 
ConsumerRecord} consumed when tailing the log
-     * @param time               Time interface
-     * @param initializer        the function that should be run when this log 
is {@link #start() started}; may be null
+     * @param consumedCallback          callback to invoke for each {@link 
ConsumerRecord} consumed when tailing the log
+     * @param time                      Time interface
+     * @param initializer               the function that should be run when 
this log is {@link #start() started}; may be null
+     * @param topicContainsTransactions whether the topic being consumed 
contains (or is expected to contain) transactions;
+     *                                  if this is {@code false} and the topic 
does contain transactions, reads to the end of the log may block
+     *                                  indefinitely

Review comment:
       I'll rebase this PR on top of https://github.com/apache/kafka/pull/11046 
and, after removing any changes to this class that become unnecessary as a 
result, will address this.
   
   I do believe a KIP is called for here though. As you've noted, this class is 
not public API and it's frustrating that development efforts against Connect 
are hampered by this unofficial and somewhat arbitrary restriction.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -210,26 +227,21 @@ public void stop() {
         synchronized (this) {
             stopRequested = true;
         }
-        consumer.wakeup();
-
-        try {
-            thread.join();
-        } catch (InterruptedException e) {
-            throw new ConnectException("Failed to stop KafkaBasedLog. Exiting 
without cleanly shutting " +
-                    "down it's producer and consumer.", e);
+        if (consumer != null) {
+            consumer.wakeup();
         }
 
-        try {
-            producer.close();
-        } catch (KafkaException e) {
-            log.error("Failed to stop KafkaBasedLog producer", e);
+        if (thread != null) {
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                throw new ConnectException("Failed to stop KafkaBasedLog. 
Exiting without cleanly shutting " +
+                        "down it's producer and consumer.", e);
+            }
         }
 
-        try {
-            consumer.close();
-        } catch (KafkaException e) {
-            log.error("Failed to stop KafkaBasedLog consumer", e);
-        }
+        Utils.closeQuietly(producer, "KafkaBasedLog producer");
+        Utils.closeQuietly(consumer, "KafkaBasedLog consumer");

Review comment:
       Fewer lines, more standardized, and logging these at `ERROR` level is 
incorrect IMO.
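
To make the pattern concrete, here is an illustrative stand-in for a quiet-close helper. It is not the `Utils.closeQuietly` implementation from Kafka: `QuietCloser` is a hypothetical class that uses `java.util.logging` to stay self-contained, and both the null tolerance and the warning-level logging are assumptions of this sketch.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class QuietCloser {
    private static final Logger LOG = Logger.getLogger(QuietCloser.class.getName());

    private QuietCloser() { }

    // Closes the resource if it is non-null and logs any failure instead of propagating it,
    // so that one failed close does not prevent later resources from being closed.
    static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Throwable t) {
            // Logged below error level: a failed close during shutdown is worth noting
            // but is not treated here as an error in its own right.
            LOG.log(Level.WARNING, "Failed to close " + name, t);
        }
    }
}
```

Two consecutive calls, one for the producer and one for the consumer, then replace the two separate try/catch blocks removed in the hunk above.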

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
##########
@@ -72,6 +74,45 @@ static String lookupKafkaClusterId(Admin adminClient) {
         }
     }
 
+    /**
+     * Log a warning when the user attempts to override a property that cannot 
be overridden.
+     * @param props the configuration properties provided by the user
+     * @param key the name of the property to check on
+     * @param expectedValue the expected value for the property
+     * @param justification the reason the property cannot be overridden.
+     *                      Will follow the phrase "The value... for the... 
property will be ignored as it cannot be overridden ".
+     *                      For example, one might supply the message "in 
connectors with the DLQ feature enabled" for this parameter.
+     * @param caseSensitive whether the value should be matched case-sensitively
+     */
+    public static void warnOnOverriddenProperty(
+            Map<String, ?> props,
+            String key,
+            String expectedValue,
+            String justification,
+            boolean caseSensitive) {
+        overriddenPropertyWarning(props, key, expectedValue, justification, 
caseSensitive).ifPresent(log::warn);
+    }
+
+    // Visible for testing
+    static Optional<String> overriddenPropertyWarning(
+            Map<String, ?> props,
+            String key,
+            String expectedValue,
+            String justification,
+            boolean caseSensitive) {
+        Predicate<String> matchesExpectedValue = caseSensitive ? 
expectedValue::equals : expectedValue::equalsIgnoreCase;
+        String value = 
Optional.ofNullable(props.get(key)).map(Object::toString).orElse(null);
+        if (value != null && !matchesExpectedValue.test(value)) {
+            return Optional.of(String.format(
+                    "The value '%s' for the '%s' property will be ignored as 
it cannot be overridden %s. "
+                            + "The value '%s' will be used instead.",
+                    value, key, justification, expectedValue
+            ));
+        } else {
+            return Optional.empty();
+        }

Review comment:
       👍 SGTM

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
##########
@@ -43,6 +44,13 @@
  * storage to achieve exactly once semantics).
  * </p>
  * <p>
+ * In order to support per-connector offsets topics but continue to back up 
progress to a
+ * cluster-global offsets topic, the writer accepts an optional <i>secondary 
backing store</i>.
+ * After successful flushes to the primary backing store, the writer will copy 
the flushed offsets
+ * over to the secondary backing store on a best-effort basis. Failures to 
write to the secondary
+ * store are logged but otherwise swallowed silently.
+ * </p>
+ * <p>

Review comment:
       This is a neat idea. It'd also simplify the offset read logic. I've 
taken a stab at this; LMK what you think.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
##########
@@ -114,12 +124,18 @@ public synchronized boolean beginFlush() {
         if (data.isEmpty())
             return false;
 
-        assert !flushing();

Review comment:
       It's incorrect; this is a known issue where a race condition involving 
task failure during offset commit can cause this assertion to fail. The 
assertion itself has provided no value in catching, reproducing, or testing 
against this issue and at this point is more misleading than useful.





