mateczagany commented on code in PR #57:
URL:
https://github.com/apache/flink-connector-hbase/pull/57#discussion_r2555261639
##########
flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandler.java:
##########
@@ -0,0 +1,156 @@
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+
+/**
+ * This class is responsible for managing the async calls to HBase and
managing the {@link
+ * ResultHandler} to decide which request can be retried.
+ */
+@Internal
+public class HBaseWriterAsyncHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(HBaseWriterAsyncHandler.class);
+
+ private final Counter numRecordsOutErrorsCounter;
+
+ public HBaseWriterAsyncHandler(Counter numRecordsOutErrorsCounter) {
+ this.numRecordsOutErrorsCounter = numRecordsOutErrorsCounter; // stored as given; no null check is performed here
+ }
+
+ /**
+ * For a given list of HBase write futures, this method will
asynchronously analyze their
+ * result, and using the provided {@link ResultHandler}, it will instruct
{@link
+ * org.apache.flink.connector.base.sink.writer.AsyncSinkWriter} to retry
some mutations. In case
+ * of errors which should not be retried, the Flink job will stop with an
error.
+ *
+ * @param futures list of HBase write futures
+ * @param processedMutationsInOrder list of mutations with their indices
matching that of the
+ * futures
+ * @param resultHandler result handler to manage retries and exceptions
+ */
+ public void handleWriteFutures(
+ List<CompletableFuture<Mutation>> futures,
+ List<SerializableMutation> processedMutationsInOrder,
+ ResultHandler<SerializableMutation> resultHandler) {
+ Preconditions.checkArgument( // futures and mutations are paired by index below, so their sizes must agree
+ futures.size() == processedMutationsInOrder.size(),
+ "Different number of HBase futures was supplied than
mutations.");
+
+ ConcurrentLinkedQueue<FailedMutation> failedMutations = new
ConcurrentLinkedQueue<>(); // filled by the exceptionally() callbacks registered in the loop below
+
+ // Handle each future separately and store failures.
+ CompletableFuture<?>[] handledFutures = new
CompletableFuture[futures.size()];
+ for (int i = 0; i < futures.size(); i++) {
+ final int index = i; // final copy of the loop variable so the lambda can capture it
+ handledFutures[index] =
+ futures.get(index)
+ .exceptionally(
+ throwable -> {
+ failedMutations.add(
+ new FailedMutation(
+
processedMutationsInOrder.get(index),
+ throwable));
+ return null; // failure is recorded above; the handled future itself completes normally with null
+ });
+ }
+
+ // Exceptions are already handled here, so it's safe to use
`thenRun()`.
+ CompletableFuture.allOf(handledFutures)
+ .thenRun( // NOTE(review): the future returned by thenRun() is discarded, so anything thrown inside this callback would go unobserved - confirm handleFailedRequests cannot throw
+ () -> {
+ handleFailedRequests(failedMutations,
resultHandler); // runs once every handled future has completed, including when failedMutations is empty
+ });
+ }
+
+ /**
+ * Handles mutations that failed to write to HBase.
+ *
+ * <p>This method increments the error counter and schedules the failed
mutations for retry
+ * through the result handler. If the exception should not be retried, the
job will fail instead
+ * with an exception.
+ *
+ * @param failedMutations the list of mutations that failed to write
+ * @param resultHandler the handler responsible for retry logic
+ */
+ private void handleFailedRequests(
Review Comment:
That can definitely happen. I don't know if that's a bad thing, or how often
a job can actually run into a non-fatal failure, e.g.
`MasterNotRunningException`.
IIRC, the Async Sink hangs checkpoints until all in-flight requests are
finished, but won't block a checkpoint from finishing if there are any failed,
retryable requests. It will add those requests to its Deque at the head, and
retry them after `maxTimeInBufferMS` in processing time.
I don't think it's appropriate to drop these requests. Maybe it could be made
configurable to fail the job if any exception is thrown (even one that is
theoretically retryable according to the HBase client)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]