Thanks!

This works, with the exception that I have to use the reduceWindow() method
to sum up the contents of the window.
There still seems to be some work to do.
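
To illustrate what the reduce-based workaround computes, here is a minimal
sketch in plain Scala, run on an ordinary collection rather than a Flink
stream. The object and function names are hypothetical, and which String
field survives the reduction is an arbitrary choice made for this sketch;
it is not meant to describe what Flink's sum(0) does with the other fields.

```scala
// Plain Scala sketch, no Flink dependency. The element type mirrors the
// (Int, String) tuples used in this thread.
object ReduceSum {
  // Add up field 0; keep the String of the left-hand element (an assumption).
  def sumField0(a: (Int, String), b: (Int, String)): (Int, String) =
    (a._1 + b._1, a._2)

  def main(args: Array[String]): Unit = {
    // Stand-in for the contents of a single window.
    val window = Seq((1, "test"), (2, "foo"), (3, "bar"))
    println(window.reduce((a, b) => sumField0(a, b))) // prints (6,test)
  }
}
```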

With the finished API, will I be able to switch from event time to
processing time or ingestion time without having to adjust my code?

Best,
Alex
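
For reference, the expectation from the original question quoted below (one
window per tuple) can be checked with a small plain-Scala sketch. It assumes
5-second tumbling windows aligned to multiples of the window size since the
epoch; the helper name windowStart is hypothetical and this is not Flink's
API, only the arithmetic behind the expectation.

```scala
// Plain Scala sketch, no Flink dependency.
object TumblingWindows {
  // Start of the tumbling window that contains `timestamp` (milliseconds),
  // assuming windows are aligned to multiples of `sizeMs`.
  def windowStart(timestamp: Long, sizeMs: Long): Long =
    timestamp - (timestamp % sizeMs)

  def main(args: Array[String]): Unit = {
    val sizeMs = 5000L
    // The three elements and timestamps from the original question.
    val input = Seq(
      ((1, "test"), 1444228980390L),
      ((2, "foo"), 1444228990390L),
      ((3, "bar"), 1444229000390L))

    // Group elements by the window they fall into and print each window.
    val byWindow = input.groupBy { case (_, ts) => windowStart(ts, sizeMs) }
    byWindow.toSeq.sortBy(_._1).foreach { case (start, elems) =>
      println(s"window [$start, ${start + sizeMs}): ${elems.map(_._1).mkString(", ")}")
    }
  }
}
```

With timestamps 10 seconds apart and a 5-second window, each element lands
in its own window under this assumption.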

Aljoscha Krettek <aljos...@apache.org> wrote on Wed, 7 Oct 2015, 17:23:

> Hi,
> right now, the 0.10-SNAPSHOT is in a bit of a weird state. We still have
> the old windowing API in there alongside the new one. To make your example
> use the new API that actually uses the timestamps and watermarks you would
> use the following code:
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val stream = env.addSource(SourceWithEventTime)
>
> stream
>   .timeWindowAll(Time.of(5, TimeUnit.SECONDS))
>   // or .windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
>   .sum(0)
>   .print()
>
> the version for keyed streams would be:
> stream
>   .keyBy(...)
>   .timeWindow(Time.of(5, TimeUnit.SECONDS))
>   // or .window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
>   .sum(0)
>   .print()
>
> I hope this helps. :D
>
> Cheers,
> Aljoscha
>
>
> On Wed, 7 Oct 2015 at 16:54 Alexander Kolb <
> alexander.k...@mni.fh-giessen.de> wrote:
>
>> Hi Guys,
>>
>> I'm trying to use the event-time windowing feature. But the windowing
>> does not work as expected.
>>
>> I wrote my own source, which implements EventTimeSourceFunction and
>> emits elements via the collectWithTimestamp method.
>> Additionally, I'm emitting a watermark after each element.
>>
>> My job to test this looks like this:
>>
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> val stream = env.addSource(SourceWithEventTime)
>>
>> stream
>>   .window(Time.of(5,TimeUnit.SECONDS))
>>   .sum(0)
>>   .flatten()
>>   .print()
>>
>> env.execute()
>>
>> The input consists of tuples with timestamps set 10 seconds apart:
>>
>> value: (1,test) timestamp: 1444228980390
>> value: (2,foo) timestamp: 1444228990390
>> value: (3,bar) timestamp: 1444229000390
>>
>> What I'm expecting is that each tuple goes into a separate window.
>> The actual output is the sum of all tuples, hence all tuples are
>> collected in the same window.
>>
>> Thanks in advance!
>> Alex
>>
>>
>>
