tzulitai commented on a change in pull request #13102: URL: https://github.com/apache/flink/pull/13102#discussion_r489964177
########## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ########## @@ -102,6 +107,11 @@ public void run() { try { while (isRunning()) { final RecordPublisherRunResult result = recordPublisher.run(batch -> { + if (!batch.getDeaggregatedRecords().isEmpty()) { Review comment: Could you briefly explain the reason behind adding this log? ########## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutRecordPublisher.java ########## @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordBatch; +import org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher; +import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.FanOutSubscriberException; +import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.model.StartingPosition; +import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; +import org.apache.flink.streaming.connectors.kinesis.proxy.FullJitterBackoff; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.model.EncryptionType; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.function.Consumer; + +import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.COMPLETE; +import static org.apache.flink.streaming.connectors.kinesis.internals.publisher.RecordPublisher.RecordPublisherRunResult.INCOMPLETE; +import static software.amazon.awssdk.services.kinesis.model.StartingPosition.builder; + +/** + * A {@link RecordPublisher} that will read and forward records from Kinesis using EFO, to the subscriber. + * Records are consumed via Enhanced Fan Out subscriptions using SubscribeToShard API. 
+ */ +@Internal +public class FanOutRecordPublisher implements RecordPublisher { + + private static final Logger LOG = LoggerFactory.getLogger(FanOutRecordPublisher.class); + + private final FullJitterBackoff backoff; + + private final String consumerArn; + + private final KinesisProxyV2Interface kinesisProxy; + + private final StreamShardHandle subscribedShard; + + private final FanOutRecordPublisherConfiguration configuration; + + /** The current attempt in the case of subsequent recoverable errors. */ + private int attempt = 0; + + private StartingPosition nextStartingPosition; + + /** + * Instantiate a new FanOutRecordPublisher. + * Consumes data from KDS using EFO SubscribeToShard over AWS SDK V2.x + * + * @param startingPosition the position in the shard to start consuming from + * @param consumerArn the consumer ARN of the stream consumer + * @param subscribedShard the shard to consumer from + * @param kinesisProxy the proxy used to talk to Kinesis services + * @param configuration the record publisher configuration + */ + public FanOutRecordPublisher( + final StartingPosition startingPosition, + final String consumerArn, + final StreamShardHandle subscribedShard, + final KinesisProxyV2Interface kinesisProxy, + final FanOutRecordPublisherConfiguration configuration, + final FullJitterBackoff backoff) { + this.nextStartingPosition = Preconditions.checkNotNull(startingPosition); + this.consumerArn = Preconditions.checkNotNull(consumerArn); + this.subscribedShard = Preconditions.checkNotNull(subscribedShard); + this.kinesisProxy = Preconditions.checkNotNull(kinesisProxy); + this.configuration = Preconditions.checkNotNull(configuration); + this.backoff = Preconditions.checkNotNull(backoff); + } + + @Override + public RecordPublisherRunResult run(final RecordBatchConsumer recordConsumer) throws InterruptedException { + LOG.info("Running fan out record publisher on {}::{} from {} - {}", + subscribedShard.getStreamName(), + subscribedShard.getShard().getShardId(), + nextStartingPosition.getShardIteratorType(), + nextStartingPosition.getStartingMarker()); + + Consumer<SubscribeToShardEvent> eventConsumer = event -> { + RecordBatch recordBatch = new RecordBatch(toSdkV1Records(event.records()), subscribedShard, event.millisBehindLatest()); + SequenceNumber sequenceNumber = recordConsumer.accept(recordBatch); + nextStartingPosition = StartingPosition.continueFromSequenceNumber(sequenceNumber); + }; + + RecordPublisherRunResult result = runWithBackoff(eventConsumer); + + LOG.info("Subscription expired {}::{}, with status {}", + subscribedShard.getStreamName(), + subscribedShard.getShard().getShardId(), + result); + + return result; + } + + /** + * Runs the record publisher, will sleep for configuration computed jitter period in the case of certain exceptions. + * Unrecoverable exceptions are thrown to terminate the application. 
+ * + * @param eventConsumer the consumer to pass events to + * @return {@code COMPLETE} if the shard is complete and this shard consumer should exit + * @throws InterruptedException + */ + private RecordPublisherRunResult runWithBackoff( + final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException { + FanOutShardSubscriber fanOutShardSubscriber = new FanOutShardSubscriber( + consumerArn, + subscribedShard.getShard().getShardId(), + kinesisProxy); + boolean complete; + + try { + complete = fanOutShardSubscriber.subscribeToShardAndConsumeRecords( + toSdkV2StartingPosition(nextStartingPosition), eventConsumer); + attempt = 0; + } catch (FanOutSubscriberException ex) { + // We have received an error from the network layer + // This can be due to limits being exceeded, network timeouts, etc + // We should backoff, reacquire a subscription and try again + if (ex.getCause() instanceof ResourceNotFoundException) { + LOG.warn("Received ResourceNotFoundException. Either the shard does not exist, or the stream subscriber has been deregistered." + + "Marking this shard as complete {} ({})", subscribedShard.getShard().getShardId(), consumerArn); + + return COMPLETE; + } + + if (attempt == configuration.getSubscribeToShardMaxRetries()) { + throw new RuntimeException("Maximum reties exceeded for SubscribeToShard. " + + "Failed " + configuration.getSubscribeToShardMaxRetries() + " times."); + } + + backoff(ex); + return INCOMPLETE; + } + + return complete ? COMPLETE : INCOMPLETE; + } + + private void backoff(final Throwable ex) throws InterruptedException { + long backoffMillis = backoff.calculateFullJitterBackoff( + configuration.getSubscribeToShardBaseBackoffMillis(), + configuration.getSubscribeToShardMaxBackoffMillis(), + configuration.getSubscribeToShardExpConstant(), + ++attempt); Review comment: nitpick: What do you think about moving the attempt increment into the `catch` block in `runWithBackoff`? That seems like a more appropriate place flow-wise. ########## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java ########## @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface; +import org.apache.flink.util.Preconditions; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.kinesis.model.StartingPosition; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * This class is responsible for acquiring an Enhanced Fan Out subscription and consuming records from a shard. + * A queue is used to buffer records between the Kinesis Proxy and Flink application. This allows processing + * to be separated from consumption; errors thrown in the consumption layer do not propagate up to application. + * + * <pre>{@code [ + * | ----------- Source Connector Thread ----------- | | --- KinesisAsyncClient Thread(s) -- | + * | FanOutRecordPublisher | FanOutShardSubscription | == blocking queue == | KinesisProxyV2 | KinesisAsyncClient | + * ]}</pre> + * <p> + * Three types of message are passed over the queue for inter-thread communication: + * <ul> + * <li>{@link SubscriptionNextEvent} - passes data from the network to the consumer</li> + * <li>{@link SubscriptionCompleteEvent} - indicates a subscription has expired</li> + * <li>{@link SubscriptionErrorEvent} - passes an exception from the network to the consumer</li> + * </ul> + * </p> + * <p> + * The blocking queue has a maximum capacity of 1 record. + * This allows backpressure to be applied closer to the network stack and results in record prefetch. + * At maximum capacity we will have three {@link SubscribeToShardEvent} in memory (per instance of this class): + * <ul> + * <li>1 event being processed by the consumer</li> + * <li>1 event enqueued in the blocking queue</li> + * <li>1 event being added to the queue by the network (blocking)</li> + * </ul> + * </p> + */ +@Internal +public class FanOutShardSubscriber { Review comment: Could you address the various warnings in this class? Such as `Access can be package-private` and missing serialVersionUID etc. 
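To make the hand-off described in the Javadoc above concrete, here is a small standalone sketch (illustrative only, not the connector code) of a capacity-1 `LinkedBlockingQueue` shared between a producer thread and a consumer thread; the class name, event payloads, timeouts, and the simulated processing delay are all assumptions made for the example:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedHandoffSketch {

    public static void main(String[] args) throws InterruptedException {
        // Capacity 1, mirroring QUEUE_CAPACITY above: at most one event is buffered.
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);

        // "Network" thread: keeps offering events and is blocked (backpressured)
        // while the consumer is still busy with the previous event.
        Thread networkThread = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    String event = "event-" + i;
                    while (!queue.offer(event, 1, TimeUnit.SECONDS)) {
                        System.out.println("queue full, backpressure applied for " + event);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        networkThread.start();

        // "Source connector" thread: polls with a timeout, similar in spirit to DEQUEUE_WAIT_SECONDS.
        for (int i = 0; i < 5; i++) {
            String event = queue.poll(5, TimeUnit.SECONDS);
            Thread.sleep(200); // simulate slow downstream processing
            System.out.println("processed " + event);
        }
        networkThread.join();
    }
}
```

With this shape there is at most one event being processed, one buffered in the queue, and one blocked in `offer`, which matches the "three events in memory" bound stated in the Javadoc.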
########## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ########## @@ -277,7 +278,9 @@ public static void setAwsClientConfigProperties(ClientConfiguration config, * @return the starting position */ public static StartingPosition getStartingPosition(final SequenceNumber sequenceNumber, final Properties configProps) { - if (SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get().equals(sequenceNumber)) { + if (sequenceNumber.equals(SENTINEL_LATEST_SEQUENCE_NUM.get())) { + return StartingPosition.fromTimestamp(new Date()); Review comment: Since this a pretty delicate thing, could you add a comment here on why we do this "special" translation, so that its easier for future generations to understand the reasoning here :) ########## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java ########## @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface; +import org.apache.flink.util.Preconditions; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.kinesis.model.StartingPosition; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * This class is responsible for acquiring an Enhanced Fan Out subscription and consuming records from a shard. + * A queue is used to buffer records between the Kinesis Proxy and Flink application. 
This allows processing + * to be separated from consumption; errors thrown in the consumption layer do not propagate up to application. + * + * <pre>{@code [ + * | ----------- Source Connector Thread ----------- | | --- KinesisAsyncClient Thread(s) -- | + * | FanOutRecordPublisher | FanOutShardSubscription | == blocking queue == | KinesisProxyV2 | KinesisAsyncClient | + * ]}</pre> + * <p> + * Three types of message are passed over the queue for inter-thread communication: + * <ul> + * <li>{@link SubscriptionNextEvent} - passes data from the network to the consumer</li> + * <li>{@link SubscriptionCompleteEvent} - indicates a subscription has expired</li> + * <li>{@link SubscriptionErrorEvent} - passes an exception from the network to the consumer</li> + * </ul> + * </p> + * <p> + * The blocking queue has a maximum capacity of 1 record. + * This allows backpressure to be applied closer to the network stack and results in record prefetch. + * At maximum capacity we will have three {@link SubscribeToShardEvent} in memory (per instance of this class): + * <ul> + * <li>1 event being processed by the consumer</li> + * <li>1 event enqueued in the blocking queue</li> + * <li>1 event being added to the queue by the network (blocking)</li> + * </ul> + * </p> + */ +@Internal +public class FanOutShardSubscriber { + + private static final Logger LOG = LoggerFactory.getLogger(FanOutShardSubscriber.class); + + /** + * The maximum capacity of the queue between the network and consumption thread. + * The queue is mainly used to isolate networking from consumption such that errors do not bubble up. + * This queue also acts as a buffer resulting in a record prefetch and reduced latency. + */ + private static final int QUEUE_CAPACITY = 1; + + /** + * Read timeout will occur after 30 seconds, a sanity timeout to prevent lockup in unexpected error states. + * If the consumer does not receive a new event within the DEQUEUE_WAIT_SECONDS it will backoff and resubscribe. + * Under normal conditions heartbeat events are received even when there are no records to consume, so it is not + * expected for this timeout to occur under normal conditions. + */ + private static final int DEQUEUE_WAIT_SECONDS = 35; + + /** The time to wait when enqueuing events to allow complete/error events to "push in front" of data . */ + private static final int ENQUEUE_WAIT_SECONDS = 5; + + private final BlockingQueue<FanOutSubscriptionEvent> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); + + private final KinesisProxyV2Interface kinesis; + + private final String consumerArn; + + private final String shardId; + + /** + * Create a new Fan Out subscriber. + * + * @param consumerArn the stream consumer ARN + * @param shardId the shard ID to subscribe to + * @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2 + */ + public FanOutShardSubscriber(final String consumerArn, final String shardId, final KinesisProxyV2Interface kinesis) { + this.kinesis = Preconditions.checkNotNull(kinesis); + this.consumerArn = Preconditions.checkNotNull(consumerArn); + this.shardId = Preconditions.checkNotNull(shardId); + } + + /** + * Obtains a subscription to the shard from the specified {@code startingPosition}. + * {@link SubscribeToShardEvent} received from KDS are delivered to the given {@code eventConsumer}. + * Returns false if there are records left to consume from the shard. 
+ * + * @param startingPosition the position in the stream in which to start receiving records + * @param eventConsumer the consumer to deliver received events to + * @return true if there are no more messages (complete), false if a subsequent subscription should be obtained + * @throws FanOutSubscriberException when an exception is propagated from the networking stack + * @throws InterruptedException + */ + public boolean subscribeToShardAndConsumeRecords( + final StartingPosition startingPosition, + final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException, FanOutSubscriberException { + LOG.debug("Subscribing to shard {} ({})", shardId, consumerArn); + + try { + openSubscriptionToShard(startingPosition); + } catch (FanOutSubscriberException ex) { + // The only exception that should cause a failure is a ResourceNotFoundException + // Rethrow the exception to trigger the application to terminate + if (ex.getCause() instanceof ResourceNotFoundException) { + throw (ResourceNotFoundException) ex.getCause(); + } + + throw ex; + } + + return consumeAllRecordsFromKinesisShard(eventConsumer); + } + + /** + * Calls {@link KinesisProxyV2#subscribeToShard} and waits to acquire a subscription. + * In the event a non-recoverable error occurs this method will rethrow the exception. + * Once the subscription is acquired the client signals to the producer that we are ready to receive records. + * + * @param startingPosition the position in which to start consuming from + * @throws FanOutSubscriberException when an exception is propagated from the networking stack + */ + private void openSubscriptionToShard(final StartingPosition startingPosition) throws FanOutSubscriberException, InterruptedException { + SubscribeToShardRequest request = SubscribeToShardRequest.builder() + .consumerARN(consumerArn) + .shardId(shardId) + .startingPosition(startingPosition) + .build(); + + AtomicReference<Throwable> exception = new AtomicReference<>(); + CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1); + FanOutShardSubscription subscription = new FanOutShardSubscription(waitForSubscriptionLatch); + + SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler + .builder() + .onError(e -> { + // Errors that occur while trying to acquire a subscription are only thrown from here + // Errors that occur during the subscription are surfaced here and to the FanOutShardSubscription + // (errors are ignored here once the subscription is open) + if (waitForSubscriptionLatch.getCount() > 0) { + exception.set(e); + waitForSubscriptionLatch.countDown(); + } + }) + .subscriber(() -> subscription) + .build(); + + kinesis.subscribeToShard(request, responseHandler); + + waitForSubscriptionLatch.await(); + + Throwable throwable = exception.get(); + if (throwable != null) { + handleError(throwable); + } + + LOG.debug("Acquired subscription - {} ({})", shardId, consumerArn); + + // Request the first record to kick off consumption + // Following requests are made by the FanOutShardSubscription on the netty thread + subscription.requestRecord(); + } + + /** + * Update the reference to the latest networking error in this object. + * Parent caller can interrogate to decide how to handle error. 
+ * + * @param throwable the exception that has occurred + */ + private void handleError(final Throwable throwable) throws FanOutSubscriberException { + Throwable cause; + if (throwable instanceof CompletionException || throwable instanceof ExecutionException) { + cause = throwable.getCause(); + } else { + cause = throwable; + } + + LOG.warn("Error occurred on EFO subscription: {} - ({}). {} ({})", + throwable.getClass().getName(), throwable.getMessage(), shardId, consumerArn, cause); + + throw new FanOutSubscriberException(cause); + } + + /** + * Once the subscription is open, records will be delivered to the {@link BlockingQueue}. + * Queue capacity is hardcoded to 1 record, the queue is used solely to separate consumption and processing. + * However, this buffer will result in latency reduction as records are pre-fetched as a result. + * This method will poll the queue and exit under any of these conditions: + * - {@code continuationSequenceNumber} is {@code null}, indicating the shard is complete + * - The subscription expires, indicated by a {@link SubscriptionCompleteEvent} + * - There is an error while consuming records, indicated by a {@link SubscriptionErrorEvent} + * + * @param eventConsumer the event consumer to deliver records to + * @return true if there are no more messages (complete), false if a subsequent subscription should be obtained + * @throws FanOutSubscriberException when an exception is propagated from the networking stack + * @throws InterruptedException + */ + private boolean consumeAllRecordsFromKinesisShard( + final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException, FanOutSubscriberException { + String continuationSequenceNumber = null; + + do { + // Read timeout will occur after 30 seconds, add a sanity timeout here to prevent lockup + FanOutSubscriptionEvent subscriptionEvent = queue.poll(DEQUEUE_WAIT_SECONDS, SECONDS); + + if (subscriptionEvent == null) { + LOG.debug("Timed out polling events from network, reacquiring subscription - {} ({})", shardId, consumerArn); + return false; + } else if (subscriptionEvent.isSubscribeToShardEvent()) { + SubscribeToShardEvent event = subscriptionEvent.getSubscribeToShardEvent(); + continuationSequenceNumber = event.continuationSequenceNumber(); + if (!event.records().isEmpty()) { + eventConsumer.accept(event); + } + } else if (subscriptionEvent.isSubscriptionComplete()) { + // The subscription is complete, but the shard might not be, so we return incomplete + return false; + } else { + handleError(subscriptionEvent.getThrowable()); + return false; + } + } while (continuationSequenceNumber != null); + + return true; + } + + /** + * The {@link FanOutShardSubscription} subscribes to the events coming from KDS and adds them to the {@link BlockingQueue}. + * Backpressure is applied based on the maximum capacity of the queue. + * The {@link Subscriber} methods of this class are invoked by a thread from the {@link KinesisAsyncClient}. + */ + private class FanOutShardSubscription implements Subscriber<SubscribeToShardEventStream> { + + private Subscription subscription; + + private boolean cancelled = false; + + private final CountDownLatch waitForSubscriptionLatch; + + private final Object lockObject = new Object(); + + private FanOutShardSubscription(final CountDownLatch waitForSubscriptionLatch) { + this.waitForSubscriptionLatch = waitForSubscriptionLatch; + } + + /** + * Flag to the producer that we are ready to receive more events. 
+ */ + public void requestRecord() { + if (!cancelled) { + LOG.debug("Requesting more records from EFO subscription - {} ({})", shardId, consumerArn); + subscription.request(1); + } + } + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + waitForSubscriptionLatch.countDown(); + } + + @Override + public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) { + subscribeToShardEventStream.accept(new SubscribeToShardResponseHandler.Visitor() { + @Override + public void visit(SubscribeToShardEvent event) { + synchronized (lockObject) { + if (enqueueEventWithRetry(new SubscriptionNextEvent(event))) { + requestRecord(); + } + } + } + }); + } + + @Override + public void onError(Throwable throwable) { + LOG.debug("Error occurred on EFO subscription: {} - ({}). {} ({})", + throwable.getClass().getName(), throwable.getMessage(), shardId, consumerArn); + + // Cancel the subscription to signal the onNext to stop queuing and requesting data + cancelSubscription(); + + synchronized (lockObject) { + // Empty the queue and add a poison pill to terminate this subscriber + // The synchronized block ensures that new data is not written in the meantime + queue.clear(); + enqueueEvent(new SubscriptionErrorEvent(throwable)); + } + } + + @Override + public void onComplete() { + LOG.debug("EFO subscription complete - {} ({})", shardId, consumerArn); + enqueueEvent(new SubscriptionCompleteEvent()); + } + + private void cancelSubscription() { + if (!cancelled) { + cancelled = true; + subscription.cancel(); + } + } + + /** + * Continuously attempt to enqueue an event until successful or the subscription is cancelled (due to error). + * When backpressure applied by the consumer exceeds 30s for a single batch, a ReadTimeoutException will be + * thrown by the network stack. This will result in the subscription be cancelled and this event being discarded. + * The subscription would subsequently be reacquired and the discarded data would be fetched again. + * + * @param event the event to enqueue + * @return true if the event was successfully enqueued. + */ + private boolean enqueueEventWithRetry(final FanOutSubscriptionEvent event) { + boolean result = false; + do { + if (cancelled) { + break; + } + + synchronized (lockObject) { + result = enqueueEvent(event); + } + } while (!result); + + return result; + } + + /** + * Offers the event to the queue. + * + * @param event the event to enqueue + * @return true if the event was successfully enqueued. + */ + private boolean enqueueEvent(final FanOutSubscriptionEvent event) { + try { + if (!queue.offer(event, ENQUEUE_WAIT_SECONDS, SECONDS)) { Review comment: For my own clarification / understanding: As I understand it, in case of `onError` during an ongoing `onNext`, this blocks job cancellation up to `ENQUEUE_WAIT_SECONDS`, because an `SubscriptionErrorEvent` cannot be added to the queue until the offer has timed-out and releases the lock? ########## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java ########## @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface; +import org.apache.flink.util.Preconditions; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.kinesis.model.StartingPosition; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * This class is responsible for acquiring an Enhanced Fan Out subscription and consuming records from a shard. + * A queue is used to buffer records between the Kinesis Proxy and Flink application. This allows processing + * to be separated from consumption; errors thrown in the consumption layer do not propagate up to application. + * + * <pre>{@code [ + * | ----------- Source Connector Thread ----------- | | --- KinesisAsyncClient Thread(s) -- | + * | FanOutRecordPublisher | FanOutShardSubscription | == blocking queue == | KinesisProxyV2 | KinesisAsyncClient | + * ]}</pre> + * <p> + * Three types of message are passed over the queue for inter-thread communication: + * <ul> + * <li>{@link SubscriptionNextEvent} - passes data from the network to the consumer</li> + * <li>{@link SubscriptionCompleteEvent} - indicates a subscription has expired</li> + * <li>{@link SubscriptionErrorEvent} - passes an exception from the network to the consumer</li> + * </ul> + * </p> + * <p> + * The blocking queue has a maximum capacity of 1 record. + * This allows backpressure to be applied closer to the network stack and results in record prefetch. 
+ * At maximum capacity we will have three {@link SubscribeToShardEvent} in memory (per instance of this class): + * <ul> + * <li>1 event being processed by the consumer</li> + * <li>1 event enqueued in the blocking queue</li> + * <li>1 event being added to the queue by the network (blocking)</li> + * </ul> + * </p> + */ Review comment: 💯 thank you for this great Javadoc, made review much easier! ########## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java ########## @@ -30,15 +34,28 @@ @Internal public class KinesisProxyV2 implements KinesisProxyV2Interface { + /** An Asynchronous client used to communicate with AWS services. */ private final KinesisAsyncClient kinesisAsyncClient; /** - * Create a new KinesisProxyV2 based on the supplied configuration properties. + * Create a new KinesisProxyV2 using the provided Async Client. * * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis */ public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) { this.kinesisAsyncClient = Preconditions.checkNotNull(kinesisAsyncClient); } + @Override + public CompletableFuture<Void> subscribeToShard( + final SubscribeToShardRequest request, + final SubscribeToShardResponseHandler responseHandler) { + return kinesisAsyncClient.subscribeToShard(request, responseHandler); + } + + @Override + public void close() { + kinesisAsyncClient.close(); Review comment: Noted. It's fine to leave it in the next PR, I'll merge them in one batch. ########## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java ########## @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface; +import org.apache.flink.util.Preconditions; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.kinesis.model.StartingPosition; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * This class is responsible for acquiring an Enhanced Fan Out subscription and consuming records from a shard. + * A queue is used to buffer records between the Kinesis Proxy and Flink application. This allows processing + * to be separated from consumption; errors thrown in the consumption layer do not propagate up to application. + * + * <pre>{@code [ + * | ----------- Source Connector Thread ----------- | | --- KinesisAsyncClient Thread(s) -- | + * | FanOutRecordPublisher | FanOutShardSubscription | == blocking queue == | KinesisProxyV2 | KinesisAsyncClient | + * ]}</pre> + * <p> + * Three types of message are passed over the queue for inter-thread communication: + * <ul> + * <li>{@link SubscriptionNextEvent} - passes data from the network to the consumer</li> + * <li>{@link SubscriptionCompleteEvent} - indicates a subscription has expired</li> + * <li>{@link SubscriptionErrorEvent} - passes an exception from the network to the consumer</li> + * </ul> + * </p> + * <p> + * The blocking queue has a maximum capacity of 1 record. + * This allows backpressure to be applied closer to the network stack and results in record prefetch. + * At maximum capacity we will have three {@link SubscribeToShardEvent} in memory (per instance of this class): + * <ul> + * <li>1 event being processed by the consumer</li> + * <li>1 event enqueued in the blocking queue</li> + * <li>1 event being added to the queue by the network (blocking)</li> + * </ul> + * </p> + */ +@Internal +public class FanOutShardSubscriber { + + private static final Logger LOG = LoggerFactory.getLogger(FanOutShardSubscriber.class); + + /** + * The maximum capacity of the queue between the network and consumption thread. + * The queue is mainly used to isolate networking from consumption such that errors do not bubble up. + * This queue also acts as a buffer resulting in a record prefetch and reduced latency. + */ + private static final int QUEUE_CAPACITY = 1; + + /** + * Read timeout will occur after 30 seconds, a sanity timeout to prevent lockup in unexpected error states. 
+ * If the consumer does not receive a new event within the DEQUEUE_WAIT_SECONDS it will backoff and resubscribe. + * Under normal conditions heartbeat events are received even when there are no records to consume, so it is not + * expected for this timeout to occur under normal conditions. + */ + private static final int DEQUEUE_WAIT_SECONDS = 35; + + /** The time to wait when enqueuing events to allow complete/error events to "push in front" of data . */ + private static final int ENQUEUE_WAIT_SECONDS = 5; + + private final BlockingQueue<FanOutSubscriptionEvent> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); + + private final KinesisProxyV2Interface kinesis; + + private final String consumerArn; + + private final String shardId; + + /** + * Create a new Fan Out subscriber. + * + * @param consumerArn the stream consumer ARN + * @param shardId the shard ID to subscribe to + * @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2 + */ + public FanOutShardSubscriber(final String consumerArn, final String shardId, final KinesisProxyV2Interface kinesis) { + this.kinesis = Preconditions.checkNotNull(kinesis); + this.consumerArn = Preconditions.checkNotNull(consumerArn); + this.shardId = Preconditions.checkNotNull(shardId); + } + + /** + * Obtains a subscription to the shard from the specified {@code startingPosition}. + * {@link SubscribeToShardEvent} received from KDS are delivered to the given {@code eventConsumer}. + * Returns false if there are records left to consume from the shard. + * + * @param startingPosition the position in the stream in which to start receiving records + * @param eventConsumer the consumer to deliver received events to + * @return true if there are no more messages (complete), false if a subsequent subscription should be obtained + * @throws FanOutSubscriberException when an exception is propagated from the networking stack + * @throws InterruptedException + */ + public boolean subscribeToShardAndConsumeRecords( + final StartingPosition startingPosition, + final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException, FanOutSubscriberException { + LOG.debug("Subscribing to shard {} ({})", shardId, consumerArn); + + try { + openSubscriptionToShard(startingPosition); + } catch (FanOutSubscriberException ex) { + // The only exception that should cause a failure is a ResourceNotFoundException + // Rethrow the exception to trigger the application to terminate + if (ex.getCause() instanceof ResourceNotFoundException) { + throw (ResourceNotFoundException) ex.getCause(); + } + + throw ex; + } + + return consumeAllRecordsFromKinesisShard(eventConsumer); + } + + /** + * Calls {@link KinesisProxyV2#subscribeToShard} and waits to acquire a subscription. + * In the event a non-recoverable error occurs this method will rethrow the exception. + * Once the subscription is acquired the client signals to the producer that we are ready to receive records. 
+ * + * @param startingPosition the position in which to start consuming from + * @throws FanOutSubscriberException when an exception is propagated from the networking stack + */ + private void openSubscriptionToShard(final StartingPosition startingPosition) throws FanOutSubscriberException, InterruptedException { + SubscribeToShardRequest request = SubscribeToShardRequest.builder() + .consumerARN(consumerArn) + .shardId(shardId) + .startingPosition(startingPosition) + .build(); + + AtomicReference<Throwable> exception = new AtomicReference<>(); + CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1); + FanOutShardSubscription subscription = new FanOutShardSubscription(waitForSubscriptionLatch); + + SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler + .builder() + .onError(e -> { + // Errors that occur while trying to acquire a subscription are only thrown from here + // Errors that occur during the subscription are surfaced here and to the FanOutShardSubscription + // (errors are ignored here once the subscription is open) + if (waitForSubscriptionLatch.getCount() > 0) { + exception.set(e); + waitForSubscriptionLatch.countDown(); + } + }) + .subscriber(() -> subscription) + .build(); + + kinesis.subscribeToShard(request, responseHandler); + + waitForSubscriptionLatch.await(); + + Throwable throwable = exception.get(); + if (throwable != null) { + handleError(throwable); + } + + LOG.debug("Acquired subscription - {} ({})", shardId, consumerArn); + + // Request the first record to kick off consumption + // Following requests are made by the FanOutShardSubscription on the netty thread + subscription.requestRecord(); + } + + /** + * Update the reference to the latest networking error in this object. + * Parent caller can interrogate to decide how to handle error. + * + * @param throwable the exception that has occurred + */ + private void handleError(final Throwable throwable) throws FanOutSubscriberException { + Throwable cause; + if (throwable instanceof CompletionException || throwable instanceof ExecutionException) { + cause = throwable.getCause(); + } else { + cause = throwable; + } + + LOG.warn("Error occurred on EFO subscription: {} - ({}). {} ({})", + throwable.getClass().getName(), throwable.getMessage(), shardId, consumerArn, cause); + + throw new FanOutSubscriberException(cause); + } + + /** + * Once the subscription is open, records will be delivered to the {@link BlockingQueue}. + * Queue capacity is hardcoded to 1 record, the queue is used solely to separate consumption and processing. + * However, this buffer will result in latency reduction as records are pre-fetched as a result. 
+ * This method will poll the queue and exit under any of these conditions: + * - {@code continuationSequenceNumber} is {@code null}, indicating the shard is complete + * - The subscription expires, indicated by a {@link SubscriptionCompleteEvent} + * - There is an error while consuming records, indicated by a {@link SubscriptionErrorEvent} + * + * @param eventConsumer the event consumer to deliver records to + * @return true if there are no more messages (complete), false if a subsequent subscription should be obtained + * @throws FanOutSubscriberException when an exception is propagated from the networking stack + * @throws InterruptedException + */ + private boolean consumeAllRecordsFromKinesisShard( + final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException, FanOutSubscriberException { + String continuationSequenceNumber = null; + + do { + // Read timeout will occur after 30 seconds, add a sanity timeout here to prevent lockup + FanOutSubscriptionEvent subscriptionEvent = queue.poll(DEQUEUE_WAIT_SECONDS, SECONDS); + + if (subscriptionEvent == null) { + LOG.debug("Timed out polling events from network, reacquiring subscription - {} ({})", shardId, consumerArn); + return false; + } else if (subscriptionEvent.isSubscribeToShardEvent()) { + SubscribeToShardEvent event = subscriptionEvent.getSubscribeToShardEvent(); + continuationSequenceNumber = event.continuationSequenceNumber(); + if (!event.records().isEmpty()) { + eventConsumer.accept(event); + } + } else if (subscriptionEvent.isSubscriptionComplete()) { + // The subscription is complete, but the shard might not be, so we return incomplete + return false; + } else { + handleError(subscriptionEvent.getThrowable()); + return false; + } + } while (continuationSequenceNumber != null); + + return true; + } + + /** + * The {@link FanOutShardSubscription} subscribes to the events coming from KDS and adds them to the {@link BlockingQueue}. + * Backpressure is applied based on the maximum capacity of the queue. + * The {@link Subscriber} methods of this class are invoked by a thread from the {@link KinesisAsyncClient}. + */ + private class FanOutShardSubscription implements Subscriber<SubscribeToShardEventStream> { + + private Subscription subscription; + + private boolean cancelled = false; Review comment: Should this be `volatile`? It looks like the variable is accessed concurrently in `onNext` and `onError`. ########## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java ########## @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface; +import org.apache.flink.util.Preconditions; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.kinesis.model.StartingPosition; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * This class is responsible for acquiring an Enhanced Fan Out subscription and consuming records from a shard. + * A queue is used to buffer records between the Kinesis Proxy and Flink application. This allows processing + * to be separated from consumption; errors thrown in the consumption layer do not propagate up to application. + * + * <pre>{@code [ + * | ----------- Source Connector Thread ----------- | | --- KinesisAsyncClient Thread(s) -- | + * | FanOutRecordPublisher | FanOutShardSubscription | == blocking queue == | KinesisProxyV2 | KinesisAsyncClient | + * ]}</pre> + * <p> + * Three types of message are passed over the queue for inter-thread communication: + * <ul> + * <li>{@link SubscriptionNextEvent} - passes data from the network to the consumer</li> + * <li>{@link SubscriptionCompleteEvent} - indicates a subscription has expired</li> + * <li>{@link SubscriptionErrorEvent} - passes an exception from the network to the consumer</li> + * </ul> + * </p> + * <p> + * The blocking queue has a maximum capacity of 1 record. + * This allows backpressure to be applied closer to the network stack and results in record prefetch. + * At maximum capacity we will have three {@link SubscribeToShardEvent} in memory (per instance of this class): + * <ul> + * <li>1 event being processed by the consumer</li> + * <li>1 event enqueued in the blocking queue</li> + * <li>1 event being added to the queue by the network (blocking)</li> + * </ul> + * </p> + */ +@Internal +public class FanOutShardSubscriber { + + private static final Logger LOG = LoggerFactory.getLogger(FanOutShardSubscriber.class); + + /** + * The maximum capacity of the queue between the network and consumption thread. + * The queue is mainly used to isolate networking from consumption such that errors do not bubble up. + * This queue also acts as a buffer resulting in a record prefetch and reduced latency. + */ + private static final int QUEUE_CAPACITY = 1; + + /** + * Read timeout will occur after 30 seconds, a sanity timeout to prevent lockup in unexpected error states. 
+ * If the consumer does not receive a new event within the DEQUEUE_WAIT_SECONDS it will backoff and resubscribe. + * Under normal conditions heartbeat events are received even when there are no records to consume, so it is not + * expected for this timeout to occur under normal conditions. + */ + private static final int DEQUEUE_WAIT_SECONDS = 35; + + /** The time to wait when enqueuing events to allow complete/error events to "push in front" of data . */ + private static final int ENQUEUE_WAIT_SECONDS = 5; + + private final BlockingQueue<FanOutSubscriptionEvent> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY); + + private final KinesisProxyV2Interface kinesis; + + private final String consumerArn; + + private final String shardId; + + /** + * Create a new Fan Out subscriber. + * + * @param consumerArn the stream consumer ARN + * @param shardId the shard ID to subscribe to + * @param kinesis the Kinesis Proxy used to communicate via AWS SDK v2 + */ + public FanOutShardSubscriber(final String consumerArn, final String shardId, final KinesisProxyV2Interface kinesis) { + this.kinesis = Preconditions.checkNotNull(kinesis); + this.consumerArn = Preconditions.checkNotNull(consumerArn); + this.shardId = Preconditions.checkNotNull(shardId); + } + + /** + * Obtains a subscription to the shard from the specified {@code startingPosition}. + * {@link SubscribeToShardEvent} received from KDS are delivered to the given {@code eventConsumer}. + * Returns false if there are records left to consume from the shard. + * + * @param startingPosition the position in the stream in which to start receiving records + * @param eventConsumer the consumer to deliver received events to + * @return true if there are no more messages (complete), false if a subsequent subscription should be obtained + * @throws FanOutSubscriberException when an exception is propagated from the networking stack + * @throws InterruptedException + */ + public boolean subscribeToShardAndConsumeRecords( + final StartingPosition startingPosition, + final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException, FanOutSubscriberException { + LOG.debug("Subscribing to shard {} ({})", shardId, consumerArn); + + try { + openSubscriptionToShard(startingPosition); + } catch (FanOutSubscriberException ex) { + // The only exception that should cause a failure is a ResourceNotFoundException + // Rethrow the exception to trigger the application to terminate + if (ex.getCause() instanceof ResourceNotFoundException) { + throw (ResourceNotFoundException) ex.getCause(); + } + + throw ex; + } + + return consumeAllRecordsFromKinesisShard(eventConsumer); + } + + /** + * Calls {@link KinesisProxyV2#subscribeToShard} and waits to acquire a subscription. + * In the event a non-recoverable error occurs this method will rethrow the exception. + * Once the subscription is acquired the client signals to the producer that we are ready to receive records. 
+ * + * @param startingPosition the position in which to start consuming from + * @throws FanOutSubscriberException when an exception is propagated from the networking stack + */ + private void openSubscriptionToShard(final StartingPosition startingPosition) throws FanOutSubscriberException, InterruptedException { + SubscribeToShardRequest request = SubscribeToShardRequest.builder() + .consumerARN(consumerArn) + .shardId(shardId) + .startingPosition(startingPosition) + .build(); + + AtomicReference<Throwable> exception = new AtomicReference<>(); + CountDownLatch waitForSubscriptionLatch = new CountDownLatch(1); + FanOutShardSubscription subscription = new FanOutShardSubscription(waitForSubscriptionLatch); + + SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler + .builder() + .onError(e -> { + // Errors that occur while trying to acquire a subscription are only thrown from here + // Errors that occur during the subscription are surfaced here and to the FanOutShardSubscription + // (errors are ignored here once the subscription is open) + if (waitForSubscriptionLatch.getCount() > 0) { + exception.set(e); + waitForSubscriptionLatch.countDown(); + } + }) + .subscriber(() -> subscription) + .build(); + + kinesis.subscribeToShard(request, responseHandler); + + waitForSubscriptionLatch.await(); + + Throwable throwable = exception.get(); + if (throwable != null) { + handleError(throwable); + } + + LOG.debug("Acquired subscription - {} ({})", shardId, consumerArn); + + // Request the first record to kick off consumption + // Following requests are made by the FanOutShardSubscription on the netty thread + subscription.requestRecord(); + } + + /** + * Update the reference to the latest networking error in this object. + * Parent caller can interrogate to decide how to handle error. + * + * @param throwable the exception that has occurred + */ + private void handleError(final Throwable throwable) throws FanOutSubscriberException { + Throwable cause; + if (throwable instanceof CompletionException || throwable instanceof ExecutionException) { + cause = throwable.getCause(); + } else { + cause = throwable; + } + + LOG.warn("Error occurred on EFO subscription: {} - ({}). {} ({})", + throwable.getClass().getName(), throwable.getMessage(), shardId, consumerArn, cause); + + throw new FanOutSubscriberException(cause); + } + + /** + * Once the subscription is open, records will be delivered to the {@link BlockingQueue}. + * Queue capacity is hardcoded to 1 record, the queue is used solely to separate consumption and processing. + * However, this buffer will result in latency reduction as records are pre-fetched as a result. 
+ * This method will poll the queue and exit under any of these conditions: + * - {@code continuationSequenceNumber} is {@code null}, indicating the shard is complete + * - The subscription expires, indicated by a {@link SubscriptionCompleteEvent} + * - There is an error while consuming records, indicated by a {@link SubscriptionErrorEvent} + * + * @param eventConsumer the event consumer to deliver records to + * @return true if there are no more messages (complete), false if a subsequent subscription should be obtained + * @throws FanOutSubscriberException when an exception is propagated from the networking stack + * @throws InterruptedException + */ + private boolean consumeAllRecordsFromKinesisShard( + final Consumer<SubscribeToShardEvent> eventConsumer) throws InterruptedException, FanOutSubscriberException { + String continuationSequenceNumber = null; + + do { + // Read timeout will occur after 30 seconds, add a sanity timeout here to prevent lockup + FanOutSubscriptionEvent subscriptionEvent = queue.poll(DEQUEUE_WAIT_SECONDS, SECONDS); + + if (subscriptionEvent == null) { + LOG.debug("Timed out polling events from network, reacquiring subscription - {} ({})", shardId, consumerArn); + return false; + } else if (subscriptionEvent.isSubscribeToShardEvent()) { + SubscribeToShardEvent event = subscriptionEvent.getSubscribeToShardEvent(); + continuationSequenceNumber = event.continuationSequenceNumber(); + if (!event.records().isEmpty()) { + eventConsumer.accept(event); + } + } else if (subscriptionEvent.isSubscriptionComplete()) { + // The subscription is complete, but the shard might not be, so we return incomplete + return false; + } else { + handleError(subscriptionEvent.getThrowable()); + return false; + } + } while (continuationSequenceNumber != null); + + return true; + } + + /** + * The {@link FanOutShardSubscription} subscribes to the events coming from KDS and adds them to the {@link BlockingQueue}. + * Backpressure is applied based on the maximum capacity of the queue. + * The {@link Subscriber} methods of this class are invoked by a thread from the {@link KinesisAsyncClient}. + */ + private class FanOutShardSubscription implements Subscriber<SubscribeToShardEventStream> { + + private Subscription subscription; + + private boolean cancelled = false; + + private final CountDownLatch waitForSubscriptionLatch; + + private final Object lockObject = new Object(); + + private FanOutShardSubscription(final CountDownLatch waitForSubscriptionLatch) { + this.waitForSubscriptionLatch = waitForSubscriptionLatch; + } + + /** + * Flag to the producer that we are ready to receive more events. + */ + public void requestRecord() { + if (!cancelled) { + LOG.debug("Requesting more records from EFO subscription - {} ({})", shardId, consumerArn); + subscription.request(1); + } + } + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + waitForSubscriptionLatch.countDown(); + } + + @Override + public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) { + subscribeToShardEventStream.accept(new SubscribeToShardResponseHandler.Visitor() { + @Override + public void visit(SubscribeToShardEvent event) { + synchronized (lockObject) { + if (enqueueEventWithRetry(new SubscriptionNextEvent(event))) { + requestRecord(); + } + } + } + }); + } + + @Override + public void onError(Throwable throwable) { + LOG.debug("Error occurred on EFO subscription: {} - ({}). 
{} ({})", + throwable.getClass().getName(), throwable.getMessage(), shardId, consumerArn); + + // Cancel the subscription to signal the onNext to stop queuing and requesting data + cancelSubscription(); + + synchronized (lockObject) { + // Empty the queue and add a poison pill to terminate this subscriber + // The synchronized block ensures that new data is not written in the meantime + queue.clear(); + enqueueEvent(new SubscriptionErrorEvent(throwable)); Review comment: In the case of backpressure in Flink's network stack, would this also mean that job failure is postponed until this `SubscriptionErrorEvent` is postponed until the source connector thread consumes it? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org