Hi Vishnu,

Thank you for the pointers and the modified example; that was really helpful, and it is working as expected now.


I took another look through the documentation and found, in the "Window" section for streaming data, the "Recipes for building windows" sub-section, which shows countWindow being built on a GlobalWindow.  Once I'd applied your changes and seen it work, that section made much more sense to me.
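
If it's useful to anyone finding this thread later, my reading of that recipe is that countWindow(size, slide) amounts to roughly the following (a sketch, not code from our project; `keyedStream`, `size` and `slide` are placeholders, and the class names are from the Flink 1.0.x windowing API):

import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger

// countWindow(size, slide) assembled by hand: a GlobalWindows assigner
// (hence the GlobalWindow type parameter on the WindowFunction), keeping
// at most `size` elements and firing the window every `slide` elements
val windowed = keyedStream
  .window(GlobalWindows.create())
  .evictor(CountEvictor.of(size))
  .trigger(CountTrigger.of(slide))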


Cheers,

David

________________________________
From: Vishnu Viswanath <vishnu.viswanat...@gmail.com>
Sent: 13 July 2016 12:35:44
To: user@flink.apache.org
Subject: Re: countWindow custom WindowFunction


Hi David,

countWindow(size, slide) creates a GlobalWindow, not a TimeWindow. Also, because keyBy with field-name expressions produces a generic key, you have to use Tuple instead of Tuple2.

// import paths assume Flink's Scala streaming API; adjust if your setup differs
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

class SequentialDeltaCheck extends WindowFunction[RawObservation, String, Tuple, GlobalWindow] {

  def apply(key: Tuple, window: GlobalWindow, input: Iterable[RawObservation],
            out: Collector[String]): Unit = {
    // countWindow(2, 1) gives at most two elements per window:
    // the previous observation and the current one
    val previous: Double = input.head.observation
    val current: Double = input.last.observation

    val delta: Double = current - previous
    out.collect(s"TEST-DELTA: $window, $delta")
  }
}
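
Since the key arrives as the generic Tuple, you can still read the compound key fields back positionally inside apply. A sketch, assuming you key by both fields as keyBy("site", "sensor"):

// inside apply(): field order follows the keyBy(...) call
val site: String = key.getField(0)
val sensor: String = key.getField(1)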


Thanks and Regards,
Vishnu Viswanath,
www.vishnuviswanath.com

On Wed, Jul 13, 2016 at 7:07 AM, Ciar, David B. <dcia...@ceh.ac.uk> wrote:

Hello everyone,


I'm relatively new to Apache Flink and Scala, and am just getting to grips with some of the basic functionality both provide.  However, I've hit a wall trying to implement a custom WindowFunction over a keyed countWindow, and hoped someone might have a pointer.  The full code is in a Gist (https://gist.github.com/dbciar/37df92d321c180f5e96e5e3f17806c91), and I am using Flink 1.0.3 with Scala 2.11.


So my workflow is that I read string values from a Kafka queue, parse these 
into a DataStream of RawObservation type using a custom map, and then create a 
keyed countWindow stream.


The problem is that when I try to apply a custom WindowFunction, the IDE gives the error "Cannot resolve symbol apply" on the ".apply" call.  I have a feeling this is caused by my WindowFunction not being implemented correctly, so that it doesn't match the signature of the apply function.  I think this because, when I remove the '[String]' type parameter from apply ('.apply[String]'), I get the following errors:


--------------------------------------------------------------------------------------------------------

Unspecified value parameters: foldFunction: (NotInferedR, RawObservation) => 
NotInferedR, windowFunction: (Tuple, GlobalWindow, Iterable[NotInferedR], 
Collector[NotInferedR]) => Unit

Unspecified value parameters: foldFunction: FoldFunction[RawObservation, 
NotInferedR], function: WindowFunction[NotInferedR, NotInferedR, Tuple, 
GlobalWindow]


Unspecified value parameters: function: WindowFunction[RawObservation, 
NotInferedR, Tuple, GlobalWindow]

Unspecified value parameters: windowFunction: (Tuple, GlobalWindow, 
Iterable[RawObservation], Collector[NotInferedR]) => Unit

Type mismatch, expected: (Tuple, GlobalWindow, Iterable[RawObservation], 
Collector[NotInferedR]) => Unit, actual: SequentialDeltaCheck

Type mismatch, expected: WindowFunction[RawObservation, NotInferedR, Tuple, 
GlobalWindow], actual: SequentialDeltaCheck

--------------------------------------------------------------------------------------------------------



As an aside, when defining the WindowFunction I wasn't sure whether I was correct to set the key type to Tuple2, since it is a compound key.


Any help or pointers to something I may have missed in the docs would be great; I've had a look through but nothing jumped out at me.  I also think I could probably do this using the fold transform, but I wanted to try window functions first.


Thanks,

David


The workflow:


val stream: DataStream[RawObservation] = env
  .addSource(new FlinkKafkaConsumer09[String]("sensor_raw", new SimpleStringSchema(), properties))
  .map(new RawTupleToObservation())

/**
  * Take the stream of RawObservation objects, parse out the event time and add
  * watermarks, key by the site and sensor values, then create a sliding
  * countWindow for subsequent observations.
  */
val timedObservations: DataStream[RawObservation] = stream
  .assignTimestampsAndWatermarks(new ObservationTimestamp())

val windowedObservations = timedObservations
  .keyBy("site")
  .countWindow(2, 1)

val deltaStream: DataStream[String] = windowedObservations
  .apply[String](new SequentialDeltaCheck())


The WindowFunction:


class SequentialDeltaCheck extends WindowFunction[RawObservation, String, String, TimeWindow] {

  def apply(key: String, window: TimeWindow, input: Iterable[RawObservation],
            out: Collector[String]): Unit = {
    val previous: Double = input.head.observation
    val current: Double = input.last.observation

    val delta: Double = current - previous
    out.collect(s"TEST-DELTA: $window, $delta")
  }
}



