tombentley commented on code in PR #11781: URL: https://github.com/apache/kafka/pull/11781#discussion_r895874082
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ########## @@ -1327,30 +1334,39 @@ public WorkerTask doBuild(Task task, connectorClientConfigOverridePolicy, kafkaClusterId); KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); - TopicAdmin topicAdmin; + // Prepare to create a topic admin if the task requires one, but do not actually create an instance + // until/unless one is needed + final AtomicReference<TopicAdmin> topicAdmin = new AtomicReference<>(); + final Supplier<TopicAdmin> topicAdminCreator = () -> topicAdmin.updateAndGet(existingAdmin -> { + if (existingAdmin != null) { + return existingAdmin; + } + Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config, + sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE); + Admin adminClient = Admin.create(adminOverrides); + return new TopicAdmin(adminOverrides.get(BOOTSTRAP_SERVERS_CONFIG), adminClient); + }); + Map<String, TopicCreationGroup> topicCreationGroups; if (config.topicCreationEnable() && sourceConfig.usesTopicCreation()) { topicCreationGroups = TopicCreationGroup.configuredGroups(sourceConfig); // Create a topic admin that the task can use for topic creation - Map<String, Object> adminOverrides = adminConfigs(id.connector(), "connector-adminclient-" + id, config, - sourceConfig, connectorClass, connectorClientConfigOverridePolicy, kafkaClusterId, ConnectorType.SOURCE); - topicAdmin = new TopicAdmin(adminOverrides); + topicAdminCreator.get(); Review Comment: I think this call to `get` could result in a leaked `TopicAdmin`, and thus `Admin`, because `offsetStoreForRegularSourceTask` doesn't guarantee to invoke `topicAdminCreator` (it calls it `topicAdminSupplier`) (when it does, the Admin will eventually be closed via `ConnectorOffsetBackingStore.stop`). 
In any case I think the ownership of the `Admin` here (responsibility for ultimately closing it) is pretty unclear, but it's not clear to me why it _needs_ to be. So a comment to explain would be great, if a clean up of the logic is not possible. ########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ########## @@ -16,56 +16,322 @@ */ package org.apache.kafka.connect.storage; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.LoggingContext; +import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; +/** + * An {@link OffsetBackingStore} with support for reading from and writing to a worker-global + * offset backing store and/or a connector-specific offset backing store. + */ public class ConnectorOffsetBackingStore implements OffsetBackingStore { - private final OffsetBackingStore workerStore; + private static final Logger log = LoggerFactory.getLogger(ConnectorOffsetBackingStore.class); + + /** + * Builds an offset store that uses a connector-specific offset topic as the primary store and + * the worker-global offset store as the secondary store. 
+ * + * @param loggingContext a {@link Supplier} for the {@link LoggingContext} that should be used + * for messages logged by this offset store; may not be null, and may never return null + * @param workerStore the worker-global offset store; may not be null + * @param connectorStore the connector-specific offset store; may not be null + * @param connectorOffsetsTopic the name of the connector-specific offset topic; may not be null + * @param connectorStoreAdmin the topic admin to use for the connector-specific offset topic; may not be null + * @return an offset store backed primarily by the connector-specific offset topic and secondarily + * by the worker-global offset store; never null + */ + public static ConnectorOffsetBackingStore withConnectorAndWorkerStores( + Supplier<LoggingContext> loggingContext, + OffsetBackingStore workerStore, + KafkaOffsetBackingStore connectorStore, + String connectorOffsetsTopic, + TopicAdmin connectorStoreAdmin + ) { + Objects.requireNonNull(loggingContext); + Objects.requireNonNull(workerStore); + Objects.requireNonNull(connectorStore); + Objects.requireNonNull(connectorOffsetsTopic); + Objects.requireNonNull(connectorStoreAdmin); + return new ConnectorOffsetBackingStore( + Time.SYSTEM, + loggingContext, + connectorOffsetsTopic, + workerStore, + connectorStore, + connectorStoreAdmin + ); + } + + /** + * Builds an offset store that uses the worker-global offset store as the primary store, and no secondary store. 
+ * + * @param loggingContext a {@link Supplier} for the {@link LoggingContext} that should be used + * for messages logged by this offset store; may not be null, and may never return null + * @param workerStore the worker-global offset store; may not be null + * @param workerOffsetsTopic the name of the worker-global offset topic; may be null if the worker + * does not use an offset topic for its offset store + * @return an offset store for the connector backed solely by the worker-global offset store; never null + */ + public static ConnectorOffsetBackingStore withOnlyWorkerStore( + Supplier<LoggingContext> loggingContext, + OffsetBackingStore workerStore, + String workerOffsetsTopic + ) { + Objects.requireNonNull(loggingContext); + Objects.requireNonNull(workerStore); + return new ConnectorOffsetBackingStore(Time.SYSTEM, loggingContext, workerOffsetsTopic, workerStore, null, null); + } + + /** + * Builds an offset store that uses a connector-specific offset topic as the primary store, and no secondary store. 
+ * + * @param loggingContext a {@link Supplier} for the {@link LoggingContext} that should be used + * for messages logged by this offset store; may not be null, and may never return null + * @param connectorStore the connector-specific offset store; may not be null + * @param connectorOffsetsTopic the name of the connector-specific offset topic; may not be null + * @param connectorStoreAdmin the topic admin to use for the connector-specific offset topic; may not be null + * @return an offset store for the connector backed solely by the connector-specific offset topic; never null + */ + public static ConnectorOffsetBackingStore withOnlyConnectorStore( + Supplier<LoggingContext> loggingContext, + KafkaOffsetBackingStore connectorStore, + String connectorOffsetsTopic, + TopicAdmin connectorStoreAdmin + ) { + Objects.requireNonNull(loggingContext); + Objects.requireNonNull(connectorOffsetsTopic); + Objects.requireNonNull(connectorStoreAdmin); + return new ConnectorOffsetBackingStore( + Time.SYSTEM, + loggingContext, + connectorOffsetsTopic, + null, + connectorStore, + connectorStoreAdmin + ); + } + + private final Time time; + private final Supplier<LoggingContext> loggingContext; private final String primaryOffsetsTopic; + private final Optional<OffsetBackingStore> workerStore; + private final Optional<KafkaOffsetBackingStore> connectorStore; + private final Optional<TopicAdmin> connectorStoreAdmin; - public ConnectorOffsetBackingStore( + ConnectorOffsetBackingStore( + Time time, + Supplier<LoggingContext> loggingContext, + String primaryOffsetsTopic, OffsetBackingStore workerStore, - String primaryOffsetsTopic + KafkaOffsetBackingStore connectorStore, + TopicAdmin connectorStoreAdmin ) { - this.workerStore = workerStore; + if (workerStore == null && connectorStore == null) { + throw new IllegalArgumentException("At least one non-null offset store must be provided"); + } + this.time = time; + this.loggingContext = loggingContext; this.primaryOffsetsTopic = 
primaryOffsetsTopic; + this.workerStore = Optional.ofNullable(workerStore); + this.connectorStore = Optional.ofNullable(connectorStore); + this.connectorStoreAdmin = Optional.ofNullable(connectorStoreAdmin); } public String primaryOffsetsTopic() { return primaryOffsetsTopic; } + /** + * If configured to use a connector-specific offset store, {@link OffsetBackingStore#start() start} that store. + * + * <p>The worker-global offset store is not modified; it is the caller's responsibility to ensure that it is started + * before calls to {@link #get(Collection)} and {@link #set(Map, Callback)} take place. + */ @Override public void start() { - // TODO + // Worker offset store should already be started + connectorStore.ifPresent(OffsetBackingStore::start); } + /** + * If configured to use a connector-specific offset store, {@link OffsetBackingStore#start() stop} that store, + * and the {@link TopicAdmin} used by that store. + * + * <p>The worker-global offset store is not modified as it may be used for other connectors that either already exist, + * or will be created, on this worker. + */ @Override public void stop() { - // TODO + // Worker offset store should not be stopped as it may be used for multiple connectors + connectorStore.ifPresent(OffsetBackingStore::stop); + connectorStoreAdmin.ifPresent(TopicAdmin::close); } + /** + * Get the offset values for the specified keys. + * + * <p>If configured to use a connector-specific offset store, priority is given to the values contained in that store, + * and the values in the worker-global offset store (if one is provided) are used as a fallback for keys that are not + * present in the connector-specific store. + * + * <p>If not configured to use a connector-specific offset store, only the values contained in the worker-global + * offset store are returned. 
+ + * @param keys list of keys to look up + * @return future for the resulting map from key to value + */ @Override public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys) { - // TODO - return workerStore.get(keys); + Future<Map<ByteBuffer, ByteBuffer>> workerGetFuture = getFromStore(workerStore, keys); + Future<Map<ByteBuffer, ByteBuffer>> connectorGetFuture = getFromStore(connectorStore, keys); + + return new Future<Map<ByteBuffer, ByteBuffer>>() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return workerGetFuture.cancel(mayInterruptIfRunning) + || connectorGetFuture.cancel(mayInterruptIfRunning); Review Comment: Is it correct that if `workerGetFuture.cancel` was already cancelled, and returned true, then we won't try to cancel `connectorGetFuture`? ########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ########## @@ -16,56 +16,322 @@ */ package org.apache.kafka.connect.storage; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.LoggingContext; +import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; +/** + * An {@link OffsetBackingStore} with support for reading from and writing to a worker-global + * offset backing store and/or a connector-specific offset backing store. 
+ */ public class ConnectorOffsetBackingStore implements OffsetBackingStore { - private final OffsetBackingStore workerStore; + private static final Logger log = LoggerFactory.getLogger(ConnectorOffsetBackingStore.class); + + /** + * Builds an offset store that uses a connector-specific offset topic as the primary store and + * the worker-global offset store as the secondary store. + * + * @param loggingContext a {@link Supplier} for the {@link LoggingContext} that should be used + * for messages logged by this offset store; may not be null, and may never return null + * @param workerStore the worker-global offset store; may not be null + * @param connectorStore the connector-specific offset store; may not be null + * @param connectorOffsetsTopic the name of the connector-specific offset topic; may not be null + * @param connectorStoreAdmin the topic admin to use for the connector-specific offset topic; may not be null + * @return an offset store backed primarily by the connector-specific offset topic and secondarily + * by the worker-global offset store; never null + */ + public static ConnectorOffsetBackingStore withConnectorAndWorkerStores( + Supplier<LoggingContext> loggingContext, + OffsetBackingStore workerStore, + KafkaOffsetBackingStore connectorStore, + String connectorOffsetsTopic, + TopicAdmin connectorStoreAdmin + ) { + Objects.requireNonNull(loggingContext); + Objects.requireNonNull(workerStore); + Objects.requireNonNull(connectorStore); + Objects.requireNonNull(connectorOffsetsTopic); + Objects.requireNonNull(connectorStoreAdmin); + return new ConnectorOffsetBackingStore( + Time.SYSTEM, + loggingContext, + connectorOffsetsTopic, + workerStore, + connectorStore, + connectorStoreAdmin + ); + } + + /** + * Builds an offset store that uses the worker-global offset store as the primary store, and no secondary store. 
+ * + * @param loggingContext a {@link Supplier} for the {@link LoggingContext} that should be used + * for messages logged by this offset store; may not be null, and may never return null + * @param workerStore the worker-global offset store; may not be null + * @param workerOffsetsTopic the name of the worker-global offset topic; may be null if the worker + * does not use an offset topic for its offset store + * @return an offset store for the connector backed solely by the worker-global offset store; never null + */ + public static ConnectorOffsetBackingStore withOnlyWorkerStore( + Supplier<LoggingContext> loggingContext, + OffsetBackingStore workerStore, + String workerOffsetsTopic + ) { + Objects.requireNonNull(loggingContext); + Objects.requireNonNull(workerStore); + return new ConnectorOffsetBackingStore(Time.SYSTEM, loggingContext, workerOffsetsTopic, workerStore, null, null); + } + + /** + * Builds an offset store that uses a connector-specific offset topic as the primary store, and no secondary store. 
+ * + * @param loggingContext a {@link Supplier} for the {@link LoggingContext} that should be used + * for messages logged by this offset store; may not be null, and may never return null + * @param connectorStore the connector-specific offset store; may not be null + * @param connectorOffsetsTopic the name of the connector-specific offset topic; may not be null + * @param connectorStoreAdmin the topic admin to use for the connector-specific offset topic; may not be null + * @return an offset store for the connector backed solely by the connector-specific offset topic; never null + */ + public static ConnectorOffsetBackingStore withOnlyConnectorStore( + Supplier<LoggingContext> loggingContext, + KafkaOffsetBackingStore connectorStore, + String connectorOffsetsTopic, + TopicAdmin connectorStoreAdmin + ) { + Objects.requireNonNull(loggingContext); + Objects.requireNonNull(connectorOffsetsTopic); + Objects.requireNonNull(connectorStoreAdmin); + return new ConnectorOffsetBackingStore( + Time.SYSTEM, + loggingContext, + connectorOffsetsTopic, + null, + connectorStore, + connectorStoreAdmin + ); + } + + private final Time time; + private final Supplier<LoggingContext> loggingContext; private final String primaryOffsetsTopic; + private final Optional<OffsetBackingStore> workerStore; + private final Optional<KafkaOffsetBackingStore> connectorStore; + private final Optional<TopicAdmin> connectorStoreAdmin; - public ConnectorOffsetBackingStore( + ConnectorOffsetBackingStore( + Time time, + Supplier<LoggingContext> loggingContext, + String primaryOffsetsTopic, OffsetBackingStore workerStore, - String primaryOffsetsTopic + KafkaOffsetBackingStore connectorStore, + TopicAdmin connectorStoreAdmin ) { - this.workerStore = workerStore; + if (workerStore == null && connectorStore == null) { + throw new IllegalArgumentException("At least one non-null offset store must be provided"); + } + this.time = time; + this.loggingContext = loggingContext; this.primaryOffsetsTopic = 
primaryOffsetsTopic; + this.workerStore = Optional.ofNullable(workerStore); + this.connectorStore = Optional.ofNullable(connectorStore); + this.connectorStoreAdmin = Optional.ofNullable(connectorStoreAdmin); } public String primaryOffsetsTopic() { return primaryOffsetsTopic; } + /** + * If configured to use a connector-specific offset store, {@link OffsetBackingStore#start() start} that store. + * + * <p>The worker-global offset store is not modified; it is the caller's responsibility to ensure that it is started + * before calls to {@link #get(Collection)} and {@link #set(Map, Callback)} take place. + */ @Override public void start() { - // TODO + // Worker offset store should already be started + connectorStore.ifPresent(OffsetBackingStore::start); } + /** + * If configured to use a connector-specific offset store, {@link OffsetBackingStore#start() stop} that store, + * and the {@link TopicAdmin} used by that store. + * + * <p>The worker-global offset store is not modified as it may be used for other connectors that either already exist, + * or will be created, on this worker. + */ @Override public void stop() { - // TODO + // Worker offset store should not be stopped as it may be used for multiple connectors + connectorStore.ifPresent(OffsetBackingStore::stop); + connectorStoreAdmin.ifPresent(TopicAdmin::close); } + /** + * Get the offset values for the specified keys. + * + * <p>If configured to use a connector-specific offset store, priority is given to the values contained in that store, + * and the values in the worker-global offset store (if one is provided) are used as a fallback for keys that are not + * present in the connector-specific store. + * + * <p>If not configured to use a connector-specific offset store, only the values contained in the worker-global + * offset store are returned. 
+ + * @param keys list of keys to look up + * @return future for the resulting map from key to value + */ @Override public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys) { - // TODO - return workerStore.get(keys); + Future<Map<ByteBuffer, ByteBuffer>> workerGetFuture = getFromStore(workerStore, keys); + Future<Map<ByteBuffer, ByteBuffer>> connectorGetFuture = getFromStore(connectorStore, keys); + + return new Future<Map<ByteBuffer, ByteBuffer>>() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return workerGetFuture.cancel(mayInterruptIfRunning) + || connectorGetFuture.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return workerGetFuture.isCancelled() + || connectorGetFuture.isCancelled(); + } + + @Override + public boolean isDone() { + return workerGetFuture.isDone() + && connectorGetFuture.isDone(); + } + + @Override + public Map<ByteBuffer, ByteBuffer> get() throws InterruptedException, ExecutionException { + Map<ByteBuffer, ByteBuffer> result = new HashMap<>(workerGetFuture.get()); + result.putAll(connectorGetFuture.get()); + return result; + } + + @Override + public Map<ByteBuffer, ByteBuffer> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + long endTime = time.milliseconds() + unit.toMillis(timeout); + Map<ByteBuffer, ByteBuffer> result = new HashMap<>(workerGetFuture.get(timeout, unit)); + timeout = Math.max(1, endTime - time.milliseconds()); Review Comment: It's quite confusing that `timeout` is initially in `unit`, but in this reassignment becomes ms. I think maybe a new `timeoutRemainingMs` would be clearer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org