ferenc-csaky commented on code in PR #247:
URL:
https://github.com/apache/flink-connector-aws/pull/247#discussion_r3294234898
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -337,56 +353,89 @@ public void onNext(SubscribeToShardEventStream
subscribeToShardEventStream) {
new SubscribeToShardResponseHandler.Visitor() {
@Override
public void visit(SubscribeToShardEvent event) {
- try {
+ synchronized (lockObject) {
+ if (shardSubscriber !=
FanOutShardSubscriber.this) {
+ LOG.warn(
+ "Ignoring late event for shard {}
from a disposed "
+ + "subscriber; it will be
re-delivered after "
+ + "reactivation.",
+ shardId);
+ return;
+ }
+
LOG.debug(
"Received event: {}, {}",
event.getClass().getSimpleName(),
event);
- eventQueue.put(event);
- if (event.continuationSequenceNumber() ==
null) {
- isShardEnd.set(true);
+ // Non-blocking offer. Under the prefetch
discipline maintained
+ // by onSubscribe (primes PREFETCH -
queue.size() requests) and
+ // pollAndRequestNext (issues request(1) after
each consumer
+ // drain), the invariant queue.size +
outstanding == PREFETCH
+ // holds in steady state, so the queue is
guaranteed to have
+ // room for each delivered event. If offer()
ever returns false
+ // it indicates a protocol / state invariant
violation (e.g. the
+ // server delivered an unrequested event) -
fail loud rather
+ // than block the Netty event loop. The
subscription will be
+ // reactivated from the previous
startingPosition (which has
+ // not yet been advanced below) and the server
will re-deliver
+ // this event.
+ if (!eventQueue.offer(event)) {
+ LOG.error(
+ "Event queue overflow for shard
{}; server delivered "
+ + "an unrequested event.
Failing subscription "
+ + "to recover.",
+ shardId);
+
+ if
(disposeIfActive(FanOutShardSubscriber.this)) {
+ setSubscriptionException(
+ new IOException(
+ "Event queue overflow
for shard "
+ + shardId
+ + "; server
delivered an "
+ + "unrequested
event."));
+ }
return;
}
- // Update the starting position in case we
have to recreate the
- // subscription
- startingPosition =
-
StartingPosition.continueFromSequenceNumber(
-
event.continuationSequenceNumber());
-
- // Replace the record just consumed in the
Queue
- requestRecords();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new KinesisStreamsSourceException(
- "Interrupted while adding Kinesis
record to internal buffer.",
- e);
+ if (event.continuationSequenceNumber() ==
null) {
+ startingPosition = null;
+ } else {
+ startingPosition =
+
StartingPosition.continueFromSequenceNumber(
+
event.continuationSequenceNumber());
+ }
}
}
});
}
@Override
public void onError(Throwable throwable) {
- if (!subscriptionException.compareAndSet(null, throwable)) {
- LOG.warn(
- "Another subscription exception has been queued,
ignoring subsequent exceptions",
- throwable);
+ synchronized (lockObject) {
+ if (!disposeIfActive(this)) {
+ return;
+ }
}
+ setSubscriptionException(throwable);
}
@Override
public void onComplete() {
- LOG.info("Subscription complete - {} ({})", shardId, consumerArn);
- this.subscriptionState.set(SubscriptionState.COMPLETED);
+ synchronized (lockObject) {
+ LOG.info("Subscription complete - {} ({})", shardId,
consumerArn);
Review Comment:
I think `onComplete` also needs the stale-subscriber guard that `onNext` and
`onError` already have in place, so that a late `onComplete` from an old
subscriber (for example, after some unhappy path) cannot corrupt the
subscription state. Something like the below should happen first inside
`synchronized`:
```java
if (shardSubscriber != this) {
    LOG.warn(
            "Ignoring late onComplete for shard {} from a disposed subscriber.",
            shardId);
    return;
}
```
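To illustrate why the identity check matters, here is a minimal, self-contained
sketch. `StaleGuardSketch` and its members are hypothetical stand-ins, not the
connector classes; only the field name `shardSubscriber`, the `lockObject`
monitor and the "clear, then reactivate" shape of `onComplete` are taken from
the diff above.

```java
// Hypothetical stand-in for FanOutKinesisShardSubscription; names are assumptions.
class StaleGuardSketch {
    private final Object lockObject = new Object();
    private Subscriber shardSubscriber;
    private int activations;

    final class Subscriber {
        void onComplete() {
            synchronized (lockObject) {
                // Guard: a completion from a replaced subscriber must not touch state.
                if (shardSubscriber != this) {
                    return;
                }
                shardSubscriber = null;
            }
            // Only the active subscriber reaches this point and triggers a resubscribe.
            activate();
        }
    }

    // Installs a fresh subscriber, replacing (disposing) any previous one.
    Subscriber activate() {
        synchronized (lockObject) {
            activations++;
            shardSubscriber = new Subscriber();
            return shardSubscriber;
        }
    }

    Subscriber active() {
        synchronized (lockObject) {
            return shardSubscriber;
        }
    }

    int activations() {
        synchronized (lockObject) {
            return activations;
        }
    }
}
```

Without the guard, the late `onComplete` from the first subscriber would null
out the second one and trigger an extra, unwanted reactivation.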
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -70,22 +70,38 @@ public class FanOutKinesisShardSubscription {
TimeoutException.class,
IOException.class,
LimitExceededException.class);
+ private static final ScheduledExecutorService TIMEOUT_SCHEDULER =
+ new ScheduledThreadPoolExecutor(
+ 1,
+ r -> {
+ Thread t = new Thread(r,
"subscription-timeout-scheduler");
+ t.setDaemon(true);
+ return t;
+ });
Review Comment:
I do not believe this `ExecutorService` should be a `static` field here. True,
making its thread a daemon means it will not block JVM exit, but this way we
cannot monitor its lifecycle or shut it down properly.
Instead I think we can add this as a simple `private final` field into
`FanOutKinesisShardSplitReader`, initialize it in the
`FanOutKinesisShardSplitReader` constructor, and then we can call a proper
shutdown on it in `FanOutKinesisShardSplitReader#close`. Then we can simply
pass it to `FanOutKinesisShardSubscription` when the reader initializes it, so
it will still be centralized.
After we initialize the `timeoutScheduler` in the reader ctor, I'd also add
`timeoutScheduler.setRemoveOnCancelPolicy(true)`.
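A rough sketch of the ownership model I have in mind. `ReaderSketch` and
`SubscriptionSketch` are hypothetical stand-ins for
`FanOutKinesisShardSplitReader` and `FanOutKinesisShardSubscription`, and the
method names are assumptions; only the `ScheduledThreadPoolExecutor` calls are
real JDK API.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical subscription: borrows the scheduler, never owns or shuts it down.
class SubscriptionSketch {
    private final ScheduledThreadPoolExecutor timeoutScheduler;
    private ScheduledFuture<?> timeoutFuture;

    SubscriptionSketch(ScheduledThreadPoolExecutor timeoutScheduler) {
        this.timeoutScheduler = timeoutScheduler;
    }

    synchronized void armTimeout(Runnable onTimeout, long delayMillis) {
        timeoutFuture = timeoutScheduler.schedule(onTimeout, delayMillis, TimeUnit.MILLISECONDS);
    }

    synchronized void cancelTimeout() {
        if (timeoutFuture != null) {
            timeoutFuture.cancel(false);
            timeoutFuture = null;
        }
    }
}

// Hypothetical reader: creates the scheduler in its constructor and shuts it down in close().
class ReaderSketch implements AutoCloseable {
    private final ScheduledThreadPoolExecutor timeoutScheduler;

    ReaderSketch() {
        timeoutScheduler = new ScheduledThreadPoolExecutor(1, r -> {
            Thread t = new Thread(r, "subscription-timeout-scheduler");
            t.setDaemon(true);
            return t;
        });
        // Cancelled timeouts are removed from the work queue right away
        // instead of lingering until their delay elapses.
        timeoutScheduler.setRemoveOnCancelPolicy(true);
    }

    SubscriptionSketch newSubscription() {
        return new SubscriptionSketch(timeoutScheduler);
    }

    int pendingTimeouts() {
        return timeoutScheduler.getQueue().size();
    }

    boolean isSchedulerShutdown() {
        return timeoutScheduler.isShutdown();
    }

    @Override
    public void close() {
        timeoutScheduler.shutdownNow();
    }
}
```

With `setRemoveOnCancelPolicy(true)`, a subscription that arms and cancels many
timeouts does not leave cancelled tasks behind in the scheduler's queue.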
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscriptionTest.java:
##########
Review Comment:
To spare some time and tokens, I already put together a test case that
validates the stale `onComplete` guard, feel free to use it. This code already
assumes that the `ScheduledThreadPoolExecutor` is managed by
`FanOutKinesisShardSplitReader` and only gets passed to
`FanOutKinesisShardSubscription` as a ctor param, as I suggested in my other
comment.
```java
@Test
void onCompleteFromStaleSubscriberDoesNotDisposeActiveSubscriber() throws Exception {
    ScriptedProxy proxy = new ScriptedProxy();
    FanOutKinesisShardSubscription subscription = newSubscription(proxy, LONG_TIMEOUT);
    subscription.activateSubscription();

    ScriptedSubscription s1 = proxy.awaitSubscription();
    s1.onSubscribeDelivered();
    s1.fireHandlerError(new IOException("dispose s1"));
    assertThat(subscription.nextEvent()).isNull();

    ScriptedSubscription s2 = proxy.awaitSubscription();
    s2.onSubscribeDelivered();

    s1.onComplete();
    waitShort();
    assertThat(proxy.subscribeCallCount()).isEqualTo(2);

    s2.deliverEvent(
            SubscribeToShardEvent.builder()
                    .records(record("live"))
                    .continuationSequenceNumber("live-cont")
                    .build());
    SubscribeToShardEvent got = pollEvent(subscription);
    assertThat(got.continuationSequenceNumber()).isEqualTo("live-cont");
}

private FanOutKinesisShardSubscription newSubscription(AsyncStreamProxy proxy) {
    return newSubscription(proxy, DEFAULT_TIMEOUT);
}

private FanOutKinesisShardSubscription newSubscription(
        AsyncStreamProxy proxy, Duration subscriptionTimeout) {
    ScheduledThreadPoolExecutor timeoutScheduler = new ScheduledThreadPoolExecutor(1);
    timeoutScheduler.setRemoveOnCancelPolicy(true);
    timeoutSchedulers.add(timeoutScheduler);
    return new FanOutKinesisShardSubscription(
            proxy,
            CONSUMER_ARN,
            SHARD_ID,
            StartingPosition.fromStart(),
            subscriptionTimeout,
            timeoutScheduler);
}
```
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSubscription.java:
##########
@@ -337,56 +353,89 @@ public void onNext(SubscribeToShardEventStream
subscribeToShardEventStream) {
new SubscribeToShardResponseHandler.Visitor() {
@Override
public void visit(SubscribeToShardEvent event) {
- try {
+ synchronized (lockObject) {
+ if (shardSubscriber !=
FanOutShardSubscriber.this) {
+ LOG.warn(
+ "Ignoring late event for shard {}
from a disposed "
+ + "subscriber; it will be
re-delivered after "
+ + "reactivation.",
+ shardId);
+ return;
+ }
+
LOG.debug(
"Received event: {}, {}",
event.getClass().getSimpleName(),
event);
- eventQueue.put(event);
- if (event.continuationSequenceNumber() ==
null) {
- isShardEnd.set(true);
+ // Non-blocking offer. Under the prefetch
discipline maintained
+ // by onSubscribe (primes PREFETCH -
queue.size() requests) and
+ // pollAndRequestNext (issues request(1) after
each consumer
+ // drain), the invariant queue.size +
outstanding == PREFETCH
+ // holds in steady state, so the queue is
guaranteed to have
+ // room for each delivered event. If offer()
ever returns false
+ // it indicates a protocol / state invariant
violation (e.g. the
+ // server delivered an unrequested event) -
fail loud rather
+ // than block the Netty event loop. The
subscription will be
+ // reactivated from the previous
startingPosition (which has
+ // not yet been advanced below) and the server
will re-deliver
+ // this event.
+ if (!eventQueue.offer(event)) {
+ LOG.error(
+ "Event queue overflow for shard
{}; server delivered "
+ + "an unrequested event.
Failing subscription "
+ + "to recover.",
+ shardId);
+
+ if
(disposeIfActive(FanOutShardSubscriber.this)) {
+ setSubscriptionException(
+ new IOException(
+ "Event queue overflow
for shard "
+ + shardId
+ + "; server
delivered an "
+ + "unrequested
event."));
+ }
return;
}
- // Update the starting position in case we
have to recreate the
- // subscription
- startingPosition =
-
StartingPosition.continueFromSequenceNumber(
-
event.continuationSequenceNumber());
-
- // Replace the record just consumed in the
Queue
- requestRecords();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new KinesisStreamsSourceException(
- "Interrupted while adding Kinesis
record to internal buffer.",
- e);
+ if (event.continuationSequenceNumber() ==
null) {
+ startingPosition = null;
+ } else {
+ startingPosition =
+
StartingPosition.continueFromSequenceNumber(
+
event.continuationSequenceNumber());
+ }
}
}
});
}
@Override
public void onError(Throwable throwable) {
- if (!subscriptionException.compareAndSet(null, throwable)) {
- LOG.warn(
- "Another subscription exception has been queued,
ignoring subsequent exceptions",
- throwable);
+ synchronized (lockObject) {
+ if (!disposeIfActive(this)) {
+ return;
+ }
}
+ setSubscriptionException(throwable);
}
@Override
public void onComplete() {
- LOG.info("Subscription complete - {} ({})", shardId, consumerArn);
- this.subscriptionState.set(SubscriptionState.COMPLETED);
+ synchronized (lockObject) {
+ LOG.info("Subscription complete - {} ({})", shardId,
consumerArn);
+ shardSubscriber = null;
+ }
+ activateSubscription();
}
}
- /** States that the {@code FanOutShardSubscriber} may be in. */
- private enum SubscriptionState {
- NOT_STARTED,
- SUBSCRIBED,
- COMPLETED
+ public void close() {
+ synchronized (lockObject) {
+ closed = true;
+ if (shardSubscriber != null) {
Review Comment:
I'd put `cancelTimeoutFuture();` right after `closed = true` to make sure it
always happens.
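A sketch of the ordering only, under assumptions: `CloseOrderSketch`, the
`timeoutFuture` field and the body of `cancelTimeoutFuture()` are hypothetical,
since the rest of `close()` is not visible in the quoted hunk. The point is
that the cancellation runs even when there is no active subscriber.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in; field and method bodies are assumptions, not the PR's code.
class CloseOrderSketch {
    private final Object lockObject = new Object();
    private boolean closed;
    private Object shardSubscriber; // left null here to exercise the "no active subscriber" path
    private ScheduledFuture<?> timeoutFuture;

    void armTimeout(ScheduledExecutorService scheduler, long delayMillis) {
        synchronized (lockObject) {
            timeoutFuture = scheduler.schedule(() -> {}, delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    void close() {
        synchronized (lockObject) {
            closed = true;
            // Unconditional: runs whether or not a subscriber is currently active.
            cancelTimeoutFuture();
            if (shardSubscriber != null) {
                shardSubscriber = null;
            }
        }
    }

    private void cancelTimeoutFuture() {
        if (timeoutFuture != null) {
            timeoutFuture.cancel(false);
        }
    }

    boolean isClosed() {
        synchronized (lockObject) {
            return closed;
        }
    }

    boolean isTimeoutCancelled() {
        synchronized (lockObject) {
            return timeoutFuture != null && timeoutFuture.isCancelled();
        }
    }
}
```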
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/fanout/FanOutKinesisShardSplitReader.java:
##########
Review Comment:
Not part of this change, but I think we should add `subscription.close()`
after this remove, because the `close()` method of this class only closes the
subscriptions that the map still contains.
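Roughly what I mean, as a sketch with hypothetical names (the map, `register`
and `finishSplit` are assumptions, not the reader's actual members): whatever
is removed from the map gets closed at removal time, and `close()` handles the
rest.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for FanOutKinesisShardSubscription.
class ClosableSubscriptionSketch {
    private boolean closed;

    void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

// Hypothetical stand-in for the split reader; names are assumptions.
class SplitReaderCloseSketch implements AutoCloseable {
    private final Map<String, ClosableSubscriptionSketch> subscriptions = new HashMap<>();

    ClosableSubscriptionSketch register(String splitId) {
        ClosableSubscriptionSketch subscription = new ClosableSubscriptionSketch();
        subscriptions.put(splitId, subscription);
        return subscription;
    }

    void finishSplit(String splitId) {
        // Map.remove returns the removed value, so it can be closed right here;
        // otherwise close() below would never see it again.
        ClosableSubscriptionSketch removed = subscriptions.remove(splitId);
        if (removed != null) {
            removed.close();
        }
    }

    @Override
    public void close() {
        // Only reaches subscriptions that are still in the map.
        subscriptions.values().forEach(ClosableSubscriptionSketch::close);
        subscriptions.clear();
    }
}
```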