leekeiabstraction commented on code in PR #208:
URL: https://github.com/apache/flink-connector-aws/pull/208#discussion_r2149589839


##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java:
##########
@@ -19,43 +19,242 @@
 package org.apache.flink.connector.kinesis.source.reader.fanout;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.kinesis.source.metrics.KinesisShardMetrics;
 import org.apache.flink.connector.kinesis.source.proxy.AsyncStreamProxy;
 import org.apache.flink.connector.kinesis.source.reader.KinesisShardSplitReaderBase;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
 import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_CONSUMER_SUBSCRIPTION_TIMEOUT;
+import static org.apache.flink.connector.kinesis.source.config.KinesisSourceConfigOptions.EFO_DEREGISTER_CONSUMER_TIMEOUT;
 
 /**
  * An implementation of the KinesisShardSplitReader that consumes from Kinesis using Enhanced
  * Fan-Out and HTTP/2.
  */
 @Internal
 public class FanOutKinesisShardSplitReader extends KinesisShardSplitReaderBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FanOutKinesisShardSplitReader.class);
     private final AsyncStreamProxy asyncStreamProxy;
     private final String consumerArn;
     private final Duration subscriptionTimeout;
+    private final Duration deregisterTimeout;
+
+    /**
+     * Shared executor service for all shard subscriptions.
+     *
+     * <p>This executor uses an unbounded queue ({@link LinkedBlockingQueue}) but this does not pose
+     * a risk of out-of-memory errors due to the natural flow control mechanisms in the system:
+     *
+     * <ol>
+     *   <li>Each {@link FanOutKinesisShardSubscription} has a bounded event queue with capacity of 2</li>
+     *   <li>New records are only requested after processing an event (via {@code requestRecords()})</li>
+     *   <li>When a shard's queue is full, the processing thread blocks at the {@code put()} operation</li>
+     *   <li>The AWS SDK implements the Reactive Streams protocol with built-in backpressure</li>
+     * </ol>
+     *
+     * <p>In the worst-case scenario during backpressure, the maximum number of events in memory is:
+     * <pre>
+     * Max Events = (2 * Number_of_Shards) + min(Number_of_Shards, Number_of_Threads)
+     * </pre>
+     *
+     * <p>Where:
+     * <ul>
+     *   <li>2 * Number_of_Shards: Total capacity of all shard event queues</li>
+     *   <li>min(Number_of_Shards, Number_of_Threads): Maximum events being actively processed</li>
+     * </ul>
+     *
+     * <p>This ensures that memory usage scales linearly with the number of shards, not exponentially,
+     * making it safe to use an unbounded executor queue even with a large number of shards.
+     */
+    private final ExecutorService sharedShardSubscriptionExecutor;
+
+    /**
+     * Shared executor service for making subscribeToShard API calls.
+     *
+     * <p>This executor is separate from the event processing executor to avoid contention
+     * between API calls and event processing. Using a dedicated executor for subscription calls
+     * provides several important benefits:
+     *
+     * <ol>
+     *   <li>Prevents blocking of the main thread or event processing threads during API calls</li>
+     *   <li>Isolates API call failures from event processing operations</li>
+     *   <li>Allows for controlled concurrency of API calls across multiple shards</li>
+     *   <li>Prevents potential deadlocks that could occur when the same thread handles both
+     *       subscription calls and event processing</li>
+     * </ol>
+     *
+     * <p>The executor uses a smaller number of threads than the event processing executor since
+     * subscription calls are less frequent and primarily I/O bound. This helps optimize resource
+     * usage while still providing sufficient parallelism for multiple concurrent subscription calls.
+     */
+    private final ExecutorService sharedSubscriptionCallExecutor;
 
     private final Map<String, FanOutKinesisShardSubscription> splitSubscriptions = new HashMap<>();
 
+    /**
+     * Factory for creating subscriptions. This is primarily used for testing.
+     */
+    @VisibleForTesting
+    public interface SubscriptionFactory {
+        FanOutKinesisShardSubscription createSubscription(
+                AsyncStreamProxy proxy,
+                String consumerArn,
+                String shardId,
+                StartingPosition startingPosition,
+                Duration timeout,
+                ExecutorService eventProcessingExecutor,
+                ExecutorService subscriptionCallExecutor);
+    }
+
+    /**
+     * Default implementation of the subscription factory.
+     */
+    private static class DefaultSubscriptionFactory implements SubscriptionFactory {
+        @Override
+        public FanOutKinesisShardSubscription createSubscription(
+                AsyncStreamProxy proxy,
+                String consumerArn,
+                String shardId,
+                StartingPosition startingPosition,
+                Duration timeout,
+                ExecutorService eventProcessingExecutor,
+                ExecutorService subscriptionCallExecutor) {
+            return new FanOutKinesisShardSubscription(
+                    proxy,
+                    consumerArn,
+                    shardId,
+                    startingPosition,
+                    timeout,
+                    eventProcessingExecutor,
+                    subscriptionCallExecutor);
+        }
+    }
+
+    private SubscriptionFactory subscriptionFactory;
+
     public FanOutKinesisShardSplitReader(
             AsyncStreamProxy asyncStreamProxy,
             String consumerArn,
             Map<String, KinesisShardMetrics> shardMetricGroupMap,
             Configuration configuration) {
+        this(asyncStreamProxy, consumerArn, shardMetricGroupMap, configuration, new DefaultSubscriptionFactory());
+    }
+
+    @VisibleForTesting
+    FanOutKinesisShardSplitReader(
+            AsyncStreamProxy asyncStreamProxy,
+            String consumerArn,
+            Map<String, KinesisShardMetrics> shardMetricGroupMap,
+            Configuration configuration,
+            SubscriptionFactory subscriptionFactory) {
+        this(
+            asyncStreamProxy,
+            consumerArn,
+            shardMetricGroupMap,
+            configuration,
+            subscriptionFactory,
+            createDefaultEventProcessingExecutor(),
+            createDefaultSubscriptionCallExecutor());
+    }
+
+    /**
+     * Constructor with injected executor services for testing.
+     *
+     * @param asyncStreamProxy The proxy for Kinesis API calls
+     * @param consumerArn The ARN of the consumer
+     * @param shardMetricGroupMap The metrics map
+     * @param configuration The configuration
+     * @param subscriptionFactory The factory for creating subscriptions
+     * @param eventProcessingExecutor The executor service to use for event processing tasks
+     * @param subscriptionCallExecutor The executor service to use for subscription API calls
+     */
+    @VisibleForTesting
+    FanOutKinesisShardSplitReader(
+            AsyncStreamProxy asyncStreamProxy,
+            String consumerArn,
+            Map<String, KinesisShardMetrics> shardMetricGroupMap,
+            Configuration configuration,
+            SubscriptionFactory subscriptionFactory,
+            ExecutorService eventProcessingExecutor,
+            ExecutorService subscriptionCallExecutor) {
         super(shardMetricGroupMap, configuration);
         this.asyncStreamProxy = asyncStreamProxy;
         this.consumerArn = consumerArn;
         this.subscriptionTimeout = configuration.get(EFO_CONSUMER_SUBSCRIPTION_TIMEOUT);
+        this.deregisterTimeout = configuration.get(EFO_DEREGISTER_CONSUMER_TIMEOUT);
+        this.subscriptionFactory = subscriptionFactory;
+        this.sharedShardSubscriptionExecutor = eventProcessingExecutor;
+        this.sharedSubscriptionCallExecutor = subscriptionCallExecutor;

Review Comment:
   I have been thinking about this approach, where `FanOutKinesisShardSplitReader` 
instantiates the ExecutorServices and passes them to 
`FanOutKinesisShardSubscription`. I have the following questions:
   
   1. Would a backpressured shard drown out other shards? Specifically, if a 
shard has a large backlog and is severely backpressured by downstream, we would 
expect the `subscriptionEventProcessingExecutor`'s input queue to hold a large 
number of callables for the backpressured shard. The quieter shards' callables 
are placed in the same queue, so they would be backpressured as well. This is 
not the case in the current design, where a shard is never backpressured by its 
neighbours' downstream. It may cause something akin to data loss if an idle 
watermarking strategy drops the now late-arriving records from the 
non-backpressured, quieter shards.
   
   2. Are we breaking abstraction here by defining and passing around executor 
services? 
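   
   To make the first question concrete, here is a minimal, self-contained 
sketch of the starvation scenario. It does not use the connector's classes: the 
class and method names are hypothetical, the "backpressured" handler is 
simulated with a `CountDownLatch`, and the shared pool is shrunk to one thread 
so the effect is deterministic (with a larger pool the same thing would need as 
many blocked shards as there are threads).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch, not connector code: shared vs. dedicated executors under backpressure. */
public class SharedExecutorStarvationSketch {

    /** Returns true if the quiet shard's task finishes while the busy shard is still blocked. */
    static boolean quietShardProgresses(ExecutorService busyExecutor, ExecutorService quietExecutor)
            throws Exception {
        CountDownLatch downstreamUnblocked = new CountDownLatch(1);
        try {
            // Busy shard: the handler blocks, standing in for put() on a full event queue.
            busyExecutor.submit(() -> {
                downstreamUnblocked.await();
                return null;
            });
            // Quiet shard: a trivial handler that should complete immediately if it gets a thread.
            Future<String> quiet = quietExecutor.submit(() -> "quiet-shard-event");
            try {
                quiet.get(500, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false;
            }
        } finally {
            // Release the simulated backpressure so the executors can shut down cleanly.
            downstreamUnblocked.countDown();
        }
    }

    /** Both shards submit to the same single-threaded pool. */
    static boolean sharedCase() throws Exception {
        ExecutorService shared = Executors.newFixedThreadPool(1);
        try {
            return quietShardProgresses(shared, shared);
        } finally {
            shared.shutdownNow();
        }
    }

    /** Each shard has its own single-threaded executor. */
    static boolean dedicatedCase() throws Exception {
        ExecutorService busy = Executors.newSingleThreadExecutor();
        ExecutorService quiet = Executors.newSingleThreadExecutor();
        try {
            return quietShardProgresses(busy, quiet);
        } finally {
            busy.shutdownNow();
            quiet.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("shared executor, quiet shard progresses: " + sharedCase());
        System.out.println("dedicated executors, quiet shard progresses: " + dedicatedCase());
    }
}
```

   In the shared case the quiet shard's task sits in the queue behind the 
blocked handler until the latch is released; with dedicated executors it 
completes straight away.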
   
   ------
   
   Instead of letting the SplitReader instantiate the ExecutorServices, an 
alternative approach we could consider is:
   1. FanOutKinesisShardSubscription instantiates its own executor service, 
e.g. `Executors.newSingleThreadExecutor()`.
   2. FanOutKinesisShardSubscription manages the lifecycle of that executor 
service.
   3. The executor service is used both for handling subscription events and 
for re-subscribing.
   
   This ensures that a single backpressured shard does not impact the 
throughput of other, non-backpressured shards, and that the use of 
ExecutorService is fully encapsulated within FanOutKinesisShardSubscription. 
It does change the scaling: the thread count would scale as 1x the number of 
shards instead of 1x to 2x the number of processors. However, I do not see a 
big drawback here, as we usually match, or recommend matching, the number of 
cores (KPUs) to the number of shards.
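   
   A rough sketch of what the alternative could look like, assuming one 
single-threaded executor per subscription. `PerShardExecutorSketch` and its 
methods are hypothetical stand-ins, not the real 
`FanOutKinesisShardSubscription` API; the event and re-subscribe handlers only 
record what ran, to show that both go through the same executor in submission 
order and that the subscription shuts its own executor down.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a subscription that owns and manages its own executor. */
public class PerShardExecutorSketch implements AutoCloseable {

    private final String shardId;
    // Created and shut down by this object only; never shared with other shards.
    private final ExecutorService executor;
    // Records handled work items so the ordering can be inspected.
    private final List<String> handled = new CopyOnWriteArrayList<>();

    PerShardExecutorSketch(String shardId) {
        this.shardId = shardId;
        this.executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "shard-subscription-" + shardId);
            thread.setDaemon(true);
            return thread;
        });
    }

    /** Stand-in for handling a subscription event. */
    void onEvent(String event) {
        executor.execute(() -> handled.add(shardId + ":" + event));
    }

    /** Stand-in for re-subscribing; runs on the same executor as event handling. */
    void resubscribe() {
        executor.execute(() -> handled.add(shardId + ":resubscribe"));
    }

    List<String> handled() {
        return handled;
    }

    /** Lets queued work finish, then forces shutdown if it takes longer than five seconds. */
    @Override
    public void close() throws InterruptedException {
        executor.shutdown();
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        try (PerShardExecutorSketch subscription = new PerShardExecutorSketch("shardId-0")) {
            subscription.onEvent("e1");
            subscription.resubscribe();
            subscription.onEvent("e2");
        }
    }
}
```

   The SplitReader would then only need to create and close subscriptions, 
without knowing that executors exist.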



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

