Hi everyone

I have a KeyedBroadcastProcessFunction with a broadcast state (a bunch of
rules), and I have set up a few gauge metrics on that state (things such as
number of known rules and timestamp of the last rule received). However, I
have an issue when the server restarts from a checkpoint or a savepoint:
the metric values are not restored.

That's expected behavior: the fields used in the metrics are transient, not
part of the state (I have followed this doc:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#metric-types).
The fields will be reset to the proper value in the next call to
processBroadcastElement(), but that's not enough for my use case: rule
updates aren't that frequent (it could be minutes or even hours before the
next one). We can't have the metrics offline for that long.

Is there any way to reset those fields without waiting for the next
messages to arrive? The open() method doesn't have access to the broadcast
state, so I can't do it there. I could do it in processElement() (normal
elements are much more frequent than rules), but it's far from ideal:
- it would be done again and again for every single element received, which
is overkill;
- it could only update the metric on the current subtask, not the others,
so one subtask could lag behind.
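
To make the situation concrete, here is a minimal, Flink-free sketch of the
setup and of the processElement() workaround described above. It is only a
model under stated assumptions: `RuleMetricsSketch`, `Subtask`,
`ruleCountGauge` and the `Map` standing in for the broadcast state are
hypothetical names, not Flink classes, and only the rule-count gauge is
modeled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class RuleMetricsSketch {

    /** Hypothetical stand-in for one parallel subtask of the function (not a Flink class). */
    static class Subtask {
        // Stand-in for the broadcast state: this part IS restored from the checkpoint.
        final Map<String, String> rules = new HashMap<>();

        // Field behind the gauge: not part of the state, so it is 0 after a restore.
        private transient int ruleCount;

        // Stand-in for a gauge: it only reads the field when polled.
        final Supplier<Integer> ruleCountGauge = () -> ruleCount;

        Subtask(Map<String, String> restoredRules) {
            rules.putAll(restoredRules);
        }

        // A new rule arrives: the state and the metric field are both updated.
        void processBroadcastElement(String ruleId, String rule) {
            rules.put(ruleId, rule);
            ruleCount = rules.size();
        }

        // Workaround: refresh the field from the state on every regular element.
        void processElement(String element) {
            ruleCount = rules.size();
        }
    }

    public static void main(String[] args) {
        Map<String, String> restored = new HashMap<>();
        restored.put("r1", "rule one");
        restored.put("r2", "rule two");

        // Two subtasks restored from the same checkpoint.
        Subtask a = new Subtask(restored);
        Subtask b = new Subtask(restored);

        System.out.println(a.ruleCountGauge.get()); // 0: state restored, metric not
        a.processElement("event");
        System.out.println(a.ruleCountGauge.get()); // 2: fixed on subtask a only
        System.out.println(b.ruleCountGauge.get()); // 0: subtask b still lags behind
    }
}
```

In this model, subtask `b` keeps reporting 0 until it receives an element or
a rule of its own, which is the lag mentioned in the second point.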

Am I missing something here? Is there any way to trigger a reset of the
value when the broadcast state is reconstructed?

Thanks for any help,
Gaël Renoux
