C0urante commented on code in PR #11781: URL: https://github.com/apache/kafka/pull/11781#discussion_r896234734
########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -16,56 +16,322 @@
  */
 package org.apache.kafka.connect.storage;

+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;

+/**
+ * An {@link OffsetBackingStore} with support for reading from and writing to a worker-global
+ * offset backing store and/or a connector-specific offset backing store.
+ */
 public class ConnectorOffsetBackingStore implements OffsetBackingStore {

-    private final OffsetBackingStore workerStore;
+    private static final Logger log = LoggerFactory.getLogger(ConnectorOffsetBackingStore.class);
+
+    /**
+     * Builds an offset store that uses a connector-specific offset topic as the primary store and
+     * the worker-global offset store as the secondary store.
+     *
+     * @param loggingContext a {@link Supplier} for the {@link LoggingContext} that should be used
+     *                       for messages logged by this offset store; may not be null, and may never return null
+     * @param workerStore the worker-global offset store; may not be null
+     * @param connectorStore the connector-specific offset store; may not be null
+     * @param connectorOffsetsTopic the name of the connector-specific offset topic; may not be null
+     * @param connectorStoreAdmin the topic admin to use for the connector-specific offset topic; may not be null
+     * @return an offset store backed primarily by the connector-specific offset topic and secondarily
+     *         by the worker-global offset store; never null
+     */
+    public static ConnectorOffsetBackingStore withConnectorAndWorkerStores(
+            Supplier<LoggingContext> loggingContext,
+            OffsetBackingStore workerStore,
+            KafkaOffsetBackingStore connectorStore,
+            String connectorOffsetsTopic,
+            TopicAdmin connectorStoreAdmin
+    ) {
+        Objects.requireNonNull(loggingContext);
+        Objects.requireNonNull(workerStore);
+        Objects.requireNonNull(connectorStore);
+        Objects.requireNonNull(connectorOffsetsTopic);
+        Objects.requireNonNull(connectorStoreAdmin);
+        return new ConnectorOffsetBackingStore(
+                Time.SYSTEM,
+                loggingContext,
+                connectorOffsetsTopic,
+                workerStore,
+                connectorStore,
+                connectorStoreAdmin
+        );
+    }
+
+    /**
+     * Builds an offset store that uses the worker-global offset store as the primary store, and no secondary store.
+     *
+     * @param loggingContext a {@link Supplier} for the {@link LoggingContext} that should be used
+     *                       for messages logged by this offset store; may not be null, and may never return null
+     * @param workerStore the worker-global offset store; may not be null
+     * @param workerOffsetsTopic the name of the worker-global offset topic; may be null if the worker
+     *                           does not use an offset topic for its offset store
+     * @return an offset store for the connector backed solely by the worker-global offset store; never null
+     */
+    public static ConnectorOffsetBackingStore withOnlyWorkerStore(
+            Supplier<LoggingContext> loggingContext,
+            OffsetBackingStore workerStore,
+            String workerOffsetsTopic
+    ) {
+        Objects.requireNonNull(loggingContext);
+        Objects.requireNonNull(workerStore);
+        return new ConnectorOffsetBackingStore(Time.SYSTEM, loggingContext, workerOffsetsTopic, workerStore, null, null);
+    }
+
+    /**
+     * Builds an offset store that uses a connector-specific offset topic as the primary store, and no secondary store.
+     *
+     * @param loggingContext a {@link Supplier} for the {@link LoggingContext} that should be used
+     *                       for messages logged by this offset store; may not be null, and may never return null
+     * @param connectorStore the connector-specific offset store; may not be null
+     * @param connectorOffsetsTopic the name of the connector-specific offset topic; may not be null
+     * @param connectorStoreAdmin the topic admin to use for the connector-specific offset topic; may not be null
+     * @return an offset store for the connector backed solely by the connector-specific offset topic; never null
+     */
+    public static ConnectorOffsetBackingStore withOnlyConnectorStore(
+            Supplier<LoggingContext> loggingContext,
+            KafkaOffsetBackingStore connectorStore,
+            String connectorOffsetsTopic,
+            TopicAdmin connectorStoreAdmin
+    ) {
+        Objects.requireNonNull(loggingContext);
+        Objects.requireNonNull(connectorOffsetsTopic);
+        Objects.requireNonNull(connectorStoreAdmin);
+        return new ConnectorOffsetBackingStore(
+                Time.SYSTEM,
+                loggingContext,
+                connectorOffsetsTopic,
+                null,
+                connectorStore,
+                connectorStoreAdmin
+        );
+    }
+
+    private final Time time;
+    private final Supplier<LoggingContext> loggingContext;
     private final String primaryOffsetsTopic;
+    private final Optional<OffsetBackingStore> workerStore;
+    private final Optional<KafkaOffsetBackingStore> connectorStore;
+    private final Optional<TopicAdmin> connectorStoreAdmin;

-    public ConnectorOffsetBackingStore(
+    ConnectorOffsetBackingStore(
+            Time time,
+            Supplier<LoggingContext> loggingContext,
+            String primaryOffsetsTopic,
             OffsetBackingStore workerStore,
-            String primaryOffsetsTopic
+            KafkaOffsetBackingStore connectorStore,
+            TopicAdmin connectorStoreAdmin
     ) {
-        this.workerStore = workerStore;
+        if (workerStore == null && connectorStore == null) {
+            throw new IllegalArgumentException("At least one non-null offset store must be provided");
+        }
+        this.time = time;
+        this.loggingContext = loggingContext;
         this.primaryOffsetsTopic = primaryOffsetsTopic;
+        this.workerStore = Optional.ofNullable(workerStore);
+        this.connectorStore = Optional.ofNullable(connectorStore);
+        this.connectorStoreAdmin = Optional.ofNullable(connectorStoreAdmin);
     }

     public String primaryOffsetsTopic() {
         return primaryOffsetsTopic;
     }

+    /**
+     * If configured to use a connector-specific offset store, {@link OffsetBackingStore#start() start} that store.
+     *
+     * <p>The worker-global offset store is not modified; it is the caller's responsibility to ensure that it is started
+     * before calls to {@link #get(Collection)} and {@link #set(Map, Callback)} take place.
+     */
     @Override
     public void start() {
-        // TODO
+        // Worker offset store should already be started
+        connectorStore.ifPresent(OffsetBackingStore::start);
     }

+    /**
+     * If configured to use a connector-specific offset store, {@link OffsetBackingStore#stop() stop} that store,
+     * and the {@link TopicAdmin} used by that store.
+     *
+     * <p>The worker-global offset store is not modified as it may be used for other connectors that either already exist,
+     * or will be created, on this worker.
+     */
     @Override
     public void stop() {
-        // TODO
+        // Worker offset store should not be stopped as it may be used for multiple connectors
+        connectorStore.ifPresent(OffsetBackingStore::stop);
+        connectorStoreAdmin.ifPresent(TopicAdmin::close);
     }

+    /**
+     * Get the offset values for the specified keys.
+     *
+     * <p>If configured to use a connector-specific offset store, priority is given to the values contained in that store,
+     * and the values in the worker-global offset store (if one is provided) are used as a fallback for keys that are not
+     * present in the connector-specific store.
+     *
+     * <p>If not configured to use a connector-specific offset store, only the values contained in the worker-global
+     * offset store are returned.
+     *
+     * @param keys list of keys to look up
+     * @return future for the resulting map from key to value
+     */
     @Override
     public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys) {
-        // TODO
-        return workerStore.get(keys);
+        Future<Map<ByteBuffer, ByteBuffer>> workerGetFuture = getFromStore(workerStore, keys);
+        Future<Map<ByteBuffer, ByteBuffer>> connectorGetFuture = getFromStore(connectorStore, keys);
+
+        return new Future<Map<ByteBuffer, ByteBuffer>>() {
+            @Override
+            public boolean cancel(boolean mayInterruptIfRunning) {
+                return workerGetFuture.cancel(mayInterruptIfRunning)
+                        || connectorGetFuture.cancel(mayInterruptIfRunning);

Review Comment:
   Ah yes, good point. Switched to `|` instead of `||`, which disables short-circuiting so that `cancel` is guaranteed to be called on both futures.
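   For reference, a minimal, self-contained sketch of the non-short-circuiting pattern described above (the class and helper names here are illustrative, not taken from the PR):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class CancelBothExample {

    // With '||' the right-hand cancel() is skipped as soon as the left-hand one
    // returns true; the non-short-circuiting '|' evaluates both operands, so both
    // futures receive the cancel() call regardless of the first result.
    static boolean cancelBoth(Future<?> first, Future<?> second, boolean mayInterruptIfRunning) {
        return first.cancel(mayInterruptIfRunning)
                | second.cancel(mayInterruptIfRunning);
    }

    public static void main(String[] args) {
        Future<?> a = new CompletableFuture<>();
        Future<?> b = new CompletableFuture<>();
        System.out.println(cancelBoth(a, b, true));                   // true
        System.out.println(a.isCancelled() + " " + b.isCancelled());  // true true
    }
}
```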
########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
     @Override
     public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys) {
-        // TODO
-        return workerStore.get(keys);
+        Future<Map<ByteBuffer, ByteBuffer>> workerGetFuture = getFromStore(workerStore, keys);
+        Future<Map<ByteBuffer, ByteBuffer>> connectorGetFuture = getFromStore(connectorStore, keys);
+
+        return new Future<Map<ByteBuffer, ByteBuffer>>() {
+            @Override
+            public boolean cancel(boolean mayInterruptIfRunning) {
+                return workerGetFuture.cancel(mayInterruptIfRunning)
+                        || connectorGetFuture.cancel(mayInterruptIfRunning);
+            }
+
+            @Override
+            public boolean isCancelled() {
+                return workerGetFuture.isCancelled()
+                        || connectorGetFuture.isCancelled();
+            }
+
+            @Override
+            public boolean isDone() {
+                return workerGetFuture.isDone()
+                        && connectorGetFuture.isDone();
+            }
+
+            @Override
+            public Map<ByteBuffer, ByteBuffer> get() throws InterruptedException, ExecutionException {
+                Map<ByteBuffer, ByteBuffer> result = new HashMap<>(workerGetFuture.get());
+                result.putAll(connectorGetFuture.get());
+                return result;
+            }
+
+            @Override
+            public Map<ByteBuffer, ByteBuffer> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+                long endTime = time.milliseconds() + unit.toMillis(timeout);
+                Map<ByteBuffer, ByteBuffer> result = new HashMap<>(workerGetFuture.get(timeout, unit));
+                timeout = Math.max(1, endTime - time.milliseconds());

Review Comment:
   👍 done.
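   The suggestion acknowledged here concerns the timeout handling in `get(long, TimeUnit)` shown above. A rough, standalone illustration of that deadline-splitting pattern, using `System.currentTimeMillis()` in place of Connect's `Time` utility (names are illustrative, not taken from the PR):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DeadlineSplitExample {

    // Waits on two futures under one overall timeout: the time spent waiting on the
    // first future is subtracted from the budget for the second, and the remainder is
    // clamped to at least 1 ms so the second get() is still attempted (and can throw
    // TimeoutException itself) rather than being given a zero or negative timeout.
    static <T> T awaitBoth(Future<T> first, Future<T> second, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        long endTime = System.currentTimeMillis() + unit.toMillis(timeout);
        first.get(timeout, unit);
        long remainingMs = Math.max(1, endTime - System.currentTimeMillis());
        return second.get(remainingMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Future<String> first = pool.submit(() -> { Thread.sleep(100); return "first"; });
        Future<String> second = pool.submit(() -> { Thread.sleep(100); return "second"; });
        System.out.println(awaitBoth(first, second, 1, TimeUnit.SECONDS));  // prints "second"
        pool.shutdown();
    }
}
```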