reswqa commented on code in PR #28402: URL: https://github.com/apache/flink/pull/28402#discussion_r3426456705
########## flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/FileArchiveStorage.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.webmonitor.history; + +import org.apache.flink.util.FileUtils; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A file-system backed implementation of {@link ArchiveStorage}. + * + * <p>Each storage key is treated as a relative path under the configured {@code rootPath}. + */ +public class FileArchiveStorage implements ArchiveStorage<File> { + + /** Root directory under which all archive files are stored. */ + private final File rootPath; + + /** + * Creates a new {@link FileArchiveStorage}. 
+ * + * @param rootPath root directory for archive files; must not be {@code null} + * @throws IOException if the canonical path of {@code rootPath} cannot be resolved + */ + public FileArchiveStorage(File rootPath) throws IOException { + this.rootPath = checkNotNull(rootPath).getCanonicalFile(); + } + + @Override + public boolean exists(String key) throws IOException { + return new File(rootPath, key).exists(); Review Comment: I wonder whether we could avoid the allocation here, e.g. by using `Files.exists`? ########## flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchiveStorage.java: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.webmonitor.history; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * Abstraction for the storage backend used by the {@link HistoryServer} to access archived job + * data. + * + * <p>Implementations can be backed by the local file system, RocksDB, or any other storage medium. 
+ * + * <p>The archive JSON will be stored in different entry types, and each implementation can decide + * whether to read the full content or not. + * + * @param <Entry> Type of the storage entries. + */ +public interface ArchiveStorage<Entry> extends Closeable { + + /** + * Returns whether the entry identified by {@code key} exists in this storage. + * + * @param key storage key (typically the request path, e.g. {@code /jobs/xxx/config.json}) + * @return {@code true} if the entry exists + */ + boolean exists(String key) throws IOException; + + /** + * Returns the entry identified by {@code key} from this storage. + * + * @param key storage key + * @return the entry, or null if the entry does not exist + * @throws IOException if the entry cannot be read + */ + @Nullable + Entry getEntry(String key) throws IOException; + + /** + * Stores the entry identified by {@code key} in this storage. + * + * @param key storage key + * @param archiveContent the archive content to store, this type is string because the archive + * content is always a JSON String + * @throws IOException if the entry cannot be written + */ + void putArchiveContent(String key, String archiveContent) throws IOException; + + /** + * Deletes the entry identified by {@code key} from this storage. + * + * @param key storage key + * @throws IOException if the entry cannot be deleted + */ + void delete(String key) throws IOException; + + /** + * Deletes all entries with key starting with {@code keyPrefix} from this storage. + * + * <p>Such as deleting all archived files for a given job or application. + * + * @param keyPrefix key prefix + * @throws IOException if entries cannot be deleted + */ + void deleteByPrefix(String keyPrefix) throws IOException; + + /** + * Returns the entries identified by {@code prefix} from this storage. 
+ * + * @param prefix storage key prefix + * @return the entries + * @throws IOException if the entries cannot be read + */ + List<Entry> getEntriesByPrefix(String prefix) throws IOException; Review Comment: I suggest either keeping the word `Entries` in all of the method names or removing it from all of them. ########## flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchiveStorage.java: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.webmonitor.history; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * Abstraction for the storage backend used by the {@link HistoryServer} to access archived job + * data. + * + * <p>Implementations can be backed by the local file system, RocksDB, or any other storage medium. + * + * <p>The archive JSON will be stored in different entry types, and each implementation can decide + * whether to read the full content or not. + * + * @param <Entry> Type of the storage entries. + */ +public interface ArchiveStorage<Entry> extends Closeable { + + /** + * Returns whether the entry identified by {@code key} exists in this storage. 
+ * + * @param key storage key (typically the request path, e.g. {@code /jobs/xxx/config.json}) + * @return {@code true} if the entry exists + */ + boolean exists(String key) throws IOException; + + /** + * Returns the entry identified by {@code key} from this storage. + * + * @param key storage key + * @return the entry, or null if the entry does not exist + * @throws IOException if the entry cannot be read + */ + @Nullable + Entry getEntry(String key) throws IOException; + + /** + * Stores the entry identified by {@code key} in this storage. + * + * @param key storage key + * @param archiveContent the archive content to store, this type is string because the archive + * content is always a JSON String + * @throws IOException if the entry cannot be written + */ + void putArchiveContent(String key, String archiveContent) throws IOException; + + /** + * Deletes the entry identified by {@code key} from this storage. + * + * @param key storage key + * @throws IOException if the entry cannot be deleted + */ + void delete(String key) throws IOException; + + /** + * Deletes all entries with key starting with {@code keyPrefix} from this storage. + * + * <p>Such as deleting all archived files for a given job or application. + * + * @param keyPrefix key prefix + * @throws IOException if entries cannot be deleted + */ + void deleteByPrefix(String keyPrefix) throws IOException; + + /** + * Returns the entries identified by {@code prefix} from this storage. + * + * @param prefix storage key prefix + * @return the entries + * @throws IOException if the entries cannot be read Review Comment: `@throws IOException if the entries cannot be read` Any entries or all? ########## flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/FileArchiveStorage.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.webmonitor.history; + +import org.apache.flink.util.FileUtils; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A file-system backed implementation of {@link ArchiveStorage}. + * + * <p>Each storage key is treated as a relative path under the configured {@code rootPath}. + */ +public class FileArchiveStorage implements ArchiveStorage<File> { + + /** Root directory under which all archive files are stored. */ + private final File rootPath; + + /** + * Creates a new {@link FileArchiveStorage}. 
+ * + * @param rootPath root directory for archive files; must not be {@code null} + * @throws IOException if the canonical path of {@code rootPath} cannot be resolved + */ + public FileArchiveStorage(File rootPath) throws IOException { + this.rootPath = checkNotNull(rootPath).getCanonicalFile(); + } + + @Override + public boolean exists(String key) throws IOException { + return new File(rootPath, key).exists(); + } + + @Nullable + @Override + public File getEntry(String key) throws IOException { + if (!exists(key)) { + return null; + } + return new File(rootPath, key); + } + + @Override + public void putArchiveContent(String key, String archiveContent) throws IOException { + File target = new File(rootPath, key); + writeTargetFile(target, archiveContent); + } + + @Override + public void delete(String key) throws IOException { + File target = new File(rootPath, key); + Files.deleteIfExists(target.toPath()); Review Comment: Could we avoid creating the `File` instance here? ########## flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java: ########## @@ -438,7 +456,7 @@ static class RefreshLocation { private final Path path; private final FileSystem fs; - private RefreshLocation(Path path, FileSystem fs) { + protected RefreshLocation(Path path, FileSystem fs) { Review Comment: Why should this be protected? ########## flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java: ########## @@ -464,21 +437,24 @@ void updateOverview() { * <p>For the display in the HistoryServer WebFrontend we have to combine these overviews. 
*/ void updateJobOverview() { - try (JsonGenerator gen = - jacksonFactory.createGenerator( - HistoryServer.createOrGetFile(webDir, JobsOverviewHeaders.URL))) { - File[] overviews = new File(webOverviewDir.getPath()).listFiles(); - if (overviews != null) { - Collection<JobDetails> allJobs = new ArrayList<>(overviews.length); - for (File overview : overviews) { - MultipleJobsDetails subJobs = - mapper.readValue(overview, MultipleJobsDetails.class); - allJobs.addAll(subJobs.getJobs()); + try { + Collection<JobDetails> allJobs = new ArrayList<>(); + List<Entry> overviews = + archiveStorage.getEntriesByPrefix("/" + JOB_OVERVIEWS_SUBDIR + "/"); + for (Entry overview : overviews) { + MultipleJobsDetails subJobs; + if (overview instanceof File) { + subJobs = mapper.readValue((File) overview, MultipleJobsDetails.class); + } else { + subJobs = mapper.readValue((String) overview, MultipleJobsDetails.class); } Review Comment: We assume that `Entry` is either a `File` or a `String`. If I introduce a new type of `ArchiveStorage`, things get worse. I don't think developers should have to add a new branch here. ########## flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java: ########## @@ -272,47 +268,24 @@ ArchiveEvent processJobArchive(String jobId, Path jobArchive) throws IOException String path = archive.getPath(); String json = archive.getJson(); - File target; + String key; if (path.equals(JobsOverviewHeaders.URL)) { - target = new File(webOverviewDir, jobId + JSON_FILE_ENDING); + key = "/" + JOB_OVERVIEWS_SUBDIR + "/" + jobId + JSON_FILE_ENDING; Review Comment: I suggest we extract `"/" + JOB_OVERVIEWS_SUBDIR + "/"` and `"/" + JOBS_SUBDIR + "/"` into constant String fields (e.g. `JOB_OVERVIEWS_KEY_PREFIX` and `JOBS_KEY_PREFIX`, or any other reasonable names). 
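A minimal sketch of what the suggested extraction could look like. The holder class `ArchiveKeys`, the helper `jobOverviewKey`, and the literal values of `JOBS_SUBDIR`, `JOB_OVERVIEWS_SUBDIR`, and `JSON_FILE_ENDING` are assumptions for illustration only; in the PR the constants would presumably live next to the existing ones in `HistoryServerArchiveFetcher`.

```java
/** Hypothetical holder for the storage key prefixes; not part of the PR. */
final class ArchiveKeys {

    // Assumed values, standing in for the existing constants in the fetcher.
    static final String JOBS_SUBDIR = "jobs";
    static final String JOB_OVERVIEWS_SUBDIR = "overviews";
    static final String JSON_FILE_ENDING = ".json";

    // The two prefixes named in the review comment, built once instead of inline.
    static final String JOBS_KEY_PREFIX = "/" + JOBS_SUBDIR + "/";
    static final String JOB_OVERVIEWS_KEY_PREFIX = "/" + JOB_OVERVIEWS_SUBDIR + "/";

    private ArchiveKeys() {}

    /** Builds the storage key of the overview entry for the given job id. */
    static String jobOverviewKey(String jobId) {
        return JOB_OVERVIEWS_KEY_PREFIX + jobId + JSON_FILE_ENDING;
    }
}
```

With this in place, a call such as `archiveStorage.getEntriesByPrefix(JOB_OVERVIEWS_KEY_PREFIX)` and the key construction in `processJobArchive` would share one definition of the prefix.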
########## flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java: ########## @@ -60,7 +57,8 @@ * ArchiveRetainedStrategy} and {@link * HistoryServerOptions#HISTORY_SERVER_CLEANUP_EXPIRED_APPLICATIONS}. */ -public class HistoryServerApplicationArchiveFetcher extends HistoryServerArchiveFetcher { +public class HistoryServerApplicationArchiveFetcher<Entry> Review Comment: Please add Javadoc for the generic type `Entry`. ########## flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerApplicationArchiveFetcher.java: ########## @@ -71,24 +69,28 @@ public class HistoryServerApplicationArchiveFetcher extends HistoryServerArchive private final Map<Path, Map<String, Set<String>>> cachedApplicationIdsToJobIds = new HashMap<>(); - private final File webApplicationDir; - private final File webApplicationsOverviewDir; - HistoryServerApplicationArchiveFetcher( List<HistoryServer.RefreshLocation> refreshDirs, File webDir, Consumer<HistoryServerApplicationArchiveFetcher.ArchiveEvent> archiveEventListener, boolean cleanupExpiredArchives, - ArchiveRetainedStrategy retainedStrategy) + ArchiveRetainedStrategy retainedStrategy, + ArchiveStorage<Entry> archiveStorage) throws IOException { - super(refreshDirs, webDir, archiveEventListener, cleanupExpiredArchives, retainedStrategy); + super( + refreshDirs, + webDir, + archiveEventListener, + cleanupExpiredArchives, + retainedStrategy, + archiveStorage); for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { cachedApplicationIdsToJobIds.put(refreshDir.getPath(), new HashMap<>()); } - this.webApplicationDir = new File(webDir, APPLICATIONS_SUBDIR); + File webApplicationDir = new File(webDir, APPLICATIONS_SUBDIR); Files.createDirectories(webApplicationDir.toPath()); - this.webApplicationsOverviewDir = new File(webDir, APPLICATION_OVERVIEWS_SUBDIR); + File webApplicationsOverviewDir = new File(webDir, APPLICATION_OVERVIEWS_SUBDIR); 
Files.createDirectories(webApplicationsOverviewDir.toPath()); Review Comment: It seems that we need the `Path` rather than the `File` object itself. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
