[
https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chen Qin updated FLINK-16392:
-----------------------------
Description:
IntervalJoin is seeing a lot of use cases on our side. Those use cases share
the following pattern:
* the left stream is pulled periodically from a slowly evolving static dataset
* the lookup time range is very large (days to weeks)
* the right stream is web traffic with high QPS
In the current interval join implementation, both streams are treated equally:
each incoming event pulls and scans keyed state from the backend (RocksDB
here) on demand. As RocksDB fetches and updates get more expensive,
performance takes a hit; this work aims to unblock those large use cases.
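To make the cost concrete, here is a minimal sketch (plain Java with illustrative names, not the actual Flink operator code) of the current behavior: every incoming event does a full loop over the opposite side's buffer held in keyed state, which is what gets expensive when that state lives in RocksDB.

```java
import java.util.*;

/** Sketch of the current symmetric interval-join pattern: each event scans
 *  the full opposite-side buffer, regardless of which side is hot. */
class NaiveIntervalJoin {
    // otherBuffer stands in for the MapState<Long, List<T>> backed by RocksDB:
    // timestamp -> elements buffered at that timestamp.
    static List<long[]> join(long ts, NavigableMap<Long, List<Long>> otherBuffer,
                             long lowerBound, long upperBound) {
        List<long[]> matches = new ArrayList<>();
        // Full scan on every event: each iteration is a state-backend read,
        // which is the expensive part with RocksDB.
        for (Map.Entry<Long, List<Long>> e : otherBuffer.entrySet()) {
            long otherTs = e.getKey();
            if (otherTs >= ts + lowerBound && otherTs <= ts + upperBound) {
                for (long v : e.getValue()) {
                    matches.add(new long[]{ts, v});
                }
            }
        }
        return matches;
    }
}
```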
In the proposed implementation, we plan to introduce two changes:
* allow users to opt in to use cases like the above by customizing and
inheriting from ProcessJoinFunction:
** whether to skip triggering a scan on left events (the static dataset)
** allow cleaning up right-stream state earlier than the interval upper bound
* leverage an in-memory cache that builds a sortedMap on demand from the other
side's buffer for each join key; in our use cases, this helps:
** expedite right-stream lookups into the left buffer without accessing
RocksDB every time (disk -> sorted in-memory cache)
** if a key sees an event from the left side, the cache for that key is
cleared and reloaded from the right side
** in the worst-case scenario, the two streams hit processElement1 and
processElement2 round-robin for the same set of keys at the same frequency;
performance is then expected to be similar to the current implementation, and
the memory footprint is bounded by 1/2 of the state size.
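The cache bullet above can be sketched as follows (a simplified illustration with assumed names, not the actual patch): each join key lazily materializes a sorted in-memory copy of the left buffer, right-side probes answer interval lookups from that copy via subMap, and a left-side insertion invalidates the cached copy so the next probe rebuilds it from state.

```java
import java.util.*;

/** Sketch of the proposed one-side sorted cache: high-QPS right events
 *  range-scan an in-memory sorted copy of the left (static) buffer instead
 *  of hitting RocksDB on every event. */
class OneSideSortedCache {
    private final Map<String, TreeMap<Long, List<Long>>> cache = new HashMap<>();

    /** Right-stream probe: build the per-key cache on first access (one
     *  state scan), then answer interval lookups from memory via subMap. */
    List<Long> probeRight(String key, long ts, long lower, long upper,
                          NavigableMap<Long, List<Long>> leftBufferFromState) {
        TreeMap<Long, List<Long>> sorted = cache.computeIfAbsent(
                key, k -> new TreeMap<>(leftBufferFromState));
        List<Long> out = new ArrayList<>();
        for (List<Long> bucket
                : sorted.subMap(ts + lower, true, ts + upper, true).values()) {
            out.addAll(bucket);
        }
        return out;
    }

    /** A left-stream insertion dirties the cached copy: clear it so the
     *  next right-side probe rebuilds from state (the dirty-cache rule). */
    void onLeftEvent(String key) {
        cache.remove(key);
    }
}
```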
Open discussion
* How do we control the cache size?
** By default the cache size is set to 1 key.
* How do we avoid a dirty cache?
** If a given key sees an insertion from the other side, the cache for that
key is cleared and rebuilt.
* What happens on checkpoint/restore?
** State still persists in the state backend; the cache is cleared and rebuilt
for each newly seen key.
* How is performance?
** Assuming RAM is an order of magnitude faster than SSD, populating the cache
is a small overhead (<5%). Compared with the current RocksDB implementation,
which does a full loop over the buffer at every event, this saves the
bucket-scan logic. If a key sees more than one access in the same direction
while cached, we expect a significant performance gain.
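The opt-in hooks from the first proposed change could look roughly like this. This is a hypothetical API sketch: the class name OneSidedProcessJoinFunction and the methods skipScanOnLeft and rightCleanupDelay are illustrative, not the actual Flink API.

```java
/** Hypothetical opt-in hooks a user subclass could override; names here are
 *  illustrative, not part of Flink's ProcessJoinFunction. */
abstract class OneSidedProcessJoinFunction<IN1, IN2, OUT> {
    /** Return true to skip triggering a right-buffer scan when an element
     *  arrives on the left (slowly evolving, periodically reloaded) side. */
    boolean skipScanOnLeft() {
        return false; // default: behave like today's symmetric join
    }

    /** Allow right-stream state to be cleaned up earlier than the interval
     *  upper bound; the default keeps the current retention. */
    long rightCleanupDelay(long intervalUpperBound) {
        return intervalUpperBound;
    }
}
```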
was:
IntervalJoin is seeing a lot of use cases on our side. Those use cases share
the following pattern:
* the left stream is pulled periodically from a slowly evolving static dataset
* the lookup time range is very large (days to weeks)
* the right stream is web traffic with high QPS
In the current interval join implementation, both streams are treated equally:
each incoming event pulls and scans keyed state from the backend (RocksDB
here) on demand. As RocksDB fetches and updates get more expensive,
performance takes a hit; this work aims to unblock those large use cases.
In the proposed implementation, we plan to introduce two changes:
* allow users to opt in to use cases like the above by inheriting from
ProcessJoinFunction:
** whether to skip triggering a scan on left events (the static dataset)
** allow cleaning up right-stream state earlier than the interval upper bound
* leverage an in-memory cache that builds a sortedMap on demand from the other
side's buffer for each join key; in our use cases, this helps:
** expedite right-stream lookups into the left buffer without accessing
RocksDB every time (disk -> sorted in-memory cache)
** if a key sees an event from the left side, the cache for that key is
cleared and reloaded from the right side
** in the worst-case scenario, the two streams hit processElement1 and
processElement2 round-robin for the same set of keys at the same frequency;
performance is then expected to be similar to the current implementation, and
the memory footprint is bounded by 1/2 of the state size.
Open discussion
* How do we control the cache size?
** By default the cache size is set to 1 key.
* How do we avoid a dirty cache?
** If a given key sees an insertion from the other side, the cache for that
key is cleared and rebuilt.
* What happens on checkpoint/restore?
** State still persists in the state backend; the cache is cleared and rebuilt
for each newly seen key.
* How is performance?
** Assuming RAM is an order of magnitude faster than SSD (and far faster than
spinning disk), populating the cache is a small overhead (1% - 5%). Compared
with the current RocksDB implementation, which does a full loop over the
buffer at every event, this saves the bucket-scan logic. If a key sees more
than one access in the same direction while cached, we expect a significant
performance gain.
> oneside sorted cache in intervaljoin
> ------------------------------------
>
> Key: FLINK-16392
> URL: https://issues.apache.org/jira/browse/FLINK-16392
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.10.0
> Reporter: Chen Qin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.11.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)