aws-nageshvh commented on code in PR #208: URL: https://github.com/apache/flink-connector-aws/pull/208#discussion_r2098916975
########## flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java: ########## @@ -334,4 +368,87 @@ public void onComplete() { activateSubscription(); } } + + /** + * Submits an event processing task to the executor service. + * This method encapsulates the task submission logic and error handling. + * + * @param event The subscription event to process + */ + private void submitEventProcessingTask(SubscribeToShardEvent event) { + try { + subscriptionEventProcessingExecutor.execute(() -> { + synchronized (subscriptionEventProcessingLock) { + try { + processSubscriptionEvent(event); + } catch (Exception e) { + // For critical path operations, propagate exceptions to cause a Flink job restart + LOG.error("Error processing subscription event", e); + // Propagate the exception to the subscription exception handler + terminateSubscription(new KinesisStreamsSourceException( + "Error processing subscription event", e)); + } + } + }); + } catch (Exception e) { + // This should never happen with an unbounded queue, but if it does, + // we need to propagate the exception to cause a Flink job restart + LOG.error("Error submitting subscription event task", e); + throw new KinesisStreamsSourceException( + "Error submitting subscription event task", e); + } + } + + /** + * Processes a subscription event in a separate thread from the shared executor pool. + * This method encapsulates the critical path operations: + * 1. Putting the event in the blocking queue (which has a capacity of 2) + * 2. Updating the starting position for recovery after failover + * 3. Requesting more records + * + * <p>These operations are executed sequentially for each shard to ensure thread safety + * and prevent race conditions. The bounded nature of the event queue (capacity 2) combined + * with only requesting more records after processing an event provides natural flow control, + * effectively limiting the number of tasks in the executor's queue. + * + * <p>This method is made public for testing purposes. + * + * @param event The subscription event to process + */ + public void processSubscriptionEvent(SubscribeToShardEvent event) { Review Comment: Not needed, removed it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org