leekeiabstraction commented on code in PR #208:
URL: https://github.com/apache/flink-connector-aws/pull/208#discussion_r2149589839
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java:
##########
@@ -19,43 +19,242 @@
 package org.apache.flink.connector.kinesis.source.reader.fanout;

 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
 import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;
 import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;

+import java.io.IOException;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;

 import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT;
+import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DEREGISTER_CONSUMER_TIMEOUT;

 /**
  * An implementation of the KinesisShardSplitReader that consumes from Kinesis using Enhanced
  * Fan-Out and HTTP/2.
  */
 @Internal
 public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FanOutKinesisShardSplitReader.class);
     private final AsyncStreamProxy asyncStreamProxy;
     private final String consumerArn;
     private final Duration subscriptionTimeout;
+    private final Duration deregisterTimeout;
+
+    /**
+     * Shared executor service for all shard subscriptions.
+     *
+     * <p>This executor uses an unbounded queue ({@link LinkedBlockingQueue}) but this does not pose
+     * a risk of out-of-memory errors due to the natural flow control mechanisms in the system:
+     *
+     * <ol>
+     *   <li>Each {@link FanOutKinesisShardSubscription} has a bounded event queue with capacity of 2</li>
+     *   <li>New records are only requested after processing an event (via {@code requestRecords()})</li>
+     *   <li>When a shard's queue is full, the processing thread blocks at the {@code put()} operation</li>
+     *   <li>The AWS SDK implements the Reactive Streams protocol with built-in backpressure</li>
+     * </ol>
+     *
+     * <p>In the worst-case scenario during backpressure, the maximum number of events in memory is:
+     * <pre>
+     * Max Events = (2 * Number_of_Shards) + min(Number_of_Shards, Number_of_Threads)
+     * </pre>
+     *
+     * <p>Where:
+     * <ul>
+     *   <li>2 * Number_of_Shards: Total capacity of all shard event queues</li>
+     *   <li>min(Number_of_Shards, Number_of_Threads): Maximum events being actively processed</li>
+     * </ul>
+     *
+     * <p>This ensures that memory usage scales linearly with the number of shards, not exponentially,
+     * making it safe to use an unbounded executor queue even with a large number of shards.
+     */
+    private final ExecutorService sharedShardSubscriptionExecutor;
+
+    /**
+     * Shared executor service for making subscribeToShard API calls.
+     *
+     * <p>This executor is separate from the event processing executor to avoid contention
+     * between API calls and event processing. Using a dedicated executor for subscription calls
+     * provides several important benefits:
+     *
+     * <ol>
+     *   <li>Prevents blocking of the main thread or event processing threads during API calls</li>
+     *   <li>Isolates API call failures from event processing operations</li>
+     *   <li>Allows for controlled concurrency of API calls across multiple shards</li>
+     *   <li>Prevents potential deadlocks that could occur when the same thread handles both
+     *       subscription calls and event processing</li>
+     * </ol>
+     *
+     * <p>The executor uses a smaller number of threads than the event processing executor since
+     * subscription calls are less frequent and primarily I/O bound. This helps optimize resource
+     * usage while still providing sufficient parallelism for multiple concurrent subscription calls.
+     */
+    private final ExecutorService sharedSubscriptionCallExecutor;

     private final Map<String, FanOutKinesisShardSubscription> splitSubscriptions = new HashMap<>();

+    /**
+     * Factory for creating subscriptions. This is primarily used for testing.
+     */
+    @VisibleForTesting
+    public interface SubscriptionFactory {
+        FanOutKinesisShardSubscription createSubscription(
+                AsyncStreamProxy proxy,
+                String consumerArn,
+                String shardId,
+                StartingPosition startingPosition,
+                Duration timeout,
+                ExecutorService eventProcessingExecutor,
+                ExecutorService subscriptionCallExecutor);
+    }
+
+    /**
+     * Default implementation of the subscription factory.
+     */
+    private static class DefaultSubscriptionFactory implements SubscriptionFactory {
+        @Override
+        public FanOutKinesisShardSubscription createSubscription(
+                AsyncStreamProxy proxy,
+                String consumerArn,
+                String shardId,
+                StartingPosition startingPosition,
+                Duration timeout,
+                ExecutorService eventProcessingExecutor,
+                ExecutorService subscriptionCallExecutor) {
+            return new FanOutKinesisShardSubscription(
+                    proxy,
+                    consumerArn,
+                    shardId,
+                    startingPosition,
+                    timeout,
+                    eventProcessingExecutor,
+                    subscriptionCallExecutor);
+        }
+    }
+
+    private SubscriptionFactory subscriptionFactory;
+
     public FanOutKinesisShardSplitReader(
             AsyncStreamProxy asyncStreamProxy,
             String consumerArn,
             Map<String, KinesisShardMetrics> shardMetricGroupMap,
             Configuration configuration) {
+        this(asyncStreamProxy, consumerArn, shardMetricGroupMap, configuration, new DefaultSubscriptionFactory());
+    }
+
+    @VisibleForTesting
+    FanOutKinesisShardSplitReader(
+            AsyncStreamProxy asyncStreamProxy,
+            String consumerArn,
+            Map<String, KinesisShardMetrics> shardMetricGroupMap,
+            Configuration configuration,
+            SubscriptionFactory subscriptionFactory) {
+        this(
+                asyncStreamProxy,
+                consumerArn,
+                shardMetricGroupMap,
+                configuration,
+                subscriptionFactory,
+                createDefaultEventProcessingExecutor(),
+                createDefaultSubscriptionCallExecutor());
+    }
+
+    /**
+     * Constructor with injected executor services for testing.
+     *
+     * @param asyncStreamProxy The proxy for Kinesis API calls
+     * @param consumerArn The ARN of the consumer
+     * @param shardMetricGroupMap The metrics map
+     * @param configuration The configuration
+     * @param subscriptionFactory The factory for creating subscriptions
+     * @param eventProcessingExecutor The executor service to use for event processing tasks
+     * @param subscriptionCallExecutor The executor service to use for subscription API calls
+     */
+    @VisibleForTesting
+    FanOutKinesisShardSplitReader(
+            AsyncStreamProxy asyncStreamProxy,
+            String consumerArn,
+            Map<String, KinesisShardMetrics> shardMetricGroupMap,
+            Configuration configuration,
+            SubscriptionFactory subscriptionFactory,
+            ExecutorService eventProcessingExecutor,
+            ExecutorService subscriptionCallExecutor) {
         super(shardMetricGroupMap, configuration);
         this.asyncStreamProxy = asyncStreamProxy;
         this.consumerArn = consumerArn;
         this.subscriptionTimeout = configuration.get(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT);
+        this.deregisterTimeout = configuration.get(EFO_DEREGISTER_CONSUMER_TIMEOUT);
+        this.subscriptionFactory = subscriptionFactory;
+        this.sharedShardSubscriptionExecutor = eventProcessingExecutor;
+        this.sharedSubscriptionCallExecutor = subscriptionCallExecutor;

Review Comment:
Been thinking about this approach where `FanOutKinesisShardSplitReader` instantiates the ExecutorServices and passes them to `FanOutKinesisShardSubscription`. I have the following questions:

1. Would a backpressured shard drown out the other shards? Specifically, if a shard has a large backlog and is severely backpressured by downstream, we would expect the `subscriptionEventProcessingExecutor`'s input queue to hold a large number of callables queued for that shard. The quieter shards' callables would then be backpressured as well, since they are placed in the same queue (see the sketch below this list). This was not the case in the current design, where a shard is never backpressured by its neighbours' downstream. It may cause something akin to data loss in the case where an idle watermarking strategy drops the now-late-arriving records from the non-backpressured, quieter shards.
2. Are we breaking abstraction here by defining and passing around executor services?
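To make question 1 concrete, here is a minimal standalone sketch (illustrative only, not connector code; the class name, shard roles, and timings are all made up) of how a single shared FIFO queue lets a backpressured shard's callables delay a quiet shard's callable:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SharedQueueHeadOfLineBlocking {
    public static void main(String[] args) {
        // One worker stands in for a saturated shared pool with a single FIFO input queue.
        ExecutorService shared = Executors.newSingleThreadExecutor();

        // The backpressured shard enqueues many slow event-processing callables first.
        for (int i = 0; i < 10; i++) {
            final int event = i;
            shared.submit(() -> {
                try {
                    Thread.sleep(100); // stands in for a blocking, downstream-backpressured put()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("busy-shard event " + event);
            });
        }

        // The quiet shard's cheap callable now waits behind ~1s of its neighbour's work,
        // even though it would have completed immediately on a dedicated executor.
        shared.submit(() -> System.out.println("quiet-shard event (delayed by neighbour)"));

        shared.shutdown();
    }
}
```

With a dedicated executor per shard, the quiet shard's callable would run immediately regardless of its neighbour's backlog.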
------

I think instead of letting the SplitReader instantiate the ExecutorService, an alternative approach we can consider is:

1. `FanOutKinesisShardSubscription` instantiates the executor service, e.g. `Executors.newSingleThreadExecutor()`.
2. `FanOutKinesisShardSubscription` manages the lifecycle of the executor service.
3. The executor service is used both for handling subscription events and for re-subscribing.

This will ensure that a single backpressured shard does not impact the throughput of other, non-backpressured shards, and that the use of ExecutorService is fully encapsulated within `FanOutKinesisShardSubscription`. There is a change in the scaling here, as we'd scale threads with 1x the number of shards instead of 1 to 2x the number of processors, but I do not see a big drawback, since we usually match or recommend the number of cores (KPUs) to the number of shards.
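For illustration, a minimal sketch of the shape I mean, under the assumptions above (`PerShardSubscriptionSketch` and its method names are hypothetical, not the actual `FanOutKinesisShardSubscription` API):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Sketch of the alternative: each subscription owns a single-threaded executor,
 * so a neighbour's backlog can never queue ahead of this shard's work.
 */
class PerShardSubscriptionSketch implements AutoCloseable {
    private final ExecutorService executor;

    PerShardSubscriptionSketch(String shardId) {
        // 1. The subscription instantiates its own executor.
        this.executor = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "shard-subscription-" + shardId));
    }

    // 3a. The same executor handles this shard's subscription events...
    void onSubscriptionEvent(Runnable processEvent) {
        executor.submit(processEvent);
    }

    // 3b. ...and also drives re-subscription when the subscription lapses.
    void resubscribe(Runnable subscribeToShardCall) {
        executor.submit(subscribeToShardCall);
    }

    // 2. The subscription manages the executor's lifecycle end to end.
    @Override
    public void close() throws InterruptedException {
        executor.shutdown();
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
    }
}
```

Shutdown ordering would need care (e.g. draining in-flight events before forcing `shutdownNow()`), but the key point is that both the threading and the lifecycle stay inside the subscription.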