yihua commented on code in PR #18282:
URL: https://github.com/apache/hudi/pull/18282#discussion_r2912896777


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -273,6 +274,15 @@ public class FlinkOptions extends HoodieConfig {
       .defaultValue(0D)
       .withDescription("Index state ttl in days, default stores the index permanently");
 
+  @AdvancedConfig
+  public static final ConfigOption<String> INDEX_BOOTSTRAP_ROCKSDB_PATH = ConfigOptions
+      .key("index.bootstrap.rocksdb.path")
+      .stringType()
+      .defaultValue(FileIOUtils.getDefaultSpillableMapBasePath())
+      .withDescription("Local directory path for RocksDB when "

Review Comment:
   🤖 nit: missing space between the sentences — `"index type."` concatenated 
with `"Each task manager"` produces `"index type.Each task manager"`.
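
   A minimal sketch of the concatenation issue, using only the two fragments quoted above. The class and method names are hypothetical, and the rest of the description string is not shown in the hunk, so it is left out here.

```java
// Hypothetical illustration; only the two string fragments come from the review comment.
final class DescriptionConcat {

  // Adjacent literals are joined as-is, so the sentences run together.
  static String broken() {
    return "index type."
        + "Each task manager";
  }

  // A trailing space on the first fragment keeps the sentences apart.
  static String fixed() {
    return "index type. "
        + "Each task manager";
  }
}
```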



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java:
##########
@@ -234,6 +234,10 @@ public static HoodieWriteConfig getHoodieClientConfig(
             .withMetadataConfig(HoodieMetadataConfig.newBuilder()
                 .withEngineType(EngineType.FLINK) // this affects the default value inference
                 .enable(conf.get(FlinkOptions.METADATA_ENABLED))
+                .withRecordIndexFileGroupCount(
+                    Integer.parseInt(conf.getString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(), "8")),
+                    Integer.parseInt(conf.getString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(),

Review Comment:
   🤖 The hardcoded default `"8"` doesn't match the actual config default of 
`10` in `GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP`. Could you use 
the same `.defaultValue() + ""` pattern used for the max prop to stay 
consistent and avoid drift?
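
   The idea of deriving the fallback from the property definition can be sketched as below. `Prop`, `MIN_FILE_GROUP_COUNT`, and the key `example.min.file.group.count` are hypothetical stand-ins, not Hudi classes or keys; the value `10` simply mirrors the default mentioned in this comment.

```java
import java.util.Map;

// Hypothetical sketch: the fallback is read from the property, never repeated as a literal.
final class DefaultDrift {

  /** Stand-in for a config property that carries its own key and default. */
  static final class Prop<T> {
    private final String key;
    private final T defaultValue;

    Prop(String key, T defaultValue) {
      this.key = key;
      this.defaultValue = defaultValue;
    }

    String key() {
      return key;
    }

    T defaultValue() {
      return defaultValue;
    }
  }

  // Assumed key and default, for illustration only.
  static final Prop<Integer> MIN_FILE_GROUP_COUNT =
      new Prop<>("example.min.file.group.count", 10);

  /** Uses the configured value if present, otherwise the property's own default. */
  static int resolve(Map<String, String> conf, Prop<Integer> prop) {
    String raw = conf.getOrDefault(prop.key(), String.valueOf(prop.defaultValue()));
    return Integer.parseInt(raw);
  }
}
```

   With this shape, changing the default in the property definition changes every call site at once.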



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java:
##########
@@ -143,9 +158,28 @@ public void awaitAllInstantsToCompleteIfNecessary() {
     }
   }
 
+  public void awaitPrevInstantsToComplete(long checkpointId) {
+    List<String> pendingInstants = getPendingInstantsBefore(checkpointId);
+    if (!pendingInstants.isEmpty() && this.indexBootstrapGuardOption.isPresent()) {
+      this.indexBootstrapGuardOption.get().blockFor(() -> getPendingInstantsBefore(checkpointId));
+    }
+  }
+

Review Comment:
   🤖 There's a race between the `!writerBootstrapEventReceived` check here and 
entering `blockUntil` — if `notifyWriterBootstrapEventReceived()` fires between 
the two (sets the flag + calls `unblock`), the signal is lost and this thread 
could block indefinitely. Could the check be moved inside the lock in 
`blockUntil` to close this window?
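
   One way to close the window, sketched under the assumption that the flag and the condition share a single lock. `EventGuard` and its methods are hypothetical names and not the PR's `CommitGuard` API; the point is only that the flag is read while holding the lock that the notifier also takes.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical guard: the check and the wait happen under the same lock as the signal.
final class EventGuard {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition changed = lock.newCondition();
  private boolean received; // only read or written while holding the lock

  /** Sets the flag and wakes all waiters while holding the lock. */
  void notifyReceived() {
    lock.lock();
    try {
      received = true;
      changed.signalAll();
    } finally {
      lock.unlock();
    }
  }

  /** Returns true if the event arrived in time, false on timeout or interruption. */
  boolean awaitReceived(long timeoutMillis) {
    long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    lock.lock();
    try {
      // Re-checking in a loop also covers spurious wakeups.
      while (!received) {
        if (nanos <= 0L) {
          return false;
        }
        nanos = changed.awaitNanos(nanos);
      }
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      lock.unlock();
    }
  }
}
```

   Because the notifier cannot set the flag between the waiter's check and its `awaitNanos` call, a signal sent before the wait starts is seen as `received == true` rather than lost.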



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/AbstractBootstrapOperator.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap;
+
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
+import org.apache.hudi.utils.RuntimeContextUtils;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base operator for bootstrap stages that emit preloaded index records.
+ */
+@Slf4j
+public abstract class AbstractBootstrapOperator
+    extends AbstractStreamOperator<HoodieFlinkInternalRow>
+    implements OneInputStreamOperator<HoodieFlinkInternalRow, HoodieFlinkInternalRow> {
+
+  protected final Configuration conf;
+
+  protected AbstractBootstrapOperator(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * The modifier of this method is updated to `protected` sink Flink 2.0, here we overwrite the method
+   * with `public` modifier to make it compatible considering usage in hudi-flink module.
+   */
+  @Override
+  public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<HoodieFlinkInternalRow>> output) {
+    super.setup(containingTask, config, output);
+  }
+
+  @Override
+  public void processElement(StreamRecord<HoodieFlinkInternalRow> element) throws Exception {
+    output.collect(element);
+  }
+
+  protected void waitForBootstrapReady(int taskID) {
+    GlobalAggregateManager aggregateManager = getRuntimeContext().getGlobalAggregateManager();
+    int taskNum = RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+    int readyTaskNum = 1;
+    while (taskNum != readyTaskNum) {
+      try {
+        readyTaskNum = aggregateManager.updateGlobalAggregate(
+            BootstrapAggFunction.NAME + conf.get(FlinkOptions.TABLE_NAME),
+            taskID,
+            new BootstrapAggFunction());
+        log.info("Waiting for other bootstrap tasks to complete, taskId = {}, ready = {}/{}", taskID, readyTaskNum, taskNum);
+        TimeUnit.SECONDS.sleep(5);
+      } catch (Exception e) {

Review Comment:
   🤖 Catching `Exception` here will swallow `InterruptedException` without 
restoring the thread's interrupt flag. This can prevent Flink from shutting 
down the task cleanly during cancellation or failover. Could you catch 
`InterruptedException` separately and call `Thread.currentThread().interrupt()` 
before continuing?
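
   A minimal sketch of the interrupt handling, assuming a plain polling loop. `ReadyPoller` and the `IntSupplier` are hypothetical and stand in for the aggregate-manager call. Note one deliberate difference from the suggestion: the sketch leaves the loop after restoring the flag, because a later `sleep` on an interrupted thread would throw again immediately.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

// Hypothetical polling helper that keeps the interrupt visible to the caller.
final class ReadyPoller {

  /**
   * Polls until readyCount reports taskNum.
   * Returns false if the thread was interrupted; the interrupt flag is restored first.
   */
  static boolean waitUntilReady(IntSupplier readyCount, int taskNum, long pollMillis) {
    while (readyCount.getAsInt() != taskNum) {
      try {
        TimeUnit.MILLISECONDS.sleep(pollMillis);
      } catch (InterruptedException e) {
        // sleep() cleared the flag when it threw; set it again for the caller.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }
}
```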



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -355,6 +357,71 @@ public HoodiePairData<String, HoodieRecordGlobalLocation> readRecordIndexLocatio
     });
   }
 
+  @Override
+  public HoodiePairData<String, HoodieRecordGlobalLocation> readRecordIndexLocations(
+      SerializableFunctionUnchecked<List<FileSlice>, List<FileSlice>> fileSlicesFilter) {
+    ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+        "Record index is not initialized in MDT");
+
+    return dataCleanupManager.ensureDataCleanupOnException(v -> {
+      // Get all file slices for the record index partition
+      List<FileSlice> fileSlices = partitionFileSliceMap.computeIfAbsent(
+          RECORD_INDEX.getPartitionPath(),
+          k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(
+              metadataMetaClient, getMetadataFileSystemView(), RECORD_INDEX.getPartitionPath()));
+
+      List<FileSlice> targetFileSlices = fileSlicesFilter.apply(fileSlices);
+      if (targetFileSlices.isEmpty()) {
+        return HoodieListPairData.eager(Collections.emptyList());
+      }
+
+      List<Supplier<ClosableIterator<HoodieRecord<HoodieMetadataPayload>>>> iteratorSuppliers =
+          targetFileSlices.stream().map(targetFileSlice -> {
+            return new Supplier<ClosableIterator<HoodieRecord<HoodieMetadataPayload>>>() {
+              @Override
+              public ClosableIterator<HoodieRecord<HoodieMetadataPayload>> get() {
+                return scanRecordsItr(
+                    targetFileSlice,
+                    metadataRecord -> {
+                      HoodieMetadataPayload payload = new HoodieMetadataPayload(Option.of(metadataRecord));
+                      String rowKey = payload.key != null ? payload.key : metadataRecord.get(KEY_FIELD_NAME).toString();
+                      HoodieKey hoodieKey = new HoodieKey(rowKey, RECORD_INDEX.getPartitionPath());
+                      return new HoodieAvroRecord<>(hoodieKey, payload);
+                    });
+              }
+            };
+          }).collect(Collectors.toList());

Review Comment:
   🤖 nit: the anonymous `Supplier` class could be simplified to a lambda: 
`(Supplier<ClosableIterator<HoodieRecord<HoodieMetadataPayload>>>) () -> 
scanRecordsItr(targetFileSlice, metadataRecord -> { ... })`
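
   For reference, the two styles side by side on a simplified element type. `SupplierStyles` and the `"scan:"` strings are hypothetical; the real code supplies `ClosableIterator` instances rather than strings.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical comparison of an anonymous Supplier and the equivalent cast lambda.
final class SupplierStyles {

  /** Anonymous-class form, mirroring the shape in the diff. */
  static List<Supplier<String>> anonymous(List<String> names) {
    return names.stream().map(name -> {
      return new Supplier<String>() {
        @Override
        public String get() {
          return "scan:" + name;
        }
      };
    }).collect(Collectors.toList());
  }

  /** Lambda form; the cast gives the inner lambda its target type inside map(). */
  static List<Supplier<String>> lambda(List<String> names) {
    return names.stream()
        .map(name -> (Supplier<String>) () -> "scan:" + name)
        .collect(Collectors.toList());
  }
}
```

   Both forms defer the work until `get()` is called, so the change is purely stylistic.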



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java:
##########
@@ -143,9 +158,28 @@ public void awaitAllInstantsToCompleteIfNecessary() {
     }
   }
 
+  public void awaitPrevInstantsToComplete(long checkpointId) {
+    List<String> pendingInstants = getPendingInstantsBefore(checkpointId);
+    if (!pendingInstants.isEmpty() && this.indexBootstrapGuardOption.isPresent()) {
+      this.indexBootstrapGuardOption.get().blockFor(() -> getPendingInstantsBefore(checkpointId));
+    }
+  }
+
+  public void awaitWriterBootstrapEventReceived() {
+    if (this.writerBootstrapEventGuardOption.isPresent() && !writerBootstrapEventReceived) {
+      this.writerBootstrapEventGuardOption.get().blockUntil(() -> !writerBootstrapEventReceived);

Review Comment:
   🤖 I think the condition might be inverted here — `blockUntil(() -> 
!writerBootstrapEventReceived)` reads as "block until the event is NOT 
received", which is the opposite of the intent. Should this be `() -> 
writerBootstrapEventReceived`? (Depends on whether `blockUntil` blocks until 
the predicate is true or while it's true.)



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.util.collection.RocksDBDAO;
+
+import java.io.IOException;
+
+/**
+ * An implementation of {@link IndexBackend} based on RocksDB.
+ */
+public class RocksDBIndexBackend implements IndexBackend {
+  private static final String COLUMN_FAMILY = "index_cache";
+
+  private final RocksDBDAO rocksDBDAO;
+
+  public RocksDBIndexBackend(String rocksDbBasePath) {
+    this.rocksDBDAO = new RocksDBDAO("hudi-index-backend", rocksDbBasePath);

Review Comment:
   🤖 I see `RocksDBDAO.close()` deletes the underlying DB directory. Is that 
the intended behavior here? On a Flink task restart with bootstrap enabled, 
will the data always be re-bootstrapped so the deletion is safe? Just want to 
confirm there's no scenario where a RocksDB backend is closed and then 
re-opened expecting persisted data.



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java:
##########
@@ -407,6 +440,41 @@ private void setupIndexWriteFunction() throws Exception {
     indexWriteFunction.open(conf);
   }
 
+  private void setupIndexBootstrapFunction() {
+    bootstrapOperator = Mockito.spy(new RLIBootstrapOperator(conf));
+    CollectOutputAdapter<HoodieFlinkInternalRow> output = new CollectOutputAdapter<>();
+    bootstrapOperator.setup(streamTask, streamConfig, output);
+    ((RLIBootstrapOperator) bootstrapOperator).setCorrespondent(correspondent);

Review Comment:
   🤖 When `subTaskFails` calls `setupIndexBootstrapFunction` a second time, the 
previous `bootstrapInitializationFuture` may still be blocked on the 
single-thread `bootstrapExecutor`. Since there's no cancellation of the old 
future, the new `runAsync` task queues behind the stalled one and 
`awaitBootstrapInitializationIfNecessary` on the new future will hang. Could 
you cancel the old future (and possibly interrupt the executor thread) before 
submitting new work here?
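
   A rough sketch of cancelling before resubmitting, assuming the blocked work responds to interruption. `RestartableBootstrap` is a hypothetical name and not part of the test wrapper. The sketch uses `ExecutorService.submit` because `CompletableFuture.cancel` does not interrupt the thread that is running the task.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: at most one bootstrap run is live on the single worker thread.
final class RestartableBootstrap {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private Future<?> current;

  /** Cancels any earlier run, interrupting it if it is executing, then queues the new one. */
  synchronized Future<?> restart(Runnable work) {
    if (current != null) {
      current.cancel(true);
    }
    current = executor.submit(work);
    return current;
  }

  /** Stops the worker thread and interrupts whatever is still running. */
  void shutdown() {
    executor.shutdownNow();
  }
}
```

   If the stalled task ignores interrupts, cancellation alone will not free the worker thread, and replacing the executor would be the fallback.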



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java:
##########
@@ -383,7 +413,10 @@ public boolean isKeyInState(HoodieKey hoodieKey) {
   }
 
   public boolean isAlreadyBootstrap() throws Exception {

Review Comment:
   🤖 For `RLIBootstrapOperator`, this always returns `true` (since the operator 
is non-null after setup). Is that the intended semantics? It means 
`assertBootstrapped()` would pass even if bootstrap hasn't actually loaded any 
records yet.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java:
##########
@@ -100,6 +100,18 @@ public Map<Long, String> requestInflightInstants() {
     }
   }
 
+  /**
+   * Requests coordinator to wait until all pending instants are committed if necessary.
+   */
+  public void awaitPendingInstantsCommitted(boolean isTaskFailover, long checkpointId) {
+    try {
+      this.gateway.sendRequestToCoordinator(this.operatorID,

Review Comment:
   🤖 The `.get()` call here has no timeout. If the coordinator never responds 
(e.g., it's stuck or the operator ID is wrong), this will block the operator 
initialization thread indefinitely. Could you add a timeout, similar to how 
`requestInstant` or `requestInflightInstants` might handle it?
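
   A bounded wait could look roughly like this. `BoundedRequest`, the messages, and the choice of `IllegalStateException` are assumptions for illustration; the hunk does not show how the other request methods handle timeouts.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical wrapper that never waits longer than the given timeout.
final class BoundedRequest {

  /** Returns the response, or throws an unchecked exception on timeout, interrupt, or failure. */
  static <T> T await(CompletableFuture<T> response, long timeoutMillis) {
    try {
      return response.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      throw new IllegalStateException(
          "No response from coordinator within " + timeoutMillis + " ms", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for coordinator", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Coordinator request failed", e.getCause());
    }
  }
}
```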



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CommitGuard.java:
##########
@@ -66,6 +68,46 @@ public void blockFor(String instants) {
     }
   }
 
+  /**
+   * Wait until the pending instants are committed.
+   *
+   * @param pendingInstants Supplier to get the pending instants
+   */
+  public void blockFor(Supplier<List<String>> pendingInstants) {
+    lock.lock();
+    long nanos = TimeUnit.MILLISECONDS.toNanos(commitAckTimeout);
+    try {
+      while (!pendingInstants.get().isEmpty()) {

Review Comment:
   🤖 nit: The timeout error message calls `pendingInstants.get()` a second 
time, which could return a different list than the one that triggered the 
timeout. Consider capturing the list in a local variable at the top of the loop 
iteration.
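
   The suggestion can be sketched as follows, assuming a lock and condition similar to the ones in the hunk. `PendingWait`, the explicit `timeoutMillis` parameter, and the exception type are hypothetical; only the loop shape follows the diff.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical guard: the supplier is read once per iteration and that snapshot is reported.
final class PendingWait {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition condition = lock.newCondition();

  /** Wakes waiters so they re-read the supplier, for example after a commit. */
  void unblock() {
    lock.lock();
    try {
      condition.signalAll();
    } finally {
      lock.unlock();
    }
  }

  /** Blocks until the supplier returns an empty list or the timeout elapses. */
  void blockFor(Supplier<List<String>> pendingInstants, long timeoutMillis) {
    long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    lock.lock();
    try {
      List<String> pending = pendingInstants.get();
      while (!pending.isEmpty()) {
        if (nanos <= 0L) {
          // Report the same list that failed the emptiness check above.
          throw new IllegalStateException(
              "Timed out waiting for instants " + pending + " to commit");
        }
        nanos = condition.awaitNanos(nanos);
        pending = pendingInstants.get();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for instants to commit", e);
    } finally {
      lock.unlock();
    }
  }
}
```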



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -355,6 +357,71 @@ public HoodiePairData<String, HoodieRecordGlobalLocation> readRecordIndexLocatio
     });
   }
 
+  @Override
+  public HoodiePairData<String, HoodieRecordGlobalLocation> readRecordIndexLocations(
+      SerializableFunctionUnchecked<List<FileSlice>, List<FileSlice>> fileSlicesFilter) {
+    ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+        "Record index is not initialized in MDT");
+
+    return dataCleanupManager.ensureDataCleanupOnException(v -> {
+      // Get all file slices for the record index partition
+      List<FileSlice> fileSlices = partitionFileSliceMap.computeIfAbsent(
+          RECORD_INDEX.getPartitionPath(),
+          k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(
+              metadataMetaClient, getMetadataFileSystemView(), RECORD_INDEX.getPartitionPath()));

Review Comment:
   🤖 The file slices list passed to `fileSlicesFilter` comes from the shared 
`partitionFileSliceMap` cache. If a caller's filter mutates the input list 
in-place (rather than returning a new list), it would corrupt the cache for 
subsequent calls. Would it be worth passing `new ArrayList<>(fileSlices)` here 
as a defensive copy?
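
   A small sketch of why the copy matters, using strings in place of `FileSlice`. `SliceCache` and the slice names are hypothetical; only the `computeIfAbsent` plus filter shape follows the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache: callers receive a copy, so an in-place filter cannot alter cached data.
final class SliceCache {
  private final Map<String, List<String>> cache = new ConcurrentHashMap<>();

  /** Applies the filter to a fresh copy of the cached list for the partition. */
  List<String> read(String partition, Function<List<String>, List<String>> filter) {
    List<String> cached = cache.computeIfAbsent(partition, k -> load());
    return filter.apply(new ArrayList<>(cached));
  }

  // Stand-in for listing the latest file slices of a partition.
  private static List<String> load() {
    List<String> slices = new ArrayList<>();
    slices.add("slice-0");
    slices.add("slice-1");
    slices.add("slice-2");
    return slices;
  }
}
```

   The copy is shallow, so it protects the list structure but not the elements themselves.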



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.sink.event.Correspondent;
+import org.apache.hudi.sink.utils.OperatorIDGenerator;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.RuntimeContextUtils;
+
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * Bootstrap operator that loads record level index (RLI) data from metadata table.
+ *
+ * <p>This operator reads index data from the record_index partition of the metadata table.
+ * Each subtask reads one RLI partition (bucket) based on its task index, enabling parallel loading.
+ *
+ * <p>The loaded index records are emitted downstream to initialize the index state in
+ * {@link org.apache.hudi.sink.partitioner.BucketAssignFunction}.
+ */
+@Slf4j
+public class RLIBootstrapOperator
+    extends AbstractBootstrapOperator {
+
+  private final OperatorID dataWriteOperatorId;
+
+  private transient HoodieTableMetaClient metaClient;
+  private transient HoodieBackedTableMetadata metadataTable;
+  @Setter
+  private transient Correspondent correspondent;
+  private transient long loadedCnt;
+
+  /**
+   * The last checkpoint id, starts from -1.
+   */
+  private long checkpointId = -1;
+
+  /**
+   * List state of the JobID.
+   */
+  private transient ListState<JobID> jobIdState;
+
+  public RLIBootstrapOperator(Configuration conf) {
+    super(conf);
+    String writeOperatorUid = conf.get(FlinkOptions.WRITE_OPERATOR_UID);
+    ValidationUtils.checkArgument(writeOperatorUid != null,
+        "Write operator UID should not be null when index is Record Level Index.");
+    this.dataWriteOperatorId = OperatorIDGenerator.fromUid(writeOperatorUid);
+  }
+
+  @Override
+  public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<HoodieFlinkInternalRow>> output) {
+    super.setup(containingTask, config, output);
+    this.correspondent = Correspondent.getInstance(dataWriteOperatorId,
+        getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway());
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws Exception {
+    this.jobIdState = context.getOperatorStateStore().getListState(
+        new ListStateDescriptor<>(
+            "job-id-state",
+            TypeInformation.of(JobID.class)
+        ));
+    loadedCnt = 0;
+
+    int attemptId = RuntimeContextUtils.getAttemptNumber(getRuntimeContext());
+    if (context.isRestored()) {
+      initCheckpointId(attemptId, context.getRestoredCheckpointId().orElse(-1L));
+    }
+
+    if (context.isRestored()) {
+      // Wait for pending instants being committed successfully before loading the record index
+      log.info("Waiting for pending instants committed in StreamWriteOperatorCoordinator before RLI bootstrap.");
+      correspondent.awaitPendingInstantsCommitted(attemptId > 0, checkpointId);

Review Comment:
   🤖 Is there a reason the two `context.isRestored()` checks are separate 
blocks rather than combined? It reads a bit oddly having the same condition 
checked back-to-back. If it's intentional (e.g., for future code between them), 
a brief comment might help.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.