hudi-agent commented on code in PR #18821:
URL: https://github.com/apache/hudi/pull/18821#discussion_r3291595991
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java:
##########
@@ -37,18 +38,21 @@
@Slf4j
public class RocksDBIndexBackend implements GlobalIndexBackend {
private static final String COLUMN_FAMILY = "index_cache";
+ private static final String BASE_PATH = "hudi-index-backend";
Review Comment:
🤖 nit: `BASE_PATH` reads like a filesystem path, but the value
`"hudi-index-backend"` is clearly a logical factory key. Something like
`FACTORY_KEY` or `INDEX_BACKEND_KEY` would make its purpose immediately obvious
to a future reader.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java:
##########
@@ -37,18 +38,21 @@
@Slf4j
public class RocksDBIndexBackend implements GlobalIndexBackend {
private static final String COLUMN_FAMILY = "index_cache";
+ private static final String BASE_PATH = "hudi-index-backend";
private final RocksDBDAO rocksDBDAO;
+ private final String rocksDbBasePath;
private transient FlinkRocksDBIndexMetrics rocksDBIndexMetrics;
public RocksDBIndexBackend(String rocksDbBasePath, boolean
isPartitionedTable) {
+ this.rocksDbBasePath = rocksDbBasePath;
// Register custom serializer for HoodieRecordGlobalLocation to minimize
storage overhead
ConcurrentHashMap<String, CustomSerializer<?>> serializers = new
ConcurrentHashMap<>();
serializers.put(COLUMN_FAMILY, isPartitionedTable
? new CodedRecordGlobalLocationSerializer()
: new RecordGlobalLocationSerializer());
- this.rocksDBDAO = new RocksDBDAO("hudi-index-backend", rocksDbBasePath,
serializers, true);
+ this.rocksDBDAO = RocksDBDAOFactory.getOrCreate(BASE_PATH,
rocksDbBasePath, serializers, true);
this.rocksDBDAO.addColumnFamily(COLUMN_FAMILY);
Review Comment:
🤖 If `addColumnFamily` throws here (e.g. `RocksDBException` wrapped as
`HoodieException`, or the `checkArgument(!closed)` on a racing close),
`getOrCreate` has already incremented the factory's refCount but the
constructor fails so the caller never gets a backend to release. The cached
entry would then never be evicted. Could you wrap the call in a try/catch that
calls `RocksDBDAOFactory.release(...)` on failure?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java:
##########
@@ -94,6 +98,6 @@ public double getRatioMetric(TickerType hitTicker, TickerType
missTicker) {
@Override
public void close() throws IOException {
- this.rocksDBDAO.close();
+ RocksDBDAOFactory.release(BASE_PATH, rocksDbBasePath);
Review Comment:
🤖 `close()` is no longer idempotent — the previous `rocksDBDAO.close()` was
guarded by `RocksDBDAO`'s internal `closed` flag. With multiple references via
the factory, a double `close()` on this backend would decrement the refCount
twice and could prematurely evict/close the shared DAO while another backend
still holds a reference. Could you guard with a local `closed` flag so
accidental double-close stays a no-op (per the `Closeable` contract)?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAOFactory.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.apache.hudi.common.serialization.CustomSerializer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Factory that creates and caches {@link RocksDBDAO} singletons keyed by
{@code (basePath, rocksDBBasePath)}.
+ *
+ * <p>Multiple callers that supply identical parameters share a single
underlying RocksDB instance.
+ * The factory tracks a reference count; the instance is physically closed and
evicted only when
+ * every holder has called {@link #release}.
+ *
+ * <p>Callers <em>must</em> call {@link #release} (not {@link
RocksDBDAO#close}) when they are
+ * done with the instance, so that the reference count is decremented
correctly.
+ *
+ * <p>Note: column-family serializers and the {@code disableWAL} flag are
applied only on the
+ * first creation for a given key. Subsequent callers that pass different
values for these
+ * parameters will share the instance created by the first caller.
+ */
+public class RocksDBDAOFactory {
+ private static final Object LOCK = new Object();
+ private static final Map<String, Entry> INSTANCES = new HashMap<>();
+
+ private RocksDBDAOFactory() {
+ }
+
+ /**
+ * Returns a shared {@link RocksDBDAO} for the given parameters, creating
one if absent.
+ *
+ * <p>Increments the internal reference count. Callers must invoke {@link
#release} with the
+ * same {@code basePath} and {@code rocksDBBasePath} when the instance is no
longer needed.
+ *
+ * @param basePath logical base path identifying the Hudi
table
+ * @param rocksDBBasePath filesystem path under which RocksDB stores
its data
+ * @param columnFamilySerializers per-column-family serializers (applied
only on first creation)
+ * @param disableWAL whether to disable the write-ahead log
(applied only on first creation)
+ * @return shared {@link RocksDBDAO} instance
+ */
+ public static RocksDBDAO getOrCreate(
+ String basePath,
+ String rocksDBBasePath,
+ ConcurrentHashMap<String, CustomSerializer<?>> columnFamilySerializers,
Review Comment:
🤖 nit: could you widen this parameter type to `Map<String,
CustomSerializer<?>>` instead of `ConcurrentHashMap`? The method only reads
from the map, so tying the public API to a specific implementation forces
callers to use `ConcurrentHashMap` even if they hold a plain `Map`.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]