Hi! I don't think there is any ongoing effort in core Flink other than this library we created.
You are probably right that it is pretty hacky at the moment. I would say this is one way we could do it that seemed convenient to me at the time I wrote the code. If you have ideas on how to structure it better or improve it, you know where to find the code, feel free to open a PR :) That might actually take us closer to having this properly in Flink one day soon.

Just to clarify the code you are showing:

- writer.writeAll() -> runs the batch job that writes the checkpoint files for the changed operator states, and returns a reference to the OperatorState metadata object
- StateMetadataUtils.createNewSavepoint() -> replaces the metadata for the operator states you have just written in the previous savepoint
- StateMetadataUtils.writeSavepointMetadata() -> writes the new metadata file

So metadata writing happens as the very last step, after the batch job has run. This is similar to how it works in streaming jobs, in the sense that there the JobManager writes the metadata file after checkpointing is done. The downside of this approach is that the client might not have access to write the metadata file here.

Gyula
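To make the ordering concrete, here is a small self-contained Java sketch that simulates the three steps above. Note this is only an illustration of the write order (state files first, metadata file last); the file names, layout, and "metadata" contents here are invented for the example and are not the real Flink savepoint format or the library's actual API.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simulates the write ordering described above: operator state files
// are written first, the metadata file only as the very last step.
public class SavepointWriteOrder {
    public static void main(String[] args) throws IOException {
        Path savepointDir = Files.createTempDirectory("new-savepoint");

        // Step 1: the "batch job" writes checkpoint files for the changed
        // operator states and returns references to them
        // (analogous to writer.writeAll()).
        Map<String, Path> operatorStateFiles = new LinkedHashMap<>();
        for (String operator : List.of("map-1", "window-2")) {
            Path stateFile = savepointDir.resolve(operator + ".state");
            Files.writeString(stateFile, "state bytes for " + operator);
            operatorStateFiles.put(operator, stateFile);
        }

        // Step 2: build the new metadata by replacing the entries for the
        // rewritten operators in the previous savepoint's metadata
        // (analogous to StateMetadataUtils.createNewSavepoint()).
        List<String> metadataLines = new ArrayList<>();
        operatorStateFiles.forEach((op, path) ->
                metadataLines.add(op + " -> " + path.getFileName()));

        // Step 3: only now write the metadata file, after all state files
        // exist (analogous to StateMetadataUtils.writeSavepointMetadata()).
        // This is the step that fails if the client cannot write here.
        Path metaFile = savepointDir.resolve("_metadata");
        Files.write(metaFile, metadataLines);

        System.out.println("metadata written last: " + Files.exists(metaFile));
    }
}
```

Running it prints `metadata written last: true`; the point is simply that nothing references the state files until the final metadata write succeeds, which is why write access for the client at that location matters.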