slinkydeveloper commented on a change in pull request #18653: URL: https://github.com/apache/flink/pull/18653#discussion_r803774457
########## File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/UnsignedTypeConversionITCase.java ########## @@ -21,161 +21,124 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.CollectionUtil; -import ch.vorburger.exec.ManagedProcessException; -import ch.vorburger.mariadb4j.DB; -import ch.vorburger.mariadb4j.DBConfigurationBuilder; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerImageName; import java.math.BigDecimal; -import java.math.BigInteger; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; -import java.util.Collections; -import java.util.Iterator; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; -import java.util.stream.Collectors; +import java.util.Map; -import static org.apache.flink.table.api.Expressions.row; -import static org.junit.Assert.assertEquals; +import static java.lang.String.format; +import static java.lang.String.join; +import static org.assertj.core.api.Assertions.assertThat; /** - * Test unsigned type conversion between Flink and JDBC driver mysql, the test underlying use - * MariaDB to mock a DB which use mysql driver too. + * Test unsigned type conversion between Flink and JDBC driver mysql, the test underlying use MySQL + * to mock a DB. */ public class UnsignedTypeConversionITCase extends AbstractTestBase { - private static final Logger logger = + private static final Logger LOGGER = LoggerFactory.getLogger(UnsignedTypeConversionITCase.class); + + private static final DockerImageName MYSQL_57_IMAGE = DockerImageName.parse("mysql:5.7.34"); private static final String DEFAULT_DB_NAME = "test"; private static final String TABLE_NAME = "unsigned_test"; - private static final int INITIALIZE_DB_MAX_RETRY = 3; - private static DB db; - private static String dbUrl; - private static Connection connection; - - private StreamTableEnvironment tEnv; - - @BeforeClass - public static void prepareMariaDB() throws IllegalStateException { - boolean initDbSuccess = false; - int i = 0; - // The initialization of maria db instance is a little unstable according to past CI tests. - // Add retry logic here to avoid initialization failure. - while (i < INITIALIZE_DB_MAX_RETRY) { - try { - db = DB.newEmbeddedDB(DBConfigurationBuilder.newBuilder().build()); - db.start(); - dbUrl = db.getConfiguration().getURL(DEFAULT_DB_NAME); - connection = DriverManager.getConnection(dbUrl); - try (Statement statement = connection.createStatement()) { - statement.execute("CREATE DATABASE IF NOT EXISTS `" + DEFAULT_DB_NAME + "`;"); - ResultSet resultSet = - statement.executeQuery( - "SELECT SCHEMA_NAME FROM " - + "INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '" - + DEFAULT_DB_NAME - + "';"); - if (resultSet.next()) { - String dbName = resultSet.getString(1); - initDbSuccess = DEFAULT_DB_NAME.equalsIgnoreCase(dbName); - } + private static final String USER = "root"; + private static final String PASSWORD = ""; + private static final List<String> COLUMNS = + Arrays.asList( + "tiny_c", + "tiny_un_c", + "small_c", + "small_un_c", + "int_c", + "int_un_c", + "big_c", + "big_un_c"); + + private static final Map<String, String> DEFAULT_CONTAINER_ENV_MAP = + new HashMap<String, String>() { + { + put("MYSQL_ROOT_HOST", "%"); } - } catch (Exception e) { - logger.warn("Initialize DB failed.", e); - stopDb(); - } - if (initDbSuccess) { - break; - } - i++; - } - if (!initDbSuccess) { - throw new IllegalStateException( - String.format( - "Initialize MySQL database instance failed after %d attempts," - + " please open an issue.", - INITIALIZE_DB_MAX_RETRY)); - } - } - - @Before - public void setUp() throws SQLException, IllegalStateException { - // dbUrl: jdbc:mysql://localhost:3306/test - createMysqlTable(); - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - tEnv = StreamTableEnvironment.create(env); - createFlinkTable(); - prepareData(); - } + }; + + private static final Object[] ROW = + new Object[] { + (byte) 127, + (short) 255, + (short) 32767, + 65535, + 2147483647, + 4294967295L, + 9223372036854775807L, + new BigDecimal("18446744073709551615") + }; + + @ClassRule + public static final MySQLContainer<?> MYSQL_CONTAINER = + new MySQLContainer<>(MYSQL_57_IMAGE) + .withEnv(DEFAULT_CONTAINER_ENV_MAP) + .withUsername(USER) + .withPassword(PASSWORD) + .withDatabaseName(DEFAULT_DB_NAME) + .withLogConsumer(new Slf4jLogConsumer(LOGGER)); @Test public void testUnsignedType() throws Exception { + Connection con = DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(), USER, PASSWORD); Review comment: You're not closing this connection -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org