tzulitai commented on a change in pull request #13102:
URL: https://github.com/apache/flink/pull/13102#discussion_r489964177



##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
##########
@@ -102,6 +107,11 @@ public void run() {
                try {
                        while (isRunning()) {
                                final RecordPublisherRunResult result = 
recordPublisher.run(batch -> {
+                                       if 
(!batch.getDeaggregatedRecords().isEmpty()) {

Review comment:
       Could you briefly explain the reason behind adding this log?

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher;
+import 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutSubscriberException;
+import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
+import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.FullJitterBackoff;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.EncryptionType;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE;
+import static 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE;
+import static 
software.amazon.awssdk.services.kinesis.model.StartingPosition.builder;
+
+/**
+ * A {@link RecordPublisher} that will read and forward records from Kinesis 
using EFO, to the subscriber.
+ * Records are consumed via Enhanced Fan Out subscriptions using 
SubscribeToShard API.
+ */
+@Internal
+public class FanOutRecordPublisher implements RecordPublisher {
+
private static final Logger LOG = LoggerFactory.getLogger(FanOutRecordPublisher.class);

/** Calculates the full-jitter sleep applied after a failed subscription attempt. */
private final FullJitterBackoff backoff;

/** The ARN of the registered stream consumer used for SubscribeToShard calls. */
private final String consumerArn;

/** The proxy used to talk to Kinesis via AWS SDK v2. */
private final KinesisProxyV2Interface kinesisProxy;

/** The shard this publisher reads from. */
private final StreamShardHandle subscribedShard;

/** Retry and backoff settings for SubscribeToShard. */
private final FanOutRecordPublisherConfiguration configuration;

/** The current attempt in the case of subsequent recoverable errors. */
private int attempt = 0;

/** The position the next subscription starts from; advanced after every consumed batch. */
private StartingPosition nextStartingPosition;

/**
 * Instantiate a new FanOutRecordPublisher.
 * Consumes data from KDS using EFO SubscribeToShard over AWS SDK V2.x
 *
 * @param startingPosition the position in the shard to start consuming from
 * @param consumerArn the consumer ARN of the stream consumer
 * @param subscribedShard the shard to consume from
 * @param kinesisProxy the proxy used to talk to Kinesis services
 * @param configuration the record publisher configuration
 * @param backoff the calculator used to compute the sleep period between failed subscription attempts
 * @throws NullPointerException if any argument is {@code null}
 */
public FanOutRecordPublisher(
		final StartingPosition startingPosition,
		final String consumerArn,
		final StreamShardHandle subscribedShard,
		final KinesisProxyV2Interface kinesisProxy,
		final FanOutRecordPublisherConfiguration configuration,
		final FullJitterBackoff backoff) {
	this.nextStartingPosition = Preconditions.checkNotNull(startingPosition);
	this.consumerArn = Preconditions.checkNotNull(consumerArn);
	this.subscribedShard = Preconditions.checkNotNull(subscribedShard);
	this.kinesisProxy = Preconditions.checkNotNull(kinesisProxy);
	this.configuration = Preconditions.checkNotNull(configuration);
	this.backoff = Preconditions.checkNotNull(backoff);
}
+
+       @Override
+       public RecordPublisherRunResult run(final RecordBatchConsumer 
recordConsumer) throws InterruptedException {
+               LOG.info("Running fan out record publisher on {}::{} from {} - 
{}",
+                       subscribedShard.getStreamName(),
+                       subscribedShard.getShard().getShardId(),
+                       nextStartingPosition.getShardIteratorType(),
+                       nextStartingPosition.getStartingMarker());
+
+               Consumer<SubscribeToShardEvent> eventConsumer = event -> {
+                       RecordBatch recordBatch = new 
RecordBatch(toSdkV1Records(event.records()), subscribedShard, 
event.millisBehindLatest());
+                       SequenceNumber sequenceNumber = 
recordConsumer.accept(recordBatch);
+                       nextStartingPosition = 
StartingPosition.continueFromSequenceNumber(sequenceNumber);
+               };
+
+               RecordPublisherRunResult result = runWithBackoff(eventConsumer);
+
+               LOG.info("Subscription expired {}::{}, with status {}",
+                       subscribedShard.getStreamName(),
+                       subscribedShard.getShard().getShardId(),
+                       result);
+
+               return result;
+       }
+
+       /**
+        * Runs the record publisher, will sleep for configuration computed 
jitter period in the case of certain exceptions.
+        * Unrecoverable exceptions are thrown to terminate the application.
+        *
+        * @param eventConsumer the consumer to pass events to
+        * @return {@code COMPLETE} if the shard is complete and this shard 
consumer should exit
+        * @throws InterruptedException
+        */
+       private RecordPublisherRunResult runWithBackoff(
+                       final Consumer<SubscribeToShardEvent> eventConsumer) 
throws InterruptedException {
+               FanOutShardSubscriber fanOutShardSubscriber = new 
FanOutShardSubscriber(
+                       consumerArn,
+                       subscribedShard.getShard().getShardId(),
+                       kinesisProxy);
+               boolean complete;
+
+               try {
+                       complete = 
fanOutShardSubscriber.subscribeToShardAndConsumeRecords(
+                               toSdkV2StartingPosition(nextStartingPosition), 
eventConsumer);
+                       attempt = 0;
+               } catch (FanOutSubscriberException ex) {
+                       // We have received an error from the network layer
+                       // This can be due to limits being exceeded, network 
timeouts, etc
+                       // We should backoff, reacquire a subscription and try 
again
+                       if (ex.getCause() instanceof ResourceNotFoundException) 
{
+                               LOG.warn("Received ResourceNotFoundException. 
Either the shard does not exist, or the stream subscriber has been 
deregistered." +
+                                       "Marking this shard as complete {} 
({})", subscribedShard.getShard().getShardId(), consumerArn);
+
+                               return COMPLETE;
+                       }
+
+                       if (attempt == 
configuration.getSubscribeToShardMaxRetries()) {
+                               throw new RuntimeException("Maximum reties 
exceeded for SubscribeToShard. " +
+                                       "Failed " + 
configuration.getSubscribeToShardMaxRetries() + " times.");
+                       }
+
+                       backoff(ex);
+                       return INCOMPLETE;
+               }
+
+               return complete ? COMPLETE : INCOMPLETE;
+       }
+
+       private void backoff(final Throwable ex) throws InterruptedException {
+               long backoffMillis = backoff.calculateFullJitterBackoff(
+                       configuration.getSubscribeToShardBaseBackoffMillis(),
+                       configuration.getSubscribeToShardMaxBackoffMillis(),
+                       configuration.getSubscribeToShardExpConstant(),
+                       ++attempt);

Review comment:
       nitpick:
   What do you think about moving the attempt increment into the `catch` block 
in `runWithBackoff`?
   That seems like a more appropriate place flow-wise.

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.util.Preconditions;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import 
software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import 
software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class is responsible for acquiring an Enhanced Fan Out subscription 
and consuming records from a shard.
+ * A queue is used to buffer records between the Kinesis Proxy and Flink 
application. This allows processing
+ * to be separated from consumption; errors thrown in the consumption layer do 
not propagate up to application.
+ *
+ * <pre>{@code [
+ * | ----------- Source Connector Thread ----------- |                      | 
--- KinesisAsyncClient Thread(s) -- |
+ * | FanOutRecordPublisher | FanOutShardSubscription | == blocking queue == | 
KinesisProxyV2 | KinesisAsyncClient |
+ * ]}</pre>
+ * <p>
+ *      Three types of message are passed over the queue for inter-thread 
communication:
+ *      <ul>
+ *             <li>{@link SubscriptionNextEvent} - passes data from the 
network to the consumer</li>
+ *             <li>{@link SubscriptionCompleteEvent} - indicates a 
subscription has expired</li>
+ *             <li>{@link SubscriptionErrorEvent} - passes an exception from 
the network to the consumer</li>
+ *      </ul>
+ * </p>
+ * <p>
+ *   The blocking queue has a maximum capacity of 1 record.
+ *   This allows backpressure to be applied closer to the network stack and 
results in record prefetch.
+ *   At maximum capacity we will have three {@link SubscribeToShardEvent} in 
memory (per instance of this class):
+ *   <ul>
+ *      <li>1 event being processed by the consumer</li>
+ *      <li>1 event enqueued in the blocking queue</li>
+ *      <li>1 event being added to the queue by the network (blocking)</li>
+ *   </ul>
+ * </p>
+ */
+@Internal
+public class FanOutShardSubscriber {

Review comment:
       Could you address the various warnings in this class?
   Such as `Access can be package-private` and missing serialVersionUID etc.

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
##########
@@ -277,7 +278,9 @@ public static void 
setAwsClientConfigProperties(ClientConfiguration config,
         * @return the starting position
         */
        public static StartingPosition getStartingPosition(final SequenceNumber 
sequenceNumber, final Properties configProps) {
-               if 
(SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(sequenceNumber)) {
+               if (sequenceNumber.equals(SENTINEL_LATEST_SEQUENCE_NUM.get())) {
+                       return StartingPosition.fromTimestamp(new Date());

Review comment:
       Since this is a pretty delicate thing, could you add a comment here on why 
we do this "special" translation, so that it's easier for future generations to 
understand the reasoning here :)

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.util.Preconditions;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import 
software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import 
software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class is responsible for acquiring an Enhanced Fan Out subscription 
and consuming records from a shard.
+ * A queue is used to buffer records between the Kinesis Proxy and Flink 
application. This allows processing
+ * to be separated from consumption; errors thrown in the consumption layer do 
not propagate up to application.
+ *
+ * <pre>{@code [
+ * | ----------- Source Connector Thread ----------- |                      | 
--- KinesisAsyncClient Thread(s) -- |
+ * | FanOutRecordPublisher | FanOutShardSubscription | == blocking queue == | 
KinesisProxyV2 | KinesisAsyncClient |
+ * ]}</pre>
+ * <p>
+ *      Three types of message are passed over the queue for inter-thread 
communication:
+ *      <ul>
+ *             <li>{@link SubscriptionNextEvent} - passes data from the 
network to the consumer</li>
+ *             <li>{@link SubscriptionCompleteEvent} - indicates a 
subscription has expired</li>
+ *             <li>{@link SubscriptionErrorEvent} - passes an exception from 
the network to the consumer</li>
+ *      </ul>
+ * </p>
+ * <p>
+ *   The blocking queue has a maximum capacity of 1 record.
+ *   This allows backpressure to be applied closer to the network stack and 
results in record prefetch.
+ *   At maximum capacity we will have three {@link SubscribeToShardEvent} in 
memory (per instance of this class):
+ *   <ul>
+ *      <li>1 event being processed by the consumer</li>
+ *      <li>1 event enqueued in the blocking queue</li>
+ *      <li>1 event being added to the queue by the network (blocking)</li>
+ *   </ul>
+ * </p>
+ */
+@Internal
+public class FanOutShardSubscriber {
+
private static final Logger LOG = LoggerFactory.getLogger(FanOutShardSubscriber.class);

/**
 * Capacity of the hand-off queue between the network thread(s) and the consuming thread.
 * One slot is enough to keep networking isolated from consumption, so errors do not bubble up,
 * while still prefetching one event to reduce latency.
 */
private static final int QUEUE_CAPACITY = 1;

/**
 * How long the consumer waits for the next event before giving up, backing off and resubscribing.
 * This is a sanity timeout set a little above the 30 second read timeout, to avoid lockup in
 * unexpected error states. Heartbeat events normally keep arriving even when there are no records,
 * so hitting this timeout is not expected under normal conditions.
 */
private static final int DEQUEUE_WAIT_SECONDS = 35;

/** How long an enqueue waits, which lets complete/error events "push in front" of data. */
private static final int ENQUEUE_WAIT_SECONDS = 5;

/** Bounded buffer of subscription events, filled from the network side and drained by the consumer. */
private final BlockingQueue<FanOutSubscriptionEvent> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);

/** Proxy used to issue SubscribeToShard over AWS SDK v2. */
private final KinesisProxyV2Interface kinesis;

/** ARN of the stream consumer that owns the subscription. */
private final String consumerArn;

/** Identifier of the shard being consumed. */
private final String shardId;

/**
 * Creates a subscriber bound to a single shard.
 *
 * @param consumerArn the stream consumer ARN
 * @param shardId the shard ID to subscribe to
 * @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2
 * @throws NullPointerException if any argument is {@code null}
 */
public FanOutShardSubscriber(final String consumerArn, final String shardId, final KinesisProxyV2Interface kinesis) {
	Preconditions.checkNotNull(kinesis);
	Preconditions.checkNotNull(consumerArn);
	Preconditions.checkNotNull(shardId);

	this.kinesis = kinesis;
	this.consumerArn = consumerArn;
	this.shardId = shardId;
}
+
+       /**
+        * Obtains a subscription to the shard from the specified {@code 
startingPosition}.
+        * {@link SubscribeToShardEvent} received from KDS are delivered to the 
given {@code eventConsumer}.
+        * Returns false if there are records left to consume from the shard.
+        *
+        * @param startingPosition the position in the stream in which to start 
receiving records
+        * @param eventConsumer the consumer to deliver received events to
+        * @return true if there are no more messages (complete), false if a 
subsequent subscription should be obtained
+        * @throws FanOutSubscriberException when an exception is propagated 
from the networking stack
+        * @throws InterruptedException
+        */
+       public boolean subscribeToShardAndConsumeRecords(
+                       final StartingPosition startingPosition,
+                       final Consumer<SubscribeToShardEvent> eventConsumer) 
throws InterruptedException, FanOutSubscriberException {
+               LOG.debug("Subscribing to shard {} ({})", shardId, consumerArn);
+
+               try {
+                       openSubscriptionToShard(startingPosition);
+               } catch (FanOutSubscriberException ex) {
+                       // The only exception that should cause a failure is a 
ResourceNotFoundException
+                       // Rethrow the exception to trigger the application to 
terminate
+                       if (ex.getCause() instanceof ResourceNotFoundException) 
{
+                               throw (ResourceNotFoundException) ex.getCause();
+                       }
+
+                       throw ex;
+               }
+
+               return consumeAllRecordsFromKinesisShard(eventConsumer);
+       }
+
+       /**
+        * Calls {@link KinesisProxyV2#subscribeToShard} and waits to acquire a 
subscription.
+        * In the event a non-recoverable error occurs this method will rethrow 
the exception.
+        * Once the subscription is acquired the client signals to the producer 
that we are ready to receive records.
+        *
+        * @param startingPosition the position in which to start consuming from
+        * @throws FanOutSubscriberException when an exception is propagated 
from the networking stack
+        */
+       private void openSubscriptionToShard(final StartingPosition 
startingPosition) throws FanOutSubscriberException, InterruptedException {
+               SubscribeToShardRequest request = 
SubscribeToShardRequest.builder()
+                       .consumerARN(consumerArn)
+                       .shardId(shardId)
+                       .startingPosition(startingPosition)
+                       .build();
+
+               AtomicReference<Throwable> exception = new AtomicReference<>();
+               CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
+               FanOutShardSubscription subscription = new 
FanOutShardSubscription(waitForSubscriptionLatch);
+
+               SubscribeToShardResponseHandler responseHandler = 
SubscribeToShardResponseHandler
+                       .builder()
+                       .onError(e -> {
+                               // Errors that occur while trying to acquire a 
subscription are only thrown from here
+                               // Errors that occur during the subscription 
are surfaced here and to the FanOutShardSubscription
+                               //      (errors are ignored here once the 
subscription is open)
+                               if (waitForSubscriptionLatch.getCount() > 0) {
+                                       exception.set(e);
+                                       waitForSubscriptionLatch.countDown();
+                               }
+                       })
+                       .subscriber(() -> subscription)
+                       .build();
+
+               kinesis.subscribeToShard(request, responseHandler);
+
+               waitForSubscriptionLatch.await();
+
+               Throwable throwable = exception.get();
+               if (throwable != null) {
+                       handleError(throwable);
+               }
+
+               LOG.debug("Acquired subscription - {} ({})", shardId, 
consumerArn);
+
+               // Request the first record to kick off consumption
+               // Following requests are made by the FanOutShardSubscription 
on the netty thread
+               subscription.requestRecord();
+       }
+
+       /**
+        * Update the reference to the latest networking error in this object.
+        * Parent caller can interrogate to decide how to handle error.
+        *
+        * @param throwable the exception that has occurred
+        */
+       private void handleError(final Throwable throwable) throws 
FanOutSubscriberException {
+               Throwable cause;
+               if (throwable instanceof CompletionException || throwable 
instanceof ExecutionException) {
+                       cause = throwable.getCause();
+               } else {
+                       cause = throwable;
+               }
+
+               LOG.warn("Error occurred on EFO subscription: {} - ({}).  {} 
({})",
+                       throwable.getClass().getName(), throwable.getMessage(), 
shardId, consumerArn, cause);
+
+               throw new FanOutSubscriberException(cause);
+       }
+
+       /**
+        * Once the subscription is open, records will be delivered to the 
{@link BlockingQueue}.
+        * Queue capacity is hardcoded to 1 record, the queue is used solely to 
separate consumption and processing.
+        * However, this buffer will result in latency reduction as records are 
pre-fetched as a result.
+        * This method will poll the queue and exit under any of these 
conditions:
+        * - {@code continuationSequenceNumber} is {@code null}, indicating the 
shard is complete
+        * - The subscription expires, indicated by a {@link 
SubscriptionCompleteEvent}
+        * - There is an error while consuming records, indicated by a {@link 
SubscriptionErrorEvent}
+        *
+        * @param eventConsumer the event consumer to deliver records to
+        * @return true if there are no more messages (complete), false if a 
subsequent subscription should be obtained
+        * @throws FanOutSubscriberException when an exception is propagated 
from the networking stack
+        * @throws InterruptedException
+        */
+       private boolean consumeAllRecordsFromKinesisShard(
+                       final Consumer<SubscribeToShardEvent> eventConsumer) 
throws InterruptedException, FanOutSubscriberException {
+               String continuationSequenceNumber = null;
+
+               do {
+                       // Read timeout will occur after 30 seconds, add a 
sanity timeout here to prevent lockup
+                       FanOutSubscriptionEvent subscriptionEvent = 
queue.poll(DEQUEUE_WAIT_SECONDS, SECONDS);
+
+                       if (subscriptionEvent == null) {
+                               LOG.debug("Timed out polling events from 
network, reacquiring subscription - {} ({})", shardId, consumerArn);
+                               return false;
+                       } else if (subscriptionEvent.isSubscribeToShardEvent()) 
{
+                               SubscribeToShardEvent event = 
subscriptionEvent.getSubscribeToShardEvent();
+                               continuationSequenceNumber = 
event.continuationSequenceNumber();
+                               if (!event.records().isEmpty()) {
+                                       eventConsumer.accept(event);
+                               }
+                       } else if (subscriptionEvent.isSubscriptionComplete()) {
+                               // The subscription is complete, but the shard 
might not be, so we return incomplete
+                               return false;
+                       } else {
+                               handleError(subscriptionEvent.getThrowable());
+                               return false;
+                       }
+               } while (continuationSequenceNumber != null);
+
+               return true;
+       }
+
+       /**
+        * The {@link FanOutShardSubscription} subscribes to the events coming 
from KDS and adds them to the {@link BlockingQueue}.
+        * Backpressure is applied based on the maximum capacity of the queue.
+        * The {@link Subscriber} methods of this class are invoked by a thread 
from the {@link KinesisAsyncClient}.
+        */
+       private class FanOutShardSubscription implements 
Subscriber<SubscribeToShardEventStream> {
+
+               private Subscription subscription;
+
+               private boolean cancelled = false;
+
+               private final CountDownLatch waitForSubscriptionLatch;
+
+               private final Object lockObject = new Object();
+
+               private FanOutShardSubscription(final CountDownLatch 
waitForSubscriptionLatch) {
+                       this.waitForSubscriptionLatch = 
waitForSubscriptionLatch;
+               }
+
+               /**
+                * Flag to the producer that we are ready to receive more 
events.
+                */
+               public void requestRecord() {
+                       if (!cancelled) {
+                               LOG.debug("Requesting more records from EFO 
subscription - {} ({})", shardId, consumerArn);
+                               subscription.request(1);
+                       }
+               }
+
+               @Override
+               public void onSubscribe(Subscription subscription) {
+                       this.subscription = subscription;
+                       waitForSubscriptionLatch.countDown();
+               }
+
+               @Override
+               public void onNext(SubscribeToShardEventStream 
subscribeToShardEventStream) {
+                       subscribeToShardEventStream.accept(new 
SubscribeToShardResponseHandler.Visitor() {
+                               @Override
+                               public void visit(SubscribeToShardEvent event) {
+                                       synchronized (lockObject) {
+                                               if (enqueueEventWithRetry(new 
SubscriptionNextEvent(event))) {
+                                                       requestRecord();
+                                               }
+                                       }
+                               }
+                       });
+               }
+
+               @Override
+               public void onError(Throwable throwable) {
+                       LOG.debug("Error occurred on EFO subscription: {} - 
({}).  {} ({})",
+                               throwable.getClass().getName(), 
throwable.getMessage(), shardId, consumerArn);
+
+                       // Cancel the subscription to signal the onNext to stop 
queuing and requesting data
+                       cancelSubscription();
+
+                       synchronized (lockObject) {
+                               // Empty the queue and add a poison pill to 
terminate this subscriber
+                               // The synchronized block ensures that new data 
is not written in the meantime
+                               queue.clear();
+                               enqueueEvent(new 
SubscriptionErrorEvent(throwable));
+                       }
+               }
+
+               @Override
+               public void onComplete() {
+                       LOG.debug("EFO subscription complete - {} ({})", 
shardId, consumerArn);
+                       enqueueEvent(new SubscriptionCompleteEvent());
+               }
+
+               private void cancelSubscription() {
+                       if (!cancelled) {
+                               cancelled = true;
+                               subscription.cancel();
+                       }
+               }
+
+               /**
+                * Continuously attempt to enqueue an event until successful or 
the subscription is cancelled (due to error).
+                * When backpressure applied by the consumer exceeds 30s for a 
single batch, a ReadTimeoutException will be
+                * thrown by the network stack. This will result in the 
subscription be cancelled and this event being discarded.
+                * The subscription would subsequently be reacquired and the 
discarded data would be fetched again.
+                *
+                * @param event the event to enqueue
+                * @return true if the event was successfully enqueued.
+                */
+               private boolean enqueueEventWithRetry(final 
FanOutSubscriptionEvent event) {
+                       boolean result = false;
+                       do {
+                               if (cancelled) {
+                                       break;
+                               }
+
+                               synchronized (lockObject) {
+                                       result = enqueueEvent(event);
+                               }
+                       } while (!result);
+
+                       return result;
+               }
+
+               /**
+                * Offers the event to the queue.
+                *
+                * @param event the event to enqueue
+                * @return true if the event was successfully enqueued.
+                */
+               private boolean enqueueEvent(final FanOutSubscriptionEvent 
event) {
+                       try {
+                               if (!queue.offer(event, ENQUEUE_WAIT_SECONDS, 
SECONDS)) {

Review comment:
       For my own clarification / understanding:
   As I understand it, in case of `onError` during an ongoing `onNext`, this 
blocks job cancellation for up to `ENQUEUE_WAIT_SECONDS`, because a 
`SubscriptionErrorEvent` cannot be added to the queue until the pending offer in 
`onNext` has timed out and the lock is released?

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.util.Preconditions;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import 
software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import 
software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class is responsible for acquiring an Enhanced Fan Out subscription 
and consuming records from a shard.
+ * A queue is used to buffer records between the Kinesis Proxy and Flink 
application. This allows processing
+ * to be separated from consumption; errors thrown in the consumption layer do 
not propagate up to application.
+ *
+ * <pre>{@code [
+ * | ----------- Source Connector Thread ----------- |                      | 
--- KinesisAsyncClient Thread(s) -- |
+ * | FanOutRecordPublisher | FanOutShardSubscription | == blocking queue == | 
KinesisProxyV2 | KinesisAsyncClient |
+ * ]}</pre>
+ * <p>
+ *      Three types of message are passed over the queue for inter-thread 
communication:
+ *      <ul>
+ *             <li>{@link SubscriptionNextEvent} - passes data from the 
network to the consumer</li>
+ *             <li>{@link SubscriptionCompleteEvent} - indicates a 
subscription has expired</li>
+ *             <li>{@link SubscriptionErrorEvent} - passes an exception from 
the network to the consumer</li>
+ *      </ul>
+ * </p>
+ * <p>
+ *   The blocking queue has a maximum capacity of 1 record.
+ *   This allows backpressure to be applied closer to the network stack and 
results in record prefetch.
+ *   At maximum capacity we will have three {@link SubscribeToShardEvent} in 
memory (per instance of this class):
+ *   <ul>
+ *      <li>1 event being processed by the consumer</li>
+ *      <li>1 event enqueued in the blocking queue</li>
+ *      <li>1 event being added to the queue by the network (blocking)</li>
+ *   </ul>
+ * </p>
+ */

Review comment:
       💯 Thank you for this great Javadoc; it made the review much easier!

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -30,15 +34,28 @@
 @Internal
 public class KinesisProxyV2 implements KinesisProxyV2Interface {
 
+       /** An Asynchronous client used to communicate with AWS services. */
        private final KinesisAsyncClient kinesisAsyncClient;
 
        /**
-        * Create a new KinesisProxyV2 based on the supplied configuration 
properties.
+        * Create a new KinesisProxyV2 using the provided Async Client.
         *
         * @param kinesisAsyncClient the kinesis async client used to 
communicate with Kinesis
         */
        public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
                this.kinesisAsyncClient = 
Preconditions.checkNotNull(kinesisAsyncClient);
        }
 
+       @Override
+       public CompletableFuture<Void> subscribeToShard(
+                       final SubscribeToShardRequest request,
+                       final SubscribeToShardResponseHandler responseHandler) {
+               return kinesisAsyncClient.subscribeToShard(request, 
responseHandler);
+       }
+
+       @Override
+       public void close() {
+               kinesisAsyncClient.close();

Review comment:
       Noted. It's fine to leave it for the next PR; I'll merge them in one 
batch.

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.util.Preconditions;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import 
software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import 
software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class is responsible for acquiring an Enhanced Fan Out subscription 
and consuming records from a shard.
+ * A queue is used to buffer records between the Kinesis Proxy and Flink 
application. This allows processing
+ * to be separated from consumption; errors thrown in the consumption layer do 
not propagate up to application.
+ *
+ * <pre>{@code [
+ * | ----------- Source Connector Thread ----------- |                      | 
--- KinesisAsyncClient Thread(s) -- |
+ * | FanOutRecordPublisher | FanOutShardSubscription | == blocking queue == | 
KinesisProxyV2 | KinesisAsyncClient |
+ * ]}</pre>
+ * <p>
+ *      Three types of message are passed over the queue for inter-thread 
communication:
+ *      <ul>
+ *             <li>{@link SubscriptionNextEvent} - passes data from the 
network to the consumer</li>
+ *             <li>{@link SubscriptionCompleteEvent} - indicates a 
subscription has expired</li>
+ *             <li>{@link SubscriptionErrorEvent} - passes an exception from 
the network to the consumer</li>
+ *      </ul>
+ * </p>
+ * <p>
+ *   The blocking queue has a maximum capacity of 1 record.
+ *   This allows backpressure to be applied closer to the network stack and 
results in record prefetch.
+ *   At maximum capacity we will have three {@link SubscribeToShardEvent} in 
memory (per instance of this class):
+ *   <ul>
+ *      <li>1 event being processed by the consumer</li>
+ *      <li>1 event enqueued in the blocking queue</li>
+ *      <li>1 event being added to the queue by the network (blocking)</li>
+ *   </ul>
+ * </p>
+ */
+@Internal
+public class FanOutShardSubscriber {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FanOutShardSubscriber.class);
+
+       /**
+        * The maximum capacity of the queue between the network and 
consumption thread.
+        * The queue is mainly used to isolate networking from consumption such 
that errors do not bubble up.
+        * This queue also acts as a buffer resulting in a record prefetch and 
reduced latency.
+        */
+       private static final int QUEUE_CAPACITY = 1;
+
+       /**
+        * Read timeout will occur after 30 seconds, a sanity timeout to 
prevent lockup in unexpected error states.
+        * If the consumer does not receive a new event within the 
DEQUEUE_WAIT_SECONDS it will backoff and resubscribe.
+        * Under normal conditions heartbeat events are received even when 
there are no records to consume, so it is not
+        * expected for this timeout to occur under normal conditions.
+        */
+       private static final int DEQUEUE_WAIT_SECONDS = 35;
+
+       /** The time to wait when enqueuing events to allow complete/error 
events to "push in front" of data . */
+       private static final int ENQUEUE_WAIT_SECONDS = 5;
+
+       private final BlockingQueue<FanOutSubscriptionEvent> queue = new 
LinkedBlockingQueue<>(QUEUE_CAPACITY);
+
+       private final KinesisProxyV2Interface kinesis;
+
+       private final String consumerArn;
+
+       private final String shardId;
+
+       /**
+        * Create a new Fan Out subscriber.
+        *
+        * @param consumerArn the stream consumer ARN
+        * @param shardId the shard ID to subscribe to
+        * @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2
+        */
+       public FanOutShardSubscriber(final String consumerArn, final String 
shardId, final KinesisProxyV2Interface kinesis) {
+               this.kinesis = Preconditions.checkNotNull(kinesis);
+               this.consumerArn = Preconditions.checkNotNull(consumerArn);
+               this.shardId = Preconditions.checkNotNull(shardId);
+       }
+
+       /**
+        * Obtains a subscription to the shard from the specified {@code 
startingPosition}.
+        * {@link SubscribeToShardEvent} received from KDS are delivered to the 
given {@code eventConsumer}.
+        * Returns false if there are records left to consume from the shard.
+        *
+        * @param startingPosition the position in the stream in which to start 
receiving records
+        * @param eventConsumer the consumer to deliver received events to
+        * @return true if there are no more messages (complete), false if a 
subsequent subscription should be obtained
+        * @throws FanOutSubscriberException when an exception is propagated 
from the networking stack
+        * @throws InterruptedException
+        */
+       public boolean subscribeToShardAndConsumeRecords(
+                       final StartingPosition startingPosition,
+                       final Consumer<SubscribeToShardEvent> eventConsumer) 
throws InterruptedException, FanOutSubscriberException {
+               LOG.debug("Subscribing to shard {} ({})", shardId, consumerArn);
+
+               try {
+                       openSubscriptionToShard(startingPosition);
+               } catch (FanOutSubscriberException ex) {
+                       // The only exception that should cause a failure is a 
ResourceNotFoundException
+                       // Rethrow the exception to trigger the application to 
terminate
+                       if (ex.getCause() instanceof ResourceNotFoundException) 
{
+                               throw (ResourceNotFoundException) ex.getCause();
+                       }
+
+                       throw ex;
+               }
+
+               return consumeAllRecordsFromKinesisShard(eventConsumer);
+       }
+
+       /**
+        * Calls {@link KinesisProxyV2#subscribeToShard} and waits to acquire a 
subscription.
+        * In the event a non-recoverable error occurs this method will rethrow 
the exception.
+        * Once the subscription is acquired the client signals to the producer 
that we are ready to receive records.
+        *
+        * @param startingPosition the position in which to start consuming from
+        * @throws FanOutSubscriberException when an exception is propagated 
from the networking stack
+        */
+       private void openSubscriptionToShard(final StartingPosition 
startingPosition) throws FanOutSubscriberException, InterruptedException {
+               SubscribeToShardRequest request = 
SubscribeToShardRequest.builder()
+                       .consumerARN(consumerArn)
+                       .shardId(shardId)
+                       .startingPosition(startingPosition)
+                       .build();
+
+               AtomicReference<Throwable> exception = new AtomicReference<>();
+               CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
+               FanOutShardSubscription subscription = new 
FanOutShardSubscription(waitForSubscriptionLatch);
+
+               SubscribeToShardResponseHandler responseHandler = 
SubscribeToShardResponseHandler
+                       .builder()
+                       .onError(e -> {
+                               // Errors that occur while trying to acquire a 
subscription are only thrown from here
+                               // Errors that occur during the subscription 
are surfaced here and to the FanOutShardSubscription
+                               //      (errors are ignored here once the 
subscription is open)
+                               if (waitForSubscriptionLatch.getCount() > 0) {
+                                       exception.set(e);
+                                       waitForSubscriptionLatch.countDown();
+                               }
+                       })
+                       .subscriber(() -> subscription)
+                       .build();
+
+               kinesis.subscribeToShard(request, responseHandler);
+
+               waitForSubscriptionLatch.await();
+
+               Throwable throwable = exception.get();
+               if (throwable != null) {
+                       handleError(throwable);
+               }
+
+               LOG.debug("Acquired subscription - {} ({})", shardId, 
consumerArn);
+
+               // Request the first record to kick off consumption
+               // Following requests are made by the FanOutShardSubscription 
on the netty thread
+               subscription.requestRecord();
+       }
+
+       /**
+        * Update the reference to the latest networking error in this object.
+        * Parent caller can interrogate to decide how to handle error.
+        *
+        * @param throwable the exception that has occurred
+        */
+       private void handleError(final Throwable throwable) throws 
FanOutSubscriberException {
+               Throwable cause;
+               if (throwable instanceof CompletionException || throwable 
instanceof ExecutionException) {
+                       cause = throwable.getCause();
+               } else {
+                       cause = throwable;
+               }
+
+               LOG.warn("Error occurred on EFO subscription: {} - ({}).  {} 
({})",
+                       throwable.getClass().getName(), throwable.getMessage(), 
shardId, consumerArn, cause);
+
+               throw new FanOutSubscriberException(cause);
+       }
+
+       /**
+        * Once the subscription is open, records will be delivered to the 
{@link BlockingQueue}.
+        * Queue capacity is hardcoded to 1 record, the queue is used solely to 
separate consumption and processing.
+        * However, this buffer will result in latency reduction as records are 
pre-fetched as a result.
+        * This method will poll the queue and exit under any of these 
conditions:
+        * - {@code continuationSequenceNumber} is {@code null}, indicating the 
shard is complete
+        * - The subscription expires, indicated by a {@link 
SubscriptionCompleteEvent}
+        * - There is an error while consuming records, indicated by a {@link 
SubscriptionErrorEvent}
+        *
+        * @param eventConsumer the event consumer to deliver records to
+        * @return true if there are no more messages (complete), false if a 
subsequent subscription should be obtained
+        * @throws FanOutSubscriberException when an exception is propagated 
from the networking stack
+        * @throws InterruptedException
+        */
+       private boolean consumeAllRecordsFromKinesisShard(
+                       final Consumer<SubscribeToShardEvent> eventConsumer) 
throws InterruptedException, FanOutSubscriberException {
+               String continuationSequenceNumber = null;
+
+               do {
+                       // Read timeout will occur after 30 seconds, add a 
sanity timeout here to prevent lockup
+                       FanOutSubscriptionEvent subscriptionEvent = 
queue.poll(DEQUEUE_WAIT_SECONDS, SECONDS);
+
+                       if (subscriptionEvent == null) {
+                               LOG.debug("Timed out polling events from 
network, reacquiring subscription - {} ({})", shardId, consumerArn);
+                               return false;
+                       } else if (subscriptionEvent.isSubscribeToShardEvent()) 
{
+                               SubscribeToShardEvent event = 
subscriptionEvent.getSubscribeToShardEvent();
+                               continuationSequenceNumber = 
event.continuationSequenceNumber();
+                               if (!event.records().isEmpty()) {
+                                       eventConsumer.accept(event);
+                               }
+                       } else if (subscriptionEvent.isSubscriptionComplete()) {
+                               // The subscription is complete, but the shard 
might not be, so we return incomplete
+                               return false;
+                       } else {
+                               handleError(subscriptionEvent.getThrowable());
+                               return false;
+                       }
+               } while (continuationSequenceNumber != null);
+
+               return true;
+       }
+
+       /**
+        * The {@link FanOutShardSubscription} subscribes to the events coming 
from KDS and adds them to the {@link BlockingQueue}.
+        * Backpressure is applied based on the maximum capacity of the queue.
+        * The {@link Subscriber} methods of this class are invoked by a thread 
from the {@link KinesisAsyncClient}.
+        */
+       private class FanOutShardSubscription implements 
Subscriber<SubscribeToShardEventStream> {
+
+               private Subscription subscription;
+
+               private boolean cancelled = false;

Review comment:
       Should this be `volatile`? It looks like the variable is accessed 
concurrently in `onNext` and `onError`.

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
##########
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
+import org.apache.flink.util.Preconditions;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.StartingPosition;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent;
+import 
software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream;
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest;
+import 
software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * This class is responsible for acquiring an Enhanced Fan Out subscription 
and consuming records from a shard.
+ * A queue is used to buffer records between the Kinesis Proxy and Flink 
application. This allows processing
+ * to be separated from consumption; errors thrown in the consumption layer do 
not propagate up to application.
+ *
+ * <pre>{@code [
+ * | ----------- Source Connector Thread ----------- |                      | 
--- KinesisAsyncClient Thread(s) -- |
+ * | FanOutRecordPublisher | FanOutShardSubscription | == blocking queue == | 
KinesisProxyV2 | KinesisAsyncClient |
+ * ]}</pre>
+ * <p>
+ *      Three types of message are passed over the queue for inter-thread 
communication:
+ *      <ul>
+ *             <li>{@link SubscriptionNextEvent} - passes data from the 
network to the consumer</li>
+ *             <li>{@link SubscriptionCompleteEvent} - indicates a 
subscription has expired</li>
+ *             <li>{@link SubscriptionErrorEvent} - passes an exception from 
the network to the consumer</li>
+ *      </ul>
+ * </p>
+ * <p>
+ *   The blocking queue has a maximum capacity of 1 record.
+ *   This allows backpressure to be applied closer to the network stack and 
results in record prefetch.
+ *   At maximum capacity we will have three {@link SubscribeToShardEvent} in 
memory (per instance of this class):
+ *   <ul>
+ *      <li>1 event being processed by the consumer</li>
+ *      <li>1 event enqueued in the blocking queue</li>
+ *      <li>1 event being added to the queue by the network (blocking)</li>
+ *   </ul>
+ * </p>
+ */
+@Internal
+public class FanOutShardSubscriber {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FanOutShardSubscriber.class);
+
+       /**
+        * The maximum capacity of the queue between the network and 
consumption thread.
+        * The queue is mainly used to isolate networking from consumption such 
that errors do not bubble up.
+        * This queue also acts as a buffer resulting in a record prefetch and 
reduced latency.
+        */
+       private static final int QUEUE_CAPACITY = 1;
+
+       /**
+        * Read timeout will occur after 30 seconds, a sanity timeout to 
prevent lockup in unexpected error states.
+        * If the consumer does not receive a new event within the 
DEQUEUE_WAIT_SECONDS it will backoff and resubscribe.
+        * Under normal conditions heartbeat events are received even when 
there are no records to consume, so it is not
+        * expected for this timeout to occur under normal conditions.
+        */
+       private static final int DEQUEUE_WAIT_SECONDS = 35;
+
+       /** The time to wait when enqueuing events to allow complete/error 
events to "push in front" of data . */
+       private static final int ENQUEUE_WAIT_SECONDS = 5;
+
+       private final BlockingQueue<FanOutSubscriptionEvent> queue = new 
LinkedBlockingQueue<>(QUEUE_CAPACITY);
+
+       private final KinesisProxyV2Interface kinesis;
+
+       private final String consumerArn;
+
+       private final String shardId;
+
+       /**
+        * Create a new Fan Out subscriber.
+        *
+        * @param consumerArn the stream consumer ARN
+        * @param shardId the shard ID to subscribe to
+        * @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2
+        */
+       public FanOutShardSubscriber(final String consumerArn, final String 
shardId, final KinesisProxyV2Interface kinesis) {
+               this.kinesis = Preconditions.checkNotNull(kinesis);
+               this.consumerArn = Preconditions.checkNotNull(consumerArn);
+               this.shardId = Preconditions.checkNotNull(shardId);
+       }
+
+       /**
+        * Obtains a subscription to the shard from the specified {@code 
startingPosition}.
+        * {@link SubscribeToShardEvent} received from KDS are delivered to the 
given {@code eventConsumer}.
+        * Returns false if there are records left to consume from the shard.
+        *
+        * @param startingPosition the position in the stream in which to start 
receiving records
+        * @param eventConsumer the consumer to deliver received events to
+        * @return true if there are no more messages (complete), false if a 
subsequent subscription should be obtained
+        * @throws FanOutSubscriberException when an exception is propagated 
from the networking stack
+        * @throws InterruptedException
+        */
+       public boolean subscribeToShardAndConsumeRecords(
+                       final StartingPosition startingPosition,
+                       final Consumer<SubscribeToShardEvent> eventConsumer) 
throws InterruptedException, FanOutSubscriberException {
+               LOG.debug("Subscribing to shard {} ({})", shardId, consumerArn);
+
+               try {
+                       openSubscriptionToShard(startingPosition);
+               } catch (FanOutSubscriberException ex) {
+                       // The only exception that should cause a failure is a 
ResourceNotFoundException
+                       // Rethrow the exception to trigger the application to 
terminate
+                       if (ex.getCause() instanceof ResourceNotFoundException) 
{
+                               throw (ResourceNotFoundException) ex.getCause();
+                       }
+
+                       throw ex;
+               }
+
+               return consumeAllRecordsFromKinesisShard(eventConsumer);
+       }
+
+       /**
+        * Calls {@link KinesisProxyV2#subscribeToShard} and waits to acquire a 
subscription.
+        * In the event a non-recoverable error occurs this method will rethrow 
the exception.
+        * Once the subscription is acquired the client signals to the producer 
that we are ready to receive records.
+        *
+        * @param startingPosition the position in which to start consuming from
+        * @throws FanOutSubscriberException when an exception is propagated 
from the networking stack
+        */
+       private void openSubscriptionToShard(final StartingPosition 
startingPosition) throws FanOutSubscriberException, InterruptedException {
+               SubscribeToShardRequest request = 
SubscribeToShardRequest.builder()
+                       .consumerARN(consumerArn)
+                       .shardId(shardId)
+                       .startingPosition(startingPosition)
+                       .build();
+
+               AtomicReference<Throwable> exception = new AtomicReference<>();
+               CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1);
+               FanOutShardSubscription subscription = new 
FanOutShardSubscription(waitForSubscriptionLatch);
+
+               SubscribeToShardResponseHandler responseHandler = 
SubscribeToShardResponseHandler
+                       .builder()
+                       .onError(e -> {
+                               // Errors that occur while trying to acquire a 
subscription are only thrown from here
+                               // Errors that occur during the subscription 
are surfaced here and to the FanOutShardSubscription
+                               //      (errors are ignored here once the 
subscription is open)
+                               if (waitForSubscriptionLatch.getCount() > 0) {
+                                       exception.set(e);
+                                       waitForSubscriptionLatch.countDown();
+                               }
+                       })
+                       .subscriber(() -> subscription)
+                       .build();
+
+               kinesis.subscribeToShard(request, responseHandler);
+
+               waitForSubscriptionLatch.await();
+
+               Throwable throwable = exception.get();
+               if (throwable != null) {
+                       handleError(throwable);
+               }
+
+               LOG.debug("Acquired subscription - {} ({})", shardId, 
consumerArn);
+
+               // Request the first record to kick off consumption
+               // Following requests are made by the FanOutShardSubscription 
on the netty thread
+               subscription.requestRecord();
+       }
+
+       /**
+        * Update the reference to the latest networking error in this object.
+        * Parent caller can interrogate to decide how to handle error.
+        *
+        * @param throwable the exception that has occurred
+        */
+       private void handleError(final Throwable throwable) throws 
FanOutSubscriberException {
+               Throwable cause;
+               if (throwable instanceof CompletionException || throwable 
instanceof ExecutionException) {
+                       cause = throwable.getCause();
+               } else {
+                       cause = throwable;
+               }
+
+               LOG.warn("Error occurred on EFO subscription: {} - ({}).  {} 
({})",
+                       throwable.getClass().getName(), throwable.getMessage(), 
shardId, consumerArn, cause);
+
+               throw new FanOutSubscriberException(cause);
+       }
+
+       /**
+        * Once the subscription is open, records will be delivered to the 
{@link BlockingQueue}.
+        * Queue capacity is hardcoded to 1 record, the queue is used solely to 
separate consumption and processing.
+        * However, this buffer will result in latency reduction as records are 
pre-fetched as a result.
+        * This method will poll the queue and exit under any of these 
conditions:
+        * - {@code continuationSequenceNumber} is {@code null}, indicating the 
shard is complete
+        * - The subscription expires, indicated by a {@link 
SubscriptionCompleteEvent}
+        * - There is an error while consuming records, indicated by a {@link 
SubscriptionErrorEvent}
+        *
+        * @param eventConsumer the event consumer to deliver records to
+        * @return true if there are no more messages (complete), false if a 
subsequent subscription should be obtained
+        * @throws FanOutSubscriberException when an exception is propagated 
from the networking stack
+        * @throws InterruptedException
+        */
+       private boolean consumeAllRecordsFromKinesisShard(
+                       final Consumer<SubscribeToShardEvent> eventConsumer) 
throws InterruptedException, FanOutSubscriberException {
+               String continuationSequenceNumber = null;
+
+               do {
+                       // Read timeout will occur after 30 seconds, add a 
sanity timeout here to prevent lockup
+                       FanOutSubscriptionEvent subscriptionEvent = 
queue.poll(DEQUEUE_WAIT_SECONDS, SECONDS);
+
+                       if (subscriptionEvent == null) {
+                               LOG.debug("Timed out polling events from 
network, reacquiring subscription - {} ({})", shardId, consumerArn);
+                               return false;
+                       } else if (subscriptionEvent.isSubscribeToShardEvent()) 
{
+                               SubscribeToShardEvent event = 
subscriptionEvent.getSubscribeToShardEvent();
+                               continuationSequenceNumber = 
event.continuationSequenceNumber();
+                               if (!event.records().isEmpty()) {
+                                       eventConsumer.accept(event);
+                               }
+                       } else if (subscriptionEvent.isSubscriptionComplete()) {
+                               // The subscription is complete, but the shard 
might not be, so we return incomplete
+                               return false;
+                       } else {
+                               handleError(subscriptionEvent.getThrowable());
+                               return false;
+                       }
+               } while (continuationSequenceNumber != null);
+
+               return true;
+       }
+
+       /**
+        * The {@link FanOutShardSubscription} subscribes to the events coming 
from KDS and adds them to the {@link BlockingQueue}.
+        * Backpressure is applied based on the maximum capacity of the queue.
+        * The {@link Subscriber} methods of this class are invoked by a thread 
from the {@link KinesisAsyncClient}.
+        */
+       private class FanOutShardSubscription implements 
Subscriber<SubscribeToShardEventStream> {
+
+               private Subscription subscription;
+
+               private boolean cancelled = false;
+
+               private final CountDownLatch waitForSubscriptionLatch;
+
+               private final Object lockObject = new Object();
+
+               private FanOutShardSubscription(final CountDownLatch 
waitForSubscriptionLatch) {
+                       this.waitForSubscriptionLatch = 
waitForSubscriptionLatch;
+               }
+
+               /**
+                * Flag to the producer that we are ready to receive more 
events.
+                */
+               public void requestRecord() {
+                       if (!cancelled) {
+                               LOG.debug("Requesting more records from EFO 
subscription - {} ({})", shardId, consumerArn);
+                               subscription.request(1);
+                       }
+               }
+
+               @Override
+               public void onSubscribe(Subscription subscription) {
+                       this.subscription = subscription;
+                       waitForSubscriptionLatch.countDown();
+               }
+
+               @Override
+               public void onNext(SubscribeToShardEventStream 
subscribeToShardEventStream) {
+                       subscribeToShardEventStream.accept(new 
SubscribeToShardResponseHandler.Visitor() {
+                               @Override
+                               public void visit(SubscribeToShardEvent event) {
+                                       synchronized (lockObject) {
+                                               if (enqueueEventWithRetry(new 
SubscriptionNextEvent(event))) {
+                                                       requestRecord();
+                                               }
+                                       }
+                               }
+                       });
+               }
+
+               @Override
+               public void onError(Throwable throwable) {
+                       LOG.debug("Error occurred on EFO subscription: {} - 
({}).  {} ({})",
+                               throwable.getClass().getName(), 
throwable.getMessage(), shardId, consumerArn);
+
+                       // Cancel the subscription to signal the onNext to stop 
queuing and requesting data
+                       cancelSubscription();
+
+                       synchronized (lockObject) {
+                               // Empty the queue and add a poison pill to 
terminate this subscriber
+                               // The synchronized block ensures that new data 
is not written in the meantime
+                               queue.clear();
+                               enqueueEvent(new 
SubscriptionErrorEvent(throwable));

Review comment:
       In the case of backpressure in Flink's network stack, would this also 
mean that job failure is postponed until the source connector thread 
consumes this `SubscriptionErrorEvent`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to