Copilot commented on code in PR #4394:
URL: https://github.com/apache/flink-cdc/pull/4394#discussion_r3241440307


##########
flink-cdc-common/src/test/java/org/apache/flink/cdc/common/model/AiModelClientFactoryTest.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.model;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/** Tests for the default {@link AiModelClientFactory#validate} method. */
+class AiModelClientFactoryTest {
+
+    private static final String IDENTIFIER = "test-provider";
+    private static final String MODEL_NAME = "my-model";
+
+    private static final class StubFactory implements AiModelClientFactory {
+        private final Set<String> required;
+        private final Set<String> optional;
+
+        StubFactory(Set<String> required, Set<String> optional) {
+            this.required = required;
+            this.optional = optional;
+        }
+
+        @Override
+        public String identifier() {
+            return IDENTIFIER;
+        }
+
+        @Override
+        public Set<String> requiredOptions() {
+            return required;
+        }
+
+        @Override
+        public Set<String> optionalOptions() {
+            return optional;
+        }
+
+        @Override
+        public AiModelClient createClient(ModelContext context) {
+            return new AiModelClient() {};
+        }
+    }
+
+    private static ModelContext contextWithOptions(Map<String, String> 
options) {
+        return new ModelContext() {
+            @Override
+            public String getModelName() {
+                return MODEL_NAME;
+            }
+
+            @Override
+            public Map<String, String> getOptions() {
+                return options;
+            }
+
+            @Override
+            public ClassLoader getClassLoader() {
+                return Thread.currentThread().getContextClassLoader();
+            }
+        };
+    }
+
+    @Test
+    void testValidatePassesWithAllRequiredOptions() {
+        StubFactory factory = new StubFactory(Set.of("api-key", "endpoint"), 
Set.of("timeout"));
+
+        Map<String, String> options = new HashMap<>();
+        options.put("api-key", "sk-xxx");
+        options.put("endpoint", "https://api.example.com");
+
+        // Should not throw
+        factory.validate(contextWithOptions(options));
+    }
+
+    @Test
+    void testValidatePassesWithRequiredAndOptionalOptions() {
+        StubFactory factory = new StubFactory(Set.of("api-key", "endpoint"), 
Set.of("timeout"));
+
+        Map<String, String> options = new HashMap<>();
+        options.put("api-key", "sk-xxx");
+        options.put("endpoint", "https://api.example.com");
+        options.put("timeout", "30000");
+
+        factory.validate(contextWithOptions(options));
+    }
+
+    @Test
+    void testValidateThrowsOnMissingRequiredOption() {
+        StubFactory factory = new StubFactory(Set.of("api-key", "endpoint"), 
Set.of("timeout"));
+
+        // Missing "endpoint"
+        Map<String, String> options = new HashMap<>();
+        options.put("api-key", "sk-xxx");
+
+        Assertions.assertThatThrownBy(() -> 
factory.validate(contextWithOptions(options)))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Missing required options for model 'my-model' 
(type='test-provider'): [endpoint]");
+    }
+
+    @Test
+    void testValidateThrowsOnMultipleMissingRequiredOptions() {
+        StubFactory factory = new StubFactory(Set.of("api-key", "endpoint", 
"model"), Set.of());
+
+        // All required options missing
+        Assertions.assertThatThrownBy(
+                        () -> 
factory.validate(contextWithOptions(Collections.emptyMap())))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Missing required options for model 'my-model' 
(type='test-provider'): [endpoint, api-key, model]");
+    }
+
+    @Test
+    void testValidateThrowsOnUnknownOption() {
+        StubFactory factory = new StubFactory(Set.of("api-key"), 
Set.of("timeout"));
+
+        Map<String, String> options = new HashMap<>();
+        options.put("api-key", "sk-xxx");
+        options.put("bogus", "unexpected");
+
+        Assertions.assertThatThrownBy(() -> 
factory.validate(contextWithOptions(options)))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("Unknown options for model 'my-model' 
(type='test-provider'): [bogus]");
+    }
+
+    @Test
+    void testValidateThrowsOnMultipleUnknownOptions() {
+        StubFactory factory = new StubFactory(Set.of("api-key"), Set.of());
+
+        Map<String, String> options = new HashMap<>();
+        options.put("api-key", "sk-xxx");
+        options.put("foo", "a");
+        options.put("bar", "b");
+
+        Assertions.assertThatThrownBy(() -> 
factory.validate(contextWithOptions(options)))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Unknown options for model 'my-model' 
(type='test-provider'): [bar, foo]");

Review Comment:
   Tests assert exact error message strings that include the contents of a 
`Set` (e.g. `[endpoint, api-key, model]` and `[bar, foo]`). The `validate()` 
implementation in `AiModelClientFactory` collects missing/unknown keys into an 
unordered `HashSet` via `Collectors.toSet()`. The iteration order is not 
guaranteed, so these tests are brittle and may fail depending on the JDK 
version or `Set` implementation in use. Either sort the keys before formatting 
in `validate()`, or use unordered assertions (e.g. parse the bracket contents 
and check them with AssertJ's `containsExactlyInAnyOrder`).



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java:
##########
@@ -68,6 +85,11 @@ public static ExpressionEvaluator compileExpression(
                             
argumentClasses.add(Class.forName(udfFunction.getClasspath()));
                         }
 
+                        for (String paramName : modelClients.keySet()) {
+                            argumentNames.add(paramName);
+                            argumentClasses.add(AiModelClient.class);
+                        }

Review Comment:
   `COMPILED_EXPRESSION_CACHE` is a global static cache keyed by 
`TransformExpressionKey`, but the parameter list passed to Janino now varies by 
`modelClients` (each model name becomes a positional parameter). The legacy 
overload still compiles with `Collections.emptyMap()`. If the legacy overload 
is invoked first for a key that should bind model-client parameters (e.g. an 
expression that uses an AI function whose model name happens not to influence 
the key/expression text), the cached evaluator will be parameter-incompatible 
with later callers, causing argument-count or compile errors. At minimum, 
include the model client parameter names/types in the cache key, or remove the 
legacy overload.



##########
flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai-compatible/src/main/java/org/apache/flink/cdc/models/openai/OpenAiCompatibleModelClient.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.models.openai;
+
+import org.apache.flink.cdc.common.model.AiModelClient;
+import org.apache.flink.cdc.common.model.abilities.SupportsEmbedding;
+import org.apache.flink.cdc.common.model.abilities.SupportsTextGeneration;
+
+import com.openai.client.OpenAIClient;
+import com.openai.client.okhttp.OpenAIOkHttpClient;
+import com.openai.models.chat.completions.ChatCompletion;
+import com.openai.models.chat.completions.ChatCompletionCreateParams;
+import com.openai.models.embeddings.CreateEmbeddingResponse;
+import com.openai.models.embeddings.Embedding;
+import com.openai.models.embeddings.EmbeddingCreateParams;
+
+import java.util.List;
+
+/** AI model client that connects to any OpenAI-compatible endpoint. */
+public class OpenAiCompatibleModelClient
+        implements AiModelClient, SupportsTextGeneration, SupportsEmbedding {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String endpoint;
+    private final String apiKey;
+    private final String modelName;
+
+    private transient OpenAIClient client;
+
+    public OpenAiCompatibleModelClient(String endpoint, String apiKey, String 
modelName) {
+        this.endpoint = endpoint;
+        this.apiKey = apiKey;
+        this.modelName = modelName;
+    }

Review Comment:
   `OpenAiCompatibleModelClient` holds non-transient `endpoint`, `apiKey`, and 
`modelName` fields and is serialized along with the operator (per the 
`AiModelClient extends Serializable` contract). The `apiKey` will end up in 
checkpoints, savepoints, JobManager metadata, and any logs that print the 
operator state — a security concern, especially for shared/managed Flink 
clusters. Consider redacting in `toString`, or document this clearly and 
recommend secret-store integration. At minimum, the field should never appear 
in log lines.



##########
flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai-compatible/src/main/java/org/apache/flink/cdc/models/openai/OpenAiCompatibleModelClientFactory.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.models.openai;
+
+import org.apache.flink.cdc.common.model.AiModelClient;
+import org.apache.flink.cdc.common.model.AiModelClientFactory;
+import org.apache.flink.cdc.common.model.ModelContext;
+
+import java.util.Set;
+
+/** SPI factory for {@link OpenAiCompatibleModelClient}. */
+public class OpenAiCompatibleModelClientFactory implements 
AiModelClientFactory {
+
+    @Override
+    public String identifier() {
+        return "openai-compatible";
+    }
+
+    @Override
+    public Set<String> requiredOptions() {
+        return Set.of("endpoint", "api-key", "model-name");
+    }
+
+    @Override
+    public Set<String> optionalOptions() {
+        return Set.of();
+    }
+
+    @Override
+    public AiModelClient createClient(ModelContext context) {
+        String endpoint = context.getOptions().get("endpoint");
+        String apiKey = context.getOptions().get("api-key");
+        String modelName = context.getOptions().get("model-name");
+        return new OpenAiCompatibleModelClient(endpoint, apiKey, modelName);
+    }

Review Comment:
   The OpenAI factory declares its option keys in `requiredOptions()` / 
`optionalOptions()` as bare string literals ("endpoint", "api-key", 
"model-name"). The same string 
keys are also used inline in `createClient()` and would need to be kept in 
lock-step manually. Extract these as `static final String` constants (or 
`ConfigOption`s, consistent with how the rest of the codebase declares 
connector options) to avoid drift between the option set and the keys actually 
read.



##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##########
@@ -428,24 +427,45 @@ private List<ModelDef> parseModels(JsonNode modelsNode) {
         } else {
             modelDefs.add(convertJsonNodeToModelDef(modelsNode));
         }
+        Set<String> seenNames = new HashSet<>();
+        for (ModelDef model : modelDefs) {
+            if (!seenNames.add(model.getName())) {
+                throw new IllegalArgumentException(
+                        "Duplicate model name '" + model.getName() + "' in 
pipeline definition.");
+            }
+        }
         return modelDefs;
     }
 
     private ModelDef convertJsonNodeToModelDef(JsonNode modelNode) {
+        Preconditions.checkArgument(
+                modelNode instanceof ObjectNode,
+                "`model` in `pipeline` should be an object, but got %s",
+                modelNode);
+        ObjectNode node = (ObjectNode) modelNode;
         String name =
                 checkNotNull(
-                                modelNode.get(MODEL_NAME_KEY),
+                                node.remove(MODEL_NAME_KEY),
                                 "Missing required field \"%s\" in `model`",
                                 MODEL_NAME_KEY)
                         .asText();
-        String model =
+        Preconditions.checkArgument(
+                name.matches("[a-zA-Z_][a-zA-Z0-9_]*") && 
!name.startsWith("__"),
+                "Model name \"%s\" is not a valid identifier. "
+                        + "It must start with a letter or underscore, "
+                        + "contain only letters, digits, or underscores, "
+                        + "and must not start with double underscores.",
+                name);

Review Comment:
   The model name regex `[a-zA-Z_][a-zA-Z0-9_]*` rejects names containing 
hyphens (e.g. `my-model`), which is a very common convention and was implicitly 
allowed by the previous `class-name` based scheme (only the model logical name 
was a free string). Combined with the rename to `name`/`type`, this further 
narrows the set of valid pipelines users can write. Either document this 
constraint prominently or make the rule less restrictive — the constraint 
exists only because the model name is later emitted as a Java identifier in 
JaninoCompiler, but you could escape/translate hyphens internally instead.



##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##########
@@ -428,24 +427,45 @@ private List<ModelDef> parseModels(JsonNode modelsNode) {
         } else {
             modelDefs.add(convertJsonNodeToModelDef(modelsNode));
         }
+        Set<String> seenNames = new HashSet<>();
+        for (ModelDef model : modelDefs) {
+            if (!seenNames.add(model.getName())) {
+                throw new IllegalArgumentException(
+                        "Duplicate model name '" + model.getName() + "' in 
pipeline definition.");
+            }
+        }
         return modelDefs;
     }
 
     private ModelDef convertJsonNodeToModelDef(JsonNode modelNode) {
+        Preconditions.checkArgument(
+                modelNode instanceof ObjectNode,
+                "`model` in `pipeline` should be an object, but got %s",
+                modelNode);
+        ObjectNode node = (ObjectNode) modelNode;
         String name =
                 checkNotNull(
-                                modelNode.get(MODEL_NAME_KEY),
+                                node.remove(MODEL_NAME_KEY),
                                 "Missing required field \"%s\" in `model`",
                                 MODEL_NAME_KEY)
                         .asText();
-        String model =
+        Preconditions.checkArgument(
+                name.matches("[a-zA-Z_][a-zA-Z0-9_]*") && 
!name.startsWith("__"),
+                "Model name \"%s\" is not a valid identifier. "
+                        + "It must start with a letter or underscore, "
+                        + "contain only letters, digits, or underscores, "
+                        + "and must not start with double underscores.",
+                name);
+        String type =
                 checkNotNull(
-                                modelNode.get(MODEL_CLASS_NAME_KEY),
+                                node.remove(MODEL_TYPE_KEY),
                                 "Missing required field \"%s\" in `model`",
-                                MODEL_CLASS_NAME_KEY)
+                                MODEL_TYPE_KEY)
                         .asText();
-        Map<String, String> properties = mapper.convertValue(modelNode, 
Map.class);
-        return new ModelDef(name, model, properties);
+        Map<String, String> options = new HashMap<>();
+        node.fields()
+                .forEachRemaining(entry -> options.put(entry.getKey(), 
entry.getValue().asText()));
+        return new ModelDef(name, type, options);
     }
 

Review Comment:
   YAML values such as `temperature: 0.7` are converted to strings via 
`entry.getValue().asText()`. For booleans/numbers this works, but for nested 
objects or arrays (e.g. `headers: { Authorization: ... }` or `stop: ["\n", 
"###"]`), `asText()` returns an empty string, silently dropping configuration. 
If providers may want structured options, use `entry.getValue().toString()` or 
a proper JSON-to-string conversion; otherwise validate up front that all option 
values are scalar.
   



##########
flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai-compatible/src/main/java/org/apache/flink/cdc/models/openai/OpenAiCompatibleModelClient.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.models.openai;
+
+import org.apache.flink.cdc.common.model.AiModelClient;
+import org.apache.flink.cdc.common.model.abilities.SupportsEmbedding;
+import org.apache.flink.cdc.common.model.abilities.SupportsTextGeneration;
+
+import com.openai.client.OpenAIClient;
+import com.openai.client.okhttp.OpenAIOkHttpClient;
+import com.openai.models.chat.completions.ChatCompletion;
+import com.openai.models.chat.completions.ChatCompletionCreateParams;
+import com.openai.models.embeddings.CreateEmbeddingResponse;
+import com.openai.models.embeddings.Embedding;
+import com.openai.models.embeddings.EmbeddingCreateParams;
+
+import java.util.List;
+
+/** AI model client that connects to any OpenAI-compatible endpoint. */
+public class OpenAiCompatibleModelClient
+        implements AiModelClient, SupportsTextGeneration, SupportsEmbedding {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String endpoint;
+    private final String apiKey;
+    private final String modelName;
+
+    private transient OpenAIClient client;
+
+    public OpenAiCompatibleModelClient(String endpoint, String apiKey, String 
modelName) {
+        this.endpoint = endpoint;
+        this.apiKey = apiKey;
+        this.modelName = modelName;
+    }
+
+    @Override
+    public void open() {
+        client = 
OpenAIOkHttpClient.builder().baseUrl(endpoint).apiKey(apiKey).build();
+    }
+
+    @Override
+    public void close() {
+        client = null;

Review Comment:
   `close()` simply nulls out the `client` reference without closing the 
underlying `OpenAIClient` / OkHttp connection pool. This will leak HTTP 
connections, threads, and sockets each time a pipeline operator is closed (or 
restarted from a checkpoint). The OpenAI Java SDK's `OpenAIClient` is 
`AutoCloseable` — call `client.close()` first, then null the reference.
   



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/model/AiModelClientFactory.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.common.model;
+
+import org.apache.flink.cdc.common.annotation.Experimental;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * SPI interface for AI model client factories. Each provider (e.g. 
OpenAI-compatible, DashScope)
+ * ships one implementation, discoverable via {@link java.util.ServiceLoader}.
+ *
+ * <p>The {@link #identifier()} value maps to the {@code type} field of a 
{@code pipeline.model}
+ * entry in the pipeline YAML.
+ */
+@Experimental
+public interface AiModelClientFactory {
+
+    /** A unique, lower-case identifier for this provider, e.g. {@code 
"openai-compatible"}. */
+    String identifier();
+
+    /** Option keys that must be present in the model YAML options block. */
+    Set<String> requiredOptions();
+
+    /** Option keys that may optionally appear in the model YAML options 
block. */
+    Set<String> optionalOptions();
+
+    /**
+     * Validates that the given context contains all required options and no 
unknown options.
+     * Subclasses may override this to add custom validation logic.
+     */
+    default void validate(ModelContext context) {
+        Set<String> required = requiredOptions();
+        Set<String> optional = optionalOptions();
+        if (required != null) {
+            Set<String> missing =
+                    required.stream()
+                            .filter(k -> !context.getOptions().containsKey(k))
+                            .collect(Collectors.toSet());
+            if (!missing.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "Missing required options for model '"
+                                + context.getModelName()
+                                + "' (type='"
+                                + identifier()
+                                + "'): "
+                                + missing);
+            }
+        }
+        if (required != null && optional != null) {
+            Set<String> unknown =
+                    context.getOptions().keySet().stream()
+                            .filter(k -> !required.contains(k) && 
!optional.contains(k))
+                            .collect(Collectors.toSet());
+            if (!unknown.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "Unknown options for model '"
+                                + context.getModelName()
+                                + "' (type='"
+                                + identifier()
+                                + "'): "
+                                + unknown);
+            }
+        }
+    }

Review Comment:
   `requiredOptions()` and `optionalOptions()` are presumably never meant to 
return null in implementations (their Javadoc states no nullability contract 
either way), but `validate()` defensively allows them to be 
null (`if (required != null)` / `if (required != null && optional != null)`). 
The latter combined check is also incorrect: if `optional == null` but 
`required != null`, the unknown-options check is skipped entirely, silently 
accepting any user-provided keys. Either drop the null-tolerance and document 
the non-null contract, or treat null as an empty set so that validation still 
runs for the other side.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/ai/AiTextFunctionDef.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.ai;
+
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+
+/**
+ * Built-in AI text generation function definitions with their prompt 
templates and type metadata.
+ */
+public enum AiTextFunctionDef {
+    AI_COMPLETE(
+            "AI_COMPLETE",
+            RowType.of(new DataType[] {DataTypes.STRING()}, new String[] 
{"systemPrompt"}),
+            RowType.of(new DataType[] {DataTypes.STRING()}, new String[] 
{"result"}),
+            "%s\n"),
+
+    AI_SUMMARIZE(
+            "AI_SUMMARIZE",
+            RowType.of(new DataType[] {DataTypes.INT()}, new String[] 
{"maxLength"}),
+            RowType.of(new DataType[] {DataTypes.STRING()}, new String[] 
{"summary"}),
+            "You are a text summarization expert. Generate an accurate, 
coherent, and informative "
+                    + "summary that does not exceed %d characters.\n"
+                    + "Output requirements:\n"
+                    + "- summary: the summarized content\n"
+                    + "Principles:\n"
+                    + "- Stay within the specified length\n"
+                    + "- Preserve core ideas and key information\n"
+                    + "- Use concise language with clear logic\n"
+                    + "- Maintain text coherence\n"
+                    + "- Avoid subjective opinions\n");
+
+    private final String functionName;
+    private final RowType inputType;
+    private final RowType outputType;
+    private final String promptTemplate;
+
+    AiTextFunctionDef(
+            String functionName, RowType inputType, RowType outputType, String 
promptTemplate) {
+        this.functionName = functionName;
+        this.inputType = inputType;
+        this.outputType = outputType;
+        this.promptTemplate = promptTemplate;
+    }
+
+    public String getFunctionName() {
+        return functionName;
+    }
+
+    /**
+     * Returns the additional parameter types for promptTemplate placeholders.
+     *
+     * <p>Input text parameter is always added by runtime, not included here.
+     */
+    public RowType getInputType() {
+        return inputType;
+    }
+
+    public RowType getOutputType() {
+        return outputType;
+    }
+
+    /** Builds the core system prompt by filling in the template placeholders. 
*/
+    public String buildPrompt(Object... args) {
+        return String.format(promptTemplate, args);
+    }

Review Comment:
   The system prompt is passed to `String.format(promptTemplate, args)` via 
`AiTextFunctionDef.buildPrompt`. For `AI_COMPLETE` the template is `"%s\n"` and 
the user-provided system prompt is the format argument; for `AI_SUMMARIZE` the 
schema hint also percolates into formatted strings later. If the system prompt 
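(which is typically free-form user-supplied text) contains a literal `%` 
character and the resulting prompt is later used as a format string itself (a 
`%` inside a format argument alone is harmless), this will throw 
`java.util.MissingFormatArgumentException` / 
`IllegalFormatConversionException` at runtime, breaking the pipeline. Consider 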
escaping `%` or using a safer substitution mechanism (e.g. `String.replace` 
with a placeholder).



##########
flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai-compatible/src/main/java/org/apache/flink/cdc/models/openai/OpenAiCompatibleModelClient.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.models.openai;
+
+import org.apache.flink.cdc.common.model.AiModelClient;
+import org.apache.flink.cdc.common.model.abilities.SupportsEmbedding;
+import org.apache.flink.cdc.common.model.abilities.SupportsTextGeneration;
+
+import com.openai.client.OpenAIClient;
+import com.openai.client.okhttp.OpenAIOkHttpClient;
+import com.openai.models.chat.completions.ChatCompletion;
+import com.openai.models.chat.completions.ChatCompletionCreateParams;
+import com.openai.models.embeddings.CreateEmbeddingResponse;
+import com.openai.models.embeddings.Embedding;
+import com.openai.models.embeddings.EmbeddingCreateParams;
+
+import java.util.List;
+
+/** AI model client that connects to any OpenAI-compatible endpoint. */
+public class OpenAiCompatibleModelClient
+        implements AiModelClient, SupportsTextGeneration, SupportsEmbedding {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String endpoint;
+    private final String apiKey;
+    private final String modelName;
+
+    private transient OpenAIClient client;
+
+    public OpenAiCompatibleModelClient(String endpoint, String apiKey, String 
modelName) {
+        this.endpoint = endpoint;
+        this.apiKey = apiKey;
+        this.modelName = modelName;
+    }
+
+    @Override
+    public void open() {
+        client = 
OpenAIOkHttpClient.builder().baseUrl(endpoint).apiKey(apiKey).build();
+    }
+
+    @Override
+    public void close() {
+        client = null;
+    }
+
+    @Override
+    public String generate(String systemPrompt, String userInput) {
+        ChatCompletionCreateParams params =
+                ChatCompletionCreateParams.builder()
+                        .model(modelName)
+                        .addSystemMessage(systemPrompt)
+                        .addUserMessage(userInput)
+                        .build();
+        ChatCompletion completion = client.chat().completions().create(params);
+        return completion.choices().get(0).message().content().orElse(null);
+    }
+
+    @Override
+    public float[] embed(String text) {
+        EmbeddingCreateParams params =
+                
EmbeddingCreateParams.builder().model(modelName).input(text).build();
+        CreateEmbeddingResponse response = client.embeddings().create(params);
+        List<Embedding> data = response.data();
+        if (data.isEmpty()) {
+            return new float[0];

Review Comment:
   `embed()` returns an empty `float[0]` when the OpenAI response data list is 
empty. Callers (`AiFunctions.aiEmbed`) wrap this with `Floats.asList`, 
producing an empty list that is then materialized into the output column 
without any indication that the embedding actually failed/was missing. A silent 
empty vector is hard to distinguish from a legitimate zero-length result and 
will pollute downstream consumers (e.g. vector DBs). Consider throwing or 
logging a warning when the response contains no embeddings.
   



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java:
##########
@@ -526,23 +528,44 @@ private static Java.Rvalue generateOtherFunctionOperation(
                     context.udfDescriptors.stream()
                             .filter(e -> 
e.getName().equalsIgnoreCase(operationName))
                             .findFirst();
-            return udfFunctionOptional
-                    .map(
-                            udfFunction ->
-                                    new Java.MethodInvocation(
-                                            Location.NOWHERE,
-                                            null,
-                                            
generateInvokeExpression(udfFunction),
-                                            atoms))
-                    .orElseGet(
-                            () ->
-                                    new Java.MethodInvocation(
-                                            Location.NOWHERE,
-                                            null,
-                                            StringUtils.convertToCamelCase(
-                                                    
sqlBasicCall.getOperator().getName()),
-                                            atoms));
+            if (udfFunctionOptional.isPresent()) {
+                return new Java.MethodInvocation(
+                        Location.NOWHERE,
+                        null,
+                        generateInvokeExpression(udfFunctionOptional.get()),
+                        atoms);
+            }
+            if (isAiFunction(operationName) && atoms.length >= 1) {
+                rewriteAiFunctionModelArg(atoms);
+            }
+            return new Java.MethodInvocation(
+                    Location.NOWHERE,
+                    null,
+                    
StringUtils.convertToCamelCase(sqlBasicCall.getOperator().getName()),
+                    atoms);
+        }
+    }
+
+    private static boolean isAiFunction(String upperCaseName) {
+        for (AiTextFunctionDef def : AiTextFunctionDef.values()) {
+            if (def.getFunctionName().equals(upperCaseName)) {
+                return true;
+            }
+        }
+        for (AiEmbeddingFunctionDef def : AiEmbeddingFunctionDef.values()) {
+            if (def.getFunctionName().equals(upperCaseName)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static void rewriteAiFunctionModelArg(Java.Rvalue[] atoms) {
+        String modelName = atoms[0].toString();
+        if (modelName.startsWith("\"") && modelName.endsWith("\"")) {
+            modelName = modelName.substring(1, modelName.length() - 1);
         }
+        atoms[0] = new Java.AmbiguousName(Location.NOWHERE, new String[] 
{modelName});
     }

Review Comment:
   `rewriteAiFunctionModelArg` blindly calls `atoms[0].toString()` and assumes 
the result is either a quoted string literal `"name"` or an identifier. If the 
user passes anything else as the first argument to an AI function (e.g. a 
column reference, a function call, a non-string literal), this will produce a 
malformed Java identifier such as a number or expression text and the generated 
Janino code will fail to compile with a confusing error. Validate that the 
first argument is actually a string literal at SQL parse time, or emit a clear 
error here when stripping quotes does not yield a valid identifier.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/AiFunctionSqlOperatorTable.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.parser.metadata;
+
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.runtime.ai.AiEmbeddingFunctionDef;
+import org.apache.flink.cdc.runtime.ai.AiTextFunctionDef;
+import org.apache.flink.cdc.runtime.typeutils.CalciteDataTypeConverter;
+
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlTypeFamily;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.util.SqlOperatorTables;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Creates SqlOperatorTable from {@link AiTextFunctionDef} definitions. */
+public class AiFunctionSqlOperatorTable {
+
+    private AiFunctionSqlOperatorTable() {}
+
+    /** Creates an SqlOperatorTable containing all AI functions defined in 
AiFunctionDef. */
+    public static org.apache.calcite.sql.SqlOperatorTable create() {
+        List<SqlFunction> functions = new ArrayList<>();
+        for (AiTextFunctionDef def : AiTextFunctionDef.values()) {
+            functions.add(createTextSqlFunction(def));
+        }
+        for (AiEmbeddingFunctionDef def : AiEmbeddingFunctionDef.values()) {
+            functions.add(createEmbeddingSqlFunction(def));
+        }
+        return SqlOperatorTables.of(functions);
+    }
+
+    private static SqlFunction createTextSqlFunction(AiTextFunctionDef def) {
+        return new SqlFunction(
+                def.getFunctionName(),
+                SqlKind.OTHER_FUNCTION,
+                ReturnTypes.explicit(SqlTypeName.VARIANT),
+                null,
+                
OperandTypes.family(toSqlTypeFamiliesWithAdditionalParams(def.getInputType())),
+                SqlFunctionCategory.USER_DEFINED_FUNCTION);
+    }
+
+    private static SqlFunction 
createEmbeddingSqlFunction(AiEmbeddingFunctionDef def) {
+        return new SqlFunction(
+                def.getFunctionName(),
+                SqlKind.OTHER_FUNCTION,
+                opBinding ->
+                        CalciteDataTypeConverter.convertCalciteType(
+                                opBinding.getTypeFactory(), 
def.getOutputType()),
+                null,
+                OperandTypes.family(SqlTypeFamily.STRING, 
toSqlTypeFamily(def.getInputType())),
+                SqlFunctionCategory.USER_DEFINED_FUNCTION);
+    }
+
+    /**
+     * Converts inputType to SqlTypeFamily array, prepending additional 
parameters: modelName
+     * (STRING) and input (STRING).
+     */
+    private static SqlTypeFamily[] 
toSqlTypeFamiliesWithAdditionalParams(RowType inputType) {
+        List<SqlTypeFamily> families = new ArrayList<>();
+        families.add(SqlTypeFamily.STRING); // modelName
+        families.add(SqlTypeFamily.STRING); // input
+        for (DataType fieldType : inputType.getFieldTypes()) {
+            families.add(toSqlTypeFamily(fieldType));
+        }
+        return families.toArray(new SqlTypeFamily[0]);
+    }
+
+    private static SqlTypeFamily toSqlTypeFamily(DataType dataType) {
+        switch (dataType.getTypeRoot()) {
+            case VARCHAR:
+            case CHAR:
+                return SqlTypeFamily.STRING;
+            case INTEGER:
+                return SqlTypeFamily.INTEGER;
+            case BIGINT:
+                return SqlTypeFamily.NUMERIC;
+            case FLOAT:
+            case DOUBLE:
+                return SqlTypeFamily.APPROXIMATE_NUMERIC;
+            case BOOLEAN:
+                return SqlTypeFamily.BOOLEAN;
+            default:
+                return SqlTypeFamily.ANY;
+        }

Review Comment:
   `toSqlTypeFamily` maps `INTEGER` to `SqlTypeFamily.INTEGER` but `BIGINT` to 
`SqlTypeFamily.NUMERIC`. This is asymmetric: `NUMERIC` is a much broader family 
that admits decimals/floats and would let users pass a DOUBLE column where a 
BIGINT is expected. Map BIGINT to `SqlTypeFamily.EXACT_NUMERIC` (or to 
`INTEGER` if you accept the narrowing implications consistent with the existing 
INTEGER case) for consistency.



##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##########
@@ -96,9 +97,8 @@ public class YamlPipelineDefinitionParser implements 
PipelineDefinitionParser {
     private static final String UDF_OPTIONS_KEY = "options";
 
     // Model related keys
-    private static final String MODEL_NAME_KEY = "model-name";
-
-    private static final String MODEL_CLASS_NAME_KEY = "class-name";
+    private static final String MODEL_NAME_KEY = "name";
+    private static final String MODEL_TYPE_KEY = "type";

Review Comment:
   Renaming `model-name`/`class-name` to `name`/`type` is a breaking change to 
the public pipeline YAML schema. The PR removes the previous `class-name`-based 
configuration entirely without a deprecation path or migration shim, so any 
user upgrading with an existing `pipeline.model` block will fail to parse. 
Please add a release note flagging this incompatibility, and consider 
supporting the legacy keys with a deprecation warning for at least one release.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java:
##########
@@ -558,4 +571,28 @@ private void destroyUdf() {
         udfDescriptors.clear();
         udfFunctionInstances.clear();
     }
+
+    private void initializeAiModelClients() {
+        for (Map.Entry<String, AiModelClient> entry : modelClients.entrySet()) 
{
+            try {
+                entry.getValue().open();
+                LOG.info("Successfully opened AI model client '{}'.", 
entry.getKey());
+            } catch (Exception e) {
+                LOG.error("Failed to open AI model client '{}'.", 
entry.getKey(), e);
+                throw new FlinkRuntimeException(
+                        "Failed to initialize AI model: " + entry.getKey(), e);
+            }
+        }
+    }
+
+    private void destroyAiModelClients() {
+        for (Map.Entry<String, AiModelClient> entry : modelClients.entrySet()) 
{
+            try {
+                entry.getValue().close();
+                LOG.info("Successfully closed AI model client '{}'.", 
entry.getKey());
+            } catch (Exception e) {
+                LOG.warn("Failed to close AI model client '{}'.", 
entry.getKey(), e);
+            }
+        }
+    }

Review Comment:
   `destroyAiModelClients` swallows any exception thrown by `client.close()` 
with only a `LOG.warn`. While not failing close is generally desirable in a 
teardown path, downstream operational issues (e.g. blocked HTTP threads not 
released) will be invisible to users that only check job status. Consider 
verifying that the full stack trace is actually logged (the exception is 
passed as the trailing logger argument), or aggregating exceptions and 
rethrowing after iterating all entries, so that every client is still closed 
but a close failure is not silenced. Also worth noting that 
`initializeAiModelClients` runs before `initializeUdf` — if a model `open()` 
fails, UDFs will never be initialized, but the operator's 
`udfDescriptors`/`udfFunctionInstances` collections may still be referenced by 
other code paths during teardown.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/impl/AiFunctions.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.functions.impl;
+
+import org.apache.flink.cdc.common.model.AiModelClient;
+import org.apache.flink.cdc.common.model.abilities.SupportsEmbedding;
+import org.apache.flink.cdc.common.model.abilities.SupportsTextGeneration;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.types.variant.BinaryVariant;
+import org.apache.flink.cdc.common.types.variant.BinaryVariantInternalBuilder;
+import org.apache.flink.cdc.runtime.ai.AiTextFunctionDef;
+
+import org.apache.flink.shaded.guava31.com.google.common.primitives.Floats;
+
+import java.util.List;
+
+/** Built-in AI functions available as static imports in Janino-compiled 
transform expressions. */
+public class AiFunctions {

Review Comment:
   `AiFunctions` declares no constructor, so the compiler generates a default 
public one. Since this class is purely a static utility (matching the pattern 
of other `*Functions` helper classes), add a private constructor to prevent 
instantiation, mirroring the style used by `AiFunctionSqlOperatorTable` (which 
already has `private AiFunctionSqlOperatorTable()`).



##########
flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai-compatible/pom.xml:
##########
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-cdc-pipeline-model</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-cdc-pipeline-model-openai-compatible</artifactId>
+    <packaging>jar</packaging>
+
+    <properties>
+        <jackson.version>2.13.4</jackson.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.openai</groupId>
+            <artifactId>openai-java</artifactId>
+            <version>4.32.0</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>shade-flink</id>
+                        <configuration>
+                            <artifactSet>
+                                <includes combine.children="append">
+                                    <include>*:*</include>
+                                </includes>
+                            </artifactSet>
+                            <filters combine.children="append">
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        
<exclude>META-INF/services/com.fasterxml.**</exclude>
+                                        
<exclude>META-INF/services/kotlin.**</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.fasterxml</pattern>
+                                    
<shadedPattern>org.apache.flink.cdc.models.openai.shaded.com.fasterxml</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>okhttp3</pattern>
+                                    
<shadedPattern>org.apache.flink.cdc.models.openai.shaded.okhttp3</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>okio</pattern>
+                                    
<shadedPattern>org.apache.flink.cdc.models.openai.shaded.okio</shadedPattern>
+                                </relocation>

Review Comment:
   The shaded `openai-compatible` module relocates `com.fasterxml`, `okhttp3`, 
and `okio`, but does not relocate `kotlin.*`. The OpenAI Java SDK depends on 
Kotlin stdlib; if a different Kotlin version is loaded by Flink or another 
connector at runtime, this will conflict. Either add a `kotlin` relocation here 
or confirm that the OpenAI SDK does not bring kotlin onto the classpath in the 
shaded jar.
   



##########
flink-cdc-pipeline-model/flink-cdc-pipeline-model-openai-compatible/src/test/java/org/apache/flink/cdc/models/openai/OpenAiCompatibleModelClientITCase.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.models.openai;
+
+import org.assertj.core.api.Assumptions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class OpenAiCompatibleModelClientITCase {
+
+    private OpenAiCompatibleModelClient client;
+
+    @BeforeEach
+    void setUp() {
+        String endpoint = System.getenv("OPENAI_BASE_URL");
+        String apiKey = System.getenv("OPENAI_API_KEY");
+        String model = System.getenv("OPENAI_MODEL");
+        Assumptions.assumeThat(endpoint != null && apiKey != null && model != 
null)
+                .as("OPENAI_BASE_URL, OPENAI_API_KEY and OPENAI_MODEL must be 
set")
+                .isTrue();
+
+        client = new OpenAiCompatibleModelClient(endpoint, apiKey, model);
+        client.open();
+    }

Review Comment:
   The `Assumptions.assumeThat(...).isTrue()` guard works as intended: when 
`endpoint`, `apiKey` or `model` is null, `assumeThat` receives `false` from the 
AND expression, the assumption fails, and AssertJ skips the test. The 
`.as("OPENAI_BASE_URL, OPENAI_API_KEY and OPENAI_MODEL must be set")` 
description is only shown when the assumption fails (i.e. the test is 
skipped), which is presumably what the developer intends. Two follow-ups, 
however: the same pattern is duplicated in the e2e test — consider extracting 
a small helper — and double-check that JUnit Jupiter actually reports the test 
as skipped (not passed) on missing env vars in your CI to avoid silently green 
results.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java:
##########
@@ -526,23 +528,44 @@ private static Java.Rvalue generateOtherFunctionOperation(
                     context.udfDescriptors.stream()
                             .filter(e -> 
e.getName().equalsIgnoreCase(operationName))
                             .findFirst();
-            return udfFunctionOptional
-                    .map(
-                            udfFunction ->
-                                    new Java.MethodInvocation(
-                                            Location.NOWHERE,
-                                            null,
-                                            
generateInvokeExpression(udfFunction),
-                                            atoms))
-                    .orElseGet(
-                            () ->
-                                    new Java.MethodInvocation(
-                                            Location.NOWHERE,
-                                            null,
-                                            StringUtils.convertToCamelCase(
-                                                    
sqlBasicCall.getOperator().getName()),
-                                            atoms));
+            if (udfFunctionOptional.isPresent()) {
+                return new Java.MethodInvocation(
+                        Location.NOWHERE,
+                        null,
+                        generateInvokeExpression(udfFunctionOptional.get()),
+                        atoms);
+            }
+            if (isAiFunction(operationName) && atoms.length >= 1) {
+                rewriteAiFunctionModelArg(atoms);
+            }
+            return new Java.MethodInvocation(
+                    Location.NOWHERE,
+                    null,
+                    
StringUtils.convertToCamelCase(sqlBasicCall.getOperator().getName()),
+                    atoms);
+        }
+    }
+
+    private static boolean isAiFunction(String upperCaseName) {
+        for (AiTextFunctionDef def : AiTextFunctionDef.values()) {
+            if (def.getFunctionName().equals(upperCaseName)) {
+                return true;
+            }
+        }
+        for (AiEmbeddingFunctionDef def : AiEmbeddingFunctionDef.values()) {
+            if (def.getFunctionName().equals(upperCaseName)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static void rewriteAiFunctionModelArg(Java.Rvalue[] atoms) {
+        String modelName = atoms[0].toString();
+        if (modelName.startsWith("\"") && modelName.endsWith("\"")) {
+            modelName = modelName.substring(1, modelName.length() - 1);
         }
+        atoms[0] = new Java.AmbiguousName(Location.NOWHERE, new String[] 
{modelName});
     }

Review Comment:
   There are no unit tests for `JaninoCompiler.rewriteAiFunctionModelArg` 
itself nor for `AiFunctionSqlOperatorTable`. The new code paths in 
`JaninoCompiler` (`isAiFunction`, `rewriteAiFunctionModelArg`) are exercised 
only indirectly via `AiFunctionParserTest`. Given the fragility of the 
model-name extraction logic (raw `toString()` parse), please add direct tests 
covering edge cases such as: model name not in quotes, model name being a 
non-string-literal expression, model name containing characters disallowed by 
the YAML model-name regex, and case mismatch between `pipeline.model.name` and 
the SQL literal.



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/AiFunctionE2eITCase.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+
+import org.assertj.core.api.Assumptions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Path;
+import java.time.Duration;
+
+/** E2e tests for AI functions with the dummy model and openai-compatible 
model. */
+class AiFunctionE2eITCase extends PipelineTestEnvironment {
+
+    @Test
+    void testAiFunctionsWithDummyModel() throws Exception {
+        String pipelineJob =
+                "source:\n"
+                        + "  type: values\n"
+                        + "  event-set.id: SINGLE_SPLIT_MULTI_TABLES\n"
+                        + "\n"
+                        + "sink:\n"
+                        + "  type: values\n"
+                        + "\n"
+                        + "transform:\n"
+                        + "  - source-table: 
default_namespace.default_schema.table1\n"
+                        + "    projection: col1, AI_COMPLETE('myModel', col1, 
'Classify into catA or catB') AS cls\n"
+                        + "  - source-table: 
default_namespace.default_schema.table2\n"
+                        + "    projection: col1, AI_EMBED('myModel', col1) AS 
embedding\n"
+                        + "\n"
+                        + "pipeline:\n"
+                        + "  parallelism: 1\n"
+                        + "  schema.change.behavior: evolve\n"
+                        + "  model:\n"
+                        + "    name: myModel\n"
+                        + "    type: dummy\n"
+                        + "    debug: true\n";
+
+        Path dummyModelJar = TestUtils.getResource("dummy-model.jar");
+        submitPipelineJob(pipelineJob, dummyModelJar);
+        waitUntilJobFinished(Duration.ofMinutes(3));
+
+        validateResult(
+                "Successfully opened AI model client 'myModel'.",
+                
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`cls` VARIANT}, primaryKeys=col1, 
options=()}",
+                
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[1, {\"result\":\"dummy result\",\"summary\":\"TL;DR\"}], op=INSERT, 
meta=()}",
+                
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, {\"result\":\"dummy result\",\"summary\":\"TL;DR\"}], op=INSERT, 
meta=()}",
+                
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[3, {\"result\":\"dummy result\",\"summary\":\"TL;DR\"}], op=INSERT, 
meta=()}",
+                
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 
{\"result\":\"dummy result\",\"summary\":\"TL;DR\"}], after=[], op=DELETE, 
meta=()}",
+                
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 
{\"result\":\"dummy result\",\"summary\":\"TL;DR\"}], after=[2, 
{\"result\":\"dummy result\",\"summary\":\"TL;DR\"}], op=UPDATE, meta=()}",
+                
"CreateTableEvent{tableId=default_namespace.default_schema.table2, 
schema=columns={`col1` STRING NOT NULL,`embedding` ARRAY<FLOAT>}, 
primaryKeys=col1, options=()}",
+                
"DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], 
after=[1, [3.0, 1.0, 4.0, 1.0, 5.0, 9.0, 2.0, 6.0]], op=INSERT, meta=()}",
+                
"DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], 
after=[2, [3.0, 1.0, 4.0, 1.0, 5.0, 9.0, 2.0, 6.0]], op=INSERT, meta=()}",
+                
"DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], 
after=[3, [3.0, 1.0, 4.0, 1.0, 5.0, 9.0, 2.0, 6.0]], op=INSERT, meta=()}",
+                "Successfully closed AI model client 'myModel'.");

Review Comment:
   `testAiFunctionsWithDummyModel` asserts on log messages such as 
`"Successfully opened AI model client 'myModel'."` via `validateResult`. These 
are info-level log strings emitted by the operator (not pipeline output 
records), and are easily broken by any cosmetic logging change. Tying e2e test 
correctness to log text is fragile; prefer asserting on emitted records and 
decouple lifecycle verification from the log text.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to