Hi!

The Basic types hold the Java primitives and their boxed versions. Since
the "scala.Int" boils down to the Java int primitive type at runtime, they
should be interoperable.

I would guess that the problem is that "classOf[Int]" actually returns (in
Java terms) "scala.Int.class", which is not recognized by the primitive
types. We could add those, I assume, to make it work.

The known basic types are statically set up in the class "BasicTypeInfo".
You can have a look and try if adding the Scala core types solves the
problem.

The only issue is that you would have to add the scala library as a
depencendy to "flink-java", or register the Scala types via
reflection/class name.

Greetings,
Stephan


On Tue, Jan 20, 2015 at 10:08 AM, Alexander Alexandrov <
alexander.s.alexand...@gmail.com> wrote:

> Just to clarify in order to spare us some time in the discussion. I
> *deliberately* want to use Flink Java API from Scala with Scala core types.
>
> 2015-01-20 18:53 GMT+01:00 Alexander Alexandrov <
> alexander.s.alexand...@gmail.com>:
>
> > Hi there,
> >
> > I cannot figure out how the Scala base types (e.g. scala.Int,
> > scala.Double, etc.) are mapped to the Flink runtime.
> >
> > It seems that there are not treated the same as their Java counterparts
> > (e.g. java.lang.Integer, java.lang.Double). For example, if I write the
> > following code:
> >
> > val inputFormat = new CsvInputFormat[FlinkTuple3[Int, Int,
> String]](inPath, "\n", '\t', classOf[Int], classOf[Int], classOf[String])
> > val typeInformation = new fjava.typeutils.TupleTypeInfo[FlinkTuple3[Int,
> Int, String]](
> >   BasicTypeInfo.INT_TYPE_INFO,
> >   BasicTypeInfo.INT_TYPE_INFO,
> >   BasicTypeInfo.STRING_TYPE_INFO)
> > env.createInput(inputFormat, typeInformation)
> >
> > I get the following error:
> >
> > Exception in thread "main" java.lang.IllegalArgumentException: The type
> > 'int' is not supported for the CSV input format.
> >     at
> >
> org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldTypesGeneric(GenericCsvInputFormat.java:158)
> >     at
> >
> org.apache.flink.api.java.io.CsvInputFormat.setFieldTypes(CsvInputFormat.java:133)
> >     at
> >
> org.apache.flink.api.java.io.CsvInputFormat.<init>(CsvInputFormat.java:83)
> >     ...
> >
> > I couldn't really also find pre-defined TypeInformation implementations
> > for these basic Scala types (similar to what we have in BasicTypeInfo) or
> > macro code that synthesized those.
> >
> > Can somebody elaborate on that?
> >
> > Thanks,
> > A.
> >
>

Reply via email to