aws-nageshvh commented on code in PR #208:
URL: 
https://github.com/apache/flink-connector-aws/pull/208#discussion_r2098916975


##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -334,4 +368,87 @@ public void onComplete() {
             activateSubscription();
         }
     }
+
+    /**
+     * Submits an event processing task to the executor service.
+     * This method encapsulates the task submission logic and error handling.
+     *
+     * @param event The subscription event to process
+     */
+    private void submitEventProcessingTask(SubscribeToShardEvent event) {
+        try {
+            subscriptionEventProcessingExecutor.execute(() -> {
+                synchronized (subscriptionEventProcessingLock) {
+                    try {
+                        processSubscriptionEvent(event);
+                    } catch (Exception e) {
+                        // For critical path operations, propagate exceptions 
to cause a Flink job restart
+                        LOG.error("Error processing subscription event", e);
+                        // Propagate the exception to the subscription 
exception handler
+                        terminateSubscription(new 
KinesisStreamsSourceException(
+                            "Error processing subscription event", e));
+                    }
+                }
+            });
+        } catch (Exception e) {
+            // This should never happen with an unbounded queue, but if it 
does,
+            // we need to propagate the exception to cause a Flink job restart
+            LOG.error("Error submitting subscription event task", e);
+            throw new KinesisStreamsSourceException(
+                "Error submitting subscription event task", e);
+        }
+    }
+
+    /**
+     * Processes a subscription event in a separate thread from the shared 
executor pool.
+     * This method encapsulates the critical path operations:
+     * 1. Putting the event in the blocking queue (which has a capacity of 2)
+     * 2. Updating the starting position for recovery after failover
+     * 3. Requesting more records
+     *
+     * <p>These operations are executed sequentially for each shard to ensure 
thread safety
+     * and prevent race conditions. The bounded nature of the event queue 
(capacity 2) combined
+     * with only requesting more records after processing an event provides 
natural flow control,
+     * effectively limiting the number of tasks in the executor's queue.
+     *
+     * <p>This method is made public for testing purposes.
+     *
+     * @param event The subscription event to process
+     */
+    public void processSubscriptionEvent(SubscribeToShardEvent event) {

Review Comment:
   Not needed, removed it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to