Yes, you can do that. You would have to define a custom trigger.
Alternatively, you can also generate more conservative watermarks. That
would have the same effect.

Best, Fabian

2018-02-16 12:25 GMT+01:00 Björn Zachrisson <bjo...@gmail.com>:

> Hi Fabian,
>
> It does not since my events are out of order within a certain interval and
> removing allowedLateness reduces the elements processed with 99.5%.
> Is it possible to trigger the window first when the allowed latness value
> has been passed?
>
>
> Regards
> Björn Zachrisson
>
> On 16 February 2018 at 12:17, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Björn,
>>
>> You configured an allowed lateness, so this might be caused by late
>> arriving data.
>> In case a late record is received within the allowed lateness, the window
>> function will fire again for the same window.
>> Does that also happen if you remove the allowedLateness() call?
>>
>> Best, Fabian
>>
>>
>> 2018-02-16 12:05 GMT+01:00 Björn Zachrisson <bjo...@gmail.com>:
>>
>>> Hi,
>>>
>>> I have something weird probably a user error :)
>>>
>>> I'm running a keyby on multiple elements
>>>
>>> val keyedStream = nonKeyedStream
>>>   .keyBy(m => (m.1, m.2, m.3.getOrElse(-1), m.4))
>>>
>>> then apply a window function
>>>
>>> val appliedWindow = keyedStream
>>>   .timeWindow(minutes(WindowTimeMinutes))
>>>   .allowedLateness(minutes(WindowDelayMinutes))
>>>   .apply(new windowFunc1)
>>>
>>>
>>> This is my first action in apply function
>>>
>>> override def apply(key: (Int,Long,Int,Int), window: TimeWindow, input: 
>>> Iterable[T4], out: Collector[T4]): Unit = {
>>>   myClass.addKey(key,window)
>>>
>>>
>>> And in this class i have a function
>>>
>>> object myClass{
>>>   val keyHash = new mutable.HashMap[(Int,Long,Int,Int),TimeWindow]()
>>>
>>>
>>>   def addKey(key: (Int,Long,Int,Int), window : TimeWindow) : Unit = {
>>>     if(keyHash.contains(key)){
>>>       printf("Multiple key found for: " + key + "\n")
>>>       printf(keyHash(key) + "\n")
>>>       printf(window + "\n")
>>>     }
>>>     keyHash.put(key,window)
>>>   }
>>> }
>>>
>>>
>>> This outputs
>>> Multiple key found for: (1,2,3,4)
>>> TimeWindow{start=1518268800000, end=1518270000000}
>>> TimeWindow{start=1518268800000, end=1518270000000}
>>>
>>>
>>> So it seems that the keyby is not doing what is is supposed to or I am
>>> messing things up somewhere but I cannot seem to find it.
>>>
>>> Regards
>>> Björn
>>>
>>>
>>>
>>
>

Reply via email to