tsreaper commented on a change in pull request #17672: URL: https://github.com/apache/flink/pull/17672#discussion_r743594661
########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala ########## @@ -18,12 +18,19 @@ package org.apache.flink.table.planner.runtime.batch.sql +import java.io.File +import java.net.URL import java.util +import org.apache.flink.client.ClientUtils +import org.apache.flink.configuration.Configuration import org.apache.flink.table.catalog.{CatalogPartitionImpl, CatalogPartitionSpec, ObjectPath} import org.apache.flink.table.planner.factories.{TestValuesCatalog, TestValuesTableFactory} +import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row +import org.apache.flink.table.utils.TestUserClassLoaderJar +import org.apache.flink.util.TemporaryClassLoaderContext import org.junit.{Before, Test} import org.junit.runner.RunWith import org.junit.runners.Parameterized Review comment: Clean up import order, although they're currently not checked for scala files. ```suggestion import org.apache.flink.client.ClientUtils import org.apache.flink.configuration.Configuration import org.apache.flink.table.catalog.{CatalogPartitionImpl, CatalogPartitionSpec, ObjectPath} import org.apache.flink.table.planner.factories.{TestValuesCatalog, TestValuesTableFactory} import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.utils.TestUserClassLoaderJar import org.apache.flink.util.TemporaryClassLoaderContext import org.junit.{Before, Test} import org.junit.runner.RunWith import org.junit.runners.Parameterized import java.io.File import java.net.URL import java.util ``` ########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala ########## @@ -120,6 +127,33 @@ class PartitionableSourceITCase( ) ) } + + @Test + def testPartitionPrunerCompileClassLoader(): Unit = { + val udfJavaCode = + s""" + |public class TrimUDF extends org.apache.flink.table.functions.ScalarFunction { + | public String eval(String str) { + | return str.trim(); + | } + |} + |""".stripMargin + val tmpDir: File = TEMPORARY_FOLDER.newFolder() + val udfJarFile: File = TestUserClassLoaderJar.createJarFile( + tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode) + val jars: util.List[URL] = new util.ArrayList[URL]() + jars.add(udfJarFile.toURI.toURL) + val cl = ClientUtils.buildUserCodeClassLoader(jars, java.util.Collections.emptyList(), + getClass.getClassLoader, new Configuration()); + val ctx = TemporaryClassLoaderContext.of(cl) + tEnv.executeSql("create temporary function trimUDF as 'TrimUDF'"); + checkResult("select * from MyTable where trimUDF(part1) = 'A' and part2 > 1", + Seq( + row(3, "Jack", "A", 2, 3) + ) + ) + ctx.close() Review comment: Wrap this with `try... finally...` and close class loader context in `finally`. If the test fails the class loader will be changed and might cause other bugs in other tests. ########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/utils/TestUserClassLoaderJar.java ########## @@ -34,12 +34,13 @@ import java.util.jar.JarEntry; import java.util.jar.JarOutputStream; -/** Mainly used for testing classloading of UDF dependencies. */ +/** Mainly used for testing classloading. */ public class TestUserClassLoaderJar { - private static final String GENERATED_UDF_CLASS = "LowerUDF"; + /** Legacy code. */ + public static final String GENERATED_UDF_CLASS = "LowerUDF"; - private static final String GENERATED_UDF_CODE = + public static final String GENERATED_UDF_CODE = Review comment: Move these back to `flink-sql-client`. They are used only by tests in SQL client. ########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala ########## @@ -120,6 +127,33 @@ class PartitionableSourceITCase( ) ) } + + @Test + def testPartitionPrunerCompileClassLoader(): Unit = { + val udfJavaCode = + s""" + |public class TrimUDF extends org.apache.flink.table.functions.ScalarFunction { + | public String eval(String str) { + | return str.trim(); + | } + |} + |""".stripMargin + val tmpDir: File = TEMPORARY_FOLDER.newFolder() + val udfJarFile: File = TestUserClassLoaderJar.createJarFile( + tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode) + val jars: util.List[URL] = new util.ArrayList[URL]() + jars.add(udfJarFile.toURI.toURL) + val cl = ClientUtils.buildUserCodeClassLoader(jars, java.util.Collections.emptyList(), + getClass.getClassLoader, new Configuration()); + val ctx = TemporaryClassLoaderContext.of(cl) + tEnv.executeSql("create temporary function trimUDF as 'TrimUDF'"); Review comment: No semicolon in scala code. ########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala ########## @@ -120,6 +127,33 @@ class PartitionableSourceITCase( ) ) } + + @Test + def testPartitionPrunerCompileClassLoader(): Unit = { + val udfJavaCode = + s""" + |public class TrimUDF extends org.apache.flink.table.functions.ScalarFunction { + | public String eval(String str) { + | return str.trim(); + | } + |} + |""".stripMargin + val tmpDir: File = TEMPORARY_FOLDER.newFolder() + val udfJarFile: File = TestUserClassLoaderJar.createJarFile( + tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode) + val jars: util.List[URL] = new util.ArrayList[URL]() + jars.add(udfJarFile.toURI.toURL) + val cl = ClientUtils.buildUserCodeClassLoader(jars, java.util.Collections.emptyList(), Review comment: nit: For lists with single constant element use `java.util.Collections.singletonList`. ########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala ########## @@ -18,12 +18,19 @@ package org.apache.flink.table.planner.runtime.batch.sql +import java.io.File +import java.net.URL import java.util +import org.apache.flink.client.ClientUtils +import org.apache.flink.configuration.Configuration import org.apache.flink.table.catalog.{CatalogPartitionImpl, CatalogPartitionSpec, ObjectPath} import org.apache.flink.table.planner.factories.{TestValuesCatalog, TestValuesTableFactory} +import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row +import org.apache.flink.table.utils.TestUserClassLoaderJar +import org.apache.flink.util.TemporaryClassLoaderContext import org.junit.{Before, Test} import org.junit.runner.RunWith import org.junit.runners.Parameterized Review comment: Clean up import order, although they're currently not checked for scala files. ```suggestion import org.apache.flink.client.ClientUtils import org.apache.flink.configuration.Configuration import org.apache.flink.table.catalog.{CatalogPartitionImpl, CatalogPartitionSpec, ObjectPath} import org.apache.flink.table.planner.factories.{TestValuesCatalog, TestValuesTableFactory} import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.utils.TestUserClassLoaderJar import org.apache.flink.util.TemporaryClassLoaderContext import org.junit.{Before, Test} import org.junit.runner.RunWith import org.junit.runners.Parameterized import java.io.File import java.net.URL import java.util ``` ########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala ########## @@ -120,6 +127,33 @@ class PartitionableSourceITCase( ) ) } + + @Test + def testPartitionPrunerCompileClassLoader(): Unit = { + val udfJavaCode = + s""" + |public class TrimUDF extends org.apache.flink.table.functions.ScalarFunction { + | public String eval(String str) { + | return str.trim(); + | } + |} + |""".stripMargin + val tmpDir: File = TEMPORARY_FOLDER.newFolder() + val udfJarFile: File = TestUserClassLoaderJar.createJarFile( + tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode) + val jars: util.List[URL] = new util.ArrayList[URL]() + jars.add(udfJarFile.toURI.toURL) + val cl = ClientUtils.buildUserCodeClassLoader(jars, java.util.Collections.emptyList(), + getClass.getClassLoader, new Configuration()); + val ctx = TemporaryClassLoaderContext.of(cl) + tEnv.executeSql("create temporary function trimUDF as 'TrimUDF'"); + checkResult("select * from MyTable where trimUDF(part1) = 'A' and part2 > 1", + Seq( + row(3, "Jack", "A", 2, 3) + ) + ) + ctx.close() Review comment: Wrap this with `try... finally...` and close class loader context in `finally`. If the test fails the class loader will be changed and might cause other bugs in other tests. ########## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/utils/TestUserClassLoaderJar.java ########## @@ -34,12 +34,13 @@ import java.util.jar.JarEntry; import java.util.jar.JarOutputStream; -/** Mainly used for testing classloading of UDF dependencies. */ +/** Mainly used for testing classloading. */ public class TestUserClassLoaderJar { - private static final String GENERATED_UDF_CLASS = "LowerUDF"; + /** Legacy code. */ + public static final String GENERATED_UDF_CLASS = "LowerUDF"; - private static final String GENERATED_UDF_CODE = + public static final String GENERATED_UDF_CODE = Review comment: Move these back to `flink-sql-client`. They are used only by tests in SQL client. ########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala ########## @@ -120,6 +127,33 @@ class PartitionableSourceITCase( ) ) } + + @Test + def testPartitionPrunerCompileClassLoader(): Unit = { + val udfJavaCode = + s""" + |public class TrimUDF extends org.apache.flink.table.functions.ScalarFunction { + | public String eval(String str) { + | return str.trim(); + | } + |} + |""".stripMargin + val tmpDir: File = TEMPORARY_FOLDER.newFolder() + val udfJarFile: File = TestUserClassLoaderJar.createJarFile( + tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode) + val jars: util.List[URL] = new util.ArrayList[URL]() + jars.add(udfJarFile.toURI.toURL) + val cl = ClientUtils.buildUserCodeClassLoader(jars, java.util.Collections.emptyList(), + getClass.getClassLoader, new Configuration()); + val ctx = TemporaryClassLoaderContext.of(cl) + tEnv.executeSql("create temporary function trimUDF as 'TrimUDF'"); Review comment: No semicolon in scala code. ########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala ########## @@ -120,6 +127,33 @@ class PartitionableSourceITCase( ) ) } + + @Test + def testPartitionPrunerCompileClassLoader(): Unit = { + val udfJavaCode = + s""" + |public class TrimUDF extends org.apache.flink.table.functions.ScalarFunction { + | public String eval(String str) { + | return str.trim(); + | } + |} + |""".stripMargin + val tmpDir: File = TEMPORARY_FOLDER.newFolder() + val udfJarFile: File = TestUserClassLoaderJar.createJarFile( + tmpDir, "flink-test-udf.jar", "TrimUDF", udfJavaCode) + val jars: util.List[URL] = new util.ArrayList[URL]() + jars.add(udfJarFile.toURI.toURL) + val cl = ClientUtils.buildUserCodeClassLoader(jars, java.util.Collections.emptyList(), Review comment: nit: For lists with single constant element use `java.util.Collections.singletonList`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org