Hi Johannes,

You are right. You should approach the problem with the semantics that you
need before thinking about optimizations such as state size.

The Table API / SQL offers two types of joins (as of v1.5.0):
1) Windowed joins, where each record joins with the records of the other
stream that fall within a time range, e.g.
"A.ts BETWEEN B.ts - 1 hour AND B.ts + 1 hour".
2) Non-windowed joins, which support arbitrary join predicates but fully
materialize both inputs. As you mentioned, you can use idle state
retention to remove records from state that have not been accessed for a
certain time.
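
To make the difference between the two join types more concrete, here is
a minimal plain-Java sketch. It is only a model of the semantics, not
Flink code and not how Flink implements joins internally. All names
(JoinSemanticsSketch, inTimeRange, onRecord, expireIdle) are hypothetical,
and the non-windowed part simply appends records per key instead of
handling upserts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the two join types; hypothetical names, not Flink API. */
class JoinSemanticsSketch {

    static final long HOUR_MS = 60 * 60 * 1000L;

    /** Windowed join predicate: A.ts BETWEEN B.ts - 1 hour AND B.ts + 1 hour. */
    static boolean inTimeRange(long aTs, long bTs) {
        return aTs >= bTs - HOUR_MS && aTs <= bTs + HOUR_MS;
    }

    /** Per-key state of the non-windowed join: both sides are kept in full. */
    static final class KeyState {
        final List<String> left = new ArrayList<>();
        final List<String> right = new ArrayList<>();
        long lastAccess;
    }

    private final Map<String, KeyState> state = new HashMap<>();

    /** Stores a record on one side and returns the results it produces immediately. */
    List<String> onRecord(boolean isLeft, String key, String value, long now) {
        KeyState ks = state.computeIfAbsent(key, k -> new KeyState());
        ks.lastAccess = now;
        (isLeft ? ks.left : ks.right).add(value);

        // Join the new record against everything stored on the other side.
        List<String> out = new ArrayList<>();
        for (String other : (isLeft ? ks.right : ks.left)) {
            out.add(isLeft ? value + "|" + other : other + "|" + value);
        }
        return out;
    }

    /** Models idle state retention: drops keys not accessed for retentionMs. */
    int expireIdle(long now, long retentionMs) {
        int removed = 0;
        Iterator<KeyState> it = state.values().iterator();
        while (it.hasNext()) {
            if (now - it.next().lastAccess >= retentionMs) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```

In this model every record on either side emits results right away, which
is the behaviour you asked about. Once a key has been expired, later
records for that key no longer find their earlier partners, so the
retention time needs to be chosen with that in mind.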

Best, Fabian

2018-06-18 11:09 GMT+02:00 Johannes Schulte <[email protected]>:

> Hi Fabian,
>
> thanks for the hints, though I somehow got the feeling that I am on the
> wrong track given how much code I would need to write for implementing a
> "blueprint" use case.
>
> Would a join be simpler using the Table API? In the end it's the
> classical Order & OrderPosition example, where the output is an
> upsert stream. Would I get the expected behaviour (output elements on
> every update on either side of the input stream)? I realize that my
> session window approach wasn't driven by the requirements but by
> operational aspects (state size), so using a concept like idle state
> retention time would be a more natural fit.
>
> Thanks,
>
> Johannes
>
> On Mon, Jun 18, 2018 at 9:57 AM Fabian Hueske <[email protected]> wrote:
>
>> Hi Johannes,
>>
>> EventTimeSessionWindows [1] use the EventTimeTrigger [2] as default
>> trigger (see EventTimeSessionWindows.getDefaultTrigger()).
>>
>> I would take the EventTimeTrigger and extend it with early firing
>> functionality.
>> However, there are a few things to consider:
>> * you need to be aware that session windows can be merged, i.e., two
>> session windows A, B with gap 10: A [20,25), B [37, 45), will be merged
>> when a record at 32 is received.
>> * windows store all records in a list. For every firing, you need to
>> iterate the full list and also track which records you joined already to
>> avoid duplicates. Maybe you can migrate records from the window state into
>> a custom state defined in a ProcessWindowFunction.
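
The merging behaviour described in the first point can be sketched in
plain Java. This is only a model under stated assumptions, not Flink's
implementation: each record is given the window [ts, ts + gap), and
windows that overlap or touch are merged. The class SessionMergeSketch
and the record timestamps (20, 24, 37, 44, then 32) are hypothetical and
were chosen to resemble the A/B example above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java model of session window merging; not Flink's implementation. */
class SessionMergeSketch {

    /** Half-open window [start, end). */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + "," + end + ")";
        }
    }

    /** Gives each timestamp the window [ts, ts + gap) and merges connected windows. */
    static List<Window> sessions(List<Long> timestamps, long gap) {
        List<Window> windows = new ArrayList<>();
        for (long ts : timestamps) {
            windows.add(new Window(ts, ts + gap));
        }
        windows.sort(Comparator.comparingLong(w -> w.start));

        List<Window> merged = new ArrayList<>();
        for (Window w : windows) {
            Window last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && w.start <= last.end) {
                // Overlapping or touching: extend the previous session.
                merged.set(merged.size() - 1,
                        new Window(last.start, Math.max(last.end, w.end)));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }
}
```

Calling sessions with the timestamps 20, 24, 37, 44 and gap 10 yields two
separate sessions in this model. Adding a record at 32 bridges them into
a single session, which is why a trigger with early firing has to cope
with windows being merged after it has already fired.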
>>
>> Best, Fabian
>>
>>
>>
>>
>>
>> 2018-06-13 13:43 GMT+02:00 Johannes Schulte <[email protected]>:
>>
>>> Hi,
>>>
>>> I am joining two streams with a session window and want to emit a joined
>>> (early) result for every element arriving on one of the streams.
>>>
>>> Currently the code looks like this:
>>>
>>> s1.join(s2)
>>> .where(s1.id).equalTo(s2.id)
>>> .window(EventTimeSessionWindows.withGap(Time.minutes(15)))
>>> // trigger(?)
>>> .apply(...custom code..)
>>>
>>> What I am missing is the right trigger à la "withEarlyFiring". Do I need
>>> to implement my own trigger for this and, if yes, what kind of
>>> functionality must be present to not break the session window semantics?
>>>
>>> Thanks in advance,
>>>
>>> Johannes
>>>
>>>
>>
