masteryhx commented on code in PR #24739: URL: https://github.com/apache/flink/pull/24739#discussion_r1592121657
########## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStGeneralMultiGetOperation.java: ########## @@ -31,75 +30,50 @@ /** * The general-purpose multiGet operation implementation for ForStDB, which simulates multiGet by * calling the Get API multiple times with multiple threads. - * - * @param <K> The type of key in get access request. - * @param <V> The type of value in get access request. */ -public class ForStGeneralMultiGetOperation<K, V> implements ForStDBOperation<List<V>> { +public class ForStGeneralMultiGetOperation implements ForStDBOperation { private static final Logger LOG = LoggerFactory.getLogger(ForStGeneralMultiGetOperation.class); private final RocksDB db; - private final List<GetRequest<K, V>> batchRequest; + private final List<ForStDBGetRequest<?, ?>> batchRequest; private final Executor executor; ForStGeneralMultiGetOperation( - RocksDB db, List<GetRequest<K, V>> batchRequest, Executor executor) { + RocksDB db, List<ForStDBGetRequest<?, ?>> batchRequest, Executor executor) { this.db = db; this.batchRequest = batchRequest; this.executor = executor; } @Override - public CompletableFuture<List<V>> process() { + public CompletableFuture<Void> process() { - CompletableFuture<List<V>> future = new CompletableFuture<>(); - @SuppressWarnings("unchecked") - V[] result = (V[]) new Object[batchRequest.size()]; - Arrays.fill(result, null); + CompletableFuture<Void> future = new CompletableFuture<>(); AtomicInteger counter = new AtomicInteger(batchRequest.size()); for (int i = 0; i < batchRequest.size(); i++) { - GetRequest<K, V> request = batchRequest.get(i); - final int index = i; + ForStDBGetRequest<?, ?> request = batchRequest.get(i); executor.execute( () -> { try { - ForStInnerTable<K, V> table = request.table; - byte[] key = table.serializeKey(request.key); - byte[] value = db.get(table.getColumnFamilyHandle(), key); - if (value != null) { - result[index] = table.deserializeValue(value); - } + byte[] key = request.buildSerializedKey(); + byte[] value = db.get(request.getColumnFamilyHandle(), key); + request.completeStateFuture(value); } catch (Exception e) { LOG.warn( "Error when process general multiGet operation for forStDB", e); future.completeExceptionally(e); Review Comment: Okay, I think it's reasonble to consider it together with handling mechanism of AEC later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org