slinkydeveloper commented on a change in pull request #18653: URL: https://github.com/apache/flink/pull/18653#discussion_r802413537
########## File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java ########## @@ -43,136 +44,81 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.stream.Collectors; +import java.util.Map; import static org.apache.flink.table.api.Expressions.row; import static org.junit.Assert.assertEquals; /** - * Test unsigned type conversion between Flink and JDBC driver mysql, the test underlying use - * MariaDB to mock a DB which use mysql driver too. + * Test unsigned type conversion between Flink and JDBC driver mysql, the test underlying use MySQL + * to mock a DB. */ public class UnsignedTypeConversionITCase extends AbstractTestBase { private static final Logger logger = LoggerFactory.getLogger(UnsignedTypeConversionITCase.class); + private static final DockerImageName MYSQL_57_IMAGE = DockerImageName.parse("mysql:5.7.34"); private static final String DEFAULT_DB_NAME = "test"; private static final String TABLE_NAME = "unsigned_test"; - private static final int INITIALIZE_DB_MAX_RETRY = 3; - private static DB db; - private static String dbUrl; - private static Connection connection; + private static final String user = "root"; + private static final String password = ""; + + private static final Map<String, String> DEFAULT_CONTAINER_ENV_MAP = + new HashMap<String, String>() { + { + put("MYSQL_ROOT_HOST", "%"); + } + }; + + public static final MySQLContainer<?> MYSQL_CONTAINER = + new MySQLContainer<>(MYSQL_57_IMAGE) + .withEnv(DEFAULT_CONTAINER_ENV_MAP) + .withUsername(user) + .withPassword(password) + .withLogConsumer(new Slf4jLogConsumer(logger)); + + private static String baseUrl; + private static Connection connection; private StreamTableEnvironment tEnv; @BeforeClass - public static void 
prepareMariaDB() throws IllegalStateException { - boolean initDbSuccess = false; - int i = 0; - // The initialization of maria db instance is a little unstable according to past CI tests. - // Add retry logic here to avoid initialization failure. - while (i < INITIALIZE_DB_MAX_RETRY) { - try { - db = DB.newEmbeddedDB(DBConfigurationBuilder.newBuilder().build()); - db.start(); - dbUrl = db.getConfiguration().getURL(DEFAULT_DB_NAME); - connection = DriverManager.getConnection(dbUrl); - try (Statement statement = connection.createStatement()) { - statement.execute("CREATE DATABASE IF NOT EXISTS `" + DEFAULT_DB_NAME + "`;"); - ResultSet resultSet = - statement.executeQuery( - "SELECT SCHEMA_NAME FROM " - + "INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '" - + DEFAULT_DB_NAME - + "';"); - if (resultSet.next()) { - String dbName = resultSet.getString(1); - initDbSuccess = DEFAULT_DB_NAME.equalsIgnoreCase(dbName); - } - } - } catch (Exception e) { - logger.warn("Initialize DB failed.", e); - stopDb(); - } - if (initDbSuccess) { - break; + public static void launchContainer() throws Exception { + MYSQL_CONTAINER.start(); Review comment: Rather than starting the container manually, can you just use a JUnit `@Rule`, as shown in this example? 
https://github.com/testcontainers/testcontainers-java/blob/master/modules/mysql/src/test/java/org/testcontainers/junit/mysql/SimpleMySQLTest.java#L42 This ties the container to the JUnit lifecycle, which takes care of all the start/stop leak issues for you. ########## File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java ########## @@ -43,136 +44,81 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.stream.Collectors; +import java.util.Map; import static org.apache.flink.table.api.Expressions.row; import static org.junit.Assert.assertEquals; /** - * Test unsigned type conversion between Flink and JDBC driver mysql, the test underlying use - * MariaDB to mock a DB which use mysql driver too. + * Test unsigned type conversion between Flink and JDBC driver mysql, the test underlying use MySQL + * to mock a DB. 
*/ public class UnsignedTypeConversionITCase extends AbstractTestBase { private static final Logger logger = LoggerFactory.getLogger(UnsignedTypeConversionITCase.class); + private static final DockerImageName MYSQL_57_IMAGE = DockerImageName.parse("mysql:5.7.34"); private static final String DEFAULT_DB_NAME = "test"; private static final String TABLE_NAME = "unsigned_test"; - private static final int INITIALIZE_DB_MAX_RETRY = 3; - private static DB db; - private static String dbUrl; - private static Connection connection; + private static final String user = "root"; + private static final String password = ""; + + private static final Map<String, String> DEFAULT_CONTAINER_ENV_MAP = + new HashMap<String, String>() { + { + put("MYSQL_ROOT_HOST", "%"); + } + }; + + public static final MySQLContainer<?> MYSQL_CONTAINER = + new MySQLContainer<>(MYSQL_57_IMAGE) + .withEnv(DEFAULT_CONTAINER_ENV_MAP) + .withUsername(user) + .withPassword(password) + .withLogConsumer(new Slf4jLogConsumer(logger)); + + private static String baseUrl; + private static Connection connection; private StreamTableEnvironment tEnv; @BeforeClass - public static void prepareMariaDB() throws IllegalStateException { - boolean initDbSuccess = false; - int i = 0; - // The initialization of maria db instance is a little unstable according to past CI tests. - // Add retry logic here to avoid initialization failure. 
- while (i < INITIALIZE_DB_MAX_RETRY) { - try { - db = DB.newEmbeddedDB(DBConfigurationBuilder.newBuilder().build()); - db.start(); - dbUrl = db.getConfiguration().getURL(DEFAULT_DB_NAME); - connection = DriverManager.getConnection(dbUrl); - try (Statement statement = connection.createStatement()) { - statement.execute("CREATE DATABASE IF NOT EXISTS `" + DEFAULT_DB_NAME + "`;"); - ResultSet resultSet = - statement.executeQuery( - "SELECT SCHEMA_NAME FROM " - + "INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '" - + DEFAULT_DB_NAME - + "';"); - if (resultSet.next()) { - String dbName = resultSet.getString(1); - initDbSuccess = DEFAULT_DB_NAME.equalsIgnoreCase(dbName); - } - } - } catch (Exception e) { - logger.warn("Initialize DB failed.", e); - stopDb(); - } - if (initDbSuccess) { - break; + public static void launchContainer() throws Exception { + MYSQL_CONTAINER.start(); + baseUrl = + MYSQL_CONTAINER + .getJdbcUrl() + .substring(0, MYSQL_CONTAINER.getJdbcUrl().lastIndexOf("/")); + connection = DriverManager.getConnection(baseUrl, user, password); + try (PreparedStatement ps = + connection.prepareStatement( + "CREATE DATABASE IF NOT EXISTS `" + DEFAULT_DB_NAME + "`;")) { + ps.execute(); + connection.close(); + connection = null; Review comment: Why is this statement needed, given that the next one overwrites the `connection` variable anyway? 
########## File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java ########## @@ -203,7 +149,12 @@ private void createFlinkTable() { + "big_un_c DECIMAL(20, 0)) with(" + " 'connector' = 'jdbc'," + " 'url' = '" - + dbUrl + + baseUrl + + "/" + + DEFAULT_DB_NAME + + "?user=" + + user + + "&password=&" Review comment: I think you can do directly `MYSQL_CONTAINER.getJdbcUrl()` ########## File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java ########## @@ -43,136 +44,81 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.stream.Collectors; +import java.util.Map; import static org.apache.flink.table.api.Expressions.row; import static org.junit.Assert.assertEquals; /** - * Test unsigned type conversion between Flink and JDBC driver mysql, the test underlying use - * MariaDB to mock a DB which use mysql driver too. + * Test unsigned type conversion between Flink and JDBC driver mysql, the test underlying use MySQL + * to mock a DB. 
*/ public class UnsignedTypeConversionITCase extends AbstractTestBase { private static final Logger logger = LoggerFactory.getLogger(UnsignedTypeConversionITCase.class); + private static final DockerImageName MYSQL_57_IMAGE = DockerImageName.parse("mysql:5.7.34"); private static final String DEFAULT_DB_NAME = "test"; private static final String TABLE_NAME = "unsigned_test"; - private static final int INITIALIZE_DB_MAX_RETRY = 3; - private static DB db; - private static String dbUrl; - private static Connection connection; + private static final String user = "root"; + private static final String password = ""; + + private static final Map<String, String> DEFAULT_CONTAINER_ENV_MAP = + new HashMap<String, String>() { + { + put("MYSQL_ROOT_HOST", "%"); + } + }; + + public static final MySQLContainer<?> MYSQL_CONTAINER = + new MySQLContainer<>(MYSQL_57_IMAGE) + .withEnv(DEFAULT_CONTAINER_ENV_MAP) + .withUsername(user) + .withPassword(password) + .withLogConsumer(new Slf4jLogConsumer(logger)); + + private static String baseUrl; + private static Connection connection; private StreamTableEnvironment tEnv; @BeforeClass - public static void prepareMariaDB() throws IllegalStateException { - boolean initDbSuccess = false; - int i = 0; - // The initialization of maria db instance is a little unstable according to past CI tests. - // Add retry logic here to avoid initialization failure. 
- while (i < INITIALIZE_DB_MAX_RETRY) { - try { - db = DB.newEmbeddedDB(DBConfigurationBuilder.newBuilder().build()); - db.start(); - dbUrl = db.getConfiguration().getURL(DEFAULT_DB_NAME); - connection = DriverManager.getConnection(dbUrl); - try (Statement statement = connection.createStatement()) { - statement.execute("CREATE DATABASE IF NOT EXISTS `" + DEFAULT_DB_NAME + "`;"); - ResultSet resultSet = - statement.executeQuery( - "SELECT SCHEMA_NAME FROM " - + "INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '" - + DEFAULT_DB_NAME - + "';"); - if (resultSet.next()) { - String dbName = resultSet.getString(1); - initDbSuccess = DEFAULT_DB_NAME.equalsIgnoreCase(dbName); - } - } - } catch (Exception e) { - logger.warn("Initialize DB failed.", e); - stopDb(); - } - if (initDbSuccess) { - break; + public static void launchContainer() throws Exception { + MYSQL_CONTAINER.start(); + baseUrl = + MYSQL_CONTAINER + .getJdbcUrl() + .substring(0, MYSQL_CONTAINER.getJdbcUrl().lastIndexOf("/")); + connection = DriverManager.getConnection(baseUrl, user, password); + try (PreparedStatement ps = + connection.prepareStatement( Review comment: Maybe use directly https://github.com/testcontainers/testcontainers-java/blob/master/modules/mysql/src/test/java/org/testcontainers/junit/mysql/SimpleMySQLTest.java#L142? 
########## File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java ########## @@ -229,27 +180,84 @@ private void prepareData() { new Integer(127).byteValue(), new Integer(255).shortValue(), new Integer(32767).shortValue(), - Integer.valueOf(65535), - Integer.valueOf(2147483647), - Long.valueOf(4294967295L), - Long.valueOf(9223372036854775807L), + 65535, + 2147483647, + 4294967295L, + 9223372036854775807L, new BigDecimal(new BigInteger("18446744073709551615"), 0))); tEnv.createTemporaryView("data", dataTable); } - @After - public void cleanup() { - stopDb(); + @Test + public void testUnsignedType() throws Exception { + // write data to db + tEnv.executeSql( + "insert into jdbc_sink select" + + " tiny_c," + + " tiny_un_c," + + " small_c," + + " small_un_c ," + + " int_c," + + " int_un_c," + + " big_c ," + + " big_un_c from data") + .await(); + + // read data from db using jdbc connection and compare + PreparedStatement query = + connection.prepareStatement( + String.format( + "select tiny_c, tiny_un_c, small_c, small_un_c," + + " int_c, int_un_c, big_c, big_un_c from %s", + TABLE_NAME)); + ResultSet resultSet = query.executeQuery(); + while (resultSet.next()) { + assertEquals(127, resultSet.getObject("tiny_c")); + assertEquals(255, resultSet.getObject("tiny_un_c")); + assertEquals(32767, resultSet.getObject("small_c")); + assertEquals(65535, resultSet.getObject("small_un_c")); + assertEquals(2147483647, resultSet.getObject("int_c")); + assertEquals(4294967295L, resultSet.getObject("int_un_c")); + assertEquals(9223372036854775807L, resultSet.getObject("big_c")); + assertEquals(new BigInteger("18446744073709551615"), resultSet.getObject("big_un_c")); + } + + // read data from db using flink and compare + Iterator<Row> collected = + tEnv.executeSql( + "select tiny_c, tiny_un_c, small_c, small_un_c," + + " int_c, int_un_c, big_c, big_un_c from jdbc_source") + .collect(); + List<Row> result = 
CollectionUtil.iteratorToList(collected); + List<Row> expected = + Collections.singletonList( + Row.ofKind( + RowKind.INSERT, + Byte.parseByte("127"), Review comment: can you use casts and literal suffixes instead of "parse" methods? ########## File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java ########## @@ -229,27 +180,84 @@ private void prepareData() { new Integer(127).byteValue(), new Integer(255).shortValue(), new Integer(32767).shortValue(), - Integer.valueOf(65535), - Integer.valueOf(2147483647), - Long.valueOf(4294967295L), - Long.valueOf(9223372036854775807L), + 65535, + 2147483647, + 4294967295L, + 9223372036854775807L, new BigDecimal(new BigInteger("18446744073709551615"), 0))); tEnv.createTemporaryView("data", dataTable); } - @After - public void cleanup() { - stopDb(); + @Test + public void testUnsignedType() throws Exception { + // write data to db + tEnv.executeSql( + "insert into jdbc_sink select" + + " tiny_c," + + " tiny_un_c," + + " small_c," + + " small_un_c ," + + " int_c," + + " int_un_c," + + " big_c ," + + " big_un_c from data") + .await(); + + // read data from db using jdbc connection and compare + PreparedStatement query = + connection.prepareStatement( + String.format( + "select tiny_c, tiny_un_c, small_c, small_un_c," + + " int_c, int_un_c, big_c, big_un_c from %s", + TABLE_NAME)); + ResultSet resultSet = query.executeQuery(); + while (resultSet.next()) { + assertEquals(127, resultSet.getObject("tiny_c")); + assertEquals(255, resultSet.getObject("tiny_un_c")); + assertEquals(32767, resultSet.getObject("small_c")); + assertEquals(65535, resultSet.getObject("small_un_c")); + assertEquals(2147483647, resultSet.getObject("int_c")); + assertEquals(4294967295L, resultSet.getObject("int_un_c")); + assertEquals(9223372036854775807L, resultSet.getObject("big_c")); + assertEquals(new BigInteger("18446744073709551615"), resultSet.getObject("big_un_c")); + } + + // 
read data from db using flink and compare + Iterator<Row> collected = + tEnv.executeSql( + "select tiny_c, tiny_un_c, small_c, small_un_c," + + " int_c, int_un_c, big_c, big_un_c from jdbc_source") + .collect(); + List<Row> result = CollectionUtil.iteratorToList(collected); + List<Row> expected = + Collections.singletonList( + Row.ofKind( + RowKind.INSERT, + Byte.parseByte("127"), + Short.parseShort("255"), + Short.parseShort("32767"), + Integer.parseInt("65535"), + Integer.parseInt("2147483647"), + Long.parseLong("4294967295"), + Long.parseLong("9223372036854775807"), + new BigDecimal("18446744073709551615"))); + assertEquals(expected, result); Review comment: Please use only AssertJ assertions, here and throughout the rest of this test class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org