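Hi David,

countWindow(size, slide) puts elements into a GlobalWindow, not a TimeWindow, so the window type of your WindowFunction has to be GlobalWindow. Also, when you key by a field name, as in keyBy("site"), the key type is Tuple (org.apache.flink.api.java.tuple.Tuple), not String or Tuple2.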
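A count window is bounded by element counts rather than by time, which is why there is no TimeWindow involved. The plain-Scala sketch below has no Flink dependency and shows this behaviour. It assumes that a keyed countWindow(size, slide) behaves like a trigger that fires every `slide` elements per key, combined with an evictor that keeps only the last `size` elements. This is a simplified model, not Flink's implementation. The names Obs, CountWindowSketch, slidingCountWindows and delta are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical stand-in for RawObservation: just a key field and a value.
final case class Obs(site: String, observation: Double)

object CountWindowSketch {

  /** Simplified model of keyed countWindow(size, slide): per key, fire every
    * `slide` elements and pass along at most the last `size` elements. */
  def slidingCountWindows(events: Seq[Obs], size: Int, slide: Int): Seq[(String, Seq[Obs])] = {
    val buffers = mutable.Map.empty[String, Vector[Obs]]
    val counts  = mutable.Map.empty[String, Int]
    val fired   = Vector.newBuilder[(String, Seq[Obs])]

    for (e <- events) {
      // Eviction step: keep only the most recent `size` elements for this key.
      val buf = (buffers.getOrElse(e.site, Vector.empty[Obs]) :+ e).takeRight(size)
      buffers(e.site) = buf

      // Trigger step: count elements per key and fire every `slide` of them.
      val n = counts.getOrElse(e.site, 0) + 1
      counts(e.site) = n
      if (n % slide == 0) fired += (e.site -> buf)
    }
    fired.result()
  }

  /** Same computation as SequentialDeltaCheck: last observation minus first. */
  def delta(window: Seq[Obs]): Double =
    window.last.observation - window.head.observation
}
```

Under this model, countWindow(2, 1) gives each key a first window that holds a single element. In that window, head and last are the same observation, so the delta is 0.0. Every later window holds the two most recent observations for that key.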
With those two changes, your WindowFunction becomes:

    import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
    import org.apache.flink.util.Collector

    class SequentialDeltaCheck extends WindowFunction[RawObservation, String, Tuple, GlobalWindow] {

      def apply(key: Tuple, window: GlobalWindow, input: Iterable[RawObservation],
                out: Collector[String]): Unit = {
        // With countWindow(2, 1) a window holds at most the two most recent observations for the key
        val previous: Double = input.head.observation
        val current: Double = input.last.observation

        val delta: Double = current - previous
        out.collect(s"TEST-DELTA: $window, $delta")
      }
    }

Thanks and Regards,
Vishnu Viswanath,
www.vishnuviswanath.com

On Wed, Jul 13, 2016 at 7:07 AM, Ciar, David B. <dcia...@ceh.ac.uk> wrote:

> Hello everyone,
>
> I'm relatively new to using Apache Flink and Scala, and am just getting to
> grips with some of the basic functionality both provide. I've hit a wall
> trying to implement a custom WindowFunction over a keyed countWindow,
> however, and hoped someone might have a pointer. The full code is in a Gist
> (https://gist.github.com/dbciar/37df92d321c180f5e96e5e3f17806c91), and I
> am using Flink 1.0.3 with Scala 2.11.
>
> So my workflow is that I read string values from a Kafka queue, parse
> these into a DataStream of RawObservation type using a custom map, and then
> create a keyed countWindow stream.
>
> The problem is that when I try to implement a custom WindowFunction, the
> IDE gives an error on the ".apply" function: "Cannot resolve symbol apply".
> I have a feeling that this might be caused by my WindowFunction not being
> implemented correctly and not matching the signature of the apply
> function.
> I think this because when I remove the '[String]' return type from
> apply ('.apply[String]') I get the following errors:
>
> --------------------------------------------------------------------------------------------------------
>
> Unspecified value parameters: foldFunction: (NotInferedR, RawObservation) => NotInferedR, windowFunction: (Tuple, GlobalWindow, Iterable[NotInferedR], Collector[NotInferedR]) => Unit
>
> Unspecified value parameters: foldFunction: FoldFunction[RawObservation, NotInferedR], function: WindowFunction[NotInferedR, NotInferedR, Tuple, GlobalWindow]
>
> Unspecified value parameters: function: WindowFunction[RawObservation, NotInferedR, Tuple, GlobalWindow]
>
> Unspecified value parameters: windowFunction: (Tuple, GlobalWindow, Iterable[RawObservation], Collector[NotInferedR]) => Unit
>
> Type mismatch, expected: (Tuple, GlobalWindow, Iterable[RawObservation], Collector[NotInferedR]) => Unit, actual: SequentialDeltaCheck
>
> Type mismatch, expected: WindowFunction[RawObservation, NotInferedR, Tuple, GlobalWindow], actual: SequentialDeltaCheck
>
> --------------------------------------------------------------------------------------------------------
>
> As an aside, when defining the WindowFunction, I wasn't sure if I
> was correctly setting the key type to Tuple2, as it is a compound key.
>
> Any help or pointers to something I may have missed in the docs would be
> great. I've had a look through, but nothing jumped out at me. I also
> think I could probably do this using the fold transform, but I wanted to
> try using window functions first.
>
> Thanks,
>
> David
>
> The workflow:
>
>     val stream: DataStream[RawObservation] = env
>       .addSource(new FlinkKafkaConsumer09[String]("sensor_raw", new SimpleStringSchema(), properties))
>       .map(new RawTupleToObservation())
>
>     /**
>      * Take the stream of RawObservation objects, parse out the Event Time and add watermarks,
>      * key by the site and sensor values, then create a sliding countWindow for subsequent observations
>      */
>     val timedObservations: DataStream[RawObservation] = stream
>       .assignTimestampsAndWatermarks(new ObservationTimestamp())
>
>     val windowedObservations = timedObservations
>       .keyBy("site")
>       .countWindow(2,1)
>
>     val deltaStream: DataStream[String] = windowedObservations
>       .apply[String](new SequentialDeltaCheck())
>
> The WindowFunction:
>
>     class SequentialDeltaCheck extends WindowFunction[RawObservation, String, String, TimeWindow]{
>
>       def apply(key: String, window: TimeWindow, input: Iterable[RawObservation], out: Collector[String]): Unit = {
>         val previous: Double = input.head.observation
>         val current: Double = input.last.observation
>
>         val delta: Double = current - previous
>         out.collect(s"TEST-DELTA: $window, $delta")
>       }
>     }