This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 71998d9df4 [#6279] feat (gvfs-fuse): Add gvfs-fuse integration tests 
for big files and open-file flag test cases (#6280)
71998d9df4 is described below

commit 71998d9df4f7af0c5cedcb8ee845721a86d8455b
Author: Yuhui <h...@datastrato.com>
AuthorDate: Wed Feb 5 16:02:19 2025 +0800

    [#6279] feat (gvfs-fuse): Add gvfs-fuse integration tests for big files and 
open-file flag test cases (#6280)
    
    ### What changes were proposed in this pull request?
    
    Add gvfs-fuse integration tests for big files and open-file flag test
    cases
    
    ### Why are the changes needed?
    
    Fix: #6279
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Integration tests (IT).
---
 clients/filesystem-fuse/Makefile                   |   2 +
 clients/filesystem-fuse/src/filesystem.rs          |  20 +-
 clients/filesystem-fuse/src/main.rs                |   2 +-
 clients/filesystem-fuse/src/memory_filesystem.rs   |  35 +-
 clients/filesystem-fuse/src/open_dal_filesystem.rs | 142 +++++-
 .../filesystem-fuse/tests/bin/run_fuse_testers.sh  |  11 +-
 .../filesystem-fuse/tests/bin/run_s3fs_testers.sh  |   9 +
 clients/filesystem-fuse/tests/fuse_test.rs         | 487 +++++++++++++++++----
 8 files changed, 587 insertions(+), 121 deletions(-)

diff --git a/clients/filesystem-fuse/Makefile b/clients/filesystem-fuse/Makefile
index 86dd2f2215..21a97ceec7 100644
--- a/clients/filesystem-fuse/Makefile
+++ b/clients/filesystem-fuse/Makefile
@@ -71,5 +71,7 @@ test-s3:
 test: doc-test
        cargo test --no-fail-fast --all-targets --all-features --workspace
 
+test-all: test test-s3 test-fuse-it    
+
 clean:
        cargo clean
diff --git a/clients/filesystem-fuse/src/filesystem.rs 
b/clients/filesystem-fuse/src/filesystem.rs
index dcf35f8ebc..c0c27a5fbe 100644
--- a/clients/filesystem-fuse/src/filesystem.rs
+++ b/clients/filesystem-fuse/src/filesystem.rs
@@ -297,7 +297,7 @@ pub trait FileWriter: Sync + Send {
 #[cfg(test)]
 pub(crate) mod tests {
     use super::*;
-    use libc::{O_APPEND, O_CREAT, O_RDONLY};
+    use libc::{O_CREAT, O_RDONLY, O_WRONLY};
     use std::collections::HashMap;
     use std::path::Component;
 
@@ -461,7 +461,11 @@ pub(crate) mod tests {
 
             // Test create file
             let file_handle = self
-                .test_create_file(parent_file_id, "file1.txt".as_ref())
+                .test_create_file(
+                    parent_file_id,
+                    "file1.txt".as_ref(),
+                    (O_CREAT | O_WRONLY) as u32,
+                )
                 .await;
 
             // Test write file
@@ -545,11 +549,13 @@ pub(crate) mod tests {
             self.files.insert(file_stat.file_id, file_stat);
         }
 
-        async fn test_create_file(&mut self, root_file_id: u64, name: &OsStr) 
-> FileHandle {
-            let file = self
-                .fs
-                .create_file(root_file_id, name, (O_CREAT | O_APPEND) as u32)
-                .await;
+        async fn test_create_file(
+            &mut self,
+            root_file_id: u64,
+            name: &OsStr,
+            flags: u32,
+        ) -> FileHandle {
+            let file = self.fs.create_file(root_file_id, name, flags).await;
             assert!(file.is_ok());
             let file = file.unwrap();
             assert!(file.handle_id > 0);
diff --git a/clients/filesystem-fuse/src/main.rs 
b/clients/filesystem-fuse/src/main.rs
index 3534e03346..4e517c76b3 100644
--- a/clients/filesystem-fuse/src/main.rs
+++ b/clients/filesystem-fuse/src/main.rs
@@ -24,7 +24,7 @@ use tokio::signal;
 
 #[tokio::main]
 async fn main() -> fuse3::Result<()> {
-    tracing_subscriber::fmt().init();
+    tracing_subscriber::fmt::init();
 
     // todo need inmprove the args parsing
     let args: Vec<String> = std::env::args().collect();
diff --git a/clients/filesystem-fuse/src/memory_filesystem.rs 
b/clients/filesystem-fuse/src/memory_filesystem.rs
index f56e65ea33..d038539072 100644
--- a/clients/filesystem-fuse/src/memory_filesystem.rs
+++ b/clients/filesystem-fuse/src/memory_filesystem.rs
@@ -91,7 +91,7 @@ impl PathFileSystem for MemoryFileSystem {
         Ok(results)
     }
 
-    async fn open_file(&self, path: &Path, _flags: OpenFileFlags) -> 
Result<OpenedFile> {
+    async fn open_file(&self, path: &Path, flags: OpenFileFlags) -> 
Result<OpenedFile> {
         let file_stat = self.stat(path).await?;
         let mut opened_file = OpenedFile::new(file_stat);
         match opened_file.file_stat.kind {
@@ -105,8 +105,17 @@ impl PathFileSystem for MemoryFileSystem {
                     .unwrap()
                     .data
                     .clone();
-                opened_file.reader = Some(Box::new(MemoryFileReader { data: 
data.clone() }));
-                opened_file.writer = Some(Box::new(MemoryFileWriter { data: 
data }));
+                if flags.is_read() {
+                    opened_file.reader = Some(Box::new(MemoryFileReader { 
data: data.clone() }));
+                }
+                if flags.is_write() || flags.is_append() || 
flags.is_truncate() {
+                    opened_file.writer = Some(Box::new(MemoryFileWriter { 
data: data.clone() }));
+                }
+
+                if flags.is_truncate() {
+                    let mut data = data.lock().unwrap();
+                    data.clear();
+                }
                 Ok(opened_file)
             }
             _ => Err(Errno::from(libc::EBADF)),
@@ -117,27 +126,19 @@ impl PathFileSystem for MemoryFileSystem {
         self.open_file(path, flags).await
     }
 
-    async fn create_file(&self, path: &Path, _flags: OpenFileFlags) -> 
Result<OpenedFile> {
-        let mut file_map = self.file_map.write().unwrap();
-        if file_map.contains_key(path) {
+    async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> 
Result<OpenedFile> {
+        if self.file_map.read().unwrap().contains_key(path) && 
flags.is_exclusive() {
             return Err(Errno::from(libc::EEXIST));
         }
 
-        let mut opened_file = 
OpenedFile::new(FileStat::new_file_filestat_with_path(path, 0));
-
-        let data = Arc::new(Mutex::new(Vec::new()));
-        file_map.insert(
-            opened_file.file_stat.path.clone(),
+        self.file_map.write().unwrap().insert(
+            path.to_path_buf(),
             MemoryFile {
                 kind: RegularFile,
-                data: data.clone(),
+                data: Arc::new(Mutex::new(Vec::new())),
             },
         );
-
-        opened_file.reader = Some(Box::new(MemoryFileReader { data: 
data.clone() }));
-        opened_file.writer = Some(Box::new(MemoryFileWriter { data: data }));
-
-        Ok(opened_file)
+        self.open_file(path, flags).await
     }
 
     async fn create_dir(&self, path: &Path) -> Result<FileStat> {
diff --git a/clients/filesystem-fuse/src/open_dal_filesystem.rs 
b/clients/filesystem-fuse/src/open_dal_filesystem.rs
index d32b014d1f..9e094873f5 100644
--- a/clients/filesystem-fuse/src/open_dal_filesystem.rs
+++ b/clients/filesystem-fuse/src/open_dal_filesystem.rs
@@ -26,19 +26,26 @@ use bytes::Bytes;
 use fuse3::FileType::{Directory, RegularFile};
 use fuse3::{Errno, FileType, Timestamp};
 use log::error;
-use opendal::{EntryMode, ErrorKind, Metadata, Operator};
+use opendal::{Buffer, EntryMode, ErrorKind, Metadata, Operator};
+use std::mem::swap;
 use std::path::{Path, PathBuf};
 use std::time::SystemTime;
 
 pub(crate) struct OpenDalFileSystem {
     op: Operator,
+    block_size: u32,
 }
 
 impl OpenDalFileSystem {}
 
 impl OpenDalFileSystem {
-    pub(crate) fn new(op: Operator, _config: &AppConfig, _fs_context: 
&FileSystemContext) -> Self {
-        Self { op: op }
+    const WRITE_BUFFER_SIZE: usize = 5 * 1024 * 1024;
+
+    pub(crate) fn new(op: Operator, config: &AppConfig, _fs_context: 
&FileSystemContext) -> Self {
+        Self {
+            op: op,
+            block_size: config.filesystem.block_size,
+        }
     }
 
     fn opendal_meta_to_file_stat(&self, meta: &Metadata, file_stat: &mut 
FileStat) {
@@ -120,14 +127,30 @@ impl PathFileSystem for OpenDalFileSystem {
                 .map_err(opendal_error_to_errno)?;
             file.reader = Some(Box::new(FileReaderImpl { reader }));
         }
-        if flags.is_write() || flags.is_create() || flags.is_append() || 
flags.is_truncate() {
+        if !flags.is_create() && flags.is_append() {
+            error!("The file system does not support open a exists file with 
the append mode");
+            return Err(Errno::from(libc::EINVAL));
+        }
+
+        if flags.is_truncate() {
+            self.op
+                .write(&file_name, Buffer::new())
+                .await
+                .map_err(opendal_error_to_errno)?;
+        }
+
+        if flags.is_write() || flags.is_append() || flags.is_truncate() {
             let writer = self
                 .op
                 .writer_with(&file_name)
                 .await
                 .map_err(opendal_error_to_errno)?;
-            file.writer = Some(Box::new(FileWriterImpl { writer }));
+            file.writer = Some(Box::new(FileWriterImpl::new(
+                writer,
+                Self::WRITE_BUFFER_SIZE + self.block_size as usize,
+            )));
         }
+
         Ok(file)
     }
 
@@ -141,15 +164,17 @@ impl PathFileSystem for OpenDalFileSystem {
 
     async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> 
Result<OpenedFile> {
         let file_name = path.to_string_lossy().to_string();
+        if flags.is_exclusive() {
+            let meta = self.op.stat(&file_name).await;
+            if meta.is_ok() {
+                return Err(Errno::from(libc::EEXIST));
+            }
+        }
 
-        let mut writer = self
-            .op
-            .writer_with(&file_name)
+        self.op
+            .write(&file_name, Buffer::new())
             .await
             .map_err(opendal_error_to_errno)?;
-
-        writer.close().await.map_err(opendal_error_to_errno)?;
-
         let file = self.open_file(path, flags).await?;
         Ok(file)
     }
@@ -210,19 +235,45 @@ impl FileReader for FileReaderImpl {
 
 struct FileWriterImpl {
     writer: opendal::Writer,
+    buffer: Vec<u8>,
+    buffer_size: usize,
+}
+
+impl FileWriterImpl {
+    fn new(writer: opendal::Writer, buffer_size: usize) -> Self {
+        Self {
+            writer,
+            buffer_size: buffer_size,
+            buffer: Vec::with_capacity(buffer_size),
+        }
+    }
 }
 
 #[async_trait]
 impl FileWriter for FileWriterImpl {
     async fn write(&mut self, _offset: u64, data: &[u8]) -> Result<u32> {
-        self.writer
-            .write(data.to_vec())
-            .await
-            .map_err(opendal_error_to_errno)?;
+        if self.buffer.len() > OpenDalFileSystem::WRITE_BUFFER_SIZE {
+            let mut new_buffer: Vec<u8> = Vec::with_capacity(self.buffer_size);
+            swap(&mut new_buffer, &mut self.buffer);
+
+            self.writer
+                .write(new_buffer)
+                .await
+                .map_err(opendal_error_to_errno)?;
+        }
+        self.buffer.extend(data);
         Ok(data.len() as u32)
     }
 
     async fn close(&mut self) -> Result<()> {
+        if !self.buffer.is_empty() {
+            let mut new_buffer: Vec<u8> = vec![];
+            swap(&mut new_buffer, &mut self.buffer);
+            self.writer
+                .write(new_buffer)
+                .await
+                .map_err(opendal_error_to_errno)?;
+        }
         self.writer.close().await.map_err(opendal_error_to_errno)?;
         Ok(())
     }
@@ -260,10 +311,12 @@ fn opendal_filemode_to_filetype(mode: EntryMode) -> 
FileType {
 #[cfg(test)]
 mod test {
     use crate::config::AppConfig;
+    use crate::open_dal_filesystem::OpenDalFileSystem;
     use crate::s3_filesystem::extract_s3_config;
     use crate::s3_filesystem::tests::s3_test_config;
     use crate::test_enable_with;
     use crate::RUN_TEST_WITH_S3;
+    use bytes::Buf;
     use opendal::layers::LoggingLayer;
     use opendal::{services, Builder, Operator};
 
@@ -327,4 +380,63 @@ mod test {
             }
         }
     }
+
+    #[tokio::test]
+    async fn s3_ut_test_s3_write() {
+        test_enable_with!(RUN_TEST_WITH_S3);
+        let config = s3_test_config();
+
+        let op = create_opendal(&config);
+        let path = "/s1/fileset1/gvfs_test/test_dir/test_file";
+        let mut writer = op.writer_with(path).await.unwrap();
+
+        let mut buffer: Vec<u8> = vec![];
+        let num_batch = 10 * 1024;
+        for i in 0..num_batch {
+            let data = vec![i as u8; num_batch];
+            buffer.extend(&data);
+
+            if buffer.len() > OpenDalFileSystem::WRITE_BUFFER_SIZE {
+                writer.write(buffer).await.unwrap();
+                buffer = vec![];
+            };
+        }
+
+        if !buffer.is_empty() {
+            writer.write(buffer).await.unwrap();
+        }
+        writer.close().await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn s3_ut_test_s3_read() {
+        test_enable_with!(RUN_TEST_WITH_S3);
+        let config = s3_test_config();
+
+        let op = create_opendal(&config);
+        let path = "/s1/fileset1/test_dir/test_big_file";
+        let meta = op.stat(path).await;
+        if meta.is_err() {
+            println!("stat error: {:?}", meta.err());
+            return;
+        }
+        let reader = op.reader(path).await.unwrap();
+
+        let mut buffer = Vec::new();
+
+        let batch_size = 1024;
+        let mut start = 0;
+        let mut end = batch_size;
+        loop {
+            let buf = reader.read(start..end).await.unwrap();
+            if buf.is_empty() {
+                break;
+            }
+            buffer.extend_from_slice(buf.chunk());
+            start = end;
+            end += batch_size;
+        }
+
+        println!("Read {} bytes.", buffer.len());
+    }
 }
diff --git a/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh 
b/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh
index 6dc38c48f0..7088a310b5 100755
--- a/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh
+++ b/clients/filesystem-fuse/tests/bin/run_fuse_testers.sh
@@ -50,13 +50,22 @@ if [ "$1" == "test" ]; then
   echo "Running tests..."
   cd $CLIENT_FUSE_DIR
   export RUN_TEST_WITH_FUSE=1
-  cargo test --test fuse_test fuse_it_
+  cargo test --test fuse_test fuse_it_ -- weak_consistency
 
 elif [ "$1" == "start" ]; then
   # Start the servers
   echo "Starting servers..."
   start_servers
 
+elif [ "$1" == "restart" ]; then
+  # Stop the servers
+  echo "Stopping servers..."
+  stop_servers
+
+  # Start the servers
+  echo "Starting servers..."
+  start_servers
+
 elif [ "$1" == "stop" ]; then
   # Stop the servers
   echo "Stopping servers..."
diff --git a/clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh 
b/clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh
index ac5f9812c9..8f25c0b395 100644
--- a/clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh
+++ b/clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh
@@ -51,6 +51,15 @@ elif [ "$1" == "start" ]; then
   echo "Starting servers..."
   start_servers
 
+elif [ "$1" == "restart" ]; then
+  # Stop the servers
+  echo "Stopping servers..."
+  stop_servers
+
+  # Start the servers
+  echo "Starting servers..."
+  start_servers
+
 elif [ "$1" == "stop" ]; then
   # Stop the servers
   echo "Stopping servers..."
diff --git a/clients/filesystem-fuse/tests/fuse_test.rs 
b/clients/filesystem-fuse/tests/fuse_test.rs
index 41e385c49f..1d1ef80b78 100644
--- a/clients/filesystem-fuse/tests/fuse_test.rs
+++ b/clients/filesystem-fuse/tests/fuse_test.rs
@@ -22,29 +22,32 @@ use gvfs_fuse::config::AppConfig;
 use gvfs_fuse::RUN_TEST_WITH_FUSE;
 use gvfs_fuse::{gvfs_mount, gvfs_unmount, test_enable_with};
 use log::{error, info};
-use std::fs::File;
-use std::path::Path;
-use std::sync::Arc;
-use std::thread::sleep;
+use once_cell::sync::Lazy;
+use std::collections::HashSet;
+use std::fs::{File, OpenOptions};
+use std::io::{Read, Write};
+use std::path::{Path, PathBuf};
 use std::time::{Duration, Instant};
-use std::{fs, panic, process};
+use std::{env, fs};
 use tokio::runtime::Runtime;
 use tokio::task::JoinHandle;
+use tokio::time::interval;
 
-struct FuseTest {
-    runtime: Arc<Runtime>,
+static ASYNC_RUNTIME: Lazy<Runtime> = Lazy::new(|| Runtime::new().unwrap());
+
+struct FuseTestEnv {
     mount_point: String,
     gvfs_mount: Option<JoinHandle<fuse3::Result<()>>>,
 }
 
-impl FuseTest {
+impl FuseTestEnv {
     pub fn setup(&mut self) {
         info!("Start gvfs fuse server");
         let mount_point = self.mount_point.clone();
 
         let config = 
AppConfig::from_file(Some("tests/conf/gvfs_fuse_memory.toml"))
             .expect("Failed to load config");
-        self.runtime.spawn(async move {
+        ASYNC_RUNTIME.spawn(async move {
             let result = gvfs_mount(&mount_point, "", &config).await;
             if let Err(e) = result {
                 error!("Failed to mount gvfs: {:?}", e);
@@ -57,116 +60,440 @@ impl FuseTest {
     }
 
     pub fn shutdown(&mut self) {
-        self.runtime.block_on(async {
+        ASYNC_RUNTIME.block_on(async {
             let _ = gvfs_unmount().await;
         });
     }
 
     fn wait_for_fuse_server_ready(path: &str, timeout: Duration) -> bool {
         let test_file = format!("{}/.gvfs_meta", path);
-        let start_time = Instant::now();
+        AwaitUtil::wait(timeout, Duration::from_millis(500), || {
+            file_exists(&test_file)
+        })
+    }
+}
+
+struct AwaitUtil();
 
-        while start_time.elapsed() < timeout {
-            if file_exists(&test_file) {
-                info!("Fuse server is ready",);
-                return true;
+impl AwaitUtil {
+    pub(crate) fn wait(
+        max_wait: Duration,
+        poll_interval: Duration,
+        check_fn: impl Fn() -> bool + Send,
+    ) -> bool {
+        ASYNC_RUNTIME.block_on(async {
+            let start = Instant::now();
+            let mut interval = interval(poll_interval);
+
+            while start.elapsed() < max_wait {
+                interval.tick().await;
+                if check_fn() {
+                    return true;
+                }
             }
-            info!("Wait for fuse server ready",);
-            sleep(Duration::from_secs(1));
-        }
-        false
+            false
+        })
     }
 }
 
-impl Drop for FuseTest {
+impl Drop for FuseTestEnv {
     fn drop(&mut self) {
         info!("Shutdown fuse server");
         self.shutdown();
     }
 }
 
-#[test]
-fn test_fuse_with_memory_fs() {
-    tracing_subscriber::fmt().init();
+struct SequenceFileOperationTest {
+    test_dir: PathBuf,
+    weak_consistency: bool,
+}
 
-    panic::set_hook(Box::new(|info| {
-        error!("A panic occurred: {:?}", info);
-        process::exit(1);
-    }));
+impl SequenceFileOperationTest {
+    fn new(test_dir: &Path) -> Self {
+        let args: Vec<String> = env::args().collect();
+        let weak_consistency = args.contains(&"weak_consistency".to_string());
 
-    let mount_point = "target/gvfs";
-    let _ = fs::create_dir_all(mount_point);
+        SequenceFileOperationTest {
+            test_dir: test_dir.to_path_buf(),
+            weak_consistency: weak_consistency,
+        }
+    }
+    fn test_create_file(&self, name: &str, open_options: Option<&OpenOptions>) 
-> File {
+        let path = self.test_dir.join(name);
+        let file = {
+            match open_options {
+                None => File::create(&path)
+                    .unwrap_or_else(|_| panic!("Failed to create file: {:?}", 
path)),
+                Some(options) => options.open(&path).unwrap_or_else(|_| {
+                    panic!(
+                        "Failed to create file: {:?},
+                        options {:?}",
+                        path, options
+                    )
+                }),
+            }
+        };
+        let file_metadata = file
+            .metadata()
+            .unwrap_or_else(|_| panic!("Failed to get file metadata: {:?}", 
path));
+        assert!(file_exists(path));
+        if !self.weak_consistency {
+            assert_eq!(file_metadata.len(), 0);
+        }
+        file
+    }
 
-    let mut test = FuseTest {
-        runtime: Arc::new(Runtime::new().unwrap()),
-        mount_point: mount_point.to_string(),
-        gvfs_mount: None,
-    };
+    fn test_open_file(&self, name: &str, open_options: Option<&OpenOptions>) 
-> File {
+        let path = self.test_dir.join(name);
+        let file = {
+            match open_options {
+                None => {
+                    File::open(&path).unwrap_or_else(|_| panic!("Failed to 
open file: {:?}", path))
+                }
+                Some(options) => options.open(&path).unwrap_or_else(|_| {
+                    panic!(
+                        "Failed to open file: {:?},
+                        options {:?}",
+                        path, options
+                    )
+                }),
+            }
+        };
+        let file_metadata = file
+            .metadata()
+            .unwrap_or_else(|_| panic!("Failed to get file metadata: {:?}", 
path));
+        assert!(file_metadata.is_file());
+        assert!(file_exists(path));
+        file
+    }
 
-    test.setup();
-    test_fuse_filesystem(mount_point);
-}
+    fn test_read_file(&self, file: &mut File, expect: &[u8]) {
+        let mut content = vec![0; expect.len()];
+        file.read_exact(&mut content).expect("Failed to read file");
+        assert_eq!(content, *expect, "File content mismatch");
+    }
 
-#[test]
-fn fuse_it_test_fuse() {
-    test_enable_with!(RUN_TEST_WITH_FUSE);
+    fn test_read_data(&self, file: &mut File, len: usize) -> Vec<u8> {
+        let mut content = vec![0; len];
+        file.read_exact(&mut content).expect("Failed to read file");
+        content
+    }
 
-    test_fuse_filesystem("target/gvfs/gvfs_test");
-}
+    fn test_append_file(&self, file: &mut File, content: &[u8]) {
+        let old_len = file.metadata().unwrap().len();
+        let size = content.len();
+        file.write_all(content).expect("Failed to write file");
+
+        if !self.weak_consistency {
+            let new_len = file.metadata().unwrap().len();
+            assert_eq!(new_len, old_len + size as u64, "File size mismatch");
+        }
+    }
+
+    fn test_remove_file(&self, name: &str) {
+        let path = self.test_dir.join(name);
+        fs::remove_file(&path).unwrap_or_else(|_| panic!("Failed to remove 
file: {:?}", path));
+        assert!(!file_exists(path));
+    }
+
+    fn test_create_dir(&self, name: &str) {
+        let path = self.test_dir.join(name);
+        fs::create_dir(&path).unwrap_or_else(|_| panic!("Failed to create 
directory: {:?}", path));
+        assert!(file_exists(path));
+    }
+
+    fn test_list_dir_with_expect(&self, name: &str, expect_childs: &Vec<&str>) 
{
+        self.test_list_dir(name, expect_childs, &vec![]);
+    }
+
+    fn test_list_dir_with_unexpected(&self, name: &str, unexpected_childs: 
&Vec<&str>) {
+        self.test_list_dir(name, &vec![], unexpected_childs);
+    }
+
+    fn test_list_dir(&self, name: &str, expect_childs: &Vec<&str>, 
unexpected_childs: &Vec<&str>) {
+        let path = self.test_dir.join(name);
+        let dir_childs =
+            fs::read_dir(&path).unwrap_or_else(|_| panic!("Failed to list 
directory: {:?}", path));
+        let mut childs_set: HashSet<String> = HashSet::default();
+        for child in dir_childs {
+            let entry = child.expect("Failed to get entry");
+            childs_set.insert(entry.file_name().to_string_lossy().to_string());
+        }
+        for expect_child in expect_childs {
+            assert!(
+                childs_set.contains(*expect_child),
+                "Expect child not found: {}",
+                expect_child
+            );
+        }
+
+        for unexpected_child in unexpected_childs {
+            assert!(
+                !childs_set.contains(*unexpected_child),
+                "Unexpected child found: {}",
+                unexpected_child
+            );
+        }
+    }
+
+    fn test_remove_dir(&self, name: &str) {
+        let path = self.test_dir.join(name);
+        fs::remove_dir(&path).unwrap_or_else(|_| panic!("Failed to remove 
directory: {:?}", path));
+        assert!(!file_exists(path));
+    }
 
-fn test_fuse_filesystem(mount_point: &str) {
-    info!("Test startup");
-    let base_path = Path::new(mount_point);
+    // some file storage can't sync file immediately, so we need to sync file 
to make sure the file is written to disk
+    fn sync_file(&self, file: File, name: &str, expect_len: u64) -> Result<(), 
()> {
+        if !self.weak_consistency {
+            return Ok(());
+        }
+        drop(file);
 
-    if !file_exists(base_path) {
-        fs::create_dir_all(base_path).expect("Failed to create test dir");
+        let path = self.test_dir.join(name);
+        let success = AwaitUtil::wait(Duration::from_secs(3), 
Duration::from_millis(200), || {
+            let file =
+                File::open(&path).unwrap_or_else(|_| panic!("Failed to open 
file: {:?}", path));
+            let file_len = file.metadata().unwrap().len();
+            file_len >= expect_len
+        });
+        if !success {
+            return Err(());
+        }
+        Ok(())
     }
 
-    //test create file
-    let test_file = base_path.join("test_create");
-    let file = File::create(&test_file).expect("Failed to create file");
-    assert!(file.metadata().is_ok(), "Failed to get file metadata");
-    assert!(file_exists(&test_file));
+    fn test_basic_filesystem(fs_test: &SequenceFileOperationTest) {
+        let file_name1 = "test_create";
+        //test create file
+        let mut file1 = fs_test.test_create_file(file_name1, None);
+
+        //test write file
+        let content = "write test".as_bytes();
+        fs_test.test_append_file(&mut file1, content);
+        fs_test
+            .sync_file(file1, file_name1, content.len() as u64)
+            .expect("Failed to sync file");
+
+        //test read file
+        let mut file1 = fs_test.test_open_file(file_name1, None);
+        fs_test.test_read_file(&mut file1, content);
+
+        //test delete file
+        fs_test.test_remove_file(file_name1);
+
+        //test create directory
+        let dir_name1 = "test_dir";
+        fs_test.test_create_dir(dir_name1);
 
-    //test write file
-    fs::write(&test_file, "read test").expect("Failed to write file");
+        //test create file in directory
+        let test_file2 = "test_dir/test_file";
+        let mut file2 = fs_test.test_create_file(test_file2, None);
 
-    //test read file
-    let content = fs::read_to_string(&test_file).expect("Failed to read file");
-    assert_eq!(content, "read test", "File content mismatch");
+        //test write file in directory
+        fs_test.test_append_file(&mut file2, content);
+        fs_test
+            .sync_file(file2, test_file2, content.len() as u64)
+            .expect("Failed to sync file");
 
-    //test delete file
-    fs::remove_file(&test_file).expect("Failed to delete file");
-    assert!(!file_exists(&test_file));
+        //test read file in directory
+        let mut file2 = fs_test.test_open_file(test_file2, None);
+        fs_test.test_read_file(&mut file2, content);
 
-    //test create directory
-    let test_dir = base_path.join("test_dir");
-    fs::create_dir(&test_dir).expect("Failed to create directory");
+        //test list directory
+        fs_test.test_list_dir_with_expect(dir_name1, &vec!["test_file"]);
 
-    //test create file in directory
-    let test_file = base_path.join("test_dir/test_file");
-    let file = File::create(&test_file).expect("Failed to create file");
-    assert!(file.metadata().is_ok(), "Failed to get file metadata");
+        //test delete file in directory
+        fs_test.test_remove_file(test_file2);
 
-    //test write file in directory
-    let test_file = base_path.join("test_dir/test_read");
-    fs::write(&test_file, "read test").expect("Failed to write file");
+        //test list directory after delete file
+        fs_test.test_list_dir_with_unexpected(dir_name1, &vec!["test_file"]);
 
-    //test read file in directory
-    let content = fs::read_to_string(&test_file).expect("Failed to read file");
-    assert_eq!(content, "read test", "File content mismatch");
+        //test delete directory
+        fs_test.test_remove_dir(dir_name1);
+    }
+
+    #[allow(clippy::needless_range_loop)]
+    fn test_big_file(fs_test: &SequenceFileOperationTest) {
+        let test_file = "test_big_file";
+        let round_size: usize = 1024 * 1024;
+        let round: u8 = 1;
+
+        //test write big file
+        {
+            let mut file = fs_test.test_create_file(test_file, None);
+
+            for i in 0..round {
+                let mut content = vec![0; round_size];
+                for j in 0..round_size {
+                    content[j] = (i as usize + j) as u8;
+                }
 
-    //test delete file in directory
-    fs::remove_file(&test_file).expect("Failed to delete file");
-    assert!(!file_exists(&test_file));
+                fs_test.test_append_file(&mut file, &content);
+            }
+            fs_test
+                .sync_file(file, test_file, round_size as u64 * round as u64)
+                .expect("Failed to sync file");
+        }
 
-    //test delete directory
-    fs::remove_dir_all(&test_dir).expect("Failed to delete directory");
-    assert!(!file_exists(&test_dir));
+        //test read big file
+        {
+            let mut file = fs_test.test_open_file(test_file, None);
+            for i in 0..round {
+                let buffer = fs_test.test_read_data(&mut file, round_size);
 
-    info!("Success test");
+                for j in 0..round_size {
+                    assert_eq!(buffer[j], (i as usize + j) as u8, "File 
content mismatch");
+                }
+            }
+        }
+
+        fs_test.test_remove_file(test_file);
+    }
+
+    fn test_open_file_flag(fs_test: &SequenceFileOperationTest) {
+        let write_content = "write content";
+        {
+            // test open file with read and write create flag
+            let file_name = "test_open_file";
+            let mut file = fs_test.test_create_file(
+                file_name,
+                Some(OpenOptions::new().read(true).write(true).create(true)),
+            );
+
+            // test write can be done
+            fs_test.test_append_file(&mut file, write_content.as_bytes());
+
+            // test read end of file
+            let result = file.read_exact(&mut [1]);
+            assert!(result.is_err());
+            if let Err(e) = result {
+                assert_eq!(e.to_string(), "failed to fill whole buffer");
+            }
+        }
+
+        {
+            // test open file with write flag
+            let file_name = "test_open_file2";
+            let mut file = fs_test
+                .test_create_file(file_name, 
Some(OpenOptions::new().write(true).create(true)));
+
+            // test write can be done
+            fs_test.test_append_file(&mut file, write_content.as_bytes());
+
+            // test that reading from a write-only file returns an error
+            let result = file.read(&mut [0; 10]);
+            assert!(result.is_err());
+            if let Err(e) = result {
+                assert_eq!(e.to_string(), "Bad file descriptor (os error 9)");
+            }
+        }
+
+        {
+            // test open file with read flag
+            let file_name = "test_open_file2";
+            let mut file = fs_test.test_open_file(file_name, 
Some(OpenOptions::new().read(true)));
+
+            // test read can be done
+            fs_test.test_read_file(&mut file, write_content.as_bytes());
+
+            // test that writing to a read-only file returns an error
+            let result = file.write_all(write_content.as_bytes());
+            assert!(result.is_err());
+            if let Err(e) = result {
+                assert_eq!(e.to_string(), "Bad file descriptor (os error 9)");
+            }
+        }
+
+        {
+            // test open file with truncate file
+            let file_name = "test_open_file2";
+            let file = fs_test.test_open_file(
+                file_name,
+                Some(OpenOptions::new().write(true).truncate(true)),
+            );
+
+            // test file size is 0
+            assert_eq!(file.metadata().unwrap().len(), 0);
+        }
+
+        {
+            // test open file with append flag
+            let file_name = "test_open_file";
+
+            // opendal_fs does not support opening an existing file in append mode
+            let result = OpenOptions::new()
+                .append(true)
+                .open(fs_test.test_dir.join(file_name));
+            if let Err(e) = result {
+                assert_eq!(e.to_string(), "Invalid argument (os error 22)");
+                return;
+            }
+
+            let mut file = fs_test.test_open_file(file_name, 
Some(OpenOptions::new().append(true)));
+
+            assert_eq!(file.metadata().unwrap().len(), write_content.len() as 
u64);
+
+            // test append
+            fs_test.test_append_file(&mut file, write_content.as_bytes());
+            let file_len = file.metadata().unwrap().len();
+            assert_eq!(file_len, 2 * write_content.len() as u64);
+        }
+    }
 }
 
 fn file_exists<P: AsRef<Path>>(path: P) -> bool {
     fs::metadata(path).is_ok()
 }
+
+fn run_tests(test_dir: &Path) {
+    fs::create_dir_all(test_dir).expect("Failed to create test dir");
+
+    let fs_test = SequenceFileOperationTest::new(test_dir);
+
+    info!("test_fuse_filesystem started");
+    SequenceFileOperationTest::test_basic_filesystem(&fs_test);
+    info!("testtest_fuse_filesystem finished");
+
+    info!("test_big_file started");
+    SequenceFileOperationTest::test_big_file(&fs_test);
+    info!("test_big_file finished");
+
+    info!("test_open_file_flag started");
+    SequenceFileOperationTest::test_open_file_flag(&fs_test);
+    info!("test_open_file_flag finished");
+}
+
+fn test_manually() {
+    let mount_point = Path::new("target/gvfs");
+    let test_dir = mount_point.join("test_dir");
+    run_tests(&test_dir);
+}
+
+#[test]
+fn fuse_it_test_fuse() {
+    test_enable_with!(RUN_TEST_WITH_FUSE);
+    tracing_subscriber::fmt().init();
+
+    let mount_point = Path::new("target/gvfs");
+    let test_dir = mount_point.join("test_dir");
+
+    run_tests(&test_dir);
+}
+
+#[test]
+fn test_fuse_with_memory_fs() {
+    tracing_subscriber::fmt().init();
+
+    let mount_point = "target/gvfs";
+    let _ = fs::create_dir_all(mount_point);
+
+    let mut test = FuseTestEnv {
+        mount_point: mount_point.to_string(),
+        gvfs_mount: None,
+    };
+
+    test.setup();
+
+    let test_dir = Path::new(&test.mount_point).join("test_dir");
+    run_tests(&test_dir);
+}


Reply via email to