slinkydeveloper commented on a change in pull request #18653:
URL: https://github.com/apache/flink/pull/18653#discussion_r803774457



##########
File path: 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java
##########
@@ -21,161 +21,124 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
 
-import ch.vorburger.exec.ManagedProcessException;
-import ch.vorburger.mariadb4j.DB;
-import ch.vorburger.mariadb4j.DBConfigurationBuilder;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
 
 import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Collections;
-import java.util.Iterator;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Map;
 
-import static org.apache.flink.table.api.Expressions.row;
-import static org.junit.Assert.assertEquals;
+import static java.lang.String.format;
+import static java.lang.String.join;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
- * Test unsigned type conversion between Flink and JDBC driver mysql, the test 
underlying use
- * MariaDB to mock a DB which use mysql driver too.
+ * Test unsigned type conversion between Flink and JDBC driver mysql, the test 
underlying use MySQL
+ * to mock a DB.
  */
 public class UnsignedTypeConversionITCase extends AbstractTestBase {
 
-    private static final Logger logger =
+    private static final Logger LOGGER =
             LoggerFactory.getLogger(UnsignedTypeConversionITCase.class);
+
+    private static final DockerImageName MYSQL_57_IMAGE = 
DockerImageName.parse("mysql:5.7.34");
     private static final String DEFAULT_DB_NAME = "test";
     private static final String TABLE_NAME = "unsigned_test";
-    private static final int INITIALIZE_DB_MAX_RETRY = 3;
-    private static DB db;
-    private static String dbUrl;
-    private static Connection connection;
-
-    private StreamTableEnvironment tEnv;
-
-    @BeforeClass
-    public static void prepareMariaDB() throws IllegalStateException {
-        boolean initDbSuccess = false;
-        int i = 0;
-        // The initialization of maria db instance is a little unstable 
according to past CI tests.
-        // Add retry logic here to avoid initialization failure.
-        while (i < INITIALIZE_DB_MAX_RETRY) {
-            try {
-                db = 
DB.newEmbeddedDB(DBConfigurationBuilder.newBuilder().build());
-                db.start();
-                dbUrl = db.getConfiguration().getURL(DEFAULT_DB_NAME);
-                connection = DriverManager.getConnection(dbUrl);
-                try (Statement statement = connection.createStatement()) {
-                    statement.execute("CREATE DATABASE IF NOT EXISTS `" + 
DEFAULT_DB_NAME + "`;");
-                    ResultSet resultSet =
-                            statement.executeQuery(
-                                    "SELECT SCHEMA_NAME FROM "
-                                            + "INFORMATION_SCHEMA.SCHEMATA 
WHERE SCHEMA_NAME = '"
-                                            + DEFAULT_DB_NAME
-                                            + "';");
-                    if (resultSet.next()) {
-                        String dbName = resultSet.getString(1);
-                        initDbSuccess = 
DEFAULT_DB_NAME.equalsIgnoreCase(dbName);
-                    }
+    private static final String USER = "root";
+    private static final String PASSWORD = "";
+    private static final List<String> COLUMNS =
+            Arrays.asList(
+                    "tiny_c",
+                    "tiny_un_c",
+                    "small_c",
+                    "small_un_c",
+                    "int_c",
+                    "int_un_c",
+                    "big_c",
+                    "big_un_c");
+
+    private static final Map<String, String> DEFAULT_CONTAINER_ENV_MAP =
+            new HashMap<String, String>() {
+                {
+                    put("MYSQL_ROOT_HOST", "%");
                 }
-            } catch (Exception e) {
-                logger.warn("Initialize DB failed.", e);
-                stopDb();
-            }
-            if (initDbSuccess) {
-                break;
-            }
-            i++;
-        }
-        if (!initDbSuccess) {
-            throw new IllegalStateException(
-                    String.format(
-                            "Initialize MySQL database instance failed after 
%d attempts,"
-                                    + " please open an issue.",
-                            INITIALIZE_DB_MAX_RETRY));
-        }
-    }
-
-    @Before
-    public void setUp() throws SQLException, IllegalStateException {
-        // dbUrl: jdbc:mysql://localhost:3306/test
-        createMysqlTable();
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        tEnv = StreamTableEnvironment.create(env);
-        createFlinkTable();
-        prepareData();
-    }
+            };
+
+    private static final Object[] ROW =
+            new Object[] {
+                (byte) 127,
+                (short) 255,
+                (short) 32767,
+                65535,
+                2147483647,
+                4294967295L,
+                9223372036854775807L,
+                new BigDecimal("18446744073709551615")
+            };
+
+    @ClassRule
+    public static final MySQLContainer<?> MYSQL_CONTAINER =
+            new MySQLContainer<>(MYSQL_57_IMAGE)
+                    .withEnv(DEFAULT_CONTAINER_ENV_MAP)
+                    .withUsername(USER)
+                    .withPassword(PASSWORD)
+                    .withDatabaseName(DEFAULT_DB_NAME)
+                    .withLogConsumer(new Slf4jLogConsumer(LOGGER));
 
     @Test
     public void testUnsignedType() throws Exception {
+        Connection con = 
DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(), USER, PASSWORD);

Review comment:
       You're not closing this connection




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to