aws-nageshvh commented on code in PR #208:
URL: https://github.com/apache/flink-connector-aws/pull/208#discussion_r2098924842
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java:
##########
@@ -45,17 +52,128 @@ public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase {
     private final String consumerArn;
     private final Duration subscriptionTimeout;

+    /**
+     * Shared executor service for all shard subscriptions.
+     *
+     * <p>This executor uses an unbounded queue ({@link LinkedBlockingQueue}) to ensure no tasks are ever rejected.
+     * Although the queue is technically unbounded, the system has natural flow control mechanisms that effectively
+     * bound the queue size:
+     *
+     * <ol>
+     *   <li>Each {@link FanOutKinesisShardSubscription} has a bounded event queue with capacity of 2</li>
+     *   <li>New records are only requested after processing an event (via {@code requestRecords()})</li>
+     *   <li>The maximum number of queued tasks is effectively bounded by {@code 2 * number_of_shards}</li>
+     * </ol>
+     *
+     * <p>This design provides natural backpressure while ensuring no records are dropped, making it safe
+     * to use an unbounded executor queue.
+     */
+    private final ExecutorService sharedShardSubscriptionExecutor;
+
     private final Map<String, FanOutKinesisShardSubscription> splitSubscriptions = new HashMap<>();

+    /**
+     * Factory for creating subscriptions. This is primarily used for testing.
+     */
+    @VisibleForTesting
+    public interface SubscriptionFactory {
+        FanOutKinesisShardSubscription createSubscription(
+                AsyncStreamProxy proxy,
+                String consumerArn,
+                String shardId,
+                StartingPosition startingPosition,
+                Duration timeout,
+                ExecutorService executor);
+    }
+
+    /**
+     * Default implementation of the subscription factory.
+     */
+    private static class DefaultSubscriptionFactory implements SubscriptionFactory {
+        @Override
+        public FanOutKinesisShardSubscription createSubscription(
+                AsyncStreamProxy proxy,
+                String consumerArn,
+                String shardId,
+                StartingPosition startingPosition,
+                Duration timeout,
+                ExecutorService executor) {
+            return new FanOutKinesisShardSubscription(
+                    proxy,
+                    consumerArn,
+                    shardId,
+                    startingPosition,
+                    timeout,
+                    executor);
+        }
+    }
+
+    private SubscriptionFactory subscriptionFactory;
+
     public FanOutKinesisShardSplitReader(
             AsyncStreamProxy asyncStreamProxy,
             String consumerArn,
             Map<String, KinesisShardMetrics> shardMetricGroupMap,
             Configuration configuration) {
+        this(asyncStreamProxy, consumerArn, shardMetricGroupMap, configuration, new DefaultSubscriptionFactory());
+    }
+
+    @VisibleForTesting
+    FanOutKinesisShardSplitReader(
+            AsyncStreamProxy asyncStreamProxy,
+            String consumerArn,
+            Map<String, KinesisShardMetrics> shardMetricGroupMap,
+            Configuration configuration,
+            SubscriptionFactory subscriptionFactory) {
+        this(
+                asyncStreamProxy,
+                consumerArn,
+                shardMetricGroupMap,
+                configuration,
+                subscriptionFactory,
+                createDefaultExecutor());
+    }
+
+    /**
+     * Constructor with injected executor service for testing.
+     *
+     * @param asyncStreamProxy The proxy for Kinesis API calls
+     * @param consumerArn The ARN of the consumer
+     * @param shardMetricGroupMap The metrics map
+     * @param configuration The configuration
+     * @param subscriptionFactory The factory for creating subscriptions
+     * @param executorService The executor service to use for subscription tasks
+     */
+    @VisibleForTesting
+    FanOutKinesisShardSplitReader(
+            AsyncStreamProxy asyncStreamProxy,
+            String consumerArn,
+            Map<String, KinesisShardMetrics> shardMetricGroupMap,
+            Configuration configuration,
+            SubscriptionFactory subscriptionFactory,
+            ExecutorService executorService) {
         super(shardMetricGroupMap, configuration);
         this.asyncStreamProxy = asyncStreamProxy;
         this.consumerArn = consumerArn;
         this.subscriptionTimeout = configuration.get(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT);
+        this.subscriptionFactory = subscriptionFactory;
+        this.sharedShardSubscriptionExecutor = executorService;
+    }
+
+    /**
+     * Creates the default executor service for subscription tasks.
+     *
+     * @return A new executor service
+     */
+    private static ExecutorService createDefaultExecutor() {
+        int minThreads = Runtime.getRuntime().availableProcessors();
+        int maxThreads = minThreads * 2;
+        return new ThreadPoolExecutor(
+                minThreads,
+                maxThreads,
+                60L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(), // Unbounded queue with natural flow control

Review Comment:
Yes, this is covered in the documentation too, and I refined it further in the next iteration by adding more detail. Despite using an unbounded queue in the executor, we are highly unlikely to run into OOM problems because:

1. The number of threads is limited (2x the number of cores)
2. Each shard has a bounded queue with capacity 2
3. We only request more records after successfully processing and storing an event
4. If a shard's queue is full, processing blocks, creating back-pressure
5. This effectively bounds the total number of events in memory to `2 * number_of_shards`

In the worst-case scenario during backpressure, the maximum number of events in memory is:

Max Events = (2 * Number_of_Shards) + min(Number_of_Shards, Number_of_Threads)

I have been running a few tests to exercise these scenarios:

a) 10 shards on one JVM with 4 GB of memory, reading from TRIM with 10 days of backfill. The app catches up without a job restart/OOM.
b) 1 shard on one JVM with 4 GB of memory, reading from TRIM with 10 days of backfill. The app catches up without a job restart/OOM.

Both apps write to a 1-shard destination stream, which keeps them constantly under backpressure until they catch up to the tip.
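To make the flow-control argument above concrete, here is a minimal, self-contained sketch of the pattern: a shared executor with an unbounded work queue, a per-shard event queue bounded at 2, and new records requested only after an event has been processed. This is not the connector code; the class and members (ShardFlowControlDemo, eventQueue, and the simplified requestRecords()/poll() methods) are illustrative stand-ins for the mechanism described in the javadoc.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch only; not the actual FanOutKinesisShardSubscription implementation. */
public class ShardFlowControlDemo {

    /** Per-shard event queue bounded at 2, mirroring the subscription's bounded queue. */
    private final BlockingQueue<String> eventQueue = new ArrayBlockingQueue<>(2);

    private final ExecutorService sharedExecutor;
    private int remainingEvents;

    ShardFlowControlDemo(ExecutorService sharedExecutor, int totalEvents) {
        this.sharedExecutor = sharedExecutor;
        this.remainingEvents = totalEvents;
    }

    /** Submits at most one "fetch" task per processed event; put() blocks when the shard queue is full. */
    void requestRecords() {
        if (remainingEvents <= 0) {
            return;
        }
        final int eventId = remainingEvents--;
        sharedExecutor.submit(() -> {
            try {
                // Blocking put() is the back-pressure point: a slow consumer stalls
                // the producer instead of letting memory grow.
                eventQueue.put("event-" + eventId);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    /** Consumes one event and only then asks for the next one. */
    String poll() throws InterruptedException {
        String event = eventQueue.take();
        requestRecords(); // new records are requested only after an event was processed
        return event;
    }

    public static void main(String[] args) throws Exception {
        int cores = Runtime.getRuntime().availableProcessors();
        // Shared executor with an unbounded work queue, shaped like the one in the diff above.
        ExecutorService shared =
                new ThreadPoolExecutor(
                        cores, cores * 2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

        ShardFlowControlDemo shard = new ShardFlowControlDemo(shared, 5);
        shard.requestRecords(); // initial subscription-style request
        for (int i = 0; i < 5; i++) {
            System.out.println("processed " + shard.poll());
        }
        shared.shutdown();
    }
}

With this shape, the only unbounded structure is the executor's work queue, and in practice its size is capped by the number of outstanding requestRecords() calls, which the bounded per-shard queue keeps at roughly 2 per shard.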