[ 
https://issues.apache.org/jira/browse/FLINK-6243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16561215#comment-16561215
 ] 

Elias Levy commented on FLINK-6243:
-----------------------------------

Stephan, thanks for bringing FLINK-8478 to my attention.  Alas, while it comes 
closer to meeting our join requirements, it does not quite fulfill them.  

Our joins require the semantics of joining two upsert tables, i.e. only joining 
the latest value by key.  The DataStream Interval Join being implemented does 
not support those semantics, as it buffers and joins all elements for a key 
that fall within the interval.  It seems the upsert semantics could be 
implemented by changing the state from a {{MapState}} buffering multiple events 
per key to a {{ValueState}} holding only the latest event per key, by event time.
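
To illustrate the upsert semantics, here is a minimal, Flink-free Scala model of the per-key state. It is a sketch under stated assumptions, not Flink code: {{Event}}, {{UpsertIntervalJoin}}, {{processLeft}} and {{processRight}} are hypothetical names, plain mutable maps stand in for one {{ValueState}} per key on each side, and the interval check follows the convention of Flink's interval join {{between(lowerBound, upperBound)}}, where the right timestamp must lie in [left + lowerBound, left + upperBound]. An event older than the one already stored for its key is ignored.

{code:scala}
import scala.collection.mutable

// Hypothetical, Flink-free model of an upsert interval join.
// Mutable maps stand in for one ValueState per key on each side.
final case class Event[K, V](key: K, ts: Long, value: V)

final class UpsertIntervalJoin[K, L, R](lowerBound: Long, upperBound: Long) {
  private val latestLeft  = mutable.Map.empty[K, Event[K, L]]
  private val latestRight = mutable.Map.empty[K, Event[K, R]]

  // Same bound convention as Flink's interval join between(lowerBound, upperBound):
  // leftTs + lowerBound <= rightTs <= leftTs + upperBound.
  private def inInterval(leftTs: Long, rightTs: Long): Boolean =
    rightTs >= leftTs + lowerBound && rightTs <= leftTs + upperBound

  // Keep only the latest event by event time; older (late) events are ignored.
  def processLeft(e: Event[K, L]): Option[(L, R)] =
    if (latestLeft.get(e.key).exists(_.ts > e.ts)) None
    else {
      latestLeft(e.key) = e
      latestRight.get(e.key).filter(r => inInterval(e.ts, r.ts)).map(r => (e.value, r.value))
    }

  def processRight(e: Event[K, R]): Option[(L, R)] =
    if (latestRight.get(e.key).exists(_.ts > e.ts)) None
    else {
      latestRight(e.key) = e
      latestLeft.get(e.key).filter(l => inInterval(l.ts, e.ts)).map(l => (l.value, e.value))
    }
}
{code}

Unlike a buffering interval join, a newer left event for key "a" replaces the older one, so later right events only ever pair with the latest left value.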

We also need these joins to be outer joins, but I see that there is already a 
subtask to implement those (FLINK-8483).
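
One way to read "outer" under upsert semantics is sketched below for the left outer case. This is an assumption for illustration only, not the design of FLINK-8483: {{LeftOuterUpsertJoin}} is a hypothetical name, the time interval is omitted, and unmatched left rows are emitted immediately padded with {{None}}. Because the output is itself an upsert stream, a later ({{l}}, {{Some(r)}}) supersedes the earlier ({{l}}, {{None}}) for that key.

{code:scala}
import scala.collection.mutable

// Hypothetical model of a left outer upsert join per key (no time interval).
final class LeftOuterUpsertJoin[K, L, R] {
  private val left  = mutable.Map.empty[K, (Long, L)] // key -> (ts, latest left value)
  private val right = mutable.Map.empty[K, (Long, R)] // key -> (ts, latest right value)

  // Every accepted left update is emitted, padded with None if there is no match yet.
  def processLeft(key: K, ts: Long, v: L): Option[(L, Option[R])] =
    if (left.get(key).exists(_._1 > ts)) None
    else {
      left(key) = (ts, v)
      Some((v, right.get(key).map(_._2)))
    }

  // A right update only produces output if a left row exists to update.
  def processRight(key: K, ts: Long, v: R): Option[(L, Option[R])] =
    if (right.get(key).exists(_._1 > ts)) None
    else {
      right(key) = (ts, v)
      left.get(key).map { case (_, l) => (l, Some(v)) }
    }
}
{code}

A time-bounded outer join would more typically hold back the padded row until the interval has expired; emitting it eagerly is only reasonable because downstream consumers treat the result as an upsert stream.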

Finally, we also need to implement a join between two streams where one stream 
is keyed by a subset of the other stream's composite key (e.g. the left stream 
is keyed by {{col1}} and the right stream by ({{col1}}, {{col2}})), also with 
upsert semantics.  This could be implemented by keying both streams by 
{{col1}}, keeping a {{ValueState}} for the left stream that buffers the latest 
event, and using a {{MapState}} keyed by {{col2}} for the right stream that 
buffers the latest event per ({{col1}}, {{col2}}) tuple. 
 
Maybe something like:
{code:scala}
leftStream
  .keyBy(_.col1)
  // proposed operators: upsertJoin and subKey are not existing DataStream API methods
  .upsertJoin(rightStream.keyBy(_.col1).subKey(_.col2))
  .between(...)
  .process(...)
{code}
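
The {{ValueState}}/{{MapState}} layout described above for the composite-key join can be modeled without Flink in a few lines of Scala. This is a sketch under stated assumptions: {{SubKeyUpsertJoin}}, {{LeftRow}} and {{RightRow}} are hypothetical names, plain mutable maps stand in for the keyed state, the time interval is omitted, and on equal timestamps the later arrival wins. Each output is a ({{col2}}, left value, right value) triple.

{code:scala}
import scala.collection.mutable

// Hypothetical, Flink-free model: left keyed by col1, right keyed by (col1, col2).
final case class LeftRow(col1: String, ts: Long, value: String)
final case class RightRow(col1: String, col2: String, ts: Long, value: String)

final class SubKeyUpsertJoin {
  // Stand-in for a ValueState[LeftRow] per col1.
  private val left = mutable.Map.empty[String, LeftRow]
  // Stand-in for a MapState[col2, RightRow] per col1.
  private val right = mutable.Map.empty[String, mutable.Map[String, RightRow]]

  // A left update re-joins against every (col1, col2) entry currently held.
  def processLeft(l: LeftRow): Seq[(String, String, String)] =
    if (left.get(l.col1).exists(_.ts > l.ts)) Seq.empty // older than stored: ignore
    else {
      left(l.col1) = l
      right.get(l.col1).toSeq.flatMap(_.values).sortBy(_.col2).map(r => (r.col2, l.value, r.value))
    }

  // A right update replaces only its own (col1, col2) entry and joins with the latest left row.
  def processRight(r: RightRow): Seq[(String, String, String)] = {
    val perCol2 = right.getOrElseUpdate(r.col1, mutable.Map.empty[String, RightRow])
    if (perCol2.get(r.col2).exists(_.ts > r.ts)) Seq.empty // older than stored: ignore
    else {
      perCol2(r.col2) = r
      left.get(r.col1).toSeq.map(l => (r.col2, l.value, r.value))
    }
  }
}
{code}

The design keeps at most one left event per {{col1}} and one right event per ({{col1}}, {{col2}}), so state size is bounded by the number of distinct keys rather than by the event rate.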

Looking at the implementation, I also worry that the cleanup timers are not 
being coalesced, which may result in high overhead when processing cleanup 
timers for high-throughput streams.
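
A minimal sketch of what coalescing the cleanup timers could look like, assuming a cleanup timer is registered per key at {{ts + retention}} and that, as in Flink, at most one timer is kept per key and timestamp (modeled here with a {{Set}}). The names {{TimerCoalescing}}, {{coalesce}}, {{distinctTimers}}, {{retention}} and {{resolution}} are hypothetical. Rounding the deadline up rather than down means state is never cleaned up early; the cost is that it may live up to one {{resolution}} longer.

{code:scala}
object TimerCoalescing {
  // Round a cleanup deadline up to the next multiple of `resolution`, so state
  // is never dropped before its deadline. Assumes t >= 0 and resolution > 0.
  def coalesce(t: Long, resolution: Long): Long =
    ((t + resolution - 1) / resolution) * resolution

  // Count the timers that would actually be registered: duplicates of the same
  // (key, timestamp) collapse into one, as Flink's timer service does.
  def distinctTimers(events: Seq[(String, Long)], retention: Long, resolution: Long): Int =
    events.map { case (key, ts) => (key, coalesce(ts + retention, resolution)) }.toSet.size
}
{code}

For example, with 1,000 events for one key at 1 ms spacing and a 10 s retention, a 1 ms resolution yields 1,000 distinct cleanup timers, while a 1 s resolution yields 2.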

> Continuous Joins:  True Sliding Window Joins
> --------------------------------------------
>
>                 Key: FLINK-6243
>                 URL: https://issues.apache.org/jira/browse/FLINK-6243
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>    Affects Versions: 1.1.4
>            Reporter: Elias Levy
>            Priority: Major
>
> Flink defines sliding window joins as the join of elements of two streams 
> that share a window of time, where successive windows advance by some amount 
> of time that is less than the window length.  In other words, such windows 
> are just overlapping hopping windows. 
> Other systems, such as Kafka Streams, support a different notion of sliding 
> window joins.  In these systems, two elements, one from each stream, are 
> joined if the absolute time difference between them is less than or equal to 
> the window length.
> This alternate notion of sliding window joins has some advantages over the 
> current implementation in some applications:
> * In the current implementation, elements to be joined may both fall within 
> multiple overlapping sliding windows, leading them to be joined multiple 
> times, when we only wish them to be joined once.
> * The implementation need not instantiate window objects to keep track of 
> stream elements, which becomes problematic in the current implementation if 
> the window size is very large and the slide is very small.
> * It allows for asymmetric time joins, e.g. join if elements from stream A 
> are no more than X time behind and Y time ahead of an element from stream B.
> It is currently possible to implement a join with these semantics using 
> {{CoProcessFunction}}, but the capability should be a first-class feature, 
> as it is in Kafka Streams.
> To perform the join, elements of each stream must be buffered for at least 
> the window length.  To allow for large window sizes and high volumes of 
> elements, the state should be buffered in a way that can spill to disk 
> (e.g. by using RocksDB), possibly as an option.
> The same stream may be joined multiple times in a complex topology.  As an 
> optimization, it may be wise to share element buffers among colocated join 
> operators.  Otherwise, there may be write amplification and increased state 
> that must be snapshotted.
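
The sliding-window semantics in the quoted description can be made concrete with a small, Flink-free Scala sketch. All names ({{SlidingJoinSemantics}}, {{joinableAsym}}, {{windowsContaining}}, {{hoppingJoinCount}}, {{canDropA}}, {{canDropB}}) are hypothetical. It assumes non-negative timestamps, hopping window starts aligned to multiples of the slide, and that a watermark {{w}} means no further elements with timestamp <= {{w}} will arrive. It shows the symmetric and asymmetric join conditions, how many times a pair would be joined by overlapping hopping windows, and when a buffered element can be dropped.

{code:scala}
object SlidingJoinSemantics {
  // Kafka-Streams-style sliding join: a and b join iff |tA - tB| <= windowLength.
  def joinable(tA: Long, tB: Long, windowLength: Long): Boolean =
    math.abs(tA - tB) <= windowLength

  // Asymmetric variant: A is at most `behind` before and at most `ahead` after B,
  // i.e. tB - behind <= tA <= tB + ahead.
  def joinableAsym(tA: Long, tB: Long, behind: Long, ahead: Long): Boolean =
    tA >= tB - behind && tA <= tB + ahead

  // Start times of the hopping windows [s, s + size) that contain t,
  // assuming t >= 0 and window starts aligned to multiples of `slide`.
  def windowsContaining(t: Long, size: Long, slide: Long): Set[Long] =
    Iterator.iterate(t - t % slide)(_ - slide).takeWhile(_ > t - size).toSet

  // Number of times a hopping-window join would emit the pair (tA, tB).
  def hoppingJoinCount(tA: Long, tB: Long, size: Long, slide: Long): Int =
    windowsContaining(tA, size, slide).intersect(windowsContaining(tB, size, slide)).size

  // A buffered A element can still meet any future B with tB <= tA + behind,
  // so it may be dropped once the watermark reaches tA + behind.
  def canDropA(tA: Long, behind: Long, watermark: Long): Boolean =
    watermark >= tA + behind

  // Symmetrically, a buffered B element can still meet any future A with tA <= tB + ahead.
  def canDropB(tB: Long, ahead: Long, watermark: Long): Boolean =
    watermark >= tB + ahead
}
{code}

For example, with a window size of 10 and a slide of 1, each element falls into 10 windows, and elements at timestamps 100 and 101 share 9 of them, so a hopping-window join would emit that pair 9 times while the sliding definition emits it once. With symmetric bounds ({{behind}} = {{ahead}} = window length), the drop conditions reduce to buffering each element for the window length past its timestamp.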



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
