tombentley commented on code in PR #11781:
URL: https://github.com/apache/kafka/pull/11781#discussion_r900964836


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1425,6 +1426,256 @@ public WorkerTask doBuild(Task task,
         }
     }
 
+    // Visible for testing
+    ConnectorOffsetBackingStore offsetStoreForRegularSourceConnector(
+            SourceConnectorConfig sourceConfig,
+            String connName,
+            Connector connector
+    ) {
+        String connectorSpecificOffsetsTopic = sourceConfig.offsetsTopic();
+
+        Map<String, Object> producerProps = baseProducerConfigs(connName, "connector-producer-" + connName, config, sourceConfig, connector.getClass(),
+                connectorClientConfigOverridePolicy, kafkaClusterId);
+
+        // We use a connector-specific store (i.e., a dedicated KafkaOffsetBackingStore for this connector)
+        // if the worker supports per-connector offsets topics (which may be the case in distributed but not standalone mode, for example)
+        // and if the connector is explicitly configured with an offsets topic
+        final boolean usesConnectorSpecificStore = connectorSpecificOffsetsTopic != null
+                && config.connectorOffsetsTopicsPermitted();
+
+        if (usesConnectorSpecificStore) {
+            Map<String, Object> consumerProps = regularSourceOffsetsConsumerConfigs(
+                        connName, "connector-consumer-" + connName, config, sourceConfig, connector.getClass(),
+                        connectorClientConfigOverridePolicy, kafkaClusterId);
+            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+
+            Map<String, Object> adminOverrides = adminConfigs(connName, "connector-adminclient-" + connName, config,
+                    sourceConfig, connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+
+            TopicAdmin admin = new TopicAdmin(adminOverrides);
+            KafkaOffsetBackingStore connectorStore =
+                    KafkaOffsetBackingStore.forConnector(connectorSpecificOffsetsTopic, consumer, admin);
+
+            // If the connector's offsets topic is the same as the worker-global offsets topic, there's no need to construct
+            // an offset store that has a primary and a secondary store which both read from that same topic.
+            // So, if the user has explicitly configured the connector with a connector-specific offsets topic
+            // but we know that that topic is the same as the worker-global offsets topic, we ignore the worker-global
+            // offset store and build a store backed exclusively by a connector-specific offsets store.
+            // It may seem reasonable to instead build a store backed exclusively by the worker-global offset store, but that
+            // would prevent users from being able to customize the config properties used for the Kafka clients that
+            // access the offsets topic, and we would not be able to establish reasonable defaults like setting
+            // isolation.level=read_committed for the offsets topic consumer for this connector
+            if (sameOffsetTopicAsWorker(connectorSpecificOffsetsTopic, producerProps)) {
+                return ConnectorOffsetBackingStore.withOnlyConnectorStore(
+                        () -> LoggingContext.forConnector(connName),
+                        connectorStore,
+                        connectorSpecificOffsetsTopic,
+                        admin
+                );
+            } else {
+                return ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                        () -> LoggingContext.forConnector(connName),
+                        globalOffsetBackingStore,
+                        connectorStore,
+                        connectorSpecificOffsetsTopic,
+                        admin
+                );
+            }
+        } else {
+            return ConnectorOffsetBackingStore.withOnlyWorkerStore(
+                    () -> LoggingContext.forConnector(connName),
+                    globalOffsetBackingStore,
+                    config.offsetsTopic()
+            );
+        }
+    }
+
+    // Visible for testing
+    ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceConnector(
+            SourceConnectorConfig sourceConfig,
+            String connName,
+            Connector connector
+    ) {
+        String connectorSpecificOffsetsTopic = Optional.ofNullable(sourceConfig.offsetsTopic()).orElse(config.offsetsTopic());
+
+        Map<String, Object> producerProps = baseProducerConfigs(connName, "connector-producer-" + connName, config, sourceConfig, connector.getClass(),
+                connectorClientConfigOverridePolicy, kafkaClusterId);
+
+        Map<String, Object> consumerProps = exactlyOnceSourceOffsetsConsumerConfigs(
+                    connName, "connector-consumer-" + connName, config, sourceConfig, connector.getClass(),
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+
+        Map<String, Object> adminOverrides = adminConfigs(connName, "connector-adminclient-" + connName, config,
+                sourceConfig, connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+
+        TopicAdmin admin = new TopicAdmin(adminOverrides);
+        KafkaOffsetBackingStore connectorStore =
+                KafkaOffsetBackingStore.forConnector(connectorSpecificOffsetsTopic, consumer, admin);
+
+        // If the connector's offsets topic is the same as the worker-global offsets topic, there's no need to construct
+        // an offset store that has a primary and a secondary store which both read from that same topic.
+        // So, even if the user has explicitly configured the connector with a connector-specific offsets topic,
+        // if we know that that topic is the same as the worker-global offsets topic, we ignore the worker-global
+        // offset store and build a store backed exclusively by a connector-specific offsets store.
+        // It may seem reasonable to instead build a store backed exclusively by the worker-global offset store, but that
+        // would prevent users from being able to customize the config properties used for the Kafka clients that
+        // access the offsets topic, and may lead to confusion for them when tasks are created for the connector
+        // since they will all have their own dedicated offsets stores anyways
+        if (sameOffsetTopicAsWorker(connectorSpecificOffsetsTopic, producerProps)) {
+            return ConnectorOffsetBackingStore.withOnlyConnectorStore(
+                    () -> LoggingContext.forConnector(connName),
+                    connectorStore,
+                    connectorSpecificOffsetsTopic,
+                    admin
+            );
+        } else {
+            return ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                    () -> LoggingContext.forConnector(connName),
+                    globalOffsetBackingStore,
+                    connectorStore,
+                    connectorSpecificOffsetsTopic,
+                    admin
+            );
+        }
+    }
+
+    // Visible for testing
+    ConnectorOffsetBackingStore offsetStoreForRegularSourceTask(
+            ConnectorTaskId id,
+            SourceConnectorConfig sourceConfig,
+            Class<? extends Connector> connectorClass,
+            Producer<byte[], byte[]> producer,
+            Map<String, Object> producerProps,
+            TopicAdmin topicAdmin
+    ) {
+        String connectorSpecificOffsetsTopic = sourceConfig.offsetsTopic();
+
+        if (regularSourceTaskUsesConnectorSpecificOffsetsStore(sourceConfig)) {
+            Objects.requireNonNull(topicAdmin, "Source tasks require a non-null topic admin when configured to use their own offsets topic");
+
+            Map<String, Object> consumerProps = regularSourceOffsetsConsumerConfigs(
+                    id.connector(), "connector-consumer-" + id, config, sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+
+            KafkaOffsetBackingStore connectorStore =
+                    KafkaOffsetBackingStore.forTask(sourceConfig.offsetsTopic(), producer, consumer, topicAdmin);
+
+            // If the connector's offsets topic is the same as the worker-global offsets topic, there's no need to construct
+            // an offset store that has a primary and a secondary store which both read from that same topic.
+            // So, if the user has (implicitly or explicitly) configured the connector with a connector-specific offsets topic
+            // but we know that that topic is the same as the worker-global offsets topic, we ignore the worker-global
+            // offset store and build a store backed exclusively by a connector-specific offsets store.
+            // It may seem reasonable to instead build a store backed exclusively by the worker-global offset store, but that
+            // would prevent users from being able to customize the config properties used for the Kafka clients that
+            // access the offsets topic, and we would not be able to establish reasonable defaults like setting
+            // isolation.level=read_committed for the offsets topic consumer for this task
+            if (sameOffsetTopicAsWorker(sourceConfig.offsetsTopic(), producerProps)) {
+                return ConnectorOffsetBackingStore.withOnlyConnectorStore(
+                        () -> LoggingContext.forTask(id),
+                        connectorStore,
+                        connectorSpecificOffsetsTopic,
+                        topicAdmin
+                );
+            } else {
+                return ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                        () -> LoggingContext.forTask(id),
+                        globalOffsetBackingStore,
+                        connectorStore,
+                        connectorSpecificOffsetsTopic,
+                        topicAdmin
+                );
+            }
+        } else {
+            return ConnectorOffsetBackingStore.withOnlyWorkerStore(
+                    () -> LoggingContext.forTask(id),
+                    globalOffsetBackingStore,
+                    config.offsetsTopic()
+            );
+        }
+    }
+
+    // Visible for testing
+    ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceTask(
+            ConnectorTaskId id,
+            SourceConnectorConfig sourceConfig,
+            Class<? extends Connector> connectorClass,
+            Producer<byte[], byte[]> producer,
+            Map<String, Object> producerProps,
+            TopicAdmin topicAdmin
+    ) {
+        Objects.requireNonNull(topicAdmin, "Source tasks require a non-null topic admin when exactly-once support is enabled");
+
+        Map<String, Object> consumerProps = exactlyOnceSourceOffsetsConsumerConfigs(
+                id.connector(), "connector-consumer-" + id, config, sourceConfig, connectorClass,
+                connectorClientConfigOverridePolicy, kafkaClusterId);
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+
+        String connectorOffsetsTopic = Optional.ofNullable(sourceConfig.offsetsTopic()).orElse(config.offsetsTopic());
+
+        KafkaOffsetBackingStore connectorStore =
+                KafkaOffsetBackingStore.forTask(connectorOffsetsTopic, producer, consumer, topicAdmin);
+
+        // If the connector's offsets topic is the same as the worker-global offsets topic, there's no need to construct
+        // an offset store that has a primary and a secondary store which both read from that same topic.
+        // So, if the user has (implicitly or explicitly) configured the connector with a connector-specific offsets topic
+        // but we know that that topic is the same as the worker-global offsets topic, we ignore the worker-global
+        // offset store and build a store backed exclusively by a connector-specific offsets store.
+        // We cannot under any circumstances build an offset store backed exclusively by the worker-global offset store
+        // as that would prevent us from being able to write source records and source offset information for the task
+        // with the same producer, and therefore, in the same transaction.
+        if (sameOffsetTopicAsWorker(connectorOffsetsTopic, producerProps)) {
+            return ConnectorOffsetBackingStore.withOnlyConnectorStore(
+                    () -> LoggingContext.forTask(id),
+                    connectorStore,
+                    connectorOffsetsTopic,
+                    topicAdmin
+            );
+        } else {
+            return ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                    () -> LoggingContext.forTask(id),
+                    globalOffsetBackingStore,
+                    connectorStore,
+                    connectorOffsetsTopic,
+                    topicAdmin
+            );
+        }
+    }
+
+    /**
+     * Gives a best-effort guess for whether the given offsets topic is the same topic as the worker-global offsets topic.
+     * Even if the name of the topic is the same as the name of the worker's offsets topic, the two may still be different topics
+     * if the connector is configured to produce to a different Kafka cluster than the one that hosts the worker's offsets topic.
+     * @param offsetsTopic the name of the offsets topic for the connector
+     * @param producerProps the producer configuration for the connector
+     * @return whether it appears that the connector's offsets topic is the same topic as the worker-global offsets topic.
+     * If {@code true}, it is guaranteed that the two are the same;
+     * if {@code false}, it is likely but not guaranteed that the two are not the same
+     */
+    private boolean sameOffsetTopicAsWorker(String offsetsTopic, Map<String, Object> producerProps) {
+        // We can check the offset topic name and the Kafka cluster's bootstrap servers,
+        // although this isn't exact and can lead to some false negatives if the user
+        // provides an overridden bootstrap servers value for their producer that is different than
+        // the worker's but still resolves to the same Kafka cluster used by the worker.
+        // At the moment this is probably adequate, especially since we don't want to put
+        // a network ping to a remote Kafka cluster inside the herder's tick thread (which is where this
+        // logic takes place right now) in case that takes a while.
+        return offsetsTopic.equals(config.offsetsTopic())
+                && config.bootstrapServers().equals(producerProps.get(BOOTSTRAP_SERVERS_CONFIG));

Review Comment:
   I guess we could improve this slightly by not comparing as strings, but as `Set<String>`. That way at least the order wouldn't matter, which it looks like it does right now.
   
   I've wondered whether we should add some kind of `cluster.id` property to the client configs which could be used to assert that the client was connected to the expected cluster. If we did that, and the cluster IDs were specified in the configs here, then you could safely know whether the two were the same cluster without needing to connect at this point.
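   
   A minimal sketch of that set-based comparison, assuming the `bootstrap.servers` values are plain comma-separated `host:port` lists (the `sameBootstrapServers` helper is made up for illustration):
   
   ```java
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.Set;
   
   public class BootstrapServersComparison {
       // Hypothetical helper: compare two bootstrap.servers values as sets so that
       // "host1:9092,host2:9092" and "host2:9092,host1:9092" compare as equal.
       static boolean sameBootstrapServers(String workerBootstrap, Object producerBootstrap) {
           if (!(producerBootstrap instanceof String)) {
               return false;
           }
           Set<String> workerServers = new HashSet<>(Arrays.asList(workerBootstrap.split(",")));
           Set<String> producerServers = new HashSet<>(Arrays.asList(((String) producerBootstrap).split(",")));
           return workerServers.equals(producerServers);
       }
   }
   ```
   
   This would still give false negatives when different host lists resolve to the same cluster, but it at least removes the ordering sensitivity.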



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -142,16 +231,40 @@ private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<Stri
                        DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets");
             }
         };
-        return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics);
     }
 
     @Override
     public void start() {
         log.info("Starting KafkaOffsetBackingStore");
-        offsetLog.start();
+        try {
+            offsetLog.start();
+        } catch (UnsupportedVersionException e) {
+            String message;
+            if (exactlyOnce) {
+                message = "Enabling exactly-once support for source connectors 
requires a Kafka broker version that allows "
+                        + "admin clients to read consumer offsets. Please 
either disable the worker's exactly-once "
+                        + "support for source connectors, or use a newer Kafka 
broker version.";
+            } else {
+                message = "When " + ConsumerConfig.ISOLATION_LEVEL_CONFIG + 
"is set to "
+                        + 
IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)
+                        + ", a Kafka broker version that allows admin clients 
to read consumer offsets is required. "
+                        + "Please either reconfigure the worker or connector, 
or use a newer Kafka broker version.";
+            }
+            throw new ConnectException(message, e);
+        }
         log.info("Finished reading offsets topic and starting 
KafkaOffsetBackingStore");
     }
 
+    /**
+     * Stop reading from and writing to the offsets topic, and relinquish resources allocated for interacting
+     * with it, including Kafka clients.
+     * <p>
+     * <b>Note:</b> if the now-deprecated {@link #KafkaOffsetBackingStore()} constructor was used to create
+     * this store, the underlying admin client allocated for interacting with the offsets topic will be closed.
+     * On the other hand, if the recommended {@link #KafkaOffsetBackingStore(Supplier)} constructor was used to
+     * create this store, the admin client derived from the given {@link Supplier} will not be closed and it is the
+     * caller's responsibility to manage its lifecycle accordingly.

Review Comment:
   Great doc, but maybe we should also have some on those constructors themselves?
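   
   For example, constructor-level docs might look something like this (illustrative wording only, not taken from the PR):
   
   ```java
   /**
    * Create an offset store that allocates its own admin client for interacting with
    * the offsets topic. That admin client is closed when {@link #stop()} is invoked.
    * @deprecated use {@link #KafkaOffsetBackingStore(Supplier)} instead
    */
   @Deprecated
   public KafkaOffsetBackingStore() {
       // ...
   }
   
   /**
    * Create an offset store whose admin client is derived from the given {@link Supplier}.
    * That admin client is <em>not</em> closed by {@link #stop()}; managing its lifecycle
    * is the caller's responsibility.
    * @param topicAdminSupplier supplies the admin client used to interact with the offsets topic
    */
   public KafkaOffsetBackingStore(Supplier<TopicAdmin> topicAdminSupplier) {
       // ...
   }
   ```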



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -131,7 +203,24 @@ private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<Stri
                                                              Map<String, Object> consumerProps,
                                                              Callback<ConsumerRecord<byte[], byte[]>> consumedCallback,
                                                              final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
-        java.util.function.Consumer<TopicAdmin> createTopics = admin -> {
+        java.util.function.Consumer<TopicAdmin> createTopics = initialize(topic, topicDescription);
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics);
+    }
+
+    protected NewTopic topicDescription(final String topic, final WorkerConfig config) {

Review Comment:
   Might `newTopicSpecification` be a better name? It's just that "description" is a bit ambiguous: it could be describing an _existing_ topic.
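   
   For instance (a sketch only, with the body unchanged):
   
   ```java
   // "Specification" makes it clearer that this builds the NewTopic used to create the
   // offsets topic when it is missing, rather than describing an existing topic.
   protected NewTopic newTopicSpecification(final String topic, final WorkerConfig config) {
       // ... same body as the current topicDescription ...
   }
   ```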



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -142,16 +231,40 @@ private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<Stri
                        DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets");
             }
         };
-        return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics);
     }
 
     @Override
     public void start() {
         log.info("Starting KafkaOffsetBackingStore");
-        offsetLog.start();
+        try {
+            offsetLog.start();
+        } catch (UnsupportedVersionException e) {
+            String message;
+            if (exactlyOnce) {
+                message = "Enabling exactly-once support for source connectors 
requires a Kafka broker version that allows "
+                        + "admin clients to read consumer offsets. Please 
either disable the worker's exactly-once "
+                        + "support for source connectors, or use a newer Kafka 
broker version.";
+            } else {
+                message = "When " + ConsumerConfig.ISOLATION_LEVEL_CONFIG + 
"is set to "
+                        + 
IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)
+                        + ", a Kafka broker version that allows admin clients 
to read consumer offsets is required. "
+                        + "Please either reconfigure the worker or connector, 
or use a newer Kafka broker version.";

Review Comment:
   ```suggestion
                           + "Please either reconfigure the worker or 
connector, or upgrade to a newer Kafka broker version.";
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java:
##########
@@ -142,16 +231,40 @@ private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<Stri
                        DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets");
             }
         };
-        return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics);
     }
 
     @Override
     public void start() {
         log.info("Starting KafkaOffsetBackingStore");
-        offsetLog.start();
+        try {
+            offsetLog.start();
+        } catch (UnsupportedVersionException e) {
+            String message;
+            if (exactlyOnce) {
+                message = "Enabling exactly-once support for source connectors 
requires a Kafka broker version that allows "
+                        + "admin clients to read consumer offsets. Please 
either disable the worker's exactly-once "
+                        + "support for source connectors, or use a newer Kafka 
broker version.";

Review Comment:
   ```suggestion
                           + "support for source connectors, or upgrade to a 
newer Kafka broker version.";
   ```


