leekeiabstraction commented on code in PR #208: URL: https://github.com/apache/flink-connector-aws/pull/208#discussion_r2140306990
########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java: ########## @@ -19,43 +19,242 @@ package org.apache.flink.connector.kinesis.source.reader.fanout; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy; import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; +import org.apache.flink.connector.kinesis.source.split.StartingPosition; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; +import java.io.IOException; import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DEREGISTER_CONSUMER_TIMEOUT; /** * An implementation of the KinesisShardSplitReader that consumes from Kinesis using Enhanced * Fan-Out and HTTP/2. 
*/ @Internal public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase { + + private static final Logger LOG = LoggerFactory.getLogger(FanOutKinesisShardSplitReader.class); private final AsyncStreamProxy asyncStreamProxy; private final String consumerArn; private final Duration subscriptionTimeout; + private final Duration deregisterTimeout; + + /** + * Shared executor service for all shard subscriptions. + * + * <p>This executor uses an unbounded queue ({@link LinkedBlockingQueue}) but this does not pose + * a risk of out-of-memory errors due to the natural flow control mechanisms in the system: + * + * <ol> + * <li>Each {@link FanOutKinesisShardSubscription} has a bounded event queue with capacity of 2</li> + * <li>New records are only requested after processing an event (via {@code requestRecords()})</li> + * <li>When a shard's queue is full, the processing thread blocks at the {@code put()} operation</li> + * <li>The AWS SDK implements the Reactive Streams protocol with built-in backpressure</li> + * </ol> + * + * <p>In the worst-case scenario during backpressure, the maximum number of events in memory is: + * <pre> + * Max Events = (2 * Number_of_Shards) + min(Number_of_Shards, Number_of_Threads) + * </pre> + * + * <p>Where: + * <ul> + * <li>2 * Number_of_Shards: Total capacity of all shard event queues</li> + * <li>min(Number_of_Shards, Number_of_Threads): Maximum events being actively processed</li> + * </ul> + * + * <p>This ensures that memory usage scales linearly with the number of shards, not exponentially, + * making it safe to use an unbounded executor queue even with a large number of shards. + */ + private final ExecutorService sharedShardSubscriptionExecutor; Review Comment: Let's make the name of the event executor service consistent, there are currently 2 names.. 
`sharedShardSubscriptionExecutor` and `eventProcessingExecutor`. ########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java: ########## @@ -19,43 +19,242 @@ package org.apache.flink.connector.kinesis.source.reader.fanout; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy; import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; +import org.apache.flink.connector.kinesis.source.split.StartingPosition; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; +import java.io.IOException; import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DEREGISTER_CONSUMER_TIMEOUT; /** * An implementation of the KinesisShardSplitReader that consumes from Kinesis using Enhanced * 
Fan-Out and HTTP/2. */ @Internal public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase { + + private static final Logger LOG = LoggerFactory.getLogger(FanOutKinesisShardSplitReader.class); private final AsyncStreamProxy asyncStreamProxy; private final String consumerArn; private final Duration subscriptionTimeout; + private final Duration deregisterTimeout; + + /** + * Shared executor service for all shard subscriptions. + * + * <p>This executor uses an unbounded queue ({@link LinkedBlockingQueue}) but this does not pose + * a risk of out-of-memory errors due to the natural flow control mechanisms in the system: + * + * <ol> + * <li>Each {@link FanOutKinesisShardSubscription} has a bounded event queue with capacity of 2</li> + * <li>New records are only requested after processing an event (via {@code requestRecords()})</li> + * <li>When a shard's queue is full, the processing thread blocks at the {@code put()} operation</li> + * <li>The AWS SDK implements the Reactive Streams protocol with built-in backpressure</li> + * </ol> + * + * <p>In the worst-case scenario during backpressure, the maximum number of events in memory is: + * <pre> + * Max Events = (2 * Number_of_Shards) + min(Number_of_Shards, Number_of_Threads) + * </pre> + * + * <p>Where: + * <ul> + * <li>2 * Number_of_Shards: Total capacity of all shard event queues</li> + * <li>min(Number_of_Shards, Number_of_Threads): Maximum events being actively processed</li> + * </ul> + * + * <p>This ensures that memory usage scales linearly with the number of shards, not exponentially, + * making it safe to use an unbounded executor queue even with a large number of shards. + */ + private final ExecutorService sharedShardSubscriptionExecutor; + + /** + * Shared executor service for making subscribeToShard API calls. + * + * <p>This executor is separate from the event processing executor to avoid contention + * between API calls and event processing. 
Using a dedicated executor for subscription calls + * provides several important benefits: + * + * <ol> + * <li>Prevents blocking of the main thread or event processing threads during API calls</li> + * <li>Isolates API call failures from event processing operations</li> + * <li>Allows for controlled concurrency of API calls across multiple shards</li> + * <li>Prevents potential deadlocks that could occur when the same thread handles both + * subscription calls and event processing</li> + * </ol> + * + * <p>The executor uses a smaller number of threads than the event processing executor since + * subscription calls are less frequent and primarily I/O bound. This helps optimize resource + * usage while still providing sufficient parallelism for multiple concurrent subscription calls. + */ + private final ExecutorService sharedSubscriptionCallExecutor; private final Map<String, FanOutKinesisShardSubscription> splitSubscriptions = new HashMap<>(); + /** + * Factory for creating subscriptions. This is primarily used for testing. + */ + @VisibleForTesting + public interface SubscriptionFactory { + FanOutKinesisShardSubscription createSubscription( + AsyncStreamProxy proxy, + String consumerArn, + String shardId, + StartingPosition startingPosition, + Duration timeout, + ExecutorService eventProcessingExecutor, + ExecutorService subscriptionCallExecutor); + } Review Comment: Should SubscriptionFactory be an interface in FanOutKinesisShardSubscription or in its own file instead? 
########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java: ########## @@ -262,17 +373,21 @@ private FanOutShardSubscriber(CountDownLatch subscriptionLatch) { } public void requestRecords() { - subscription.request(1); + if (subscription != null) { + subscription.request(1); + } else { + LOG.warn("Cannot request records - subscription is null for shard {}", shardId); Review Comment: Under what condition does this happen? If not handled correctly, wouldn't this cause a subscription to stop getting further data? Should we just let it throw NPE here instead? ########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java: ########## @@ -80,19 +279,167 @@ public void handleSplitsChanges(SplitsChange<KinesisShardSplit> splitsChanges) { super.handleSplitsChanges(splitsChanges); for (KinesisShardSplit split : splitsChanges.splits()) { FanOutKinesisShardSubscription subscription = - new FanOutKinesisShardSubscription( + subscriptionFactory.createSubscription( asyncStreamProxy, consumerArn, split.getShardId(), split.getStartingPosition(), - subscriptionTimeout); + subscriptionTimeout, + sharedShardSubscriptionExecutor, + sharedSubscriptionCallExecutor); subscription.activateSubscription(); splitSubscriptions.put(split.splitId(), subscription); } } + /** + * Closes the split reader and releases all resources. + * + * <p>The close method follows a specific order to ensure proper shutdown: + * 1. First, cancel all active subscriptions to prevent new events from being processed + * 2. Then, shutdown the shared executor service to stop processing existing events + * 3. 
Finally, close the async stream proxy to release network resources + * + * <p>This ordering is critical because: + * - Cancelling subscriptions first prevents new events from being submitted to the executor + * - Shutting down the executor next ensures all in-flight tasks complete or are cancelled + * - Closing the async stream proxy last ensures all resources are properly released after + * all processing has stopped + */ @Override public void close() throws Exception { - asyncStreamProxy.close(); + cancelActiveSubscriptions(); + shutdownSharedShardSubscriptionExecutor(); + shutdownSharedSubscriptionCallExecutor(); + closeAsyncStreamProxy(); + } + + /** + * Cancels all active subscriptions to prevent new events from being processed. + * + * <p>After cancelling subscriptions, we wait a short time to allow the cancellation + * signals to propagate before proceeding with executor shutdown. + */ + private void cancelActiveSubscriptions() { + for (FanOutKinesisShardSubscription subscription : splitSubscriptions.values()) { + if (subscription.isActive()) { + try { + subscription.cancelSubscription(); + } catch (Exception e) { + LOG.warn("Error cancelling subscription for shard {}", + subscription.getShardId(), e); + } + } + } + + // Wait a short time (200ms) to allow cancellation signals to propagate + // This helps ensure that no new tasks are submitted to the executor after we begin shutdown + try { + Thread.sleep(200); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + /** + * Shuts down the shared executor service used for processing subscription events. + * + * <p>We use the EFO_DEREGISTER_CONSUMER_TIMEOUT (10 seconds) as the shutdown timeout + * to maintain consistency with other deregistration operations in the connector. 
+ */ + private void shutdownSharedShardSubscriptionExecutor() { + if (sharedShardSubscriptionExecutor == null) { + return; + } + + sharedShardSubscriptionExecutor.shutdown(); + try { + // Use the deregister consumer timeout (10 seconds) + // This timeout is consistent with other deregistration operations in the connector + if (!sharedShardSubscriptionExecutor.awaitTermination( + deregisterTimeout.toMillis(), + TimeUnit.MILLISECONDS)) { + LOG.warn("Event processing executor did not terminate in the specified time. Forcing shutdown."); + sharedShardSubscriptionExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for event processing executor shutdown", e); + sharedShardSubscriptionExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + /** + * Shuts down the shared executor service used for subscription API calls. + * + * <p>We use the EFO_DEREGISTER_CONSUMER_TIMEOUT (10 seconds) as the shutdown timeout + * to maintain consistency with other deregistration operations in the connector. + */ + private void shutdownSharedSubscriptionCallExecutor() { + if (sharedSubscriptionCallExecutor == null) { + return; + } + + sharedSubscriptionCallExecutor.shutdown(); + try { + // Use a shorter timeout since these are just API calls + if (!sharedSubscriptionCallExecutor.awaitTermination( + deregisterTimeout.toMillis(), + TimeUnit.MILLISECONDS)) { + LOG.warn("Subscription call executor did not terminate in the specified time. Forcing shutdown."); + sharedSubscriptionCallExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for subscription call executor shutdown", e); + sharedSubscriptionCallExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + /** + * Closes the async stream proxy with a timeout. 
+ * + * <p>We use the EFO_CONSUMER_SUBSCRIPTION_TIMEOUT (60 seconds) as the close timeout + * since closing the client involves similar network operations as subscription. + * The longer timeout accounts for potential network delays during shutdown. + */ + private void closeAsyncStreamProxy() { + // Create a dedicated single-threaded executor for closing the asyncStreamProxy + // This prevents the close operation from being affected by the main executor shutdown + ExecutorService closeExecutor = new ThreadPoolExecutor( + 1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new ExecutorThreadFactory("kinesis-client-close")); Review Comment: Let's use more elaborate thread names e.g. close-async-stream-proxy-executor-service ########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java: ########## @@ -19,43 +19,242 @@ package org.apache.flink.connector.kinesis.source.reader.fanout; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy; import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; +import org.apache.flink.connector.kinesis.source.split.StartingPosition; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; +import java.io.IOException; import java.time.Duration; import java.util.HashMap; 
import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DEREGISTER_CONSUMER_TIMEOUT; /** * An implementation of the KinesisShardSplitReader that consumes from Kinesis using Enhanced * Fan-Out and HTTP/2. */ @Internal public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase { + + private static final Logger LOG = LoggerFactory.getLogger(FanOutKinesisShardSplitReader.class); private final AsyncStreamProxy asyncStreamProxy; private final String consumerArn; private final Duration subscriptionTimeout; + private final Duration deregisterTimeout; + + /** + * Shared executor service for all shard subscriptions. 
+ * + * <p>This executor uses an unbounded queue ({@link LinkedBlockingQueue}) but this does not pose + * a risk of out-of-memory errors due to the natural flow control mechanisms in the system: + * + * <ol> + * <li>Each {@link FanOutKinesisShardSubscription} has a bounded event queue with capacity of 2</li> + * <li>New records are only requested after processing an event (via {@code requestRecords()})</li> + * <li>When a shard's queue is full, the processing thread blocks at the {@code put()} operation</li> + * <li>The AWS SDK implements the Reactive Streams protocol with built-in backpressure</li> + * </ol> + * + * <p>In the worst-case scenario during backpressure, the maximum number of events in memory is: + * <pre> + * Max Events = (2 * Number_of_Shards) + min(Number_of_Shards, Number_of_Threads) + * </pre> + * + * <p>Where: + * <ul> + * <li>2 * Number_of_Shards: Total capacity of all shard event queues</li> + * <li>min(Number_of_Shards, Number_of_Threads): Maximum events being actively processed</li> + * </ul> + * + * <p>This ensures that memory usage scales linearly with the number of shards, not exponentially, + * making it safe to use an unbounded executor queue even with a large number of shards. + */ + private final ExecutorService sharedShardSubscriptionExecutor; + + /** + * Shared executor service for making subscribeToShard API calls. + * + * <p>This executor is separate from the event processing executor to avoid contention + * between API calls and event processing. 
Using a dedicated executor for subscription calls + * provides several important benefits: + * + * <ol> + * <li>Prevents blocking of the main thread or event processing threads during API calls</li> + * <li>Isolates API call failures from event processing operations</li> + * <li>Allows for controlled concurrency of API calls across multiple shards</li> + * <li>Prevents potential deadlocks that could occur when the same thread handles both + * subscription calls and event processing</li> + * </ol> + * + * <p>The executor uses a smaller number of threads than the event processing executor since + * subscription calls are less frequent and primarily I/O bound. This helps optimize resource + * usage while still providing sufficient parallelism for multiple concurrent subscription calls. + */ + private final ExecutorService sharedSubscriptionCallExecutor; private final Map<String, FanOutKinesisShardSubscription> splitSubscriptions = new HashMap<>(); + /** + * Factory for creating subscriptions. This is primarily used for testing. + */ + @VisibleForTesting + public interface SubscriptionFactory { + FanOutKinesisShardSubscription createSubscription( + AsyncStreamProxy proxy, + String consumerArn, + String shardId, + StartingPosition startingPosition, + Duration timeout, + ExecutorService eventProcessingExecutor, + ExecutorService subscriptionCallExecutor); + } + + /** + * Default implementation of the subscription factory. 
+ */ + private static class DefaultSubscriptionFactory implements SubscriptionFactory { + @Override + public FanOutKinesisShardSubscription createSubscription( + AsyncStreamProxy proxy, + String consumerArn, + String shardId, + StartingPosition startingPosition, + Duration timeout, + ExecutorService eventProcessingExecutor, + ExecutorService subscriptionCallExecutor) { + return new FanOutKinesisShardSubscription( + proxy, + consumerArn, + shardId, + startingPosition, + timeout, + eventProcessingExecutor, + subscriptionCallExecutor); + } + } + + private SubscriptionFactory subscriptionFactory; + public FanOutKinesisShardSplitReader( AsyncStreamProxy asyncStreamProxy, String consumerArn, Map<String, KinesisShardMetrics> shardMetricGroupMap, Configuration configuration) { + this(asyncStreamProxy, consumerArn, shardMetricGroupMap, configuration, new DefaultSubscriptionFactory()); + } + + @VisibleForTesting + FanOutKinesisShardSplitReader( + AsyncStreamProxy asyncStreamProxy, + String consumerArn, + Map<String, KinesisShardMetrics> shardMetricGroupMap, + Configuration configuration, + SubscriptionFactory subscriptionFactory) { + this( + asyncStreamProxy, + consumerArn, + shardMetricGroupMap, + configuration, + subscriptionFactory, + createDefaultEventProcessingExecutor(), + createDefaultSubscriptionCallExecutor()); + } + + /** + * Constructor with injected executor services for testing. 
+ * + * @param asyncStreamProxy The proxy for Kinesis API calls + * @param consumerArn The ARN of the consumer + * @param shardMetricGroupMap The metrics map + * @param configuration The configuration + * @param subscriptionFactory The factory for creating subscriptions + * @param eventProcessingExecutor The executor service to use for event processing tasks + * @param subscriptionCallExecutor The executor service to use for subscription API calls + */ + @VisibleForTesting + FanOutKinesisShardSplitReader( + AsyncStreamProxy asyncStreamProxy, + String consumerArn, + Map<String, KinesisShardMetrics> shardMetricGroupMap, + Configuration configuration, + SubscriptionFactory subscriptionFactory, + ExecutorService eventProcessingExecutor, + ExecutorService subscriptionCallExecutor) { super(shardMetricGroupMap, configuration); this.asyncStreamProxy = asyncStreamProxy; this.consumerArn = consumerArn; this.subscriptionTimeout = configuration.get(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT); + this.deregisterTimeout = configuration.get(EFO_DEREGISTER_CONSUMER_TIMEOUT); + this.subscriptionFactory = subscriptionFactory; + this.sharedShardSubscriptionExecutor = eventProcessingExecutor; + this.sharedSubscriptionCallExecutor = subscriptionCallExecutor; + } + + /** + * Creates the default executor service for event processing tasks. + * + * @return A new executor service + */ + private static ExecutorService createDefaultEventProcessingExecutor() { + int minThreads = Runtime.getRuntime().availableProcessors(); + int maxThreads = minThreads * 2; + return new ThreadPoolExecutor( + minThreads, + maxThreads, + 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), // Unbounded queue with natural flow control + new ExecutorThreadFactory("kinesis-efo-subscription")); + } + + /** + * Creates the default executor service for subscription API calls. 
+ * + * <p>This executor is configured with: + * <ul> + * <li>Minimum threads: 1 - Ensures at least one thread is always available for API calls</li> + * <li>Maximum threads: max(2, availableProcessors/4) - Scales with system resources but + * keeps the thread count relatively low since API calls are I/O bound</li> + * <li>Keep-alive time: 60 seconds - Allows for efficient reuse of threads</li> + * <li>Unbounded queue - Safe because the number of subscription tasks is naturally limited + * by the number of shards</li> + * <li>Custom thread factory - Provides meaningful thread names for debugging</li> + * </ul> + * + * <p>This configuration balances resource efficiency with responsiveness for subscription calls. + * Since subscription calls are primarily waiting on network I/O, a relatively small number of + * threads can efficiently handle many concurrent calls. + * + * @return A new executor service optimized for subscription API calls + */ + private static ExecutorService createDefaultSubscriptionCallExecutor() { + int minThreads = 1; + int maxThreads = Math.max(2, Runtime.getRuntime().availableProcessors() / 4); + return new ThreadPoolExecutor( + minThreads, + maxThreads, + 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), // Unbounded queue with natural flow control + new ExecutorThreadFactory("kinesis-subscription-caller")); Review Comment: Let's make the threads' names consistent with the executor name e.g. 
`subscription-call-executor-service-thread`. ########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java: ########## @@ -19,43 +19,242 @@ package org.apache.flink.connector.kinesis.source.reader.fanout; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics; import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy; import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit; import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState; +import org.apache.flink.connector.kinesis.source.split.StartingPosition; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; +import java.io.IOException; import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT; +import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DEREGISTER_CONSUMER_TIMEOUT; /** * An implementation of the KinesisShardSplitReader that consumes from Kinesis using Enhanced * Fan-Out and HTTP/2. 
*/ @Internal public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase { + + private static final Logger LOG = LoggerFactory.getLogger(FanOutKinesisShardSplitReader.class); private final AsyncStreamProxy asyncStreamProxy; private final String consumerArn; private final Duration subscriptionTimeout; + private final Duration deregisterTimeout; + + /** + * Shared executor service for all shard subscriptions. + * + * <p>This executor uses an unbounded queue ({@link LinkedBlockingQueue}) but this does not pose + * a risk of out-of-memory errors due to the natural flow control mechanisms in the system: + * + * <ol> + * <li>Each {@link FanOutKinesisShardSubscription} has a bounded event queue with capacity of 2</li> + * <li>New records are only requested after processing an event (via {@code requestRecords()})</li> + * <li>When a shard's queue is full, the processing thread blocks at the {@code put()} operation</li> + * <li>The AWS SDK implements the Reactive Streams protocol with built-in backpressure</li> + * </ol> + * + * <p>In the worst-case scenario during backpressure, the maximum number of events in memory is: + * <pre> + * Max Events = (2 * Number_of_Shards) + min(Number_of_Shards, Number_of_Threads) + * </pre> + * + * <p>Where: + * <ul> + * <li>2 * Number_of_Shards: Total capacity of all shard event queues</li> + * <li>min(Number_of_Shards, Number_of_Threads): Maximum events being actively processed</li> + * </ul> + * + * <p>This ensures that memory usage scales linearly with the number of shards, not exponentially, + * making it safe to use an unbounded executor queue even with a large number of shards. + */ + private final ExecutorService sharedShardSubscriptionExecutor; + + /** + * Shared executor service for making subscribeToShard API calls. + * + * <p>This executor is separate from the event processing executor to avoid contention + * between API calls and event processing. 
Using a dedicated executor for subscription calls + * provides several important benefits: + * + * <ol> + * <li>Prevents blocking of the main thread or event processing threads during API calls</li> + * <li>Isolates API call failures from event processing operations</li> + * <li>Allows for controlled concurrency of API calls across multiple shards</li> + * <li>Prevents potential deadlocks that could occur when the same thread handles both + * subscription calls and event processing</li> + * </ol> + * + * <p>The executor uses a smaller number of threads than the event processing executor since + * subscription calls are less frequent and primarily I/O bound. This helps optimize resource + * usage while still providing sufficient parallelism for multiple concurrent subscription calls. + */ + private final ExecutorService sharedSubscriptionCallExecutor; private final Map<String, FanOutKinesisShardSubscription> splitSubscriptions = new HashMap<>(); + /** + * Factory for creating subscriptions. This is primarily used for testing. + */ + @VisibleForTesting + public interface SubscriptionFactory { + FanOutKinesisShardSubscription createSubscription( + AsyncStreamProxy proxy, + String consumerArn, + String shardId, + StartingPosition startingPosition, + Duration timeout, + ExecutorService eventProcessingExecutor, + ExecutorService subscriptionCallExecutor); + } + + /** + * Default implementation of the subscription factory. 
+ */ + private static class DefaultSubscriptionFactory implements SubscriptionFactory { + @Override + public FanOutKinesisShardSubscription createSubscription( + AsyncStreamProxy proxy, + String consumerArn, + String shardId, + StartingPosition startingPosition, + Duration timeout, + ExecutorService eventProcessingExecutor, + ExecutorService subscriptionCallExecutor) { + return new FanOutKinesisShardSubscription( + proxy, + consumerArn, + shardId, + startingPosition, + timeout, + eventProcessingExecutor, + subscriptionCallExecutor); + } + } + + private SubscriptionFactory subscriptionFactory; + public FanOutKinesisShardSplitReader( AsyncStreamProxy asyncStreamProxy, String consumerArn, Map<String, KinesisShardMetrics> shardMetricGroupMap, Configuration configuration) { + this(asyncStreamProxy, consumerArn, shardMetricGroupMap, configuration, new DefaultSubscriptionFactory()); + } + + @VisibleForTesting + FanOutKinesisShardSplitReader( + AsyncStreamProxy asyncStreamProxy, + String consumerArn, + Map<String, KinesisShardMetrics> shardMetricGroupMap, + Configuration configuration, + SubscriptionFactory subscriptionFactory) { + this( + asyncStreamProxy, + consumerArn, + shardMetricGroupMap, + configuration, + subscriptionFactory, + createDefaultEventProcessingExecutor(), + createDefaultSubscriptionCallExecutor()); + } + + /** + * Constructor with injected executor services for testing. 
+ * + * @param asyncStreamProxy The proxy for Kinesis API calls + * @param consumerArn The ARN of the consumer + * @param shardMetricGroupMap The metrics map + * @param configuration The configuration + * @param subscriptionFactory The factory for creating subscriptions + * @param eventProcessingExecutor The executor service to use for event processing tasks + * @param subscriptionCallExecutor The executor service to use for subscription API calls + */ + @VisibleForTesting + FanOutKinesisShardSplitReader( + AsyncStreamProxy asyncStreamProxy, + String consumerArn, + Map<String, KinesisShardMetrics> shardMetricGroupMap, + Configuration configuration, + SubscriptionFactory subscriptionFactory, + ExecutorService eventProcessingExecutor, + ExecutorService subscriptionCallExecutor) { super(shardMetricGroupMap, configuration); this.asyncStreamProxy = asyncStreamProxy; this.consumerArn = consumerArn; this.subscriptionTimeout = configuration.get(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT); + this.deregisterTimeout = configuration.get(EFO_DEREGISTER_CONSUMER_TIMEOUT); + this.subscriptionFactory = subscriptionFactory; + this.sharedShardSubscriptionExecutor = eventProcessingExecutor; + this.sharedSubscriptionCallExecutor = subscriptionCallExecutor; + } + + /** + * Creates the default executor service for event processing tasks. + * + * @return A new executor service + */ + private static ExecutorService createDefaultEventProcessingExecutor() { + int minThreads = Runtime.getRuntime().availableProcessors(); + int maxThreads = minThreads * 2; + return new ThreadPoolExecutor( + minThreads, + maxThreads, + 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), // Unbounded queue with natural flow control + new ExecutorThreadFactory("kinesis-efo-subscription")); Review Comment: Let's make the threads' names consistent with the executor name e.g. 
event-processing-executor-service-thread ########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java: ########## @@ -80,19 +279,167 @@ public void handleSplitsChanges(SplitsChange<KinesisShardSplit> splitsChanges) { super.handleSplitsChanges(splitsChanges); for (KinesisShardSplit split : splitsChanges.splits()) { FanOutKinesisShardSubscription subscription = - new FanOutKinesisShardSubscription( + subscriptionFactory.createSubscription( asyncStreamProxy, consumerArn, split.getShardId(), split.getStartingPosition(), - subscriptionTimeout); + subscriptionTimeout, + sharedShardSubscriptionExecutor, + sharedSubscriptionCallExecutor); subscription.activateSubscription(); splitSubscriptions.put(split.splitId(), subscription); } } + /** + * Closes the split reader and releases all resources. + * + * <p>The close method follows a specific order to ensure proper shutdown: + * 1. First, cancel all active subscriptions to prevent new events from being processed + * 2. Then, shutdown the shared executor service to stop processing existing events + * 3. Finally, close the async stream proxy to release network resources + * + * <p>This ordering is critical because: + * - Cancelling subscriptions first prevents new events from being submitted to the executor + * - Shutting down the executor next ensures all in-flight tasks complete or are cancelled + * - Closing the async stream proxy last ensures all resources are properly released after + * all processing has stopped + */ @Override public void close() throws Exception { - asyncStreamProxy.close(); + cancelActiveSubscriptions(); + shutdownSharedShardSubscriptionExecutor(); + shutdownSharedSubscriptionCallExecutor(); + closeAsyncStreamProxy(); + } + + /** + * Cancels all active subscriptions to prevent new events from being processed. 
+ * + * <p>After cancelling subscriptions, we wait a short time to allow the cancellation + * signals to propagate before proceeding with executor shutdown. + */ + private void cancelActiveSubscriptions() { + for (FanOutKinesisShardSubscription subscription : splitSubscriptions.values()) { + if (subscription.isActive()) { Review Comment: Is this `if` check necessary? `cancelSubscription()` already checks whether the subscription is active. ########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java: ########## @@ -132,27 +217,49 @@ public void activateSubscription() { }) .build(); - // We don't need to keep track of the future here because we monitor subscription success - // using our own CountDownLatch - kinesis.subscribeToShard(consumerArn, shardId, startingPosition, responseHandler) - .exceptionally( - throwable -> { - // If consumer exists and is still activating, we want to countdown. - if (ExceptionUtils.findThrowable( - throwable, ResourceInUseException.class) - .isPresent()) { - waitForSubscriptionLatch.countDown(); - return null; - } - LOG.error( - "Error subscribing to shard {} with starting position {} for consumer {}.", - shardId, - startingPosition, - consumerArn, - throwable); - terminateSubscription(throwable); - return null; - }); + // Use the executor service to make the subscription call + // This offloads the potentially blocking API call to a dedicated thread pool, + // preventing it from blocking the main thread or the event processing threads. + // This separation is crucial to avoid potential deadlocks that could occur when + // the Netty event loop thread (used by the AWS SDK) needs to handle both the + // subscription call and the resulting events.
+ CompletableFuture<Void> subscriptionFuture = CompletableFuture.supplyAsync( + () -> { + try { + LOG.debug("Making subscribeToShard API call for shard {} on thread {}", + shardId, Thread.currentThread().getName()); + + // Make the API call using the provided executor + return kinesis.subscribeToShard(consumerArn, shardId, startingPosition, responseHandler); + } catch (Exception e) { + // Handle any exceptions that occur during the API call + LOG.error("Exception during subscribeToShard API call for shard {}", shardId, e); + terminateSubscription(e); + waitForSubscriptionLatch.countDown(); + return CompletableFuture.<Void>completedFuture(null); + } + }, + subscriptionCallExecutor + ).thenCompose(future -> future); // Flatten the CompletableFuture<CompletableFuture<Void>> to CompletableFuture<Void> + + subscriptionFuture.exceptionally( Review Comment: Should we retain the previous comment? ``` // We don't need to keep track of the future here because we monitor subscription success // using our own CountDownLatch ``` ########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java: ########## @@ -132,27 +217,49 @@ public void activateSubscription() { }) .build(); - // We don't need to keep track of the future here because we monitor subscription success - // using our own CountDownLatch - kinesis.subscribeToShard(consumerArn, shardId, startingPosition, responseHandler) - .exceptionally( - throwable -> { - // If consumer exists and is still activating, we want to countdown. 
- if (ExceptionUtils.findThrowable( - throwable, ResourceInUseException.class) - .isPresent()) { - waitForSubscriptionLatch.countDown(); - return null; - } - LOG.error( - "Error subscribing to shard {} with starting position {} for consumer {}.", - shardId, - startingPosition, - consumerArn, - throwable); - terminateSubscription(throwable); - return null; - }); + // Use the executor service to make the subscription call + // This offloads the potentially blocking API call to a dedicated thread pool, + // preventing it from blocking the main thread or the event processing threads. + // This separation is crucial to avoid potential deadlocks that could occur when + // the Netty event loop thread (used by the AWS SDK) needs to handle both the + // subscription call and the resulting events. + CompletableFuture<Void> subscriptionFuture = CompletableFuture.supplyAsync( + () -> { + try { + LOG.debug("Making subscribeToShard API call for shard {} on thread {}", + shardId, Thread.currentThread().getName()); Review Comment: Is logging the thread name here necessary? The thread information should normally already be logged as a separate field. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org