Hi, as far as I can see it the problem is in this line: k.sum(3) using field indices is only valid for Tuple Types. In your case you should be able to use this: k.sum(“field3”)
because this is a field of your Reading type. Cheers, Aljoscha > On 26 Feb 2016, at 02:44, Nirmalya Sengupta <sengupta.nirma...@gmail.com> > wrote: > > Hello Flinksters, > > I am trying to use Flinkspector in a Scala code snippet of mine and Flink is > complaining. The code is here: > > --------------------------------------------------------------------------------------------------------------- > > case class Reading(field1:String,field2:String,field3:Int) > > object MultiWindowing { > > def main(args: Array[String]) {} > > // WindowFunction<IN,OUT,KEY,W extends Window> > > class WindowPrinter extends WindowFunction[Reading, String, String, > TimeWindow] { > > // ..... > } > } > > val env = DataStreamTestEnvironment.createTestEnvironment(1) > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > > val input: EventTimeInput[Reading] = > EventTimeInputBuilder > .startWith(Reading("hans", "elephant", 15)) > .emit(Reading("susi", "arctic", 20), After.period(30, TimeUnit.SECONDS)) > .emit(Reading("pete", "elephant", 40), After.period(20, TimeUnit.SECONDS)) > > //acquire data source from input > val stream = env.fromInput(input) > > //apply transformation > val k = stream.keyBy(new KeySelector [Reading,String] { > def getKey(r:Reading) = r.field2 > }) > .timeWindow(Time.of(5, TimeUnit.MINUTES), Time.of(1, TimeUnit.MINUTES)) > > k.sum(3) > .print() > > env.execute() > > } > > --------------------------------------------------------------------------------------------------------------- > > And at runtime, I get this error: > > ---------------------------------------------------------------------------------------------------------------- > > Exception in thread "main" java.lang.ExceptionInInitializerError > at > org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing.main(MultiWindowing.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) > Caused by: java.lang.IndexOutOfBoundsException: Not 0th field selected for a > simple type (non-tuple, non-array). > at > org.apache.flink.streaming.util.FieldAccessor.create(FieldAccessor.java:76) > at > org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:37) > at > org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:373) > at > org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<init>(MultiWindowing.scala:63) > at > org.apache.flink.quickstart.StreamWindowExperiments.MultiWindowing$.<clinit>(MultiWindowing.scala) > ... 6 more > > > --------------------------------------------------------------------------------------------------------------- > > Can someone help me by pointing out the mistake I am making? > > -- Nirmalya > > -- > Software Technologist > http://www.linkedin.com/in/nirmalyasengupta > "If you have built castles in the air, your work need not be lost. That is > where they should be. > Now put the foundation under them."