wzhero1 commented on code in PR #655:
URL: https://github.com/apache/flink-agents/pull/655#discussion_r3263199104

##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/repository/URLSkillRepository.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.skill.repository;
+
+import org.apache.flink.agents.runtime.skill.AgentSkill;
+import org.apache.flink.agents.runtime.skill.SkillRepository;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Skill repository backed by an http(s) URL pointing to a zip.
+ *
+ * <p>The zip is downloaded to a temp file and extracted into a process-local temp directory. The
+ * downloaded zip itself is removed once extraction completes; the extracted directory is released
+ * via {@link #close()} (cascaded through {@code SkillManager} → {@code ResourceContextImpl} →
+ * {@code ResourceCache} on operator close). A JVM shutdown hook acts as fallback cleanup if {@code
+ * close()} is never called.
+ *
+ * <p>Composes a {@link SkillDirectoryReader} for the on-disk parsing; the materialization happens
+ * upfront in the constructor (no abuse of {@code super(...)}).
+ */
+public final class URLSkillRepository implements SkillRepository {
+
+    private static final int REQUEST_TIMEOUT_MS = 90_000;
+
+    private final String url;

Review Comment:
   **Field name `owned` is an adjective without a subject.**

   `Materialized` is already a noun. Rename the field to `materialization` (matches the class name), or rename the inner class to `MaterializedDir` so the field reads `materializedDir`. Current call sites `this.owned.close()` / `if (owned == null)` don't read well. Same applies to `ClasspathSkillRepository.java:73` and `FileSystemSkillRepository.java:51`.

##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/SkillRepository.java:
##########
@@ -20,14 +20,19 @@
 import javax.annotation.Nullable;
+import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
 /**
  * Source of skills. Mirrors the Python {@code
  * flink_agents.runtime.skill.skill_repository.SkillRepository}.
+ *
+ * <p>Implementations that own a materialized temp directory (URL / classpath / zip) should override

Review Comment:
   **Three sibling repos duplicate the same 5 delegating methods + state** (`reader`, `materialization`, plus `getSkill`/`getSkills`/`getResources`/`getSkillDir`/`close`): ~45 lines of boilerplate, +15 per future source.

   Extract `AbstractMaterializedSkillRepository` holding the shared state and implementing the delegators once; concrete repos reduce to source-specific I/O + an abstract `getOrigin()`.

##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/SkillManager.java:
##########
@@ -111,38 +148,94 @@ public List<String> getSkillDirs(List<String> names) {
     @Nullable
     public Path getSkillDir(String skillName) {
         SkillRepository repo = repos.get(skillName);
-        if (repo instanceof FileSystemSkillRepository) {
-            return ((FileSystemSkillRepository) repo).getBaseDir().resolve(skillName);
-        }
-        return null;
+        return repo == null ?
null : repo.getSkillDir(skillName);
     }
     /** Resolve a skill resource's relative path to an absolute path, or {@code null} if missing. */
     @Nullable
     public Path resolveResourcePath(String skillName, String resourcePath) {
         SkillRepository repo = repos.get(skillName);
-        if (repo instanceof FileSystemSkillRepository) {
-            Path resolved =
-                    ((FileSystemSkillRepository) repo)
-                            .getBaseDir()
-                            .resolve(skillName)
-                            .resolve(resourcePath);
-            if (Files.isRegularFile(resolved)) {
-                return resolved;
+        if (repo == null) {
+            return null;
+        }
+        Path dir = repo.getSkillDir(skillName);
+        if (dir == null) {
+            return null;
+        }
+        Path resolved = dir.resolve(resourcePath);
+        return Files.isRegularFile(resolved) ? resolved : null;
+    }
+
+    private void loadAll() {
+        for (SkillSourceSpec spec : config.getSources()) {
+            try {
+                SkillRepository repo =
+                        SkillSourceRegistry.get(spec.getScheme())
+                                .open(spec.getParams(), classLoader);
+                registerRepo(repo, originOf(spec));
+            } catch (IOException | IllegalArgumentException e) {
+                throw new IllegalStateException(
+                        "Failed to load skills from " + spec.getScheme() + ":" + spec.getParams(),
+                        e);
+            }
+        }
+    }
+
+    /** Build a {@link SkillOrigin} from a spec for diagnostics (WARN on duplicates, etc.). */

Review Comment:
   **`originOf` re-introduces the parallel scheme ladder the registry was meant to eliminate.**

   ```java
   switch (spec.getScheme()) {
       case "local": ...;
       case "url": ...;
       case "classpath": ...;
       case "package": ...;
   }
   ```

   Python's `_origin_of` mirrors this. Adding scheme `s3` now requires 3 file edits (registry ×2 + originOf ×2), not 1. Java's `case "package"` is dead code: no Java `package` handler exists.

   Fix: add `default String describeLocation(params)` to `SkillSourceHandler`. Each handler describes its own location at registration.

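A minimal sketch of the `describeLocation` suggestion, under stated assumptions: the `SkillSourceHandler` interface below is a hypothetical stand-in (the real one also declares `open(...)`, omitted here), and `KeyedHandler`, `REGISTRY`, `describe`, and the `custom` scheme are illustrative names that do not come from the PR. The point is only that the per-scheme `switch` can collapse into one lookup when each handler reports its own location.

```java
import java.util.Map;
import java.util.TreeMap;

final class DescribeLocationSketch {

    /** Hypothetical stand-in for SkillSourceHandler; the real interface also has open(...). */
    interface SkillSourceHandler {
        /** Fallback for handlers with no single location key: all params in sorted key order. */
        default String describeLocation(Map<String, String> params) {
            return new TreeMap<>(params).toString();
        }
    }

    /** Illustrative handler whose location is the value of a single parameter. */
    static final class KeyedHandler implements SkillSourceHandler {
        private final String key;

        KeyedHandler(String key) {
            this.key = key;
        }

        @Override
        public String describeLocation(Map<String, String> params) {
            return params.getOrDefault(key, "");
        }
    }

    /** Assumed registry shape: one entry per scheme, so a new scheme is a single edit. */
    static final Map<String, SkillSourceHandler> REGISTRY =
            Map.of(
                    "local", new KeyedHandler("path"),
                    "url", new KeyedHandler("url"),
                    "classpath", new KeyedHandler("resource"),
                    "custom", new SkillSourceHandler() {}); // hypothetical scheme using the default

    /** Replaces the switch ladder: look the handler up and let it describe itself. */
    static String describe(String scheme, Map<String, String> params) {
        SkillSourceHandler handler = REGISTRY.get(scheme);
        if (handler == null) {
            throw new IllegalArgumentException("Unknown skill source scheme: " + scheme);
        }
        return scheme + ":" + handler.describeLocation(params);
    }
}
```

With this shape, `originOf` would no longer need to know any scheme names; whether the default should render all params or something narrower is a design choice left to the PR author.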
##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/AgentSkill.java:
##########
@@ -44,6 +44,7 @@ public final class AgentSkill {
     @Nullable private volatile Map<String, String> resources;
     @Nullable private Supplier<Map<String, String>> resourceLoader;
     private volatile boolean activated;
+    @Nullable private volatile SkillOrigin origin;

Review Comment:
   **Java/Python serialization surface for `origin` is asymmetric.**

   Python's `AgentSkill` extends `BaseModel`, so `origin` is in default `model_dump()` output. Java's `AgentSkill` is a POJO with no Jackson annotation; any future debug/checkpoint/Pemja serialization path silently drops it on Java while Python emits it. Same class of cross-language drop A3 fixed at the `Skills` envelope.

   Add `@JsonProperty("origin")` + `@JsonInclude(NON_NULL)`. Near-zero cost now; cost grows once `AgentSkill` travels on the wire.

##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/SkillManager.java:
##########
@@ -18,33 +18,63 @@
 package org.apache.flink.agents.runtime.skill;
+import org.apache.flink.agents.api.skills.SkillSourceSpec;
 import org.apache.flink.agents.api.skills.Skills;
-import org.apache.flink.agents.runtime.skill.repository.FileSystemSkillRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
+import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.IdentityHashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 /**
  * Loads and indexes all skills referenced by a {@link Skills} configuration.
  *
  * <p>Mirrors the Python {@code flink_agents.runtime.skill.skill_manager.SkillManager}.
+ *
+ * <p>Owned by {@code ResourceContextImpl}; closed via {@code ResourceCache.close()} on operator
+ * close, which cascades to each repository's temp directory.
Avoid constructing a {@code
+ * SkillManager} outside that flow without {@code try-with-resources}.
  */
-public class SkillManager {
+public class SkillManager implements AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SkillManager.class);
     private final Skills config;
+    private final ClassLoader classLoader;
     private final Map<String, AgentSkill> skills = new LinkedHashMap<>();
     private final Map<String, SkillRepository> repos = new HashMap<>();
-    public SkillManager(Skills config) {
+    /**
+     * Construct a {@code SkillManager} that resolves {@code classpath:} sources against {@code
+     * classLoader}. Production code passes the Flink user-code class loader (threaded through
+     * {@code ResourceCache} from {@code ActionExecutionOperator}); tests / standalone use may use
+     * {@link #SkillManager(Skills)}.
+     */
+    public SkillManager(Skills config, ClassLoader classLoader) {

Review Comment:
   **Constructor-failure leak: partial `loadAll()` leaves registered repos with no closeable handle.**

   If the 2nd of 3 sources succeeds (repo + hook registered) but the 3rd throws, the constructor exits with `IllegalStateException` → caller never gets a `SkillManager` reference → cannot call `close()` → 2nd repo's hook + temp dir leak until JVM exit. Same monotonic-leak class as review #10, just triggered by partial-construction instead of hot-reload.

   5-line fix: wrap `loadAll`'s loop in try/catch, call `close()` before rethrowing.

##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/SkillManager.java:
##########
@@ -111,38 +148,94 @@ public List<String> getSkillDirs(List<String> names) {
     @Nullable
     public Path getSkillDir(String skillName) {
         SkillRepository repo = repos.get(skillName);
-        if (repo instanceof FileSystemSkillRepository) {
-            return ((FileSystemSkillRepository) repo).getBaseDir().resolve(skillName);
-        }
-        return null;
+        return repo == null ?
null : repo.getSkillDir(skillName);
     }
     /** Resolve a skill resource's relative path to an absolute path, or {@code null} if missing. */
     @Nullable
     public Path resolveResourcePath(String skillName, String resourcePath) {
         SkillRepository repo = repos.get(skillName);
-        if (repo instanceof FileSystemSkillRepository) {
-            Path resolved =
-                    ((FileSystemSkillRepository) repo)
-                            .getBaseDir()
-                            .resolve(skillName)
-                            .resolve(resourcePath);
-            if (Files.isRegularFile(resolved)) {
-                return resolved;
+        if (repo == null) {
+            return null;
+        }
+        Path dir = repo.getSkillDir(skillName);
+        if (dir == null) {
+            return null;
+        }
+        Path resolved = dir.resolve(resourcePath);
+        return Files.isRegularFile(resolved) ? resolved : null;
+    }
+
+    private void loadAll() {
+        for (SkillSourceSpec spec : config.getSources()) {
+            try {
+                SkillRepository repo =
+                        SkillSourceRegistry.get(spec.getScheme())
+                                .open(spec.getParams(), classLoader);
+                registerRepo(repo, originOf(spec));
+            } catch (IOException | IllegalArgumentException e) {
+                throw new IllegalStateException(
+                        "Failed to load skills from " + spec.getScheme() + ":" + spec.getParams(),
+                        e);
+            }
+        }
+    }
+
+    /** Build a {@link SkillOrigin} from a spec for diagnostics (WARN on duplicates, etc.). */
+    private static SkillOrigin originOf(SkillSourceSpec spec) {
+        Map<String, String> p = spec.getParams();
+        String location;
+        switch (spec.getScheme()) {
+            case "local":
+                location = p.getOrDefault("path", "");
+                break;
+            case "url":
+                location = p.getOrDefault("url", "");
+                break;
+            case "classpath":
+                location = p.getOrDefault("resource", "");
+                break;
+            case "package":
+                location = p.getOrDefault("package", "") + "/" + p.getOrDefault("resource", "");
+                break;
+            default:
+                location = p.toString();
+        }
+        return new SkillOrigin(spec.getScheme(), location);
+    }
+
+    /**
+     * Close every owned {@link SkillRepository}, releasing any temp directories materialized for
+     * URL / classpath-zip / classpath-jar sources. Idempotent.
+     */
+    @Override
+    public void close() {
+        // repos may map multiple skill names to the same repo instance; dedup by identity.
+        Set<SkillRepository> unique = Collections.newSetFromMap(new IdentityHashMap<>());
+        unique.addAll(repos.values());

Review Comment:
   **`SkillManager.close()` swallows exceptions silently; inconsistent with `ResourceCache.close()`.**

   ```java
   for (SkillRepository repo : unique) {
       try {
           repo.close();
       } catch (Exception ignored) {}
   }
   ```

   `ResourceCache.close()` (line 135-160) collects first-exception + suppressed and rethrows. Same close cascade, two styles. Real failures (locked file, permission denied, disk full) give callers no signal. `ResourceContextImpl.ensureSkillManager()` line 116 has the same silent catch on config swap.

   Minimum: `LOG.warn(..., e)`. Preferable: align with `ResourceCache.close()`'s rethrow pattern.

##########
plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java:
##########
@@ -532,11 +533,11 @@ private void addSkills(Map<String, Skills> skillsObjects) throws Exception {
                         ResourceType.TOOL,
                         new ResourceDescriptor(BashTool.class.getName(), new HashMap<>())));
-        LinkedHashSet<String> paths = new LinkedHashSet<>();
+        LinkedHashSet<SkillSourceSpec> sources = new LinkedHashSet<>();
         for (Skills s : skillsObjects.values()) {

Review Comment:
   **Multi-`@Skills` merge order is non-deterministic on Java; WARN reports collisions but the "winner" is unspecified.**

   `AgentPlan.addSkills` iterates `skillsObjects.values()` in `LinkedHashMap` order, but insertion order comes from `Class.getMethods()`; JLS doesn't guarantee declaration order. Test self-discloses at `AgentPlanDeclareSkillsTest:163`: "Class.getMethods() does not preserve declaration order". Two `@Skills` methods with the same skill name pick different winners across deployments / JDK versions; WARN fires but is non-reproducible.

   Python `dict` preserves insertion order; Python is deterministic, Java is the bad case.
   Min fix: sort method names lexicographically before merging + javadoc note. Stronger: `@Skills(priority=N)` or fail-fast on collision.
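The "sort method names lexicographically before merging" fix might look roughly like the sketch below. It is illustrative only: the nested `@Skills` annotation and `DemoAgent` class are hypothetical stand-ins defined here so the example is self-contained, and `orderedSkillsMethodNames` is an assumed helper name, not code from `AgentPlan`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class DeterministicSkillsOrderSketch {

    /** Hypothetical stand-in for the real @Skills annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Skills {}

    /** Illustrative agent class; declaration order is deliberately not alphabetical. */
    static final class DemoAgent {
        @Skills
        public static String zebra() {
            return "z";
        }

        @Skills
        public static String alpha() {
            return "a";
        }

        public static String notAnnotated() {
            return "";
        }
    }

    /**
     * Collects names of @Skills methods and sorts them, so the merge order no longer depends on
     * whatever order Class.getMethods() happens to return. Overloads share a name and would need
     * an extra tie-breaker (for example the parameter list), which this sketch does not handle.
     */
    static List<String> orderedSkillsMethodNames(Class<?> agentClass) {
        List<String> names = new ArrayList<>();
        for (Method method : agentClass.getMethods()) {
            if (method.isAnnotationPresent(Skills.class)) {
                names.add(method.getName());
            }
        }
        Collections.sort(names);
        return names;
    }
}
```

Merging in this sorted order makes the collision "winner" reproducible across JDKs, though it ties precedence to method naming; an explicit priority or fail-fast on collision avoids that coupling.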
