ferenc-csaky commented on code in PR #247:
URL: https://github.com/apache/flink-connector-aws/pull/247#discussion_r3294234898


##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -337,56 +353,89 @@ public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) {
                     new SubscribeToShardResponseHandler.Visitor() {
                         @Override
                         public void visit(SubscribeToShardEvent event) {
-                            try {
+                            synchronized (lockObject) {
+                                if (shardSubscriber != FanOutShardSubscriber.this) {
+                                    LOG.warn(
+                                            "Ignoring late event for shard {} from a disposed "
+                                                    + "subscriber; it will be re-delivered after "
+                                                    + "reactivation.",
+                                            shardId);
+                                    return;
+                                }
+
                                 LOG.debug(
                                         "Received event: {}, {}",
                                         event.getClass().getSimpleName(),
                                         event);
-                                eventQueue.put(event);
 
-                                if (event.continuationSequenceNumber() == null) {
-                                    isShardEnd.set(true);
+                                // Non-blocking offer. Under the prefetch discipline maintained
+                                // by onSubscribe (primes PREFETCH - queue.size() requests) and
+                                // pollAndRequestNext (issues request(1) after each consumer
+                                // drain), the invariant queue.size + outstanding == PREFETCH
+                                // holds in steady state, so the queue is guaranteed to have
+                                // room for each delivered event. If offer() ever returns false
+                                // it indicates a protocol / state invariant violation (e.g. the
+                                // server delivered an unrequested event) - fail loud rather
+                                // than block the Netty event loop. The subscription will be
+                                // reactivated from the previous startingPosition (which has
+                                // not yet been advanced below) and the server will re-deliver
+                                // this event.
+                                if (!eventQueue.offer(event)) {
+                                    LOG.error(
+                                            "Event queue overflow for shard {}; server delivered "
+                                                    + "an unrequested event. Failing subscription "
+                                                    + "to recover.",
+                                            shardId);
+
+                                    if (disposeIfActive(FanOutShardSubscriber.this)) {
+                                        setSubscriptionException(
+                                                new IOException(
+                                                        "Event queue overflow for shard "
+                                                                + shardId
+                                                                + "; server delivered an "
+                                                                + "unrequested event."));
+                                    }
                                     return;
                                 }
 
-                                // Update the starting position in case we have to recreate the
-                                // subscription
-                                startingPosition =
-                                        StartingPosition.continueFromSequenceNumber(
-                                                event.continuationSequenceNumber());
-
-                                // Replace the record just consumed in the Queue
-                                requestRecords();
-                            } catch (InterruptedException e) {
-                                Thread.currentThread().interrupt();
-                                throw new KinesisStreamsSourceException(
-                                        "Interrupted while adding Kinesis record to internal buffer.",
-                                        e);
+                                if (event.continuationSequenceNumber() == null) {
+                                    startingPosition = null;
+                                } else {
+                                    startingPosition =
+                                            StartingPosition.continueFromSequenceNumber(
+                                                    event.continuationSequenceNumber());
+                                }
                             }
                         }
                     });
         }
 
         @Override
         public void onError(Throwable throwable) {
-            if (!subscriptionException.compareAndSet(null, throwable)) {
-                LOG.warn(
-                        "Another subscription exception has been queued, ignoring subsequent exceptions",
-                        throwable);
+            synchronized (lockObject) {
+                if (!disposeIfActive(this)) {
+                    return;
+                }
             }
+            setSubscriptionException(throwable);
         }
 
         @Override
         public void onComplete() {
-            LOG.info("Subscription complete - {} ({})", shardId, consumerArn);
-            this.subscriptionState.set(SubscriptionState.COMPLETED);
+            synchronized (lockObject) {
+                LOG.info("Subscription complete - {} ({})", shardId, consumerArn);

Review Comment:
   I think `onComplete` also needs the stale-subscriber guard that `onNext` and `onError` already have, so that a late `onComplete` from an old subscriber (e.g. as a result of some unhappy path) cannot corrupt the subscription state. Something like the following should be the first statement inside the `synchronized` block:
   ```java
   if (shardSubscriber != this) {
       LOG.warn("Ignoring late onComplete for shard {} from a disposed subscriber.", shardId);
       return;
   }
   ```
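   For illustration, a minimal self-contained model of this guard, assuming a simplified class with the same `lockObject` / `shardSubscriber` identity check as the diff. `GuardedSubscription`, `Subscriber`, `activate()` and the activation counter are hypothetical stand-ins, not the connector's classes: a late `onComplete` from a disposed subscriber returns early, and only the active subscriber clears `shardSubscriber` and triggers reactivation.

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   /** Hypothetical model of the stale-subscriber guard; not the connector's real class. */
   class GuardedSubscription {
       private final Object lockObject = new Object();
       // Currently active subscriber; guarded by lockObject.
       private Subscriber shardSubscriber;
       // Counts (re)activations so the effect of a late onComplete is observable.
       private final AtomicInteger activations = new AtomicInteger();

       /** Stands in for activateSubscription(): installs a fresh active subscriber. */
       Subscriber activate() {
           synchronized (lockObject) {
               shardSubscriber = new Subscriber();
               activations.incrementAndGet();
               return shardSubscriber;
           }
       }

       Subscriber current() {
           synchronized (lockObject) {
               return shardSubscriber;
           }
       }

       int activationCount() {
           return activations.get();
       }

       class Subscriber {
           void onComplete() {
               synchronized (lockObject) {
                   // Guard first: a disposed subscriber must not touch the active state.
                   if (shardSubscriber != this) {
                       System.out.println("Ignoring late onComplete from a disposed subscriber.");
                       return;
                   }
                   shardSubscriber = null;
               }
               // Only the active subscriber triggers reactivation, outside the lock.
               activate();
           }
       }

       public static void main(String[] args) {
           GuardedSubscription subscription = new GuardedSubscription();
           Subscriber s1 = subscription.activate();
           Subscriber s2 = subscription.activate(); // s1 is now stale
           s1.onComplete(); // ignored by the guard
           System.out.println(subscription.current() == s2); // true
           System.out.println(subscription.activationCount()); // 2
       }
   }
   ```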



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -70,22 +70,38 @@ public class FanOutKinesisShardSubscription {
                     TimeoutException.class,
                     IOException.class,
                     LimitExceededException.class);
+    private static final ScheduledExecutorService TIMEOUT_SCHEDULER =
+            new ScheduledThreadPoolExecutor(
+                    1,
+                    r -> {
+                        Thread t = new Thread(r, 
"subscription-timeout-scheduler");
+                        t.setDaemon(true);
+                        return t;
+                    });

Review Comment:
   I do not think this `ExecutorService` should be a `static` field here. Making its thread a daemon means it will not block JVM exit, true, but this way we cannot track its lifecycle and cannot shut it down properly.
   
   Instead, I think we can make it a plain `private final` field of `FanOutKinesisShardSplitReader`, initialize it in the `FanOutKinesisShardSplitReader` constructor, and shut it down properly in `FanOutKinesisShardSplitReader#close`. The reader can then pass it to each `FanOutKinesisShardSubscription` it creates, so scheduling stays centralized.
   
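   After we initialize the `timeoutScheduler` in the reader constructor, I'd also call `timeoutScheduler.setRemoveOnCancelPolicy(true)`, so that cancelled timeout tasks are removed from the work queue immediately instead of lingering until their delay expires. Note that this method is defined on `ScheduledThreadPoolExecutor`, not on `ScheduledExecutorService`, so the field needs the concrete type.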
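   A rough sketch of that ownership model, assuming a hypothetical `ReaderOwnedScheduler` class in place of `FanOutKinesisShardSplitReader` and a hypothetical `scheduleTimeout` method standing in for what a subscription would do with the injected scheduler. It keeps the daemon thread factory from the diff, enables remove-on-cancel, and shuts the executor down in `close()`:

   ```java
   import java.time.Duration;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.ScheduledThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;

   /** Hypothetical stand-in for FanOutKinesisShardSplitReader owning the timeout scheduler. */
   class ReaderOwnedScheduler implements AutoCloseable {
       // Concrete type, because setRemoveOnCancelPolicy is not on ScheduledExecutorService.
       private final ScheduledThreadPoolExecutor timeoutScheduler;

       ReaderOwnedScheduler() {
           timeoutScheduler =
                   new ScheduledThreadPoolExecutor(
                           1,
                           r -> {
                               Thread t = new Thread(r, "subscription-timeout-scheduler");
                               t.setDaemon(true);
                               return t;
                           });
           // Drop cancelled timeouts from the queue right away instead of when they expire.
           timeoutScheduler.setRemoveOnCancelPolicy(true);
       }

       /** What a subscription would do with the scheduler passed in through its constructor. */
       ScheduledFuture<?> scheduleTimeout(Runnable onTimeout, Duration timeout) {
           return timeoutScheduler.schedule(onTimeout, timeout.toMillis(), TimeUnit.MILLISECONDS);
       }

       int pendingTimeouts() {
           return timeoutScheduler.getQueue().size();
       }

       boolean isShutdown() {
           return timeoutScheduler.isShutdown();
       }

       /** Mirrors FanOutKinesisShardSplitReader#close: the reader shuts down what it created. */
       @Override
       public void close() throws InterruptedException {
           // Pending timeouts are irrelevant once the reader closes, so discard them.
           timeoutScheduler.shutdownNow();
           if (!timeoutScheduler.awaitTermination(10, TimeUnit.SECONDS)) {
               System.err.println("Timeout scheduler did not terminate within 10s");
           }
       }

       public static void main(String[] args) throws Exception {
           try (ReaderOwnedScheduler reader = new ReaderOwnedScheduler()) {
               ScheduledFuture<?> timeout = reader.scheduleTimeout(() -> {}, Duration.ofMinutes(5));
               System.out.println(reader.pendingTimeouts()); // 1
               timeout.cancel(false);
               System.out.println(reader.pendingTimeouts()); // 0
           }
       }
   }
   ```

   Using `shutdownNow()` rather than `shutdown()` is a judgment call in this sketch: once the reader is closed, no pending subscription timeout needs to fire.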



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscriptionTest.java:
##########


Review Comment:
   To save some time and tokens, I already put together a test case that validates this; feel free to use it. It assumes that the `ScheduledThreadPoolExecutor` is owned by `FanOutKinesisShardSplitReader` and only gets passed to `FanOutKinesisShardSubscription` as a constructor parameter, as I suggested in my other comment.
   ```java
       @Test
       void onCompleteFromStaleSubscriberDoesNotDisposeActiveSubscriber() throws Exception {
           ScriptedProxy proxy = new ScriptedProxy();
           FanOutKinesisShardSubscription subscription = newSubscription(proxy, LONG_TIMEOUT);
   
           subscription.activateSubscription();
           ScriptedSubscription s1 = proxy.awaitSubscription();
           s1.onSubscribeDelivered();
   
           s1.fireHandlerError(new IOException("dispose s1"));
           assertThat(subscription.nextEvent()).isNull();
           ScriptedSubscription s2 = proxy.awaitSubscription();
           s2.onSubscribeDelivered();
   
           s1.onComplete();
   
           waitShort();
           assertThat(proxy.subscribeCallCount()).isEqualTo(2);
   
           s2.deliverEvent(
                   SubscribeToShardEvent.builder()
                           .records(record("live"))
                           .continuationSequenceNumber("live-cont")
                           .build());
           SubscribeToShardEvent got = pollEvent(subscription);
           assertThat(got.continuationSequenceNumber()).isEqualTo("live-cont");
       }
   
       private FanOutKinesisShardSubscription newSubscription(AsyncStreamProxy proxy) {
           return newSubscription(proxy, DEFAULT_TIMEOUT);
       }
   
       private FanOutKinesisShardSubscription newSubscription(
               AsyncStreamProxy proxy, Duration subscriptionTimeout) {
           ScheduledThreadPoolExecutor timeoutScheduler = new ScheduledThreadPoolExecutor(1);
           timeoutScheduler.setRemoveOnCancelPolicy(true);
           timeoutSchedulers.add(timeoutScheduler);
           return new FanOutKinesisShardSubscription(
                   proxy,
                   CONSUMER_ARN,
                   SHARD_ID,
                   StartingPosition.fromStart(),
                   subscriptionTimeout,
                   timeoutScheduler);
       }
   ```
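   The snippet registers every scheduler in a `timeoutSchedulers` collection whose declaration and cleanup are not part of it. A minimal sketch of the matching cleanup, assuming the test class keeps the executors in a list and shuts them down after each test (e.g. from a JUnit 5 `@AfterEach` method); `TimeoutSchedulerRegistry` is a hypothetical helper, not part of the suggested test:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ScheduledThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;

   /** Hypothetical helper mirroring the timeoutSchedulers list used by newSubscription(...). */
   class TimeoutSchedulerRegistry {
       private final List<ScheduledThreadPoolExecutor> timeoutSchedulers = new ArrayList<>();

       /** Same setup as in newSubscription(...): one thread, remove-on-cancel enabled. */
       ScheduledThreadPoolExecutor newScheduler() {
           ScheduledThreadPoolExecutor timeoutScheduler = new ScheduledThreadPoolExecutor(1);
           timeoutScheduler.setRemoveOnCancelPolicy(true);
           timeoutSchedulers.add(timeoutScheduler);
           return timeoutScheduler;
       }

       /** Intended to be called from an @AfterEach method so no scheduler outlives its test. */
       void shutdownAll() throws InterruptedException {
           for (ScheduledThreadPoolExecutor scheduler : timeoutSchedulers) {
               scheduler.shutdownNow();
           }
           for (ScheduledThreadPoolExecutor scheduler : timeoutSchedulers) {
               if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                   throw new IllegalStateException("Timeout scheduler did not terminate");
               }
           }
           timeoutSchedulers.clear();
       }

       int size() {
           return timeoutSchedulers.size();
       }
   }
   ```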



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -337,56 +353,89 @@ public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) {
                     new SubscribeToShardResponseHandler.Visitor() {
                         @Override
                         public void visit(SubscribeToShardEvent event) {
-                            try {
+                            synchronized (lockObject) {
+                                if (shardSubscriber != FanOutShardSubscriber.this) {
+                                    LOG.warn(
+                                            "Ignoring late event for shard {} from a disposed "
+                                                    + "subscriber; it will be re-delivered after "
+                                                    + "reactivation.",
+                                            shardId);
+                                    return;
+                                }
+
                                 LOG.debug(
                                         "Received event: {}, {}",
                                         event.getClass().getSimpleName(),
                                         event);
-                                eventQueue.put(event);
 
-                                if (event.continuationSequenceNumber() == null) {
-                                    isShardEnd.set(true);
+                                // Non-blocking offer. Under the prefetch discipline maintained
+                                // by onSubscribe (primes PREFETCH - queue.size() requests) and
+                                // pollAndRequestNext (issues request(1) after each consumer
+                                // drain), the invariant queue.size + outstanding == PREFETCH
+                                // holds in steady state, so the queue is guaranteed to have
+                                // room for each delivered event. If offer() ever returns false
+                                // it indicates a protocol / state invariant violation (e.g. the
+                                // server delivered an unrequested event) - fail loud rather
+                                // than block the Netty event loop. The subscription will be
+                                // reactivated from the previous startingPosition (which has
+                                // not yet been advanced below) and the server will re-deliver
+                                // this event.
+                                if (!eventQueue.offer(event)) {
+                                    LOG.error(
+                                            "Event queue overflow for shard {}; server delivered "
+                                                    + "an unrequested event. Failing subscription "
+                                                    + "to recover.",
+                                            shardId);
+
+                                    if (disposeIfActive(FanOutShardSubscriber.this)) {
+                                        setSubscriptionException(
+                                                new IOException(
+                                                        "Event queue overflow for shard "
+                                                                + shardId
+                                                                + "; server delivered an "
+                                                                + "unrequested event."));
+                                    }
                                     return;
                                 }
 
-                                // Update the starting position in case we have to recreate the
-                                // subscription
-                                startingPosition =
-                                        StartingPosition.continueFromSequenceNumber(
-                                                event.continuationSequenceNumber());
-
-                                // Replace the record just consumed in the Queue
-                                requestRecords();
-                            } catch (InterruptedException e) {
-                                Thread.currentThread().interrupt();
-                                throw new KinesisStreamsSourceException(
-                                        "Interrupted while adding Kinesis record to internal buffer.",
-                                        e);
+                                if (event.continuationSequenceNumber() == null) {
+                                    startingPosition = null;
+                                } else {
+                                    startingPosition =
+                                            StartingPosition.continueFromSequenceNumber(
+                                                    event.continuationSequenceNumber());
+                                }
                             }
                         }
                     });
         }
 
         @Override
         public void onError(Throwable throwable) {
-            if (!subscriptionException.compareAndSet(null, throwable)) {
-                LOG.warn(
-                        "Another subscription exception has been queued, ignoring subsequent exceptions",
-                        throwable);
+            synchronized (lockObject) {
+                if (!disposeIfActive(this)) {
+                    return;
+                }
             }
+            setSubscriptionException(throwable);
         }
 
         @Override
         public void onComplete() {
-            LOG.info("Subscription complete - {} ({})", shardId, consumerArn);
-            this.subscriptionState.set(SubscriptionState.COMPLETED);
+            synchronized (lockObject) {
+                LOG.info("Subscription complete - {} ({})", shardId, consumerArn);
+                shardSubscriber = null;
+            }
+            activateSubscription();
         }
     }
 
-    /** States that the {@code FanOutShardSubscriber} may be in. */
-    private enum SubscriptionState {
-        NOT_STARTED,
-        SUBSCRIBED,
-        COMPLETED
+    public void close() {
+        synchronized (lockObject) {
+            closed = true;
+            if (shardSubscriber != null) {

Review Comment:
   I'd put `cancelTimeoutFuture();` right after `closed = true`, so that it always runs, regardless of the `shardSubscriber != null` branch below.
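   A simplified model of the suggested ordering, assuming the timeout is tracked in a hypothetical `timeoutFuture` field guarded by `lockObject` (`closed`, `shardSubscriber` and `cancelTimeoutFuture()` follow the names in the diff and this comment; `ClosableSubscription` and `armTimeout` are hypothetical). Because the cancellation happens before the `shardSubscriber != null` check, it runs even when no subscriber is active:

   ```java
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.ScheduledThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;

   /** Hypothetical model of the close() ordering; not the connector's real class. */
   class ClosableSubscription {
       private final Object lockObject = new Object();
       private final ScheduledThreadPoolExecutor timeoutScheduler;
       // All mutable state below is guarded by lockObject.
       private ScheduledFuture<?> timeoutFuture;
       private Object shardSubscriber;
       private boolean closed;

       ClosableSubscription(ScheduledThreadPoolExecutor timeoutScheduler) {
           this.timeoutScheduler = timeoutScheduler;
       }

       /** Arms a subscription timeout, as activation would; the task itself is a no-op here. */
       void armTimeout(long timeoutMillis) {
           synchronized (lockObject) {
               if (closed) {
                   return;
               }
               timeoutFuture =
                       timeoutScheduler.schedule(() -> {}, timeoutMillis, TimeUnit.MILLISECONDS);
           }
       }

       /** Must be called while holding lockObject. */
       private void cancelTimeoutFuture() {
           if (timeoutFuture != null) {
               timeoutFuture.cancel(false);
               timeoutFuture = null;
           }
       }

       void close() {
           synchronized (lockObject) {
               closed = true;
               // Unconditional: runs whether or not a subscriber is currently active.
               cancelTimeoutFuture();
               if (shardSubscriber != null) {
                   // The real code would dispose/cancel the active subscriber here.
                   shardSubscriber = null;
               }
           }
       }

       boolean hasPendingTimeout() {
           synchronized (lockObject) {
               return timeoutFuture != null;
           }
       }
   }
   ```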



##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java:
##########


Review Comment:
   Not part of this change, but I think we should call `subscription.close()` after this `remove`, because the `close()` method of this class only closes the subscriptions that are still in the map.
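   A minimal sketch of the proposed fix, assuming the reader tracks subscriptions in a map keyed by split id. `SubscriptionRegistry`, `splitSubscriptions`, `add` and `removeSplit` are hypothetical names, and the nested `Subscription` is a stand-in for `FanOutKinesisShardSubscription`:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   /** Hypothetical sketch of the reader's split bookkeeping; names are illustrative only. */
   class SubscriptionRegistry {
       /** Stand-in for FanOutKinesisShardSubscription; only close() matters here. */
       static class Subscription {
           private boolean closed;

           void close() {
               closed = true;
           }

           boolean isClosed() {
               return closed;
           }
       }

       private final Map<String, Subscription> splitSubscriptions = new HashMap<>();

       Subscription add(String splitId) {
           Subscription subscription = new Subscription();
           splitSubscriptions.put(splitId, subscription);
           return subscription;
       }

       /** Removing a finished split also closes its subscription. */
       void removeSplit(String splitId) {
           Subscription subscription = splitSubscriptions.remove(splitId);
           if (subscription != null) {
               // Once it is out of the map, close() below can no longer reach it.
               subscription.close();
           }
       }

       /** Like the reader's close(): only sees subscriptions still in the map. */
       void close() {
           splitSubscriptions.values().forEach(Subscription::close);
           splitSubscriptions.clear();
       }
   }
   ```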



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
