ferenc-csaky commented on code in PR #57:
URL: https://github.com/apache/flink-connector-hbase/pull/57#discussion_r2537016822


##########
flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseMutationSerialization.java:
##########
@@ -0,0 +1,45 @@
+package org.apache.flink.connector.hbase.util;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Internal utility class for serializing and deserializing HBase mutations.
+ *
+ * <p>This class provides methods to convert HBase {@link Mutation} objects to and from their
+ * Protocol Buffer representations for transmission over the wire or storage. It supports the
+ * following HBase mutation types: {@link Put} and {@link Delete}.
+ */
+@Internal
+public class HBaseMutationSerialization {

Review Comment:
   Any reason not to put this logic into `HBaseStateSerializer`? That's the only class using this util, so I do not see why we need this separate util.
   
   Also, would it make sense to add some unit tests for this ser/deser logic, or is its functionality already properly covered by the IT cases?
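
   For illustration, a round-trip unit test could look roughly like the sketch below; note that the `serialize`/`deserialize` method names and signatures are assumptions based on the stream imports in the diff, not the actual API of this PR:
   ```java
   import static org.assertj.core.api.Assertions.assertThat;

   import org.apache.hadoop.hbase.client.Mutation;
   import org.apache.hadoop.hbase.client.Put;
   import org.apache.hadoop.hbase.util.Bytes;
   import org.junit.jupiter.api.Test;

   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;

   class HBaseMutationSerializationTest {

       @Test
       void putSurvivesSerializationRoundTrip() throws Exception {
           Put put = new Put(Bytes.toBytes("row-1"));
           put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value"));

           // Serialize the mutation to bytes and read it back.
           ByteArrayOutputStream out = new ByteArrayOutputStream();
           HBaseMutationSerialization.serialize(put, out); // assumed signature
           Mutation restored =
                   HBaseMutationSerialization.deserialize(
                           new ByteArrayInputStream(out.toByteArray())); // assumed signature

           assertThat(restored.getRow()).isEqualTo(put.getRow());
       }
   }
   ```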



##########
flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/SerializableMutation.java:
##########
@@ -0,0 +1,27 @@
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.hbase.client.Mutation;
+
+import java.io.Serializable;
+
+/**
+ * This class is used by {@link HBaseSink} and {@link HBaseWriter} to wrap HBase {@link Mutation}
+ * objects to be able to serialize them.
+ */
+@Internal
+public class SerializableMutation implements Serializable {

Review Comment:
   Maybe put this into the `util` package instead?



##########
flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandler.java:
##########
@@ -0,0 +1,156 @@
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+
+/**
+ * This class is responsible for managing the async calls to HBase and managing the {@link
+ * ResultHandler} to decide which request can be retried.
+ */
+@Internal
+public class HBaseWriterAsyncHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseWriterAsyncHandler.class);
+
+    private final Counter numRecordsOutErrorsCounter;
+
+    public HBaseWriterAsyncHandler(Counter numRecordsOutErrorsCounter) {
+        this.numRecordsOutErrorsCounter = numRecordsOutErrorsCounter;
+    }
+
+    /**
+     * For a given list of HBase write futures, this method will asynchronously analyze their
+     * result, and using the provided {@link ResultHandler}, it will instruct {@link
+     * org.apache.flink.connector.base.sink.writer.AsyncSinkWriter} to retry some mutations. In case
+     * of errors which should not be retried, the Flink job will stop with an error.
+     *
+     * @param futures list of HBase write futures
+     * @param processedMutationsInOrder list of mutations with their indices matching that of the
+     *     futures
+     * @param resultHandler result handler to manage retries and exceptions
+     */
+    public void handleWriteFutures(
+            List<CompletableFuture<Mutation>> futures,
+            List<SerializableMutation> processedMutationsInOrder,
+            ResultHandler<SerializableMutation> resultHandler) {
+        Preconditions.checkArgument(
+                futures.size() == processedMutationsInOrder.size(),
+                "Different number of HBase futures was supplied than mutations.");
+
+        ConcurrentLinkedQueue<FailedMutation> failedMutations = new ConcurrentLinkedQueue<>();

Review Comment:
   nit: `Queue<FailedMutation>`
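
   i.e., declaring the local against the collection interface rather than the implementation:
   ```java
   Queue<FailedMutation> failedMutations = new ConcurrentLinkedQueue<>();
   ```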



##########
flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandler.java:
##########
@@ -0,0 +1,156 @@
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+
+/**
+ * This class is responsible for managing the async calls to HBase and managing the {@link
+ * ResultHandler} to decide which request can be retried.
+ */
+@Internal
+public class HBaseWriterAsyncHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseWriterAsyncHandler.class);
+
+    private final Counter numRecordsOutErrorsCounter;
+
+    public HBaseWriterAsyncHandler(Counter numRecordsOutErrorsCounter) {
+        this.numRecordsOutErrorsCounter = numRecordsOutErrorsCounter;
+    }
+
+    /**
+     * For a given list of HBase write futures, this method will asynchronously analyze their
+     * result, and using the provided {@link ResultHandler}, it will instruct {@link
+     * org.apache.flink.connector.base.sink.writer.AsyncSinkWriter} to retry some mutations. In case
+     * of errors which should not be retried, the Flink job will stop with an error.
+     *
+     * @param futures list of HBase write futures
+     * @param processedMutationsInOrder list of mutations with their indices matching that of the
+     *     futures
+     * @param resultHandler result handler to manage retries and exceptions
+     */
+    public void handleWriteFutures(
+            List<CompletableFuture<Mutation>> futures,
+            List<SerializableMutation> processedMutationsInOrder,
+            ResultHandler<SerializableMutation> resultHandler) {
+        Preconditions.checkArgument(
+                futures.size() == processedMutationsInOrder.size(),
+                "Different number of HBase futures was supplied than mutations.");
+
+        ConcurrentLinkedQueue<FailedMutation> failedMutations = new ConcurrentLinkedQueue<>();
+
+        // Handle each future separately and store failures.
+        CompletableFuture<?>[] handledFutures = new CompletableFuture[futures.size()];
+        for (int i = 0; i < futures.size(); i++) {
+            final int index = i;
+            handledFutures[index] =
+                    futures.get(index)
+                            .exceptionally(
+                                    throwable -> {
+                                        failedMutations.add(
+                                                new FailedMutation(
+                                                        processedMutationsInOrder.get(index),
+                                                        throwable));
+                                        return null;
+                                    });
+        }
+
+        // Exceptions are already handled here, so it's safe to use `thenRun()`.
+        CompletableFuture.allOf(handledFutures)
+                .thenRun(
+                        () -> {
+                            handleFailedRequests(failedMutations, resultHandler);
+                        });
+    }
+
+    /**
+     * Handles mutations that failed to write to HBase.
+     *
+     * <p>This method increments the error counter and schedules the failed mutations for retry
+     * through the result handler. If the exception should not be retried, the job will fail instead
+     * with an exception.
+     *
+     * @param failedMutations the list of mutations that failed to write
+     * @param resultHandler the handler responsible for retry logic
+     */
+    private void handleFailedRequests(

Review Comment:
   Can this get into an endless loop when some mutations always fail and we do not detect the given exception as fatal? :)
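
   Purely as an illustration of one possible guard (not part of this PR; `FailedMutation`'s accessors below are hypothetical), a per-mutation retry budget could bound the retries:
   ```java
   // Hypothetical sketch: fail the job once a mutation exhausts its retry budget.
   private static final int MAX_RETRIES = 3; // assumed limit, not from the PR

   private boolean retryBudgetExhausted(
           FailedMutation failed, ResultHandler<SerializableMutation> resultHandler) {
       if (failed.getRetryCount() >= MAX_RETRIES) { // getRetryCount() is hypothetical
           resultHandler.completeExceptionally(
                   new FlinkRuntimeException(
                           "Mutation failed after " + MAX_RETRIES + " attempts",
                           failed.getThrowable())); // getThrowable() is hypothetical
           return true;
       }
       return false;
   }
   ```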



##########
flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandler.java:
##########
@@ -0,0 +1,156 @@
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+
+/**
+ * This class is responsible for managing the async calls to HBase and managing the {@link
+ * ResultHandler} to decide which request can be retried.
+ */
+@Internal
+public class HBaseWriterAsyncHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseWriterAsyncHandler.class);
+
+    private final Counter numRecordsOutErrorsCounter;
+
+    public HBaseWriterAsyncHandler(Counter numRecordsOutErrorsCounter) {
+        this.numRecordsOutErrorsCounter = numRecordsOutErrorsCounter;
+    }
+
+    /**
+     * For a given list of HBase write futures, this method will asynchronously analyze their
+     * result, and using the provided {@link ResultHandler}, it will instruct {@link
+     * org.apache.flink.connector.base.sink.writer.AsyncSinkWriter} to retry some mutations. In case
+     * of errors which should not be retried, the Flink job will stop with an error.
+     *
+     * @param futures list of HBase write futures
+     * @param processedMutationsInOrder list of mutations with their indices matching that of the
+     *     futures
+     * @param resultHandler result handler to manage retries and exceptions
+     */
+    public void handleWriteFutures(
+            List<CompletableFuture<Mutation>> futures,
+            List<SerializableMutation> processedMutationsInOrder,
+            ResultHandler<SerializableMutation> resultHandler) {
+        Preconditions.checkArgument(
+                futures.size() == processedMutationsInOrder.size(),
+                "Different number of HBase futures was supplied than mutations.");
+
+        ConcurrentLinkedQueue<FailedMutation> failedMutations = new ConcurrentLinkedQueue<>();
+
+        // Handle each future separately and store failures.
+        CompletableFuture<?>[] handledFutures = new CompletableFuture[futures.size()];
+        for (int i = 0; i < futures.size(); i++) {
+            final int index = i;
+            handledFutures[index] =
+                    futures.get(index)
+                            .exceptionally(
+                                    throwable -> {
+                                        failedMutations.add(
+                                                new FailedMutation(
+                                                        processedMutationsInOrder.get(index),
+                                                        throwable));
+                                        return null;
+                                    });
+        }
+
+        // Exceptions are already handled here, so it's safe to use `thenRun()`.
+        CompletableFuture.allOf(handledFutures)
+                .thenRun(
+                        () -> {

Review Comment:
   nit: remove curly braces
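
   i.e., collapsing the lambda to an expression body:
   ```java
   CompletableFuture.allOf(handledFutures)
           .thenRun(() -> handleFailedRequests(failedMutations, resultHandler));
   ```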



##########
flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/WrappedElementConverter.java:
##########
@@ -0,0 +1,36 @@
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.apache.hadoop.hbase.client.Mutation;
+
+/**
+ * This is a helper class used to wrap an {@link ElementConverter} supplied by the user that
+ * converts the input data to {@link Mutation}. With this class, the elements will be seamlessly
+ * converted to internal {@link SerializableMutation} objects that can be serialized by the sink.
+ */
+@Internal
+public class WrappedElementConverter<InputT>

Review Comment:
   Maybe put this into the `util` package?



##########
flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandler.java:
##########
@@ -0,0 +1,156 @@
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+
+/**
+ * This class is responsible for managing the async calls to HBase and managing the {@link
+ * ResultHandler} to decide which request can be retried.
+ */
+@Internal
+public class HBaseWriterAsyncHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseWriterAsyncHandler.class);
+
+    private final Counter numRecordsOutErrorsCounter;
+
+    public HBaseWriterAsyncHandler(Counter numRecordsOutErrorsCounter) {
+        this.numRecordsOutErrorsCounter = numRecordsOutErrorsCounter;
+    }
+
+    /**
+     * For a given list of HBase write futures, this method will asynchronously analyze their
+     * result, and using the provided {@link ResultHandler}, it will instruct {@link
+     * org.apache.flink.connector.base.sink.writer.AsyncSinkWriter} to retry some mutations. In case
+     * of errors which should not be retried, the Flink job will stop with an error.
+     *
+     * @param futures list of HBase write futures
+     * @param processedMutationsInOrder list of mutations with their indices matching that of the
+     *     futures
+     * @param resultHandler result handler to manage retries and exceptions
+     */
+    public void handleWriteFutures(
+            List<CompletableFuture<Mutation>> futures,
+            List<SerializableMutation> processedMutationsInOrder,
+            ResultHandler<SerializableMutation> resultHandler) {
+        Preconditions.checkArgument(

Review Comment:
   nit: static import `Preconditions`
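
   i.e.:
   ```java
   import static org.apache.flink.util.Preconditions.checkArgument;

   // ...
   checkArgument(
           futures.size() == processedMutationsInOrder.size(),
           "Different number of HBase futures was supplied than mutations.");
   ```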



##########
flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/SerializableMutation.java:
##########
@@ -0,0 +1,27 @@
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.hadoop.hbase.client.Mutation;
+
+import java.io.Serializable;
+
+/**
+ * This class is used by {@link HBaseSink} and {@link HBaseWriter} to wrap HBase {@link Mutation}
+ * objects to be able to serialize them.
+ */
+@Internal
+public class SerializableMutation implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private transient Mutation mutation;

Review Comment:
   This can be `final`.
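
   i.e. (assuming the field is only assigned in the constructor):
   ```java
   private final transient Mutation mutation;
   ```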



##########
flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/sink/HBaseWriterAsyncHandler.java:
##########
@@ -0,0 +1,156 @@
+package org.apache.flink.connector.hbase.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+
+/**
+ * This class is responsible for managing the async calls to HBase and managing the {@link
+ * ResultHandler} to decide which request can be retried.
+ */
+@Internal
+public class HBaseWriterAsyncHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(HBaseWriterAsyncHandler.class);
+
+    private final Counter numRecordsOutErrorsCounter;
+
+    public HBaseWriterAsyncHandler(Counter numRecordsOutErrorsCounter) {
+        this.numRecordsOutErrorsCounter = numRecordsOutErrorsCounter;
+    }
+
+    /**
+     * For a given list of HBase write futures, this method will asynchronously analyze their
+     * result, and using the provided {@link ResultHandler}, it will instruct {@link
+     * org.apache.flink.connector.base.sink.writer.AsyncSinkWriter} to retry some mutations. In case
+     * of errors which should not be retried, the Flink job will stop with an error.
+     *
+     * @param futures list of HBase write futures
+     * @param processedMutationsInOrder list of mutations with their indices matching that of the
+     *     futures
+     * @param resultHandler result handler to manage retries and exceptions
+     */
+    public void handleWriteFutures(
+            List<CompletableFuture<Mutation>> futures,
+            List<SerializableMutation> processedMutationsInOrder,
+            ResultHandler<SerializableMutation> resultHandler) {
+        Preconditions.checkArgument(
+                futures.size() == processedMutationsInOrder.size(),
+                "Different number of HBase futures was supplied than mutations.");
+
+        ConcurrentLinkedQueue<FailedMutation> failedMutations = new ConcurrentLinkedQueue<>();
+
+        // Handle each future separately and store failures.
+        CompletableFuture<?>[] handledFutures = new CompletableFuture[futures.size()];
+        for (int i = 0; i < futures.size(); i++) {
+            final int index = i;
+            handledFutures[index] =
+                    futures.get(index)
+                            .exceptionally(
+                                    throwable -> {
+                                        failedMutations.add(
+                                                new FailedMutation(
+                                                        processedMutationsInOrder.get(index),
+                                                        throwable));
+                                        return null;
+                                    });
+        }

Review Comment:
   Maybe doing it via the Java stream API and adding a separate method as the body of `exceptionally` makes it a bit cleaner:
   ```java
   // Handle each future separately and store failures.
   CompletableFuture<?>[] handledFutures =
           IntStream.range(0, futures.size())
                   .mapToObj(
                           i ->
                                   addFailedCallback(
                                           futures.get(i),
                                           failedMutations,
                                           processedMutationsInOrder.get(i)))
                   .toArray(CompletableFuture[]::new);
   
   ...
   
   private CompletableFuture<Mutation> addFailedCallback(
           CompletableFuture<Mutation> cf,
           Queue<FailedMutation> failedMutations,
           SerializableMutation failedMutation) {
   
       return cf.exceptionally(
               throwable -> {
                   failedMutations.add(new FailedMutation(failedMutation, throwable));
                   return null;
               });
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
