This is an automated email from the ASF dual-hosted git repository.

dailai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 41b5a88d61 [Hotfix][Transforms-v2] DynamicCompile Plugin compatibility fix (#8057)
41b5a88d61 is described below

commit 41b5a88d611b58486d1c996127d6ed0b2b78c337
Author: zhangdonghao <39961809+hawk9...@users.noreply.github.com>
AuthorDate: Fri Nov 15 15:32:58 2024 +0800

    [Hotfix][Transforms-v2] DynamicCompile Plugin compatibility fix (#8057)
---
 .../e2e/transform/TestDynamicCompileIT.java        |   9 ++
 ..._dynamic_java_compile_transform_compatible.conf | 115 +++++++++++++++++++++
 .../dynamiccompile/DynamicCompileTransform.java    |  21 +++-
 3 files changed, 143 insertions(+), 2 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java
index 2528499fc1..2ad2f32e4b 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java
@@ -116,6 +116,15 @@ public class TestDynamicCompileIT extends TestSuiteBase implements TestResource
         Assertions.assertEquals(0, execResult.getExitCode());
     }
 
+    @TestTemplate
+    public void testDynamicSingleCompileJavaOldVersionCompatible(TestContainer 
container)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                container.executeJob(
+                        basePath + 
"single_dynamic_java_compile_transform_compatible.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
     @TestTemplate
     public void testDynamicMultipleCompileGroovy(TestContainer container)
             throws IOException, InterruptedException {
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform_compatible.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform_compatible.conf
new file mode 100644
index 0000000000..f26dd21060
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/dynamic_compile/conf/single_dynamic_java_compile_transform_compatible.conf
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+      }
+    }
+  }
+}
+
+transform {
+  DynamicCompile {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    compile_language = "JAVA"
+    compile_pattern = "SOURCE_CODE"
+    source_code = """
+                 import org.apache.seatunnel.api.table.catalog.Column;
+                 import 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
+                 import org.apache.seatunnel.api.table.catalog.*;
+                 import org.apache.seatunnel.api.table.type.*;
+                 import java.util.ArrayList;
+
+
+                     public Column[] getInlineOutputColumns(CatalogTable 
inputCatalogTable) {
+
+                       ArrayList<Column> columns = new ArrayList<Column>();
+                                               PhysicalColumn destColumn =
+                                               PhysicalColumn.of(
+                                               "col1",
+                                              BasicType.STRING_TYPE,
+                                               10,
+                                              true,
+                                              "",
+                                              "");
+                                                 return new Column[]{
+                                                                destColumn
+                                                        };
+
+                     }
+                     public Object[] 
getInlineOutputFieldValues(SeaTunnelRowAccessor inputRow) {
+
+                       Object[] fieldValues = new Object[1];
+                       fieldValues[0]="test1";
+                       return fieldValues;
+                     }
+                """
+
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "fake1"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MIN_ROW
+            rule_value = 100
+          }
+        ],
+        field_rules = [
+          {
+            field_name = id
+            field_type = int
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = col1
+            field_type = string
+            field_value = [
+              {
+                rule_type = NOT_NULL
+                equals_to = "test1"
+
+              }
+
+            ]
+          }
+        ]
+      }
+  }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
index bfae2b8d2a..7ff88d85d4 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/dynamiccompile/DynamicCompileTransform.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.transform.dynamiccompile;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowAccessor;
 import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
@@ -30,6 +31,7 @@ import org.apache.seatunnel.transform.dynamiccompile.parse.JavaClassParse;
 import org.apache.seatunnel.transform.exception.TransformException;
 
 import java.nio.file.Paths;
+import java.util.Optional;
 
 import static 
org.apache.seatunnel.transform.dynamiccompile.CompileTransformErrorCode.COMPILE_TRANSFORM_ERROR_CODE;
 
@@ -42,6 +44,8 @@ public class DynamicCompileTransform extends MultipleFieldOutputTransform {
 
     private final String sourceCode;
 
+    private final boolean compatibilityMode;
+
     private final CompilePattern compilePattern;
 
     private AbstractParse DynamicCompileParse;
@@ -68,6 +72,9 @@ public class DynamicCompileTransform extends MultipleFieldOutputTransform {
                                     readonlyConfig.get(
                                             
DynamicCompileTransformConfig.ABSOLUTE_PATH)));
         }
+        compatibilityMode =
+                sourceCode.contains(
+                        
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor.class.getName());
     }
 
     @Override
@@ -98,14 +105,24 @@ public class DynamicCompileTransform extends MultipleFieldOutputTransform {
         try {
             result =
                     ReflectionUtils.invoke(
-                            getCompileLanguageInstance(), 
getInlineOutputFieldValues, inputRow);
-
+                            getCompileLanguageInstance(),
+                            getInlineOutputFieldValues,
+                            getCompatibilityAccessor(inputRow));
         } catch (Exception e) {
             throw new TransformException(COMPILE_TRANSFORM_ERROR_CODE, 
e.getMessage());
         }
         return (Object[]) result;
     }
 
+    private Object getCompatibilityAccessor(SeaTunnelRowAccessor inputRow) {
+        if (compatibilityMode) {
+            Optional<Object> field = ReflectionUtils.getField(inputRow, "row");
+            SeaTunnelRow row = (SeaTunnelRow) field.get();
+            return new 
org.apache.seatunnel.transform.common.SeaTunnelRowAccessor(row);
+        }
+        return inputRow;
+    }
+
     private Object getCompileLanguageInstance()
             throws InstantiationException, IllegalAccessException {
         Class<?> compileClass = 
DynamicCompileParse.parseClassSourceCode(sourceCode);

Reply via email to