[ 
https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Z updated FLINK-35502:
---------------------------
    Summary: compress the checkpoint metadata generated by ZK/ETCD HA Services  
(was: compress the checkpoint metadata ZK/ETCD HA Services)

> compress the checkpoint metadata generated by ZK/ETCD HA Services
> -----------------------------------------------------------------
>
>                 Key: FLINK-35502
>                 URL: https://issues.apache.org/jira/browse/FLINK-35502
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Ying Z
>            Priority: Major
>
> In Flink's HA implementation, checkpoint metadata is stored in either 
> ZooKeeper (ZK HA) or etcd (K8S HA), for example:
> ```
> checkpointID-0000000000000036044: xxxx
> checkpointID-0000000000000036045: xxxx
> ...
> ...
> ```
> However, neither of these is designed to store large amounts of data. If the 
> [state.checkpoints.num-retained](https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained)
> setting is too large, it can easily cause failures in ZK/ETCD. 
> For example, this is the error log when state.checkpoints.num-retained is set to 1500:
> ```
> Caused by: org.apache.flink.util.SerializedThrowable: 
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: 
> PUT at: 
> https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. 
> Message: ConfigMap "xxx-jobmanager-leader" is invalid: []: 
> Too long: must have at most 1048576 bytes. Received status: 
> Status(apiVersion=v1, code=422, 
> details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must 
> have at most 1048576 bytes, reason=FieldValueTooLong, 
> additionalProperties={})], group=null, kind=ConfigMap, 
> name=xxx-jobmanager-leader, retryAfterSeconds=null, uid=null, 
> additionalProperties={}), kind=Status, message=ConfigMap 
> "xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 
> bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, 
> resourceVersion=null, selfLink=null, additionalProperties={}), 
> reason=Invalid, status=Failure, additionalProperties={}).
> ```
> In Flink's code, all checkpoint metadata entries are updated together, and 
> the metadata contains many repeated bytes, so it achieves a very good 
> compression ratio.
> I therefore suggest compressing the data when writing checkpoint metadata and 
> decompressing it when reading, to reduce storage pressure and improve I/O 
> efficiency.
> Here is sample code; it reduces the metadata size from about 1 MB to 30 KB.
> ```java
> import com.fasterxml.jackson.databind.ObjectMapper;
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import java.util.Base64;
> import java.util.zip.GZIPOutputStream;
>
> // Map -> JSON
> ObjectMapper objectMapper = new ObjectMapper();
> String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
> // compress and Base64-encode
> String compressedBase64 = compressAndEncode(checkpointJson);
> compressedData.put("checkpoint-all", compressedBase64);
>
> // GZIP-compress the JSON string, then Base64-encode it for storage in ZK/ConfigMap.
> private static String compressAndEncode(String data) throws IOException {
>     ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
>     try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
>         gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
>     }
>     byte[] compressedData = outputStream.toByteArray();
>     return Base64.getEncoder().encodeToString(compressedData);
> }
> ```
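> For completeness, here is a minimal sketch of the matching read path, 
> assuming the same GZIP + Base64 scheme. The decodeAndDecompress helper, the 
> "checkpoint-all" key, and the Map<String, String> value type follow the 
> naming used above and are illustrative only, not existing Flink APIs.
> ```java
> // additional imports: java.io.ByteArrayInputStream, java.util.zip.GZIPInputStream,
> // com.fasterxml.jackson.core.type.TypeReference, java.util.Map
>
> // Read the single compressed entry back and restore the original map.
> String compressedBase64 = compressedData.get("checkpoint-all");
> String checkpointJson = decodeAndDecompress(compressedBase64);
> Map<String, String> checkpointMap =
>         objectMapper.readValue(checkpointJson, new TypeReference<Map<String, String>>() {});
>
> // Base64-decode the stored value, then gunzip it back into the original JSON string.
> private static String decodeAndDecompress(String base64Data) throws IOException {
>     byte[] compressed = Base64.getDecoder().decode(base64Data);
>     try (GZIPInputStream gzipInputStream =
>                  new GZIPInputStream(new ByteArrayInputStream(compressed));
>          ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
>         byte[] buffer = new byte[4096];
>         int n;
>         while ((n = gzipInputStream.read(buffer)) != -1) {
>             outputStream.write(buffer, 0, n);
>         }
>         return outputStream.toString(StandardCharsets.UTF_8.name());
>     }
> }
> ```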



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
