[ https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851346#comment-17851346 ]
Ying Z commented on FLINK-35502:
--------------------------------

[~roman] My usage scenario is as follows: the task's checkpoints are stored under $state.checkpoints.dir, and how many are kept is controlled by $state.checkpoints.num-retained. For example, with a checkpoint interval of 5 minutes, recovering from a checkpoint taken 3 days before a restart requires retaining 60/5 * 24 * 3 = 864 checkpoints. With a higher checkpoint frequency, or recovery from an even earlier point in time, state.checkpoints.num-retained has to be set larger still.

> compress the checkpoint metadata generated by ZK/ETCD HA Services
> -----------------------------------------------------------------
>
>                 Key: FLINK-35502
>                 URL: https://issues.apache.org/jira/browse/FLINK-35502
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Ying Z
>            Priority: Major
>
> In the implementation of Flink HA, the checkpoint metadata is stored in
> either ZooKeeper (ZK HA) or etcd (K8s HA), for example:
> {code:java}
> checkpointID-0000000000000036044: xxxx
> checkpointID-0000000000000036045: xxxx
> ...
> ... {code}
> However, neither of these stores is designed to hold large amounts of data. If
> [state.checkpoints.num-retained|https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained]
> is set too large, it can easily cause failures in ZK/etcd.
> The error log when state.checkpoints.num-retained is set to 1500:
> {code:java}
> Caused by: org.apache.flink.util.SerializedThrowable:
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing:
> PUT at:
> https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader.
> Message: ConfigMap "xxx-jobmanager-leader" is invalid: []:
> Too long: must have at most 1048576 bytes.
> Received status:
> Status(apiVersion=v1, code=422,
> details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must
> have at most 1048576 bytes, reason=FieldValueTooLong,
> additionalProperties={})], group=null, kind=ConfigMap,
> name=xxx-jobmanager-leader, retryAfterSeconds=null, uid=null,
> additionalProperties={}), kind=Status, message=ConfigMap
> "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576
> bytes, metadata=ListMeta(_continue=null, remainingItemCount=null,
> resourceVersion=null, selfLink=null, additionalProperties={}),
> reason=Invalid, status=Failure, additionalProperties={}). {code}
> In Flink's code, all checkpoint metadata entries are updated at the same time,
> and the metadata contains many repeated byte sequences, so it achieves a very
> good compression ratio.
> I therefore suggest compressing the data when writing checkpoint metadata and
> decompressing it when reading, to reduce storage pressure and improve IO
> efficiency.
> Here is sample code; it reduces the metadata size from about 1 MB to about 30 KB.
> {code:java}
> // Map -> JSON
> ObjectMapper objectMapper = new ObjectMapper();
> String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
> // compress and Base64-encode
> String compressedBase64 = compressAndEncode(checkpointJson);
> compressedData.put("checkpoint-all", compressedBase64);{code}
> {code:java}
> private static String compressAndEncode(String data) throws IOException {
>     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
>     try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
>         gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
>     }
>     byte[] compressedData = outputStream.toByteArray();
>     return Base64.getEncoder().encodeToString(compressedData);
> } {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
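[Editor's note] The issue only shows the compress-and-encode half. A minimal sketch of the read path described in the proposal (Base64-decode, then GZIP-decompress), together with the snippet above for a round trip, might look like the following. The class name `CheckpointCodec` and the buffer size are illustrative assumptions, not part of the Flink code or the issue:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper class; name and structure are this sketch's own choices.
public class CheckpointCodec {

    // Write path: GZIP-compress, then Base64-encode (mirrors the snippet in the issue).
    static String compressAndEncode(String data) throws IOException {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
            gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
        }
        return Base64.getEncoder().encodeToString(outputStream.toByteArray());
    }

    // Read path: Base64-decode, then GZIP-decompress back to the original string.
    static String decodeAndDecompress(String base64) throws IOException {
        byte[] compressed = Base64.getDecoder().decode(base64);
        try (GZIPInputStream gzipInputStream =
                     new GZIPInputStream(new ByteArrayInputStream(compressed));
             ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = gzipInputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, n);
            }
            return outputStream.toString(StandardCharsets.UTF_8.name());
        }
    }

    public static void main(String[] args) throws IOException {
        String original = "checkpointID-0000000000000036044: xxxx";
        String roundTripped = decodeAndDecompress(compressAndEncode(original));
        System.out.println(original.equals(roundTripped)); // prints "true"
    }
}
```

Because the metadata is written and read only by the HA services, the compression format is an internal detail; GZIP plus Base64 keeps the stored value a plain string, which is what ZK znodes and ConfigMap values expect.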