Just to clarify, and to save us some time in the discussion: I
*deliberately* want to use the Flink Java API from Scala with Scala core types.
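
For reference, here is a minimal, Flink-free sketch of what the class
tokens in my snippet resolve to on the JVM. The object and value names
(ScalaClassTokens, primitiveInt, boxedInt) are made up for illustration.
It only shows that classOf[Int] is the primitive 'int' class, which matches
the type name in the error message quoted below. Whether passing the boxed
class tokens instead would make the rest of the pipeline work with Scala
Int values is an assumption I have not verified.

```scala
object ScalaClassTokens {
  // classOf[Int] yields the JVM primitive class token (same as java.lang.Integer.TYPE).
  val primitiveInt: Class[Int] = classOf[Int]

  // The boxed counterpart, i.e. the class the Java API usually works with.
  val boxedInt: Class[java.lang.Integer] = classOf[java.lang.Integer]

  def main(args: Array[String]): Unit = {
    println(primitiveInt.getName)                    // int
    println(boxedInt.getName)                        // java.lang.Integer
    println(primitiveInt == java.lang.Integer.TYPE)  // true
    println(primitiveInt.isPrimitive)                // true
    println(boxedInt.isPrimitive)                    // false
  }
}
```

So classOf[Int] and classOf[java.lang.Integer] are two distinct Class
objects, and any Java code that only checks for the boxed classes will
not recognize the former.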

2015-01-20 18:53 GMT+01:00 Alexander Alexandrov <
alexander.s.alexand...@gmail.com>:

> Hi there,
>
> I cannot figure out how the Scala base types (e.g. scala.Int,
> scala.Double, etc.) are mapped to the Flink runtime.
>
> It seems that they are not treated the same as their Java counterparts
> (e.g. java.lang.Integer, java.lang.Double). For example, if I write the
> following code:
>
> val inputFormat = new CsvInputFormat[FlinkTuple3[Int, Int, String]](
>   inPath, "\n", '\t', classOf[Int], classOf[Int], classOf[String])
> val typeInformation = new fjava.typeutils.TupleTypeInfo[FlinkTuple3[Int, Int, String]](
>   BasicTypeInfo.INT_TYPE_INFO,
>   BasicTypeInfo.INT_TYPE_INFO,
>   BasicTypeInfo.STRING_TYPE_INFO)
> env.createInput(inputFormat, typeInformation)
>
> I get the following error:
>
> Exception in thread "main" java.lang.IllegalArgumentException: The type
> 'int' is not supported for the CSV input format.
>     at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldTypesGeneric(GenericCsvInputFormat.java:158)
>     at org.apache.flink.api.java.io.CsvInputFormat.setFieldTypes(CsvInputFormat.java:133)
>     at org.apache.flink.api.java.io.CsvInputFormat.<init>(CsvInputFormat.java:83)
>     ...
>
> I also couldn't find pre-defined TypeInformation implementations
> for these basic Scala types (similar to what we have in BasicTypeInfo), or
> macro code that synthesizes them.
>
> Can somebody elaborate on that?
>
> Thanks,
> A.
>
