This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 821dfc889c [Feature][Transform] DynamicCompile add transform  (#7170)
821dfc889c is described below

commit 821dfc889c8a63f01ddfe3fd634384a0a1f12c88
Author: lizhenglei <127465317+jackyyyyys...@users.noreply.github.com>
AuthorDate: Sat Jul 20 20:25:50 2024 +0800

    [Feature][Transform] DynamicCompile add transform  (#7170)
---
 docs/en/transform-v2/dynamic-compile.md            | 128 +++++++++++++++++
 pom.xml                                            |   1 +
 seatunnel-e2e/pom.xml                              |   8 +-
 .../e2e/transform/TestDynamicCompileIT.java        |  73 ++++++++++
 ...ixed_dynamic_groovy_java_compile_transform.conf | 155 ++++++++++++++++++++
 .../multiple_dynamic_groovy_compile_transform.conf | 155 ++++++++++++++++++++
 .../multiple_dynamic_java_compile_transform.conf   | 157 +++++++++++++++++++++
 .../single_dynamic_groovy_compile_transform.conf   | 110 +++++++++++++++
 .../single_dynamic_java_compile_transform.conf     | 114 +++++++++++++++
 seatunnel-shade/pom.xml                            |   2 +
 seatunnel-shade/seatunnel-janino/pom.xml           | 103 ++++++++++++++
 seatunnel-transforms-v2/pom.xml                    |  11 ++
 .../transform/dynamiccompile/CompileLanguage.java  |  23 +++
 .../dynamiccompile/CompileTransformErrorCode.java  |  42 ++++++
 .../dynamiccompile/DynamicCompileTransform.java    |  94 ++++++++++++
 .../DynamicCompileTransformConfig.java             |  42 ++++++
 .../DynamicCompileTransformFactory.java            |  50 +++++++
 .../dynamiccompile/parse/AbstractParse.java        |  25 ++++
 .../dynamiccompile/parse/GroovyClassParse.java     |  26 ++++
 .../dynamiccompile/parse/GroovyClassUtil.java      |  28 ++++
 .../dynamiccompile/parse/JavaClassParse.java       |  25 ++++
 .../dynamiccompile/parse/JavaClassUtil.java        |  44 ++++++
 .../transform/dynamiccompile/parse/ParseUtil.java  |  29 ++++
 tools/dependencies/known-dependencies.txt          |   4 +-
 24 files changed, 1447 insertions(+), 2 deletions(-)

diff --git a/docs/en/transform-v2/dynamic-compile.md 
b/docs/en/transform-v2/dynamic-compile.md
new file mode 100644
index 0000000000..5bfbbadbe0
--- /dev/null
+++ b/docs/en/transform-v2/dynamic-compile.md
@@ -0,0 +1,128 @@
+# DynamicCompile
+
+> DynamicCompile transform plugin
+
+## Description
+
+Provides a programmable way to process rows, allowing users to customize any 
business behavior, such as making RPC requests that use existing row fields as 
parameters, or expanding fields by retrieving associated data from other data 
sources. To separate business concerns, you can also define multiple transforms 
and chain them together.
+If the conversion is too complex, it may affect performance.
+
+## Options
+
+|       name       |  type  | required | default value |
+|------------------|--------|----------|---------------|
+| source_code      | string | yes      |               |
+| compile_language | string | yes      |               |
+
+### source_code [string]
+
+The code must implement two methods: getInlineOutputColumns and 
getInlineOutputFieldValues. getInlineOutputColumns determines the columns you 
want to add or convert; the original column structure can be obtained from 
CatalogTable.
+getInlineOutputFieldValues determines your column values. You can fulfill any 
of your requirements, and even make RPC requests to obtain new values based 
on the original columns.
+If there are third-party dependency packages, please place them in 
${SEATUNNEL_HOME}/lib. If you use Spark or Flink, you need to put them under 
the libs of the corresponding service.
+
+### common options [string]
+
+Transform plugin common parameters, please refer to [Transform 
Plugin](common-options.md) for details
+
+### compile_language [string]
+
+Some Java syntax may not be supported; please refer to 
https://github.com/janino-compiler/janino for details.
+Supported values: GROOVY, JAVA
+
+## Example
+
+The data read from source is a table like this:
+
+|   name   | age | card |
+|----------|-----|------|
+| Joy Ding | 20  | 123  |
+| May Ding | 20  | 123  |
+| Kin Dom  | 20  | 123  |
+| Joy Dom  | 20  | 123  |
+
+```
+transform {
+ DynamicCompile {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    compile_language="GROOVY"
+    source_code="""
+                 import org.apache.seatunnel.api.table.catalog.Column
+                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+                 import org.apache.seatunnel.api.table.catalog.CatalogTable
+                 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+                 import org.apache.seatunnel.api.table.type.*;
+                 import java.util.ArrayList;
+                 class demo  {
+                     public Column[] getInlineOutputColumns(CatalogTable 
inputCatalogTable) {
+                          List<Column> columns = new ArrayList<>();
+                         PhysicalColumn destColumn =
+                         PhysicalColumn.of(
+                         "aa",
+                        BasicType.STRING_TYPE,
+                         10,
+                        true,
+                        "",
+                        "");
+                         columns.add(destColumn);
+                        return columns.toArray(new Column[0]);
+                     }
+                     public Object[] 
getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+                       Object[] fieldValues = new Object[1];
+                       fieldValues[0]="AA"
+                       return fieldValues;
+                     }
+                 };"""
+
+  }
+}
+
+transform {
+ DynamicCompile {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    compile_language="JAVA"
+    source_code="""
+                 import org.apache.seatunnel.api.table.catalog.Column;
+                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+                 import org.apache.seatunnel.api.table.catalog.*;
+                 import org.apache.seatunnel.api.table.type.*;
+                 import java.util.ArrayList;
+                     public Column[] getInlineOutputColumns(CatalogTable 
inputCatalogTable) {
+
+                       ArrayList<Column> columns = new ArrayList<Column>();
+                                               PhysicalColumn destColumn =
+                                               PhysicalColumn.of(
+                                               "aa",
+                                              BasicType.STRING_TYPE,
+                                               10,
+                                              true,
+                                              "",
+                                              "");
+                                                 return new Column[]{
+                                                                destColumn
+                                                        };
+
+                     }
+                     public Object[] 
getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+                       Object[] fieldValues = new Object[1];
+                       fieldValues[0]="AA";
+                       return fieldValues;
+                     }
+                """
+
+  }
+ } 
+```
+
+Then the data in result table `fake1` will look like this:
+
+|   name   | age | card | aa |
+|----------|-----|------|----|
+| Joy Ding | 20  | 123  | AA |
+| May Ding | 20  | 123  | AA |
+| Kin Dom  | 20  | 123  | AA |
+| Joy Dom  | 20  | 123  | AA |
+
+## Changelog
+
diff --git a/pom.xml b/pom.xml
index 50c0d41200..41854d78fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,6 +133,7 @@
         <spotless.version>2.29.0</spotless.version>
         <jsqlparser.version>4.5</jsqlparser.version>
         <json-path.version>2.7.0</json-path.version>
+        <groovy.version>4.0.16</groovy.version>
         <!-- Option args -->
         <skipUT>false</skipUT>
         <skipIT>true</skipIT>
diff --git a/seatunnel-e2e/pom.xml b/seatunnel-e2e/pom.xml
index ff6ad8bea4..661892e54d 100644
--- a/seatunnel-e2e/pom.xml
+++ b/seatunnel-e2e/pom.xml
@@ -36,7 +36,7 @@
 
     <properties>
         <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
-        <rest-assured.version>4.3.1</rest-assured.version>
+        <rest-assured.version>5.4.0</rest-assured.version>
     </properties>
     <dependencies>
         <dependency>
@@ -60,6 +60,12 @@
             <artifactId>rest-assured</artifactId>
             <version>${rest-assured.version}</version>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.codehaus.groovy</groupId>
+                    <artifactId>groovy</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>io.rest-assured</groupId>
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java
new file mode 100644
index 0000000000..5c5e69dad2
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.transform;
+
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class TestDynamicCompileIT extends TestSuiteBase {
+
+    @TestTemplate
+    public void testDynamicSingleCompileGroovy(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob(
+                        
"/dynamic_compile/single_dynamic_groovy_compile_transform.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @TestTemplate
+    public void testDynamicSingleCompileJava(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                
container.executeJob("/dynamic_compile/single_dynamic_java_compile_transform.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @TestTemplate
+    public void testDynamicMultipleCompileGroovy(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob(
+                        
"/dynamic_compile/multiple_dynamic_groovy_compile_transform.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @TestTemplate
+    public void testDynamicMultipleCompileJava(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob(
+                        
"/dynamic_compile/multiple_dynamic_java_compile_transform.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @TestTemplate
+    public void testDynamicMixedCompileJavaAndGroovy(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob(
+                        
"/dynamic_compile/mixed_dynamic_groovy_java_compile_transform.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/mixed_dynamic_groovy_java_compile_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/mixed_dynamic_groovy_java_compile_transform.conf
new file mode 100644
index 0000000000..5c32e8d5a0
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/mixed_dynamic_groovy_java_compile_transform.conf
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the 
feature source plugin**
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    parallelism = 1
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+ DynamicCompile {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    compile_language="JAVA"
+    source_code="""
+                 import org.apache.seatunnel.api.table.catalog.Column;
+                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+                 import org.apache.seatunnel.api.table.catalog.*;
+                 import org.apache.seatunnel.api.table.type.*;
+                 import java.util.ArrayList;
+
+
+                     public Column[] getInlineOutputColumns(CatalogTable 
inputCatalogTable) {
+
+                       ArrayList<Column> columns = new ArrayList<Column>();
+                                               PhysicalColumn destColumn =
+                                               PhysicalColumn.of(
+                                               "col1",
+                                              BasicType.STRING_TYPE,
+                                               10,
+                                              true,
+                                              "",
+                                              "");
+                                                 return new Column[]{
+                                                                destColumn
+                                                        };
+
+                     }
+                     public Object[] 
getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+
+                       Object[] fieldValues = new Object[1];
+                       fieldValues[0]="test1";
+                       return fieldValues;
+                     }
+                """
+
+  }
+ DynamicCompile {
+    source_table_name = "fake1"
+    result_table_name = "fake2"
+    compile_language="GROOVY"
+    source_code="""
+                 import org.apache.seatunnel.api.table.catalog.Column
+                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+                 import org.apache.seatunnel.api.table.catalog.CatalogTable
+                 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+                 import org.apache.seatunnel.api.table.type.*;
+                 import java.util.ArrayList;
+                 class demo  {
+                     public Column[] getInlineOutputColumns(CatalogTable 
inputCatalogTable) {
+                          List<Column> columns = new ArrayList<>();
+                         PhysicalColumn destColumn =
+                         PhysicalColumn.of(
+                         "col2",
+                        BasicType.STRING_TYPE,
+                         10,
+                        true,
+                        "",
+                        "");
+                         columns.add(destColumn);
+                        return columns.toArray(new Column[0]);
+                     }
+                     public Object[] 
getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+                       Object[] fieldValues = new Object[1];
+                       fieldValues[0]="test2"
+                       return fieldValues;
+                     }
+                 };"""
+
+  }
+
+}
+
+
+sink {
+  Assert {
+     source_table_name = "fake2"
+     rules =
+       {
+         row_rules = [
+           {
+             rule_type = MIN_ROW
+             rule_value = 100
+           }
+         ],
+         field_rules = [
+           {
+             field_name = col1
+             field_type = string
+             field_value = [
+               {
+                 rule_type = NOT_NULL
+                 equals_to = "test1"
+
+               }
+             ]
+           },
+           {
+             field_name = col2
+             field_type = string
+             field_value = [
+               {
+                 rule_type = NOT_NULL
+                 equals_to = "test2"
+
+               }
+
+             ]
+           }
+         ]
+       }
+   }
+
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/multiple_dynamic_groovy_compile_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/multiple_dynamic_groovy_compile_transform.conf
new file mode 100644
index 0000000000..31756b9941
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/multiple_dynamic_groovy_compile_transform.conf
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+      }
+    }
+  }
+}
+
+transform {
+ DynamicCompile {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    compile_language="GROOVY"
+    source_code="""
+                 import org.apache.seatunnel.api.table.catalog.Column
+                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+                 import org.apache.seatunnel.api.table.catalog.CatalogTable
+                 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+                 import org.apache.seatunnel.api.table.type.*;
+                 import java.util.ArrayList;
+                 class demo  {
+                     public Column[] getInlineOutputColumns(CatalogTable 
inputCatalogTable) {
+                          List<Column> columns = new ArrayList<>();
+                         PhysicalColumn destColumn =
+                         PhysicalColumn.of(
+                         "aa",
+                        BasicType.STRING_TYPE,
+                         10,
+                        true,
+                        "",
+                        "");
+                         columns.add(destColumn);
+                        return columns.toArray(new Column[0]);
+                     }
+                     public Object[] 
getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+                       Object[] fieldValues = new Object[1];
+                       fieldValues[0]="AA"
+                       return fieldValues;
+                     }
+                 };"""
+
+  }
+  DynamicCompile {
+      source_table_name = "fake1"
+      result_table_name = "fake2"
+      compile_language="GROOVY"
+      source_code="""
+                   import org.apache.seatunnel.api.table.catalog.Column
+                   import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+                   import org.apache.seatunnel.api.table.catalog.CatalogTable
+                   import 
org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+                   import org.apache.seatunnel.api.table.type.*;
+                   import java.util.ArrayList;
+                   class demo  {
+                       public Column[] getInlineOutputColumns(CatalogTable 
inputCatalogTable) {
+                            List<Column> columns = new ArrayList<>();
+                           PhysicalColumn destColumn =
+                           PhysicalColumn.of(
+                           "bb",
+                          BasicType.STRING_TYPE,
+                           10,
+                          true,
+                          "",
+                          "");
+                           columns.add(destColumn);
+                          return columns.toArray(new Column[0]);
+                       }
+                       public Object[] 
getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+                         Object[] fieldValues = new Object[1];
+                         fieldValues[0]="BB"
+                         return fieldValues;
+                       }
+                   };"""
+
+    }
+}
+
+sink {
+  Assert {
+    source_table_name = "fake2"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MIN_ROW
+            rule_value = 100
+          }
+        ],
+        field_rules = [
+          {
+            field_name = id
+            field_type = int
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+           {
+                      field_name = bb
+                      field_type = string
+                      field_value = [
+                        {
+                          rule_type = NOT_NULL
+                          equals_to = "BB"
+
+                        }
+
+                      ]
+                    }
+          {
+            field_name = aa
+            field_type = string
+            field_value = [
+              {
+                rule_type = NOT_NULL
+                equals_to = "AA"
+
+              }
+
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/multiple_dynamic_java_compile_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/multiple_dynamic_java_compile_transform.conf
new file mode 100644
index 0000000000..94e3a41272
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/multiple_dynamic_java_compile_transform.conf
@@ -0,0 +1,157 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the 
feature source plugin**
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    parallelism = 1
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+ DynamicCompile {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    compile_language="JAVA"
+    source_code="""
+                 import org.apache.seatunnel.api.table.catalog.Column;
+                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+                 import org.apache.seatunnel.api.table.catalog.*;
+                 import org.apache.seatunnel.api.table.type.*;
+                 import java.util.ArrayList;
+
+
+                     public Column[] getInlineOutputColumns(CatalogTable 
inputCatalogTable) {
+
+                       ArrayList<Column> columns = new ArrayList<Column>();
+                                               PhysicalColumn destColumn =
+                                               PhysicalColumn.of(
+                                               "col1",
+                                              BasicType.STRING_TYPE,
+                                               10,
+                                              true,
+                                              "",
+                                              "");
+                                                 return new Column[]{
+                                                                destColumn
+                                                        };
+
+                     }
+                     public Object[] 
getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+
+                       Object[] fieldValues = new Object[1];
+                       fieldValues[0]="test1";
+                       return fieldValues;
+                     }
+                """
+
+  }
+  DynamicCompile {
+      source_table_name = "fake1"
+      result_table_name = "fake2"
+      compile_language="JAVA"
+      source_code="""
+                   import org.apache.seatunnel.api.table.catalog.Column;
+                   import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+                   import org.apache.seatunnel.api.table.catalog.*;
+                   import org.apache.seatunnel.api.table.type.*;
+                   import java.util.ArrayList;
+                       public Column[] getInlineOutputColumns(CatalogTable 
inputCatalogTable) {
+
+                         ArrayList<Column> columns = new ArrayList<Column>();
+                                                 PhysicalColumn destColumn =
+                                                 PhysicalColumn.of(
+                                                 "col2",
+                                                BasicType.STRING_TYPE,
+                                                 10,
+                                                true,
+                                                "",
+                                                "");
+                                                   return new Column[]{
+                                                                  destColumn
+                                                          };
+
+                       }
+                       public Object[] 
getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+
+                         Object[] fieldValues = new Object[1];
+                         fieldValues[0]="test2";
+                         return fieldValues;
+                       }
+                  """
+
+    }
+
+}
+
+
+sink {
+  Assert {
+     source_table_name = "fake2"
+     rules =
+       {
+         row_rules = [
+           {
+             rule_type = MIN_ROW
+             rule_value = 100
+           }
+         ],
+         field_rules = [
+           {
+             field_name = col1
+             field_type = string
+             field_value = [
+               {
+                 rule_type = NOT_NULL
+                 equals_to = "test1"
+
+               }
+             ]
+           },
+           {
+             field_name = col2
+             field_type = string
+             field_value = [
+               {
+                 rule_type = NOT_NULL
+                 equals_to = "test2"
+
+               }
+
+             ]
+           }
+         ]
+       }
+   }
+
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_groovy_compile_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_groovy_compile_transform.conf
new file mode 100644
index 0000000000..c478d33ddc
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_groovy_compile_transform.conf
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+      }
+    }
+  }
+}
+
+transform {
+ DynamicCompile {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    compile_language="GROOVY"
+    source_code="""
+                 import org.apache.seatunnel.api.table.catalog.Column
+                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor
+                 import org.apache.seatunnel.api.table.catalog.CatalogTable
+                 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+                 import org.apache.seatunnel.api.table.type.*;
+                 import java.util.ArrayList;
+                 class demo  {
+                     public Column[] getInlineOutputColumns(CatalogTable 
inputCatalogTable) {
+                          List<Column> columns = new ArrayList<>();
+                         PhysicalColumn destColumn =
+                         PhysicalColumn.of(
+                         "aa",
+                        BasicType.STRING_TYPE,
+                         10,
+                        true,
+                        "",
+                        "");
+                         columns.add(destColumn);
+                        return columns.toArray(new Column[0]);
+                     }
+                     public Object[] 
getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+                       Object[] fieldValues = new Object[1];
+                       fieldValues[0]="AA"
+                       return fieldValues;
+                     }
+                 };"""
+
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "fake1"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MIN_ROW
+            rule_value = 100
+          }
+        ],
+        field_rules = [
+          {
+            field_name = id
+            field_type = int
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = aa
+            field_type = string
+            field_value = [
+              {
+                rule_type = NOT_NULL
+                equals_to = "AA"
+
+              }
+
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_java_compile_transform.conf
 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_java_compile_transform.conf
new file mode 100644
index 0000000000..d3a735b630
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_java_compile_transform.conf
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+      }
+    }
+  }
+}
+
+transform {
+DynamicCompile {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    compile_language="JAVA"
+    source_code="""
+                 import org.apache.seatunnel.api.table.catalog.Column;
+                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+                 import org.apache.seatunnel.api.table.catalog.*;
+                 import org.apache.seatunnel.api.table.type.*;
+                 import java.util.ArrayList;
+
+
+                     public Column[] getInlineOutputColumns(CatalogTable 
inputCatalogTable) {
+
+                       ArrayList<Column> columns = new ArrayList<Column>();
+                                               PhysicalColumn destColumn =
+                                               PhysicalColumn.of(
+                                               "col1",
+                                              BasicType.STRING_TYPE,
+                                               10,
+                                              true,
+                                              "",
+                                              "");
+                                                 return new Column[]{
+                                                                destColumn
+                                                        };
+
+                     }
+                     public Object[] 
getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+
+                       Object[] fieldValues = new Object[1];
+                       fieldValues[0]="test1";
+                       return fieldValues;
+                     }
+                """
+
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "fake1"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MIN_ROW
+            rule_value = 100
+          }
+        ],
+        field_rules = [
+          {
+            field_name = id
+            field_type = int
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = col1
+            field_type = string
+            field_value = [
+              {
+                rule_type = NOT_NULL
+                equals_to = "test1"
+
+              }
+
+            ]
+          }
+        ]
+      }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-shade/pom.xml b/seatunnel-shade/pom.xml
index d8c5ff0003..cc761e6640 100644
--- a/seatunnel-shade/pom.xml
+++ b/seatunnel-shade/pom.xml
@@ -33,6 +33,8 @@
         <module>seatunnel-arrow-5.0</module>
         <module>seatunnel-thrift-service</module>
         <module>seatunnel-hazelcast</module>
+        <module>seatunnel-janino</module>
+
     </modules>
 
     <build>
diff --git a/seatunnel-shade/seatunnel-janino/pom.xml 
b/seatunnel-shade/seatunnel-janino/pom.xml
new file mode 100644
index 0000000000..a661a49845
--- /dev/null
+++ b/seatunnel-shade/seatunnel-janino/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-shade</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>seatunnel-janino</artifactId>
+    <name>SeaTunnel : Shade : Janino</name>
+    <properties>
+        <!-- Version of the Janino compiler bundled into this shaded artifact. -->
+        <janino.version>3.0.11</janino.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.codehaus.janino</groupId>
+            <artifactId>janino</artifactId>
+            <version>${janino.version}</version>
+            <optional>true</optional>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <finalName>seatunnel-janino</finalName>
+                            <createSourcesJar>true</createSourcesJar>
+                            <shadeSourcesContent>true</shadeSourcesContent>
+                            
<shadedArtifactAttached>false</shadedArtifactAttached>
+                            
<createDependencyReducedPom>false</createDependencyReducedPom>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.codehaus</pattern>
+                                    
<shadedPattern>${seatunnel.shade.package}.org.codehaus</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>attach-artifacts</id>
+                        <goals>
+                            <goal>attach-artifact</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <artifacts>
+                                <artifact>
+                                    
<file>${basedir}/target/seatunnel-janino.jar</file>
+                                    <type>jar</type>
+                                    <classifier>optional</classifier>
+                                </artifact>
+                            </artifacts>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/seatunnel-transforms-v2/pom.xml b/seatunnel-transforms-v2/pom.xml
index 4dfaebb76f..ae8909f463 100644
--- a/seatunnel-transforms-v2/pom.xml
+++ b/seatunnel-transforms-v2/pom.xml
@@ -66,6 +66,17 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.groovy</groupId>
+            <artifactId>groovy</artifactId>
+            <version>${groovy.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-janino</artifactId>
+            <version>${project.version}</version>
+            <classifier>optional</classifier>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileLanguage.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileLanguage.java
new file mode 100644
index 0000000000..be0e468e4d
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileLanguage.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.dynamiccompile;
+
+/** Source languages accepted by the DynamicCompile transform's {@code compile_language} option. */
+public enum CompileLanguage {
+    // Source code is parsed by GroovyClassParse.
+    GROOVY,
+    // Source code is parsed by JavaClassParse.
+    JAVA
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileTransformErrorCode.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileTransformErrorCode.java
new file mode 100644
index 0000000000..69ff8f0d76
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileTransformErrorCode.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform.dynamiccompile;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+/** Error codes raised by the DynamicCompile transform. */
+public enum CompileTransformErrorCode implements SeaTunnelErrorCode {
+    /** The user-supplied source code could not be compiled or invoked. */
+    COMPILE_TRANSFORM_ERROR_CODE(
+            "COMPILE_TRANSFORM_ERROR_CODE-01", "CompileTransform error please check code");
+
+    private final String code;
+    private final String description;
+
+    /**
+     * @param errorCode stable identifier of the error
+     * @param errorDescription human-readable explanation of the error
+     */
+    CompileTransformErrorCode(String errorCode, String errorDescription) {
+        code = errorCode;
+        description = errorDescription;
+    }
+
+    /** @return the human-readable explanation of this error */
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    /** @return the stable identifier of this error */
+    @Override
+    public String getCode() {
+        return code;
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
new file mode 100644
index 0000000000..d798871401
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.dynamiccompile;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
+import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
+import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+import org.apache.seatunnel.transform.dynamiccompile.parse.AbstractParse;
+import org.apache.seatunnel.transform.dynamiccompile.parse.GroovyClassParse;
+import org.apache.seatunnel.transform.dynamiccompile.parse.JavaClassParse;
+import org.apache.seatunnel.transform.exception.TransformException;
+
+import static 
org.apache.seatunnel.transform.dynamiccompile.CompileTransformErrorCode.COMPILE_TRANSFORM_ERROR_CODE;
+
+/**
+ * Transform that compiles user-supplied source code (Groovy or Java) and calls two methods on
+ * it reflectively: one to declare the output columns and one to compute their values per row.
+ *
+ * <p>The compiled class is looked up through the configured {@link AbstractParse} and a fresh
+ * instance of it is created for every call.
+ */
+public class DynamicCompileTransform extends MultipleFieldOutputTransform {
+    public static final String PLUGIN_NAME = "DynamicCompile";
+
+    /** Name of the user-supplied method that declares the output columns. */
+    public static final String getInlineOutputColumns = "getInlineOutputColumns";
+
+    /** Name of the user-supplied method that produces the output values for one input row. */
+    public static final String getInlineOutputFieldValues = "getInlineOutputFieldValues";
+
+    private final String sourceCode;
+
+    private final AbstractParse dynamicCompileParse;
+
+    /**
+     * @param readonlyConfig transform options; {@code compile_language} and {@code source_code}
+     *     are read from it
+     * @param catalogTable the input table this transform is applied to
+     * @throws TransformException if {@code compile_language} is missing or not supported
+     */
+    public DynamicCompileTransform(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+        super(catalogTable);
+        CompileLanguage compileLanguage =
+                readonlyConfig.get(DynamicCompileTransformConfig.COMPILE_LANGUAGE);
+        if (CompileLanguage.GROOVY.equals(compileLanguage)) {
+            dynamicCompileParse = new GroovyClassParse();
+        } else if (CompileLanguage.JAVA.equals(compileLanguage)) {
+            dynamicCompileParse = new JavaClassParse();
+        } else {
+            // Fail here with a clear message instead of a NullPointerException on first use.
+            throw new TransformException(
+                    COMPILE_TRANSFORM_ERROR_CODE,
+                    "Unsupported compile_language: " + compileLanguage);
+        }
+        sourceCode = readonlyConfig.get(DynamicCompileTransformConfig.SOURCE_CODE);
+    }
+
+    @Override
+    public String getPluginName() {
+        return PLUGIN_NAME;
+    }
+
+    /**
+     * Asks the compiled user class for the columns this transform adds.
+     *
+     * @return the columns returned by the user's {@code getInlineOutputColumns} method
+     * @throws TransformException if compiling, instantiating or invoking the user code fails
+     */
+    @Override
+    protected Column[] getOutputColumns() {
+        Object result;
+        try {
+            result =
+                    ReflectionUtils.invoke(
+                            dynamicCompileParse.parseClass(sourceCode).newInstance(),
+                            getInlineOutputColumns,
+                            inputCatalogTable);
+        } catch (Exception e) {
+            throw wrapFailure(e);
+        }
+        return (Column[]) result;
+    }
+
+    /**
+     * Asks the compiled user class for the output values of one row.
+     *
+     * @param inputRow accessor for the row being transformed
+     * @return the values returned by the user's {@code getInlineOutputFieldValues} method
+     * @throws TransformException if compiling, instantiating or invoking the user code fails
+     */
+    @Override
+    protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+        Object result;
+        try {
+            result =
+                    ReflectionUtils.invoke(
+                            dynamicCompileParse.parseClass(sourceCode).newInstance(),
+                            getInlineOutputFieldValues,
+                            inputRow);
+        } catch (Exception e) {
+            throw wrapFailure(e);
+        }
+        return (Object[]) result;
+    }
+
+    /** Converts a failure in user code into a TransformException without losing its details. */
+    private static TransformException wrapFailure(Exception e) {
+        // Some exceptions (e.g. NullPointerException) carry no message; fall back to toString()
+        // so the report never reads "null".
+        String reason = e.getMessage() != null ? e.getMessage() : e.toString();
+        TransformException failure = new TransformException(COMPILE_TRANSFORM_ERROR_CODE, reason);
+        // Keep the original stack trace reachable; only the (code, message) constructor is
+        // known to exist here, so the exception is attached as suppressed rather than as cause.
+        failure.addSuppressed(e);
+        return failure;
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformConfig.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformConfig.java
new file mode 100644
index 0000000000..48a47d0383
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformConfig.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.dynamiccompile;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+
+/**
+ * Option definitions for the DynamicCompile transform.
+ *
+ * <p>NOTE(review): this class declares no instance fields, so {@code @Getter}/{@code @Setter}
+ * appear to have nothing to generate; confirm and consider removing them.
+ */
+@Getter
+@Setter
+public class DynamicCompileTransformConfig implements Serializable {
+    /** String option {@code source_code}: the source text to compile; has no default value. */
+    public static final Option<String> SOURCE_CODE =
+            Options.key("source_code")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("source_code to compile");
+
+    /** Enum option {@code compile_language}: a {@link CompileLanguage}; has no default value. */
+    public static final Option<CompileLanguage> COMPILE_LANGUAGE =
+            Options.key("compile_language")
+                    .enumType(CompileLanguage.class)
+                    .noDefaultValue()
+                    .withDescription("compile language");
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformFactory.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformFactory.java
new file mode 100644
index 0000000000..422bb0ff14
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.dynamiccompile;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+/** Factory that registers and creates the {@link DynamicCompileTransform}. */
+@AutoService(Factory.class)
+public class DynamicCompileTransformFactory implements TableTransformFactory {
+
+    /** @return the plugin name of {@link DynamicCompileTransform} */
+    @Override
+    public String factoryIdentifier() {
+        return DynamicCompileTransform.PLUGIN_NAME;
+    }
+
+    /** @return a rule that makes both {@code compile_language} and {@code source_code} required */
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(
+                        DynamicCompileTransformConfig.COMPILE_LANGUAGE,
+                        DynamicCompileTransformConfig.SOURCE_CODE)
+                .build();
+    }
+
+    /**
+     * Builds the transform for the first catalog table found in the context.
+     *
+     * @param context supplies the catalog tables and the transform options
+     * @return a supplier that creates the {@link DynamicCompileTransform} when asked
+     */
+    @Override
+    public TableTransform createTransform(TableTransformFactoryContext context) {
+        CatalogTable firstInputTable = context.getCatalogTables().get(0);
+        return () -> new DynamicCompileTransform(context.getOptions(), firstInputTable);
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/AbstractParse.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/AbstractParse.java
new file mode 100644
index 0000000000..906e9c2634
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/AbstractParse.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.dynamiccompile.parse;
+
+import java.io.Serializable;
+
+/** Base type for the language-specific parsers that turn source text into a class. */
+public abstract class AbstractParse implements Serializable {
+
+    /**
+     * Produces a class from the given source text.
+     *
+     * @param sourceCode source text to compile
+     * @return the resulting class
+     */
+    public abstract Class<?> parseClass(String sourceCode);
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassParse.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassParse.java
new file mode 100644
index 0000000000..d94607eb1f
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassParse.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.dynamiccompile.parse;
+
+/** {@link AbstractParse} implementation for Groovy source code. */
+public class GroovyClassParse extends AbstractParse {
+
+    /**
+     * Delegates to {@link GroovyClassUtil#parseWithCache(String)}.
+     *
+     * @param sourceCode Groovy source text
+     * @return the class obtained for that source
+     */
+    @Override
+    public Class<?> parseClass(String sourceCode) {
+        Class<?> groovyClass = GroovyClassUtil.parseWithCache(sourceCode);
+        return groovyClass;
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassUtil.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassUtil.java
new file mode 100644
index 0000000000..5fab0e8761
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassUtil.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform.dynamiccompile.parse;
+
+import groovy.lang.GroovyClassLoader;
+
+/** Parses Groovy source code into classes, reusing the class cache inherited from ParseUtil. */
+public class GroovyClassUtil extends ParseUtil {
+    private static final GroovyClassLoader groovyClassLoader = new GroovyClassLoader();
+
+    /**
+     * Returns the class for the given Groovy source, parsing it only when no class is cached
+     * under the key derived from that source.
+     *
+     * @param sourceCode Groovy source text
+     * @return the cached or newly parsed class
+     */
+    public static Class<?> parseWithCache(String sourceCode) {
+        String cacheKey = getClassKey(sourceCode);
+        return classCache.computeIfAbsent(
+                cacheKey, ignoredKey -> groovyClassLoader.parseClass(sourceCode));
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassParse.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassParse.java
new file mode 100644
index 0000000000..3cd5bdd96e
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassParse.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform.dynamiccompile.parse;
+
+/** {@link AbstractParse} implementation for Java source code. */
+public class JavaClassParse extends AbstractParse {
+
+    /**
+     * Delegates to {@link JavaClassUtil#parseWithCache(String)}.
+     *
+     * @param sourceCode Java source text
+     * @return the class obtained for that source
+     */
+    @Override
+    public Class<?> parseClass(String sourceCode) {
+        Class<?> javaClass = JavaClassUtil.parseWithCache(sourceCode);
+        return javaClass;
+    }
+}
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassUtil.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassUtil.java
new file mode 100644
index 0000000000..344b2708d4
--- /dev/null
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform.dynamiccompile.parse;
+
+import 
org.apache.seatunnel.shade.org.codehaus.commons.compiler.CompileException;
+import org.apache.seatunnel.shade.org.codehaus.janino.ClassBodyEvaluator;
+
+import java.util.function.Function;
+
+/**
+ * Compiles Java class-body source with Janino's {@code ClassBodyEvaluator} and memoizes the
+ * result in the class cache inherited from {@link ParseUtil}.
+ */
+public class JavaClassUtil extends ParseUtil {
+
+    /**
+     * Returns the class built from {@code sourceCode}, compiling it only when the cache holds no
+     * entry under the key derived from that source.
+     *
+     * @param sourceCode text passed to {@code ClassBodyEvaluator.cook}
+     * @return the cached class, or the freshly compiled one on a cache miss
+     * @throws RuntimeException wrapping the {@code CompileException} raised when cooking fails
+     */
+    public static Class<?> parseWithCache(String sourceCode) {
+        // The loader ignores the cache key: the source text itself is what gets compiled.
+        Function<String, Class<?>> loader = ignoredKey -> compile(sourceCode);
+        return classCache.computeIfAbsent(getClassKey(sourceCode), loader);
+    }
+
+    /** Cooks the source with a fresh evaluator and returns the resulting class. */
+    private static Class<?> compile(String sourceCode) {
+        try {
+            ClassBodyEvaluator evaluator = new ClassBodyEvaluator();
+            evaluator.cook(sourceCode);
+            return evaluator.getClazz();
+        } catch (CompileException compileFailure) {
+            throw new RuntimeException(compileFailure);
+        }
+    }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ParseUtil.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ParseUtil.java
new file mode 100644
index 0000000000..c4afd47e25
--- /dev/null
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ParseUtil.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.transform.dynamiccompile.parse;
+
+import org.apache.commons.codec.digest.DigestUtils;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Base class for the parse utilities (for example {@code JavaClassUtil}): it owns the
+ * compiled-class cache and the scheme used to derive cache keys from source text.
+ */
+public abstract class ParseUtil {
+    /** Compiled classes, keyed by the value {@link #getClassKey(String)} returns for their source. */
+    protected static ConcurrentHashMap<String, Class<?>> classCache = new ConcurrentHashMap<>();
+
+    // Abstraction layer: Do not want to serialize and pass the classloader
+    /**
+     * Derives the cache key for a piece of source text: its MD5 digest rendered as a hex string.
+     *
+     * <p>The digest is hex-encoded rather than decoded with {@code new String(byte[])}. Decoding
+     * arbitrary digest bytes with the platform charset is lossy (invalid sequences are replaced),
+     * so two different sources could end up with the same key and the cache would hand back the
+     * wrong class. {@code md5Hex} also reads the source as UTF-8, so the key does not depend on
+     * the JVM's default charset.
+     *
+     * @param sourceCode source text to fingerprint; must not be {@code null}
+     * @return the 32-character hexadecimal MD5 digest of {@code sourceCode}
+     */
+    protected static String getClassKey(String sourceCode) {
+        return DigestUtils.md5Hex(sourceCode);
+    }
+}
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 7c802e0c23..8532f7cba4 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -42,4 +42,6 @@ json-path-2.7.0.jar
 json-smart-2.4.7.jar
 accessors-smart-2.4.7.jar
 asm-9.1.jar
-avro-1.11.1.jar
\ No newline at end of file
+avro-1.11.1.jar
+groovy-4.0.16.jar
+seatunnel-janino-2.3.6-SNAPSHOT-optional.jar
\ No newline at end of file

Reply via email to