Hello again,

Having had another go at this today, I clearly see that I cannot pass a
certain type into the fold/window function and expect to be able to return a
datastream of another type from the window function.  I have tried a
different approach and am now receiving a run-time exception, caused by
trying to use a composite case class as the fold accumulator value.  My
query now is whether this is possible, and if it is possible, how to fix the
run-time exception.  Again any help is appreciated.

The exception:

Exception in thread "main" java.lang.ClassCastException: [Ljava.lang.Object;
cannot be cast to [Lorg.apache.flink.api.common.typeinfo.TypeInformation;
        at
org.management.observations.processing.jobs.QCBlockNull$$anon$6.<init>(QCBlockNull.scala:104)
        at
org.management.observations.processing.jobs.QCBlockNull$.main(QCBlockNull.scala:104)
        at
org.management.observations.processing.jobs.QCBlockNull.main(QCBlockNull.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

The code snippet is pasted below, but also neater formatted Gist link:

// The cause of the exception is the .apply(...) below and the use of
IncrementalPlaceHolder.  The fold and window classes return type
IncrementalWindowPlaceholder

    val nullQCEvents1h = nullStream
      .keyBy("feature","procedure")
      .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(30)))
      .apply(new IncrementalWindowPlaceholder(0,None,None,None),
        new QCFoldCounter(),
        new QCCheckNullAggregate())

// The aggregate class I want to use with the fold/window function and emit
as the DataStream type:

case class IncrementalWindowPlaceholder (foldedValue: Double,
                                         keys: Option[Tuple],
                                         startTime: Option[Long],
                                         endTime: Option[Long]){

  override def toString: String =
   
foldedValue.toString+','+keys.getOrElse('-')+','+startTime.getOrElse('-')+','+endTime.getOrElse('-')
}

Also here:
https://gist.github.com/dbciar/904e2d35d6aae30214666de1176f1d7c





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Window-Functions-with-Incremental-Aggregation-tp8246p8259.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to