This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0798f21bb4 [python] Support PyJindo in pyarrow_file_io (#7410)
0798f21bb4 is described below
commit 0798f21bb4c057a7fb87ef059e720430f4f716ba
Author: timmyyao <[email protected]>
AuthorDate: Mon Mar 30 18:05:43 2026 +0800
[python] Support PyJindo in pyarrow_file_io (#7410)
Implement a filesystem for OSS backed by PyJindo
(https://aliyun.github.io/alibabacloud-jindodata/jindosdk/jindosdk_download/),
which provides higher performance and stability for accessing OSS.
---
paimon-python/pypaimon/common/options/config.py | 2 +
.../filesystem/jindo_file_system_handler.py | 283 +++++++++++
.../pypaimon/filesystem/pyarrow_file_io.py | 23 +-
paimon-python/pypaimon/tests/file_io_test.py | 8 +-
.../pypaimon/tests/jindo_file_system_test.py | 524 +++++++++++++++++++++
paimon-python/pypaimon/tests/oss_file_io_test.py | 362 ++++++++++++++
6 files changed, 1198 insertions(+), 4 deletions(-)
diff --git a/paimon-python/pypaimon/common/options/config.py
b/paimon-python/pypaimon/common/options/config.py
index fb6a46446e..8b0230da64 100644
--- a/paimon-python/pypaimon/common/options/config.py
+++ b/paimon-python/pypaimon/common/options/config.py
@@ -18,6 +18,8 @@ from pypaimon.common.options.config_options import
ConfigOptions
class OssOptions:
+ OSS_IMPL =
ConfigOptions.key("fs.oss.impl").string_type().default_value("default").with_description(
+ "OSS filesystem implementation: default or jindo")
OSS_ACCESS_KEY_ID =
ConfigOptions.key("fs.oss.accessKeyId").string_type().no_default_value().with_description(
"OSS access key ID")
OSS_ACCESS_KEY_SECRET = ConfigOptions.key(
diff --git a/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py
b/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py
new file mode 100644
index 0000000000..693416cc57
--- /dev/null
+++ b/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py
@@ -0,0 +1,283 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import logging
+
+import pyarrow as pa
+from pyarrow import PythonFile
+from pyarrow._fs import FileSystemHandler
+from pyarrow.fs import FileInfo, FileSelector, FileType
+
+try:
+ import pyjindo.fs as jfs
+ import pyjindo.util as jutil
+ JINDO_AVAILABLE = True
+except ImportError:
+ JINDO_AVAILABLE = False
+ jfs = None
+ jutil = None
+
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import OssOptions
+
+
class JindoInputFile:
    """Readable file-like wrapper around a PyJindo input stream.

    Exposes the minimal read/seek/tell surface that ``pyarrow.PythonFile``
    requires, plus positional reads via ``read_at``.
    """

    def __init__(self, jindo_stream):
        self._stream = jindo_stream
        self._closed = False

    @property
    def closed(self):
        # Prefer the underlying stream's own notion of "closed" when it
        # exposes one; otherwise fall back to the local flag.
        return getattr(self._stream, 'closed', self._closed)

    def _check_open(self):
        # Shared guard: every I/O method must fail on a closed file.
        if self.closed:
            raise ValueError("I/O operation on closed file")

    def read(self, nbytes: int = -1):
        """Read up to *nbytes* bytes; ``None`` or a negative size reads to EOF."""
        self._check_open()
        if nbytes is None or nbytes < 0:
            return self._stream.read()
        return self._stream.read(nbytes)

    def seek(self, position: int, whence: int = 0):
        """Reposition the stream (same semantics as ``io`` seek)."""
        self._check_open()
        self._stream.seek(position, whence)

    def tell(self) -> int:
        """Return the current stream position."""
        self._check_open()
        return self._stream.tell()

    def read_at(self, nbytes: int, offset: int):
        """Positional read: *nbytes* starting at *offset* (delegates to pread)."""
        self._check_open()
        return self._stream.pread(nbytes, offset)

    def close(self):
        """Close the underlying stream once; subsequent calls are no-ops."""
        if self._closed:
            return
        self._stream.close()
        self._closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False
+
+
class JindoOutputFile:
    """Writable file-like wrapper around a PyJindo output stream.

    Accepts ``bytes`` or ``pyarrow.Buffer`` payloads and forwards them to
    the underlying stream, matching what ``pyarrow.PythonFile`` expects.
    """

    def __init__(self, jindo_stream):
        self._stream = jindo_stream
        self._closed = False

    @property
    def closed(self):
        # Defer to the stream's own closed flag when it has one.
        return getattr(self._stream, 'closed', self._closed)

    def _check_open(self):
        # Shared guard: writes/flushes must fail on a closed file.
        if self.closed:
            raise ValueError("I/O operation on closed file")

    def write(self, data: bytes) -> int:
        """Write *data* (bytes or pyarrow Buffer); return bytes written."""
        self._check_open()
        if isinstance(data, bytes):
            payload = data
        elif isinstance(data, pa.Buffer):
            # pyarrow may hand us Buffer objects; materialize to bytes first.
            payload = data.to_pybytes()
        else:
            raise TypeError("Unsupported data type")
        return self._stream.write(payload)

    def flush(self):
        """Flush the stream if it supports flushing (best effort)."""
        self._check_open()
        flush = getattr(self._stream, 'flush', None)
        if flush is not None:
            flush()

    def close(self):
        """Close the underlying stream once; subsequent calls are no-ops."""
        if self._closed:
            return
        self._stream.close()
        self._closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False
+
+
class JindoFileSystemHandler(FileSystemHandler):
    """pyarrow ``FileSystemHandler`` implementation backed by PyJindo for OSS.

    Plug an instance into ``pyarrow.fs.PyFileSystem`` to access Alibaba
    Cloud OSS through the JindoSDK instead of pyarrow's built-in
    S3-compatible filesystem.
    """

    def __init__(self, root_path: str, catalog_options: Options):
        """Connect to OSS at *root_path* using credentials from *catalog_options*.

        Raises:
            ImportError: if the optional ``pyjindo`` package is not installed.
        """
        if not JINDO_AVAILABLE:
            raise ImportError("Module pyjindo is not available. Please install pyjindosdk.")

        self.logger = logging.getLogger(__name__)
        self.root_path = root_path
        self.properties = catalog_options

        # Build jindo config from catalog_options
        config = jutil.Config()

        access_key_id = catalog_options.get(OssOptions.OSS_ACCESS_KEY_ID)
        access_key_secret = catalog_options.get(OssOptions.OSS_ACCESS_KEY_SECRET)
        security_token = catalog_options.get(OssOptions.OSS_SECURITY_TOKEN)
        endpoint = catalog_options.get(OssOptions.OSS_ENDPOINT)
        region = catalog_options.get(OssOptions.OSS_REGION)

        # Only forward options that were actually supplied; jindo falls
        # back to its own defaults/credential chain for missing keys.
        if access_key_id:
            config.set("fs.oss.accessKeyId", access_key_id)
        if access_key_secret:
            config.set("fs.oss.accessKeySecret", access_key_secret)
        if security_token:
            config.set("fs.oss.securityToken", security_token)
        if endpoint:
            # Strip the URL scheme — presumably jindo expects a bare
            # endpoint host; TODO confirm against the JindoSDK docs.
            endpoint_clean = endpoint.replace('http://', '').replace('https://', '')
            config.set("fs.oss.endpoint", endpoint_clean)
        if region:
            config.set("fs.oss.region", region)
        # Tag requests so server-side logs can attribute traffic to pypaimon.
        config.set("fs.oss.user.agent.features", "pypaimon")

        self._jindo_fs = jfs.connect(self.root_path, "root", config)

    def __eq__(self, other):
        # Two handlers are equal when they address the same OSS root.
        if isinstance(other, JindoFileSystemHandler):
            return self.root_path == other.root_path
        return NotImplemented

    def __ne__(self, other):
        if isinstance(other, JindoFileSystemHandler):
            return not self.__eq__(other)
        return NotImplemented

    def _normalize_path(self, path: str) -> str:
        """Resolve *path* to a fully-qualified ``oss://`` URI under root_path."""
        if path.startswith('oss://'):
            return path  # already absolute

        if not path or path == '.':
            return self.root_path.rstrip('/') + '/'

        path_clean = path.lstrip('/')
        return self.root_path.rstrip('/') + '/' + path_clean

    def _convert_file_type(self, jindo_type) -> FileType:
        """Map a pyjindo file type onto pyarrow's FileType enum."""
        if jindo_type == jfs.FileType.File:
            return FileType.File
        elif jindo_type == jfs.FileType.Directory:
            return FileType.Directory
        else:
            return FileType.Unknown

    def _convert_file_info(self, jindo_info) -> FileInfo:
        """Translate a pyjindo file-info object into a pyarrow FileInfo."""
        pa_type = self._convert_file_type(jindo_info.type)
        return FileInfo(
            path=jindo_info.path,
            type=pa_type,
            # Directories carry no meaningful size in OSS.
            size=jindo_info.size if jindo_info.type == jfs.FileType.File else None,
            # mtime may be absent on some jindo info objects.
            mtime=jindo_info.mtime if hasattr(jindo_info, 'mtime') else None,
        )

    def get_type_name(self) -> str:
        """Identifier reported to pyarrow for this filesystem type."""
        return "jindo"

    def get_file_info(self, paths) -> list:
        """Stat each path; missing paths yield ``FileType.NotFound`` entries."""
        infos = []
        for path in paths:
            normalized = self._normalize_path(path)
            try:
                jindo_info = self._jindo_fs.get_file_info(normalized)
                infos.append(self._convert_file_info(jindo_info))
            except FileNotFoundError:
                infos.append(FileInfo(normalized, FileType.NotFound))
        return infos

    def get_file_info_selector(self, selector: FileSelector) -> list:
        """List *selector.base_dir*, honoring recursive/allow_not_found."""
        normalized = self._normalize_path(selector.base_dir)
        try:
            items = self._jindo_fs.listdir(normalized, recursive=selector.recursive)
            return [self._convert_file_info(item) for item in items]
        except FileNotFoundError:
            if selector.allow_not_found:
                return []
            raise

    def create_dir(self, path: str, recursive: bool):
        # NOTE(review): the `recursive` flag is not forwarded — presumably
        # jindo's mkdir creates parent directories unconditionally; confirm.
        normalized = self._normalize_path(path)
        self._jindo_fs.mkdir(normalized)

    def delete_dir(self, path: str):
        """Delete the directory at *path* (via jindo remove)."""
        normalized = self._normalize_path(path)
        self._jindo_fs.remove(normalized)

    def delete_dir_contents(self, path: str, missing_dir_ok: bool = False):
        """Delete everything under *path* but keep the directory itself."""
        normalized = self._normalize_path(path)
        if normalized == self.root_path:
            # Refuse to wipe the bucket root through this entry point;
            # delete_root_dir_contents() exists for that explicit intent.
            raise ValueError(
                "delete_dir_contents() does not accept root path"
            )
        self._delete_dir_contents(path, missing_dir_ok)

    def delete_root_dir_contents(self):
        """Explicitly delete the contents of the filesystem root."""
        self._delete_dir_contents("/", missing_dir_ok=False)

    def _delete_dir_contents(self, path: str, missing_dir_ok: bool):
        # Shallow listing; each entry is removed individually.
        # NOTE(review): assumes jindo remove() deletes subdirectories
        # recursively — confirm, otherwise nested dirs would survive.
        normalized = self._normalize_path(path)
        try:
            items = self._jindo_fs.listdir(normalized, recursive=False)
        except FileNotFoundError:
            if missing_dir_ok:
                return
            raise
        except Exception as e:
            self.logger.warning(f"Error listing {path}: {e}")
            raise
        for item in items:
            self._jindo_fs.remove(item.path)

    def delete_file(self, path: str):
        """Delete a single file at *path*."""
        normalized = self._normalize_path(path)
        self._jindo_fs.remove(normalized)

    def move(self, src: str, dest: str):
        """Rename/move *src* to *dest* (files or directories)."""
        src_norm = self._normalize_path(src)
        dst_norm = self._normalize_path(dest)
        self._jindo_fs.rename(src_norm, dst_norm)

    def copy_file(self, src: str, dest: str):
        """Server-side copy of a single file from *src* to *dest*."""
        src_norm = self._normalize_path(src)
        dst_norm = self._normalize_path(dest)
        self._jindo_fs.copy_file(src_norm, dst_norm)

    def open_input_stream(self, path: str):
        """Open *path* for sequential reads, wrapped in a PythonFile."""
        normalized = self._normalize_path(path)
        jindo_stream = self._jindo_fs.open(normalized, "rb")
        return PythonFile(JindoInputFile(jindo_stream), mode="r")

    def open_input_file(self, path: str):
        """Open *path* for random-access reads (same wrapper as the stream)."""
        normalized = self._normalize_path(path)
        jindo_stream = self._jindo_fs.open(normalized, "rb")
        return PythonFile(JindoInputFile(jindo_stream), mode="r")

    def open_output_stream(self, path: str, metadata):
        """Open *path* for writing; *metadata* is accepted but unused."""
        normalized = self._normalize_path(path)
        jindo_stream = self._jindo_fs.open(normalized, "wb")
        return PythonFile(JindoOutputFile(jindo_stream), mode="w")

    def open_append_stream(self, path: str, metadata):
        # OSS objects are write-once; appending is not supported here.
        raise IOError("append mode is not supported")

    def normalize_path(self, path: str) -> str:
        """Public entry point of the FileSystemHandler API; see _normalize_path."""
        return self._normalize_path(path)
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index 8f831389cb..2f4f8fd974 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -34,6 +34,7 @@ from pypaimon.common.file_io import FileIO
from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions, S3Options
from pypaimon.common.uri_reader import UriReaderFactory
+from pypaimon.filesystem.jindo_file_system_handler import
JindoFileSystemHandler
from pypaimon.schema.data_types import (AtomicType, DataField,
PyarrowFieldParser)
from pypaimon.table.row.blob import Blob, BlobData, BlobDescriptor
@@ -56,9 +57,13 @@ class PyArrowFileIO(FileIO):
self.uri_reader_factory = UriReaderFactory(catalog_options)
self._is_oss = scheme in {"oss"}
self._oss_bucket = None
+ self._oss_impl = self.properties.get(OssOptions.OSS_IMPL)
if self._is_oss:
self._oss_bucket = self._extract_oss_bucket(path)
- self.filesystem = self._initialize_oss_fs(path)
+ if self._oss_impl == "jindo":
+ self.filesystem = self._initialize_jindo_fs(path)
+ else:
+ self.filesystem = self._initialize_oss_fs(path)
elif scheme in {"s3", "s3a", "s3n"}:
self.filesystem = self._initialize_s3_fs()
elif scheme in {"hdfs", "viewfs"}:
@@ -116,6 +121,13 @@ class PyArrowFileIO(FileIO):
raise ValueError("Invalid OSS URI without bucket:
{}".format(location))
return bucket
+ def _initialize_jindo_fs(self, path) -> FileSystem:
+ """Initialize JindoFileSystem for OSS access."""
+ self.logger.info(f"Initializing JindoFileSystem for OSS access:
{path}")
+ root_path = f"oss://{self._oss_bucket}/"
+ fs_handler = JindoFileSystemHandler(root_path, self.properties)
+ return pafs.PyFileSystem(fs_handler)
+
def _initialize_oss_fs(self, path) -> FileSystem:
if self.properties.get(OssOptions.OSS_ACCESS_KEY_ID):
# When explicit credentials are provided, disable the EC2 Instance
Metadata
@@ -198,7 +210,9 @@ class PyArrowFileIO(FileIO):
def new_output_stream(self, path: str):
path_str = self.to_filesystem_path(path)
- if self._is_oss and not self._pyarrow_gte_7:
+ if self._oss_impl == "jindo":
+ pass
+ elif self._is_oss and not self._pyarrow_gte_7:
# For PyArrow 6.x + OSS, path_str is already just the key part
if '/' in path_str:
parent_dir = '/'.join(path_str.split('/')[:-1])
@@ -578,6 +592,11 @@ class PyArrowFileIO(FileIO):
path_part = normalized_path.lstrip('/')
return f"{drive_letter}:/{path_part}" if path_part else
f"{drive_letter}:"
+ if self._oss_impl == "jindo":
+ # For JindoFileSystem, pass key only
+ path_part = normalized_path.lstrip('/')
+ return path_part if path_part else '.'
+
if isinstance(self.filesystem, S3FileSystem):
if parsed.scheme:
if parsed.netloc:
diff --git a/paimon-python/pypaimon/tests/file_io_test.py
b/paimon-python/pypaimon/tests/file_io_test.py
index ba91f94a18..5cc4d7a821 100644
--- a/paimon-python/pypaimon/tests/file_io_test.py
+++ b/paimon-python/pypaimon/tests/file_io_test.py
@@ -67,7 +67,9 @@ class FileIOTest(unittest.TestCase):
lt7 = _pyarrow_lt_7()
oss_io = PyArrowFileIO("oss://test-bucket/warehouse", Options({
- OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com'
+ OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com',
+ OssOptions.OSS_ACCESS_KEY_ID.key(): 'test-key',
+ OssOptions.OSS_ACCESS_KEY_SECRET.key(): 'test-secret',
}))
got = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt")
self.assertEqual(got, "path/to/file.txt" if lt7 else
"test-bucket/path/to/file.txt")
@@ -286,7 +288,9 @@ class FileIOTest(unittest.TestCase):
file_io.delete_directory_quietly("file:///some/path")
oss_io = PyArrowFileIO("oss://test-bucket/warehouse", Options({
- OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com'
+ OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com',
+ OssOptions.OSS_ACCESS_KEY_ID.key(): 'test-key',
+ OssOptions.OSS_ACCESS_KEY_SECRET.key(): 'test-secret',
}))
mock_fs = MagicMock()
mock_fs.get_file_info.return_value = [
diff --git a/paimon-python/pypaimon/tests/jindo_file_system_test.py
b/paimon-python/pypaimon/tests/jindo_file_system_test.py
new file mode 100644
index 0000000000..b6f168ac58
--- /dev/null
+++ b/paimon-python/pypaimon/tests/jindo_file_system_test.py
@@ -0,0 +1,524 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+import unittest
+import uuid
+
+import pyarrow.fs as pafs
+
+from pyarrow.fs import PyFileSystem
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import OssOptions
+from pypaimon.filesystem.jindo_file_system_handler import
JindoFileSystemHandler, JINDO_AVAILABLE
+
+
class JindoFileSystemTest(unittest.TestCase):
    """Integration tests for JindoFileSystemHandler against a live OSS bucket.

    All tests are skipped unless pyjindo is installed AND the
    OSS_TEST_BUCKET / OSS_ACCESS_KEY_ID / OSS_ACCESS_KEY_SECRET /
    OSS_ENDPOINT environment variables are set. Each test run writes under
    a unique per-run prefix so concurrent runs do not collide.
    """

    def setUp(self):
        """Set up test fixtures."""
        if not JINDO_AVAILABLE:
            self.skipTest("pyjindo is not available")

        # Get OSS credentials from environment variables or use test values
        bucket = os.environ.get("OSS_TEST_BUCKET")
        access_key_id = os.environ.get("OSS_ACCESS_KEY_ID")
        access_key_secret = os.environ.get("OSS_ACCESS_KEY_SECRET")
        endpoint = os.environ.get("OSS_ENDPOINT")
        if not bucket:
            self.skipTest("test bucket is not configured")
            return
        if not access_key_id:
            self.skipTest("test access key id is not configured")
            return
        if not access_key_secret:
            self.skipTest("test access key secret is not configured")
            return
        if not endpoint:
            self.skipTest("test endpoint is not configured")
            return
        self.root_path = f"oss://{bucket}/"

        self.catalog_options = Options({
            OssOptions.OSS_ACCESS_KEY_ID.key(): access_key_id,
            OssOptions.OSS_ACCESS_KEY_SECRET.key(): access_key_secret,
            OssOptions.OSS_ENDPOINT.key(): endpoint,
        })

        # Create JindoFileSystemHandler instance
        fs_handler = JindoFileSystemHandler(self.root_path, self.catalog_options)
        self.fs = PyFileSystem(fs_handler)

        # Create unique test prefix to avoid conflicts
        self.test_prefix = f"test-{uuid.uuid4().hex[:8]}/"

    def tearDown(self):
        """Clean up test files and directories."""
        # Delete the entire test prefix directory
        # NOTE(review): relies on delete_dir removing contents recursively —
        # confirm against jindo remove() semantics.
        test_dir = f"{self.root_path}{self.test_prefix}"
        try:
            file_info = self.fs.get_file_info(test_dir)
            if file_info.type == pafs.FileType.Directory:
                try:
                    self.fs.delete_dir(test_dir)
                except Exception:
                    pass
        except Exception:
            pass  # Ignore cleanup errors

    def _get_test_path(self, name: str) -> str:
        """Get a test path under test_prefix."""
        return f"{self.root_path}{self.test_prefix}{name}"

    def test_get_root_file_info(self):
        """Test get_file_info for root path."""
        # Verify directory exists using get_file_info
        file_info = self.fs.get_file_info(self.root_path)

        # Verify the returned file info
        self.assertIsInstance(file_info, pafs.FileInfo)
        self.assertEqual(file_info.type, pafs.FileType.Directory)

    def test_create_dir_recursive(self):
        """Test create_dir with recursive=True."""
        test_dir = self._get_test_path("nested/deep/dir/")

        # Create nested directory
        self.fs.create_dir(test_dir, recursive=True)

        # Verify directory exists
        file_info = self.fs.get_file_info(test_dir)
        self.assertEqual(file_info.type, pafs.FileType.Directory)

    def test_write_and_read_small_file(self):
        """Test writing and reading a small file (10 bytes) with random data."""
        test_file = self._get_test_path("small-file.txt")
        test_data = os.urandom(10)

        # Write file
        out_stream = self.fs.open_output_stream(test_file)
        out_stream.write(test_data)
        out_stream.close()

        # Read file
        with self.fs.open_input_file(test_file) as in_file:
            read_data = in_file.read()

        # Verify data correctness
        self.assertEqual(test_data, read_data)
        self.assertEqual(len(read_data), 10)

        # Verify file info
        file_info = self.fs.get_file_info(test_file)
        self.assertEqual(file_info.type, pafs.FileType.File)
        self.assertEqual(file_info.size, 10)

    def test_write_and_read_small_file_with_context(self):
        """Test writing and reading a small file (10 bytes) with random data using context manager."""
        test_file = self._get_test_path("small-file-with-context.txt")
        test_data = os.urandom(10)

        # Write file using context manager
        with self.fs.open_output_stream(test_file) as out_stream:
            out_stream.write(test_data)

        # Read file
        with self.fs.open_input_file(test_file) as in_file:
            read_data = in_file.read()

        # Verify data correctness
        self.assertEqual(test_data, read_data)
        self.assertEqual(len(read_data), 10)

        # Verify file info
        file_info = self.fs.get_file_info(test_file)
        self.assertEqual(file_info.type, pafs.FileType.File)
        self.assertEqual(file_info.size, 10)

    def test_write_and_read_large_file(self):
        """Test writing and reading a large file (20MB) with random data."""
        test_file = self._get_test_path("large-file.bin")
        file_size = 20 * 1024 * 1024  # 20MB
        test_data = os.urandom(file_size)

        # Write file in chunks
        chunk_size = 1024 * 1024  # 1MB chunks
        out_stream = self.fs.open_output_stream(test_file)
        for i in range(0, len(test_data), chunk_size):
            chunk = test_data[i:i + chunk_size]
            out_stream.write(chunk)
        out_stream.close()

        # Verify file info
        file_info = self.fs.get_file_info(test_file)
        self.assertEqual(file_info.type, pafs.FileType.File)
        self.assertEqual(file_info.size, file_size)

        # Test JindoInputFile all methods
        with self.fs.open_input_file(test_file) as in_file:
            # Test tell() - should be at position 0 initially
            self.assertEqual(in_file.tell(), 0)

            # Test read() with specific size
            chunk1 = in_file.read(1024)
            self.assertEqual(len(chunk1), 1024)
            self.assertEqual(in_file.tell(), 1024)
            self.assertEqual(chunk1, test_data[0:1024])

            # Test seek() - seek to middle of file
            middle_pos = file_size // 2
            in_file.seek(middle_pos)
            self.assertEqual(in_file.tell(), middle_pos)

            # Test read_at()
            read_at_data = in_file.read_at(1024, 0)
            self.assertEqual(len(read_at_data), 1024)
            self.assertEqual(read_at_data, test_data[0:1024])

            # Test read_at() at different offset
            offset = file_size - 2048
            read_at_data2 = in_file.read_at(1024, offset)
            self.assertEqual(len(read_at_data2), 1024)
            self.assertEqual(read_at_data2, test_data[offset:offset + 1024])

            # Test seek() to end
            in_file.seek(file_size)
            self.assertEqual(in_file.tell(), file_size)

            # Test read() at end - should return empty bytes
            empty_data = in_file.read(100)
            self.assertEqual(len(empty_data), 0)

            # Test read() without size - read remaining
            in_file.seek(0)
            all_data = in_file.read()
            self.assertEqual(len(all_data), file_size)
            self.assertEqual(all_data, test_data)

        # Verify complete file read
        with self.fs.open_input_file(test_file) as in_file:
            complete_data = in_file.read()
            self.assertEqual(complete_data, test_data)
            self.assertEqual(len(complete_data), file_size)

    def test_get_file_info_single_path(self):
        """Test get_file_info with single path."""
        test_file = self._get_test_path("info-test.txt")
        test_data = b"test content"

        # Write file
        out_stream = self.fs.open_output_stream(test_file)
        out_stream.write(test_data)
        out_stream.close()

        # Get file info
        file_info = self.fs.get_file_info(test_file)
        self.assertIsInstance(file_info, pafs.FileInfo)
        self.assertEqual(file_info.type, pafs.FileType.File)
        self.assertEqual(file_info.size, len(test_data))

    def test_get_file_info_list(self):
        """Test get_file_info with list of paths."""
        test_file = self._get_test_path("info-test1.txt")
        test_dir = self._get_test_path("dir1")

        # Write files
        out_stream = self.fs.open_output_stream(test_file)
        out_stream.write(b"content1")
        out_stream.close()
        self.fs.create_dir(test_dir)

        # Get file info for list
        file_infos = self.fs.get_file_info([test_file, test_dir])
        self.assertIsInstance(file_infos, list)
        self.assertEqual(len(file_infos), 2)
        self.assertEqual(file_infos[0].type, pafs.FileType.File)
        self.assertTrue(test_file in file_infos[0].path)
        self.assertEqual(file_infos[1].type, pafs.FileType.Directory)
        self.assertTrue(test_dir in file_infos[1].path)

    def test_get_file_info_with_selector(self):
        """Test get_file_info with FileSelector."""
        test_dir = self._get_test_path("selector-test/")
        test_file1 = self._get_test_path("selector-test/file1.txt")
        test_file2 = self._get_test_path("selector-test/file2.txt")

        # Create files
        self.fs.create_dir(test_dir)
        out_stream1 = self.fs.open_output_stream(test_file1)
        out_stream1.write(b"content1")
        out_stream1.close()
        out_stream2 = self.fs.open_output_stream(test_file2)
        out_stream2.write(b"content2")
        out_stream2.close()

        # Test non-recursive listing
        selector = pafs.FileSelector(test_dir, recursive=False, allow_not_found=False)
        file_infos = self.fs.get_file_info(selector)
        self.assertIsInstance(file_infos, list)
        self.assertEqual(len(file_infos), 2)

        # Verify we got the files
        file_names = [info.path for info in file_infos]
        self.assertTrue(any("file1.txt" in name for name in file_names))
        self.assertTrue(any("file2.txt" in name for name in file_names))

    def test_get_file_info_with_selector_recursive(self):
        """Test get_file_info with FileSelector recursive=True."""
        test_dir = self._get_test_path("selector-recursive/")
        test_subdir = self._get_test_path("selector-recursive/subdir/")
        test_file1 = self._get_test_path("selector-recursive/file1.txt")
        test_file2 = self._get_test_path("selector-recursive/subdir/file2.txt")

        # Create directory structure
        self.fs.create_dir(test_dir)
        self.fs.create_dir(test_subdir)
        out_stream1 = self.fs.open_output_stream(test_file1)
        out_stream1.write(b"content1")
        out_stream1.close()
        out_stream2 = self.fs.open_output_stream(test_file2)
        out_stream2.write(b"content2")
        out_stream2.close()

        # Test recursive listing
        selector = pafs.FileSelector(test_dir, recursive=True, allow_not_found=False)
        file_infos = self.fs.get_file_info(selector)
        self.assertIsInstance(file_infos, list)
        # Expect subdir + both files.
        self.assertEqual(len(file_infos), 3)

    def test_get_file_info_not_found(self):
        """Test get_file_info for non-existent file."""
        non_existent = self._get_test_path("non-existent-file.txt")

        file_info = self.fs.get_file_info(non_existent)
        self.assertEqual(file_info.type, pafs.FileType.NotFound)

        # Try to open non-existent file should raise FileNotFoundError
        with self.assertRaises(FileNotFoundError):
            with self.fs.open_input_file(non_existent) as in_file:
                in_file.read()

    def test_get_file_info_selector_allow_not_found(self):
        """Test get_file_info with FileSelector allow_not_found=True."""
        non_existent_dir = self._get_test_path("non-existent-dir/")

        # Test with allow_not_found=True
        selector = pafs.FileSelector(non_existent_dir, recursive=False, allow_not_found=True)
        file_infos = self.fs.get_file_info(selector)
        self.assertIsInstance(file_infos, list)
        self.assertEqual(len(file_infos), 0)

    def test_delete_file(self):
        """Test delete_file method."""
        test_file = self._get_test_path("delete-test.txt")

        # Create file
        out_stream = self.fs.open_output_stream(test_file)
        out_stream.write(b"test content")
        out_stream.close()

        # Verify file exists
        file_info = self.fs.get_file_info(test_file)
        self.assertEqual(file_info.type, pafs.FileType.File)

        # Delete file
        self.fs.delete_file(test_file)

        # Verify file is deleted - should raise FileNotFoundError when accessing
        with self.assertRaises(FileNotFoundError):
            with self.fs.open_input_file(test_file) as in_file:
                in_file.read()

    def test_delete_dir(self):
        """Test delete_dir method."""
        test_dir = self._get_test_path("delete-dir/")

        # Create directory
        self.fs.create_dir(test_dir)

        # Verify directory exists
        file_info = self.fs.get_file_info(test_dir)
        self.assertEqual(file_info.type, pafs.FileType.Directory)

        # Delete directory
        self.fs.delete_dir(test_dir)

        # Verify directory is deleted - should raise FileNotFoundError when accessing
        with self.assertRaises(FileNotFoundError):
            # Try to list directory contents
            selector = pafs.FileSelector(test_dir, recursive=False, allow_not_found=False)
            self.fs.get_file_info(selector)

    def test_delete_dir_contents(self):
        """Test delete_dir_contents method."""
        test_dir = self._get_test_path("delete-contents/")
        test_file1 = self._get_test_path("delete-contents/file1.txt")
        test_file2 = self._get_test_path("delete-contents/file2.txt")
        test_subdir = self._get_test_path("delete-contents/subdir/")
        test_file3 = self._get_test_path("delete-contents/subdir/file3.txt")

        # Create directory structure
        self.fs.create_dir(test_dir)
        out_stream1 = self.fs.open_output_stream(test_file1)
        out_stream1.write(b"content1")
        out_stream1.close()
        out_stream2 = self.fs.open_output_stream(test_file2)
        out_stream2.write(b"content2")
        out_stream2.close()
        self.fs.create_dir(test_subdir)
        out_stream3 = self.fs.open_output_stream(test_file3)
        out_stream3.write(b"content3")
        out_stream3.close()

        # Delete directory contents
        self.fs.delete_dir_contents(test_dir)

        # Verify directory still exists but is empty
        file_info = self.fs.get_file_info(test_dir)
        self.assertEqual(file_info.type, pafs.FileType.Directory)
        selector = pafs.FileSelector(test_dir, recursive=False, allow_not_found=False)
        file_infos = self.fs.get_file_info(selector)
        self.assertEqual(len(file_infos), 0)

        # Verify files are deleted - should raise FileNotFoundError when accessing
        with self.assertRaises(FileNotFoundError):
            with self.fs.open_input_file(test_file1) as in_file:
                in_file.read()
        with self.assertRaises(FileNotFoundError):
            with self.fs.open_input_file(test_file2) as in_file:
                in_file.read()
        with self.assertRaises(FileNotFoundError):
            with self.fs.open_input_file(test_file3) as in_file:
                in_file.read()

    def test_move_file(self):
        """Test move method for file."""
        src_file = self._get_test_path("move-src.txt")
        dst_file = self._get_test_path("move-dst.txt")
        test_data = b"move test content"

        # Create source file
        out_stream = self.fs.open_output_stream(src_file)
        out_stream.write(test_data)
        out_stream.close()

        # Move file
        self.fs.move(src_file, dst_file)

        # Verify source is gone - should raise FileNotFoundError when accessing
        with self.assertRaises(FileNotFoundError):
            with self.fs.open_input_file(src_file) as in_file:
                in_file.read()

        # Verify destination exists with correct content
        dst_info = self.fs.get_file_info(dst_file)
        self.assertEqual(dst_info.type, pafs.FileType.File)

        with self.fs.open_input_file(dst_file) as in_file:
            read_data = in_file.read()
            self.assertEqual(read_data, test_data)

    def test_move_directory(self):
        """Test move method for directory."""
        src_dir = self._get_test_path("move-src-dir/")
        dst_dir = self._get_test_path("move-dst-dir/")
        src_file = self._get_test_path("move-src-dir/file.txt")

        # Create source directory and file
        self.fs.create_dir(src_dir)
        out_stream = self.fs.open_output_stream(src_file)
        out_stream.write(b"test content")
        out_stream.close()

        # Move directory
        self.fs.move(src_dir, dst_dir)

        # Verify source is gone - should raise FileNotFoundError when accessing
        with self.assertRaises(FileNotFoundError):
            # Try to list directory contents
            selector = pafs.FileSelector(src_dir, recursive=False, allow_not_found=False)
            self.fs.get_file_info(selector)

        # Verify destination exists
        dst_info = self.fs.get_file_info(dst_dir)
        self.assertEqual(dst_info.type, pafs.FileType.Directory)

    def test_copy_file(self):
        """Test copy_file method."""
        src_file = self._get_test_path("copy-src.txt")
        dst_file = self._get_test_path("copy-dst.txt")
        test_data = os.urandom(1024 * 1024)  # 1MB random data

        # Create source file
        out_stream = self.fs.open_output_stream(src_file)
        out_stream.write(test_data)
        out_stream.close()

        # Copy file
        self.fs.copy_file(src_file, dst_file)

        # Verify both files exist
        src_info = self.fs.get_file_info(src_file)
        self.assertEqual(src_info.type, pafs.FileType.File)
        dst_info = self.fs.get_file_info(dst_file)
        self.assertEqual(dst_info.type, pafs.FileType.File)

        # Verify destination content matches source
        with self.fs.open_input_file(dst_file) as in_file:
            read_data = in_file.read()
            self.assertEqual(read_data, test_data)
            self.assertEqual(len(read_data), len(test_data))

    def test_delete_nonexistent_file(self):
        """Test deleting non-existent file."""
        non_existent = self._get_test_path("non-existent-delete.txt")

        # Try to delete non-existent file
        with self.assertRaises(IOError):
            self.fs.delete_file(non_existent)

    def test_multiple_sequential_reads(self):
        """Test multiple sequential reads from same file."""
        test_file = self._get_test_path("sequential-read.txt")
        test_data = os.urandom(10000)  # 10KB

        # Write file
        out_stream = self.fs.open_output_stream(test_file)
        out_stream.write(test_data)
        out_stream.close()

        # Read in chunks sequentially
        with self.fs.open_input_file(test_file) as in_file:
            chunk1 = in_file.read(1000)
            chunk2 = in_file.read(2000)
            chunk3 = in_file.read(3000)
            chunk4 = in_file.read()  # Read rest

            # Verify all chunks
            self.assertEqual(chunk1, test_data[0:1000])
            self.assertEqual(chunk2, test_data[1000:3000])
            self.assertEqual(chunk3, test_data[3000:6000])
            self.assertEqual(chunk4, test_data[6000:])

            # Verify total length
            total_read = len(chunk1) + len(chunk2) + len(chunk3) + len(chunk4)
            self.assertEqual(total_read, len(test_data))


if __name__ == '__main__':
    unittest.main()
diff --git a/paimon-python/pypaimon/tests/oss_file_io_test.py
b/paimon-python/pypaimon/tests/oss_file_io_test.py
new file mode 100644
index 0000000000..f9602c975e
--- /dev/null
+++ b/paimon-python/pypaimon/tests/oss_file_io_test.py
@@ -0,0 +1,362 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+import unittest
+import uuid
+
+import pyarrow.fs as pafs
+
+from pypaimon.common.options import Options
+from pypaimon.common.options.config import OssOptions
+from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
+
+
class OSSFileIOTest(unittest.TestCase):
    """Integration tests for PyArrowFileIO against a real OSS bucket.

    Every test is skipped unless the OSS_TEST_BUCKET, OSS_ACCESS_KEY_ID,
    OSS_ACCESS_KEY_SECRET and OSS_ENDPOINT environment variables are set.
    OSS_IMPL optionally selects the filesystem implementation
    ("default" or "jindo").
    """

    def setUp(self):
        """Build a PyArrowFileIO from environment credentials, or skip."""
        self.bucket = os.environ.get("OSS_TEST_BUCKET")
        access_key_id = os.environ.get("OSS_ACCESS_KEY_ID")
        access_key_secret = os.environ.get("OSS_ACCESS_KEY_SECRET")
        endpoint = os.environ.get("OSS_ENDPOINT")
        oss_impl = os.environ.get("OSS_IMPL", "default")

        # skipTest raises unittest.SkipTest, so no `return` is needed after
        # it (the originals had unreachable `return` statements here).
        required = [
            (self.bucket, "test bucket is not configured"),
            (access_key_id, "test access key id is not configured"),
            (access_key_secret, "test access key secret is not configured"),
            (endpoint, "test endpoint is not configured"),
        ]
        for value, reason in required:
            if not value:
                self.skipTest(reason)

        self.root_path = f"oss://{self.bucket}/"

        self.catalog_options = Options({
            OssOptions.OSS_ACCESS_KEY_ID.key(): access_key_id,
            OssOptions.OSS_ACCESS_KEY_SECRET.key(): access_key_secret,
            OssOptions.OSS_ENDPOINT.key(): endpoint,
            OssOptions.OSS_IMPL.key(): oss_impl,
        })

        # Create PyArrowFileIO instance under test.
        self.file_io = PyArrowFileIO(self.root_path, self.catalog_options)

        # Unique per-run prefix so concurrent test runs do not collide.
        self.test_prefix = f"test-{uuid.uuid4().hex[:8]}/"

    def tearDown(self):
        """Best-effort removal of everything written under the test prefix."""
        test_dir = f"{self.root_path}{self.test_prefix}"
        try:
            if self.file_io.exists(test_dir):
                self.file_io.delete(test_dir, recursive=True)
        except Exception:
            pass  # Ignore cleanup errors

    def _get_test_path(self, name: str) -> str:
        """Return a full OSS URI for *name* under this run's test prefix."""
        return f"{self.root_path}{self.test_prefix}{name}"

    def test_get_file_status_directory(self):
        """get_file_status reports Directory type for a created directory."""
        test_dir = self._get_test_path("test-dir/")
        self.file_io.mkdirs(test_dir)

        file_status = self.file_io.get_file_status(test_dir)

        self.assertIsNotNone(file_status)
        self.assertEqual(file_status.type, pafs.FileType.Directory)

    def test_new_input_stream_read(self):
        """new_input_stream supports full, partial and seek-based reads."""
        test_data = b"Hello, World! This is a test file for OSS input stream."
        test_file = self._get_test_path("test-input-stream.txt")

        # Write test data to file.
        with self.file_io.new_output_stream(test_file) as out_stream:
            out_stream.write(test_data)

        input_stream = self.file_io.new_input_stream(test_file)
        self.assertIsNotNone(input_stream)

        # Read without nbytes returns the whole file.
        input_stream.seek(0)
        self.assertEqual(input_stream.read(), test_data)

        # Partial reads advance the cursor.
        input_stream.seek(0)
        self.assertEqual(input_stream.read(5), b"Hello")
        self.assertEqual(input_stream.read(7), b", World")

        # Reading the remainder picks up where the cursor stopped.
        self.assertEqual(
            input_stream.read(),
            b"! This is a test file for OSS input stream.")

        # Seeking back re-reads the complete data.
        input_stream.seek(0)
        self.assertEqual(input_stream.read(), test_data)

        input_stream.close()

        # The stream also works as a context manager.
        with self.file_io.new_input_stream(test_file) as input_stream2:
            self.assertEqual(input_stream2.read(), test_data)

    def test_new_input_stream_read_large_file(self):
        """Chunked reads over a 1MB file reproduce the data exactly."""
        test_data = b"X" * (1024 * 1024)  # 1MB
        test_file = self._get_test_path("test-large-input-stream.bin")

        with self.file_io.new_output_stream(test_file) as out_stream:
            out_stream.write(test_data)

        # Read the whole file in 64KB chunks.
        chunk_size = 64 * 1024
        with self.file_io.new_input_stream(test_file) as input_stream:
            read_chunks = []
            while True:
                chunk = input_stream.read(chunk_size)
                if not chunk:
                    break
                read_chunks.append(chunk)

        read_data = b''.join(read_chunks)
        self.assertEqual(len(read_data), len(test_data))
        self.assertEqual(read_data, test_data)

        # Positional reads, when the implementation supports them.
        with self.file_io.new_input_stream(test_file) as input_stream:
            if hasattr(input_stream, 'read_at'):
                offset = len(test_data) // 2
                read_at_data = input_stream.read_at(1024, offset)
                self.assertEqual(len(read_at_data), 1024)
                self.assertEqual(read_at_data, test_data[offset:offset + 1024])

    def test_new_input_stream_file_not_found(self):
        """Opening a non-existent file raises FileNotFoundError."""
        non_existent_file = self._get_test_path("non-existent-file.txt")

        with self.assertRaises(FileNotFoundError):
            self.file_io.new_input_stream(non_existent_file)

    def test_exists(self):
        """exists returns False for a missing path; status raises."""
        missing_uri = self._get_test_path("nonexistent_xyz")
        self.assertFalse(self.file_io.exists(missing_uri))
        with self.assertRaises(FileNotFoundError):
            self.file_io.get_file_status(missing_uri)

    def test_write_file_with_overwrite_flag(self):
        """write_file honors the overwrite flag."""
        test_file_uri = self._get_test_path("overwrite_test.txt")

        # 1) Write to a new file with default overwrite=False.
        self.file_io.write_file(test_file_uri, "first content")
        self.assertTrue(self.file_io.exists(test_file_uri))
        self.assertEqual(
            self.file_io.read_file_utf8(test_file_uri), "first content")

        # 2) Writing again with overwrite=False raises FileExistsError.
        with self.assertRaises(FileExistsError):
            self.file_io.write_file(
                test_file_uri, "second content", overwrite=False)

        # Content must be unchanged after the failed write.
        self.assertEqual(
            self.file_io.read_file_utf8(test_file_uri), "first content")

        # 3) overwrite=True replaces the content.
        self.file_io.write_file(
            test_file_uri, "overwritten content", overwrite=True)
        self.assertEqual(
            self.file_io.read_file_utf8(test_file_uri), "overwritten content")

    def test_exists_does_not_catch_exception(self):
        """exists gives a truthful answer for both present and absent paths."""
        test_file = self._get_test_path("test_file.txt")

        with self.file_io.new_output_stream(test_file) as out_stream:
            out_stream.write(b"test")
        self.assertTrue(self.file_io.exists(test_file))

        self.assertFalse(
            self.file_io.exists(self._get_test_path("nonexistent.txt")))

    def test_delete_non_empty_directory_raises_error(self):
        """Non-recursive delete of a non-empty directory raises OSError."""
        test_dir = self._get_test_path("test_dir/")
        test_file = self._get_test_path("test_dir/test_file.txt")

        self.file_io.mkdirs(test_dir)
        with self.file_io.new_output_stream(test_file) as out_stream:
            out_stream.write(b"test")

        with self.assertRaises(OSError) as context:
            self.file_io.delete(test_dir, recursive=False)
        self.assertIn("is not empty", str(context.exception))

    def test_delete_returns_false_when_file_not_exists(self):
        """delete returns False for missing files and directories."""
        result = self.file_io.delete(self._get_test_path("nonexistent.txt"))
        self.assertFalse(
            result, "delete() should return False when file does not exist")

        result = self.file_io.delete(
            self._get_test_path("nonexistent_dir"), recursive=False)
        self.assertFalse(
            result,
            "delete() should return False when directory does not exist")

    def test_mkdirs_raises_error_when_path_is_file(self):
        """mkdirs over an existing file raises FileExistsError."""
        test_file = self._get_test_path("test_file.txt")

        with self.file_io.new_output_stream(test_file) as out_stream:
            out_stream.write(b"test")

        with self.assertRaises(FileExistsError) as context:
            self.file_io.mkdirs(test_file)
        self.assertIn("is not a directory", str(context.exception))

    def test_rename_returns_false_when_dst_exists(self):
        """rename refuses to clobber an existing destination."""
        src_file = self._get_test_path("src.txt")
        dst_file = self._get_test_path("dst.txt")

        with self.file_io.new_output_stream(src_file) as out_stream:
            out_stream.write(b"src")
        with self.file_io.new_output_stream(dst_file) as out_stream:
            out_stream.write(b"dst")

        self.assertFalse(self.file_io.rename(src_file, dst_file))

    def test_get_file_status_raises_error_when_file_not_exists(self):
        """Status/size/is_dir queries raise FileNotFoundError when missing."""
        with self.assertRaises(FileNotFoundError) as context:
            self.file_io.get_file_status(
                self._get_test_path("nonexistent.txt"))
        self.assertIn("does not exist", str(context.exception))

        # A real file yields a File-typed status with a size.
        test_file = self._get_test_path("test_file.txt")
        with self.file_io.new_output_stream(test_file) as out_stream:
            out_stream.write(b"test content")

        file_info = self.file_io.get_file_status(test_file)
        self.assertEqual(file_info.type, pafs.FileType.File)
        self.assertIsNotNone(file_info.size)

        with self.assertRaises(FileNotFoundError) as context:
            self.file_io.get_file_size(self._get_test_path("nonexistent.txt"))
        self.assertIn("does not exist", str(context.exception))

        with self.assertRaises(FileNotFoundError) as context:
            self.file_io.is_dir(self._get_test_path("nonexistent_dir"))
        self.assertIn("does not exist", str(context.exception))

    def test_copy_file(self):
        """copy_file honors overwrite and creates missing parents."""
        source_file = self._get_test_path("source.txt")
        target_file = self._get_test_path("target.txt")

        with self.file_io.new_output_stream(source_file) as out_stream:
            out_stream.write(b"source content")

        # 1) Existing target + overwrite=False raises FileExistsError.
        with self.file_io.new_output_stream(target_file) as out_stream:
            out_stream.write(b"target content")

        with self.assertRaises(FileExistsError) as context:
            self.file_io.copy_file(source_file, target_file, overwrite=False)
        self.assertIn("already exists", str(context.exception))

        # The failed copy must not touch the target.
        with self.file_io.new_input_stream(target_file) as in_stream:
            self.assertEqual(in_stream.read(), b"target content")

        # 2) overwrite=True replaces the target content.
        self.file_io.copy_file(source_file, target_file, overwrite=True)
        with self.file_io.new_input_stream(target_file) as in_stream:
            self.assertEqual(in_stream.read(), b"source content")

        # 3) Copying into a missing subdirectory creates the parent.
        target_file_in_subdir = self._get_test_path("subdir/target.txt")
        self.file_io.copy_file(
            source_file, target_file_in_subdir, overwrite=False)
        self.assertTrue(self.file_io.exists(target_file_in_subdir))
        with self.file_io.new_input_stream(target_file_in_subdir) as in_stream:
            self.assertEqual(in_stream.read(), b"source content")

    def test_try_to_write_atomic(self):
        """try_to_write_atomic fails on directories and succeeds on files."""
        target_dir = self._get_test_path("target_dir/")
        normal_file = self._get_test_path("normal_file.txt")

        self.file_io.mkdirs(target_dir)
        self.assertFalse(
            self.file_io.try_to_write_atomic(target_dir, "test content"),
            "PyArrowFileIO should return False when target is a directory")

        # The failed atomic write must not leave anything in the directory.
        selector = pafs.FileSelector(
            self.file_io.to_filesystem_path(target_dir),
            recursive=False, allow_not_found=True)
        dir_contents = self.file_io.filesystem.get_file_info(selector)
        self.assertEqual(
            len(dir_contents), 0,
            "No file should be created inside the directory")

        self.assertTrue(
            self.file_io.try_to_write_atomic(normal_file, "test content"))
        self.assertEqual(
            self.file_io.read_file_utf8(normal_file), "test content")

        # Delete the file and repeat to ensure the behavior is stable.
        self.file_io.delete(normal_file)
        self.assertFalse(
            self.file_io.try_to_write_atomic(target_dir, "test content"),
            "PyArrowFileIO should return False when target is a directory")

        dir_contents = self.file_io.filesystem.get_file_info(selector)
        self.assertEqual(
            len(dir_contents), 0,
            "No file should be created inside the directory")

        self.assertTrue(
            self.file_io.try_to_write_atomic(normal_file, "test content"))
        self.assertEqual(
            self.file_io.read_file_utf8(normal_file), "test content")
+
# Allow running this test module directly.
if __name__ == "__main__":
    unittest.main()