reswqa commented on code in PR #28402:
URL: https://github.com/apache/flink/pull/28402#discussion_r3426875595


##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/ArchiveStorageTest.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Basic tests for {@link ArchiveStorage} implementations.
+ *
+ * <p>Subclasses need to provide a concrete {@link ArchiveStorage} instance 
via {@link
+ * #createStorage()}. All assertions in this base class go through the {@link 
ArchiveStorage}
+ * interface itself (e.g. {@link ArchiveStorage#exists(String)}), so this test 
class is independent
+ * of any specific storage backend (file system, RocksDB, etc.).
+ *
+ * @param <T> entry type returned by the storage under test
+ */
+abstract class ArchiveStorageTest<T> {
+
+    protected ArchiveStorage<T> storage;
+
+    /**
+     * Creates {@link ArchiveStorage} instance for a single test.
+     *
+     * @return storage instance
+     */
+    protected abstract ArchiveStorage<T> createStorage() throws Exception;
+
+    /** Reads the textual content of a storage entry. */
+    protected abstract String readContent(T entry) throws Exception;

Review Comment:
   If we introduce a method like `getAsContent` in `ArchiveStorage` that 
directly returns the string, then this method and the generic type of the test 
class would no longer be necessary. Furthermore, we should be able to convert 
it into a parameterized test class.



##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/AbstractHistoryServerHandlerTest.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.handler.router.Router;
+import org.apache.flink.runtime.webmonitor.testutils.HttpUtils;
+import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Common HTTP-level tests for {@link AbstractHistoryServerHandler} 
subclasses. Subclasses only need
+ * to provide a concrete handler instance via {@link #createHandler(File)}.
+ */
+abstract class AbstractHistoryServerHandlerTest {
+
+    /** Creates the concrete handler under test, bound to the given web 
directory. */
+    protected abstract AbstractHistoryServerHandler<?> createHandler(File 
webDir) throws Exception;
+
+    /**
+     * Tests requests against static files served from the {@code web/} 
resources packaged with the
+     * handler module:
+     *
+     * <ul>
+     *   <li>a request for {@code /} is rewritten to {@code /index.html};
+     *   <li>{@code /index.html} is loaded via the classloader fallback when 
not present on disk;
+     *   <li>a missing static file (e.g. {@code /hello.html}) results in 404.
+     * </ul>
+     */
+    @Test
+    void testRespondWithStaticFile(@TempDir Path tmpDir) throws Exception {
+        runWithTestBody(
+                tmpDir,
+                (webDir, handler, baseUrl) -> {
+                    // /index.html is loaded from the classloader 
(web/index.html) and served
+                    Tuple2<Integer, String> index = 
HttpUtils.getFromHTTP(baseUrl + "/index.html");
+                    assertThat(index.f0).isEqualTo(200);
+                    assertThat(index.f1).contains("Apache Flink Web 
Dashboard");
+
+                    // a trailing slash is rewritten to "/index.html"
+                    Tuple2<Integer, String> index2 = 
HttpUtils.getFromHTTP(baseUrl + "/");
+                    assertThat(index2).isEqualTo(index);
+
+                    // a static file that is neither on disk nor in the 
classloader yields 404
+                    Tuple2<Integer, String> missing =
+                            HttpUtils.getFromHTTP(baseUrl + "/hello.html");
+                    assertThat(missing.f0).isEqualTo(404);
+                    assertThat(missing.f1).contains("not found");
+                });
+    }
+
+    /**
+     * Tests file-system safety checks performed by {@code responseWithFile}:
+     *
+     * <ul>
+     *   <li>a directory request returns 405;
+     *   <li>a path that escapes the {@code webDir} returns 403.
+     * </ul>
+     */
+    @Test
+    void testRespondWithFile(@TempDir Path tmpDir) throws Exception {
+        runWithTestBody(
+                tmpDir,
+                (webDir, handler, baseUrl) -> {
+                    // requesting a directory rather than a file yields 405
+                    Files.createDirectory(webDir.resolve("dir.json"));
+                    Tuple2<Integer, String> dirNotFound =
+                            HttpUtils.getFromHTTP(baseUrl + "/dir.json");
+                    assertThat(dirNotFound.f0).isEqualTo(405);
+                    assertThat(dirNotFound.f1).contains("not found");
+
+                    // requesting a file outside of the webDir is rejected 
with 403
+                    Files.createFile(tmpDir.resolve("secret"));
+                    Tuple2<Integer, String> outsideWebDir =
+                            HttpUtils.getFromHTTP(baseUrl + "/../secret");
+                    assertThat(outsideWebDir.f0).isEqualTo(403);
+                    assertThat(outsideWebDir.f1).contains("Forbidden");
+                });
+    }
+
+    /**
+     * Tests requests served via the {@link ArchiveStorage} resource.
+     *
+     * <ul>
+     *   <li>an existing entry is returned with its content;
+     *   <li>a missing entry results in 404.
+     * </ul>
+     */
+    @Test
+    void testRespondWithResource(@TempDir Path tmpDir) throws Exception {
+        runWithTestBody(
+                tmpDir,
+                (webDir, handler, baseUrl) -> {
+                    String resourcePath = "/overviews/job1.json";
+                    String resourceContent = "{\"job\":\"job1\"}";
+                    handler.archiveStorage.putArchiveContent(resourcePath, 
resourceContent);
+
+                    // request without an extension: handler will append 
".json" and serve the
+                    // entry from the ArchiveStorage via respondWithResource
+                    Tuple2<Integer, String> resource =
+                            HttpUtils.getFromHTTP(baseUrl + "/overviews/job1");
+                    assertThat(resource.f0).isEqualTo(200);
+                    assertThat(resource.f1).isEqualTo(resourceContent);
+
+                    // request a missing resource: archiveStorage returns null 
and the handler
+                    // responds 404
+                    Tuple2<Integer, String> missing = 
HttpUtils.getFromHTTP(baseUrl + "/hello");
+                    assertThat(missing.f0).isEqualTo(404);
+                    assertThat(missing.f1).contains("not found");
+                });
+    }
+
+    /**
+     * Boots up a {@link WebFrontendBootstrap} backed by the handler under 
test and hands the
+     * resolved {@code webDir}, the handler instance and the server's base URL 
to the given test
+     * body. Takes care of tearing the server down afterwards.
+     */
+    private void runWithTestBody(Path tmpDir, TestBody testBody) throws 
Exception {
+        final Path webDir = Files.createDirectory(tmpDir.resolve("webDir"));
+        final Path uploadDir = 
Files.createDirectory(tmpDir.resolve("uploadDir"));
+
+        AbstractHistoryServerHandler<?> handler = 
createHandler(webDir.toFile());
+        Router router = new Router().addGet("/:*", handler);

Review Comment:
   Raw use of parameterized class 'Router' 



##########
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/AbstractHistoryServerHandlerTest.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.handler.router.Router;
+import org.apache.flink.runtime.webmonitor.testutils.HttpUtils;
+import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Common HTTP-level tests for {@link AbstractHistoryServerHandler} 
subclasses. Subclasses only need
+ * to provide a concrete handler instance via {@link #createHandler(File)}.
+ */
+abstract class AbstractHistoryServerHandlerTest {

Review Comment:
   This could be a parameterized test.



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/RocksDBArchiveStorage.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StringUtils;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.CompressionType;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ArchiveStorage} implementation backed by RocksDB.
+ *
+ * <p>All archived data is stored as key-value pairs in a single RocksDB 
instance, avoiding the
+ * problem of numerous small files. The key is the request path (e.g. {@code
+ * /jobs/xxx/config.json}), and the value is a JSON string.
+ */
+public class RocksDBArchiveStorage implements ArchiveStorage<String> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBArchiveStorage.class);
+
+    private final RocksDB db;
+
+    private BlockBasedTableConfig tableFormatConfig;
+
+    private Options dbOptions;
+
+    private WriteOptions writeOptions;
+
+    private BloomFilter bloomFilter;
+
+    private final ArrayList<AutoCloseable> handlesToClose;
+
+    /**
+     * Creates a new {@link RocksDBArchiveStorage} instance with default 
RocksDB options.
+     *
+     * @param dbPath the RocksDB database directory path
+     * @throws IOException if the RocksDB database cannot be opened
+     */
+    public RocksDBArchiveStorage(File dbPath) throws IOException {
+        this(dbPath, new Configuration());
+    }
+
+    /**
+     * Creates a new {@link RocksDBArchiveStorage} instance.
+     *
+     * @param dbPath the RocksDB database directory path
+     * @param config the configuration used to read RocksDB related options 
(see {@link
+     *     HistoryServerOptions})
+     * @throws IOException if the RocksDB native library cannot be loaded or 
the database cannot be
+     *     opened
+     */
+    public RocksDBArchiveStorage(File dbPath, ReadableConfig config) throws 
IOException {
+        checkNotNull(dbPath, "dbPath cannot be null");
+        checkNotNull(config, "config cannot be null");
+        this.handlesToClose = new ArrayList<>();
+        String rocksDBNativeLibDir =
+                
config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_ROCKSDB_NATIVE_LIB_DIR);
+
+        try {
+            loadRocksDBLibrary(rocksDBNativeLibDir);
+            loadConfiguration(config);
+            this.db = RocksDB.open(dbOptions, dbPath.getAbsolutePath());
+            handlesToClose.add(db);
+        } catch (Throwable t) {
+            throw new IOException("Failed to initialize 
RocksDBArchiveStorage", t);
+        }
+    }
+
+    @Override
+    public boolean exists(String key) throws IOException {
+        try {
+            return db.get(key.getBytes(UTF_8)) != null;

Review Comment:
   ```suggestion
               return db.keyExists(key.getBytes(UTF_8));
   ```



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/RocksDBArchiveStorage.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StringUtils;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.CompressionType;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ArchiveStorage} implementation backed by RocksDB.
+ *
+ * <p>All archived data is stored as key-value pairs in a single RocksDB 
instance, avoiding the
+ * problem of numerous small files. The key is the request path (e.g. {@code
+ * /jobs/xxx/config.json}), and the value is a JSON string.
+ */
+public class RocksDBArchiveStorage implements ArchiveStorage<String> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBArchiveStorage.class);
+
+    private final RocksDB db;
+
+    private BlockBasedTableConfig tableFormatConfig;
+
+    private Options dbOptions;
+
+    private WriteOptions writeOptions;
+
+    private BloomFilter bloomFilter;
+
+    private final ArrayList<AutoCloseable> handlesToClose;
+
+    /**
+     * Creates a new {@link RocksDBArchiveStorage} instance with default 
RocksDB options.
+     *
+     * @param dbPath the RocksDB database directory path
+     * @throws IOException if the RocksDB database cannot be opened
+     */
+    public RocksDBArchiveStorage(File dbPath) throws IOException {
+        this(dbPath, new Configuration());
+    }
+
+    /**
+     * Creates a new {@link RocksDBArchiveStorage} instance.
+     *
+     * @param dbPath the RocksDB database directory path
+     * @param config the configuration used to read RocksDB related options 
(see {@link
+     *     HistoryServerOptions})
+     * @throws IOException if the RocksDB native library cannot be loaded or 
the database cannot be
+     *     opened
+     */
+    public RocksDBArchiveStorage(File dbPath, ReadableConfig config) throws 
IOException {
+        checkNotNull(dbPath, "dbPath cannot be null");
+        checkNotNull(config, "config cannot be null");
+        this.handlesToClose = new ArrayList<>();
+        String rocksDBNativeLibDir =
+                
config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_ROCKSDB_NATIVE_LIB_DIR);
+
+        try {
+            loadRocksDBLibrary(rocksDBNativeLibDir);
+            loadConfiguration(config);
+            this.db = RocksDB.open(dbOptions, dbPath.getAbsolutePath());
+            handlesToClose.add(db);
+        } catch (Throwable t) {
+            throw new IOException("Failed to initialize 
RocksDBArchiveStorage", t);
+        }
+    }
+
+    @Override
+    public boolean exists(String key) throws IOException {
+        try {
+            return db.get(key.getBytes(UTF_8)) != null;
+        } catch (RocksDBException e) {
+            LOG.warn("Failed to check existence of key: {}", key, e);
+            return false;
+        }
+    }
+
+    @Nullable
+    @Override
+    public String getEntry(String key) throws IOException {
+        try {
+            byte[] value = db.get(key.getBytes(UTF_8));
+            if (value == null) {
+                return null;
+            }
+            return new String(value, UTF_8);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to get key: " + key, e);
+        }
+    }
+
+    @Override
+    public void putArchiveContent(String key, String value) throws IOException 
{
+        try {
+            db.put(writeOptions, key.getBytes(UTF_8), value.getBytes(UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to put key: " + key, e);
+        }
+    }
+
+    @Override
+    public void delete(String key) throws IOException {
+        try {
+            db.delete(writeOptions, key.getBytes(UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete key: " + key, e);
+        }
+    }
+
+    @Override
+    public void deleteByPrefix(String keyPrefix) throws IOException {
+        if (StringUtils.isNullOrWhitespaceOnly(keyPrefix)) {
+            return;
+        }
+
+        try {
+            // Delete all keys that start with the given prefix
+            byte[] startKey = keyPrefix.getBytes(UTF_8);
+            byte[] endKey = keyPrefix.getBytes(UTF_8);

Review Comment:
   Could we use something like `Arrays.copyOf` / `System.arraycopy` to avoid 
the extra encoding?



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/RocksDBArchiveStorage.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StringUtils;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.CompressionType;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ArchiveStorage} implementation backed by RocksDB.
+ *
+ * <p>All archived data is stored as key-value pairs in a single RocksDB 
instance, avoiding the
+ * problem of numerous small files. The key is the request path (e.g. {@code
+ * /jobs/xxx/config.json}), and the value is a JSON string.
+ */
+public class RocksDBArchiveStorage implements ArchiveStorage<String> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBArchiveStorage.class);
+
+    private final RocksDB db;
+
+    private BlockBasedTableConfig tableFormatConfig;
+
+    private Options dbOptions;
+
+    private WriteOptions writeOptions;
+
+    private BloomFilter bloomFilter;
+
+    private final ArrayList<AutoCloseable> handlesToClose;
+
+    /**
+     * Creates a new {@link RocksDBArchiveStorage} instance with default 
RocksDB options.
+     *
+     * @param dbPath the RocksDB database directory path
+     * @throws IOException if the RocksDB database cannot be opened
+     */
+    public RocksDBArchiveStorage(File dbPath) throws IOException {
+        this(dbPath, new Configuration());
+    }
+
+    /**
+     * Creates a new {@link RocksDBArchiveStorage} instance.
+     *
+     * @param dbPath the RocksDB database directory path
+     * @param config the configuration used to read RocksDB related options 
(see {@link
+     *     HistoryServerOptions})
+     * @throws IOException if the RocksDB native library cannot be loaded or 
the database cannot be
+     *     opened
+     */
+    public RocksDBArchiveStorage(File dbPath, ReadableConfig config) throws 
IOException {
+        checkNotNull(dbPath, "dbPath cannot be null");
+        checkNotNull(config, "config cannot be null");
+        this.handlesToClose = new ArrayList<>();
+        String rocksDBNativeLibDir =
+                
config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_ROCKSDB_NATIVE_LIB_DIR);
+
+        try {
+            loadRocksDBLibrary(rocksDBNativeLibDir);
+            loadConfiguration(config);
+            this.db = RocksDB.open(dbOptions, dbPath.getAbsolutePath());
+            handlesToClose.add(db);
+        } catch (Throwable t) {
+            throw new IOException("Failed to initialize 
RocksDBArchiveStorage", t);
+        }
+    }
+
+    @Override
+    public boolean exists(String key) throws IOException {
+        try {
+            return db.get(key.getBytes(UTF_8)) != null;
+        } catch (RocksDBException e) {
+            LOG.warn("Failed to check existence of key: {}", key, e);
+            return false;
+        }
+    }
+
+    @Nullable
+    @Override
+    public String getEntry(String key) throws IOException {
+        try {
+            byte[] value = db.get(key.getBytes(UTF_8));
+            if (value == null) {
+                return null;
+            }
+            return new String(value, UTF_8);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to get key: " + key, e);
+        }
+    }
+
+    @Override
+    public void putArchiveContent(String key, String value) throws IOException 
{
+        try {
+            db.put(writeOptions, key.getBytes(UTF_8), value.getBytes(UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to put key: " + key, e);
+        }
+    }
+
+    @Override
+    public void delete(String key) throws IOException {
+        try {
+            db.delete(writeOptions, key.getBytes(UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete key: " + key, e);
+        }
+    }
+
+    @Override
+    public void deleteByPrefix(String keyPrefix) throws IOException {
+        if (StringUtils.isNullOrWhitespaceOnly(keyPrefix)) {
+            return;
+        }
+
+        try {
+            // Delete all keys that start with the given prefix
+            byte[] startKey = keyPrefix.getBytes(UTF_8);
+            byte[] endKey = keyPrefix.getBytes(UTF_8);
+            // Add 1 to the last byte to get the next lexicographic byte
+            endKey[endKey.length - 1]++;
+            db.deleteRange(writeOptions, startKey, endKey);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete prefix: " + keyPrefix, e);
+        }
+    }
+
+    @Override
+    public List<String> getEntriesByPrefix(String prefix) throws IOException {
+        List<String> result = new ArrayList<>();
+        if (prefix == null || prefix.isEmpty()) {

Review Comment:
   Use `StringUtils.isNullOrWhitespaceOnly(prefix)` here, as `deleteByPrefix` does.



##########
flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java:
##########
@@ -247,5 +247,69 @@ public class HistoryServerOptions {
                                             text(CONFIGURE_CONSISTENT))
                                     .build());
 
+    public static final ConfigOption<HistoryServerArchiveStorageType>
+            HISTORY_SERVER_ARCHIVE_STORAGE_TYPE =
+                    key("historyserver.archive.storage.type")
+                            .enumType(HistoryServerArchiveStorageType.class)
+                            .defaultValue(HistoryServerArchiveStorageType.FILE)
+                            .withDescription(
+                                    Description.builder()
+                                            .text("The type of archive 
storage.")
+                                            .build());
+
+    public static final ConfigOption<String> 
HISTORY_SERVER_ARCHIVE_ROCKSDB_NATIVE_LIB_DIR =
+            key("historyserver.archive.rocksdb.native-lib-dir")
+                    .stringType()
+                    .defaultValue(System.getProperty("java.io.tmpdir"))
+                    .withDescription(
+                            "Base directory used to extract the RocksDB native 
library for the "
+                                    + "HistoryServer archive storage. Each 
HistoryServer instance "
+                                    + "extracts the library into a unique 
sub-directory under this "
+                                    + "directory. Defaults to the JVM 
'java.io.tmpdir' when not "
+                                    + "configured. Only applies when "
+                                    + "'historyserver.archive.storage.type' is 
'ROCKSDB'.");
+
+    /** Compression type used for the non-bottommost levels of the RocksDB SST 
files. */
+    public static final ConfigOption<RocksDBCompressionType>
+            HISTORY_SERVER_ARCHIVE_ROCKSDB_COMPRESSION =
+                    key("historyserver.archive.rocksdb.compression")
+                            .enumType(RocksDBCompressionType.class)
+                            
.defaultValue(RocksDBCompressionType.LZ4_COMPRESSION)
+                            .withDescription(
+                                    "Compression type used for the 
non-bottommost levels of the RocksDB "
+                                            + "SST files. Only applies when 
'historyserver.archive.storage.type' is 'ROCKSDB'.");
+
+    /** Compression type used for the bottommost level of the RocksDB SST 
files. */
+    public static final ConfigOption<RocksDBCompressionType>
+            HISTORY_SERVER_ARCHIVE_ROCKSDB_BOTTOMMOST_COMPRESSION =
+                    key("historyserver.archive.rocksdb.bottommost-compression")
+                            .enumType(RocksDBCompressionType.class)
+                            
.defaultValue(RocksDBCompressionType.ZSTD_COMPRESSION)
+                            .withDescription(
+                                    "Compression type used for the bottommost 
level of the "
+                                            + "RocksDB SST files. Only applies 
when 'historyserver.archive.storage.type' is 'ROCKSDB'.");

Review Comment:
   These configurations are too low-level. Do any users actually need to tune 
them? If so, it's not too late to introduce them later. I would prefer to 
remove these two configuration options for now.



##########
flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java:
##########
@@ -247,5 +247,69 @@ public class HistoryServerOptions {
                                             text(CONFIGURE_CONSISTENT))
                                     .build());
 
+    public static final ConfigOption<HistoryServerArchiveStorageType>
+            HISTORY_SERVER_ARCHIVE_STORAGE_TYPE =
+                    key("historyserver.archive.storage.type")
+                            .enumType(HistoryServerArchiveStorageType.class)
+                            .defaultValue(HistoryServerArchiveStorageType.FILE)
+                            .withDescription(
+                                    Description.builder()
+                                            .text("The type of archive 
storage.")
+                                            .build());
+
+    public static final ConfigOption<String> 
HISTORY_SERVER_ARCHIVE_ROCKSDB_NATIVE_LIB_DIR =
+            key("historyserver.archive.rocksdb.native-lib-dir")
+                    .stringType()
+                    .defaultValue(System.getProperty("java.io.tmpdir"))
+                    .withDescription(
+                            "Base directory used to extract the RocksDB native 
library for the "
+                                    + "HistoryServer archive storage. Each 
HistoryServer instance "
+                                    + "extracts the library into a unique 
sub-directory under this "
+                                    + "directory. Defaults to the JVM 
'java.io.tmpdir' when not "
+                                    + "configured. Only applies when "
+                                    + "'historyserver.archive.storage.type' is 
'ROCKSDB'.");

Review Comment:
   I prefer `HISTORY_SERVER_ARCHIVE_STORAGE_TYPE.key()` rather than hard-coded 
`historyserver.archive.storage.type`.



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/RocksDBArchiveStorage.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StringUtils;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.CompressionType;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ArchiveStorage} implementation backed by RocksDB.
+ *
+ * <p>All archived data is stored as key-value pairs in a single RocksDB 
instance, avoiding the
+ * problem of numerous small files. The key is the request path (e.g. {@code
+ * /jobs/xxx/config.json}), and the value is a JSON string.
+ */
+public class RocksDBArchiveStorage implements ArchiveStorage<String> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBArchiveStorage.class);
+
+    private final RocksDB db;
+
+    private BlockBasedTableConfig tableFormatConfig;
+
+    private Options dbOptions;
+
+    private WriteOptions writeOptions;
+
+    private BloomFilter bloomFilter;

Review Comment:
   Could be a local variable.



##########
docs/content.zh/docs/deployment/advanced/historyserver.md:
##########
@@ -85,6 +85,26 @@ historyserver.archive.fs.refresh-interval: 10000
 
 请查看配置页面以获取[配置选项的完整列表]({{< ref "docs/deployment/config" >}}#history-server)。
 
+**存档存储后端**
+
+HistoryServer 支持通过 `historyserver.archive.storage.type` 选择本地存档存储后端:
+
+* `FILE`(默认):每个存档解压为多个 JSON 文件,存放在 `historyserver.web.tmpdir` 下。
+* `ROCKSDB`:所有存档以 KV 形式存储在 `historyserver.web.tmpdir/rocksdb` 的单个 RocksDB 
实例中,可避免"大量小文件"问题,适合存档数量较多的场景。
+
+启用 RocksDB 后端示例:
+
+```yaml
+historyserver.archive.storage.type: ROCKSDB
+
+# 可选:RocksDB native 库解压目录,默认使用 JVM 的 java.io.tmpdir
+historyserver.archive.rocksdb.native-lib-dir: /path/to/rocksdb/native-lib
+
+# 可选:SST 文件压缩类型
+historyserver.archive.rocksdb.compression: LZ4_COMPRESSION
+historyserver.archive.rocksdb.bottommost-compression: ZSTD_COMPRESSION

Review Comment:
   removed.



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java:
##########
@@ -87,196 +48,27 @@
  * WebInterface and that the caching of the "/joboverview" page is prevented.
  */
 @ChannelHandler.Sharable
-public class HistoryServerStaticFileServerHandler
-        extends SimpleChannelInboundHandler<RoutedRequest> {
-
-    /** Default logger, if none is specified. */
-    private static final Logger LOG =
-            
LoggerFactory.getLogger(HistoryServerStaticFileServerHandler.class);
+public class HistoryServerStaticFileServerHandler extends 
AbstractHistoryServerHandler<File> {
 
     // ------------------------------------------------------------------------
 
-    /** The path in which the static documents are. */
-    private final File rootPath;
-
     public HistoryServerStaticFileServerHandler(File rootPath) throws 
IOException {
-        this.rootPath = checkNotNull(rootPath).getCanonicalFile();
+        this(new FileArchiveStorage(rootPath), rootPath);
+    }
+
+    public HistoryServerStaticFileServerHandler(
+            FileArchiveStorage fileArchiveStorage, File rootPath) throws 
IOException {
+        super(fileArchiveStorage, rootPath);

Review Comment:
   Why do these changes live in the third commit?



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/RocksDBArchiveStorage.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StringUtils;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.CompressionType;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ArchiveStorage} implementation backed by RocksDB.
+ *
+ * <p>All archived data is stored as key-value pairs in a single RocksDB 
instance, avoiding the
+ * problem of numerous small files. The key is the request path (e.g. {@code
+ * /jobs/xxx/config.json}), and the value is a JSON string.
+ */
+public class RocksDBArchiveStorage implements ArchiveStorage<String> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBArchiveStorage.class);
+
+    private final RocksDB db;
+
+    private BlockBasedTableConfig tableFormatConfig;
+
+    private Options dbOptions;
+
+    private WriteOptions writeOptions;
+
+    private BloomFilter bloomFilter;
+
+    private final ArrayList<AutoCloseable> handlesToClose;
+
+    /**
+     * Creates a new {@link RocksDBArchiveStorage} instance with default 
RocksDB options.
+     *
+     * @param dbPath the RocksDB database directory path
+     * @throws IOException if the RocksDB database cannot be opened
+     */
+    public RocksDBArchiveStorage(File dbPath) throws IOException {
+        this(dbPath, new Configuration());
+    }
+
+    /**
+     * Creates a new {@link RocksDBArchiveStorage} instance.
+     *
+     * @param dbPath the RocksDB database directory path
+     * @param config the configuration used to read RocksDB related options 
(see {@link
+     *     HistoryServerOptions})
+     * @throws IOException if the RocksDB native library cannot be loaded or 
the database cannot be
+     *     opened
+     */
+    public RocksDBArchiveStorage(File dbPath, ReadableConfig config) throws 
IOException {
+        checkNotNull(dbPath, "dbPath cannot be null");
+        checkNotNull(config, "config cannot be null");
+        this.handlesToClose = new ArrayList<>();
+        String rocksDBNativeLibDir =
+                
config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_ROCKSDB_NATIVE_LIB_DIR);
+
+        try {
+            loadRocksDBLibrary(rocksDBNativeLibDir);
+            loadConfiguration(config);
+            this.db = RocksDB.open(dbOptions, dbPath.getAbsolutePath());
+            handlesToClose.add(db);
+        } catch (Throwable t) {
+            throw new IOException("Failed to initialize 
RocksDBArchiveStorage", t);

Review Comment:
   If we throw here, will the `db` still be closed properly (i.e. will the 
handles in `handlesToClose` still get closed)?



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/RocksDBArchiveStorage.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StringUtils;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.CompressionType;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ArchiveStorage} implementation backed by RocksDB.
+ *
+ * <p>All archived data is stored as key-value pairs in a single RocksDB 
instance, avoiding the
+ * problem of numerous small files. The key is the request path (e.g. {@code
+ * /jobs/xxx/config.json}), and the value is a JSON string.
+ */
+public class RocksDBArchiveStorage implements ArchiveStorage<String> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBArchiveStorage.class);
+
+    private final RocksDB db;
+
+    private BlockBasedTableConfig tableFormatConfig;

Review Comment:
   Could be a local variable.



##########
docs/content/docs/deployment/advanced/historyserver.md:
##########
@@ -79,6 +79,26 @@ The contained archives are downloaded and cached in the 
local filesystem. The lo
 
 Check out the configuration page for a [complete list of configuration 
options]({{< ref "docs/deployment/config" >}}#history-server).
 
+**Archive Storage Backend**
+
+The HistoryServer supports pluggable local archive storage backends, selected 
via `historyserver.archive.storage.type`:
+
+* `FILE` (default): Each archive is unpacked into multiple JSON files under 
`historyserver.web.tmpdir`.
+* `ROCKSDB`: All archives are stored as key-value pairs in a single embedded 
RocksDB instance under `historyserver.web.tmpdir/rocksdb`. This avoids the 
"many small files" problem and is recommended when there is a large number of 
archives.
+
+Example for enabling the RocksDB backend:
+
+```yaml
+historyserver.archive.storage.type: ROCKSDB
+
+# Optional: base directory used to extract the RocksDB native library; 
defaults to the JVM "java.io.tmpdir"
+historyserver.archive.rocksdb.native-lib-dir: /path/to/rocksdb/native-lib
+
+# Optional: SST file compression types
+historyserver.archive.rocksdb.compression: LZ4_COMPRESSION
+historyserver.archive.rocksdb.bottommost-compression: ZSTD_COMPRESSION

Review Comment:
   removed.



##########
docs/content.zh/docs/deployment/advanced/historyserver.md:
##########
@@ -85,6 +85,26 @@ historyserver.archive.fs.refresh-interval: 10000
 
 请查看配置页面以获取[配置选项的完整列表]({{< ref "docs/deployment/config" >}}#history-server)。
 
+**存档存储后端**

Review Comment:
   存储后端



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/RocksDBArchiveStorage.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StringUtils;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.CompressionType;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ArchiveStorage} implementation backed by RocksDB.
+ *
+ * <p>All archived data is stored as key-value pairs in a single RocksDB 
instance, avoiding the
+ * problem of numerous small files. The key is the request path (e.g. {@code
+ * /jobs/xxx/config.json}), and the value is a JSON string.
+ */
+public class RocksDBArchiveStorage implements ArchiveStorage<String> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBArchiveStorage.class);
+
+    private final RocksDB db;
+
+    private BlockBasedTableConfig tableFormatConfig;
+
+    private Options dbOptions;
+
+    private WriteOptions writeOptions;
+
+    private BloomFilter bloomFilter;
+
+    private final ArrayList<AutoCloseable> handlesToClose;
+
+    /**
+     * Creates a new {@link RocksDBArchiveStorage} instance with default 
RocksDB options.
+     *
+     * @param dbPath the RocksDB database directory path
+     * @throws IOException if the RocksDB database cannot be opened
+     */
+    public RocksDBArchiveStorage(File dbPath) throws IOException {
+        this(dbPath, new Configuration());
+    }
+
+    /**
+     * Creates a new {@link RocksDBArchiveStorage} instance.
+     *
+     * @param dbPath the RocksDB database directory path
+     * @param config the configuration used to read RocksDB related options 
(see {@link
+     *     HistoryServerOptions})
+     * @throws IOException if the RocksDB native library cannot be loaded or 
the database cannot be
+     *     opened
+     */
+    public RocksDBArchiveStorage(File dbPath, ReadableConfig config) throws 
IOException {
+        checkNotNull(dbPath, "dbPath cannot be null");
+        checkNotNull(config, "config cannot be null");
+        this.handlesToClose = new ArrayList<>();
+        String rocksDBNativeLibDir =
+                
config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_ROCKSDB_NATIVE_LIB_DIR);
+
+        try {
+            loadRocksDBLibrary(rocksDBNativeLibDir);
+            loadConfiguration(config);
+            this.db = RocksDB.open(dbOptions, dbPath.getAbsolutePath());
+            handlesToClose.add(db);
+        } catch (Throwable t) {
+            throw new IOException("Failed to initialize 
RocksDBArchiveStorage", t);
+        }
+    }
+
+    @Override
+    public boolean exists(String key) throws IOException {
+        try {
+            return db.get(key.getBytes(UTF_8)) != null;
+        } catch (RocksDBException e) {
+            LOG.warn("Failed to check existence of key: {}", key, e);
+            return false;
+        }
+    }
+
+    @Nullable
+    @Override
+    public String getEntry(String key) throws IOException {
+        try {
+            byte[] value = db.get(key.getBytes(UTF_8));
+            if (value == null) {
+                return null;
+            }
+            return new String(value, UTF_8);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to get key: " + key, e);
+        }
+    }
+
+    @Override
+    public void putArchiveContent(String key, String value) throws IOException 
{
+        try {
+            db.put(writeOptions, key.getBytes(UTF_8), value.getBytes(UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to put key: " + key, e);
+        }
+    }
+
+    @Override
+    public void delete(String key) throws IOException {
+        try {
+            db.delete(writeOptions, key.getBytes(UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete key: " + key, e);
+        }
+    }
+
+    @Override
+    public void deleteByPrefix(String keyPrefix) throws IOException {
+        if (StringUtils.isNullOrWhitespaceOnly(keyPrefix)) {
+            return;
+        }
+
+        try {
+            // Delete all keys that start with the given prefix
+            byte[] startKey = keyPrefix.getBytes(UTF_8);
+            byte[] endKey = keyPrefix.getBytes(UTF_8);
+            // Add 1 to the last byte to get the next lexicographic byte
+            endKey[endKey.length - 1]++;
+            db.deleteRange(writeOptions, startKey, endKey);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete prefix: " + keyPrefix, e);
+        }
+    }
+
+    @Override
+    public List<String> getEntriesByPrefix(String prefix) throws IOException {
+        List<String> result = new ArrayList<>();
+        if (prefix == null || prefix.isEmpty()) {
+            return result;
+        }
+
+        try (RocksIterator iterator = db.newIterator()) {
+            byte[] prefixBytes = prefix.getBytes(UTF_8);
+            iterator.seek(prefixBytes);
+            while (iterator.isValid()) {

Review Comment:
   Could we use `ReadOptions.setIterateUpperBound(Slice)` to set an upper bound here?
   
   ```
           byte[] upper = xxx;
           try (Slice upperSlice = new Slice(upper);
                ReadOptions ro = new 
ReadOptions().setIterateUpperBound(upperSlice);
                RocksIterator it = db.newIterator(ro)) {
               for (it.seek(prefixBytes); it.isValid(); it.next()) {
                   result.add(new String(it.value(), UTF_8));
               }
               try {
                   it.status();
               } catch (RocksDBException e) {
                   throw new IOException(e);
               }
           }
   
   ```



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/RocksDBArchiveStorage.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StringUtils;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.CompressionType;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ArchiveStorage} implementation backed by RocksDB.
+ *
+ * <p>All archived data is stored as key-value pairs in a single RocksDB 
instance, avoiding the
+ * problem of numerous small files. The key is the request path (e.g. {@code
+ * /jobs/xxx/config.json}), and the value is a JSON string.
+ */
+public class RocksDBArchiveStorage implements ArchiveStorage<String> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBArchiveStorage.class);
+
+    private final RocksDB db;
+
+    private BlockBasedTableConfig tableFormatConfig;
+
+    private Options dbOptions;
+
+    private WriteOptions writeOptions;
+
+    private BloomFilter bloomFilter;
+
+    private final ArrayList<AutoCloseable> handlesToClose;
+
+    /**
+     * Creates a new {@link RocksDBArchiveStorage} instance with default 
RocksDB options.
+     *
+     * @param dbPath the RocksDB database directory path
+     * @throws IOException if the RocksDB database cannot be opened
+     */
+    public RocksDBArchiveStorage(File dbPath) throws IOException {
+        this(dbPath, new Configuration());
+    }
+
+    /**
+     * Creates a new {@link RocksDBArchiveStorage} instance.
+     *
+     * @param dbPath the RocksDB database directory path
+     * @param config the configuration used to read RocksDB related options 
(see {@link
+     *     HistoryServerOptions})
+     * @throws IOException if the RocksDB native library cannot be loaded or 
the database cannot be
+     *     opened
+     */
+    public RocksDBArchiveStorage(File dbPath, ReadableConfig config) throws 
IOException {
+        checkNotNull(dbPath, "dbPath cannot be null");
+        checkNotNull(config, "config cannot be null");
+        this.handlesToClose = new ArrayList<>();
+        String rocksDBNativeLibDir =
+                
config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_ROCKSDB_NATIVE_LIB_DIR);
+
+        try {
+            loadRocksDBLibrary(rocksDBNativeLibDir);
+            loadConfiguration(config);
+            this.db = RocksDB.open(dbOptions, dbPath.getAbsolutePath());
+            handlesToClose.add(db);
+        } catch (Throwable t) {
+            throw new IOException("Failed to initialize 
RocksDBArchiveStorage", t);
+        }
+    }
+
+    @Override
+    public boolean exists(String key) throws IOException {
+        try {
+            return db.get(key.getBytes(UTF_8)) != null;
+        } catch (RocksDBException e) {
+            LOG.warn("Failed to check existence of key: {}", key, e);
+            return false;
+        }
+    }
+
+    @Nullable
+    @Override
+    public String getEntry(String key) throws IOException {
+        try {
+            byte[] value = db.get(key.getBytes(UTF_8));
+            if (value == null) {
+                return null;
+            }
+            return new String(value, UTF_8);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to get key: " + key, e);
+        }
+    }
+
+    @Override
+    public void putArchiveContent(String key, String value) throws IOException 
{
+        try {
+            db.put(writeOptions, key.getBytes(UTF_8), value.getBytes(UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to put key: " + key, e);
+        }
+    }
+
+    @Override
+    public void delete(String key) throws IOException {
+        try {
+            db.delete(writeOptions, key.getBytes(UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete key: " + key, e);
+        }
+    }
+
+    @Override
+    public void deleteByPrefix(String keyPrefix) throws IOException {
+        if (StringUtils.isNullOrWhitespaceOnly(keyPrefix)) {
+            return;
+        }
+
+        try {
+            // Delete all keys that start with the given prefix
+            byte[] startKey = keyPrefix.getBytes(UTF_8);
+            byte[] endKey = keyPrefix.getBytes(UTF_8);
+            // Add 1 to the last byte to get the next lexicographic byte
+            endKey[endKey.length - 1]++;

Review Comment:
   IIUC, this trick relies heavily on the fact that the byte array is 
encoded in UTF-8. It would be best to add a comment explaining why it works.



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/RocksDBArchiveStorage.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StringUtils;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.CompressionType;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ArchiveStorage} implementation backed by RocksDB.
+ *
+ * <p>All archived data is stored as key-value pairs in a single RocksDB 
instance, avoiding the
+ * problem of numerous small files. The key is the request path (e.g. {@code
+ * /jobs/xxx/config.json}), and the value is a JSON string.
+ */
+public class RocksDBArchiveStorage implements ArchiveStorage<String> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBArchiveStorage.class);
+
+    private final RocksDB db;
+
+    private BlockBasedTableConfig tableFormatConfig;
+
+    private Options dbOptions;
+
+    private WriteOptions writeOptions;
+
+    private BloomFilter bloomFilter;
+
+    private final ArrayList<AutoCloseable> handlesToClose;
+
+    /**
+     * Creates a new {@link RocksDBArchiveStorage} instance with default 
RocksDB options.
+     *
+     * @param dbPath the RocksDB database directory path
+     * @throws IOException if the RocksDB database cannot be opened
+     */
+    public RocksDBArchiveStorage(File dbPath) throws IOException {
+        this(dbPath, new Configuration());
+    }
+
+    /**
+     * Creates a new {@link RocksDBArchiveStorage} instance.
+     *
+     * @param dbPath the RocksDB database directory path
+     * @param config the configuration used to read RocksDB related options 
(see {@link
+     *     HistoryServerOptions})
+     * @throws IOException if the RocksDB native library cannot be loaded or 
the database cannot be
+     *     opened
+     */
+    public RocksDBArchiveStorage(File dbPath, ReadableConfig config) throws 
IOException {
+        checkNotNull(dbPath, "dbPath cannot be null");
+        checkNotNull(config, "config cannot be null");
+        this.handlesToClose = new ArrayList<>();
+        String rocksDBNativeLibDir =
+                
config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_ROCKSDB_NATIVE_LIB_DIR);
+
+        try {
+            loadRocksDBLibrary(rocksDBNativeLibDir);
+            loadConfiguration(config);
+            this.db = RocksDB.open(dbOptions, dbPath.getAbsolutePath());
+            handlesToClose.add(db);
+        } catch (Throwable t) {
+            throw new IOException("Failed to initialize 
RocksDBArchiveStorage", t);
+        }
+    }
+
+    @Override
+    public boolean exists(String key) throws IOException {
+        try {
+            return db.get(key.getBytes(UTF_8)) != null;
+        } catch (RocksDBException e) {
+            LOG.warn("Failed to check existence of key: {}", key, e);
+            return false;
+        }
+    }
+
+    @Nullable
+    @Override
+    public String getEntry(String key) throws IOException {
+        try {
+            byte[] value = db.get(key.getBytes(UTF_8));
+            if (value == null) {
+                return null;
+            }
+            return new String(value, UTF_8);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to get key: " + key, e);
+        }
+    }
+
+    @Override
+    public void putArchiveContent(String key, String value) throws IOException 
{
+        try {
+            db.put(writeOptions, key.getBytes(UTF_8), value.getBytes(UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to put key: " + key, e);
+        }
+    }
+
+    @Override
+    public void delete(String key) throws IOException {
+        try {
+            db.delete(writeOptions, key.getBytes(UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete key: " + key, e);
+        }
+    }
+
+    @Override
+    public void deleteByPrefix(String keyPrefix) throws IOException {
+        if (StringUtils.isNullOrWhitespaceOnly(keyPrefix)) {
+            return;
+        }
+
+        try {
+            // Delete all keys that start with the given prefix
+            byte[] startKey = keyPrefix.getBytes(UTF_8);
+            byte[] endKey = keyPrefix.getBytes(UTF_8);
+            // Add 1 to the last byte to get the next lexicographic byte
+            endKey[endKey.length - 1]++;
+            db.deleteRange(writeOptions, startKey, endKey);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete prefix: " + keyPrefix, e);
+        }
+    }
+
+    @Override
+    public List<String> getEntriesByPrefix(String prefix) throws IOException {
+        List<String> result = new ArrayList<>();
+        if (prefix == null || prefix.isEmpty()) {
+            return result;
+        }
+
+        try (RocksIterator iterator = db.newIterator()) {
+            byte[] prefixBytes = prefix.getBytes(UTF_8);
+            iterator.seek(prefixBytes);
+            while (iterator.isValid()) {
+                byte[] keyBytes = iterator.key();
+                byte[] valueBytes = iterator.value();
+                String currentKey = new String(keyBytes, UTF_8);
+                String currentValue = new String(valueBytes, UTF_8);
+
+                if (currentKey.startsWith(prefix)) {
+                    result.add(currentValue);
+                    iterator.next();
+                } else {
+                    break;
+                }
+            }

Review Comment:
   Perhaps we should add an iterator status check after the `while` block, per 
https://github.com/facebook/rocksdb/wiki/Iterator 🤔 
   
   ```
   while(iterator.isValid()){
   xxx
   }
   iterator.status();
   ```
   



##########
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/RocksDBArchiveStorage.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HistoryServerOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StringUtils;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.CompressionType;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ArchiveStorage} implementation backed by RocksDB.
+ *
+ * <p>All archived data is stored as key-value pairs in a single RocksDB 
instance, avoiding the
+ * problem of numerous small files. The key is the request path (e.g. {@code
+ * /jobs/xxx/config.json}), and the value is a JSON string.
+ */
+public class RocksDBArchiveStorage implements ArchiveStorage<String> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBArchiveStorage.class);
+
+    private final RocksDB db;
+
+    private BlockBasedTableConfig tableFormatConfig;
+
+    private Options dbOptions;
+
+    private WriteOptions writeOptions;
+
+    private BloomFilter bloomFilter;
+
+    private final ArrayList<AutoCloseable> handlesToClose;
+
+    /**
+     * Creates a new {@link RocksDBArchiveStorage} instance with default 
RocksDB options.
+     *
+     * @param dbPath the RocksDB database directory path
+     * @throws IOException if the RocksDB database cannot be opened
+     */
+    public RocksDBArchiveStorage(File dbPath) throws IOException {
+        this(dbPath, new Configuration());
+    }
+
+    /**
+     * Creates a new {@link RocksDBArchiveStorage} instance.
+     *
+     * @param dbPath the RocksDB database directory path
+     * @param config the configuration used to read RocksDB related options 
(see {@link
+     *     HistoryServerOptions})
+     * @throws IOException if the RocksDB native library cannot be loaded or 
the database cannot be
+     *     opened
+     */
+    public RocksDBArchiveStorage(File dbPath, ReadableConfig config) throws 
IOException {
+        checkNotNull(dbPath, "dbPath cannot be null");
+        checkNotNull(config, "config cannot be null");
+        this.handlesToClose = new ArrayList<>();
+        String rocksDBNativeLibDir =
+                
config.get(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_ROCKSDB_NATIVE_LIB_DIR);
+
+        try {
+            loadRocksDBLibrary(rocksDBNativeLibDir);
+            loadConfiguration(config);
+            this.db = RocksDB.open(dbOptions, dbPath.getAbsolutePath());
+            handlesToClose.add(db);
+        } catch (Throwable t) {
+            throw new IOException("Failed to initialize 
RocksDBArchiveStorage", t);
+        }
+    }
+
+    @Override
+    public boolean exists(String key) throws IOException {
+        try {
+            return db.get(key.getBytes(UTF_8)) != null;
+        } catch (RocksDBException e) {
+            LOG.warn("Failed to check existence of key: {}", key, e);
+            return false;
+        }
+    }
+
+    @Nullable
+    @Override
+    public String getEntry(String key) throws IOException {
+        try {
+            byte[] value = db.get(key.getBytes(UTF_8));
+            if (value == null) {
+                return null;
+            }
+            return new String(value, UTF_8);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to get key: " + key, e);
+        }
+    }
+
+    @Override
+    public void putArchiveContent(String key, String value) throws IOException 
{
+        try {
+            db.put(writeOptions, key.getBytes(UTF_8), value.getBytes(UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to put key: " + key, e);
+        }
+    }
+
+    @Override
+    public void delete(String key) throws IOException {
+        try {
+            db.delete(writeOptions, key.getBytes(UTF_8));
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete key: " + key, e);
+        }
+    }
+
+    @Override
+    public void deleteByPrefix(String keyPrefix) throws IOException {
+        if (StringUtils.isNullOrWhitespaceOnly(keyPrefix)) {
+            return;
+        }
+
+        try {
+            // Delete all keys that start with the given prefix
+            byte[] startKey = keyPrefix.getBytes(UTF_8);
+            byte[] endKey = keyPrefix.getBytes(UTF_8);
+            // Add 1 to the last byte to get the next lexicographic byte
+            endKey[endKey.length - 1]++;
+            db.deleteRange(writeOptions, startKey, endKey);
+        } catch (RocksDBException e) {
+            throw new IOException("Failed to delete prefix: " + keyPrefix, e);
+        }
+    }
+
+    @Override
+    public List<String> getEntriesByPrefix(String prefix) throws IOException {
+        List<String> result = new ArrayList<>();
+        if (prefix == null || prefix.isEmpty()) {
+            return result;
+        }
+
+        try (RocksIterator iterator = db.newIterator()) {
+            byte[] prefixBytes = prefix.getBytes(UTF_8);
+            iterator.seek(prefixBytes);
+            while (iterator.isValid()) {
+                byte[] keyBytes = iterator.key();
+                byte[] valueBytes = iterator.value();
+                String currentKey = new String(keyBytes, UTF_8);
+                String currentValue = new String(valueBytes, UTF_8);
+
+                if (currentKey.startsWith(prefix)) {
+                    result.add(currentValue);
+                    iterator.next();
+                } else {
+                    break;
+                }
+            }
+        }
+
+        return result;
+    }
+
+    @Override
+    public void close() {
+        handlesToClose.forEach(IOUtils::closeQuietly);
+        handlesToClose.clear();
+    }
+
+    /**
+     * Loads the RocksDB native library. The default configuration is suitable 
for most use cases.
+     * So we only expose a few options.
+     */
+    private void loadConfiguration(ReadableConfig config) {
+
+        // Use the full (non-block-based) BloomFilter format with 10 bits/key, 
which is the
+        // RocksDB-recommended default and yields ~1% false positive rate.
+        // 
https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#full-filters-new-format
+        this.bloomFilter = new BloomFilter(10.0D, false);
+        handlesToClose.add(bloomFilter);
+
+        this.tableFormatConfig =
+                new BlockBasedTableConfig()
+                        .setFilterPolicy(bloomFilter)
+                        .setEnableIndexCompression(false)
+                        .setIndexBlockRestartInterval(8)
+                        .setFormatVersion(5);
+
+        this.dbOptions =
+                new Options()
+                        .setCreateIfMissing(true)
+                        .setBottommostCompressionType(
+                                convertCompressionType(
+                                        config.get(
+                                                HistoryServerOptions
+                                                        
.HISTORY_SERVER_ARCHIVE_ROCKSDB_BOTTOMMOST_COMPRESSION)))
+                        .setCompressionType(
+                                convertCompressionType(
+                                        config.get(
+                                                HistoryServerOptions
+                                                        
.HISTORY_SERVER_ARCHIVE_ROCKSDB_COMPRESSION)))
+                        .setTableFormatConfig(tableFormatConfig);
+        handlesToClose.add(dbOptions);
+
+        this.writeOptions = new WriteOptions().setDisableWAL(true);
+        handlesToClose.add(writeOptions);
+    }
+
+    private static void loadRocksDBLibrary(String baseTempDir) throws 
IOException {
+        File baseDir = new File(checkNotNull(baseTempDir, "baseTempDir cannot 
be null"));
+        File libDir = new File(baseDir, "flink-history-rocksdb-lib-" + 
UUID.randomUUID());
+        try {
+            Files.createDirectories(libDir.toPath());
+            LOG.info("Try to load RocksDB native library to '{}'.", 
libDir.getAbsolutePath());
+            
NativeLibraryLoader.getInstance().loadLibrary(libDir.getAbsolutePath());
+            // Sanity check that the library is fully usable.
+            RocksDB.loadLibrary();
+            LOG.info(
+                    "Successfully loaded RocksDB native library to '{}'.",
+                    libDir.getAbsolutePath());
+        } catch (Throwable t) {
+            LOG.warn("Failed to load RocksDB native library to '{}'.", 
libDir.getAbsolutePath(), t);
+            deleteDirectoryQuietly(libDir);

Review Comment:
   Should we also clean up this `libDir` when the history server stops without an error?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to