rhauch commented on a change in pull request #10907:
URL: https://github.com/apache/kafka/pull/10907#discussion_r688675632



##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
##########
@@ -38,4 +38,30 @@
      * Get the OffsetStorageReader for this SourceTask.
      */
     OffsetStorageReader offsetStorageReader();
+
+    /**
+     * Get a {@link TransactionContext} that can be used to define producer transaction boundaries
+     * when exactly-once support is enabled for the connector.
+     *
+     * <p>This method was added in Apache Kafka 3.0. Source tasks that use this method but want to
+     * maintain backward compatibility so they can also be deployed to older Connect runtimes
+     * should guard the call to this method with a try-catch block, since calling this method will result in a
+     * {@link NoSuchMethodError} or {@link NoClassDefFoundError} when the source connector is deployed to
+     * Connect runtimes older than Kafka 3.0. For example:
+     * <pre>
+     *     TransactionContext transactionContext;
+     *     try {
+     *         transactionContext = context.transactionContext();
+     *     } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *         transactionContext = null;
+     *     }
+     * </pre>
+     *
+     * @return the transaction context, or null if the user does not want the connector to define
+     * its own transaction boundaries

Review comment:
       Nit: what does "user" mean here? Should we instead refer to the 
connector configuration, perhaps something like: "or null if the connector was 
configured to not specify transaction boundaries"?
   
   (I know this is taken directly from the KIP, but I think we should improve 
this JavaDoc during the implementation.)

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,31 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     * The default implementation will return {@code null}.
+     * @param connectorConfig the configuration that will be used for the connector.
+     * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support,
+     * and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If {@code null}, it is assumed that the
+     * connector cannot.
+     */
+    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
+        return null;
+    }
+
+    /**
+     * Signals whether the connector can define its own transaction boundaries with the proposed
+     * configuration. Developers must override this method if they wish to add connector-defined
+     * transaction boundary support; if they do not, users will be unable to create instances of
+     * this connector that use connector-defined transaction boundaries. The default implementation
+     * will return {@code UNSUPPORTED}.

Review comment:
       I think this JavaDoc should more clearly specify when an implementation 
may expect this method is called relative to other methods. We don't want to 
box ourselves in with respect to the implementation, but we also need a clear 
contract that Connector developers can rely upon.
   
   For example, maybe add something like: 
   > This method may be called by the runtime before the {@link #start} method 
when the connector is being run with exactly-once support and when the 
connector is to determine the transaction boundaries.

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,31 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     * The default implementation will return {@code null}.
+     * @param connectorConfig the configuration that will be used for the connector.
+     * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support,
+     * and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If {@code null}, it is assumed that the
+     * connector cannot.
+     */
+    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
+        return null;

Review comment:
       It's difficult to discern in the KIP, but IIUC there is a difference 
between this method returning `null` versus `ExactlyOnceSupport#UNSUPPORTED`:
   * Returning `null` really seems to equate to `UNKNOWN`, and a Connect user is still allowed to (sort of) force 
the use of EOS by setting `exactly.once.support=requested` in the connector configuration.
   * Returning `UNSUPPORTED` means that a Connect user is not allowed to force 
the use of EOS.
   
   Should we just have another enum for `UNKNOWN` and make this more explicit 
than "null"?
   
   Also, it seems like it would make sense to document that this method should 
be overridden by Connector developers, but has a default for backward 
compatibility. And it should state more clearly what should be returned for the 
various options.
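
   For illustration only (the current API defines just `SUPPORTED` and `UNSUPPORTED`; the `UNKNOWN` literal and its 
JavaDoc wording here are hypothetical), that might look roughly like:
   ```java
   public enum ExactlyOnceSupport {
       /** The connector can provide exactly-once delivery with the proposed configuration. */
       SUPPORTED,
       /** The connector cannot provide exactly-once delivery with the proposed configuration. */
       UNSUPPORTED,
       /** The connector makes no claim; users may still request EOS via exactly.once.support=requested. */
       UNKNOWN
   }
   ```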

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
##########
@@ -28,4 +30,31 @@
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Developers can assume that worker-level exactly-once support is enabled when this method is invoked.
+     * The default implementation will return {@code null}.

Review comment:
       I think this JavaDoc should more clearly specify when an implementation 
may expect this method is called relative to other methods. We don't want to 
box ourselves in with respect to the implementation, but we also need a clear 
contract that Connector developers can rely upon.
   
   For example, maybe add something like: 
   > This method may be called by the runtime before the {@link #start} method 
when the connector is being run with exactly-once support.

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##########
@@ -20,13 +20,45 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in Kafka.
  */
 public abstract class SourceTask implements Task {
 
+    /**
+     * <p>
+     * The configuration key that determines how source tasks will define transaction boundaries
+     * when exactly-once support is enabled.
+     * </p>
+     */
+    public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary";
+
+    public enum TransactionBoundary {
+        POLL,
+        INTERVAL,
+        CONNECTOR;
+
+        public static final TransactionBoundary DEFAULT = POLL;
+
+        public static List<String> options() {

Review comment:
       Do we really need this method when `values()` is already available and more standard? It looks like this is 
used in only two places: one expects an array (where `values()` would work better), and the other just uses it to 
build a documentation string (where `Arrays.toString(...)` might work just as well).
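
   For example (a rough sketch only; the variable names are illustrative, not from this PR):
   ```java
   import java.util.Arrays;

   // Where an array of the permitted values is expected:
   TransactionBoundary[] boundaries = TransactionBoundary.values();

   // Where a human-readable list is needed for a documentation string:
   String permittedValues = Arrays.toString(TransactionBoundary.values());
   ```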

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
##########
@@ -401,10 +429,46 @@ public Integer getRebalanceTimeout() {
         return getInt(DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG);
     }
 
+    @Override
+    public boolean exactlyOnceSourceEnabled() {
+        return EXACTLY_ONCE_SOURCE_SUPPORT_ENABLED.equalsIgnoreCase(
+                getString(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG)
+        );
+    }
+
+    public boolean transactionalLeaderEnabled() {
+        return Arrays.asList(EXACTLY_ONCE_SOURCE_SUPPORT_ENABLED, EXACTLY_ONCE_SOURCE_SUPPORT_PREPARING)
+                .contains(getString(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG).toLowerCase(Locale.ROOT));
+    }

Review comment:
       Should we have an enum for the `enabled`, `preparing`, and `disabled` literals, and should these methods make 
use of it? Also, it might be useful to have JavaDoc on these methods, simply to help future developers understand 
the intent.
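
   As a rough sketch (the enum name and helper method are made up for illustration, not part of this PR):
   ```java
   // requires java.util.Locale
   public enum ExactlyOnceSourceSupportLevel {
       DISABLED, PREPARING, ENABLED;

       public static ExactlyOnceSourceSupportLevel fromProperty(String value) {
           return valueOf(value.trim().toUpperCase(Locale.ROOT));
       }
   }

   @Override
   public boolean exactlyOnceSourceEnabled() {
       return ExactlyOnceSourceSupportLevel.fromProperty(getString(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG))
               == ExactlyOnceSourceSupportLevel.ENABLED;
   }
   ```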

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.source;
+
+/**
+ * Provided to source tasks to allow them to define their own producer transaction boundaries when
+ * exactly-once support is enabled.
+ */
+public interface TransactionContext {
+
+    /**
+     * Request a transaction commit after the next batch of records from {@link SourceTask#poll()}
+     * is processed.
+     */
+    void commitTransaction();
+
+    /**
+     * Request a transaction commit after a source record is processed. The source record will be the
+     * last record in the committed transaction.
+     * @param record the record to commit the transaction after.

Review comment:
       Is the SourceTask implementation calling this method allowed to pass a 
null value here? If so, what happens? If not, please add that to the JavaDoc.
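
   If null is not meant to be allowed, one possibility (purely illustrative wording, not a required approach) is to 
say so in the JavaDoc and fail fast in the runtime's implementation:
   ```java
   /**
    * ...
    * @param record the record to commit the transaction after; may not be null
    * @throws NullPointerException if {@code record} is null
    */
   void commitTransaction(SourceRecord record);
   ```
   with the implementation doing something like `Objects.requireNonNull(record, "record may not be null")` before 
recording the boundary.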

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##########
@@ -20,13 +20,45 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in 
Kafka.
  */
 public abstract class SourceTask implements Task {
 
+    /**
+     * <p>

Review comment:
       We don't need this `<p>` tag.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
##########
@@ -163,13 +181,24 @@ public synchronized boolean beginFlush() {
             }
 
             // And submit the data
-            log.debug("Submitting {} entries to backing store. The offsets 
are: {}", offsetsSerialized.size(), toFlush);
+            log.debug("Submitting {} entries to backing store. The offsets 
are: {}", offsetsSerialized.size(), flushed);
         }
 
-        return backingStore.set(offsetsSerialized, (error, result) -> {
-            boolean isCurrent = handleFinishWrite(flushId, error, result);
-            if (isCurrent && callback != null) {
-                callback.onCompletion(error, result);
+        return primaryBackingStore.set(offsetsSerialized, (primaryError, 
primaryResult) -> {
+            boolean isCurrent = handleFinishWrite(flushId, primaryError, 
primaryResult);
+            if (isCurrent) {
+                if (callback != null) {
+                    callback.onCompletion(primaryError, primaryResult);
+                }
+                if (secondaryBackingStore != null && primaryError == null) {
+                    secondaryBackingStore.set(offsetsSerialized, 
(secondaryError, secondaryResult) -> {
+                        if (secondaryError != null) {
+                            log.warn("Failed to write offsets ({}) to 
secondary backing store", flushed, secondaryError);
+                        } else {
+                            log.debug("Successfully flushed offsets ({}) to 
secondary backing store", flushed);

Review comment:
       Do these log messages include the Connector context for EOS-enabled 
source connectors?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
##########
@@ -43,6 +44,13 @@
  * storage to achieve exactly once semantics).
  * </p>
  * <p>
+ * In order to support per-connector offsets topics but continue to back up progress to a
+ * cluster-global offsets topic, the writer accepts an optional <i>secondary backing store</i>.
+ * After successful flushes to the primary backing store, the writer will copy the flushed offsets
+ * over to the secondary backing store on a best-effort basis. Failures to write to the secondary
+ * store are logged but otherwise swallowed silently.
+ * </p>
+ * <p>

Review comment:
       Did you consider introducing a new `OffsetBackingStore` implementation 
that writes to two other `OffsetBackingStore` implementations? That might 
simplify the logic in this class and better encapsulate the double write 
behavior.
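
   Sketching the idea (this is illustrative only, not a complete `OffsetBackingStore` implementation; the class name 
is made up and the `set` signature is recalled from memory, so treat it as an assumption):
   ```java
   import java.nio.ByteBuffer;
   import java.util.Map;
   import java.util.concurrent.Future;

   import org.apache.kafka.connect.storage.OffsetBackingStore;
   import org.apache.kafka.connect.util.Callback;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   public class CombinedOffsetBackingStore {
       private static final Logger log = LoggerFactory.getLogger(CombinedOffsetBackingStore.class);

       private final OffsetBackingStore primary;
       private final OffsetBackingStore secondary; // may be null if no global backup is configured

       public CombinedOffsetBackingStore(OffsetBackingStore primary, OffsetBackingStore secondary) {
           this.primary = primary;
           this.secondary = secondary;
       }

       // Write to the primary store; on success, copy to the secondary store on a best-effort basis
       public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback) {
           return primary.set(values, (error, result) -> {
               if (callback != null) {
                   callback.onCompletion(error, result);
               }
               if (secondary != null && error == null) {
                   secondary.set(values, (secondaryError, ignored) -> {
                       if (secondaryError != null) {
                           log.warn("Failed to write offsets to secondary backing store", secondaryError);
                       }
                   });
               }
           });
       }
   }
   ```
   Whether that actually simplifies `OffsetStorageWriter` is a judgment call, but it would keep the double-write 
concern out of the flush logic.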

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -337,15 +353,29 @@ private void readToLogEnd() {
                 } else {
                     log.trace("Behind end offset {} for {}; last-read offset 
is {}",
                             endOffset, topicPartition, lastConsumedOffset);
-                    poll(Integer.MAX_VALUE);
+                    if (topicContainsTransactions) {
+                        // The consumer won't return from its poll method if a 
transaction is aborted, even though
+                        // its position will advance. So, we poll for at most 
one second, then give ourselves another
+                        // chance to check whether we've reached the end of 
the topic.
+                        poll(1000);

Review comment:
       And for reference, here is @C0urante's PR for that fix: #11046
   
   I agree, it would be good to avoid this hack if possible.

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.source;
+
+/**
+ * Provided to source tasks to allow them to define their own producer transaction boundaries when
+ * exactly-once support is enabled.
+ */
+public interface TransactionContext {
+
+    /**
+     * Request a transaction commit after the next batch of records from {@link SourceTask#poll()}
+     * is processed.
+     */
+    void commitTransaction();
+
+    /**
+     * Request a transaction commit after a source record is processed. The source record will be the
+     * last record in the committed transaction.
+     * @param record the record to commit the transaction after.
+     */
+    void commitTransaction(SourceRecord record);
+
+    /**
+     * Requests a transaction abort after the next batch of records from {@link SourceTask#poll()}. All of
+     * the records in that transaction will be discarded and will not appear in a committed transaction.
+     * However, offsets for that transaction will still be committed. If the data should be reprocessed,
+     * the task should not invoke this method and should instead throw an exception.
+     */
+    void abortTransaction();
+
+    /**
+     * Requests a transaction abort after a source record is processed. The source record will be the
+     * last record in the aborted transaction. All of the records in that transaction will be discarded
+     * and will not appear in a committed transaction. However, offsets for that transaction will still
+     * be committed. If the data should be reprocessed, the task should not invoke this method and
+     * should instead throw an exception.
+     * @param record the record to abort the transaction after.

Review comment:
       Is the SourceTask implementation calling this method allowed to pass a 
null value here? If so, what happens? If not, please add that to the JavaDoc.

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##########
@@ -20,13 +20,45 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in Kafka.
  */
 public abstract class SourceTask implements Task {
 
+    /**
+     * <p>
+     * The configuration key that determines how source tasks will define transaction boundaries
+     * when exactly-once support is enabled.
+     * </p>
+     */
+    public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary";
+
+    public enum TransactionBoundary {

Review comment:
       The `SourceTask` class is part of the public API for Connect, and so we 
should have JavaDoc on this enum and its literals and methods.
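
   For example (the wording here is just a suggestion, based on the KIP's description of these modes):
   ```java
   /**
    * Represents the permitted values for the {@link #TRANSACTION_BOUNDARY_CONFIG} property.
    */
   public enum TransactionBoundary {
       /** A new transaction will be started and committed for every batch of records returned by {@link #poll()}. */
       POLL,
       /** Transactions will be started and committed on a user-defined time interval. */
       INTERVAL,
       /** Transactions will be defined by the connector itself, via a {@link TransactionContext}. */
       CONNECTOR;

       /** The default boundary style to use when none is explicitly configured. */
       public static final TransactionBoundary DEFAULT = POLL;
   }
   ```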

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
##########
@@ -72,6 +74,45 @@ static String lookupKafkaClusterId(Admin adminClient) {
         }
     }
 
+    /**
+     * Log a warning when the user attempts to override a property that cannot be overridden.
+     * @param props the configuration properties provided by the user
+     * @param key the name of the property to check on
+     * @param expectedValue the expected value for the property
+     * @param justification the reason the property cannot be overridden.
+     *                      Will follow the phrase "The value... for the... property will be ignored as it cannot be overridden ".
+     *                      For example, one might supply the message "in connectors with the DLQ feature enabled" for this parameter.
+     * @param caseSensitive whether the value should match case-insensitively
+     */
+    public static void warnOnOverriddenProperty(
+            Map<String, ?> props,
+            String key,
+            String expectedValue,
+            String justification,
+            boolean caseSensitive) {
+        overriddenPropertyWarning(props, key, expectedValue, justification, caseSensitive).ifPresent(log::warn);
+    }
+
+    // Visible for testing
+    static Optional<String> overriddenPropertyWarning(
+            Map<String, ?> props,
+            String key,
+            String expectedValue,
+            String justification,
+            boolean caseSensitive) {
+        Predicate<String> matchesExpectedValue = caseSensitive ? expectedValue::equals : expectedValue::equalsIgnoreCase;
+        String value = Optional.ofNullable(props.get(key)).map(Object::toString).orElse(null);
+        if (value != null && !matchesExpectedValue.test(value)) {
+            return Optional.of(String.format(
+                    "The value '%s' for the '%s' property will be ignored as it cannot be overridden %s. "
+                            + "The value '%s' will be used instead.",
+                    value, key, justification, expectedValue
+            ));
+        } else {
+            return Optional.empty();
+        }

Review comment:
       Nit: what do you think about tolerating an empty or null `justification` string, since this method does not 
ensure that one is provided?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
##########
@@ -192,6 +198,19 @@
     public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A list of permitted algorithms for verifying internal requests";
     public static final List<String> INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
 
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = "exactly.once.source.support";
+    public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to enable exactly-once support for source connectors in the cluster "
+            + "by writing source records and their offsets in a Kafka transaction, and by proactively fencing out old task generations before bringing up new ones. "
+            + "Note that this must be enabled on every worker in a cluster in order for exactly-once delivery to be guaranteed, "
+            + "and that some source connectors may still not be able to provide exactly-once delivery guarantees even with this support enabled. "
+            + "Permitted values are \"disabled\", \"preparing\", and \"enabled\". In order to safely enable exactly-once support for source connectors, "
+            + "all workers in the cluster must first be updated to use the \"preparing\" value for this property. "
+            + "Once this has been done, a second update of all of the workers in the cluster should be performed to change the value of this property to \"enabled\".";

Review comment:
       Should we mention in the public docs that, when exactly-once support is enabled for source connectors, 
consumers of the topics to which the EOS source connectors write should be configured with 
`isolation.level=read_committed`? After all, the default for consumers is `isolation.level=read_uncommitted` (see 
[ConsumerConfig.java](https://github.com/apache/kafka/blob/db1f581da7f3440cfd5be93800b4a9a2d7327a35/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java#L302)).
   
   It's clear to me that we should mention this, but it's not clear where we 
should do so. The user documentation generated from this doc string might be 
one spot that users will see routinely, so maybe it's a candidate. Another 
would be in the Kafka Connect docs about EOS for source connectors.
   
   BTW, can you add to this PR changes to the Kafka docs that describe this 
feature?
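
   For the docs, the consumer-side requirement boils down to something like this (illustration only):
   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.ConsumerConfig;

   Properties consumerProps = new Properties();
   // The consumer default is "read_uncommitted"; "read_committed" is needed to avoid
   // seeing records from aborted or still-open transactions written by EOS source connectors.
   consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
   ```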
   

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -118,35 +123,39 @@ public KafkaBasedLog(String topic,
                          Callback<ConsumerRecord<K, V>> consumedCallback,
                          Time time,
                          Runnable initializer) {
-        this(topic, producerConfigs, consumerConfigs, () -> null, 
consumedCallback, time, initializer != null ? admin -> initializer.run() : 
null);
+        this(topic, producerConfigs, consumerConfigs, () -> null, 
consumedCallback, time, initializer != null ? admin -> initializer.run() : 
null, false);
     }
 
     /**
      * Create a new KafkaBasedLog object. This does not start reading the log 
and writing is not permitted until
      * {@link #start()} is invoked.
      *
-     * @param topic              the topic to treat as a log
-     * @param producerConfigs    configuration options to use when creating 
the internal producer. At a minimum this must
+     * @param topic                     the topic to treat as a log
+     * @param producerConfigs           configuration options to use when 
creating the internal producer. At a minimum this must
      *                           contain compatible serializer settings for 
the generic types used on this class. Some
      *                           setting, such as the number of acks, will be 
overridden to ensure correct behavior of this
      *                           class.
-     * @param consumerConfigs    configuration options to use when creating 
the internal consumer. At a minimum this must
+     * @param consumerConfigs           configuration options to use when 
creating the internal consumer. At a minimum this must
      *                           contain compatible serializer settings for 
the generic types used on this class. Some
      *                           setting, such as the auto offset reset 
policy, will be overridden to ensure correct
      *                           behavior of this class.
-     * @param topicAdminSupplier supplier function for an admin client, the 
lifecycle of which is expected to be controlled
+     * @param topicAdminSupplier        supplier function for an admin client, 
the lifecycle of which is expected to be controlled
      *                           by the calling component; may not be null
-     * @param consumedCallback   callback to invoke for each {@link 
ConsumerRecord} consumed when tailing the log
-     * @param time               Time interface
-     * @param initializer        the function that should be run when this log 
is {@link #start() started}; may be null
+     * @param consumedCallback          callback to invoke for each {@link 
ConsumerRecord} consumed when tailing the log
+     * @param time                      Time interface
+     * @param initializer               the function that should be run when 
this log is {@link #start() started}; may be null
+     * @param topicContainsTransactions whether the topic being consumed 
contains (or is expected to contain) transactions;
+     *                                  if this is {@code false} and the topic 
does contain transactions, reads to the end of the log may block
+     *                                  indefinitely

Review comment:
       There are projects outside of Apache Kafka that do use this class. While 
this class is not a public API and we technically don't have to avoid breaking 
compatibility, it's fairly straightforward to maintain API compatibility and so 
we should do that.
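
   For instance (just to illustrate the idea; the parameter types here are recalled from the existing constructor 
and may not match exactly), the old signature could simply delegate with the new flag defaulted:
   ```java
   // Existing public signature preserved for compatibility; delegates to the new constructor
   public KafkaBasedLog(String topic,
                        Map<String, Object> producerConfigs,
                        Map<String, Object> consumerConfigs,
                        Supplier<TopicAdmin> topicAdminSupplier,
                        Callback<ConsumerRecord<K, V>> consumedCallback,
                        Time time,
                        java.util.function.Consumer<TopicAdmin> initializer) {
       this(topic, producerConfigs, consumerConfigs, topicAdminSupplier,
            consumedCallback, time, initializer, false);
   }
   ```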

##########
File path: 
connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
##########
@@ -20,13 +20,45 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * SourceTask is a Task that pulls records from another system for storage in Kafka.
  */
 public abstract class SourceTask implements Task {
 
+    /**
+     * <p>
+     * The configuration key that determines how source tasks will define transaction boundaries
+     * when exactly-once support is enabled.
+     * </p>

Review comment:
       The `</p>` has no meaning in JavaDoc, so we should remove these.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -210,26 +227,21 @@ public void stop() {
         synchronized (this) {
             stopRequested = true;
         }
-        consumer.wakeup();
-
-        try {
-            thread.join();
-        } catch (InterruptedException e) {
-            throw new ConnectException("Failed to stop KafkaBasedLog. Exiting without cleanly shutting " +
-                    "down it's producer and consumer.", e);
+        if (consumer != null) {
+            consumer.wakeup();
         }
 
-        try {
-            producer.close();
-        } catch (KafkaException e) {
-            log.error("Failed to stop KafkaBasedLog producer", e);
+        if (thread != null) {
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                throw new ConnectException("Failed to stop KafkaBasedLog. Exiting without cleanly shutting " +
+                        "down it's producer and consumer.", e);
+            }
         }
 
-        try {
-            consumer.close();
-        } catch (KafkaException e) {
-            log.error("Failed to stop KafkaBasedLog consumer", e);
-        }
+        Utils.closeQuietly(producer, "KafkaBasedLog producer");
+        Utils.closeQuietly(consumer, "KafkaBasedLog consumer");

Review comment:
       This change does alter the level at which these problems are logged. Is 
that intentional, and if so why?

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -156,6 +165,14 @@ public KafkaBasedLog(String topic,
         this.readLogEndOffsetCallbacks = new ArrayDeque<>();
         this.time = time;
         this.initializer = initializer != null ? initializer : admin -> { };
+        this.topicContainsTransactions = topicContainsTransactions;
+
+        // If the consumer is configured with isolation.level = read_committed, then its end offsets method cannot be relied on
+        // as it will not take records from currently-open transactions into account. We want to err on the side of caution in that
+        // case: when users request a read to the end of the log, we will read up to the point where the latest offsets visible to the
+        // consumer are at least as high as the (possibly-part-of-a-transaction) end offsets of the topic.
+        this.requireAdminForOffsets = IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)
+                .equals(consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG));

Review comment:
       Why not `equalsIgnoreCase(...)` here instead of lowercasing and then 
calling `equals(...)`?
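
   i.e., something along these lines (sketch only; a cast or `String.valueOf` is needed because the raw config map 
holds `Object` values):
   ```java
   Object isolationLevel = consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG);
   this.requireAdminForOffsets =
           IsolationLevel.READ_COMMITTED.name().equalsIgnoreCase(String.valueOf(isolationLevel));
   ```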

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
##########
@@ -114,12 +124,18 @@ public synchronized boolean beginFlush() {
         if (data.isEmpty())
             return false;
 
-        assert !flushing();

Review comment:
       Why remove this assertion?



