This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 821dfc889c [Feature][Transform] DynamicCompile add transform (#7170) 821dfc889c is described below commit 821dfc889c8a63f01ddfe3fd634384a0a1f12c88 Author: lizhenglei <127465317+jackyyyyys...@users.noreply.github.com> AuthorDate: Sat Jul 20 20:25:50 2024 +0800 [Feature][Transform] DynamicCompile add transform (#7170) --- docs/en/transform-v2/dynamic-compile.md | 128 +++++++++++++++++ pom.xml | 1 + seatunnel-e2e/pom.xml | 8 +- .../e2e/transform/TestDynamicCompileIT.java | 73 ++++++++++ ...ixed_dynamic_groovy_java_compile_transform.conf | 155 ++++++++++++++++++++ .../multiple_dynamic_groovy_compile_transform.conf | 155 ++++++++++++++++++++ .../multiple_dynamic_java_compile_transform.conf | 157 +++++++++++++++++++++ .../single_dynamic_groovy_compile_transform.conf | 110 +++++++++++++++ .../single_dynamic_java_compile_transform.conf | 114 +++++++++++++++ seatunnel-shade/pom.xml | 2 + seatunnel-shade/seatunnel-janino/pom.xml | 103 ++++++++++++++ seatunnel-transforms-v2/pom.xml | 11 ++ .../transform/dynamiccompile/CompileLanguage.java | 23 +++ .../dynamiccompile/CompileTransformErrorCode.java | 42 ++++++ .../dynamiccompile/DynamicCompileTransform.java | 94 ++++++++++++ .../DynamicCompileTransformConfig.java | 42 ++++++ .../DynamicCompileTransformFactory.java | 50 +++++++ .../dynamiccompile/parse/AbstractParse.java | 25 ++++ .../dynamiccompile/parse/GroovyClassParse.java | 26 ++++ .../dynamiccompile/parse/GroovyClassUtil.java | 28 ++++ .../dynamiccompile/parse/JavaClassParse.java | 25 ++++ .../dynamiccompile/parse/JavaClassUtil.java | 44 ++++++ .../transform/dynamiccompile/parse/ParseUtil.java | 29 ++++ tools/dependencies/known-dependencies.txt | 4 +- 24 files changed, 1447 insertions(+), 2 deletions(-) diff --git a/docs/en/transform-v2/dynamic-compile.md b/docs/en/transform-v2/dynamic-compile.md new file mode 100644 index 0000000000..5bfbbadbe0 --- /dev/null +++ 
b/docs/en/transform-v2/dynamic-compile.md @@ -0,0 +1,128 @@ +# DynamicCompile + +> DynamicCompile transform plugin + +## Description + +Provides a programmable way to process rows, allowing users to implement custom business logic, such as making RPC requests that use existing row fields as parameters, or adding fields by retrieving associated data from other data sources. To keep separate pieces of business logic apart, you can also define multiple transforms and combine them. +If the transform logic is too complex, it may affect performance. + +## Options + +| name | type | required | default value | +|------------------|--------|----------|---------------| +| source_code | string | yes | | +| compile_language | string | yes | | + +### source_code [string] + +The code must implement two methods: `getInlineOutputColumns` and `getInlineOutputFieldValues`. `getInlineOutputColumns` determines the columns you want to add or convert; the original column structure can be obtained from the `CatalogTable` argument. +`getInlineOutputFieldValues` determines the values of those columns. You can implement any logic you need here, including making RPC requests to obtain new values based on the original columns. +If the code has third-party dependencies, place the packages in ${SEATUNNEL_HOME}/lib; if you use Spark or Flink, you need to put them under the libs directory of the corresponding service. 
+ +### common options [string] + +Transform plugin common parameters; please refer to [Transform Plugin](common-options.md) for details. + +### compile_language [string] + +Some Java syntax may not be supported; please refer to https://github.com/janino-compiler/janino for details. +Supported values: GROOVY, JAVA + +## Example + +The data read from source is a table like this: + +| name | age | card | +|----------|-----|------| +| Joy Ding | 20 | 123 | +| May Ding | 20 | 123 | +| Kin Dom | 20 | 123 | +| Joy Dom | 20 | 123 | + +``` +transform { + DynamicCompile { + source_table_name = "fake" + result_table_name = "fake1" + compile_language="GROOVY" + source_code=""" + import org.apache.seatunnel.api.table.catalog.Column + import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor + import org.apache.seatunnel.api.table.catalog.CatalogTable + import org.apache.seatunnel.api.table.catalog.PhysicalColumn; + import org.apache.seatunnel.api.table.type.*; + import java.util.ArrayList; + class demo { + public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) { + List<Column> columns = new ArrayList<>(); + PhysicalColumn destColumn = + PhysicalColumn.of( + "aa", + BasicType.STRING_TYPE, + 10, + true, + "", + ""); + columns.add(destColumn); + return columns.toArray(new Column[0]); + } + public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) { + Object[] fieldValues = new Object[1]; + fieldValues[0]="AA" + return fieldValues; + } + };""" + + } +} + +transform { + DynamicCompile { + source_table_name = "fake" + result_table_name = "fake1" + compile_language="JAVA" + source_code=""" + import org.apache.seatunnel.api.table.catalog.Column; + import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; + import org.apache.seatunnel.api.table.catalog.*; + import org.apache.seatunnel.api.table.type.*; + import java.util.ArrayList; + public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) { + + ArrayList<Column> columns = new ArrayList<Column>(); + 
PhysicalColumn destColumn = + PhysicalColumn.of( + "aa", + BasicType.STRING_TYPE, + 10, + true, + "", + ""); + return new Column[]{ + destColumn + }; + + } + public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) { + Object[] fieldValues = new Object[1]; + fieldValues[0]="AA"; + return fieldValues; + } + """ + + } + } +``` + +Then the data in result table `fake1` will look like this: + +| name | age | card | aa | +|----------|-----|------|----| +| Joy Ding | 20 | 123 | AA | +| May Ding | 20 | 123 | AA | +| Kin Dom | 20 | 123 | AA | +| Joy Dom | 20 | 123 | AA | + +## Changelog + diff --git a/pom.xml b/pom.xml index 50c0d41200..41854d78fc 100644 --- a/pom.xml +++ b/pom.xml @@ -133,6 +133,7 @@ <spotless.version>2.29.0</spotless.version> <jsqlparser.version>4.5</jsqlparser.version> <json-path.version>2.7.0</json-path.version> + <groovy.version>4.0.16</groovy.version> <!-- Option args --> <skipUT>false</skipUT> <skipIT>true</skipIT> diff --git a/seatunnel-e2e/pom.xml b/seatunnel-e2e/pom.xml index ff6ad8bea4..661892e54d 100644 --- a/seatunnel-e2e/pom.xml +++ b/seatunnel-e2e/pom.xml @@ -36,7 +36,7 @@ <properties> <maven-jar-plugin.version>2.4</maven-jar-plugin.version> - <rest-assured.version>4.3.1</rest-assured.version> + <rest-assured.version>5.4.0</rest-assured.version> </properties> <dependencies> <dependency> @@ -60,6 +60,12 @@ <artifactId>rest-assured</artifactId> <version>${rest-assured.version}</version> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.codehaus.groovy</groupId> + <artifactId>groovy</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>io.rest-assured</groupId> diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java new file mode 100644 
index 0000000000..5c5e69dad2 --- /dev/null +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.transform; + +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; + +import java.io.IOException; + +public class TestDynamicCompileIT extends TestSuiteBase { + + @TestTemplate + public void testDynamicSingleCompileGroovy(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob( + "/dynamic_compile/single_dynamic_groovy_compile_transform.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + + @TestTemplate + public void testDynamicSingleCompileJava(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/dynamic_compile/single_dynamic_java_compile_transform.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + + @TestTemplate + public void testDynamicMultipleCompileGroovy(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob( + "/dynamic_compile/multiple_dynamic_groovy_compile_transform.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + + @TestTemplate + public void testDynamicMultipleCompileJava(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob( + "/dynamic_compile/multiple_dynamic_java_compile_transform.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + + @TestTemplate + public void testDynamicMixedCompileJavaAndGroovy(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob( + "/dynamic_compile/mixed_dynamic_groovy_java_compile_transform.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } +} diff --git 
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/mixed_dynamic_groovy_java_compile_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/mixed_dynamic_groovy_java_compile_transform.conf new file mode 100644 index 0000000000..5c32e8d5a0 --- /dev/null +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/mixed_dynamic_groovy_java_compile_transform.conf @@ -0,0 +1,155 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake" + row.num = 100 + parallelism = 1 + schema = { + fields { + name = "string" + age = "int" + } + } + } +} + +transform { + DynamicCompile { + source_table_name = "fake" + result_table_name = "fake1" + compile_language="JAVA" + source_code=""" + import org.apache.seatunnel.api.table.catalog.Column; + import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; + import org.apache.seatunnel.api.table.catalog.*; + import org.apache.seatunnel.api.table.type.*; + import java.util.ArrayList; + + + public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) { + + ArrayList<Column> columns = new ArrayList<Column>(); + PhysicalColumn destColumn = + PhysicalColumn.of( + "col1", + BasicType.STRING_TYPE, + 10, + true, + "", + ""); + return new Column[]{ + destColumn + }; + + } + public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) { + + Object[] fieldValues = new Object[1]; + fieldValues[0]="test1"; + return fieldValues; + } + """ + + } + DynamicCompile { + source_table_name = "fake1" + result_table_name = "fake2" + compile_language="GROOVY" + source_code=""" + import org.apache.seatunnel.api.table.catalog.Column + import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor + import org.apache.seatunnel.api.table.catalog.CatalogTable + import org.apache.seatunnel.api.table.catalog.PhysicalColumn; + import org.apache.seatunnel.api.table.type.*; + import java.util.ArrayList; + class demo { + public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) { + List<Column> columns = new ArrayList<>(); + PhysicalColumn destColumn = + PhysicalColumn.of( + "col2", + BasicType.STRING_TYPE, + 10, + true, + "", + ""); + 
columns.add(destColumn); + return columns.toArray(new Column[0]); + } + public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) { + Object[] fieldValues = new Object[1]; + fieldValues[0]="test2" + return fieldValues; + } + };""" + + } + +} + + +sink { + Assert { + source_table_name = "fake2" + rules = + { + row_rules = [ + { + rule_type = MIN_ROW + rule_value = 100 + } + ], + field_rules = [ + { + field_name = col1 + field_type = string + field_value = [ + { + rule_type = NOT_NULL + equals_to = "test1" + + } + ] + }, + { + field_name = col2 + field_type = string + field_value = [ + { + rule_type = NOT_NULL + equals_to = "test2" + + } + + ] + } + ] + } + } + +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/multiple_dynamic_groovy_compile_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/multiple_dynamic_groovy_compile_transform.conf new file mode 100644 index 0000000000..31756b9941 --- /dev/null +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/multiple_dynamic_groovy_compile_transform.conf @@ -0,0 +1,155 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + job.mode = "BATCH" +} + +source { + FakeSource { + result_table_name = "fake" + row.num = 100 + schema = { + fields { + id = "int" + name = "string" + } + } + } +} + +transform { + DynamicCompile { + source_table_name = "fake" + result_table_name = "fake1" + compile_language="GROOVY" + source_code=""" + import org.apache.seatunnel.api.table.catalog.Column + import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor + import org.apache.seatunnel.api.table.catalog.CatalogTable + import org.apache.seatunnel.api.table.catalog.PhysicalColumn; + import org.apache.seatunnel.api.table.type.*; + import java.util.ArrayList; + class demo { + public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) { + List<Column> columns = new ArrayList<>(); + PhysicalColumn destColumn = + PhysicalColumn.of( + "aa", + BasicType.STRING_TYPE, + 10, + true, + "", + ""); + columns.add(destColumn); + return columns.toArray(new Column[0]); + } + public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) { + Object[] fieldValues = new Object[1]; + fieldValues[0]="AA" + return fieldValues; + } + };""" + + } + DynamicCompile { + source_table_name = "fake1" + result_table_name = "fake2" + compile_language="GROOVY" + source_code=""" + import org.apache.seatunnel.api.table.catalog.Column + import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor + import org.apache.seatunnel.api.table.catalog.CatalogTable + import org.apache.seatunnel.api.table.catalog.PhysicalColumn; + import org.apache.seatunnel.api.table.type.*; + import java.util.ArrayList; + class demo { + public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) { + List<Column> columns = new ArrayList<>(); + PhysicalColumn destColumn = + PhysicalColumn.of( + "bb", + 
BasicType.STRING_TYPE, + 10, + true, + "", + ""); + columns.add(destColumn); + return columns.toArray(new Column[0]); + } + public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) { + Object[] fieldValues = new Object[1]; + fieldValues[0]="BB" + return fieldValues; + } + };""" + + } +} + +sink { + Assert { + source_table_name = "fake2" + rules = + { + row_rules = [ + { + rule_type = MIN_ROW + rule_value = 100 + } + ], + field_rules = [ + { + field_name = id + field_type = int + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = bb + field_type = string + field_value = [ + { + rule_type = NOT_NULL + equals_to = "BB" + + } + + ] + } + { + field_name = aa + field_type = string + field_value = [ + { + rule_type = NOT_NULL + equals_to = "AA" + + } + + ] + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/multiple_dynamic_java_compile_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/multiple_dynamic_java_compile_transform.conf new file mode 100644 index 0000000000..94e3a41272 --- /dev/null +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/multiple_dynamic_java_compile_transform.conf @@ -0,0 +1,157 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake" + row.num = 100 + parallelism = 1 + schema = { + fields { + name = "string" + age = "int" + } + } + } +} + +transform { + DynamicCompile { + source_table_name = "fake" + result_table_name = "fake1" + compile_language="JAVA" + source_code=""" + import org.apache.seatunnel.api.table.catalog.Column; + import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; + import org.apache.seatunnel.api.table.catalog.*; + import org.apache.seatunnel.api.table.type.*; + import java.util.ArrayList; + + + public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) { + + ArrayList<Column> columns = new ArrayList<Column>(); + PhysicalColumn destColumn = + PhysicalColumn.of( + "col1", + BasicType.STRING_TYPE, + 10, + true, + "", + ""); + return new Column[]{ + destColumn + }; + + } + public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) { + + Object[] fieldValues = new Object[1]; + fieldValues[0]="test1"; + return fieldValues; + } + """ + + } + DynamicCompile { + source_table_name = "fake1" + result_table_name = "fake2" + compile_language="JAVA" + source_code=""" + import org.apache.seatunnel.api.table.catalog.Column; + import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; + import org.apache.seatunnel.api.table.catalog.*; + 
import org.apache.seatunnel.api.table.type.*; + import java.util.ArrayList; + public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) { + + ArrayList<Column> columns = new ArrayList<Column>(); + PhysicalColumn destColumn = + PhysicalColumn.of( + "col2", + BasicType.STRING_TYPE, + 10, + true, + "", + ""); + return new Column[]{ + destColumn + }; + + } + public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) { + + Object[] fieldValues = new Object[1]; + fieldValues[0]="test2"; + return fieldValues; + } + """ + + } + +} + + +sink { + Assert { + source_table_name = "fake2" + rules = + { + row_rules = [ + { + rule_type = MIN_ROW + rule_value = 100 + } + ], + field_rules = [ + { + field_name = col1 + field_type = string + field_value = [ + { + rule_type = NOT_NULL + equals_to = "test1" + + } + ] + }, + { + field_name = col2 + field_type = string + field_value = [ + { + rule_type = NOT_NULL + equals_to = "test2" + + } + + ] + } + ] + } + } + +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_groovy_compile_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_groovy_compile_transform.conf new file mode 100644 index 0000000000..c478d33ddc --- /dev/null +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_groovy_compile_transform.conf @@ -0,0 +1,110 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + job.mode = "BATCH" +} + +source { + FakeSource { + result_table_name = "fake" + row.num = 100 + schema = { + fields { + id = "int" + name = "string" + } + } + } +} + +transform { + DynamicCompile { + source_table_name = "fake" + result_table_name = "fake1" + compile_language="GROOVY" + source_code=""" + import org.apache.seatunnel.api.table.catalog.Column + import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor + import org.apache.seatunnel.api.table.catalog.CatalogTable + import org.apache.seatunnel.api.table.catalog.PhysicalColumn; + import org.apache.seatunnel.api.table.type.*; + import java.util.ArrayList; + class demo { + public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) { + List<Column> columns = new ArrayList<>(); + PhysicalColumn destColumn = + PhysicalColumn.of( + "aa", + BasicType.STRING_TYPE, + 10, + true, + "", + ""); + columns.add(destColumn); + return columns.toArray(new Column[0]); + } + public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) { + Object[] fieldValues = new Object[1]; + fieldValues[0]="AA" + return fieldValues; + } + };""" + + } +} + +sink { + Assert { + source_table_name = "fake1" + rules = + { + row_rules = [ + { + rule_type = MIN_ROW + rule_value = 100 + } + ], + field_rules = [ + { + field_name = id + field_type = int + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = aa + field_type = string + field_value = [ + { + rule_type = 
NOT_NULL + equals_to = "AA" + + } + + ] + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_java_compile_transform.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_java_compile_transform.conf new file mode 100644 index 0000000000..d3a735b630 --- /dev/null +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/single_dynamic_java_compile_transform.conf @@ -0,0 +1,114 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + job.mode = "BATCH" +} + +source { + FakeSource { + result_table_name = "fake" + row.num = 100 + schema = { + fields { + id = "int" + name = "string" + } + } + } +} + +transform { +DynamicCompile { + source_table_name = "fake" + result_table_name = "fake1" + compile_language="JAVA" + source_code=""" + import org.apache.seatunnel.api.table.catalog.Column; + import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; + import org.apache.seatunnel.api.table.catalog.*; + import org.apache.seatunnel.api.table.type.*; + import java.util.ArrayList; + + + public Column[] getInlineOutputColumns(CatalogTable inputCatalogTable) { + + ArrayList<Column> columns = new ArrayList<Column>(); + PhysicalColumn destColumn = + PhysicalColumn.of( + "col1", + BasicType.STRING_TYPE, + 10, + true, + "", + ""); + return new Column[]{ + destColumn + }; + + } + public Object[] getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) { + + Object[] fieldValues = new Object[1]; + fieldValues[0]="test1"; + return fieldValues; + } + """ + + } +} + +sink { + Assert { + source_table_name = "fake1" + rules = + { + row_rules = [ + { + rule_type = MIN_ROW + rule_value = 100 + } + ], + field_rules = [ + { + field_name = id + field_type = int + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = col1 + field_type = string + field_value = [ + { + rule_type = NOT_NULL + equals_to = "test1" + + } + + ] + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-shade/pom.xml b/seatunnel-shade/pom.xml index d8c5ff0003..cc761e6640 100644 --- a/seatunnel-shade/pom.xml +++ b/seatunnel-shade/pom.xml @@ -33,6 +33,8 @@ <module>seatunnel-arrow-5.0</module> <module>seatunnel-thrift-service</module> <module>seatunnel-hazelcast</module> + <module>seatunnel-janino</module> + </modules> <build> diff --git a/seatunnel-shade/seatunnel-janino/pom.xml 
b/seatunnel-shade/seatunnel-janino/pom.xml new file mode 100644 index 0000000000..a661a49845 --- /dev/null +++ b/seatunnel-shade/seatunnel-janino/pom.xml @@ -0,0 +1,103 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-shade</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>seatunnel-janino</artifactId> + <name>SeaTunnel : Shade : Janino</name> + <properties> + <janino.verion>3.0.11</janino.verion> + </properties> + + <dependencies> + <dependency> + <groupId>org.codehaus.janino</groupId> + <artifactId>janino</artifactId> + <version>${janino.verion}</version> + <optional>true</optional> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>shade</goal> + </goals> + <phase>package</phase> + <configuration> + 
<finalName>seatunnel-janino</finalName> + <createSourcesJar>true</createSourcesJar> + <shadeSourcesContent>true</shadeSourcesContent> + <shadedArtifactAttached>false</shadedArtifactAttached> + <createDependencyReducedPom>false</createDependencyReducedPom> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <relocations> + <relocation> + <pattern>org.codehaus</pattern> + <shadedPattern>${seatunnel.shade.package}.org.codehaus</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>attach-artifacts</id> + <goals> + <goal>attach-artifact</goal> + </goals> + <phase>package</phase> + <configuration> + <artifacts> + <artifact> + <file>${basedir}/target/seatunnel-janino.jar</file> + <type>jar</type> + <classifier>optional</classifier> + </artifact> + </artifacts> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/seatunnel-transforms-v2/pom.xml b/seatunnel-transforms-v2/pom.xml index 4dfaebb76f..ae8909f463 100644 --- a/seatunnel-transforms-v2/pom.xml +++ b/seatunnel-transforms-v2/pom.xml @@ -66,6 +66,17 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.groovy</groupId> + <artifactId>groovy</artifactId> + <version>${groovy.version}</version> + </dependency> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-janino</artifactId> + <version>${project.version}</version> + <classifier>optional</classifier> + </dependency> </dependencies> <build> diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/CompileLanguage.java 
/**
 * Languages in which the source code handed to the DynamicCompile transform may be written.
 *
 * <p>The value is selected through the {@code compile_language} option.
 */
public enum CompileLanguage {
    /** The source code is Groovy. */
    GROOVY,

    /** The source code is Java. */
    JAVA
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.transform.dynamiccompile; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; + +public enum CompileTransformErrorCode implements SeaTunnelErrorCode { + COMPILE_TRANSFORM_ERROR_CODE( + "COMPILE_TRANSFORM_ERROR_CODE-01", "CompileTransform error please check code"); + + private final String code; + private final String description; + + CompileTransformErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return code; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java new file mode 100644 index 0000000000..d798871401 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.dynamiccompile; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.common.utils.ReflectionUtils; +import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform; +import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; +import org.apache.seatunnel.transform.dynamiccompile.parse.AbstractParse; +import org.apache.seatunnel.transform.dynamiccompile.parse.GroovyClassParse; +import org.apache.seatunnel.transform.dynamiccompile.parse.JavaClassParse; +import org.apache.seatunnel.transform.exception.TransformException; + +import static org.apache.seatunnel.transform.dynamiccompile.CompileTransformErrorCode.COMPILE_TRANSFORM_ERROR_CODE; + +public class DynamicCompileTransform extends MultipleFieldOutputTransform { + public static final String PLUGIN_NAME = "DynamicCompile"; + + public static final String getInlineOutputColumns = "getInlineOutputColumns"; + + public static final String getInlineOutputFieldValues = "getInlineOutputFieldValues"; + + private final String sourceCode; + + private AbstractParse DynamicCompileParse; + + public DynamicCompileTransform(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) { + super(catalogTable); + CompileLanguage compileLanguage 
= + readonlyConfig.get(DynamicCompileTransformConfig.COMPILE_LANGUAGE); + // todo other compile + if (CompileLanguage.GROOVY.equals(compileLanguage)) { + DynamicCompileParse = new GroovyClassParse(); + } else if (CompileLanguage.JAVA.equals(compileLanguage)) { + DynamicCompileParse = new JavaClassParse(); + } + sourceCode = readonlyConfig.get(DynamicCompileTransformConfig.SOURCE_CODE); + } + + @Override + public String getPluginName() { + return PLUGIN_NAME; + } + + @Override + protected Column[] getOutputColumns() { + Object result; + try { + result = + ReflectionUtils.invoke( + DynamicCompileParse.parseClass(sourceCode).newInstance(), + getInlineOutputColumns, + inputCatalogTable); + + } catch (Exception e) { + throw new TransformException(COMPILE_TRANSFORM_ERROR_CODE, e.getMessage()); + } + + return (Column[]) result; + } + + @Override + protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) { + Object result; + try { + result = + ReflectionUtils.invoke( + DynamicCompileParse.parseClass(sourceCode).newInstance(), + getInlineOutputFieldValues, + inputRow); + + } catch (Exception e) { + throw new TransformException(COMPILE_TRANSFORM_ERROR_CODE, e.getMessage()); + } + return (Object[]) result; + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformConfig.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformConfig.java new file mode 100644 index 0000000000..48a47d0383 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformConfig.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.dynamiccompile; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +import lombok.Getter; +import lombok.Setter; + +import java.io.Serializable; + +@Getter +@Setter +public class DynamicCompileTransformConfig implements Serializable { + public static final Option<String> SOURCE_CODE = + Options.key("source_code") + .stringType() + .noDefaultValue() + .withDescription("source_code to compile"); + + public static final Option<CompileLanguage> COMPILE_LANGUAGE = + Options.key("compile_language") + .enumType(CompileLanguage.class) + .noDefaultValue() + .withDescription("compile language"); +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformFactory.java new file mode 100644 index 0000000000..422bb0ff14 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransformFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.dynamiccompile; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableTransform; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableTransformFactory; +import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class DynamicCompileTransformFactory implements TableTransformFactory { + @Override + public String factoryIdentifier() { + return DynamicCompileTransform.PLUGIN_NAME; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required( + DynamicCompileTransformConfig.COMPILE_LANGUAGE, + DynamicCompileTransformConfig.SOURCE_CODE) + .build(); + } + + @Override + public TableTransform createTransform(TableTransformFactoryContext context) { + CatalogTable catalogTable = context.getCatalogTables().get(0); + return () -> new DynamicCompileTransform(context.getOptions(), catalogTable); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/AbstractParse.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/AbstractParse.java new file mode 
/**
 * Serializable strategy that turns source code into a loaded {@link Class}.
 *
 * <p>One subclass exists per supported language.
 */
public abstract class AbstractParse implements Serializable {

    /**
     * Produces the class defined by the given source code.
     *
     * @param sourceCode the source code to turn into a class
     * @return the class obtained from {@code sourceCode}
     */
    public abstract Class<?> parseClass(String sourceCode);
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.dynamiccompile.parse; + +public class GroovyClassParse extends AbstractParse { + + @Override + public Class<?> parseClass(String sourceCode) { + return GroovyClassUtil.parseWithCache(sourceCode); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassUtil.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassUtil.java new file mode 100644 index 0000000000..5fab0e8761 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/GroovyClassUtil.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.transform.dynamiccompile.parse; + +import groovy.lang.GroovyClassLoader; + +public class GroovyClassUtil extends ParseUtil { + private static final GroovyClassLoader groovyClassLoader = new GroovyClassLoader(); + + public static Class<?> parseWithCache(String sourceCode) { + return classCache.computeIfAbsent( + getClassKey(sourceCode), clazz -> groovyClassLoader.parseClass(sourceCode)); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassParse.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassParse.java new file mode 100644 index 0000000000..3cd5bdd96e --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassParse.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seatunnel.transform.dynamiccompile.parse; + +public class JavaClassParse extends AbstractParse { + + @Override + public Class<?> parseClass(String sourceCode) { + return JavaClassUtil.parseWithCache(sourceCode); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassUtil.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassUtil.java new file mode 100644 index 0000000000..344b2708d4 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/JavaClassUtil.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seatunnel.transform.dynamiccompile.parse; + +import org.apache.seatunnel.shade.org.codehaus.commons.compiler.CompileException; +import org.apache.seatunnel.shade.org.codehaus.janino.ClassBodyEvaluator; + +import java.util.function.Function; + +public class JavaClassUtil extends ParseUtil { + + public static Class<?> parseWithCache(String sourceCode) { + + return classCache.computeIfAbsent( + getClassKey(sourceCode), + new Function<String, Class<?>>() { + @Override + public Class<?> apply(String classKey) { + try { + ClassBodyEvaluator cbe = new ClassBodyEvaluator(); + cbe.cook(sourceCode); + return cbe.getClazz(); + + } catch (CompileException e) { + throw new RuntimeException(e); + } + } + }); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ParseUtil.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ParseUtil.java new file mode 100644 index 0000000000..c4afd47e25 --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/parse/ParseUtil.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.seatunnel.transform.dynamiccompile.parse; + +import org.apache.commons.codec.digest.DigestUtils; + +import java.util.concurrent.ConcurrentHashMap; + +public abstract class ParseUtil { + protected static ConcurrentHashMap<String, Class<?>> classCache = new ConcurrentHashMap<>(); + // Abstraction layer: Do not want to serialize and pass the classloader + protected static String getClassKey(String sourceCode) { + return new String(DigestUtils.getMd5Digest().digest(sourceCode.getBytes())); + } +} diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt index 7c802e0c23..8532f7cba4 100755 --- a/tools/dependencies/known-dependencies.txt +++ b/tools/dependencies/known-dependencies.txt @@ -42,4 +42,6 @@ json-path-2.7.0.jar json-smart-2.4.7.jar accessors-smart-2.4.7.jar asm-9.1.jar -avro-1.11.1.jar \ No newline at end of file +avro-1.11.1.jar +groovy-4.0.16.jar +seatunnel-janino-2.3.6-SNAPSHOT-optional.jar \ No newline at end of file