Just to clarify in order to spare us some time in the discussion. I *deliberately* want to use Flink Java API from Scala with Scala core types.
2015-01-20 18:53 GMT+01:00 Alexander Alexandrov < alexander.s.alexand...@gmail.com>: > Hi there, > > I cannot figure out how the Scala base types (e.g. scala.Int, > scala.Double, etc.) are mapped to the Flink runtime. > > It seems that there are not treated the same as their Java counterparts > (e.g. java.lang.Integer, java.lang.Double). For example, if I write the > following code: > > val inputFormat = new CsvInputFormat[FlinkTuple3[Int, Int, String]](inPath, > "\n", '\t', classOf[Int], classOf[Int], classOf[String]) > val typeInformation = new fjava.typeutils.TupleTypeInfo[FlinkTuple3[Int, Int, > String]]( > BasicTypeInfo.INT_TYPE_INFO, > BasicTypeInfo.INT_TYPE_INFO, > BasicTypeInfo.STRING_TYPE_INFO) > env.createInput(inputFormat, typeInformation) > > I get the following error: > > Exception in thread "main" java.lang.IllegalArgumentException: The type > 'int' is not supported for the CSV input format. > at > org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldTypesGeneric(GenericCsvInputFormat.java:158) > at > org.apache.flink.api.java.io.CsvInputFormat.setFieldTypes(CsvInputFormat.java:133) > at > org.apache.flink.api.java.io.CsvInputFormat.<init>(CsvInputFormat.java:83) > ... > > I couldn't really also find pre-defined TypeInformation implementations > for these basic Scala types (similar to what we have in BasicTypeInfo) or > macro code that synthesized those. > > Can somebody elaborate on that? > > Thanks, > A. >