Kerem Ulutaş created FLINK-13721:
------------------------------------
Summary: BroadcastState should support StateTTL
Key: FLINK-13721
URL: https://issues.apache.org/jira/browse/FLINK-13721
Project: Flink
Issue Type: Improvement
Components: API / DataStream, Runtime / Queryable State
Affects Versions: 1.8.1
Environment: macOS 10.14.6 running IntelliJ IDEA Ultimate 2019.2,
Flink version 1.8.1
Reporter: Kerem Ulutaş
Attachments: DebugBroadcastStateTTL.java, IntHolder.java,
StringHolder.java, flink_broadcast_state_ttl_debug.log
Hi everyone,
Sorry if I'm doing anything wrong; this is my first issue in Apache Jira.
I have a use case that requires two data streams to be cross-joined. To be
exact, one stream carries location updates from clients and the other carries
event data with location information. I'm trying to find the events that happen
within a certain radius of the client location(s).
Since the streams cannot be related to each other through a common partitioning
key, I have to broadcast the client updates to all tasks and evaluate the
radius check for each event.
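The per-event radius check described above can be sketched in plain Java as
follows. This is a hypothetical illustration, not the code in the attached
DebugBroadcastStateTTL.java: a HashMap stands in for the broadcast state
(client id to last known {lat, lon}), RadiusCheck and its methods are made-up
names, and the haversine great-circle formula is simply one reasonable choice
of distance function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of the per-event radius check. The Map stands in for the
 * broadcast state: client id -> last known location as {latitude, longitude}.
 */
class RadiusCheck {
    // Mean Earth radius in kilometres (common approximation).
    static final double EARTH_RADIUS_KM = 6371.0;

    /** Great-circle distance in km between two lat/lon points (haversine formula). */
    static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
    }

    /** Returns the ids of clients whose last known location lies within radiusKm of the event. */
    static List<String> clientsNear(Map<String, double[]> clientLocations,
                                    double eventLat, double eventLon, double radiusKm) {
        List<String> matches = new ArrayList<>();
        // Every event must be checked against every broadcast client: there is no key to narrow it.
        for (Map.Entry<String, double[]> e : clientLocations.entrySet()) {
            double[] loc = e.getValue();
            if (haversineKm(loc[0], loc[1], eventLat, eventLon) <= radiusKm) {
                matches.add(e.getKey());
            }
        }
        return matches;
    }
}
```

For example, with made-up clients at (41.0082, 28.9784) and (39.9334, 32.8597),
an event at (41.01, 28.98) with a 5 km radius matches only the first client.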
The requirement here is that if we don't receive any location updates from a
client for a certain amount of time, we should consider the client "gone"
(similar to the rationale stated in the FLINK-3089 description:
https://issues.apache.org/jira/browse/FLINK-3089).
I have attached the sample application classes I used to debug BroadcastState
and StateTTL together.
The output (see flink_broadcast_state_ttl_debug.log) shows that client "c0" got
its first event at 17:08:07.67 (so it is expected to be evicted sometime after
17:08:37.xxx) but it never gets evicted.
For the operator part (the result of
BroadcastConnectedStream<IntHolder, StringHolder>.process): since the context in
the onTimer method doesn't let the user change the contents of the broadcast
state, the only way to deal with stale client data is as follows:
* In the processElement method, calculate results only for client data that is
newer than the TTL
* In the processBroadcastElement method, remove client data older than a
certain amount of time from the broadcast state
If the broadcast side of the connected streams doesn't receive data for longer
than the desired time-to-live, BroadcastState will hold stale data, and the
processElement method has to filter out that client data every time. This is
the approach I am using in production right now.
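A minimal plain-Java model of this workaround, assuming a HashMap in place of
the broadcast state and millisecond timestamps; StaleClientWorkaround and its
methods are hypothetical names, not Flink API. Eviction only happens in
onClientUpdate (the processBroadcastElement side), so while no client updates
arrive, stale entries stay in the map and liveClientsAt (the processElement
side, which only has read-only access to broadcast state in Flink) has to skip
them on every call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of the two-part workaround. The HashMap stands in for the
 * broadcast state: client id -> timestamp (ms) of the client's last location update.
 */
class StaleClientWorkaround {
    private final long ttlMillis;
    private final Map<String, Long> lastSeen = new HashMap<>();

    StaleClientWorkaround(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Mirrors processBroadcastElement: record the update, then evict clients older than the TTL. */
    void onClientUpdate(String clientId, long timestampMillis) {
        lastSeen.put(clientId, timestampMillis);
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            // Eviction is driven only by incoming broadcast elements, never by time alone.
            if (timestampMillis - it.next().getValue() > ttlMillis) {
                it.remove();
            }
        }
    }

    /** Mirrors processElement: the state is read-only here, so stale clients are skipped, not removed. */
    List<String> liveClientsAt(long eventTimeMillis) {
        List<String> live = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastSeen.entrySet()) {
            if (eventTimeMillis - e.getValue() <= ttlMillis) {
                live.add(e.getKey());
            }
        }
        return live;
    }

    /** Number of entries still held, including stale ones not yet evicted. */
    int storedClientCount() {
        return lastSeen.size();
    }
}
```

For example, with a 30-second TTL, a client last seen at t=0 is filtered out by
liveClientsAt(40_000) but stays stored until the next client update arrives,
which is exactly the stale-data situation a built-in TTL would avoid.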
I am not aware of any decision or limitation that makes it impossible to
implement StateTTL for BroadcastState; I would be grateful if someone could
explain any that exist.
Thanks and regards.