tsreaper commented on a change in pull request #17672:
URL: https://github.com/apache/flink/pull/17672#discussion_r743594661



##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
##########
@@ -18,12 +18,19 @@
 
 package org.apache.flink.table.planner.runtime.batch.sql
 
+import java.io.File
+import java.net.URL
 import java.util
 
+import org.apache.flink.client.ClientUtils
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.catalog.{CatalogPartitionImpl, 
CatalogPartitionSpec, ObjectPath}
 import org.apache.flink.table.planner.factories.{TestValuesCatalog, 
TestValuesTableFactory}
+import 
org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.utils.TestUserClassLoaderJar
+import org.apache.flink.util.TemporaryClassLoaderContext
 import org.junit.{Before, Test}
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized

Review comment:
       Clean up import order, although they're currently not checked for scala 
files.
   
   ```suggestion
   import org.apache.flink.client.ClientUtils
   import org.apache.flink.configuration.Configuration
   import org.apache.flink.table.catalog.{CatalogPartitionImpl, 
CatalogPartitionSpec, ObjectPath}
   import org.apache.flink.table.planner.factories.{TestValuesCatalog, 
TestValuesTableFactory}
   import 
org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER
   import org.apache.flink.table.planner.runtime.utils.BatchTestBase
   import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
   import org.apache.flink.table.utils.TestUserClassLoaderJar
   import org.apache.flink.util.TemporaryClassLoaderContext
   
   import org.junit.{Before, Test}
   import org.junit.runner.RunWith
   import org.junit.runners.Parameterized
   
   import java.io.File
   import java.net.URL
   import java.util
   ```

##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
##########
@@ -120,6 +127,33 @@ class PartitionableSourceITCase(
       )
     )
   }
+
+  @Test
+  def testPartitionPrunerCompileClassLoader(): Unit = {
+    val udfJavaCode =
+      s"""
+        |public class TrimUDF extends 
org.apache.flink.table.functions.ScalarFunction {
+        |   public String eval(String str) {
+        |     return str.trim();
+        |   }
+        |}
+        |""".stripMargin
+    val tmpDir: File = TEMPORARY_FOLDER.newFolder()
+    val udfJarFile: File = TestUserClassLoaderJar.createJarFile(
+      tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode)
+    val jars: util.List[URL] = new util.ArrayList[URL]()
+    jars.add(udfJarFile.toURI.toURL)
+    val cl = ClientUtils.buildUserCodeClassLoader(jars, 
java.util.Collections.emptyList(),
+      getClass.getClassLoader, new Configuration());
+    val ctx = TemporaryClassLoaderContext.of(cl)
+    tEnv.executeSql("create temporary function trimUDF as 'TrimUDF'");
+    checkResult("select * from MyTable where trimUDF(part1) = 'A' and part2 > 
1",
+      Seq(
+        row(3, "Jack", "A", 2, 3)
+      )
+    )
+    ctx.close()

Review comment:
       Wrap this with `try... finally...` and close class loader context in 
`finally`. If the test fails the class loader will be changed and might cause 
other bugs in other tests.

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/utils/TestUserClassLoaderJar.java
##########
@@ -34,12 +34,13 @@
 import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
 
-/** Mainly used for testing classloading of UDF dependencies. */
+/** Mainly used for testing classloading. */
 public class TestUserClassLoaderJar {
 
-    private static final String GENERATED_UDF_CLASS = "LowerUDF";
+    /** Legacy code. */
+    public static final String GENERATED_UDF_CLASS = "LowerUDF";
 
-    private static final String GENERATED_UDF_CODE =
+    public static final String GENERATED_UDF_CODE =

Review comment:
       Move these back to `flink-sql-client`. They are used only by tests in 
SQL client.

##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
##########
@@ -120,6 +127,33 @@ class PartitionableSourceITCase(
       )
     )
   }
+
+  @Test
+  def testPartitionPrunerCompileClassLoader(): Unit = {
+    val udfJavaCode =
+      s"""
+        |public class TrimUDF extends 
org.apache.flink.table.functions.ScalarFunction {
+        |   public String eval(String str) {
+        |     return str.trim();
+        |   }
+        |}
+        |""".stripMargin
+    val tmpDir: File = TEMPORARY_FOLDER.newFolder()
+    val udfJarFile: File = TestUserClassLoaderJar.createJarFile(
+      tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode)
+    val jars: util.List[URL] = new util.ArrayList[URL]()
+    jars.add(udfJarFile.toURI.toURL)
+    val cl = ClientUtils.buildUserCodeClassLoader(jars, 
java.util.Collections.emptyList(),
+      getClass.getClassLoader, new Configuration());
+    val ctx = TemporaryClassLoaderContext.of(cl)
+    tEnv.executeSql("create temporary function trimUDF as 'TrimUDF'");

Review comment:
       No semicolon in scala code.

##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
##########
@@ -120,6 +127,33 @@ class PartitionableSourceITCase(
       )
     )
   }
+
+  @Test
+  def testPartitionPrunerCompileClassLoader(): Unit = {
+    val udfJavaCode =
+      s"""
+        |public class TrimUDF extends 
org.apache.flink.table.functions.ScalarFunction {
+        |   public String eval(String str) {
+        |     return str.trim();
+        |   }
+        |}
+        |""".stripMargin
+    val tmpDir: File = TEMPORARY_FOLDER.newFolder()
+    val udfJarFile: File = TestUserClassLoaderJar.createJarFile(
+      tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode)
+    val jars: util.List[URL] = new util.ArrayList[URL]()
+    jars.add(udfJarFile.toURI.toURL)
+    val cl = ClientUtils.buildUserCodeClassLoader(jars, 
java.util.Collections.emptyList(),

Review comment:
       nit: For lists with single constant element use 
`java.util.Collections.singletonList`.

##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
##########
@@ -18,12 +18,19 @@
 
 package org.apache.flink.table.planner.runtime.batch.sql
 
+import java.io.File
+import java.net.URL
 import java.util
 
+import org.apache.flink.client.ClientUtils
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.table.catalog.{CatalogPartitionImpl, 
CatalogPartitionSpec, ObjectPath}
 import org.apache.flink.table.planner.factories.{TestValuesCatalog, 
TestValuesTableFactory}
+import 
org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.utils.TestUserClassLoaderJar
+import org.apache.flink.util.TemporaryClassLoaderContext
 import org.junit.{Before, Test}
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized

Review comment:
       Clean up import order, although they're currently not checked for scala 
files.
   
   ```suggestion
   import org.apache.flink.client.ClientUtils
   import org.apache.flink.configuration.Configuration
   import org.apache.flink.table.catalog.{CatalogPartitionImpl, 
CatalogPartitionSpec, ObjectPath}
   import org.apache.flink.table.planner.factories.{TestValuesCatalog, 
TestValuesTableFactory}
   import 
org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER
   import org.apache.flink.table.planner.runtime.utils.BatchTestBase
   import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
   import org.apache.flink.table.utils.TestUserClassLoaderJar
   import org.apache.flink.util.TemporaryClassLoaderContext
   
   import org.junit.{Before, Test}
   import org.junit.runner.RunWith
   import org.junit.runners.Parameterized
   
   import java.io.File
   import java.net.URL
   import java.util
   ```

##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
##########
@@ -120,6 +127,33 @@ class PartitionableSourceITCase(
       )
     )
   }
+
+  @Test
+  def testPartitionPrunerCompileClassLoader(): Unit = {
+    val udfJavaCode =
+      s"""
+        |public class TrimUDF extends 
org.apache.flink.table.functions.ScalarFunction {
+        |   public String eval(String str) {
+        |     return str.trim();
+        |   }
+        |}
+        |""".stripMargin
+    val tmpDir: File = TEMPORARY_FOLDER.newFolder()
+    val udfJarFile: File = TestUserClassLoaderJar.createJarFile(
+      tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode)
+    val jars: util.List[URL] = new util.ArrayList[URL]()
+    jars.add(udfJarFile.toURI.toURL)
+    val cl = ClientUtils.buildUserCodeClassLoader(jars, 
java.util.Collections.emptyList(),
+      getClass.getClassLoader, new Configuration());
+    val ctx = TemporaryClassLoaderContext.of(cl)
+    tEnv.executeSql("create temporary function trimUDF as 'TrimUDF'");
+    checkResult("select * from MyTable where trimUDF(part1) = 'A' and part2 > 
1",
+      Seq(
+        row(3, "Jack", "A", 2, 3)
+      )
+    )
+    ctx.close()

Review comment:
       Wrap this with `try... finally...` and close class loader context in 
`finally`. If the test fails the class loader will be changed and might cause 
other bugs in other tests.

##########
File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/utils/TestUserClassLoaderJar.java
##########
@@ -34,12 +34,13 @@
 import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
 
-/** Mainly used for testing classloading of UDF dependencies. */
+/** Mainly used for testing classloading. */
 public class TestUserClassLoaderJar {
 
-    private static final String GENERATED_UDF_CLASS = "LowerUDF";
+    /** Legacy code. */
+    public static final String GENERATED_UDF_CLASS = "LowerUDF";
 
-    private static final String GENERATED_UDF_CODE =
+    public static final String GENERATED_UDF_CODE =

Review comment:
       Move these back to `flink-sql-client`. They are used only by tests in 
SQL client.

##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
##########
@@ -120,6 +127,33 @@ class PartitionableSourceITCase(
       )
     )
   }
+
+  @Test
+  def testPartitionPrunerCompileClassLoader(): Unit = {
+    val udfJavaCode =
+      s"""
+        |public class TrimUDF extends 
org.apache.flink.table.functions.ScalarFunction {
+        |   public String eval(String str) {
+        |     return str.trim();
+        |   }
+        |}
+        |""".stripMargin
+    val tmpDir: File = TEMPORARY_FOLDER.newFolder()
+    val udfJarFile: File = TestUserClassLoaderJar.createJarFile(
+      tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode)
+    val jars: util.List[URL] = new util.ArrayList[URL]()
+    jars.add(udfJarFile.toURI.toURL)
+    val cl = ClientUtils.buildUserCodeClassLoader(jars, 
java.util.Collections.emptyList(),
+      getClass.getClassLoader, new Configuration());
+    val ctx = TemporaryClassLoaderContext.of(cl)
+    tEnv.executeSql("create temporary function trimUDF as 'TrimUDF'");

Review comment:
       No semicolon in scala code.

##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
##########
@@ -120,6 +127,33 @@ class PartitionableSourceITCase(
       )
     )
   }
+
+  @Test
+  def testPartitionPrunerCompileClassLoader(): Unit = {
+    val udfJavaCode =
+      s"""
+        |public class TrimUDF extends 
org.apache.flink.table.functions.ScalarFunction {
+        |   public String eval(String str) {
+        |     return str.trim();
+        |   }
+        |}
+        |""".stripMargin
+    val tmpDir: File = TEMPORARY_FOLDER.newFolder()
+    val udfJarFile: File = TestUserClassLoaderJar.createJarFile(
+      tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode)
+    val jars: util.List[URL] = new util.ArrayList[URL]()
+    jars.add(udfJarFile.toURI.toURL)
+    val cl = ClientUtils.buildUserCodeClassLoader(jars, 
java.util.Collections.emptyList(),

Review comment:
       nit: For lists with single constant element use 
`java.util.Collections.singletonList`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to