This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 9df557cb12 [Improve][Transform] Improve DynamicCompile transform (#7264) 9df557cb12 is described below commit 9df557cb12d39831a737658994e2fe2662230e1a Author: lizhenglei <127465317+jackyyyyys...@users.noreply.github.com> AuthorDate: Tue Jul 30 21:37:04 2024 +0800 [Improve][Transform] Improve DynamicCompile transform (#7264) --- docs/en/transform-v2/dynamic-compile.md | 30 ++++++++++++++-- .../e2e/common/container/TestContainer.java | 2 ++ .../flink/AbstractTestFlinkContainer.java | 7 ++++ .../ConnectorPackageServiceContainer.java | 6 ++++ .../container/seatunnel/SeaTunnelContainer.java | 6 ++++ .../spark/AbstractTestSparkContainer.java | 7 ++++ .../e2e/transform/TestDynamicCompileIT.java | 34 +++++++++++++----- ...ixed_dynamic_groovy_java_compile_transform.conf | 2 ++ .../multiple_dynamic_groovy_compile_transform.conf | 2 ++ .../multiple_dynamic_java_compile_transform.conf | 2 ++ .../single_dynamic_groovy_compile_transform.conf | 1 + .../single_dynamic_java_compile_transform.conf | 1 + .../single_groovy_path_compile.conf} | 29 ++------------- .../single_java_path_compile.conf} | 32 ++--------------- .../dynamic_compile/source_file/GroovyFile | 42 ++++++++++++++++++++++ .../resources/dynamic_compile/source_file/JavaFile | 39 ++++++++++++++++++++ .../AbstractParse.java => CompilePattern.java} | 10 +++--- .../dynamiccompile/DynamicCompileTransform.java | 30 +++++++++++++--- .../DynamicCompileTransformConfig.java | 12 +++++++ .../DynamicCompileTransformFactory.java | 8 +++++ .../dynamiccompile/parse/AbstractParse.java | 2 +- .../parse/{ParseUtil.java => AbstractParser.java} | 2 +- .../dynamiccompile/parse/GroovyClassParse.java | 4 +-- ...GroovyClassUtil.java => GroovyClassParser.java} | 4 +-- .../dynamiccompile/parse/JavaClassParse.java | 4 +-- .../{JavaClassUtil.java => JavaClassParser.java} | 27 ++++++++------ 26 files changed, 246 insertions(+), 99 deletions(-) diff --git a/docs/en/transform-v2/dynamic-compile.md b/docs/en/transform-v2/dynamic-compile.md index 5bfbbadbe0..4a772e8cbf 100644 --- a/docs/en/transform-v2/dynamic-compile.md +++ b/docs/en/transform-v2/dynamic-compile.md @@ -11,8 +11,10 @@ If the conversion is too complex, it may affect performance | name | type | required | default value | |------------------|--------|----------|---------------| -| source_code | string | yes | | -| compile_language | string | yes | | +| source_code | string | no | | +| compile_language | Enum | yes | | +| compile_pattern | Enum | no | SOURCE_CODE | +| absolute_path | string | no | | ### source_code [string] @@ -24,11 +26,20 @@ If there are third-party dependency packages, please place them in ${SEATUNNEL_H Transform plugin common parameters, please refer to [Transform Plugin](common-options.md) for details -### compile_language [string] +### compile_language [Enum] Some syntax in Java may not be supported, please refer https://github.com/janino-compiler/janino GROOVY,JAVA +### compile_pattern [Enum] + +SOURCE_CODE,ABSOLUTE_PATH +If it is a SOURCE-CODE enumeration; the SOURCE-CODE attribute is required, and the ABSOLUTE_PATH enumeration;ABSOLUTE_PATH attribute is required + +### absolute_path [string] + +The absolute path of Java or Groovy files on the server + ## Example The data read from source is a table like this: @@ -46,6 +57,7 @@ transform { source_table_name = "fake" result_table_name = "fake1" compile_language="GROOVY" + compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor @@ -82,6 +94,7 @@ transform { source_table_name = "fake" result_table_name = "fake1" compile_language="JAVA" + compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; @@ -113,6 +126,17 @@ transform { } } + + transform { + DynamicCompile { + source_table_name = "fake" + result_table_name = "fake1" + compile_language="GROOVY" + compile_pattern="ABSOLUTE_PATH" + absolute_path="""/tmp/GroovyFile""" + + } +} ``` Then the data in result table `fake1` will like this diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java index 33b196eeba..07fef2c295 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java @@ -55,4 +55,6 @@ public interface TestContainer extends TestResource { } String getServerLogs(); + + void copyFileToContainer(String path, String targetPath); } diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java index 7145da6242..ff16c0c754 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java @@ -20,6 +20,7 @@ package org.apache.seatunnel.e2e.common.container.flink; import org.apache.seatunnel.e2e.common.container.AbstractTestContainer; import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.util.ContainerUtil; import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; @@ -168,4 +169,10 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer { throws IOException, InterruptedException { return jobManager.execInContainer("bash", "-c", command).getStdout(); } + + @Override + public void copyFileToContainer(String path, String targetPath) { + ContainerUtil.copyFileIntoContainers( + ContainerUtil.getResourcesFile(path).toPath(), targetPath, jobManager); + } } diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java index 4f5ea99029..3a27d78d42 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java @@ -234,4 +234,10 @@ public class ConnectorPackageServiceContainer extends AbstractTestContainer { public String getServerLogs() { return server1.getLogs(); } + + @Override + public void copyFileToContainer(String path, String targetPath) { + ContainerUtil.copyFileIntoContainers( + ContainerUtil.getResourcesFile(path).toPath(), targetPath, server1); + } } diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java index 2d9e76ea3b..802b1c32fb 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java @@ -455,4 +455,10 @@ public class SeaTunnelContainer extends AbstractTestContainer { public String getServerLogs() { return server.getLogs(); } + + @Override + public void copyFileToContainer(String path, String targetPath) { + ContainerUtil.copyFileIntoContainers( + ContainerUtil.getResourcesFile(path).toPath(), targetPath, server); + } } diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java index fe07d082af..9970ffb3aa 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java @@ -19,6 +19,7 @@ package org.apache.seatunnel.e2e.common.container.spark; import org.apache.seatunnel.e2e.common.container.AbstractTestContainer; import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.util.ContainerUtil; import org.testcontainers.containers.Container; import org.testcontainers.containers.GenericContainer; @@ -118,4 +119,10 @@ public abstract class AbstractTestSparkContainer extends AbstractTestContainer { public String getServerLogs() { return master.getLogs(); } + + @Override + public void copyFileToContainer(String path, String targetPath) { + ContainerUtil.copyFileIntoContainers( + ContainerUtil.getResourcesFile(path).toPath(), targetPath, master); + } } diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java index 5c5e69dad2..b57b332353 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java @@ -27,12 +27,13 @@ import java.io.IOException; public class TestDynamicCompileIT extends TestSuiteBase { + private final String basePath = "/dynamic_compile/conf/"; + @TestTemplate public void testDynamicSingleCompileGroovy(TestContainer container) throws IOException, InterruptedException { Container.ExecResult execResult = - container.executeJob( - "/dynamic_compile/single_dynamic_groovy_compile_transform.conf"); + container.executeJob(basePath + "single_dynamic_groovy_compile_transform.conf"); Assertions.assertEquals(0, execResult.getExitCode()); } @@ -40,7 +41,7 @@ public class TestDynamicCompileIT extends TestSuiteBase { public void testDynamicSingleCompileJava(TestContainer container) throws IOException, InterruptedException { Container.ExecResult execResult = - container.executeJob("/dynamic_compile/single_dynamic_java_compile_transform.conf"); + container.executeJob(basePath + "single_dynamic_java_compile_transform.conf"); Assertions.assertEquals(0, execResult.getExitCode()); } @@ -48,8 +49,7 @@ public class TestDynamicCompileIT extends TestSuiteBase { public void testDynamicMultipleCompileGroovy(TestContainer container) throws IOException, InterruptedException { Container.ExecResult execResult = - container.executeJob( - "/dynamic_compile/multiple_dynamic_groovy_compile_transform.conf"); + container.executeJob(basePath + "multiple_dynamic_groovy_compile_transform.conf"); Assertions.assertEquals(0, execResult.getExitCode()); } @@ -57,8 +57,7 @@ public class TestDynamicCompileIT extends TestSuiteBase { public void testDynamicMultipleCompileJava(TestContainer container) throws IOException, InterruptedException { Container.ExecResult execResult = - container.executeJob( - "/dynamic_compile/multiple_dynamic_java_compile_transform.conf"); + container.executeJob(basePath + "multiple_dynamic_java_compile_transform.conf"); Assertions.assertEquals(0, execResult.getExitCode()); } @@ -66,8 +65,25 @@ public class TestDynamicCompileIT extends TestSuiteBase { public void testDynamicMixedCompileJavaAndGroovy(TestContainer container) throws IOException, InterruptedException { Container.ExecResult execResult = - container.executeJob( - "/dynamic_compile/mixed_dynamic_groovy_java_compile_transform.conf"); + container.executeJob(basePath + "mixed_dynamic_groovy_java_compile_transform.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + + @TestTemplate + public void testDynamicSinglePathGroovy(TestContainer container) + throws IOException, InterruptedException { + container.copyFileToContainer("/dynamic_compile/source_file/GroovyFile", "/tmp/GroovyFile"); + Container.ExecResult execResult = + container.executeJob(basePath + "single_groovy_path_compile.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + + @TestTemplate + public void testDynamicSinglePathJava(TestContainer container) + throws IOException, InterruptedException { + container.copyFileToContainer("/dynamic_compile/source_file/JavaFile", "/tmp/JavaFile"); + Container.ExecResult execResult = + container.executeJob(basePath + "single_java_path_compile.conf"); Assertions.assertEquals(0, execResult.getExitCode()); } } diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/mixed_dynamic_groovy_java_compile_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf similarity index 98% rename from seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/mixed_dynamic_groovy_java_compile_transform.conf rename to seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf index 5c32e8d5a0..e91765fbf3 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/mixed_dynamic_groovy_java_compile_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/mixed_dynamic_groovy_java_compile_transform.conf @@ -43,6 +43,7 @@ transform { source_table_name = "fake" result_table_name = "fake1" compile_language="JAVA" + compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; @@ -80,6 +81,7 @@ transform { source_table_name = "fake1" result_table_name = "fake2" compile_language="GROOVY" + compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/multiple_dynamic_groovy_compile_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_groovy_compile_transform.conf similarity index 98% rename from seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/multiple_dynamic_groovy_compile_transform.conf rename to seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_groovy_compile_transform.conf index 31756b9941..8689404a17 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/multiple_dynamic_groovy_compile_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_groovy_compile_transform.conf @@ -40,6 +40,7 @@ transform { source_table_name = "fake" result_table_name = "fake1" compile_language="GROOVY" + compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor @@ -73,6 +74,7 @@ transform { source_table_name = "fake1" result_table_name = "fake2" compile_language="GROOVY" + compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/multiple_dynamic_java_compile_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_java_compile_transform.conf similarity index 98% rename from seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/multiple_dynamic_java_compile_transform.conf rename to seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_java_compile_transform.conf index 94e3a41272..9e59a5e535 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/multiple_dynamic_java_compile_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/multiple_dynamic_java_compile_transform.conf @@ -43,6 +43,7 @@ transform { source_table_name = "fake" result_table_name = "fake1" compile_language="JAVA" + compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; @@ -80,6 +81,7 @@ transform { source_table_name = "fake1" result_table_name = "fake2" compile_language="JAVA" + compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_groovy_compile_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_groovy_compile_transform.conf similarity index 98% copy from seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_groovy_compile_transform.conf copy to seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_groovy_compile_transform.conf index c478d33ddc..7958b88076 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_groovy_compile_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_groovy_compile_transform.conf @@ -40,6 +40,7 @@ transform { source_table_name = "fake" result_table_name = "fake1" compile_language="GROOVY" + compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_java_compile_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform.conf similarity index 98% copy from seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_java_compile_transform.conf copy to seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform.conf index d3a735b630..b65877d465 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_java_compile_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform.conf @@ -40,6 +40,7 @@ DynamicCompile { source_table_name = "fake" result_table_name = "fake1" compile_language="JAVA" + compile_pattern="SOURCE_CODE" source_code=""" import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_groovy_compile_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_groovy_path_compile.conf similarity index 58% rename from seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_groovy_compile_transform.conf rename to seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_groovy_path_compile.conf index c478d33ddc..c9b00bdee8 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_groovy_compile_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_groovy_path_compile.conf @@ -40,33 +40,8 @@ transform { source_table_name = "fake" result_table_name = "fake1" compile_language="GROOVY" - source_code=""" - import org.apache.seatunnel.api.table.catalog.Column - import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor - import org.apache.seatunnel.api.table.catalog.CatalogTable - import org.apache.seatunnel.api.table.catalog.PhysicalColumn; - import org.apache.seatunnel.api.table.type.*; - import java.util.ArrayList; - class demo { - public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) { - List<Column> columns = new ArrayList<>(); - PhysicalColumn destColumn = - PhysicalColumn.of( - "aa", - BasicType.STRING_TYPE, - 10, - true, - "", - ""); - columns.add(destColumn); - return columns.toArray(new Column[0]); - } - public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) { - Object[] fieldValues = new Object[1]; - fieldValues[0]="AA" - return fieldValues; - } - };""" + compile_pattern="ABSOLUTE_PATH" + absolute_path="""/tmp/GroovyFile""" } } diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_java_compile_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_java_path_compile.conf similarity index 55% rename from seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_java_compile_transform.conf rename to seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_java_path_compile.conf index d3a735b630..3925dbe91e 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_java_compile_transform.conf +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_java_path_compile.conf @@ -40,38 +40,10 @@ DynamicCompile { source_table_name = "fake" result_table_name = "fake1" compile_language="JAVA" - source_code=""" - import org.apache.seatunnel.api.table.catalog.Column; - import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; - import org.apache.seatunnel.api.table.catalog.*; - import org.apache.seatunnel.api.table.type.*; - import java.util.ArrayList; + compile_pattern="ABSOLUTE_PATH" + absolute_path="""/tmp/JavaFile""" - public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) { - - ArrayList<Column> columns = new ArrayList<Column>(); - PhysicalColumn destColumn = - PhysicalColumn.of( - "col1", - BasicType.STRING_TYPE, - 10, - true, - "", - ""); - return new Column[]{ - destColumn - }; - - } - public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) { - - Object[] fieldValues = new Object[1]; - fieldValues[0]="test1"; - return fieldValues; - } - """ - } } diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/GroovyFile b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/GroovyFile new file mode 100644 index 0000000000..9bb6a8fcdf --- /dev/null +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/GroovyFile @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import org.apache.seatunnel.api.table.catalog.Column +import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor +import org.apache.seatunnel.api.table.catalog.CatalogTable +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.type.*; +import java.util.ArrayList; +class demo { + public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) { + List<Column> columns = new ArrayList<>(); + PhysicalColumn destColumn = + PhysicalColumn.of( + "aa", + BasicType.STRING_TYPE, + 10, + true, + "", + ""); + columns.add(destColumn); + return columns.toArray(new Column[0]); + } + public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) { + Object[] fieldValues = new Object[1]; + fieldValues[0]="AA" + return fieldValues; + } +}; \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/JavaFile b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/JavaFile new file mode 100644 index 0000000000..7d1947c077 --- /dev/null +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/source_file/JavaFile @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; + +import java.util.ArrayList; + + + public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) { + + ArrayList<Column> columns = new ArrayList<Column>(); + PhysicalColumn destColumn = + PhysicalColumn.of("col1", BasicType.STRING_TYPE, 10, true, "", ""); + return new Column[] {destColumn}; + } + + public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) { + + Object[] fieldValues = new Object[1]; + fieldValues[0] = "test1"; + return fieldValues; + } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/AbstractParse.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompilePattern.java similarity index 78% copy from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/AbstractParse.java copy to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompilePattern.java index 906e9c2634..9b8c83a89d 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/AbstractParse.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompilePattern.java @@ -15,11 +15,9 @@ * limitations under the License. */ -package org.apache.seatunnel.transform.dynamiccompile.parse; +package org.apache.seatunnel.transform.dynamiccompile; -import java.io.Serializable; - -public abstract class AbstractParse implements Serializable { - - public abstract Class<?> parseClass(String sourceCode); +public enum CompilePattern { + SOURCE_CODE, + ABSOLUTE_PATH } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java index d798871401..ea55569420 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java @@ -20,6 +20,7 @@ package org.apache.seatunnel.transform.dynamiccompile; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.common.utils.FileUtils; import org.apache.seatunnel.common.utils.ReflectionUtils; import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform; import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; @@ -28,6 +29,8 @@ import org.apache.seatunnel.transform.dynamiccompile.parse.GroovyClassParse; import org.apache.seatunnel.transform.dynamiccompile.parse.JavaClassParse; import org.apache.seatunnel.transform.exception.TransformException; +import java.nio.file.Paths; + import static org.apache.seatunnel.transform.dynamiccompile.CompileTransformErrorCode.COMPILE_TRANSFORM_ERROR_CODE; public class DynamicCompileTransform extends MultipleFieldOutputTransform { @@ -39,6 +42,8 @@ public class DynamicCompileTransform extends MultipleFieldOutputTransform { private final String sourceCode; + private final CompilePattern compilePattern; + private AbstractParse DynamicCompileParse; public DynamicCompileTransform(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { @@ -51,7 +56,18 @@ public class DynamicCompileTransform extends MultipleFieldOutputTransform { } else if (CompileLanguage.JAVA.equals(compileLanguage)) { DynamicCompileParse = new JavaClassParse(); } - sourceCode = readonlyConfig.get(DynamicCompileTransformConfig.SOURCE_CODE); + compilePattern = readonlyConfig.get(DynamicCompileTransformConfig.COMPILE_PATTERN); + + if (CompilePattern.SOURCE_CODE.equals(compilePattern)) { + sourceCode = readonlyConfig.get(DynamicCompileTransformConfig.SOURCE_CODE); + } else { + // NPE will never happen because it is required in the ABSOLUTE_PATH mode + sourceCode = + FileUtils.readFileToStr( + Paths.get( + readonlyConfig.get( + DynamicCompileTransformConfig.ABSOLUTE_PATH))); + } } @Override @@ -65,7 +81,7 @@ public class DynamicCompileTransform extends MultipleFieldOutputTransform { try { result = ReflectionUtils.invoke( - DynamicCompileParse.parseClass(sourceCode).newInstance(), + getCompileLanguageInstance(), getInlineOutputColumns, inputCatalogTable); @@ -82,13 +98,17 @@ public class DynamicCompileTransform extends MultipleFieldOutputTransform { try { result = ReflectionUtils.invoke( - DynamicCompileParse.parseClass(sourceCode).newInstance(), - getInlineOutputFieldValues, - inputRow); + getCompileLanguageInstance(), getInlineOutputFieldValues, inputRow); } catch (Exception e) { throw new TransformException(COMPILE_TRANSFORM_ERROR_CODE, e.getMessage()); } return (Object[]) result; } + + private Object getCompileLanguageInstance() + throws InstantiationException, IllegalAccessException { + Class<?> compileClass = DynamicCompileParse.parseClassSourceCode(sourceCode); + return compileClass.newInstance(); + } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformConfig.java index 48a47d0383..f975ba2844 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformConfig.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformConfig.java @@ -39,4 +39,16 @@ public class DynamicCompileTransformConfig implements Serializable { .enumType(CompileLanguage.class) .noDefaultValue() .withDescription("compile language"); + + public static final Option<String> ABSOLUTE_PATH = + Options.key("absolute_path") + .stringType() + .noDefaultValue() + .withDescription("absolute_path"); + + public static final Option<CompilePattern> COMPILE_PATTERN = + Options.key("compile_pattern") + .enumType(CompilePattern.class) + .defaultValue(CompilePattern.SOURCE_CODE) + .withDescription("compile_pattern"); } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformFactory.java index 422bb0ff14..195102c4d9 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformFactory.java @@ -38,7 +38,15 @@ public class DynamicCompileTransformFactory implements TableTransformFactory { return OptionRule.builder() .required( DynamicCompileTransformConfig.COMPILE_LANGUAGE, + DynamicCompileTransformConfig.COMPILE_PATTERN) + .conditional( + DynamicCompileTransformConfig.COMPILE_PATTERN, + CompilePattern.SOURCE_CODE, DynamicCompileTransformConfig.SOURCE_CODE) + .conditional( + DynamicCompileTransformConfig.COMPILE_PATTERN, + CompilePattern.ABSOLUTE_PATH, + DynamicCompileTransformConfig.ABSOLUTE_PATH) .build(); } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/AbstractParse.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/AbstractParse.java index 906e9c2634..51d94fa166 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/AbstractParse.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/AbstractParse.java @@ -21,5 +21,5 @@ import java.io.Serializable; public abstract class AbstractParse implements Serializable { - public abstract Class<?> parseClass(String sourceCode); + public abstract Class<?> parseClassSourceCode(String sourceCode); } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ParseUtil.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/AbstractParser.java similarity index 97% rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ParseUtil.java rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/AbstractParser.java index c4afd47e25..3d8d58fd82 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ParseUtil.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/AbstractParser.java @@ -20,7 +20,7 @@ import org.apache.commons.codec.digest.DigestUtils; import java.util.concurrent.ConcurrentHashMap; -public abstract class ParseUtil { +public abstract class AbstractParser { protected static ConcurrentHashMap<String, Class<?>> classCache = new ConcurrentHashMap<>(); // Abstraction layer: Do not want to serialize and pass the classloader protected static String getClassKey(String sourceCode) { diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassParse.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassParse.java index d94607eb1f..7ae95da628 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassParse.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassParse.java @@ -20,7 +20,7 @@ package org.apache.seatunnel.transform.dynamiccompile.parse; public class GroovyClassParse extends AbstractParse { @Override - public Class<?> parseClass(String sourceCode) { - return GroovyClassUtil.parseWithCache(sourceCode); + public Class<?> parseClassSourceCode(String sourceCode) { + return GroovyClassParser.parseSourceCodeWithCache(sourceCode); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassUtil.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassParser.java similarity index 89% rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassUtil.java rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassParser.java index 5fab0e8761..c951335e37 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassUtil.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassParser.java @@ -18,10 +18,10 @@ package org.apache.seatunnel.transform.dynamiccompile.parse; import groovy.lang.GroovyClassLoader; -public class GroovyClassUtil extends ParseUtil { +public class GroovyClassParser extends AbstractParser { private static final GroovyClassLoader groovyClassLoader = new GroovyClassLoader(); - public static Class<?> parseWithCache(String sourceCode) { + public static Class<?> parseSourceCodeWithCache(String sourceCode) { return classCache.computeIfAbsent( getClassKey(sourceCode), clazz -> groovyClassLoader.parseClass(sourceCode)); } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassParse.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassParse.java index 3cd5bdd96e..9b77963eea 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassParse.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassParse.java @@ -19,7 +19,7 @@ package org.apache.seatunnel.transform.dynamiccompile.parse; public class JavaClassParse extends AbstractParse { @Override - public Class<?> parseClass(String sourceCode) { - return JavaClassUtil.parseWithCache(sourceCode); + public Class<?> parseClassSourceCode(String sourceCode) { + return JavaClassParser.parseSourceCodeWithCache(sourceCode); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassUtil.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassParser.java similarity index 72% rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassUtil.java rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassParser.java index 344b2708d4..d9bee066f7 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassUtil.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassParser.java @@ -21,24 +21,29 @@ import org.apache.seatunnel.shade.org.codehaus.janino.ClassBodyEvaluator; import java.util.function.Function; -public class JavaClassUtil extends ParseUtil { - - public static Class<?> parseWithCache(String sourceCode) { +public class JavaClassParser extends AbstractParser { + public static Class<?> parseSourceCodeWithCache(String sourceCode) { return classCache.computeIfAbsent( getClassKey(sourceCode), new Function<String, Class<?>>() { @Override public Class<?> apply(String classKey) { - try { - ClassBodyEvaluator cbe = new ClassBodyEvaluator(); - cbe.cook(sourceCode); - return cbe.getClazz(); - - } catch (CompileException e) { - throw new RuntimeException(e); - } + return getInnerClass(sourceCode); } }); } + + private static Class<?> getInnerClass(String FilePathOrSourceCode) { + try { + ClassBodyEvaluator cbe = new ClassBodyEvaluator(); + + cbe.cook(FilePathOrSourceCode); + + return cbe.getClazz(); + + } catch (CompileException e) { + throw new RuntimeException(e); + } + } }