[ 
https://issues.apache.org/jira/browse/FLINK-10261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16604590#comment-16604590
 ] 

ASF GitHub Bot commented on FLINK-10261:
----------------------------------------

yanghua commented on a change in pull request #6648: [FLINK-10261][table] fix 
insert into with order by
URL: https://github.com/apache/flink/pull/6648#discussion_r215327606
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
 ##########
 @@ -105,6 +107,36 @@ class SortITCase extends StreamingWithStateTestBase {
       "20")
     assertEquals(expected, SortITCase.testResults)
   }
+
+  @Test
+  def testInsertIntoMemoryTableOrderBy(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSourceSinkUtil.clear()
+
+    val t = StreamTestData.getSmall3TupleDataStream(env)
+        .assignAscendingTimestamps(x => x._2)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f", "t")
+    val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+      .asInstanceOf[Array[TypeInformation[_]]]
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime " +
+      "FROM sourceTable order by rowtime, a desc"
 
 Review comment:
   Upper-casing the SQL keyword (`ORDER BY`) would look better to me.
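   For illustration, the query string with the keywords upper-cased might read as follows (a minimal sketch of the suggested change, with `DESC` also upper-cased for consistency; not the committed code):

   {code}
   val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime " +
     "FROM sourceTable ORDER BY rowtime, a DESC"
   {code}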

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> INSERT INTO does not work with ORDER BY clause
> ----------------------------------------------
>
>                 Key: FLINK-10261
>                 URL: https://issues.apache.org/jira/browse/FLINK-10261
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: xueyu
>            Priority: Major
>              Labels: pull-request-available
>
> It seems that INSERT INTO and ORDER BY do not work well together.
> An AssertionError is thrown and the ORDER BY clause is duplicated. I guess 
> this is a Calcite issue.
> Example:
> {code}
> @Test
>   def testInsertIntoMemoryTable(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     MemoryTableSourceSinkUtil.clear()
>     val t = StreamTestData.getSmall3TupleDataStream(env)
>         .assignAscendingTimestamps(x => x._2)
>       .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
>     tEnv.registerTable("sourceTable", t)
>     val fieldNames = Array("d", "e", "f", "t")
>     val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
>       .asInstanceOf[Array[TypeInformation[_]]]
>     val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
>     tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
>     val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable ORDER BY a"
>     tEnv.sqlUpdate(sql)
>     env.execute()
>   }
> {code}
> Error:
> {code}
> java.lang.AssertionError: not a query: SELECT `sourceTable`.`a`, `sourceTable`.`b`, `sourceTable`.`c`, `sourceTable`.`rowtime`
> FROM `sourceTable` AS `sourceTable`
> ORDER BY `a`
> ORDER BY `a`
>       at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3069)
>       at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:557)
>       at org.apache.flink.table.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:104)
>       at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:717)
>       at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
>       at org.apache.flink.table.runtime.stream.sql.SqlITCase.testInsertIntoMemoryTable(SqlITCase.scala:735)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
