This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-1.0
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-1.0 by this push:
new dbd15de3fd [#5199] feat(client-python): add distribution serdes (#8488)
dbd15de3fd is described below
commit dbd15de3fd79b4e759cdc3008a9db6a6e5a5a43c
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Sep 10 14:59:24 2025 +0800
[#5199] feat(client-python): add distribution serdes (#8488)
### What changes were proposed in this pull request?
This PR implements the Python counterparts of the following classes from
the Java client:
JsonUtils.java
- DistributionSerializer
- DistributionDeserializer
### Why are the changes needed?
We need to support table partitioning, bucketing, sort ordering, and
indexes.
#5199
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests
Signed-off-by: George T. C. Lai <[email protected]>
Co-authored-by: George T. C. Lai <[email protected]>
---
.../dto/rel/json_serdes/distribution_serdes.py | 55 ++++++++++
.../unittests/dto/rel/test_distribution_serdes.py | 114 +++++++++++++++++++++
2 files changed, 169 insertions(+)
diff --git
a/clients/client-python/gravitino/dto/rel/json_serdes/distribution_serdes.py
b/clients/client-python/gravitino/dto/rel/json_serdes/distribution_serdes.py
new file mode 100644
index 0000000000..84e88642b9
--- /dev/null
+++ b/clients/client-python/gravitino/dto/rel/json_serdes/distribution_serdes.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from typing import Any
+
+from gravitino.api.expressions.distributions.strategy import Strategy
+from gravitino.api.types.json_serdes.base import JsonSerializable
+from gravitino.dto.rel.distribution_dto import DistributionDTO
+from gravitino.dto.rel.expressions.json_serdes._helper.serdes_utils import
SerdesUtils
+from gravitino.utils.precondition import Precondition
+from gravitino.utils.serdes import SerdesUtilsBase
+
+
class DistributionSerDes(SerdesUtilsBase, JsonSerializable[DistributionDTO]):
    """Custom JSON serializer and deserializer for DistributionDTO objects.

    Python counterpart of the Java client's DistributionSerializer /
    DistributionDeserializer. A distribution is encoded as a dict keyed by
    ``cls.STRATEGY``, ``cls.NUMBER`` and ``cls.FUNCTION_ARGS``.
    """

    @classmethod
    def serialize(cls, data_type: DistributionDTO) -> dict[str, Any]:
        """Serialize a DistributionDTO into a JSON-compatible dict.

        Args:
            data_type: The distribution to serialize.

        Returns:
            A dict holding the lower-cased strategy name, the distribution
            number, and the serialized function arguments.
        """
        return {
            cls.STRATEGY: data_type.strategy().name.lower(),
            cls.NUMBER: data_type.number(),
            cls.FUNCTION_ARGS: [
                SerdesUtils.write_function_arg(arg) for arg in data_type.args()
            ],
        }

    @classmethod
    def deserialize(cls, data: dict[str, Any]) -> DistributionDTO:
        """Deserialize a JSON dict into a DistributionDTO.

        A missing or ``null`` strategy falls back to ``Strategy.HASH``. The
        strategy string is upper-cased before it is looked up by value in
        ``Strategy``.

        Args:
            data: The decoded JSON object describing the distribution.

        Returns:
            The reconstructed DistributionDTO.

        Raises:
            IllegalArgumentException: If ``data`` is not a non-empty dict, the
                strategy is not a string, ``number`` is missing or not an
                integer, or the function args are missing or not a list.
            ValueError: Presumably raised by the ``Strategy`` enum lookup for
                an unknown strategy name (standard Enum behavior).
        """
        Precondition.check_argument(
            isinstance(data, dict) and len(data) > 0,
            f"Cannot parse distribution from invalid JSON: {data}",
        )

        # Treat an explicit JSON null the same as an absent key, so that
        # `"strategy": null` does not crash on `.upper()`.
        strategy_data = data.get(cls.STRATEGY)
        if strategy_data is None:
            strategy_data = Strategy.HASH.value
        Precondition.check_argument(
            isinstance(strategy_data, str),
            f"Cannot parse distribution from invalid JSON, "
            f"'{cls.STRATEGY}' must be a string: {data}",
        )

        # bool is a subclass of int in Python; reject it explicitly.
        number = data.get(cls.NUMBER)
        Precondition.check_argument(
            isinstance(number, int) and not isinstance(number, bool),
            f"Cannot parse distribution from invalid JSON, "
            f"'{cls.NUMBER}' must be an integer: {data}",
        )

        function_args = data.get(cls.FUNCTION_ARGS)
        Precondition.check_argument(
            isinstance(function_args, list),
            f"Cannot parse distribution from invalid JSON, "
            f"'{cls.FUNCTION_ARGS}' must be a list: {data}",
        )

        return DistributionDTO(
            strategy=Strategy(strategy_data.upper()),
            number=number,
            args=[SerdesUtils.read_function_arg(arg) for arg in function_args],
        )
diff --git
a/clients/client-python/tests/unittests/dto/rel/test_distribution_serdes.py
b/clients/client-python/tests/unittests/dto/rel/test_distribution_serdes.py
new file mode 100644
index 0000000000..067a9083ed
--- /dev/null
+++ b/clients/client-python/tests/unittests/dto/rel/test_distribution_serdes.py
@@ -0,0 +1,114 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import unittest
+from dataclasses import dataclass, field
+
+from dataclasses_json import DataClassJsonMixin, config
+
+from gravitino.api.expressions.distributions.strategy import Strategy
+from gravitino.dto.rel.distribution_dto import DistributionDTO
+from gravitino.dto.rel.expressions.json_serdes._helper.serdes_utils import
SerdesUtils
+from gravitino.dto.rel.json_serdes.distribution_serdes import
DistributionSerDes
+from gravitino.exceptions.base import IllegalArgumentException
+
+
@dataclass
class MockDataClass(DataClassJsonMixin):
    """Test-only dataclass that routes one field through DistributionSerDes."""

    # dataclasses_json invokes the decoder on load and the encoder on dump.
    distribution: DistributionDTO = field(
        metadata=config(
            decoder=DistributionSerDes.deserialize,
            encoder=DistributionSerDes.serialize,
        )
    )
+
+
class TestDistributionSerdes(unittest.TestCase):
    """Round-trip and validation tests for DistributionSerDes."""

    def test_distribution_serdes_invalid_json(self):
        # Non-object and empty-object payloads must be rejected by the
        # deserializer's up-front precondition check.
        json_strings = [
            '{"distribution": {}}',
            '{"distribution": ""}',
            '{"distribution": []}',
            '{"distribution": 1}',
        ]
        for json_string in json_strings:
            with self.subTest(json_string=json_string):
                with self.assertRaisesRegex(
                    IllegalArgumentException,
                    "Cannot parse distribution from invalid JSON",
                ):
                    MockDataClass.from_json(json_string)

    def test_distribution_serdes(self):
        json_string = """
        {
            "distribution": {
                "strategy": "even",
                "number": 32,
                "funcArgs": [
                    {
                        "type": "field",
                        "fieldName": ["id"]
                    }
                ]
            }
        }
        """
        expected = json.loads(json_string)
        mock_data_class = MockDataClass.from_json(json_string)
        distribution = mock_data_class.distribution
        self.assertIs(Strategy.EVEN, distribution.strategy())
        self.assertEqual(32, distribution.number())
        self.assertListEqual(
            distribution.args(),
            [
                SerdesUtils.read_function_arg(arg)
                for arg in expected["distribution"][DistributionSerDes.FUNCTION_ARGS]
            ],
        )

        # Serializing must reproduce the original payload exactly.
        serialized = mock_data_class.to_json()
        self.assertDictEqual(expected, json.loads(serialized))

    def test_distribution_serdes_without_strategy(self):
        json_string = """
        {
            "distribution": {
                "number": 4,
                "funcArgs": [
                    {
                        "type": "field",
                        "fieldName": ["id"]
                    }
                ]
            }
        }
        """
        mock_data_class = MockDataClass.from_json(json_string)
        serialized_dict = json.loads(mock_data_class.to_json())

        distribution_dto = mock_data_class.distribution
        distribution_dict = serialized_dict["distribution"]
        self.assertIs(Strategy.HASH, distribution_dto.strategy())
        self.assertEqual(4, distribution_dto.number())
        # The defaulted strategy is written back out explicitly, lower-cased.
        self.assertEqual("hash", distribution_dict[DistributionSerDes.STRATEGY])
        self.assertEqual(
            distribution_dict[DistributionSerDes.NUMBER], distribution_dto.number()
        )
        self.assertListEqual(
            distribution_dto.args(),
            [
                SerdesUtils.read_function_arg(arg)
                for arg in distribution_dict[DistributionSerDes.FUNCTION_ARGS]
            ],
        )

    def test_distribution_serdes_upper_case_strategy(self):
        json_string = """
        {
            "distribution": {
                "strategy": "HASH",
                "number": 8,
                "funcArgs": [
                    {
                        "type": "field",
                        "fieldName": ["id"]
                    }
                ]
            }
        }
        """
        mock_data_class = MockDataClass.from_json(json_string)
        self.assertIs(Strategy.HASH, mock_data_class.distribution.strategy())
        self.assertEqual(8, mock_data_class.distribution.number())

        # Serialization always emits the lower-cased strategy name.
        serialized_dict = json.loads(mock_data_class.to_json())
        self.assertEqual(
            "hash", serialized_dict["distribution"][DistributionSerDes.STRATEGY]
        )