This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-1.0
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-1.0 by this push:
     new dbd15de3fd [#5199] feat(client-python): add distribution serdes (#8488)
dbd15de3fd is described below

commit dbd15de3fd79b4e759cdc3008a9db6a6e5a5a43c
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Sep 10 14:59:24 2025 +0800

    [#5199] feat(client-python): add distribution serdes (#8488)
    
    ### What changes were proposed in this pull request?
    
    This PR implements the Python counterparts of the following classes from
    the Java client.
    
    JsonUtils.java
    
    - DistributionSerializer
    - DistributionDeserializer
    
    ### Why are the changes needed?
    
    We need to support table partitioning, bucketing, sort ordering, and
    indexes.
    
    #5199
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests
    
    Signed-off-by: George T. C. Lai <[email protected]>
    Co-authored-by: George T. C. Lai <[email protected]>
---
 .../dto/rel/json_serdes/distribution_serdes.py     |  55 ++++++++++
 .../unittests/dto/rel/test_distribution_serdes.py  | 114 +++++++++++++++++++++
 2 files changed, 169 insertions(+)

diff --git a/clients/client-python/gravitino/dto/rel/json_serdes/distribution_serdes.py b/clients/client-python/gravitino/dto/rel/json_serdes/distribution_serdes.py
new file mode 100644
index 0000000000..84e88642b9
--- /dev/null
+++ b/clients/client-python/gravitino/dto/rel/json_serdes/distribution_serdes.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from typing import Any
+
+from gravitino.api.expressions.distributions.strategy import Strategy
+from gravitino.api.types.json_serdes.base import JsonSerializable
+from gravitino.dto.rel.distribution_dto import DistributionDTO
+from gravitino.dto.rel.expressions.json_serdes._helper.serdes_utils import 
SerdesUtils
+from gravitino.utils.precondition import Precondition
+from gravitino.utils.serdes import SerdesUtilsBase
+
+
+class DistributionSerDes(SerdesUtilsBase, JsonSerializable[DistributionDTO]):
+    """Custom JSON deserializer for DistributionDTO objects."""
+
+    @classmethod
+    def serialize(cls, data_type: DistributionDTO) -> dict[str, Any]:
+        return {
+            cls.STRATEGY: data_type.strategy().name.lower(),
+            cls.NUMBER: data_type.number(),
+            cls.FUNCTION_ARGS: [
+                SerdesUtils.write_function_arg(arg) for arg in data_type.args()
+            ],
+        }
+
+    @classmethod
+    def deserialize(cls, data: dict[str, Any]) -> DistributionDTO:
+        Precondition.check_argument(
+            isinstance(data, dict) and len(data) > 0,
+            f"Cannot parse distribution from invalid JSON: {data}",
+        )
+        strategy_data = data.get(cls.STRATEGY, Strategy.HASH.value)
+        return DistributionDTO(
+            strategy=Strategy(strategy_data.upper()),
+            number=data[cls.NUMBER],
+            args=[
+                SerdesUtils.read_function_arg(arg) for arg in 
data[cls.FUNCTION_ARGS]
+            ],
+        )
diff --git a/clients/client-python/tests/unittests/dto/rel/test_distribution_serdes.py b/clients/client-python/tests/unittests/dto/rel/test_distribution_serdes.py
new file mode 100644
index 0000000000..067a9083ed
--- /dev/null
+++ b/clients/client-python/tests/unittests/dto/rel/test_distribution_serdes.py
@@ -0,0 +1,114 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import unittest
+from dataclasses import dataclass, field
+
+from dataclasses_json import DataClassJsonMixin, config
+
+from gravitino.api.expressions.distributions.strategy import Strategy
+from gravitino.dto.rel.distribution_dto import DistributionDTO
+from gravitino.dto.rel.expressions.json_serdes._helper.serdes_utils import 
SerdesUtils
+from gravitino.dto.rel.json_serdes.distribution_serdes import 
DistributionSerDes
+from gravitino.exceptions.base import IllegalArgumentException
+
+
+@dataclass
+class MockDataClass(DataClassJsonMixin):
+    """Test-only wrapper whose single field is (de)serialized by DistributionSerDes."""
+
+    # The field metadata wires DistributionSerDes in as the custom encoder and
+    # decoder, so to_json()/from_json() on this class exercise both directions.
+    distribution: DistributionDTO = field(
+        metadata=config(
+            encoder=DistributionSerDes.serialize,
+            decoder=DistributionSerDes.deserialize,
+        )
+    )
+
+
+class TestDistributionSerdes(unittest.TestCase):
+    def test_distribution_serdes_invalid_json(self):
+        json_strings = [
+            '{"distribution": {}}',
+            '{"distribution": ""}',
+        ]
+        for json_string in json_strings:
+            with self.assertRaisesRegex(
+                IllegalArgumentException, "Cannot parse distribution from 
invalid JSON"
+            ):
+                MockDataClass.from_json(json_string)
+
+    def test_distribution_serdes(self):
+        json_string = """
+            {
+                "distribution": {
+                    "strategy": "even",
+                    "number": 32,
+                    "funcArgs": [
+                        {
+                            "type": "field",
+                            "fieldName": ["id"]
+                        }
+                    ]
+                }
+            }
+        """
+        mock_data_class = MockDataClass.from_json(json_string)
+        distribution = mock_data_class.distribution
+        self.assertIs(Strategy.EVEN, distribution.strategy())
+        self.assertEqual(32, distribution.number())
+        self.assertListEqual(
+            distribution.args(),
+            [
+                SerdesUtils.read_function_arg(arg)
+                for arg in json.loads(json_string)["distribution"][
+                    DistributionSerDes.FUNCTION_ARGS
+                ]
+            ],
+        )
+
+        serialized = mock_data_class.to_json()
+        self.assertDictEqual(json.loads(json_string), json.loads(serialized))
+
+    def test_distribution_serdes_without_strategy(self):
+        json_string = """
+            {
+                "distribution": {
+                    "number": 4,
+                    "funcArgs": [
+                        {
+                            "type": "field",
+                            "fieldName": ["id"]
+                        }
+                    ]
+                }
+            }
+        """
+        mock_data_class = MockDataClass.from_json(json_string)
+        serialized_dict = json.loads(mock_data_class.to_json())
+
+        distribution_dto = mock_data_class.distribution
+        distribution_dict = serialized_dict["distribution"]
+        self.assertIs(Strategy.HASH, distribution_dto.strategy())
+        self.assertEqual(
+            distribution_dict[DistributionSerDes.NUMBER], 
distribution_dto.number()
+        )
+        self.assertListEqual(
+            distribution_dto.args(),
+            [
+                SerdesUtils.read_function_arg(arg)
+                for arg in distribution_dict[DistributionSerDes.FUNCTION_ARGS]
+            ],
+        )

Reply via email to