tombentley commented on code in PR #11781:
URL: https://github.com/apache/kafka/pull/11781#discussion_r900964836
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1425,6 +1426,256 @@ public WorkerTask doBuild(Task task,
         }
     }
 
+    // Visible for testing
+    ConnectorOffsetBackingStore offsetStoreForRegularSourceConnector(
+            SourceConnectorConfig sourceConfig,
+            String connName,
+            Connector connector
+    ) {
+        String connectorSpecificOffsetsTopic = sourceConfig.offsetsTopic();
+
+        Map<String, Object> producerProps = baseProducerConfigs(connName, "connector-producer-" + connName, config, sourceConfig, connector.getClass(),
+                connectorClientConfigOverridePolicy, kafkaClusterId);
+
+        // We use a connector-specific store (i.e., a dedicated KafkaOffsetBackingStore for this connector)
+        // if the worker supports per-connector offsets topics (which may be the case in distributed but not standalone mode, for example)
+        // and if the connector is explicitly configured with an offsets topic
+        final boolean usesConnectorSpecificStore = connectorSpecificOffsetsTopic != null
+                && config.connectorOffsetsTopicsPermitted();
+
+        if (usesConnectorSpecificStore) {
+            Map<String, Object> consumerProps = regularSourceOffsetsConsumerConfigs(
+                    connName, "connector-consumer-" + connName, config, sourceConfig, connector.getClass(),
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+
+            Map<String, Object> adminOverrides = adminConfigs(connName, "connector-adminclient-" + connName, config,
+                    sourceConfig, connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+
+            TopicAdmin admin = new TopicAdmin(adminOverrides);
+            KafkaOffsetBackingStore connectorStore =
+                    KafkaOffsetBackingStore.forConnector(connectorSpecificOffsetsTopic, consumer, admin);
+
+            // If the connector's offsets topic is the same as the worker-global offsets topic, there's no need to construct
+            // an offset store that has a primary and a secondary store which both read from that same topic.
+            // So, if the user has explicitly configured the connector with a connector-specific offsets topic
+            // but we know that that topic is the same as the worker-global offsets topic, we ignore the worker-global
+            // offset store and build a store backed exclusively by a connector-specific offsets store.
+            // It may seem reasonable to instead build a store backed exclusively by the worker-global offset store, but that
+            // would prevent users from being able to customize the config properties used for the Kafka clients that
+            // access the offsets topic, and we would not be able to establish reasonable defaults like setting
+            // isolation.level=read_committed for the offsets topic consumer for this connector
+            if (sameOffsetTopicAsWorker(connectorSpecificOffsetsTopic, producerProps)) {
+                return ConnectorOffsetBackingStore.withOnlyConnectorStore(
+                        () -> LoggingContext.forConnector(connName),
+                        connectorStore,
+                        connectorSpecificOffsetsTopic,
+                        admin
+                );
+            } else {
+                return ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                        () -> LoggingContext.forConnector(connName),
+                        globalOffsetBackingStore,
+                        connectorStore,
+                        connectorSpecificOffsetsTopic,
+                        admin
+                );
+            }
+        } else {
+            return ConnectorOffsetBackingStore.withOnlyWorkerStore(
+                    () -> LoggingContext.forConnector(connName),
+                    globalOffsetBackingStore,
+                    config.offsetsTopic()
+            );
+        }
+    }
+
+    // Visible for testing
+    ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceConnector(
+            SourceConnectorConfig sourceConfig,
+            String connName,
+            Connector connector
+    ) {
+        String connectorSpecificOffsetsTopic = Optional.ofNullable(sourceConfig.offsetsTopic()).orElse(config.offsetsTopic());
+
+        Map<String, Object> producerProps = baseProducerConfigs(connName, "connector-producer-" + connName, config, sourceConfig, connector.getClass(),
+                connectorClientConfigOverridePolicy, kafkaClusterId);
+
+        Map<String, Object> consumerProps = exactlyOnceSourceOffsetsConsumerConfigs(
+                connName, "connector-consumer-" + connName, config, sourceConfig, connector.getClass(),
+                connectorClientConfigOverridePolicy, kafkaClusterId);
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+
+        Map<String, Object> adminOverrides = adminConfigs(connName, "connector-adminclient-" + connName, config,
+                sourceConfig, connector.getClass(), connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE);
+
+        TopicAdmin admin = new TopicAdmin(adminOverrides);
+        KafkaOffsetBackingStore connectorStore =
+                KafkaOffsetBackingStore.forConnector(connectorSpecificOffsetsTopic, consumer, admin);
+
+        // If the connector's offsets topic is the same as the worker-global offsets topic, there's no need to construct
+        // an offset store that has a primary and a secondary store which both read from that same topic.
+        // So, even if the user has explicitly configured the connector with a connector-specific offsets topic,
+        // if we know that that topic is the same as the worker-global offsets topic, we ignore the worker-global
+        // offset store and build a store backed exclusively by a connector-specific offsets store.
+        // It may seem reasonable to instead build a store backed exclusively by the worker-global offset store, but that
+        // would prevent users from being able to customize the config properties used for the Kafka clients that
+        // access the offsets topic, and may lead to confusion for them when tasks are created for the connector
+        // since they will all have their own dedicated offsets stores anyway
+        if (sameOffsetTopicAsWorker(connectorSpecificOffsetsTopic, producerProps)) {
+            return ConnectorOffsetBackingStore.withOnlyConnectorStore(
+                    () -> LoggingContext.forConnector(connName),
+                    connectorStore,
+                    connectorSpecificOffsetsTopic,
+                    admin
+            );
+        } else {
+            return ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                    () -> LoggingContext.forConnector(connName),
+                    globalOffsetBackingStore,
+                    connectorStore,
+                    connectorSpecificOffsetsTopic,
+                    admin
+            );
+        }
+    }
+
+    // Visible for testing
+    ConnectorOffsetBackingStore offsetStoreForRegularSourceTask(
+            ConnectorTaskId id,
+            SourceConnectorConfig sourceConfig,
+            Class<? extends Connector> connectorClass,
+            Producer<byte[], byte[]> producer,
+            Map<String, Object> producerProps,
+            TopicAdmin topicAdmin
+    ) {
+        String connectorSpecificOffsetsTopic = sourceConfig.offsetsTopic();
+
+        if (regularSourceTaskUsesConnectorSpecificOffsetsStore(sourceConfig)) {
+            Objects.requireNonNull(topicAdmin, "Source tasks require a non-null topic admin when configured to use their own offsets topic");
+
+            Map<String, Object> consumerProps = regularSourceOffsetsConsumerConfigs(
+                    id.connector(), "connector-consumer-" + id, config, sourceConfig, connectorClass,
+                    connectorClientConfigOverridePolicy, kafkaClusterId);
+            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+
+            KafkaOffsetBackingStore connectorStore =
+                    KafkaOffsetBackingStore.forTask(sourceConfig.offsetsTopic(), producer, consumer, topicAdmin);
+
+            // If the connector's offsets topic is the same as the worker-global offsets topic, there's no need to construct
+            // an offset store that has a primary and a secondary store which both read from that same topic.
+            // So, if the user has (implicitly or explicitly) configured the connector with a connector-specific offsets topic
+            // but we know that that topic is the same as the worker-global offsets topic, we ignore the worker-global
+            // offset store and build a store backed exclusively by a connector-specific offsets store.
+            // It may seem reasonable to instead build a store backed exclusively by the worker-global offset store, but that
+            // would prevent users from being able to customize the config properties used for the Kafka clients that
+            // access the offsets topic, and we would not be able to establish reasonable defaults like setting
+            // isolation.level=read_committed for the offsets topic consumer for this task
+            if (sameOffsetTopicAsWorker(sourceConfig.offsetsTopic(), producerProps)) {
+                return ConnectorOffsetBackingStore.withOnlyConnectorStore(
+                        () -> LoggingContext.forTask(id),
+                        connectorStore,
+                        connectorSpecificOffsetsTopic,
+                        topicAdmin
+                );
+            } else {
+                return ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                        () -> LoggingContext.forTask(id),
+                        globalOffsetBackingStore,
+                        connectorStore,
+                        connectorSpecificOffsetsTopic,
+                        topicAdmin
+                );
+            }
+        } else {
+            return ConnectorOffsetBackingStore.withOnlyWorkerStore(
+                    () -> LoggingContext.forTask(id),
+                    globalOffsetBackingStore,
+                    config.offsetsTopic()
+            );
+        }
+    }
+
+    // Visible for testing
+    ConnectorOffsetBackingStore offsetStoreForExactlyOnceSourceTask(
+            ConnectorTaskId id,
+            SourceConnectorConfig sourceConfig,
+            Class<? extends Connector> connectorClass,
+            Producer<byte[], byte[]> producer,
+            Map<String, Object> producerProps,
+            TopicAdmin topicAdmin
+    ) {
+        Objects.requireNonNull(topicAdmin, "Source tasks require a non-null topic admin when exactly-once support is enabled");
+
+        Map<String, Object> consumerProps = exactlyOnceSourceOffsetsConsumerConfigs(
+                id.connector(), "connector-consumer-" + id, config, sourceConfig, connectorClass,
+                connectorClientConfigOverridePolicy, kafkaClusterId);
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
+
+        String connectorOffsetsTopic = Optional.ofNullable(sourceConfig.offsetsTopic()).orElse(config.offsetsTopic());
+
+        KafkaOffsetBackingStore connectorStore =
+                KafkaOffsetBackingStore.forTask(connectorOffsetsTopic, producer, consumer, topicAdmin);
+
+        // If the connector's offsets topic is the same as the worker-global offsets topic, there's no need to construct
+        // an offset store that has a primary and a secondary store which both read from that same topic.
+        // So, if the user has (implicitly or explicitly) configured the connector with a connector-specific offsets topic
+        // but we know that that topic is the same as the worker-global offsets topic, we ignore the worker-global
+        // offset store and build a store backed exclusively by a connector-specific offsets store.
+        // We cannot under any circumstances build an offset store backed exclusively by the worker-global offset store,
+        // as that would prevent us from being able to write source records and source offset information for the task
+        // with the same producer, and therefore, in the same transaction.
+        if (sameOffsetTopicAsWorker(connectorOffsetsTopic, producerProps)) {
+            return ConnectorOffsetBackingStore.withOnlyConnectorStore(
+                    () -> LoggingContext.forTask(id),
+                    connectorStore,
+                    connectorOffsetsTopic,
+                    topicAdmin
+            );
+        } else {
+            return ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                    () -> LoggingContext.forTask(id),
+                    globalOffsetBackingStore,
+                    connectorStore,
+                    connectorOffsetsTopic,
+                    topicAdmin
+            );
+        }
+    }
+
+    /**
+     * Gives a best-effort guess for whether the given offsets topic is the same topic as the worker-global offsets topic.
+     * Even if the name of the topic is the same as the name of the worker's offsets topic, the two may still be different topics
+     * if the connector is configured to produce to a different Kafka cluster than the one that hosts the worker's offsets topic.
+     * @param offsetsTopic the name of the offsets topic for the connector
+     * @param producerProps the producer configuration for the connector
+     * @return whether it appears that the connector's offsets topic is the same topic as the worker-global offsets topic.
+     * If {@code true}, it is guaranteed that the two are the same;
+     * if {@code false}, it is likely but not guaranteed that the two are not the same
+     */
+    private boolean sameOffsetTopicAsWorker(String offsetsTopic, Map<String, Object> producerProps) {
+        // We can check the offset topic name and the Kafka cluster's bootstrap servers,
+        // although this isn't exact and can lead to some false negatives if the user
+        // provides an overridden bootstrap servers value for their producer that is different from
+        // the worker's but still resolves to the same Kafka cluster used by the worker.
+        // At the moment this is probably adequate, especially since we don't want to put
+        // a network ping to a remote Kafka cluster inside the herder's tick thread (which is where this
+        // logic takes place right now) in case that takes a while.
+        return offsetsTopic.equals(config.offsetsTopic())
+                && config.bootstrapServers().equals(producerProps.get(BOOTSTRAP_SERVERS_CONFIG));

Review Comment:
   I guess we could improve this slightly by comparing not as strings but as `Set<String>`s. That way at least the ordering of the bootstrap servers wouldn't matter, which it looks like it does right now.
   
   I've also wondered whether we should add some kind of `cluster.id` property to the client configs, which could be used to assert that a client is connected to the expected cluster. If we did that, and it were specified in the configs here, then we could safely tell whether the two were the same cluster without needing to connect at this point.
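   Something along these lines might work (an untested sketch, not what the PR currently does; `bootstrapServerSet` is a hypothetical helper, and it assumes both `bootstrap.servers` values are plain comma-separated `host:port` strings):

   ```java
   import java.util.Arrays;
   import java.util.Map;
   import java.util.Set;
   import java.util.stream.Collectors;

   // Hypothetical helper: normalize a bootstrap.servers string into a set so
   // that ordering and stray whitespace no longer affect the comparison.
   private static Set<String> bootstrapServerSet(String bootstrapServers) {
       return Arrays.stream(bootstrapServers.split(","))
               .map(String::trim)
               .filter(s -> !s.isEmpty())
               .collect(Collectors.toSet());
   }

   private boolean sameOffsetTopicAsWorker(String offsetsTopic, Map<String, Object> producerProps) {
       // String.valueOf guards against a null or non-String override value;
       // a List-valued producer override would still need dedicated handling.
       return offsetsTopic.equals(config.offsetsTopic())
               && bootstrapServerSet(config.bootstrapServers())
                       .equals(bootstrapServerSet(String.valueOf(producerProps.get(BOOTSTRAP_SERVERS_CONFIG))));
   }
   ```

   This is still only a heuristic (two different host lists can front the same cluster), but it removes the ordering sensitivity.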
" + + "Please either reconfigure the worker or connector, or use a newer Kafka broker version."; + } + throw new ConnectException(message, e); + } log.info("Finished reading offsets topic and starting KafkaOffsetBackingStore"); } + /** + * Stop reading from and writing to the offsets topic, and relinquish resources allocated for interacting + * with it, including Kafka clients. + * <p> + * <b>Note:</b> if the now-deprecated {@link #KafkaOffsetBackingStore()} constructor was used to create + * this store, the underlying admin client allocated for interacting with the offsets topic will be closed. + * On the other hand, if the recommended {@link #KafkaOffsetBackingStore(Supplier)} constructor was used to + * create this store, the admin client derived from the given {@link Supplier} will not be closed and it is the + * caller's responsibility to manage its lifecycle accordingly. Review Comment: Great doc, but maybe we should also have some on those constructors themselves? ########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java: ########## @@ -131,7 +203,24 @@ private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<Stri Map<String, Object> consumerProps, Callback<ConsumerRecord<byte[], byte[]>> consumedCallback, final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) { - java.util.function.Consumer<TopicAdmin> createTopics = admin -> { + java.util.function.Consumer<TopicAdmin> createTopics = initialize(topic, topicDescription); + return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics); + } + + protected NewTopic topicDescription(final String topic, final WorkerConfig config) { Review Comment: Might `newTopicSpecification` be a better name? It's just the "description" is a bit ambiguous: it could be describing an _existing_ topic. ########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java: ########## @@ -142,16 +231,40 @@ private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<Stri DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets"); } }; - return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics); } @Override public void start() { log.info("Starting KafkaOffsetBackingStore"); - offsetLog.start(); + try { + offsetLog.start(); + } catch (UnsupportedVersionException e) { + String message; + if (exactlyOnce) { + message = "Enabling exactly-once support for source connectors requires a Kafka broker version that allows " + + "admin clients to read consumer offsets. Please either disable the worker's exactly-once " + + "support for source connectors, or use a newer Kafka broker version."; + } else { + message = "When " + ConsumerConfig.ISOLATION_LEVEL_CONFIG + "is set to " + + IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT) + + ", a Kafka broker version that allows admin clients to read consumer offsets is required. 
" + + "Please either reconfigure the worker or connector, or use a newer Kafka broker version."; Review Comment: ```suggestion + "Please either reconfigure the worker or connector, or upgrade to a newer Kafka broker version."; ``` ########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java: ########## @@ -142,16 +231,40 @@ private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<Stri DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "source connector offsets"); } }; - return new KafkaBasedLog<>(topic, producerProps, consumerProps, adminSupplier, consumedCallback, Time.SYSTEM, createTopics); } @Override public void start() { log.info("Starting KafkaOffsetBackingStore"); - offsetLog.start(); + try { + offsetLog.start(); + } catch (UnsupportedVersionException e) { + String message; + if (exactlyOnce) { + message = "Enabling exactly-once support for source connectors requires a Kafka broker version that allows " + + "admin clients to read consumer offsets. Please either disable the worker's exactly-once " + + "support for source connectors, or use a newer Kafka broker version."; Review Comment: ```suggestion + "support for source connectors, or upgrade to a newer Kafka broker version."; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org