Hi,
right now, the 0.10-SNAPSHOT is in a bit of a weird state. We still have
the old windowing API in there alongside the new one. To make your example
use the new API that actually uses the timestamps and watermarks you would
use the following code:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = env.addSource(SourceWithEventTime)

stream
  .timeWindowAll(Time.of(5,TimeUnit.SECONDS))
  // or .windowAll(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
  .sum(0)
  .print()

the version for keyed streams would be:
stream
  .keyBy(...)
  .timeWindow(Time.of(5,TimeUnit.SECONDS))
  // or .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
  .sum(0)
  .print()

I hope this helps. :D

Cheers,
Aljoscha


On Wed, 7 Oct 2015 at 16:54 Alexander Kolb <alexander.k...@mni.fh-giessen.de>
wrote:

> Hi Guys,
>
> I'm trying to use the event-time windowing feature. But the windowing does
> not work as expected.
>
> What I've been doing is to write my own source which implements the
> EventTimeSourceFunction and uses the collectWithTimeStamp  method.
> Additionally I'm emitting a watermark after each element.
>
> My job to test this looks like this:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val stream = env.addSource(SourceWithEventTime)
>
> stream
>   .window(Time.of(5,TimeUnit.SECONDS))
>   .sum(0)
>   .flatten()
>   .print()
>
> env.execute()
>
> The Input are some tuples with TimeStamps set 10 seconds apart:
>
> value: (1,test) timestamp: 1444228980390
> value: (2,foo) timestamp: 1444228990390
> value: (3,bar) timestamp: 1444229000390
>
> What I'm expecting is that each tuple goes into a separate window.
> The actual output is the sum of all tuples, hence all tuples are collected
> in the same window.
>
> Thanks in advance!
> Alex
>
>
>

Reply via email to