slinkydeveloper commented on a change in pull request #18653:
URL: https://github.com/apache/flink/pull/18653#discussion_r802413537



##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
##########
@@ -43,136 +44,81 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
 
 import static org.apache.flink.table.api.Expressions.row;
 import static org.junit.Assert.assertEquals;
 
 /**
- * Test unsigned type conversion between Flink and JDBC driver mysql, the test 
underlying use
- * MariaDB to mock a DB which use mysql driver too.
+ * Test unsigned type conversion between Flink and JDBC driver mysql, the test 
underlying use MySQL
+ * to mock a DB.
  */
 public class UnsignedTypeConversionITCase extends AbstractTestBase {
 
     private static final Logger logger =
             LoggerFactory.getLogger(UnsignedTypeConversionITCase.class);
+    private static final DockerImageName MYSQL_57_IMAGE = 
DockerImageName.parse("mysql:5.7.34");
     private static final String DEFAULT_DB_NAME = "test";
     private static final String TABLE_NAME = "unsigned_test";
-    private static final int INITIALIZE_DB_MAX_RETRY = 3;
-    private static DB db;
-    private static String dbUrl;
-    private static Connection connection;
 
+    private static final String user = "root";
+    private static final String password = "";
+
+    private static final Map<String, String> DEFAULT_CONTAINER_ENV_MAP =
+            new HashMap<String, String>() {
+                {
+                    put("MYSQL_ROOT_HOST", "%");
+                }
+            };
+
+    public static final MySQLContainer<?> MYSQL_CONTAINER =
+            new MySQLContainer<>(MYSQL_57_IMAGE)
+                    .withEnv(DEFAULT_CONTAINER_ENV_MAP)
+                    .withUsername(user)
+                    .withPassword(password)
+                    .withLogConsumer(new Slf4jLogConsumer(logger));
+
+    private static String baseUrl;
+    private static Connection connection;
     private StreamTableEnvironment tEnv;
 
     @BeforeClass
-    public static void prepareMariaDB() throws IllegalStateException {
-        boolean initDbSuccess = false;
-        int i = 0;
-        // The initialization of maria db instance is a little unstable 
according to past CI tests.
-        // Add retry logic here to avoid initialization failure.
-        while (i < INITIALIZE_DB_MAX_RETRY) {
-            try {
-                db = 
DB.newEmbeddedDB(DBConfigurationBuilder.newBuilder().build());
-                db.start();
-                dbUrl = db.getConfiguration().getURL(DEFAULT_DB_NAME);
-                connection = DriverManager.getConnection(dbUrl);
-                try (Statement statement = connection.createStatement()) {
-                    statement.execute("CREATE DATABASE IF NOT EXISTS `" + 
DEFAULT_DB_NAME + "`;");
-                    ResultSet resultSet =
-                            statement.executeQuery(
-                                    "SELECT SCHEMA_NAME FROM "
-                                            + "INFORMATION_SCHEMA.SCHEMATA 
WHERE SCHEMA_NAME = '"
-                                            + DEFAULT_DB_NAME
-                                            + "';");
-                    if (resultSet.next()) {
-                        String dbName = resultSet.getString(1);
-                        initDbSuccess = 
DEFAULT_DB_NAME.equalsIgnoreCase(dbName);
-                    }
-                }
-            } catch (Exception e) {
-                logger.warn("Initialize DB failed.", e);
-                stopDb();
-            }
-            if (initDbSuccess) {
-                break;
+    public static void launchContainer() throws Exception {
+        MYSQL_CONTAINER.start();

Review comment:
       Rather than manually starting the container, can you just use a JUnit 
`@Rule`, as shown in this example? 
https://github.com/testcontainers/testcontainers-java/blob/master/modules/mysql/src/test/java/org/testcontainers/junit/mysql/SimpleMySQLTest.java#L42
   
   This ties the container to the JUnit lifecycle, which takes care of all the 
issues related to start/stop leaks for you.

##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
##########
@@ -43,136 +44,81 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
 
 import static org.apache.flink.table.api.Expressions.row;
 import static org.junit.Assert.assertEquals;
 
 /**
- * Test unsigned type conversion between Flink and JDBC driver mysql, the test 
underlying use
- * MariaDB to mock a DB which use mysql driver too.
+ * Test unsigned type conversion between Flink and JDBC driver mysql, the test 
underlying use MySQL
+ * to mock a DB.
  */
 public class UnsignedTypeConversionITCase extends AbstractTestBase {
 
     private static final Logger logger =
             LoggerFactory.getLogger(UnsignedTypeConversionITCase.class);
+    private static final DockerImageName MYSQL_57_IMAGE = 
DockerImageName.parse("mysql:5.7.34");
     private static final String DEFAULT_DB_NAME = "test";
     private static final String TABLE_NAME = "unsigned_test";
-    private static final int INITIALIZE_DB_MAX_RETRY = 3;
-    private static DB db;
-    private static String dbUrl;
-    private static Connection connection;
 
+    private static final String user = "root";
+    private static final String password = "";
+
+    private static final Map<String, String> DEFAULT_CONTAINER_ENV_MAP =
+            new HashMap<String, String>() {
+                {
+                    put("MYSQL_ROOT_HOST", "%");
+                }
+            };
+
+    public static final MySQLContainer<?> MYSQL_CONTAINER =
+            new MySQLContainer<>(MYSQL_57_IMAGE)
+                    .withEnv(DEFAULT_CONTAINER_ENV_MAP)
+                    .withUsername(user)
+                    .withPassword(password)
+                    .withLogConsumer(new Slf4jLogConsumer(logger));
+
+    private static String baseUrl;
+    private static Connection connection;
     private StreamTableEnvironment tEnv;
 
     @BeforeClass
-    public static void prepareMariaDB() throws IllegalStateException {
-        boolean initDbSuccess = false;
-        int i = 0;
-        // The initialization of maria db instance is a little unstable 
according to past CI tests.
-        // Add retry logic here to avoid initialization failure.
-        while (i < INITIALIZE_DB_MAX_RETRY) {
-            try {
-                db = 
DB.newEmbeddedDB(DBConfigurationBuilder.newBuilder().build());
-                db.start();
-                dbUrl = db.getConfiguration().getURL(DEFAULT_DB_NAME);
-                connection = DriverManager.getConnection(dbUrl);
-                try (Statement statement = connection.createStatement()) {
-                    statement.execute("CREATE DATABASE IF NOT EXISTS `" + 
DEFAULT_DB_NAME + "`;");
-                    ResultSet resultSet =
-                            statement.executeQuery(
-                                    "SELECT SCHEMA_NAME FROM "
-                                            + "INFORMATION_SCHEMA.SCHEMATA 
WHERE SCHEMA_NAME = '"
-                                            + DEFAULT_DB_NAME
-                                            + "';");
-                    if (resultSet.next()) {
-                        String dbName = resultSet.getString(1);
-                        initDbSuccess = 
DEFAULT_DB_NAME.equalsIgnoreCase(dbName);
-                    }
-                }
-            } catch (Exception e) {
-                logger.warn("Initialize DB failed.", e);
-                stopDb();
-            }
-            if (initDbSuccess) {
-                break;
+    public static void launchContainer() throws Exception {
+        MYSQL_CONTAINER.start();
+        baseUrl =
+                MYSQL_CONTAINER
+                        .getJdbcUrl()
+                        .substring(0, 
MYSQL_CONTAINER.getJdbcUrl().lastIndexOf("/"));
+        connection = DriverManager.getConnection(baseUrl, user, password);
+        try (PreparedStatement ps =
+                connection.prepareStatement(
+                        "CREATE DATABASE IF NOT EXISTS `" + DEFAULT_DB_NAME + 
"`;")) {
+            ps.execute();
+            connection.close();
+            connection = null;

Review comment:
       Why is this statement needed, given that the next one overwrites the 
`connection` variable anyway?

##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
##########
@@ -203,7 +149,12 @@ private void createFlinkTable() {
                         + "big_un_c DECIMAL(20, 0)) with("
                         + " 'connector' = 'jdbc',"
                         + " 'url' = '"
-                        + dbUrl
+                        + baseUrl
+                        + "/"
+                        + DEFAULT_DB_NAME
+                        + "?user="
+                        + user
+                        + "&password=&"

Review comment:
       I think you can directly use `MYSQL_CONTAINER.getJdbcUrl()`.

##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
##########
@@ -43,136 +44,81 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
 
 import static org.apache.flink.table.api.Expressions.row;
 import static org.junit.Assert.assertEquals;
 
 /**
- * Test unsigned type conversion between Flink and JDBC driver mysql, the test 
underlying use
- * MariaDB to mock a DB which use mysql driver too.
+ * Test unsigned type conversion between Flink and JDBC driver mysql, the test 
underlying use MySQL
+ * to mock a DB.
  */
 public class UnsignedTypeConversionITCase extends AbstractTestBase {
 
     private static final Logger logger =
             LoggerFactory.getLogger(UnsignedTypeConversionITCase.class);
+    private static final DockerImageName MYSQL_57_IMAGE = 
DockerImageName.parse("mysql:5.7.34");
     private static final String DEFAULT_DB_NAME = "test";
     private static final String TABLE_NAME = "unsigned_test";
-    private static final int INITIALIZE_DB_MAX_RETRY = 3;
-    private static DB db;
-    private static String dbUrl;
-    private static Connection connection;
 
+    private static final String user = "root";
+    private static final String password = "";
+
+    private static final Map<String, String> DEFAULT_CONTAINER_ENV_MAP =
+            new HashMap<String, String>() {
+                {
+                    put("MYSQL_ROOT_HOST", "%");
+                }
+            };
+
+    public static final MySQLContainer<?> MYSQL_CONTAINER =
+            new MySQLContainer<>(MYSQL_57_IMAGE)
+                    .withEnv(DEFAULT_CONTAINER_ENV_MAP)
+                    .withUsername(user)
+                    .withPassword(password)
+                    .withLogConsumer(new Slf4jLogConsumer(logger));
+
+    private static String baseUrl;
+    private static Connection connection;
     private StreamTableEnvironment tEnv;
 
     @BeforeClass
-    public static void prepareMariaDB() throws IllegalStateException {
-        boolean initDbSuccess = false;
-        int i = 0;
-        // The initialization of maria db instance is a little unstable 
according to past CI tests.
-        // Add retry logic here to avoid initialization failure.
-        while (i < INITIALIZE_DB_MAX_RETRY) {
-            try {
-                db = 
DB.newEmbeddedDB(DBConfigurationBuilder.newBuilder().build());
-                db.start();
-                dbUrl = db.getConfiguration().getURL(DEFAULT_DB_NAME);
-                connection = DriverManager.getConnection(dbUrl);
-                try (Statement statement = connection.createStatement()) {
-                    statement.execute("CREATE DATABASE IF NOT EXISTS `" + 
DEFAULT_DB_NAME + "`;");
-                    ResultSet resultSet =
-                            statement.executeQuery(
-                                    "SELECT SCHEMA_NAME FROM "
-                                            + "INFORMATION_SCHEMA.SCHEMATA 
WHERE SCHEMA_NAME = '"
-                                            + DEFAULT_DB_NAME
-                                            + "';");
-                    if (resultSet.next()) {
-                        String dbName = resultSet.getString(1);
-                        initDbSuccess = 
DEFAULT_DB_NAME.equalsIgnoreCase(dbName);
-                    }
-                }
-            } catch (Exception e) {
-                logger.warn("Initialize DB failed.", e);
-                stopDb();
-            }
-            if (initDbSuccess) {
-                break;
+    public static void launchContainer() throws Exception {
+        MYSQL_CONTAINER.start();
+        baseUrl =
+                MYSQL_CONTAINER
+                        .getJdbcUrl()
+                        .substring(0, 
MYSQL_CONTAINER.getJdbcUrl().lastIndexOf("/"));
+        connection = DriverManager.getConnection(baseUrl, user, password);
+        try (PreparedStatement ps =
+                connection.prepareStatement(

Review comment:
       Maybe use this directly: 
https://github.com/testcontainers/testcontainers-java/blob/master/modules/mysql/src/test/java/org/testcontainers/junit/mysql/SimpleMySQLTest.java#L142?

##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
##########
@@ -229,27 +180,84 @@ private void prepareData() {
                                 new Integer(127).byteValue(),
                                 new Integer(255).shortValue(),
                                 new Integer(32767).shortValue(),
-                                Integer.valueOf(65535),
-                                Integer.valueOf(2147483647),
-                                Long.valueOf(4294967295L),
-                                Long.valueOf(9223372036854775807L),
+                                65535,
+                                2147483647,
+                                4294967295L,
+                                9223372036854775807L,
                                 new BigDecimal(new 
BigInteger("18446744073709551615"), 0)));
         tEnv.createTemporaryView("data", dataTable);
     }
 
-    @After
-    public void cleanup() {
-        stopDb();
+    @Test
+    public void testUnsignedType() throws Exception {
+        // write data to db
+        tEnv.executeSql(
+                        "insert into jdbc_sink select"
+                                + " tiny_c,"
+                                + " tiny_un_c,"
+                                + " small_c,"
+                                + " small_un_c ,"
+                                + " int_c,"
+                                + " int_un_c,"
+                                + " big_c ,"
+                                + " big_un_c from data")
+                .await();
+
+        // read data from db using jdbc connection and compare
+        PreparedStatement query =
+                connection.prepareStatement(
+                        String.format(
+                                "select tiny_c, tiny_un_c, small_c, 
small_un_c,"
+                                        + " int_c, int_un_c, big_c, big_un_c 
from %s",
+                                TABLE_NAME));
+        ResultSet resultSet = query.executeQuery();
+        while (resultSet.next()) {
+            assertEquals(127, resultSet.getObject("tiny_c"));
+            assertEquals(255, resultSet.getObject("tiny_un_c"));
+            assertEquals(32767, resultSet.getObject("small_c"));
+            assertEquals(65535, resultSet.getObject("small_un_c"));
+            assertEquals(2147483647, resultSet.getObject("int_c"));
+            assertEquals(4294967295L, resultSet.getObject("int_un_c"));
+            assertEquals(9223372036854775807L, resultSet.getObject("big_c"));
+            assertEquals(new BigInteger("18446744073709551615"), 
resultSet.getObject("big_un_c"));
+        }
+
+        // read data from db using flink and compare
+        Iterator<Row> collected =
+                tEnv.executeSql(
+                                "select tiny_c, tiny_un_c, small_c, 
small_un_c,"
+                                        + " int_c, int_un_c, big_c, big_un_c 
from jdbc_source")
+                        .collect();
+        List<Row> result = CollectionUtil.iteratorToList(collected);
+        List<Row> expected =
+                Collections.singletonList(
+                        Row.ofKind(
+                                RowKind.INSERT,
+                                Byte.parseByte("127"),

Review comment:
       Can you use casts and literal suffixes instead of the "parse" methods?

##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
##########
@@ -229,27 +180,84 @@ private void prepareData() {
                                 new Integer(127).byteValue(),
                                 new Integer(255).shortValue(),
                                 new Integer(32767).shortValue(),
-                                Integer.valueOf(65535),
-                                Integer.valueOf(2147483647),
-                                Long.valueOf(4294967295L),
-                                Long.valueOf(9223372036854775807L),
+                                65535,
+                                2147483647,
+                                4294967295L,
+                                9223372036854775807L,
                                 new BigDecimal(new 
BigInteger("18446744073709551615"), 0)));
         tEnv.createTemporaryView("data", dataTable);
     }
 
-    @After
-    public void cleanup() {
-        stopDb();
+    @Test
+    public void testUnsignedType() throws Exception {
+        // write data to db
+        tEnv.executeSql(
+                        "insert into jdbc_sink select"
+                                + " tiny_c,"
+                                + " tiny_un_c,"
+                                + " small_c,"
+                                + " small_un_c ,"
+                                + " int_c,"
+                                + " int_un_c,"
+                                + " big_c ,"
+                                + " big_un_c from data")
+                .await();
+
+        // read data from db using jdbc connection and compare
+        PreparedStatement query =
+                connection.prepareStatement(
+                        String.format(
+                                "select tiny_c, tiny_un_c, small_c, 
small_un_c,"
+                                        + " int_c, int_un_c, big_c, big_un_c 
from %s",
+                                TABLE_NAME));
+        ResultSet resultSet = query.executeQuery();
+        while (resultSet.next()) {
+            assertEquals(127, resultSet.getObject("tiny_c"));
+            assertEquals(255, resultSet.getObject("tiny_un_c"));
+            assertEquals(32767, resultSet.getObject("small_c"));
+            assertEquals(65535, resultSet.getObject("small_un_c"));
+            assertEquals(2147483647, resultSet.getObject("int_c"));
+            assertEquals(4294967295L, resultSet.getObject("int_un_c"));
+            assertEquals(9223372036854775807L, resultSet.getObject("big_c"));
+            assertEquals(new BigInteger("18446744073709551615"), 
resultSet.getObject("big_un_c"));
+        }
+
+        // read data from db using flink and compare
+        Iterator<Row> collected =
+                tEnv.executeSql(
+                                "select tiny_c, tiny_un_c, small_c, 
small_un_c,"
+                                        + " int_c, int_un_c, big_c, big_un_c 
from jdbc_source")
+                        .collect();
+        List<Row> result = CollectionUtil.iteratorToList(collected);
+        List<Row> expected =
+                Collections.singletonList(
+                        Row.ofKind(
+                                RowKind.INSERT,
+                                Byte.parseByte("127"),
+                                Short.parseShort("255"),
+                                Short.parseShort("32767"),
+                                Integer.parseInt("65535"),
+                                Integer.parseInt("2147483647"),
+                                Long.parseLong("4294967295"),
+                                Long.parseLong("9223372036854775807"),
+                                new BigDecimal("18446744073709551615")));
+        assertEquals(expected, result);

Review comment:
       Please use only AssertJ assertions, here and throughout the rest of this 
test class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to