Hi,

I'm trying to count the number of events in a window (every 5 seconds).
The code below works fine if there are events in the window, if there are
no events in the window no output is emitted.

What I want to achieve is a count of 0 when there are no events in the time
window of 5 seconds.

Can you help me?


Thanks

DataStreamSource<ObjectNode> stream = env.addSource(myConsumer);
DataStream<InputTuple_usb_monitor> tupledStream = stream.map(new
Json2Tuple());
SingleOutputStreamOperator<AvgCountSum> out = tupledStream
.filter(new FilterFunction<InputTuple_usb_monitor>() {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(InputTuple_usb_monitor arg0) throws Exception {
return (arg0.usb_code.equals("UMDFHostDeviceArrivalBegin"));
}
})
.timeWindowAll(Time.seconds(5))
.aggregate(new AvgCountAggregate());
out.print();

Reply via email to