Till Rohrmann created FLINK-4250:
------------------------------------

             Summary: Cannot select column from CsvTableSource
                 Key: FLINK-4250
                 URL: https://issues.apache.org/jira/browse/FLINK-4250
             Project: Flink
          Issue Type: Bug
          Components: Scala API, Table API & SQL
    Affects Versions: 1.1.0
            Reporter: Till Rohrmann


Using the Scala Table API and the {{CsvTableSource}} I cannot select a column 
from the csv source. The following code:

{code}
package com.dataartisans.batch

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}
import org.apache.flink.api.scala._
import org.apache.flink.api.table.sources.CsvTableSource
import org.apache.flink.api.table.{Row, TableEnvironment, Table}

object CsvTableAPIJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val csvFilePath = "table-jobs/src/main/resources/input.csv"

    val tblEnv = TableEnvironment.getTableEnvironment(env)
    val csvTS = new CsvTableSource(csvFilePath, Array("key", "user", "value", 
"timestamp"), Array(BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO))

    tblEnv.registerTableSource("foobar", csvTS)

    val input = tblEnv.sql("SELECT user FROM foobar")

    tblEnv.toDataSet[Row](input).print()
  }
}
{code}

fails with 

{code}
Exception in thread "main" org.apache.flink.api.table.codegen.CodeGenException: 
Unsupported call: USER
        at 
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
        at 
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:782)
        at scala.Option.getOrElse(Option.scala:120)
        at 
org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:782)
        at 
org.apache.flink.api.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:54)
        at org.apache.calcite.rex.RexCall.accept(RexCall.java:108)
        at 
org.apache.flink.api.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:168)
        at 
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
        at 
org.apache.flink.api.table.codegen.CodeGenerator$$anonfun$5.apply(CodeGenerator.scala:286)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at 
org.apache.flink.api.table.codegen.CodeGenerator.generateResultExpression(CodeGenerator.scala:286)
        at 
org.apache.flink.api.table.plan.nodes.FlinkCalc$class.functionBody(FlinkCalc.scala:52)
        at 
org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.functionBody(DataSetCalc.scala:39)
        at 
org.apache.flink.api.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:108)
        at 
org.apache.flink.api.table.BatchTableEnvironment.translate(BatchTableEnvironment.scala:271)
        at 
org.apache.flink.api.scala.table.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:139)
        at com.dataartisans.batch.CsvTableAPIJob$.main(CsvTableAPIJob.scala:21)
        at com.dataartisans.batch.CsvTableAPIJob.main(CsvTableAPIJob.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to