This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit c918c47f38d0ed7b17569e2ea0bf15c0d411c5b2
Author: timmyyao <[email protected]>
AuthorDate: Mon Mar 30 18:05:43 2026 +0800

    [python] Support PyJindo in pyarrow_file_io (#7410)
    
    Implement a filesystem for OSS backed by PyJindo
    
(https://aliyun.github.io/alibabacloud-jindodata/jindosdk/jindosdk_download/),
    which provides higher performance and stability for accessing OSS.
---
 paimon-python/pypaimon/common/options/config.py    |   2 +
 .../filesystem/jindo_file_system_handler.py        | 283 +++++++++++
 .../pypaimon/filesystem/pyarrow_file_io.py         |  23 +-
 paimon-python/pypaimon/tests/file_io_test.py       |   8 +-
 .../pypaimon/tests/jindo_file_system_test.py       | 524 +++++++++++++++++++++
 paimon-python/pypaimon/tests/oss_file_io_test.py   | 362 ++++++++++++++
 6 files changed, 1198 insertions(+), 4 deletions(-)

diff --git a/paimon-python/pypaimon/common/options/config.py 
b/paimon-python/pypaimon/common/options/config.py
index fb6a46446e..8b0230da64 100644
--- a/paimon-python/pypaimon/common/options/config.py
+++ b/paimon-python/pypaimon/common/options/config.py
@@ -18,6 +18,8 @@ from pypaimon.common.options.config_options import 
ConfigOptions
 
 
 class OssOptions:
+    OSS_IMPL = 
ConfigOptions.key("fs.oss.impl").string_type().default_value("default").with_description(
+        "OSS filesystem implementation: default or jindo")
     OSS_ACCESS_KEY_ID = 
ConfigOptions.key("fs.oss.accessKeyId").string_type().no_default_value().with_description(
         "OSS access key ID")
     OSS_ACCESS_KEY_SECRET = ConfigOptions.key(
diff --git a/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py 
b/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py
new file mode 100644
index 0000000000..693416cc57
--- /dev/null
+++ b/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py
@@ -0,0 +1,283 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import logging
+
+import pyarrow as pa
+from pyarrow import PythonFile
+from pyarrow._fs import FileSystemHandler
+from pyarrow.fs import FileInfo, FileSelector, FileType
+
+try:
+    import pyjindo.fs as jfs
+    import pyjindo.util as jutil
+    JINDO_AVAILABLE = True
+except ImportError:
+    JINDO_AVAILABLE = False
+    jfs = None
+    jutil = None
+
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import OssOptions
+
+
+class JindoInputFile:
+    def __init__(self, jindo_stream):
+        self._stream = jindo_stream
+        self._closed = False
+
+    @property
+    def closed(self):
+        if hasattr(self._stream, 'closed'):
+            return self._stream.closed
+        return self._closed
+
+    def read(self, nbytes: int = -1):
+        if self.closed:
+            raise ValueError("I/O operation on closed file")
+        if nbytes is None or nbytes < 0:
+            return self._stream.read()
+        return self._stream.read(nbytes)
+
+    def seek(self, position: int, whence: int = 0):
+        if self.closed:
+            raise ValueError("I/O operation on closed file")
+        self._stream.seek(position, whence)
+
+    def tell(self) -> int:
+        if self.closed:
+            raise ValueError("I/O operation on closed file")
+        return self._stream.tell()
+
+    def read_at(self, nbytes: int, offset: int):
+        if self.closed:
+            raise ValueError("I/O operation on closed file")
+        return self._stream.pread(nbytes, offset)
+
+    def close(self):
+        if not self._closed:
+            self._stream.close()
+            self._closed = True
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+        return False
+
+
+class JindoOutputFile:
+    def __init__(self, jindo_stream):
+        self._stream = jindo_stream
+        self._closed = False
+
+    @property
+    def closed(self):
+        if hasattr(self._stream, 'closed'):
+            return self._stream.closed
+        return self._closed
+
+    def write(self, data: bytes) -> int:
+        if self.closed:
+            raise ValueError("I/O operation on closed file")
+        if isinstance(data, pa.Buffer):
+            data = data.to_pybytes()
+        elif not isinstance(data, bytes):
+            raise TypeError("Unsupported data type")
+        return self._stream.write(data)
+
+    def flush(self):
+        if self.closed:
+            raise ValueError("I/O operation on closed file")
+        if hasattr(self._stream, 'flush'):
+            self._stream.flush()
+
+    def close(self):
+        if not self._closed:
+            self._stream.close()
+            self._closed = True
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+        return False
+
+
+class JindoFileSystemHandler(FileSystemHandler):
+    def __init__(self, root_path: str, catalog_options: Options):
+        if not JINDO_AVAILABLE:
+            raise ImportError("Module pyjindo is not available. Please install 
pyjindosdk.")
+
+        self.logger = logging.getLogger(__name__)
+        self.root_path = root_path
+        self.properties = catalog_options
+
+        # Build jindo config from catalog_options
+        config = jutil.Config()
+
+        access_key_id = catalog_options.get(OssOptions.OSS_ACCESS_KEY_ID)
+        access_key_secret = 
catalog_options.get(OssOptions.OSS_ACCESS_KEY_SECRET)
+        security_token = catalog_options.get(OssOptions.OSS_SECURITY_TOKEN)
+        endpoint = catalog_options.get(OssOptions.OSS_ENDPOINT)
+        region = catalog_options.get(OssOptions.OSS_REGION)
+
+        if access_key_id:
+            config.set("fs.oss.accessKeyId", access_key_id)
+        if access_key_secret:
+            config.set("fs.oss.accessKeySecret", access_key_secret)
+        if security_token:
+            config.set("fs.oss.securityToken", security_token)
+        if endpoint:
+            endpoint_clean = endpoint.replace('http://', 
'').replace('https://', '')
+            config.set("fs.oss.endpoint", endpoint_clean)
+        if region:
+            config.set("fs.oss.region", region)
+        config.set("fs.oss.user.agent.features", "pypaimon")
+
+        self._jindo_fs = jfs.connect(self.root_path, "root", config)
+
+    def __eq__(self, other):
+        if isinstance(other, JindoFileSystemHandler):
+            return self.root_path == other.root_path
+        return NotImplemented
+
+    def __ne__(self, other):
+        if isinstance(other, JindoFileSystemHandler):
+            return not self.__eq__(other)
+        return NotImplemented
+
+    def _normalize_path(self, path: str) -> str:
+        if path.startswith('oss://'):
+            return path
+
+        if not path or path == '.':
+            return self.root_path.rstrip('/') + '/'
+
+        path_clean = path.lstrip('/')
+        return self.root_path.rstrip('/') + '/' + path_clean
+
+    def _convert_file_type(self, jindo_type) -> FileType:
+        if jindo_type == jfs.FileType.File:
+            return FileType.File
+        elif jindo_type == jfs.FileType.Directory:
+            return FileType.Directory
+        else:
+            return FileType.Unknown
+
+    def _convert_file_info(self, jindo_info) -> FileInfo:
+        pa_type = self._convert_file_type(jindo_info.type)
+        return FileInfo(
+            path=jindo_info.path,
+            type=pa_type,
+            size=jindo_info.size if jindo_info.type == jfs.FileType.File else 
None,
+            mtime=jindo_info.mtime if hasattr(jindo_info, 'mtime') else None,
+        )
+
+    def get_type_name(self) -> str:
+        return "jindo"
+
+    def get_file_info(self, paths) -> list:
+        infos = []
+        for path in paths:
+            normalized = self._normalize_path(path)
+            try:
+                jindo_info = self._jindo_fs.get_file_info(normalized)
+                infos.append(self._convert_file_info(jindo_info))
+            except FileNotFoundError:
+                infos.append(FileInfo(normalized, FileType.NotFound))
+        return infos
+
+    def get_file_info_selector(self, selector: FileSelector) -> list:
+        normalized = self._normalize_path(selector.base_dir)
+        try:
+            items = self._jindo_fs.listdir(normalized, 
recursive=selector.recursive)
+            return [self._convert_file_info(item) for item in items]
+        except FileNotFoundError:
+            if selector.allow_not_found:
+                return []
+            raise
+
+    def create_dir(self, path: str, recursive: bool):
+        normalized = self._normalize_path(path)
+        self._jindo_fs.mkdir(normalized)
+
+    def delete_dir(self, path: str):
+        normalized = self._normalize_path(path)
+        self._jindo_fs.remove(normalized)
+
+    def delete_dir_contents(self, path: str, missing_dir_ok: bool = False):
+        normalized = self._normalize_path(path)
+        if normalized == self.root_path:
+            raise ValueError(
+                "delete_dir_contents() does not accept root path"
+            )
+        self._delete_dir_contents(path, missing_dir_ok)
+
+    def delete_root_dir_contents(self):
+        self._delete_dir_contents("/", missing_dir_ok=False)
+
+    def _delete_dir_contents(self, path: str, missing_dir_ok: bool):
+        normalized = self._normalize_path(path)
+        try:
+            items = self._jindo_fs.listdir(normalized, recursive=False)
+        except FileNotFoundError:
+            if missing_dir_ok:
+                return
+            raise
+        except Exception as e:
+            self.logger.warning(f"Error listing {path}: {e}")
+            raise
+        for item in items:
+            self._jindo_fs.remove(item.path)
+
+    def delete_file(self, path: str):
+        normalized = self._normalize_path(path)
+        self._jindo_fs.remove(normalized)
+
+    def move(self, src: str, dest: str):
+        src_norm = self._normalize_path(src)
+        dst_norm = self._normalize_path(dest)
+        self._jindo_fs.rename(src_norm, dst_norm)
+
+    def copy_file(self, src: str, dest: str):
+        src_norm = self._normalize_path(src)
+        dst_norm = self._normalize_path(dest)
+        self._jindo_fs.copy_file(src_norm, dst_norm)
+
+    def open_input_stream(self, path: str):
+        normalized = self._normalize_path(path)
+        jindo_stream = self._jindo_fs.open(normalized, "rb")
+        return PythonFile(JindoInputFile(jindo_stream), mode="r")
+
+    def open_input_file(self, path: str):
+        normalized = self._normalize_path(path)
+        jindo_stream = self._jindo_fs.open(normalized, "rb")
+        return PythonFile(JindoInputFile(jindo_stream), mode="r")
+
+    def open_output_stream(self, path: str, metadata):
+        normalized = self._normalize_path(path)
+        jindo_stream = self._jindo_fs.open(normalized, "wb")
+        return PythonFile(JindoOutputFile(jindo_stream), mode="w")
+
+    def open_append_stream(self, path: str, metadata):
+        raise IOError("append mode is not supported")
+
+    def normalize_path(self, path: str) -> str:
+        return self._normalize_path(path)
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py 
b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index 18e54a0b70..6cf2faabb2 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -34,6 +34,7 @@ from pypaimon.common.file_io import FileIO
 from pypaimon.common.options import Options
 from pypaimon.common.options.config import OssOptions, S3Options
 from pypaimon.common.uri_reader import UriReaderFactory
+from pypaimon.filesystem.jindo_file_system_handler import 
JindoFileSystemHandler
 from pypaimon.schema.data_types import (AtomicType, DataField,
                                         PyarrowFieldParser)
 from pypaimon.table.row.blob import Blob, BlobData, BlobDescriptor
@@ -56,9 +57,13 @@ class PyArrowFileIO(FileIO):
         self.uri_reader_factory = UriReaderFactory(catalog_options)
         self._is_oss = scheme in {"oss"}
         self._oss_bucket = None
+        self._oss_impl = self.properties.get(OssOptions.OSS_IMPL)
         if self._is_oss:
             self._oss_bucket = self._extract_oss_bucket(path)
-            self.filesystem = self._initialize_oss_fs(path)
+            if self._oss_impl == "jindo":
+                self.filesystem = self._initialize_jindo_fs(path)
+            else:
+                self.filesystem = self._initialize_oss_fs(path)
         elif scheme in {"s3", "s3a", "s3n"}:
             self.filesystem = self._initialize_s3_fs()
         elif scheme in {"hdfs", "viewfs"}:
@@ -116,6 +121,13 @@ class PyArrowFileIO(FileIO):
             raise ValueError("Invalid OSS URI without bucket: 
{}".format(location))
         return bucket
 
+    def _initialize_jindo_fs(self, path) -> FileSystem:
+        """Initialize JindoFileSystem for OSS access."""
+        self.logger.info(f"Initializing JindoFileSystem for OSS access: 
{path}")
+        root_path = f"oss://{self._oss_bucket}/"
+        fs_handler = JindoFileSystemHandler(root_path, self.properties)
+        return pafs.PyFileSystem(fs_handler)
+
     def _initialize_oss_fs(self, path) -> FileSystem:
         if self.properties.get(OssOptions.OSS_ACCESS_KEY_ID):
             # When explicit credentials are provided, disable the EC2 Instance 
Metadata
@@ -198,7 +210,9 @@ class PyArrowFileIO(FileIO):
     def new_output_stream(self, path: str):
         path_str = self.to_filesystem_path(path)
 
-        if self._is_oss and not self._pyarrow_gte_7:
+        if self._oss_impl == "jindo":
+            pass
+        elif self._is_oss and not self._pyarrow_gte_7:
             # For PyArrow 6.x + OSS, path_str is already just the key part
             if '/' in path_str:
                 parent_dir = '/'.join(path_str.split('/')[:-1])
@@ -560,6 +574,11 @@ class PyArrowFileIO(FileIO):
             path_part = normalized_path.lstrip('/')
             return f"{drive_letter}:/{path_part}" if path_part else 
f"{drive_letter}:"
 
+        if self._oss_impl == "jindo":
+            # For JindoFileSystem, pass key only
+            path_part = normalized_path.lstrip('/')
+            return path_part if path_part else '.'
+
         if isinstance(self.filesystem, S3FileSystem):
             if parsed.scheme:
                 if parsed.netloc:
diff --git a/paimon-python/pypaimon/tests/file_io_test.py 
b/paimon-python/pypaimon/tests/file_io_test.py
index ba91f94a18..5cc4d7a821 100644
--- a/paimon-python/pypaimon/tests/file_io_test.py
+++ b/paimon-python/pypaimon/tests/file_io_test.py
@@ -67,7 +67,9 @@ class FileIOTest(unittest.TestCase):
 
         lt7 = _pyarrow_lt_7()
         oss_io = PyArrowFileIO("oss://test-bucket/warehouse", Options({
-            OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com'
+            OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com',
+            OssOptions.OSS_ACCESS_KEY_ID.key(): 'test-key',
+            OssOptions.OSS_ACCESS_KEY_SECRET.key(): 'test-secret',
         }))
         got = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt")
         self.assertEqual(got, "path/to/file.txt" if lt7 else 
"test-bucket/path/to/file.txt")
@@ -286,7 +288,9 @@ class FileIOTest(unittest.TestCase):
             file_io.delete_directory_quietly("file:///some/path")
 
             oss_io = PyArrowFileIO("oss://test-bucket/warehouse", Options({
-                OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com'
+                OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com',
+                OssOptions.OSS_ACCESS_KEY_ID.key(): 'test-key',
+                OssOptions.OSS_ACCESS_KEY_SECRET.key(): 'test-secret',
             }))
             mock_fs = MagicMock()
             mock_fs.get_file_info.return_value = [
diff --git a/paimon-python/pypaimon/tests/jindo_file_system_test.py 
b/paimon-python/pypaimon/tests/jindo_file_system_test.py
new file mode 100644
index 0000000000..b6f168ac58
--- /dev/null
+++ b/paimon-python/pypaimon/tests/jindo_file_system_test.py
@@ -0,0 +1,524 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+import unittest
+import uuid
+
+import pyarrow.fs as pafs
+
+from pyarrow.fs import PyFileSystem
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import OssOptions
+from pypaimon.filesystem.jindo_file_system_handler import 
JindoFileSystemHandler, JINDO_AVAILABLE
+
+
+class JindoFileSystemTest(unittest.TestCase):
+    """Test cases for JindoFileSystem."""
+
+    def setUp(self):
+        """Set up test fixtures."""
+        if not JINDO_AVAILABLE:
+            self.skipTest("pyjindo is not available")
+
+        # Get OSS credentials from environment variables or use test values
+        bucket = os.environ.get("OSS_TEST_BUCKET")
+        access_key_id = os.environ.get("OSS_ACCESS_KEY_ID")
+        access_key_secret = os.environ.get("OSS_ACCESS_KEY_SECRET")
+        endpoint = os.environ.get("OSS_ENDPOINT")
+        if not bucket:
+            self.skipTest("test bucket is not configured")
+            return
+        if not access_key_id:
+            self.skipTest("test access key id is not configured")
+            return
+        if not access_key_secret:
+            self.skipTest("test access key secret is not configured")
+            return
+        if not endpoint:
+            self.skipTest("test endpoint is not configured")
+            return
+        self.root_path = f"oss://{bucket}/"
+
+        self.catalog_options = Options({
+            OssOptions.OSS_ACCESS_KEY_ID.key(): access_key_id,
+            OssOptions.OSS_ACCESS_KEY_SECRET.key(): access_key_secret,
+            OssOptions.OSS_ENDPOINT.key(): endpoint,
+        })
+
+        # Create JindoFileSystemHandler instance
+        fs_handler = JindoFileSystemHandler(self.root_path, 
self.catalog_options)
+        self.fs = PyFileSystem(fs_handler)
+
+        # Create unique test prefix to avoid conflicts
+        self.test_prefix = f"test-{uuid.uuid4().hex[:8]}/"
+
+    def tearDown(self):
+        """Clean up test files and directories."""
+        # Delete the entire test prefix directory
+        test_dir = f"{self.root_path}{self.test_prefix}"
+        try:
+            file_info = self.fs.get_file_info(test_dir)
+            if file_info.type == pafs.FileType.Directory:
+                try:
+                    self.fs.delete_dir(test_dir)
+                except Exception:
+                    pass
+        except Exception:
+            pass  # Ignore cleanup errors
+
+    def _get_test_path(self, name: str) -> str:
+        """Get a test path under test_prefix."""
+        return f"{self.root_path}{self.test_prefix}{name}"
+
+    def test_get_root_file_info(self):
+        """Test get_file_info for root path."""
+        # Verify directory exists using get_file_info
+        file_info = self.fs.get_file_info(self.root_path)
+
+        # Verify the returned file info
+        self.assertIsInstance(file_info, pafs.FileInfo)
+        self.assertEqual(file_info.type, pafs.FileType.Directory)
+
+    def test_create_dir_recursive(self):
+        """Test create_dir with recursive=True."""
+        test_dir = self._get_test_path("nested/deep/dir/")
+
+        # Create nested directory
+        self.fs.create_dir(test_dir, recursive=True)
+
+        # Verify directory exists
+        file_info = self.fs.get_file_info(test_dir)
+        self.assertEqual(file_info.type, pafs.FileType.Directory)
+
+    def test_write_and_read_small_file(self):
+        """Test writing and reading a small file (10 bytes) with random 
data."""
+        test_file = self._get_test_path("small-file.txt")
+        test_data = os.urandom(10)
+
+        # Write file
+        out_stream = self.fs.open_output_stream(test_file)
+        out_stream.write(test_data)
+        out_stream.close()
+
+        # Read file
+        with self.fs.open_input_file(test_file) as in_file:
+            read_data = in_file.read()
+
+        # Verify data correctness
+        self.assertEqual(test_data, read_data)
+        self.assertEqual(len(read_data), 10)
+
+        # Verify file info
+        file_info = self.fs.get_file_info(test_file)
+        self.assertEqual(file_info.type, pafs.FileType.File)
+        self.assertEqual(file_info.size, 10)
+
+    def test_write_and_read_small_file_with_context(self):
+        """Test writing and reading a small file (10 bytes) with random data 
using context manager."""
+        test_file = self._get_test_path("small-file-with-context.txt")
+        test_data = os.urandom(10)
+
+        # Write file using context manager
+        with self.fs.open_output_stream(test_file) as out_stream:
+            out_stream.write(test_data)
+
+        # Read file
+        with self.fs.open_input_file(test_file) as in_file:
+            read_data = in_file.read()
+
+        # Verify data correctness
+        self.assertEqual(test_data, read_data)
+        self.assertEqual(len(read_data), 10)
+
+        # Verify file info
+        file_info = self.fs.get_file_info(test_file)
+        self.assertEqual(file_info.type, pafs.FileType.File)
+        self.assertEqual(file_info.size, 10)
+
+    def test_write_and_read_large_file(self):
+        """Test writing and reading a large file (20MB) with random data."""
+        test_file = self._get_test_path("large-file.bin")
+        file_size = 20 * 1024 * 1024  # 20MB
+        test_data = os.urandom(file_size)
+
+        # Write file in chunks
+        chunk_size = 1024 * 1024  # 1MB chunks
+        out_stream = self.fs.open_output_stream(test_file)
+        for i in range(0, len(test_data), chunk_size):
+            chunk = test_data[i:i + chunk_size]
+            out_stream.write(chunk)
+        out_stream.close()
+
+        # Verify file info
+        file_info = self.fs.get_file_info(test_file)
+        self.assertEqual(file_info.type, pafs.FileType.File)
+        self.assertEqual(file_info.size, file_size)
+
+        # Test JindoInputFile all methods
+        with self.fs.open_input_file(test_file) as in_file:
+            # Test tell() - should be at position 0 initially
+            self.assertEqual(in_file.tell(), 0)
+
+            # Test read() with specific size
+            chunk1 = in_file.read(1024)
+            self.assertEqual(len(chunk1), 1024)
+            self.assertEqual(in_file.tell(), 1024)
+            self.assertEqual(chunk1, test_data[0:1024])
+
+            # Test seek() - seek to middle of file
+            middle_pos = file_size // 2
+            in_file.seek(middle_pos)
+            self.assertEqual(in_file.tell(), middle_pos)
+
+            # Test read_at()
+            read_at_data = in_file.read_at(1024, 0)
+            self.assertEqual(len(read_at_data), 1024)
+            self.assertEqual(read_at_data, test_data[0:1024])
+
+            # Test read_at() at different offset
+            offset = file_size - 2048
+            read_at_data2 = in_file.read_at(1024, offset)
+            self.assertEqual(len(read_at_data2), 1024)
+            self.assertEqual(read_at_data2, test_data[offset:offset + 1024])
+
+            # Test seek() to end
+            in_file.seek(file_size)
+            self.assertEqual(in_file.tell(), file_size)
+
+            # Test read() at end - should return empty bytes
+            empty_data = in_file.read(100)
+            self.assertEqual(len(empty_data), 0)
+
+            # Test read() without size - read remaining
+            in_file.seek(0)
+            all_data = in_file.read()
+            self.assertEqual(len(all_data), file_size)
+            self.assertEqual(all_data, test_data)
+
+        # Verify complete file read
+        with self.fs.open_input_file(test_file) as in_file:
+            complete_data = in_file.read()
+            self.assertEqual(complete_data, test_data)
+            self.assertEqual(len(complete_data), file_size)
+
+    def test_get_file_info_single_path(self):
+        """Test get_file_info with single path."""
+        test_file = self._get_test_path("info-test.txt")
+        test_data = b"test content"
+
+        # Write file
+        out_stream = self.fs.open_output_stream(test_file)
+        out_stream.write(test_data)
+        out_stream.close()
+
+        # Get file info
+        file_info = self.fs.get_file_info(test_file)
+        self.assertIsInstance(file_info, pafs.FileInfo)
+        self.assertEqual(file_info.type, pafs.FileType.File)
+        self.assertEqual(file_info.size, len(test_data))
+
+    def test_get_file_info_list(self):
+        """Test get_file_info with list of paths."""
+        test_file = self._get_test_path("info-test1.txt")
+        test_dir = self._get_test_path("dir1")
+
+        # Write files
+        out_stream = self.fs.open_output_stream(test_file)
+        out_stream.write(b"content1")
+        out_stream.close()
+        self.fs.create_dir(test_dir)
+
+        # Get file info for list
+        file_infos = self.fs.get_file_info([test_file, test_dir])
+        self.assertIsInstance(file_infos, list)
+        self.assertEqual(len(file_infos), 2)
+        self.assertEqual(file_infos[0].type, pafs.FileType.File)
+        self.assertTrue(test_file in file_infos[0].path)
+        self.assertEqual(file_infos[1].type, pafs.FileType.Directory)
+        self.assertTrue(test_dir in file_infos[1].path)
+
+    def test_get_file_info_with_selector(self):
+        """Test get_file_info with FileSelector."""
+        test_dir = self._get_test_path("selector-test/")
+        test_file1 = self._get_test_path("selector-test/file1.txt")
+        test_file2 = self._get_test_path("selector-test/file2.txt")
+
+        # Create files
+        self.fs.create_dir(test_dir)
+        out_stream1 = self.fs.open_output_stream(test_file1)
+        out_stream1.write(b"content1")
+        out_stream1.close()
+        out_stream2 = self.fs.open_output_stream(test_file2)
+        out_stream2.write(b"content2")
+        out_stream2.close()
+
+        # Test non-recursive listing
+        selector = pafs.FileSelector(test_dir, recursive=False, 
allow_not_found=False)
+        file_infos = self.fs.get_file_info(selector)
+        self.assertIsInstance(file_infos, list)
+        self.assertEqual(len(file_infos), 2)
+
+        # Verify we got the files
+        file_names = [info.path for info in file_infos]
+        self.assertTrue(any("file1.txt" in name for name in file_names))
+        self.assertTrue(any("file2.txt" in name for name in file_names))
+
+    def test_get_file_info_with_selector_recursive(self):
+        """Test get_file_info with FileSelector recursive=True."""
+        test_dir = self._get_test_path("selector-recursive/")
+        test_subdir = self._get_test_path("selector-recursive/subdir/")
+        test_file1 = self._get_test_path("selector-recursive/file1.txt")
+        test_file2 = self._get_test_path("selector-recursive/subdir/file2.txt")
+
+        # Create directory structure
+        self.fs.create_dir(test_dir)
+        self.fs.create_dir(test_subdir)
+        out_stream1 = self.fs.open_output_stream(test_file1)
+        out_stream1.write(b"content1")
+        out_stream1.close()
+        out_stream2 = self.fs.open_output_stream(test_file2)
+        out_stream2.write(b"content2")
+        out_stream2.close()
+
+        # Test recursive listing
+        selector = pafs.FileSelector(test_dir, recursive=True, 
allow_not_found=False)
+        file_infos = self.fs.get_file_info(selector)
+        self.assertIsInstance(file_infos, list)
+        self.assertEqual(len(file_infos), 3)
+
+    def test_get_file_info_not_found(self):
+        """Test get_file_info for non-existent file."""
+        non_existent = self._get_test_path("non-existent-file.txt")
+
+        file_info = self.fs.get_file_info(non_existent)
+        self.assertEqual(file_info.type, pafs.FileType.NotFound)
+
+        # Try to open non-existent file should raise FileNotFoundError
+        with self.assertRaises(FileNotFoundError):
+            with self.fs.open_input_file(non_existent) as in_file:
+                in_file.read()
+
+    def test_get_file_info_selector_allow_not_found(self):
+        """Test get_file_info with FileSelector allow_not_found=True."""
+        non_existent_dir = self._get_test_path("non-existent-dir/")
+
+        # Test with allow_not_found=True
+        selector = pafs.FileSelector(non_existent_dir, recursive=False, 
allow_not_found=True)
+        file_infos = self.fs.get_file_info(selector)
+        self.assertIsInstance(file_infos, list)
+        self.assertEqual(len(file_infos), 0)
+
+    def test_delete_file(self):
+        """Test delete_file method."""
+        test_file = self._get_test_path("delete-test.txt")
+
+        # Create file
+        out_stream = self.fs.open_output_stream(test_file)
+        out_stream.write(b"test content")
+        out_stream.close()
+
+        # Verify file exists
+        file_info = self.fs.get_file_info(test_file)
+        self.assertEqual(file_info.type, pafs.FileType.File)
+
+        # Delete file
+        self.fs.delete_file(test_file)
+
+        # Verify file is deleted - should raise FileNotFoundError when 
accessing
+        with self.assertRaises(FileNotFoundError):
+            with self.fs.open_input_file(test_file) as in_file:
+                in_file.read()
+
+    def test_delete_dir(self):
+        """Test delete_dir method."""
+        test_dir = self._get_test_path("delete-dir/")
+
+        # Create directory
+        self.fs.create_dir(test_dir)
+
+        # Verify directory exists
+        file_info = self.fs.get_file_info(test_dir)
+        self.assertEqual(file_info.type, pafs.FileType.Directory)
+
+        # Delete directory
+        self.fs.delete_dir(test_dir)
+
+        # Verify directory is deleted - should raise FileNotFoundError when 
accessing
+        with self.assertRaises(FileNotFoundError):
+            # Try to list directory contents
+            selector = pafs.FileSelector(test_dir, recursive=False, 
allow_not_found=False)
+            self.fs.get_file_info(selector)
+
+    def test_delete_dir_contents(self):
+        """Test delete_dir_contents method."""
+        test_dir = self._get_test_path("delete-contents/")
+        test_file1 = self._get_test_path("delete-contents/file1.txt")
+        test_file2 = self._get_test_path("delete-contents/file2.txt")
+        test_subdir = self._get_test_path("delete-contents/subdir/")
+        test_file3 = self._get_test_path("delete-contents/subdir/file3.txt")
+
+        # Create directory structure
+        self.fs.create_dir(test_dir)
+        out_stream1 = self.fs.open_output_stream(test_file1)
+        out_stream1.write(b"content1")
+        out_stream1.close()
+        out_stream2 = self.fs.open_output_stream(test_file2)
+        out_stream2.write(b"content2")
+        out_stream2.close()
+        self.fs.create_dir(test_subdir)
+        out_stream3 = self.fs.open_output_stream(test_file3)
+        out_stream3.write(b"content3")
+        out_stream3.close()
+
+        # Delete directory contents
+        self.fs.delete_dir_contents(test_dir)
+
+        # Verify directory still exists but is empty
+        file_info = self.fs.get_file_info(test_dir)
+        self.assertEqual(file_info.type, pafs.FileType.Directory)
+        selector = pafs.FileSelector(test_dir, recursive=False, 
allow_not_found=False)
+        file_infos = self.fs.get_file_info(selector)
+        self.assertEqual(len(file_infos), 0)
+
+        # Verify files are deleted - should raise FileNotFoundError when 
accessing
+        with self.assertRaises(FileNotFoundError):
+            with self.fs.open_input_file(test_file1) as in_file:
+                in_file.read()
+        with self.assertRaises(FileNotFoundError):
+            with self.fs.open_input_file(test_file2) as in_file:
+                in_file.read()
+        with self.assertRaises(FileNotFoundError):
+            with self.fs.open_input_file(test_file3) as in_file:
+                in_file.read()
+
+    def test_move_file(self):
+        """Test move method for file."""
+        src_file = self._get_test_path("move-src.txt")
+        dst_file = self._get_test_path("move-dst.txt")
+        test_data = b"move test content"
+
+        # Create source file
+        out_stream = self.fs.open_output_stream(src_file)
+        out_stream.write(test_data)
+        out_stream.close()
+
+        # Move file
+        self.fs.move(src_file, dst_file)
+
+        # Verify source is gone - should raise FileNotFoundError when accessing
+        with self.assertRaises(FileNotFoundError):
+            with self.fs.open_input_file(src_file) as in_file:
+                in_file.read()
+
+        # Verify destination exists with correct content
+        dst_info = self.fs.get_file_info(dst_file)
+        self.assertEqual(dst_info.type, pafs.FileType.File)
+
+        with self.fs.open_input_file(dst_file) as in_file:
+            read_data = in_file.read()
+            self.assertEqual(read_data, test_data)
+
+    def test_move_directory(self):
+        """Test move method for directory."""
+        src_dir = self._get_test_path("move-src-dir/")
+        dst_dir = self._get_test_path("move-dst-dir/")
+        src_file = self._get_test_path("move-src-dir/file.txt")
+
+        # Create source directory and file
+        self.fs.create_dir(src_dir)
+        out_stream = self.fs.open_output_stream(src_file)
+        out_stream.write(b"test content")
+        out_stream.close()
+
+        # Move directory
+        self.fs.move(src_dir, dst_dir)
+
+        # Verify source is gone - should raise FileNotFoundError when accessing
+        with self.assertRaises(FileNotFoundError):
+            # Try to list directory contents
+            selector = pafs.FileSelector(src_dir, recursive=False, 
allow_not_found=False)
+            self.fs.get_file_info(selector)
+
+        # Verify destination exists
+        dst_info = self.fs.get_file_info(dst_dir)
+        self.assertEqual(dst_info.type, pafs.FileType.Directory)
+
+    def test_copy_file(self):
+        """Test copy_file method."""
+        src_file = self._get_test_path("copy-src.txt")
+        dst_file = self._get_test_path("copy-dst.txt")
+        test_data = os.urandom(1024 * 1024)  # 1MB random data
+
+        # Create source file
+        out_stream = self.fs.open_output_stream(src_file)
+        out_stream.write(test_data)
+        out_stream.close()
+
+        # Copy file
+        self.fs.copy_file(src_file, dst_file)
+
+        # Verify both files exist
+        src_info = self.fs.get_file_info(src_file)
+        self.assertEqual(src_info.type, pafs.FileType.File)
+        dst_info = self.fs.get_file_info(dst_file)
+        self.assertEqual(dst_info.type, pafs.FileType.File)
+
+        # Verify destination content matches source
+        with self.fs.open_input_file(dst_file) as in_file:
+            read_data = in_file.read()
+            self.assertEqual(read_data, test_data)
+            self.assertEqual(len(read_data), len(test_data))
+
+    def test_delete_nonexistent_file(self):
+        """Test deleting non-existent file."""
+        non_existent = self._get_test_path("non-existent-delete.txt")
+
+        # Try to delete non-existent file
+        with self.assertRaises(IOError):
+            self.fs.delete_file(non_existent)
+
+    def test_multiple_sequential_reads(self):
+        """Test multiple sequential reads from same file."""
+        test_file = self._get_test_path("sequential-read.txt")
+        test_data = os.urandom(10000)  # 10KB
+
+        # Write file
+        out_stream = self.fs.open_output_stream(test_file)
+        out_stream.write(test_data)
+        out_stream.close()
+
+        # Read in chunks sequentially
+        with self.fs.open_input_file(test_file) as in_file:
+            chunk1 = in_file.read(1000)
+            chunk2 = in_file.read(2000)
+            chunk3 = in_file.read(3000)
+            chunk4 = in_file.read()  # Read rest
+
+            # Verify all chunks
+            self.assertEqual(chunk1, test_data[0:1000])
+            self.assertEqual(chunk2, test_data[1000:3000])
+            self.assertEqual(chunk3, test_data[3000:6000])
+            self.assertEqual(chunk4, test_data[6000:])
+
+            # Verify total length
+            total_read = len(chunk1) + len(chunk2) + len(chunk3) + len(chunk4)
+            self.assertEqual(total_read, len(test_data))
+
+
# Allow running this test module directly from the command line.
if __name__ == '__main__':
    unittest.main()
diff --git a/paimon-python/pypaimon/tests/oss_file_io_test.py 
b/paimon-python/pypaimon/tests/oss_file_io_test.py
new file mode 100644
index 0000000000..f9602c975e
--- /dev/null
+++ b/paimon-python/pypaimon/tests/oss_file_io_test.py
@@ -0,0 +1,362 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+import os
+import unittest
+import uuid
+
+import pyarrow.fs as pafs
+
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import OssOptions
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
+
+
class OSSFileIOTest(unittest.TestCase):
    """Test cases for PyArrowFileIO with OSS.

    Integration tests against a real OSS bucket.  They require the
    OSS_TEST_BUCKET, OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET and
    OSS_ENDPOINT environment variables and are skipped when any is missing.
    OSS_IMPL (optional, defaults to "default") selects the filesystem
    implementation, e.g. "jindo".
    """

    def setUp(self):
        """Set up test fixtures: credentials, the FileIO and a unique prefix."""
        # Get OSS credentials from environment variables
        self.bucket = os.environ.get("OSS_TEST_BUCKET")
        access_key_id = os.environ.get("OSS_ACCESS_KEY_ID")
        access_key_secret = os.environ.get("OSS_ACCESS_KEY_SECRET")
        endpoint = os.environ.get("OSS_ENDPOINT")
        oss_impl = os.environ.get("OSS_IMPL", "default")

        # NOTE(review): skipTest() raises unittest.SkipTest, so each `return`
        # below is unreachable; they read as explicit early-exit markers only.
        if not self.bucket:
            self.skipTest("test bucket is not configured")
            return
        if not access_key_id:
            self.skipTest("test access key id is not configured")
            return
        if not access_key_secret:
            self.skipTest("test access key secret is not configured")
            return
        if not endpoint:
            self.skipTest("test endpoint is not configured")
            return

        self.root_path = f"oss://{self.bucket}/"

        self.catalog_options = Options({
            OssOptions.OSS_ACCESS_KEY_ID.key(): access_key_id,
            OssOptions.OSS_ACCESS_KEY_SECRET.key(): access_key_secret,
            OssOptions.OSS_ENDPOINT.key(): endpoint,
            OssOptions.OSS_IMPL.key(): oss_impl,
        })

        # Create PyArrowFileIO instance
        self.file_io = PyArrowFileIO(self.root_path, self.catalog_options)

        # Create unique test prefix to avoid conflicts between parallel runs
        self.test_prefix = f"test-{uuid.uuid4().hex[:8]}/"

    def tearDown(self):
        """Clean up test files and directories."""
        # Delete the entire test prefix directory
        test_dir = f"{self.root_path}{self.test_prefix}"
        try:
            if self.file_io.exists(test_dir):
                self.file_io.delete(test_dir, recursive=True)
        except Exception:
            pass  # Ignore cleanup errors

    def _get_test_path(self, name: str) -> str:
        """Get a test path under test_prefix."""
        return f"{self.root_path}{self.test_prefix}{name}"

    def test_get_file_status_directory(self):
        """Test get_file_status for a directory."""
        # Create a test directory
        test_dir = self._get_test_path("test-dir/")
        self.file_io.mkdirs(test_dir)
        # Get file status
        file_status = self.file_io.get_file_status(test_dir)
        # Verify the returned file status
        self.assertIsNotNone(file_status)
        self.assertEqual(file_status.type, pafs.FileType.Directory)

    def test_new_input_stream_read(self):
        """Test new_input_stream and read method."""
        # Create test data
        test_data = b"Hello, World! This is a test file for OSS input stream."
        test_file = self._get_test_path("test-input-stream.txt")

        # Write test data to file
        with self.file_io.new_output_stream(test_file) as out_stream:
            out_stream.write(test_data)

        # Test new_input_stream
        input_stream = self.file_io.new_input_stream(test_file)
        self.assertIsNotNone(input_stream)

        # Test read without nbytes (read all)
        input_stream.seek(0)
        read_data = input_stream.read()
        self.assertEqual(read_data, test_data)

        # Test read with nbytes
        input_stream.seek(0)
        read_partial = input_stream.read(5)
        self.assertEqual(read_partial, b"Hello")

        # Test read more bytes (continues from the current position)
        read_partial2 = input_stream.read(7)
        self.assertEqual(read_partial2, b", World")

        # Test read remaining
        read_remaining = input_stream.read()
        self.assertEqual(read_remaining, b"! This is a test file for OSS input stream.")

        # Verify complete data after seeking back to the start
        input_stream.seek(0)
        complete_data = input_stream.read()
        self.assertEqual(complete_data, test_data)

        # Close the stream
        input_stream.close()

        # Test context manager
        with self.file_io.new_input_stream(test_file) as input_stream2:
            data = input_stream2.read()
            self.assertEqual(data, test_data)

    def test_new_input_stream_read_large_file(self):
        """Test new_input_stream with larger file and read operations."""
        # Create larger test data (1MB)
        test_data = b"X" * (1024 * 1024)
        test_file = self._get_test_path("test-large-input-stream.bin")

        # Write test data
        with self.file_io.new_output_stream(test_file) as out_stream:
            out_stream.write(test_data)

        # Test reading in chunks until EOF (read() returns b"" at EOF)
        chunk_size = 64 * 1024  # 64KB chunks
        with self.file_io.new_input_stream(test_file) as input_stream:
            read_chunks = []
            while True:
                chunk = input_stream.read(chunk_size)
                if not chunk:
                    break
                read_chunks.append(chunk)

            # Verify all data was read
            read_data = b''.join(read_chunks)
            self.assertEqual(len(read_data), len(test_data))
            self.assertEqual(read_data, test_data)

        # Test read_at method if available (positional read, does not move cursor)
        with self.file_io.new_input_stream(test_file) as input_stream:
            if hasattr(input_stream, 'read_at'):
                # Read from middle of file
                offset = len(test_data) // 2
                read_at_data = input_stream.read_at(1024, offset)
                self.assertEqual(len(read_at_data), 1024)
                self.assertEqual(read_at_data, test_data[offset:offset + 1024])

    def test_new_input_stream_file_not_found(self):
        """Test new_input_stream with non-existent file."""
        non_existent_file = self._get_test_path("non-existent-file.txt")

        with self.assertRaises(FileNotFoundError):
            self.file_io.new_input_stream(non_existent_file)

    def test_exists(self):
        """Test exists method."""
        missing_uri = self._get_test_path("nonexistent_xyz")
        self.assertFalse(self.file_io.exists(missing_uri))
        # get_file_status on the same missing path must raise instead
        with self.assertRaises(FileNotFoundError):
            self.file_io.get_file_status(missing_uri)

    def test_write_file_with_overwrite_flag(self):
        """Test write_file with overwrite flag."""
        test_file_uri = self._get_test_path("overwrite_test.txt")

        # 1) Write to a new file with default overwrite=False
        self.file_io.write_file(test_file_uri, "first content")
        self.assertTrue(self.file_io.exists(test_file_uri))
        content = self.file_io.read_file_utf8(test_file_uri)
        self.assertEqual(content, "first content")

        # 2) Attempt to write again with overwrite=False should raise FileExistsError
        with self.assertRaises(FileExistsError):
            self.file_io.write_file(test_file_uri, "second content", overwrite=False)

        # Ensure content is unchanged
        content = self.file_io.read_file_utf8(test_file_uri)
        self.assertEqual(content, "first content")

        # 3) Write with overwrite=True should replace the content
        self.file_io.write_file(test_file_uri, "overwritten content", overwrite=True)
        content = self.file_io.read_file_utf8(test_file_uri)
        self.assertEqual(content, "overwritten content")

    def test_exists_does_not_catch_exception(self):
        """Test that exists does not catch exceptions."""
        # NOTE(review): this only checks exists() True/False results; it does
        # not actually provoke an exception — the name overstates the coverage.
        test_file = self._get_test_path("test_file.txt")

        # Write a test file
        with self.file_io.new_output_stream(test_file) as out_stream:
            out_stream.write(b"test")
        self.assertTrue(self.file_io.exists(test_file))
        self.assertFalse(self.file_io.exists(self._get_test_path("nonexistent.txt")))

    def test_delete_non_empty_directory_raises_error(self):
        """Test that delete raises error for non-empty directory."""
        test_dir = self._get_test_path("test_dir/")
        test_file = self._get_test_path("test_dir/test_file.txt")

        # Create directory and file
        self.file_io.mkdirs(test_dir)
        with self.file_io.new_output_stream(test_file) as out_stream:
            out_stream.write(b"test")

        # Non-recursive delete of a populated directory must fail
        with self.assertRaises(OSError) as context:
            self.file_io.delete(test_dir, recursive=False)
        self.assertIn("is not empty", str(context.exception))

    def test_delete_returns_false_when_file_not_exists(self):
        """Test that delete returns False when file does not exist."""
        result = self.file_io.delete(self._get_test_path("nonexistent.txt"))
        self.assertFalse(result, "delete() should return False when file does not exist")

        result = self.file_io.delete(self._get_test_path("nonexistent_dir"), recursive=False)
        self.assertFalse(result, "delete() should return False when directory does not exist")

    def test_mkdirs_raises_error_when_path_is_file(self):
        """Test that mkdirs raises error when path is a file."""
        test_file = self._get_test_path("test_file.txt")

        # Create a file
        with self.file_io.new_output_stream(test_file) as out_stream:
            out_stream.write(b"test")

        # mkdirs over an existing file must refuse
        with self.assertRaises(FileExistsError) as context:
            self.file_io.mkdirs(test_file)
        self.assertIn("is not a directory", str(context.exception))

    def test_rename_returns_false_when_dst_exists(self):
        """Test that rename returns False when destination exists."""
        src_file = self._get_test_path("src.txt")
        dst_file = self._get_test_path("dst.txt")

        # Create source and destination files
        with self.file_io.new_output_stream(src_file) as out_stream:
            out_stream.write(b"src")
        with self.file_io.new_output_stream(dst_file) as out_stream:
            out_stream.write(b"dst")

        # rename must not clobber an existing destination
        result = self.file_io.rename(src_file, dst_file)
        self.assertFalse(result)

    def test_get_file_status_raises_error_when_file_not_exists(self):
        """Test that get_file_status raises error when file does not exist."""
        with self.assertRaises(FileNotFoundError) as context:
            self.file_io.get_file_status(self._get_test_path("nonexistent.txt"))
        self.assertIn("does not exist", str(context.exception))

        # Sanity check: status of an existing file reports File type and a size
        test_file = self._get_test_path("test_file.txt")
        with self.file_io.new_output_stream(test_file) as out_stream:
            out_stream.write(b"test content")

        file_info = self.file_io.get_file_status(test_file)
        self.assertEqual(file_info.type, pafs.FileType.File)
        self.assertIsNotNone(file_info.size)

        # get_file_size and is_dir must raise the same way for missing paths
        with self.assertRaises(FileNotFoundError) as context:
            self.file_io.get_file_size(self._get_test_path("nonexistent.txt"))
        self.assertIn("does not exist", str(context.exception))

        with self.assertRaises(FileNotFoundError) as context:
            self.file_io.is_dir(self._get_test_path("nonexistent_dir"))
        self.assertIn("does not exist", str(context.exception))

    def test_copy_file(self):
        """Test copy_file method."""
        source_file = self._get_test_path("source.txt")
        target_file = self._get_test_path("target.txt")

        # Create source file
        with self.file_io.new_output_stream(source_file) as out_stream:
            out_stream.write(b"source content")

        # Test 1: Raises FileExistsError when target exists and overwrite=False
        with self.file_io.new_output_stream(target_file) as out_stream:
            out_stream.write(b"target content")

        with self.assertRaises(FileExistsError) as context:
            self.file_io.copy_file(source_file, target_file, overwrite=False)
        self.assertIn("already exists", str(context.exception))

        # Verify target content unchanged
        with self.file_io.new_input_stream(target_file) as in_stream:
            content = in_stream.read()
            self.assertEqual(content, b"target content")

        # Test 2: Overwrites when overwrite=True
        self.file_io.copy_file(source_file, target_file, overwrite=True)
        with self.file_io.new_input_stream(target_file) as in_stream:
            content = in_stream.read()
            self.assertEqual(content, b"source content")

        # Test 3: Creates parent directory if it doesn't exist
        target_file_in_subdir = self._get_test_path("subdir/target.txt")
        self.file_io.copy_file(source_file, target_file_in_subdir, overwrite=False)
        self.assertTrue(self.file_io.exists(target_file_in_subdir))
        with self.file_io.new_input_stream(target_file_in_subdir) as in_stream:
            content = in_stream.read()
            self.assertEqual(content, b"source content")

    def test_try_to_write_atomic(self):
        """Test try_to_write_atomic method."""
        target_dir = self._get_test_path("target_dir/")
        normal_file = self._get_test_path("normal_file.txt")

        # Create target directory; atomic write onto a directory must fail
        self.file_io.mkdirs(target_dir)
        self.assertFalse(
            self.file_io.try_to_write_atomic(target_dir, "test content"),
            "PyArrowFileIO should return False when target is a directory")

        # Verify no file was created inside the directory
        # List directory contents to verify it's empty
        selector = pafs.FileSelector(
            self.file_io.to_filesystem_path(target_dir),
            recursive=False, allow_not_found=True)
        dir_contents = self.file_io.filesystem.get_file_info(selector)
        self.assertEqual(len(dir_contents), 0, "No file should be created inside the directory")

        # Writing to a regular path must succeed and persist the content
        self.assertTrue(self.file_io.try_to_write_atomic(normal_file, "test content"))
        content = self.file_io.read_file_utf8(normal_file)
        self.assertEqual(content, "test content")

        # Delete and test again
        self.file_io.delete(normal_file)
        self.assertFalse(
            self.file_io.try_to_write_atomic(target_dir, "test content"),
            "PyArrowFileIO should return False when target is a directory")

        # Verify no file was created inside the directory
        dir_contents = self.file_io.filesystem.get_file_info(selector)
        self.assertEqual(len(dir_contents), 0, "No file should be created inside the directory")

        self.assertTrue(self.file_io.try_to_write_atomic(normal_file, "test content"))
        content = self.file_io.read_file_utf8(normal_file)
        self.assertEqual(content, "test content")
+
# Allow running this test module directly from the command line.
if __name__ == '__main__':
    unittest.main()

Reply via email to