Theo Diefenthal created FLINK-19382:
---------------------------------------
Summary: Introducing ReplayableSourceStateBackend
Key: FLINK-19382
URL: https://issues.apache.org/jira/browse/FLINK-19382
Project: Flink
Issue Type: Improvement
Reporter: Theo Diefenthal
I got the idea of a new StateBackend simply called "ReplayableSource". This
state backend would be bound by a number of limitations, but in the few areas
where it applies, it could improve pipeline performance by orders of magnitude,
which makes me think it's worth debating.
I'd like to start by describing two useful scenarios for such a state backend
before discussing the backend itself. Both scenarios read data from Kafka and
process it with Flink.
Scenario 1: Buffering data. I'm currently developing a pipeline where, directly
after reading the data, I need to buffer it for 1 minute in event time. This is
due to inter-event dependencies: if a certain event arrives within one minute,
I have to enrich the first event with its information. Right now, I store the
1 minute of event-time data in Flink state, so the entire buffer is written on
every checkpoint, which makes it impossible for me to use exactly-once
processing (we want latency to be as low as possible, i.e. at most a few
seconds on top of the buffering, while simultaneously handling a high volume of
data).
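To make the size of that state concrete, here is a minimal plain-Python sketch
of such a 1-minute event-time buffer. It is not Flink API code (in Flink this
would typically be a KeyedProcessFunction with event-time timers), and the
names Event, EventTimeBuffer, BUFFER_MS and the "first"/"followup" event kinds
are hypothetical stand-ins for the real enrichment logic. The point is that
every event of the last minute sits in `pending`, and that whole structure is
what each checkpoint has to persist.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

BUFFER_MS = 60_000  # 1 minute of event time, as in Scenario 1


@dataclass
class Event:
    key: str
    ts: int                  # event-time timestamp in milliseconds
    kind: str                # hypothetical: "first" or "followup"
    payload: str
    enrichment: Optional[str] = None


class EventTimeBuffer:
    """Hypothetical per-key buffer: holds "first" events for BUFFER_MS of event
    time and attaches the payload of a matching "followup" event to them."""

    def __init__(self) -> None:
        # This dict is the operator state that every checkpoint has to persist.
        self.pending: Dict[str, List[Event]] = {}

    def on_event(self, ev: Event) -> None:
        if ev.kind == "first":
            self.pending.setdefault(ev.key, []).append(ev)
            return
        # A follow-up enriches buffered first events of the same key if it
        # arrives within one minute (event time) after them.
        for first in self.pending.get(ev.key, []):
            if first.enrichment is None and first.ts <= ev.ts <= first.ts + BUFFER_MS:
                first.enrichment = ev.payload

    def on_watermark(self, watermark: int) -> List[Event]:
        """Emit and drop every buffered event whose minute has fully passed."""
        emitted: List[Event] = []
        for key in list(self.pending):
            events = self.pending[key]
            emitted.extend(e for e in events if e.ts + BUFFER_MS <= watermark)
            remaining = [e for e in events if e.ts + BUFFER_MS > watermark]
            if remaining:
                self.pending[key] = remaining
            else:
                del self.pending[key]
        return emitted

    def state_size(self) -> int:
        return sum(len(events) for events in self.pending.values())


buf = EventTimeBuffer()
buf.on_event(Event("a", 0, "first", "p0"))
buf.on_event(Event("b", 10_000, "first", "p1"))
buf.on_event(Event("a", 30_000, "followup", "extra"))
print(buf.state_size())  # 2
emitted = buf.on_watermark(60_000)
print([(e.key, e.enrichment) for e in emitted])  # [('a', 'extra')]
print(buf.state_size())  # 1
```

With a high event rate, `state_size()` is roughly one minute's worth of input,
which is why checkpointing this buffer is expensive.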
Scenario 2: Performing Flink SQL CEP queries. CEP queries naturally have
inter-event dependencies. Placing even simple MATCH_RECOGNIZE queries directly
after a Kafka source often requires the RocksDB state backend and results in
slow performance.
The idea: instead of storing the entire state, we could simply store a Kafka
offset. When restoring from a savepoint, all of the state could be rebuilt by
re-reading the data from Kafka. The checkpoint size would thus shrink from huge
amounts of data down to just a few numbers, which allows frequent and fast
checkpointing.
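A minimal sketch of the idea, assuming a single in-order partition modeled as a
Python list (the list index plays the role of the Kafka offset) and a
deterministic operator whose state is all records of the last span_ms of event
time. ReplayCheckpoint, WindowedBufferOperator and its methods are hypothetical
names, not an existing Flink API. The snapshot persists only two offsets: the
oldest record still in state and the position where the source resumes. On
restore, the operator re-reads that range with output suppressed, since those
results were already produced before the checkpoint.

```python
from dataclasses import dataclass
from typing import List, Tuple

Record = Tuple[int, int]  # (event-time ms, value); the list index is the "offset"


@dataclass(frozen=True)
class ReplayCheckpoint:
    replay_from: int  # offset of the oldest record still contributing to state
    resume_at: int    # offset the source continues from after the replay


class WindowedBufferOperator:
    """Hypothetical operator whose state is every record of the last span_ms
    of event time (a stand-in for a CEP or buffering operator)."""

    def __init__(self, span_ms: int) -> None:
        self.span_ms = span_ms
        self.buffer: List[Tuple[int, int, int]] = []  # (offset, ts, value)
        self.outputs: List[int] = []

    def process(self, offset: int, rec: Record, emit: bool = True) -> None:
        ts, value = rec
        self.buffer.append((offset, ts, value))
        # Evict records that fell out of the span (assumes in-order timestamps).
        self.buffer = [b for b in self.buffer if b[1] > ts - self.span_ms]
        if emit:
            # Deterministic output: sum of the values currently buffered.
            self.outputs.append(sum(b[2] for b in self.buffer))

    def snapshot(self, next_offset: int) -> ReplayCheckpoint:
        # Only two numbers are persisted instead of the whole buffer.
        oldest = self.buffer[0][0] if self.buffer else next_offset
        return ReplayCheckpoint(replay_from=oldest, resume_at=next_offset)

    @classmethod
    def restore(cls, span_ms: int, log: List[Record],
                cp: ReplayCheckpoint) -> "WindowedBufferOperator":
        op = cls(span_ms)
        # Rebuild state by re-reading the source range; outputs for these
        # records were already produced before the checkpoint.
        for off in range(cp.replay_from, cp.resume_at):
            op.process(off, log[off], emit=False)
        return op


log: List[Record] = [(0, 1), (20_000, 2), (50_000, 3), (70_000, 4), (90_000, 5)]
op = WindowedBufferOperator(span_ms=60_000)
for off in range(4):
    op.process(off, log[off])
cp = op.snapshot(next_offset=4)
print(cp)  # ReplayCheckpoint(replay_from=1, resume_at=4)

restored = WindowedBufferOperator.restore(60_000, log, cp)
print(restored.buffer == op.buffer)  # True

# After the restore, both operators produce identical output for the next record.
op.process(4, log[4])
restored.process(4, log[4])
print(op.outputs[-1], restored.outputs)  # 12 [12]
```

The rebuilt buffer only matches the original because process() is
deterministic and the source can be re-read. Watermarks and parallelism are
ignored here; with several partitions, one such offset pair would be needed per
partition.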
Limitations:
* This would only work for fully deterministic/replayable streaming jobs. If a
certain operator within the pipeline is not deterministic, a replay could
produce a different result.
* The source must be replayable, e.g. Kafka.
* This would also only work for pipelines with short-lived state. There are
many pipelines which build up their state over days, months or even years.
Restoring such a state by replaying all the data over that time would be almost
impossible, especially as Kafka usually has a retention of something like a
week configured. However, there are also many queries with short-lived state,
like the mentioned CEP use case, where patterns are usually defined over
timeframes of seconds, minutes, hours or a few days for the event correlation.
* I'm not sure whether there are further limitations with regard to
windowing/watermarks or similar things that would make this feature impossible.
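The retention limitation can be phrased as a simple check. The sketch below
assumes the operator's state never needs records older than
max_state_lifetime (e.g. the longest CEP pattern timeframe) and treats Kafka's
time-based retention as exact; in practice Kafka deletes whole log segments, so
data may survive somewhat longer. The function name and parameters are
hypothetical.

```python
from datetime import timedelta


def replay_restore_possible(max_state_lifetime: timedelta,
                            kafka_retention: timedelta,
                            time_since_checkpoint: timedelta) -> bool:
    # The oldest record needed for the replay is roughly max_state_lifetime
    # older than the checkpoint, so at restore time its age is
    # max_state_lifetime + time_since_checkpoint. It must still be retained.
    return max_state_lifetime + time_since_checkpoint <= kafka_retention


one_week = timedelta(days=7)
# CEP pattern over one hour, restored a day after the last checkpoint.
print(replay_restore_possible(timedelta(hours=1), one_week, timedelta(days=1)))  # True
# State built up over a month cannot be replayed from a one-week topic.
print(replay_restore_possible(timedelta(days=30), one_week, timedelta(0)))  # False
```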
For certain scenarios, most likely windowed pipelines, this feature would
obviously be a poor fit. It is certainly much cheaper to, e.g., store a COUNT
per window than to replay all events per window in order to restore that COUNT.
But I'm focusing on use cases like CEP.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)