Till Rohrmann created FLINK-4250: ------------------------------------ Summary: Cannot select column from CsvTableSource Key: FLINK-4250 URL: https://issues.apache.org/jira/browse/FLINK-4250 Project: Flink Issue Type: Bug Components: Scala API, Table API & SQL Affects Versions: 1.1.0 Reporter: Till Rohrmann
Using the Scala Table API and the {{CsvTableSource}} I cannot select a column from the csv source. The following code: {code} package com.dataartisans.batch import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo} import org.apache.flink.api.scala._ import org.apache.flink.api.table.sources.CsvTableSource import org.apache.flink.api.table.{Row, TableEnvironment, Table} object CsvTableAPIJob { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val csvFilePath = "table-jobs/src/main/resources/input.csv" val tblEnv = TableEnvironment.getTableEnvironment(env) val csvTS = new CsvTableSource(csvFilePath, Array("key", "user", "value", "timestamp"), Array(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)) tblEnv.registerTableSource("foobar", csvTS) val input = tblEnv.sql("SELECT user FROM foobar") tblEnv.toDataSet[Row](input).print() } } {code} fails with {code} Exception in thread "main" org.apache.flink.api.table.codegen.CodeGenException: Unsupported call: USER at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782) at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782) at scala.Option.getOrElse(Option.scala:120) at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:782) at org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:54) at org.apache.calcite.rex.RexCall.accept(RexCall.java:108) at org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:168) at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286) at org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:286) at org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52) at org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39) at org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108) at org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:271) at org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139) at com.dataartisans.batch.CsvTableAPIJob$.main(CsvTableAPIJob.scala:21) at com.dataartisans.batch.CsvTableAPIJob.main(CsvTableAPIJob.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)