mateczagany commented on code in PR #57:
URL: 
https://github.com/apache/flink-connector-hbase/pull/57#discussion_r2555154630


##########
flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandler.java:
##########
@@ -0,0 +1,156 @@
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+
+/**
+ * This class is responsible for managing the async calls to HBase and 
managing the {@link
+ * ResultHandler} to decide which request can be retried.
+ */
+@Internal
+public class HBaseWriterAsyncHandler {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseWriterAsyncHandler.class);
+
+    private final Counter numRecordsOutErrorsCounter;
+
+    public HBaseWriterAsyncHandler(Counter numRecordsOutErrorsCounter) {
+        this.numRecordsOutErrorsCounter = numRecordsOutErrorsCounter;
+    }
+
+    /**
+     * For a given list of HBase write futures, this method will 
asynchronously analyze their
+     * result, and using the provided {@link ResultHandler}, it will instruct 
{@link
+     * org.apache.flink.connector.base.sink.writer.AsyncSinkWriter} to retry 
some mutations. In case
+     * of errors which should not be retried, the Flink job will stop with an 
error.
+     *
+     * @param futures list of HBase write futures
+     * @param processedMutationsInOrder list of mutations with their indices 
matching that of the
+     *     futures
+     * @param resultHandler result handler to manage retries and exceptions
+     */
+    public void handleWriteFutures(
+            List<CompletableFuture<Mutation>> futures,
+            List<SerializableMutation> processedMutationsInOrder,
+            ResultHandler<SerializableMutation> resultHandler) {
+        Preconditions.checkArgument(
+                futures.size() == processedMutationsInOrder.size(),
+                "Different number of HBase futures was supplied than 
mutations.");
+
+        ConcurrentLinkedQueue<FailedMutation> failedMutations = new 
ConcurrentLinkedQueue<>();
+
+        // Handle each future separately and store failures.
+        CompletableFuture<?>[] handledFutures = new 
CompletableFuture[futures.size()];
+        for (int i = 0; i < futures.size(); i++) {
+            final int index = i;
+            handledFutures[index] =
+                    futures.get(index)
+                            .exceptionally(
+                                    throwable -> {
+                                        failedMutations.add(
+                                                new FailedMutation(
+                                                        
processedMutationsInOrder.get(index),
+                                                        throwable));
+                                        return null;
+                                    });
+        }

Review Comment:
   I remember that I've spent some trying to make this cleaner, but this did 
not cross my mind, thank you



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to