This is actually not a bug, or a POJO or Avro problem. It is simply a
limitation in the functionality, as the exception message says: "Specifying
fields by name is only supported on Case Classes (for now)."

Try this with a regular reduce function that selects the max and it should
work fine...

Greetings,
Stephan


On Wed, Oct 21, 2015 at 3:46 PM, aawhitaker <aawhita...@gmail.com> wrote:

> Till Rohrmann wrote
> > What was your problem with using Java POJOs with the Scala API?
>
> Here's a quick example
> <https://gist.github.com/AndrewWhitaker/e51308bb4b43f7ddefc3>   that
> demonstrates some of the problems I'm having. I used `max` in the example,
> but actually I get an exception for most of the operations I try directly
> on
> Java POJOs.
>
> The "User" class referenced here is just the Avro example schema hydrated
> into a Java POJO. I can post that or the entire project if it'd be helpful.
>
> I included the stack trace of the exception in the gist, but I'll post it
> here too:
>
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Specifying fields by name is onlysupported on Case Classes (for now).
>         at
> org.apache.flink.api.scala.package$.fieldNames2Indices(package.scala:62)
>         at org.apache.flink.api.scala.DataSet.aggregate(DataSet.scala:466)
>         at org.apache.flink.api.scala.DataSet.max(DataSet.scala:503)
>         at SampleAvroJob$.main(SampleAvroJob.scala:12)
>         at SampleAvroJob.main(SampleAvroJob.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at
> com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>
> And if I use field position instead of field name, I get this exception:
>
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: Aggregating on field
> positions is only possible on tuple data types.
>         at
>
> org.apache.flink.api.scala.operators.ScalaAggregateOperator.<init>(ScalaAggregateOperator.java:71)
>         at org.apache.flink.api.scala.DataSet.aggregate(DataSet.scala:455)
>         at org.apache.flink.api.scala.DataSet.max(DataSet.scala:482)
>         at SampleAvroJob$.main(SampleAvroJob.scala:12)
>         at SampleAvroJob.main(SampleAvroJob.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at
> com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>
> A workaround is to use `.map` to map to tuples first, but this seems a
> little clunky.
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-avro-integration-tp3162p3202.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Reply via email to