This is actually not a bug, nor a POJO or Avro problem. It is simply a limitation of the current functionality, as the exception message says: "Specifying fields by name is only supported on Case Classes (for now)."
Try this with a regular reduce function that selects the max and it should work fine (a rough sketch follows below the quoted message).

Greetings,
Stephan

On Wed, Oct 21, 2015 at 3:46 PM, aawhitaker <aawhita...@gmail.com> wrote:

> Till Rohrmann wrote
> > What was your problem with using Java POJOs with the Scala API?
>
> Here's a quick example
> <https://gist.github.com/AndrewWhitaker/e51308bb4b43f7ddefc3> that
> demonstrates some of the problems I'm having. I used `max` in the example,
> but actually I get an exception for most of the operations I try directly
> on Java POJOs.
>
> The "User" class referenced here is just the Avro example schema hydrated
> into a Java POJO. I can post that or the entire project if it'd be helpful.
>
> I included the stack trace of the exception in the gist, but I'll post it
> here too:
>
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Specifying fields by name is only supported on Case Classes (for now).
>     at org.apache.flink.api.scala.package$.fieldNames2Indices(package.scala:62)
>     at org.apache.flink.api.scala.DataSet.aggregate(DataSet.scala:466)
>     at org.apache.flink.api.scala.DataSet.max(DataSet.scala:503)
>     at SampleAvroJob$.main(SampleAvroJob.scala:12)
>     at SampleAvroJob.main(SampleAvroJob.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>
> And if I use field position instead of field name, I get this exception:
>
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: Aggregating on field
> positions is only possible on tuple data types.
>     at org.apache.flink.api.scala.operators.ScalaAggregateOperator.<init>(ScalaAggregateOperator.java:71)
>     at org.apache.flink.api.scala.DataSet.aggregate(DataSet.scala:455)
>     at org.apache.flink.api.scala.DataSet.max(DataSet.scala:482)
>     at SampleAvroJob$.main(SampleAvroJob.scala:12)
>     at SampleAvroJob.main(SampleAvroJob.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>
> A workaround is to use `.map` to map to tuples first, but this seems a
> little clunky.
>
> Thanks!
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-avro-integration-tp3162p3202.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
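
To make the reduce suggestion concrete, here is a minimal sketch. The `User` class and its fields below are only assumptions standing in for the Avro-generated POJO from the gist, not the real class:

```scala
import org.apache.flink.api.scala._

// Hypothetical stand-in for the Avro-generated "User" POJO
// (field names are assumptions for illustration only).
class User(var name: String, var favoriteNumber: Int) {
  def this() = this("", 0) // POJOs need a public no-arg constructor
}

object ReduceMaxExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val users: DataSet[User] = env.fromElements(
      new User("alice", 3),
      new User("bob", 7))

    // Instead of users.max("favoriteNumber"), which only works on case
    // classes / tuples, select the maximum with a plain reduce function:
    val maxUser: DataSet[User] =
      users.reduce((a, b) => if (a.favoriteNumber >= b.favoriteNumber) a else b)

    maxUser.print()
  }
}
```

Unlike the field-name aggregation, the reduce keeps the whole record that carries the maximum value, which is usually what you want anyway.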
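For completeness, the `.map`-to-tuples workaround mentioned in the quoted message would look roughly like this, continuing from the sketch above (again, the field names are assumptions about the generated class):

```scala
// Project the POJO onto a tuple first; tuples support positional
// aggregation, so max(1) works here.
val maxFavoriteNumber: DataSet[(String, Int)] =
  users
    .map(u => (u.name, u.favoriteNumber))
    .max(1)

maxFavoriteNumber.print()
```

It works, but as noted it is a bit clunky compared to the reduce, and the non-aggregated tuple fields are not guaranteed to come from the same record as the maximum.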