[ https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ying Z updated FLINK-35502:
---------------------------
    Summary: compress the checkpoint metadata generated by ZK/ETCD HA Services  (was: compress the checkpoint metadata ZK/ETCD HA Services)

> compress the checkpoint metadata generated by ZK/ETCD HA Services
> -----------------------------------------------------------------
>
>                 Key: FLINK-35502
>                 URL: https://issues.apache.org/jira/browse/FLINK-35502
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Ying Z
>            Priority: Major
>
> In the implementation of Flink HA, checkpoint metadata is stored in either ZooKeeper (ZK HA) or etcd (K8s HA), for example:
> ```
> checkpointID-0000000000000036044: xxxx
> checkpointID-0000000000000036045: xxxx
> ...
> ...
> ```
> However, neither of these stores is designed to hold large amounts of data. If [state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained) is set too high, it can easily cause failures in ZK/etcd.
> The error log when state.checkpoints.num-retained is set to 1500:
> ```
> Caused by: org.apache.flink.util.SerializedThrowable: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. Message: ConfigMap "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 bytes. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must have at most 1048576 bytes, reason=FieldValueTooLong, additionalProperties={})], group=null, kind=ConfigMap, name=xxx-jobmanager-leader, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=ConfigMap "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={}).
> ```
> In Flink's code, all checkpoint metadata entries are updated at the same time, and the metadata contains many repeated byte sequences, so it achieves a very good compression ratio.
> I therefore suggest compressing the data when writing checkpoint metadata and decompressing it when reading, to reduce storage pressure and improve IO efficiency.
> Here is sample code, which reduced the metadata size from about 1 MB to 30 KB:
> ```java
> import com.fasterxml.jackson.databind.ObjectMapper;
>
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import java.util.Base64;
> import java.util.zip.GZIPOutputStream;
>
> // checkpointMap: the checkpointID -> metadata entries; compressedData: the
> // map that is written to the ConfigMap/ZK node.
> // Map -> JSON
> ObjectMapper objectMapper = new ObjectMapper();
> String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
> // compress and Base64-encode, then store everything under a single key
> String compressedBase64 = compressAndEncode(checkpointJson);
> compressedData.put("checkpoint-all", compressedBase64);
>
> private static String compressAndEncode(String data) throws IOException {
>     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
>     try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
>         gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
>     }
>     byte[] compressedData = outputStream.toByteArray();
>     return Base64.getEncoder().encodeToString(compressedData);
> }
> ```
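> For the read path, the decompression counterpart could look like the following minimal sketch (`decodeAndDecompress` is an illustrative helper name, not existing Flink code):
> ```java
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import java.util.Base64;
> import java.util.zip.GZIPInputStream;
>
> // Base64-decode, then gunzip back to the original JSON string
> private static String decodeAndDecompress(String compressedBase64) throws IOException {
>     byte[] compressed = Base64.getDecoder().decode(compressedBase64);
>     ByteArrayOutputStream out = new ByteArrayOutputStream();
>     try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
>         byte[] buffer = new byte[4096];
>         int n;
>         while ((n = gzip.read(buffer)) != -1) {
>             out.write(buffer, 0, n);
>         }
>     }
>     return out.toString(StandardCharsets.UTF_8.name());
> }
> ```


--
This message was sent by Atlassian Jira
(v8.20.10#820010)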