This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e9c0b31ac [python] Introduce changelog_manager for pypaimon (#7492)
1e9c0b31ac is described below

commit 1e9c0b31ac2e80db4180bb05556eb3a3bf8be30a
Author: xuzifu666 <[email protected]>
AuthorDate: Sat Mar 21 13:30:57 2026 +0800

    [python] Introduce changelog_manager for pypaimon (#7492)
---
 paimon-python/pypaimon/changelog/__init__.py       |  23 ++
 paimon-python/pypaimon/changelog/changelog.py      |  85 +++++
 .../pypaimon/changelog/changelog_manager.py        | 353 +++++++++++++++++++++
 paimon-python/pypaimon/snapshot/snapshot.py        |   4 +
 paimon-python/pypaimon/table/file_store_table.py   |   5 +
 .../pypaimon/tests/changelog_manager_test.py       | 137 ++++++++
 .../pypaimon/tests/table/file_store_table_test.py  |  59 ++++
 7 files changed, 666 insertions(+)

diff --git a/paimon-python/pypaimon/changelog/__init__.py 
b/paimon-python/pypaimon/changelog/__init__.py
new file mode 100644
index 0000000000..a057af07dd
--- /dev/null
+++ b/paimon-python/pypaimon/changelog/__init__.py
@@ -0,0 +1,23 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+from pypaimon.changelog.changelog import Changelog
+from pypaimon.changelog.changelog_manager import ChangelogManager
+
+__all__ = ['Changelog', 'ChangelogManager']
diff --git a/paimon-python/pypaimon/changelog/changelog.py 
b/paimon-python/pypaimon/changelog/changelog.py
new file mode 100644
index 0000000000..8e25fdc2e4
--- /dev/null
+++ b/paimon-python/pypaimon/changelog/changelog.py
@@ -0,0 +1,85 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+import logging
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.common.json_util import JSON
+from pypaimon.snapshot.snapshot import Snapshot
+
+logger = logging.getLogger(__name__)
+
+
class Changelog(Snapshot):
    """The metadata of a long-lived changelog.

    A Changelog carries exactly the same fields as a Snapshot. It is created
    from a snapshot during snapshot expiration, so that the table's changelog
    can outlive the snapshot's own lifecycle.
    """

    @staticmethod
    def from_snapshot(snapshot: Snapshot) -> 'Changelog':
        """Create a Changelog that copies every field of ``snapshot``."""
        return Changelog(
            version=snapshot.version,
            id=snapshot.id,
            schema_id=snapshot.schema_id,
            base_manifest_list=snapshot.base_manifest_list,
            base_manifest_list_size=snapshot.base_manifest_list_size,
            delta_manifest_list=snapshot.delta_manifest_list,
            delta_manifest_list_size=snapshot.delta_manifest_list_size,
            changelog_manifest_list=snapshot.changelog_manifest_list,
            changelog_manifest_list_size=snapshot.changelog_manifest_list_size,
            index_manifest=snapshot.index_manifest,
            commit_user=snapshot.commit_user,
            commit_identifier=snapshot.commit_identifier,
            commit_kind=snapshot.commit_kind,
            time_millis=snapshot.time_millis,
            total_record_count=snapshot.total_record_count,
            delta_record_count=snapshot.delta_record_count,
            changelog_record_count=snapshot.changelog_record_count,
            watermark=snapshot.watermark,
            statistics=snapshot.statistics,
            properties=snapshot.properties,
            next_row_id=snapshot.next_row_id
        )

    @staticmethod
    def from_json(json_str: str) -> 'Changelog':
        """Deserialize a Changelog from its JSON representation."""
        return JSON.from_json(json_str, Changelog)

    @staticmethod
    def from_path(file_io: FileIO, path: str) -> 'Changelog':
        """Read a Changelog from ``path``.

        Raises:
            RuntimeError: if the file does not exist, cannot be read, or
                cannot be parsed.
        """
        try:
            return Changelog.try_from_path(file_io, path)
        except FileNotFoundError as e:
            raise RuntimeError(f"Failed to read changelog from path {path}") from e

    @staticmethod
    def try_from_path(file_io: FileIO, path: str) -> 'Changelog':
        """Read a Changelog from ``path``, reporting a missing file distinctly.

        Raises:
            FileNotFoundError: if the file does not exist.
            RuntimeError: if the file exists but cannot be read or parsed.
        """
        try:
            json_str = file_io.read_file_utf8(path)
            return Changelog.from_json(json_str)
        except FileNotFoundError:
            # Re-raise unchanged so callers can tell "missing" from "corrupt".
            raise
        except Exception as e:
            raise RuntimeError(f"Failed to read changelog from path {path}") from e
diff --git a/paimon-python/pypaimon/changelog/changelog_manager.py 
b/paimon-python/pypaimon/changelog/changelog_manager.py
new file mode 100644
index 0000000000..06f2c1b8ad
--- /dev/null
+++ b/paimon-python/pypaimon/changelog/changelog_manager.py
@@ -0,0 +1,353 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+import logging
+import re
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import Iterator, List, Optional
+
+from pypaimon.branch.branch_manager import BranchManager
+from pypaimon.changelog.changelog import Changelog
+from pypaimon.common.file_io import FileIO
+
+logger = logging.getLogger(__name__)
+
+
class ChangelogManager:
    """Manager for long-lived changelogs: paths, reads, writes and hint files.

    Changelog files live in ``<branch path>/changelog`` and are named
    ``changelog-<id>``. The ``LATEST`` and ``EARLIEST`` files in the same
    directory are only hints: lookups validate them against the changelog
    files that actually exist and fall back to listing the directory.
    """

    CHANGELOG_PREFIX = "changelog-"
    LATEST_HINT = "LATEST"
    EARLIEST_HINT = "EARLIEST"

    # Matches file names like "changelog-42" and captures the numeric id.
    _CHANGELOG_FILE_PATTERN = re.compile(r'^' + re.escape(CHANGELOG_PREFIX) + r'(\d+)$')

    def __init__(self, file_io: FileIO, table_path: str, branch: Optional[str] = None):
        """Initialize ChangelogManager.

        Args:
            file_io: FileIO instance for file operations; exposed as the
                public ``file_io`` attribute.
            table_path: Path to the table directory
            branch: Branch name (defaults to 'main' if None or empty)
        """
        # ``file_io`` is a plain attribute. A method with the same name would be
        # shadowed by it on every instance, so no ``file_io()`` accessor exists.
        self.file_io = file_io
        self.table_path = table_path
        self.branch = BranchManager.normalize_branch(branch) if branch else 'main'

    def latest_long_lived_changelog_id(self) -> Optional[int]:
        """Get the latest long-lived changelog ID.

        Returns:
            The latest changelog ID, or None if there is no changelog.

        Raises:
            RuntimeError: if the lookup fails.
        """
        try:
            return self._find_latest_hint_id(self.changelog_directory())
        except Exception as e:
            raise RuntimeError("Failed to find latest changelog id") from e

    def earliest_long_lived_changelog_id(self) -> Optional[int]:
        """Get the earliest long-lived changelog ID.

        Returns:
            The earliest changelog ID, or None if there is no changelog.

        Raises:
            RuntimeError: if the lookup fails.
        """
        try:
            return self._find_earliest_hint_id(self.changelog_directory())
        except Exception as e:
            raise RuntimeError("Failed to find earliest changelog id") from e

    def long_lived_changelog_exists(self, snapshot_id: int) -> bool:
        """Check whether the long-lived changelog file for ``snapshot_id`` exists.

        Raises:
            RuntimeError: if the existence check itself fails.
        """
        path = self.long_lived_changelog_path(snapshot_id)
        try:
            return self.file_io.exists(path)
        except Exception as e:
            raise RuntimeError(f"Failed to determine if changelog #{snapshot_id} exists in path {path}") from e

    def long_lived_changelog(self, snapshot_id: int) -> Changelog:
        """Read the long-lived changelog for ``snapshot_id``.

        Raises:
            RuntimeError: if the file is missing or unreadable.
        """
        return Changelog.from_path(self.file_io, self.long_lived_changelog_path(snapshot_id))

    def changelog(self, snapshot_id: int) -> Changelog:
        """Read the changelog for ``snapshot_id`` (same as ``long_lived_changelog``)."""
        return self.long_lived_changelog(snapshot_id)

    def long_lived_changelog_path(self, snapshot_id: int) -> str:
        """Return the path of the changelog file for ``snapshot_id``."""
        return f"{self.changelog_directory()}/{self.CHANGELOG_PREFIX}{snapshot_id}"

    def changelog_directory(self) -> str:
        """Return the changelog directory of the current branch."""
        return f"{self._branch_path()}/changelog"

    def commit_changelog(self, changelog: Changelog, changelog_id: int) -> None:
        """Write ``changelog`` as ``changelog-<changelog_id>``, overwriting any existing file."""
        path = self.long_lived_changelog_path(changelog_id)
        self.file_io.write_file(path, changelog.to_json(), True)

    def commit_long_lived_changelog_latest_hint(self, snapshot_id: int) -> None:
        """Record ``snapshot_id`` in the LATEST hint file."""
        self._commit_latest_hint(snapshot_id, self.changelog_directory())

    def commit_long_lived_changelog_earliest_hint(self, snapshot_id: int) -> None:
        """Record ``snapshot_id`` in the EARLIEST hint file."""
        self._commit_earliest_hint(snapshot_id, self.changelog_directory())

    def try_get_changelog(self, snapshot_id: int) -> Changelog:
        """Read the changelog for ``snapshot_id``, reporting a missing file distinctly.

        Returns:
            The Changelog instance.

        Raises:
            FileNotFoundError: if the changelog file does not exist.
            RuntimeError: if the file exists but cannot be read or parsed.
        """
        return Changelog.try_from_path(self.file_io, self.long_lived_changelog_path(snapshot_id))

    def changelogs(self) -> Iterator[Changelog]:
        """Lazily yield every changelog in ascending ID order."""
        for snapshot_id in self._list_changelog_ids():
            yield self.changelog(snapshot_id)

    def safely_get_all_changelogs(self) -> List[Changelog]:
        """Read all changelogs in parallel, tolerating files that vanish meanwhile.

        Files that no longer exist (for example because they expired between
        listing and reading) and empty files (logged) are skipped.

        Returns:
            The changelogs that could be read, sorted by ID; never contains None.

        Raises:
            RuntimeError: if an existing file cannot be read or parsed.
        """
        paths = [self.long_lived_changelog_path(sid) for sid in self._list_changelog_ids()]
        if not paths:
            return []

        def read_changelog(path: str) -> Optional[Changelog]:
            try:
                changelog_str = self.file_io.read_file_utf8(path)
                if not changelog_str or not changelog_str.strip():
                    logger.warning(f"Changelog file is empty, path: {path}")
                    return None
                return Changelog.from_json(changelog_str)
            except FileNotFoundError:
                # Expected when the file was removed after it was listed.
                return None
            except Exception as e:
                raise RuntimeError(f"Failed to read changelog from path {path}") from e

        results: List[Optional[Changelog]] = [None] * len(paths)
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = {executor.submit(read_changelog, path): i for i, path in enumerate(paths)}
            for future in as_completed(futures):
                # result() re-raises any RuntimeError raised by the worker.
                results[futures[future]] = future.result()

        valid_changelogs = [c for c in results if c is not None]
        valid_changelogs.sort(key=lambda c: c.id)
        return valid_changelogs

    def delete_latest_hint(self) -> None:
        """Delete the LATEST hint file (best effort)."""
        self._delete_latest_hint(self.changelog_directory())

    def delete_earliest_hint(self) -> None:
        """Delete the EARLIEST hint file (best effort)."""
        self._delete_earliest_hint(self.changelog_directory())

    def _branch_path(self) -> str:
        """Return the root path of the current branch."""
        return BranchManager.branch_path(self.table_path, self.branch)

    def _list_changelog_ids(self) -> List[int]:
        """Return the sorted IDs of all changelog files of the current branch."""
        return self._list_ids_in(self.changelog_directory())

    def _list_ids_in(self, directory: str) -> List[int]:
        """Return the sorted IDs of ``changelog-<id>`` files in ``directory``.

        Listing failures (e.g. the directory does not exist yet) are logged
        and treated as "no changelogs".
        """
        changelog_ids = []
        try:
            for file_info in self.file_io.list_status(directory):
                match = self._CHANGELOG_FILE_PATTERN.match(file_info.path.split('/')[-1])
                if match:
                    changelog_ids.append(int(match.group(1)))
        except Exception as e:
            logger.warning(f"Failed to list changelog files: {e}")
            return []
        return sorted(changelog_ids)

    def _read_hint(self, hint_file: str, kind: str) -> Optional[int]:
        """Read the integer stored in ``hint_file``; None if missing, empty or invalid."""
        if not self.file_io.exists(hint_file):
            return None
        try:
            content = self.file_io.read_file_utf8(hint_file)
            if content and content.strip():
                return int(content.strip())
        except Exception as e:
            logger.warning(f"Failed to read {kind} hint from {hint_file}: {e}")
        return None

    def _find_latest_hint_id(self, directory: str) -> Optional[int]:
        """Find the latest changelog ID in ``directory``.

        The LATEST hint is trusted only when it is positive and no changelog
        with the next ID exists; otherwise the hint is stale or absent and
        the directory is listed instead.

        Returns:
            The latest changelog ID, or None if there is none.
        """
        hint = self._read_hint(f"{directory}/{self.LATEST_HINT}", "latest")
        if hint is not None and hint > 0:
            next_path = f"{directory}/{self.CHANGELOG_PREFIX}{hint + 1}"
            if not self.file_io.exists(next_path):
                return hint
        changelog_ids = self._list_ids_in(directory)
        return changelog_ids[-1] if changelog_ids else None

    def _find_earliest_hint_id(self, directory: str) -> Optional[int]:
        """Find the earliest changelog ID in ``directory``.

        The EARLIEST hint is trusted only when the changelog it names still
        exists; otherwise the directory is listed instead.

        Returns:
            The earliest changelog ID, or None if there is none.
        """
        hint = self._read_hint(f"{directory}/{self.EARLIEST_HINT}", "earliest")
        if hint is not None and self.file_io.exists(f"{directory}/{self.CHANGELOG_PREFIX}{hint}"):
            return hint
        changelog_ids = self._list_ids_in(directory)
        return changelog_ids[0] if changelog_ids else None

    def _commit_latest_hint(self, snapshot_id: int, directory: str) -> None:
        """Write ``snapshot_id`` to the LATEST hint file in ``directory``."""
        # Hints are rewritten on every commit, so the existing file must be overwritten.
        self.file_io.write_file(f"{directory}/{self.LATEST_HINT}", str(snapshot_id), True)

    def _commit_earliest_hint(self, snapshot_id: int, directory: str) -> None:
        """Write ``snapshot_id`` to the EARLIEST hint file in ``directory``."""
        # Hints are rewritten on every commit, so the existing file must be overwritten.
        self.file_io.write_file(f"{directory}/{self.EARLIEST_HINT}", str(snapshot_id), True)

    def _delete_latest_hint(self, directory: str) -> None:
        """Delete the LATEST hint file in ``directory``; failures are only logged."""
        latest_file = f"{directory}/{self.LATEST_HINT}"
        try:
            self.file_io.delete(latest_file)
        except FileNotFoundError:
            # Nothing to delete.
            pass
        except Exception as e:
            logger.warning(f"Failed to delete latest hint {latest_file}: {e}")

    def _delete_earliest_hint(self, directory: str) -> None:
        """Delete the EARLIEST hint file in ``directory``; failures are only logged."""
        earliest_file = f"{directory}/{self.EARLIEST_HINT}"
        try:
            self.file_io.delete(earliest_file)
        except FileNotFoundError:
            # Nothing to delete.
            pass
        except Exception as e:
            logger.warning(f"Failed to delete earliest hint {earliest_file}: {e}")
diff --git a/paimon-python/pypaimon/snapshot/snapshot.py 
b/paimon-python/pypaimon/snapshot/snapshot.py
index f269a9ac85..6903ad9236 100644
--- a/paimon-python/pypaimon/snapshot/snapshot.py
+++ b/paimon-python/pypaimon/snapshot/snapshot.py
@@ -39,9 +39,13 @@ class Snapshot:
     commit_kind: str = json_field("commitKind")
     time_millis: int = json_field("timeMillis")
     # Optional fields with defaults
+    base_manifest_list_size: Optional[int] = 
optional_json_field("baseManifestListSize", "non_null")
+    delta_manifest_list_size: Optional[int] = 
optional_json_field("deltaManifestListSize", "non_null")
     changelog_manifest_list: Optional[str] = 
optional_json_field("changelogManifestList", "non_null")
+    changelog_manifest_list_size: Optional[int] = 
optional_json_field("changelogManifestListSize", "non_null")
     index_manifest: Optional[str] = optional_json_field("indexManifest", 
"non_null")
     changelog_record_count: Optional[int] = 
optional_json_field("changelogRecordCount", "non_null")
     watermark: Optional[int] = optional_json_field("watermark", "non_null")
     statistics: Optional[str] = optional_json_field("statistics", "non_null")
     next_row_id: Optional[int] = optional_json_field("nextRowId", "non_null")
+    properties: Optional[dict] = optional_json_field("properties", "non_null")
diff --git a/paimon-python/pypaimon/table/file_store_table.py 
b/paimon-python/pypaimon/table/file_store_table.py
index bb9587cc28..51a031bd94 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -123,6 +123,11 @@ class FileStoreTable(Table):
             current_branch
         )
 
def changelog_manager(self):
    """Return a ChangelogManager bound to this table's FileIO, path and current branch."""
    from pypaimon.changelog.changelog_manager import ChangelogManager

    return ChangelogManager(
        file_io=self.file_io,
        table_path=self.table_path,
        branch=self.current_branch(),
    )
+
     def create_tag(
             self,
             tag_name: str,
diff --git a/paimon-python/pypaimon/tests/changelog_manager_test.py 
b/paimon-python/pypaimon/tests/changelog_manager_test.py
new file mode 100644
index 0000000000..63d9157e5e
--- /dev/null
+++ b/paimon-python/pypaimon/tests/changelog_manager_test.py
@@ -0,0 +1,137 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+import os
+import shutil
+import tempfile
+import unittest
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.changelog import Changelog, ChangelogManager
+
+import pyarrow as pa
+
+
class TestChangelogManager(unittest.TestCase):
    """Tests for ChangelogManager path layout, hint lookup and Changelog creation."""

    @classmethod
    def setUpClass(cls):
        cls.tempdir = tempfile.mkdtemp()
        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
        cls.catalog = CatalogFactory.create({
            'warehouse': cls.warehouse
        })
        cls.catalog.create_database('default', True)

        cls.pa_schema = pa.schema([
            ('user_id', pa.int32()),
            ('item_id', pa.int64()),
            ('behavior', pa.string()),
            ('dt', pa.string())
        ])
        schema = Schema.from_pyarrow_schema(cls.pa_schema, partition_keys=['dt'],
                                            options={"bucket": "2"})
        cls.catalog.create_table('default.test_changelog_table', schema, False)
        cls.table = cls.catalog.get_table('default.test_changelog_table')

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.tempdir, ignore_errors=True)

    def _manager(self, branch=None):
        """Create a ChangelogManager for the shared table (main branch when branch is None)."""
        return ChangelogManager(self.table.file_io, self.table.table_path, branch=branch)

    def test_changelog_manager_initialization(self):
        """ChangelogManager defaults to the main branch when no branch is given."""
        changelog_manager = ChangelogManager(
            self.table.file_io,
            self.table.table_path
        )
        self.assertIsNotNone(changelog_manager)
        self.assertEqual(changelog_manager.branch, 'main')

    def test_changelog_manager_with_branch(self):
        """ChangelogManager keeps a custom branch name."""
        changelog_manager = self._manager(branch='feature')
        self.assertIsNotNone(changelog_manager)
        self.assertEqual(changelog_manager.branch, 'feature')

    def test_changelog_directory_path(self):
        """The main-branch changelog directory sits directly under the table path."""
        expected = f"{self.table.table_path}/changelog"
        self.assertEqual(self._manager().changelog_directory(), expected)

    def test_changelog_directory_path_with_branch(self):
        """A branch changelog directory sits under branch/branch-<name>."""
        expected = f"{self.table.table_path}/branch/branch-feature/changelog"
        self.assertEqual(self._manager(branch='feature').changelog_directory(), expected)

    def test_long_lived_changelog_path(self):
        """Changelog files are named changelog-<id> inside the changelog directory."""
        expected = f"{self.table.table_path}/changelog/changelog-123"
        self.assertEqual(self._manager().long_lived_changelog_path(123), expected)

    def test_latest_long_lived_changelog_id_none(self):
        """The latest changelog ID is None when no changelog exists."""
        self.assertIsNone(self._manager().latest_long_lived_changelog_id())

    def test_earliest_long_lived_changelog_id_none(self):
        """The earliest changelog ID is None when no changelog exists."""
        self.assertIsNone(self._manager().earliest_long_lived_changelog_id())

    def test_no_changelogs_listed(self):
        """With no changelog directory, listing and existence checks report nothing."""
        changelog_manager = self._manager()
        self.assertEqual(list(changelog_manager.changelogs()), [])
        self.assertEqual(changelog_manager.safely_get_all_changelogs(), [])
        self.assertFalse(changelog_manager.long_lived_changelog_exists(1))

    def test_changelog_from_snapshot(self):
        """Changelog.from_snapshot copies the snapshot's fields."""
        from pypaimon.snapshot.snapshot_manager import SnapshotManager

        snapshot = SnapshotManager(self.table).get_latest_snapshot()
        if snapshot is None:
            # Make the missing precondition visible instead of passing vacuously.
            self.skipTest("table has no snapshot to convert into a changelog")

        changelog = Changelog.from_snapshot(snapshot)
        self.assertIsInstance(changelog, Changelog)
        self.assertEqual(changelog.id, snapshot.id)
        self.assertEqual(changelog.schema_id, snapshot.schema_id)
        self.assertEqual(changelog.time_millis, snapshot.time_millis)
        self.assertEqual(changelog.commit_kind, snapshot.commit_kind)


if __name__ == '__main__':
    unittest.main()
diff --git a/paimon-python/pypaimon/tests/table/file_store_table_test.py 
b/paimon-python/pypaimon/tests/table/file_store_table_test.py
index 9406c884cd..d30846a2ae 100644
--- a/paimon-python/pypaimon/tests/table/file_store_table_test.py
+++ b/paimon-python/pypaimon/tests/table/file_store_table_test.py
@@ -239,6 +239,65 @@ class FileStoreTableTest(unittest.TestCase):
             # Restore original catalog environment
             self.table.catalog_environment = original_env
 
def test_changelog_manager(self):
    """FileStoreTable.changelog_manager() returns a ChangelogManager on the main branch."""
    manager = self.table.changelog_manager()

    from pypaimon.changelog.changelog_manager import ChangelogManager
    from pypaimon.branch.branch_manager import DEFAULT_MAIN_BRANCH

    self.assertIsInstance(manager, ChangelogManager)
    # Both the table and its manager must agree on the default branch.
    self.assertEqual(self.table.current_branch(), DEFAULT_MAIN_BRANCH)
    self.assertEqual(manager.branch, DEFAULT_MAIN_BRANCH)
+
def test_changelog_manager_with_branch(self):
    """A table created with a 'branch' option gets a branch-scoped changelog manager."""
    branch_name = "feature"
    table_options = {
        CoreOptions.BUCKET.key(): "2",
        "branch": branch_name,
    }
    schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options=table_options)
    identifier = 'default.test_changelog_branch_table'
    self.catalog.create_table(identifier, schema, False)
    branch_table = self.catalog.get_table(identifier)

    manager = branch_table.changelog_manager()
    self.assertEqual(branch_table.current_branch(), branch_name)
    self.assertEqual(manager.branch, branch_name)
    self.assertEqual(
        manager.changelog_directory(),
        f"{branch_table.table_path}/branch/branch-{branch_name}/changelog",
    )
+
def test_changelog_manager_path_generation(self):
    """Changelog directory and file paths are derived from the table path."""
    manager = self.table.changelog_manager()
    changelog_dir = f"{self.table.table_path}/changelog"
    snapshot_id = 123

    self.assertEqual(manager.changelog_directory(), changelog_dir)
    self.assertEqual(
        manager.long_lived_changelog_path(snapshot_id),
        f"{changelog_dir}/changelog-{snapshot_id}",
    )
+
def test_changelog_manager_latest_and_earliest_none(self):
    """With no changelog files, neither the latest nor the earliest ID is known."""
    manager = self.table.changelog_manager()
    lookups = (
        manager.latest_long_lived_changelog_id,
        manager.earliest_long_lived_changelog_id,
    )
    for lookup in lookups:
        self.assertIsNone(lookup())
+
     def test_current_branch(self):
         """Test that current_branch returns the branch from options."""
         from pypaimon.branch.branch_manager import DEFAULT_MAIN_BRANCH

Reply via email to