yihua commented on code in PR #18282:
URL: https://github.com/apache/hudi/pull/18282#discussion_r2912896777
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java:
##########
@@ -273,6 +274,15 @@ public class FlinkOptions extends HoodieConfig {
.defaultValue(0D)
.withDescription("Index state ttl in days, default stores the index
permanently");
+ @AdvancedConfig
+ public static final ConfigOption<String> INDEX_BOOTSTRAP_ROCKSDB_PATH =
ConfigOptions
+ .key("index.bootstrap.rocksdb.path")
+ .stringType()
+ .defaultValue(FileIOUtils.getDefaultSpillableMapBasePath())
+ .withDescription("Local directory path for RocksDB when "
Review Comment:
🤖 nit: missing space between the sentences — `"index type."` concatenated
with `"Each task manager"` produces `"index type.Each task manager"`.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java:
##########
@@ -234,6 +234,10 @@ public static HoodieWriteConfig getHoodieClientConfig(
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.withEngineType(EngineType.FLINK) // this affects the default
value inference
.enable(conf.get(FlinkOptions.METADATA_ENABLED))
+ .withRecordIndexFileGroupCount(
+
Integer.parseInt(conf.getString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(),
"8")),
+
Integer.parseInt(conf.getString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(),
Review Comment:
🤖 The hardcoded default `"8"` doesn't match the actual config default of
`10` in `GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP`. Could you use
the same `.defaultValue() + ""` pattern used for the max prop to stay
consistent and avoid drift?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java:
##########
@@ -143,9 +158,28 @@ public void awaitAllInstantsToCompleteIfNecessary() {
}
}
+ public void awaitPrevInstantsToComplete(long checkpointId) {
+ List<String> pendingInstants = getPendingInstantsBefore(checkpointId);
+ if (!pendingInstants.isEmpty() &&
this.indexBootstrapGuardOption.isPresent()) {
+ this.indexBootstrapGuardOption.get().blockFor(() ->
getPendingInstantsBefore(checkpointId));
+ }
+ }
+
Review Comment:
🤖 There's a race between the `!writerBootstrapEventReceived` check in
`awaitWriterBootstrapEventReceived()` and entering `blockUntil` — if
`notifyWriterBootstrapEventReceived()` fires between the two (sets the flag +
calls `unblock`), the signal is lost and this thread could block indefinitely.
Could the check be moved inside the lock in `blockUntil` to close this window?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/AbstractBootstrapOperator.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap;
+
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.bootstrap.aggregate.BootstrapAggFunction;
+import org.apache.hudi.utils.RuntimeContextUtils;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base operator for bootstrap stages that emit preloaded index records.
+ */
+@Slf4j
+public abstract class AbstractBootstrapOperator
+ extends AbstractStreamOperator<HoodieFlinkInternalRow>
+ implements OneInputStreamOperator<HoodieFlinkInternalRow,
HoodieFlinkInternalRow> {
+
+ protected final Configuration conf;
+
+ protected AbstractBootstrapOperator(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * The modifier of this method is updated to `protected` sink Flink 2.0,
here we overwrite the method
+ * with `public` modifier to make it compatible considering usage in
hudi-flink module.
+ */
+ @Override
+ public void setup(StreamTask<?, ?> containingTask, StreamConfig config,
Output<StreamRecord<HoodieFlinkInternalRow>> output) {
+ super.setup(containingTask, config, output);
+ }
+
+ @Override
+ public void processElement(StreamRecord<HoodieFlinkInternalRow> element)
throws Exception {
+ output.collect(element);
+ }
+
+ protected void waitForBootstrapReady(int taskID) {
+ GlobalAggregateManager aggregateManager =
getRuntimeContext().getGlobalAggregateManager();
+ int taskNum =
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+ int readyTaskNum = 1;
+ while (taskNum != readyTaskNum) {
+ try {
+ readyTaskNum = aggregateManager.updateGlobalAggregate(
+ BootstrapAggFunction.NAME + conf.get(FlinkOptions.TABLE_NAME),
+ taskID,
+ new BootstrapAggFunction());
+ log.info("Waiting for other bootstrap tasks to complete, taskId = {},
ready = {}/{}", taskID, readyTaskNum, taskNum);
+ TimeUnit.SECONDS.sleep(5);
+ } catch (Exception e) {
Review Comment:
🤖 Catching `Exception` here will swallow `InterruptedException` without
restoring the thread's interrupt flag. This can prevent Flink from shutting
down the task cleanly during cancellation or failover. Could you catch
`InterruptedException` separately and call `Thread.currentThread().interrupt()`
before continuing?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -355,6 +357,71 @@ public HoodiePairData<String, HoodieRecordGlobalLocation>
readRecordIndexLocatio
});
}
+ @Override
+ public HoodiePairData<String, HoodieRecordGlobalLocation>
readRecordIndexLocations(
+ SerializableFunctionUnchecked<List<FileSlice>, List<FileSlice>>
fileSlicesFilter) {
+
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+ "Record index is not initialized in MDT");
+
+ return dataCleanupManager.ensureDataCleanupOnException(v -> {
+ // Get all file slices for the record index partition
+ List<FileSlice> fileSlices = partitionFileSliceMap.computeIfAbsent(
+ RECORD_INDEX.getPartitionPath(),
+ k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(
+ metadataMetaClient, getMetadataFileSystemView(),
RECORD_INDEX.getPartitionPath()));
+
+ List<FileSlice> targetFileSlices = fileSlicesFilter.apply(fileSlices);
+ if (targetFileSlices.isEmpty()) {
+ return HoodieListPairData.eager(Collections.emptyList());
+ }
+
+ List<Supplier<ClosableIterator<HoodieRecord<HoodieMetadataPayload>>>>
iteratorSuppliers =
+ targetFileSlices.stream().map(targetFileSlice -> {
+ return new
Supplier<ClosableIterator<HoodieRecord<HoodieMetadataPayload>>>() {
+ @Override
+ public ClosableIterator<HoodieRecord<HoodieMetadataPayload>>
get() {
+ return scanRecordsItr(
+ targetFileSlice,
+ metadataRecord -> {
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(Option.of(metadataRecord));
+ String rowKey = payload.key != null ? payload.key :
metadataRecord.get(KEY_FIELD_NAME).toString();
+ HoodieKey hoodieKey = new HoodieKey(rowKey,
RECORD_INDEX.getPartitionPath());
+ return new HoodieAvroRecord<>(hoodieKey, payload);
+ });
+ }
+ };
+ }).collect(Collectors.toList());
Review Comment:
🤖 nit: the anonymous `Supplier` class could be simplified to a lambda:
`(Supplier<ClosableIterator<HoodieRecord<HoodieMetadataPayload>>>) () ->
scanRecordsItr(targetFileSlice, metadataRecord -> { ... })`
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/EventBuffers.java:
##########
@@ -143,9 +158,28 @@ public void awaitAllInstantsToCompleteIfNecessary() {
}
}
+ public void awaitPrevInstantsToComplete(long checkpointId) {
+ List<String> pendingInstants = getPendingInstantsBefore(checkpointId);
+ if (!pendingInstants.isEmpty() &&
this.indexBootstrapGuardOption.isPresent()) {
+ this.indexBootstrapGuardOption.get().blockFor(() ->
getPendingInstantsBefore(checkpointId));
+ }
+ }
+
+ public void awaitWriterBootstrapEventReceived() {
+ if (this.writerBootstrapEventGuardOption.isPresent() &&
!writerBootstrapEventReceived) {
+ this.writerBootstrapEventGuardOption.get().blockUntil(() ->
!writerBootstrapEventReceived);
Review Comment:
🤖 I think the condition might be inverted here — `blockUntil(() ->
!writerBootstrapEventReceived)` reads as "block until the event is NOT
received", which is the opposite of the intent. Should this be `() ->
writerBootstrapEventReceived`? (Depends on whether `blockUntil` blocks until
the predicate is true or while it's true.)
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.index;
+
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.util.collection.RocksDBDAO;
+
+import java.io.IOException;
+
+/**
+ * An implementation of {@link IndexBackend} based on RocksDB.
+ */
+public class RocksDBIndexBackend implements IndexBackend {
+ private static final String COLUMN_FAMILY = "index_cache";
+
+ private final RocksDBDAO rocksDBDAO;
+
+ public RocksDBIndexBackend(String rocksDbBasePath) {
+ this.rocksDBDAO = new RocksDBDAO("hudi-index-backend", rocksDbBasePath);
Review Comment:
🤖 I see `RocksDBDAO.close()` deletes the underlying DB directory. Is that
the intended behavior here? On a Flink task restart with bootstrap enabled,
will the data always be re-bootstrapped so the deletion is safe? Just want to
confirm there's no scenario where a RocksDB backend is closed and then
re-opened expecting persisted data.
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java:
##########
@@ -407,6 +440,41 @@ private void setupIndexWriteFunction() throws Exception {
indexWriteFunction.open(conf);
}
+ private void setupIndexBootstrapFunction() {
+ bootstrapOperator = Mockito.spy(new RLIBootstrapOperator(conf));
+ CollectOutputAdapter<HoodieFlinkInternalRow> output = new
CollectOutputAdapter<>();
+ bootstrapOperator.setup(streamTask, streamConfig, output);
+ ((RLIBootstrapOperator) bootstrapOperator).setCorrespondent(correspondent);
Review Comment:
🤖 When `subTaskFails` calls `setupIndexBootstrapFunction` a second time, the
previous `bootstrapInitializationFuture` may still be blocked on the
single-thread `bootstrapExecutor`. Since there's no cancellation of the old
future, the new `runAsync` task queues behind the stalled one and
`awaitBootstrapInitializationIfNecessary` on the new future will hang. Could
you cancel the old future (and possibly interrupt the executor thread) before
submitting new work here?
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java:
##########
@@ -383,7 +413,10 @@ public boolean isKeyInState(HoodieKey hoodieKey) {
}
public boolean isAlreadyBootstrap() throws Exception {
Review Comment:
🤖 For `RLIBootstrapOperator`, this always returns `true` (since the operator
is non-null after setup). Is that the intended semantics? It means
`assertBootstrapped()` would pass even if bootstrap hasn't actually loaded any
records yet.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/event/Correspondent.java:
##########
@@ -100,6 +100,18 @@ public Map<Long, String> requestInflightInstants() {
}
}
+ /**
+ * Requests coordinator to wait until all pending instants are committed if
necessary.
+ */
+ public void awaitPendingInstantsCommitted(boolean isTaskFailover, long
checkpointId) {
+ try {
+ this.gateway.sendRequestToCoordinator(this.operatorID,
Review Comment:
🤖 The `.get()` call here has no timeout. If the coordinator never responds
(e.g., it's stuck or the operator ID is wrong), this will block the operator
initialization thread indefinitely. Could you add a timeout, similar to how
`requestInstant` or `requestInflightInstants` might handle it?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/CommitGuard.java:
##########
@@ -66,6 +68,46 @@ public void blockFor(String instants) {
}
}
+ /**
+ * Wait until the pending instants are committed.
+ *
+ * @param pendingInstants Supplier to get the pending instants
+ */
+ public void blockFor(Supplier<List<String>> pendingInstants) {
+ lock.lock();
+ long nanos = TimeUnit.MILLISECONDS.toNanos(commitAckTimeout);
+ try {
+ while (!pendingInstants.get().isEmpty()) {
Review Comment:
🤖 nit: The timeout error message calls `pendingInstants.get()` a second
time, which could return a different list than the one that triggered the
timeout. Consider capturing the list in a local variable at the top of the loop
iteration.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -355,6 +357,71 @@ public HoodiePairData<String, HoodieRecordGlobalLocation>
readRecordIndexLocatio
});
}
+ @Override
+ public HoodiePairData<String, HoodieRecordGlobalLocation>
readRecordIndexLocations(
+ SerializableFunctionUnchecked<List<FileSlice>, List<FileSlice>>
fileSlicesFilter) {
+
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX),
+ "Record index is not initialized in MDT");
+
+ return dataCleanupManager.ensureDataCleanupOnException(v -> {
+ // Get all file slices for the record index partition
+ List<FileSlice> fileSlices = partitionFileSliceMap.computeIfAbsent(
+ RECORD_INDEX.getPartitionPath(),
+ k -> HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(
+ metadataMetaClient, getMetadataFileSystemView(),
RECORD_INDEX.getPartitionPath()));
Review Comment:
🤖 The file slices list passed to `fileSlicesFilter` comes from the shared
`partitionFileSliceMap` cache. If a caller's filter mutates the input list
in-place (rather than returning a new list), it would corrupt the cache for
subsequent calls. Would it be worth passing `new ArrayList<>(fileSlices)` here
as a defensive copy?
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/RLIBootstrapOperator.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bootstrap;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.client.model.HoodieFlinkInternalRow;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.function.SerializableFunctionUnchecked;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.sink.event.Correspondent;
+import org.apache.hudi.sink.utils.OperatorIDGenerator;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.RuntimeContextUtils;
+
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * Bootstrap operator that loads record level index (RLI) data from metadata
table.
+ *
+ * <p>This operator reads index data from the record_index partition of the
metadata table.
+ * Each subtask reads one RLI partition (bucket) based on its task index,
enabling parallel loading.
+ *
+ * <p>The loaded index records are emitted downstream to initialize the index
state in
+ * {@link org.apache.hudi.sink.partitioner.BucketAssignFunction}.
+ */
+@Slf4j
+public class RLIBootstrapOperator
+ extends AbstractBootstrapOperator {
+
+ private final OperatorID dataWriteOperatorId;
+
+ private transient HoodieTableMetaClient metaClient;
+ private transient HoodieBackedTableMetadata metadataTable;
+ @Setter
+ private transient Correspondent correspondent;
+ private transient long loadedCnt;
+
+ /**
+ * The last checkpoint id, starts from -1.
+ */
+ private long checkpointId = -1;
+
+ /**
+ * List state of the JobID.
+ */
+ private transient ListState<JobID> jobIdState;
+
+ public RLIBootstrapOperator(Configuration conf) {
+ super(conf);
+ String writeOperatorUid = conf.get(FlinkOptions.WRITE_OPERATOR_UID);
+ ValidationUtils.checkArgument(writeOperatorUid != null,
+ "Write operator UID should not be null when index is Record Level
Index.");
+ this.dataWriteOperatorId = OperatorIDGenerator.fromUid(writeOperatorUid);
+ }
+
+ @Override
+ public void setup(StreamTask<?, ?> containingTask, StreamConfig config,
Output<StreamRecord<HoodieFlinkInternalRow>> output) {
+ super.setup(containingTask, config, output);
+ this.correspondent = Correspondent.getInstance(dataWriteOperatorId,
+
getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway());
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ this.jobIdState = context.getOperatorStateStore().getListState(
+ new ListStateDescriptor<>(
+ "job-id-state",
+ TypeInformation.of(JobID.class)
+ ));
+ loadedCnt = 0;
+
+ int attemptId = RuntimeContextUtils.getAttemptNumber(getRuntimeContext());
+ if (context.isRestored()) {
+ initCheckpointId(attemptId,
context.getRestoredCheckpointId().orElse(-1L));
+ }
+
+ if (context.isRestored()) {
+ // Wait for pending instants being committed successfully before loading
the record index
+ log.info("Waiting for pending instants committed in
StreamWriteOperatorCoordinator before RLI bootstrap.");
+ correspondent.awaitPendingInstantsCommitted(attemptId > 0, checkpointId);
Review Comment:
🤖 Is there a reason the two `context.isRestored()` checks are separate
blocks rather than combined? It reads a bit oddly having the same condition
checked back-to-back. If it's intentional (e.g., for future code between them),
a brief comment might help.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org