Hi!

The basic types hold the Java primitives and their boxed versions. Since "scala.Int" boils down to the Java int primitive type at runtime, they should be interoperable.
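
A minimal sketch for checking how "scala.Int" shows up on the JVM, using only the Scala and Java standard libraries. The object name ClassOfCheck and its field names are made up for illustration. It compares the class object behind "classOf[Int]" with the Java primitive and boxed class objects:

```scala
// Hypothetical helper, not part of Flink: inspects the class objects
// that Scala hands out for its core Int type.
object ClassOfCheck {
  // What Scala code passes when it writes classOf[Int]
  val primitiveInt: Class[_] = classOf[Int]
  // The boxed counterpart from the Java standard library
  val boxedInt: Class[_] = classOf[java.lang.Integer]

  def main(args: Array[String]): Unit = {
    println(primitiveInt.getName)                      // int
    println(primitiveInt.isPrimitive)                  // true
    println(primitiveInt == java.lang.Integer.TYPE)    // true
    println(boxedInt.getName)                          // java.lang.Integer
    println(primitiveInt == boxedInt)                  // false
  }
}
```

The name printed for "classOf[Int]" is the same 'int' that appears in the error message quoted further down. So a component that only accepts the boxed class objects would reject it, whereas "classOf[java.lang.Integer]" yields the boxed class.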

I would guess that the problem is that "classOf[Int]" actually returns (in Java terms) "scala.Int.class", which is not recognized as one of the basic types. We could add those, I assume, to make it work.

The known basic types are statically set up in the class "BasicTypeInfo". You can have a look and try whether adding the Scala core types solves the problem. The only issue is that you would have to add the Scala library as a dependency to "flink-java", or register the Scala types via reflection/class name.

Greetings,
Stephan

On Tue, Jan 20, 2015 at 10:08 AM, Alexander Alexandrov <alexander.s.alexand...@gmail.com> wrote:

> Just to clarify in order to spare us some time in the discussion. I
> *deliberately* want to use Flink Java API from Scala with Scala core types.
>
> 2015-01-20 18:53 GMT+01:00 Alexander Alexandrov <alexander.s.alexand...@gmail.com>:
>
> > Hi there,
> >
> > I cannot figure out how the Scala base types (e.g. scala.Int,
> > scala.Double, etc.) are mapped to the Flink runtime.
> >
> > It seems that they are not treated the same as their Java counterparts
> > (e.g. java.lang.Integer, java.lang.Double). For example, if I write the
> > following code:
> >
> > val inputFormat = new CsvInputFormat[FlinkTuple3[Int, Int, String]](inPath, "\n", '\t', classOf[Int], classOf[Int], classOf[String])
> > val typeInformation = new fjava.typeutils.TupleTypeInfo[FlinkTuple3[Int, Int, String]](
> >   BasicTypeInfo.INT_TYPE_INFO,
> >   BasicTypeInfo.INT_TYPE_INFO,
> >   BasicTypeInfo.STRING_TYPE_INFO)
> > env.createInput(inputFormat, typeInformation)
> >
> > I get the following error:
> >
> > Exception in thread "main" java.lang.IllegalArgumentException: The type 'int' is not supported for the CSV input format.
> >     at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldTypesGeneric(GenericCsvInputFormat.java:158)
> >     at org.apache.flink.api.java.io.CsvInputFormat.setFieldTypes(CsvInputFormat.java:133)
> >     at org.apache.flink.api.java.io.CsvInputFormat.<init>(CsvInputFormat.java:83)
> >     ...
> >
> > I also couldn't really find pre-defined TypeInformation implementations
> > for these basic Scala types (similar to what we have in BasicTypeInfo) or
> > macro code that synthesized those.
> >
> > Can somebody elaborate on that?
> >
> > Thanks,
> > A.