ljz2051 commented on code in PR #24739:
URL: https://github.com/apache/flink/pull/24739#discussion_r1590951299


##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java:
##########
@@ -31,75 +30,50 @@
 /**
  * The general-purpose multiGet operation implementation for ForStDB, which simulates multiGet by
  * calling the Get API multiple times with multiple threads.
- *
- * @param <K> The type of key in get access request.
- * @param <V> The type of value in get access request.
  */
-public class ForStGeneralMultiGetOperation<K, V> implements ForStDBOperation<List<V>> {
+public class ForStGeneralMultiGetOperation implements ForStDBOperation {
 
     private static final Logger LOG = LoggerFactory.getLogger(ForStGeneralMultiGetOperation.class);
 
     private final RocksDB db;
 
-    private final List<GetRequest<K, V>> batchRequest;
+    private final List<ForStDBGetRequest<?, ?>> batchRequest;
 
     private final Executor executor;
 
     ForStGeneralMultiGetOperation(
-            RocksDB db, List<GetRequest<K, V>> batchRequest, Executor executor) {
+            RocksDB db, List<ForStDBGetRequest<?, ?>> batchRequest, Executor executor) {
         this.db = db;
         this.batchRequest = batchRequest;
         this.executor = executor;
     }
 
     @Override
-    public CompletableFuture<List<V>> process() {
+    public CompletableFuture<Void> process() {
 
-        CompletableFuture<List<V>> future = new CompletableFuture<>();
-        @SuppressWarnings("unchecked")
-        V[] result = (V[]) new Object[batchRequest.size()];
-        Arrays.fill(result, null);
+        CompletableFuture<Void> future = new CompletableFuture<>();
 
         AtomicInteger counter = new AtomicInteger(batchRequest.size());
         for (int i = 0; i < batchRequest.size(); i++) {
-            GetRequest<K, V> request = batchRequest.get(i);
-            final int index = i;
+            ForStDBGetRequest<?, ?> request = batchRequest.get(i);
             executor.execute(
                     () -> {
                         try {
-                            ForStInnerTable<K, V> table = request.table;
-                            byte[] key = table.serializeKey(request.key);
-                            byte[] value = db.get(table.getColumnFamilyHandle(), key);
-                            if (value != null) {
-                                result[index] = table.deserializeValue(value);
-                            }
+                            byte[] key = request.buildSerializedKey();
+                            byte[] value = db.get(request.getColumnFamilyHandle(), key);
+                            request.completeStateFuture(value);
                         } catch (Exception e) {
                             LOG.warn(
                                     "Error when process general multiGet operation for forStDB", e);
                             future.completeExceptionally(e);

Review Comment:
   Yes, I think we need to complete the `InternalStateFuture` exceptionally
   (via `completeExceptionally`) here.
   
   However, I found that the `AsyncExecutionController` layer currently lacks an
   exception-handling mechanism for `InternalStateFuture`. I would prefer to
   handle failures of state request execution globally, in a separate PR.
   
   WDYT?
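
For illustration only, here is a minimal sketch of what failing the per-request future alongside the batch future might look like. It is not the PR's code: `Request`, `stateFuture`, and the `lookup` function are hypothetical stand-ins (a plain `CompletableFuture` replaces `InternalStateFuture`, and a `Function` replaces the `db.get(...)` call), so it only shows the control flow under discussion.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class MultiGetFailureSketch {

    /** Hypothetical stand-in for ForStDBGetRequest; stateFuture plays the role of InternalStateFuture. */
    static final class Request {
        final String key;
        final CompletableFuture<String> stateFuture = new CompletableFuture<>();

        Request(String key) {
            this.key = key;
        }
    }

    /** Runs one lookup per request on the executor; the lookup function stands in for db.get(...). */
    static CompletableFuture<Void> process(
            List<Request> batch, Function<String, String> lookup, Executor executor) {
        CompletableFuture<Void> batchFuture = new CompletableFuture<>();
        if (batch.isEmpty()) {
            batchFuture.complete(null);
            return batchFuture;
        }
        AtomicInteger remaining = new AtomicInteger(batch.size());
        for (Request request : batch) {
            executor.execute(
                    () -> {
                        try {
                            request.stateFuture.complete(lookup.apply(request.key));
                        } catch (Exception e) {
                            // Fail the individual request so its caller is not left waiting,
                            // then fail the batch as a whole.
                            request.stateFuture.completeExceptionally(e);
                            batchFuture.completeExceptionally(e);
                        } finally {
                            if (remaining.decrementAndGet() == 0) {
                                // No effect if the batch future has already failed.
                                batchFuture.complete(null);
                            }
                        }
                    });
        }
        return batchFuture;
    }
}
```

In this sketch a failed lookup leaves the other requests free to complete normally, while the batch future reports the first failure. How such failures should propagate through `AsyncExecutionController` is the open question raised above and is not addressed here.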



