Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6201#discussion_r198215161 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala --- @@ -712,7 +712,48 @@ class SqlITCase extends StreamingWithStateTestBase { "1,1,Hi,1970-01-01 00:00:00.001", "2,2,Hello,1970-01-01 00:00:00.002", "3,2,Hello world,1970-01-01 00:00:00.002") - assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted) + assertEquals(expected.sorted, MemoryTableSourceSinkUtil.tableData.map(_.toString).sorted) + } + + @Test + def testWriteReadTableSourceSink(): Unit = { + var env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + var tEnv = TableEnvironment.getTableEnvironment(env) + MemoryTableSourceSinkUtil.clear + + val t = StreamTestData.getSmall3TupleDataStream(env) + .assignAscendingTimestamps(x => x._2) + .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) + tEnv.registerTable("sourceTable", t) + + val fieldNames = Array("a", "e", "f", "t") + val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP) + .asInstanceOf[Array[TypeInformation[_]]] + + val tableSchema = new TableSchema( + Array("a", "e", "f", "t", "rowtime", "proctime"), + Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP, + Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP) + .asInstanceOf[Array[TypeInformation[_]]], + Array("a", "e", "f", "t")) + tEnv.registerTableSource("targetTable", new MemoryTableSourceSinkUtil.UnsafeMemoryTableSource( + tableSchema, returnType, "rowtime", 3)) + tEnv.registerTableSink("targetTable", + new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink().configure(fieldNames, fieldTypes)) + + tEnv.sqlUpdate("INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable") + tEnv.sqlQuery("SELECT a, e, f, t, 
rowtime from targetTable") --- End diff -- I think we need more test cases covering how time attributes are handled for tables of type `both` (i.e., tables registered as both a source and a sink). These should probably include unit tests, not only ITCases. Also, `configure` is an internal method and should not be called here.
---