Hello again, Having had another go at this today, I clearly see that I cannot pass a certain type into the fold/window function and expect to be able to return a datastream of another type from the window function. I have tried a different approach and am now receiving a run-time exception, caused by trying to use a composite case class as the fold accumulator value. My query now is whether this is possible, and if it is possible, how to fix the run-time exception. Again any help is appreciated.
The exception: Exception in thread "main" java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to [Lorg.apache.flink.api.common.typeinfo.TypeInformation; at org.management.observations.processing.jobs.QCBlockNull$$anon$6.<init>(QCBlockNull.scala:104) at org.management.observations.processing.jobs.QCBlockNull$.main(QCBlockNull.scala:104) at org.management.observations.processing.jobs.QCBlockNull.main(QCBlockNull.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) The code snippet is pasted below, but also neater formatted Gist link: // The cause of the exception is the .apply(...) below and the use of IncrementalPlaceHolder. The fold and window classes return type IncrementalWindowPlaceholder val nullQCEvents1h = nullStream .keyBy("feature","procedure") .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(30))) .apply(new IncrementalWindowPlaceholder(0,None,None,None), new QCFoldCounter(), new QCCheckNullAggregate()) // The aggregate class I want to use with the fold/window function and emit as the DataStream type: case class IncrementalWindowPlaceholder (foldedValue: Double, keys: Option[Tuple], startTime: Option[Long], endTime: Option[Long]){ override def toString: String = foldedValue.toString+','+keys.getOrElse('-')+','+startTime.getOrElse('-')+','+endTime.getOrElse('-') } Also here: https://gist.github.com/dbciar/904e2d35d6aae30214666de1176f1d7c -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Window-Functions-with-Incremental-Aggregation-tp8246p8259.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.