[ 
https://issues.apache.org/jira/browse/FLINK-35502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Z updated FLINK-35502:
---------------------------
    Description: 
In Flink's HA implementation, checkpoint metadata is stored in either 
ZooKeeper (ZK HA) or etcd (Kubernetes HA), for example:


{code:java}
checkpointID-0000000000000036044: xxxx
checkpointID-0000000000000036045: xxxx
...
... {code}
However, neither of these is designed to store large amounts of data. If the 
[state.checkpoints.num-retained|https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/deployment/config/#state-checkpoints-num-retained] 
setting is too large, the stored metadata can easily exceed what ZK/etcd 
accepts and cause failures.

The following error is logged when state.checkpoints.num-retained is set to 1500:
{code:java}
Caused by: org.apache.flink.util.SerializedThrowable: 
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT 
at: https://xxx/api/v1/namespaces/default/configmaps/xxx-jobmanager-leader. 
Message: ConfigMap "xxx-jobmanager-leader" is invalid: []: 
Too long: must have at most 1048576 bytes. Received status: 
Status(apiVersion=v1, code=422, 
details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must have 
at most 1048576 bytes, reason=FieldValueTooLong, additionalProperties={})], 
group=null, kind=ConfigMap, name=xxx-jobmanager-leader, retryAfterSeconds=null, 
uid=null, additionalProperties={}), kind=Status, message=ConfigMap 
"xxx-jobmanager-leader" is invalid: []: Too long: must have at most 1048576 
bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, 
resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, 
status=Failure, additionalProperties={}).
{code}
In Flink's code, all checkpoint metadata is updated at the same time, and the 
metadata contains many repeated bytes, so it should compress very well.
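
To illustrate why repeated bytes compress well, here is a minimal sketch that gzips synthetic entries shaped like the keys shown earlier. The class name, the helper names, and the placeholder values are assumptions made for illustration; they are not real Flink metadata, and the ratio on real data will differ.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class CompressionRatioSketch {

    // Builds synthetic lines shaped like "checkpointID-0000000000000036044: <value>".
    // The value is a made-up placeholder, not real Flink checkpoint metadata.
    public static String buildSyntheticMetadata(int numRetained) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < numRetained; i++) {
            sb.append(String.format("checkpointID-%019d", 36044L + i))
              .append(": placeholder-state-handle-for-checkpoint-")
              .append(i)
              .append('\n');
        }
        return sb.toString();
    }

    // Compresses a byte array with GZIP and returns the compressed bytes.
    public static byte[] gzip(byte[] input) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzipOut = new GZIPOutputStream(out)) {
            gzipOut.write(input);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] raw = buildSyntheticMetadata(1500).getBytes(StandardCharsets.UTF_8);
        byte[] compressed = gzip(raw);
        System.out.printf("raw=%d bytes, gzip=%d bytes%n", raw.length, compressed.length);
    }
}
```

Because the entries differ only in a few digits, the compressed output should be much smaller than the raw input.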

Therefore, I suggest compressing the data when writing checkpoints and 
decompressing it when reading, to reduce storage pressure and improve IO 
efficiency.

Here is sample code that reduced the metadata size from about 1 MB to 30 KB.
{code:java}
// Serialize the checkpoint map to JSON.
ObjectMapper objectMapper = new ObjectMapper();
String checkpointJson = objectMapper.writeValueAsString(checkpointMap);
// Compress the JSON and Base64-encode it so it can be stored as a string value.
String compressedBase64 = compressAndEncode(checkpointJson);
compressedData.put("checkpoint-all", compressedBase64);

private static String compressAndEncode(String data) throws IOException {
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    // Closing the GZIP stream writes the trailer before the bytes are read back.
    try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
        gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
    }
    byte[] compressedData = outputStream.toByteArray();
    return Base64.getEncoder().encodeToString(compressedData);
}
{code}
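
The sample above covers only the write path. The read path could look roughly like the following sketch, which pairs the same compressAndEncode logic with a matching decoder. The class name CheckpointMetadataCodec and the method decodeAndDecompress are hypothetical names chosen for illustration and are not existing Flink APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CheckpointMetadataCodec {

    // Write path: GZIP the UTF-8 bytes, then Base64-encode the result.
    public static String compressAndEncode(String data) throws IOException {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
            gzipOutputStream.write(data.getBytes(StandardCharsets.UTF_8));
        }
        return Base64.getEncoder().encodeToString(outputStream.toByteArray());
    }

    // Read path (hypothetical helper): Base64-decode, then gunzip back to a string.
    public static String decodeAndDecompress(String encoded) throws IOException {
        byte[] compressed = Base64.getDecoder().decode(encoded);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gzipIn =
                new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[8192];
            int n;
            // Copy until the GZIP stream signals end of data.
            while ((n = gzipIn.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        String json = "{\"checkpointID-0000000000000036044\":\"xxxx\"}";
        String stored = compressAndEncode(json);
        System.out.println(json.equals(decodeAndDecompress(stored)));
    }
}
```

Base64 adds roughly a third to the compressed size, so storing the raw bytes would be smaller wherever the backing store accepts binary values.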


> compress the checkpoint metadata generated by ZK/ETCD HA Services
> -----------------------------------------------------------------
>
>                 Key: FLINK-35502
>                 URL: https://issues.apache.org/jira/browse/FLINK-35502
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Ying Z
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
