FANNG1 commented on code in PR #5878: URL: https://github.com/apache/gravitino/pull/5878#discussion_r1890215836
########## clients/filesystem-fuse/src/filesystem.rs: ########## @@ -239,3 +364,412 @@ pub trait FileWriter: Sync + Send { Ok(()) } } + +/// SimpleFileSystem is a simple implementation for the file system. +/// it is used to manage the file metadata and file handle. +/// The operations of the file system are implemented by the PathFileSystem. +/// Note: This class is not use in the production code, it is used for the demo and testing +pub struct DefaultRawFileSystem<T: PathFileSystem> { + /// file entries + file_entry_manager: RwLock<FileEntryManager>, + /// opened files + opened_file_manager: OpenedFileManager, + /// inode id generator + file_id_generator: AtomicU64, + + /// real filesystem + fs: T, +} + +impl<T: PathFileSystem> DefaultRawFileSystem<T> { + const INITIAL_FILE_ID: u64 = 10000; + const ROOT_DIR_PARENT_FILE_ID: u64 = 1; + const ROOT_DIR_FILE_ID: u64 = 1; + const ROOT_DIR_NAME: &'static str = ""; + + pub(crate) fn new(fs: T) -> Self { + Self { + file_entry_manager: RwLock::new(FileEntryManager::new()), + opened_file_manager: OpenedFileManager::new(), + file_id_generator: AtomicU64::new(Self::INITIAL_FILE_ID), + fs, + } + } + + fn next_file_id(&self) -> u64 { + self.file_id_generator + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + } + + async fn get_file_entry(&self, file_id: u64) -> Result<FileEntry> { + self.file_entry_manager + .read() + .await + .get_file_by_id(file_id) + .ok_or(Errno::from(libc::ENOENT)) + } + + async fn get_file_entry_by_path(&self, path: &str) -> Option<FileEntry> { + self.file_entry_manager.read().await.get_file_by_path(path) + } + + async fn resolve_file_id_to_filestat(&self, file_stat: &mut FileStat, parent_file_id: u64) { + let mut file_manager = self.file_entry_manager.write().await; + let file_entry = file_manager.get_file_by_path(&file_stat.path); + match file_entry { + None => { + // allocate new file id + file_stat.set_file_id(parent_file_id, self.next_file_id()); + file_manager.insert(file_stat.parent_file_id, 
file_stat.file_id, &file_stat.path); + } + Some(file) => { + // use the exist file id + file_stat.set_file_id(file.parent_file_id, file.file_id); + } + } + } + + async fn open_file_internal( + &self, + file_id: u64, + flags: u32, + kind: FileType, + ) -> Result<FileHandle> { + let file_entry = self.get_file_entry(file_id).await?; + + let mut opened_file = { + match kind { + FileType::Directory => { + self.fs + .open_dir(&file_entry.path, OpenFileFlags(flags)) + .await? + } + FileType::RegularFile => { + self.fs + .open_file(&file_entry.path, OpenFileFlags(flags)) + .await? + } + _ => return Err(Errno::from(libc::EINVAL)), + } + }; + // set the exists file id + opened_file.set_file_id(file_entry.parent_file_id, file_id); + let file = self.opened_file_manager.put_file(opened_file); + let file = file.lock().await; + Ok(file.file_handle()) + } +} + +#[async_trait] +impl<T: PathFileSystem> RawFileSystem for DefaultRawFileSystem<T> { + async fn init(&self) -> Result<()> { + // init root directory + self.file_entry_manager.write().await.insert( + Self::ROOT_DIR_PARENT_FILE_ID, + Self::ROOT_DIR_FILE_ID, + Self::ROOT_DIR_NAME, + ); + self.fs.init().await + } + + async fn get_file_path(&self, file_id: u64) -> String { + let file_entry = self.get_file_entry(file_id).await; + file_entry + .map(|x| x.path) + .unwrap_or_else(|_| "".to_string()) + } + + async fn valid_file_handle_id(&self, file_id: u64, fh: u64) -> Result<()> { + let fh_file_id = self + .opened_file_manager + .get_file(fh) + .ok_or(Errno::from(libc::EBADF))? 
+ .lock() + .await + .file_stat + .file_id; + + (file_id == fh_file_id) + .then_some(()) + .ok_or(Errno::from(libc::EBADF)) + } + + async fn stat(&self, file_id: u64) -> Result<FileStat> { + let file_entry = self.get_file_entry(file_id).await?; + let mut file_stat = self.fs.stat(&file_entry.path).await?; + file_stat.set_file_id(file_entry.parent_file_id, file_entry.file_id); + Ok(file_stat) + } + + async fn lookup(&self, parent_file_id: u64, name: &str) -> Result<FileStat> { + let parent_file_entry = self.get_file_entry(parent_file_id).await?; + let mut file_stat = self.fs.lookup(&parent_file_entry.path, name).await?; + // fill the file id to file stat + self.resolve_file_id_to_filestat(&mut file_stat, parent_file_id) + .await; + Ok(file_stat) + } + + async fn read_dir(&self, file_id: u64) -> Result<Vec<FileStat>> { + let file_entry = self.get_file_entry(file_id).await?; + let mut child_filestats = self.fs.read_dir(&file_entry.path).await?; + for file in child_filestats.iter_mut() { + self.resolve_file_id_to_filestat(file, file.file_id).await; + } + Ok(child_filestats) + } + + async fn open_file(&self, file_id: u64, flags: u32) -> Result<FileHandle> { + self.open_file_internal(file_id, flags, FileType::RegularFile) + .await + } + + async fn open_dir(&self, file_id: u64, flags: u32) -> Result<FileHandle> { + self.open_file_internal(file_id, flags, FileType::Directory) + .await + } + + async fn create_file(&self, parent_file_id: u64, name: &str, flags: u32) -> Result<FileHandle> { + let parent_file_entry = self.get_file_entry(parent_file_id).await?; + let mut opened_file = self + .fs + .create_file(&parent_file_entry.path, name, OpenFileFlags(flags)) + .await?; + + opened_file.set_file_id(parent_file_id, self.next_file_id()); + + // insert the new file to file entry manager + { + let mut file_manager = self.file_entry_manager.write().await; + file_manager.insert( + parent_file_id, + opened_file.file_stat.file_id, + &opened_file.file_stat.path, + ); + } + + // put the 
file to the opened file manager + let opened_file = self.opened_file_manager.put_file(opened_file); + let opened_file = opened_file.lock().await; + Ok(opened_file.file_handle()) + } + + async fn create_dir(&self, parent_file_id: u64, name: &str) -> Result<u64> { + let parent_file_entry = self.get_file_entry(parent_file_id).await?; + let mut filestat = self.fs.create_dir(&parent_file_entry.path, name).await?; + + filestat.set_file_id(parent_file_id, self.next_file_id()); + + // insert the new file to file entry manager + { + let mut file_manager = self.file_entry_manager.write().await; + file_manager.insert(filestat.parent_file_id, filestat.file_id, &filestat.path); + } + Ok(filestat.file_id) + } + + async fn set_attr(&self, file_id: u64, file_stat: &FileStat) -> Result<()> { + let file_entry = self.get_file_entry(file_id).await?; + self.fs.set_attr(&file_entry.path, file_stat, true).await + } + + async fn remove_file(&self, parent_file_id: u64, name: &str) -> Result<()> { + let parent_file_entry = self.get_file_entry(parent_file_id).await?; + self.fs.remove_file(&parent_file_entry.path, name).await?; + + // remove the file from file entry manager + { + let mut file_manager = self.file_entry_manager.write().await; + file_manager.remove(&join_file_path(&parent_file_entry.path, name)); + } + Ok(()) + } + + async fn remove_dir(&self, parent_file_id: u64, name: &str) -> Result<()> { + let parent_file_entry = self.get_file_entry(parent_file_id).await?; + self.fs.remove_dir(&parent_file_entry.path, name).await?; + + // remove the dir from file entry manager + { + let mut file_manager = self.file_entry_manager.write().await; + file_manager.remove(&join_file_path(&parent_file_entry.path, name)); + } + Ok(()) + } + + async fn close_file(&self, _file_id: u64, fh: u64) -> Result<()> { + let opened_file = self + .opened_file_manager + .remove_file(fh) + .ok_or(Errno::from(libc::EBADF))?; + let mut file = opened_file.lock().await; + file.close().await + } + + async fn 
read(&self, _file_id: u64, fh: u64, offset: u64, size: u32) -> Result<Bytes> { + let file_stat: FileStat; + let data = { + let opened_file = self + .opened_file_manager + .get_file(fh) + .ok_or(Errno::from(libc::EBADF))?; + let mut opened_file = opened_file.lock().await; + file_stat = opened_file.file_stat.clone(); + opened_file.read(offset, size).await + }; + + // update the file atime + self.fs.set_attr(&file_stat.path, &file_stat, false).await?; + + data + } + + async fn write(&self, _file_id: u64, fh: u64, offset: u64, data: &[u8]) -> Result<u32> { + let (len, file_stat) = { + let opened_file = self + .opened_file_manager + .get_file(fh) + .ok_or(Errno::from(libc::EBADF))?; + let mut opened_file = opened_file.lock().await; + let len = opened_file.write(offset, data).await; + (len, opened_file.file_stat.clone()) + }; + + // update the file size, mtime and atime + self.fs.set_attr(&file_stat.path, &file_stat, false).await?; + + len + } +} + +/// File entry is represent the abstract file. +#[derive(Debug, Clone)] +struct FileEntry { + file_id: u64, + parent_file_id: u64, + path: String, +} + +/// FileEntryManager is manage all the file entries in memory. it is used manger the file relationship and name mapping. +struct FileEntryManager { + // file_id_map is a map of file_id to file entry. + file_id_map: HashMap<u64, FileEntry>, + + // file_path_map is a map of file path to file entry. 
+ file_path_map: HashMap<String, FileEntry>, +} + +impl FileEntryManager { + fn new() -> Self { + Self { + file_id_map: HashMap::new(), + file_path_map: HashMap::new(), + } + } + + fn get_file_by_id(&self, file_id: u64) -> Option<FileEntry> { + self.file_id_map.get(&file_id).cloned() + } + + fn get_file_by_path(&self, path: &str) -> Option<FileEntry> { + self.file_path_map.get(path).cloned() + } + + fn insert(&mut self, parent_file_id: u64, file_id: u64, path: &str) { + let file = FileEntry { Review Comment: please use fileEntry ########## clients/filesystem-fuse/src/opened_file_manager.rs: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::filesystem::OpenedFile; +use dashmap::DashMap; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; +use tokio::sync::Mutex; + +// OpenedFileManager is a manager all the opened files. and allocate a file handle id for the opened file. +pub(crate) struct OpenedFileManager { + // file_handle_map is a map of file_handle_id to opened file. + file_handle_map: DashMap<u64, Arc<Mutex<OpenedFile>>>, + + // file_handle_id_generator is used to generate unique file handle IDs. 
+ handle_id_generator: AtomicU64, +} + +impl OpenedFileManager { + pub fn new() -> Self { + Self { + file_handle_map: Default::default(), + handle_id_generator: AtomicU64::new(1), + } + } + + pub(crate) fn next_handle_id(&self) -> u64 { + self.handle_id_generator + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + } + + pub(crate) fn put_file(&self, mut file: OpenedFile) -> Arc<Mutex<OpenedFile>> { Review Comment: put_opened_file ########## clients/filesystem-fuse/src/utils.rs: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// join the parent and name to a path +pub fn join_file_path(parent: &str, name: &str) -> String { Review Comment: get_file_path ########## clients/filesystem-fuse/src/opened_file_manager.rs: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::filesystem::OpenedFile; +use dashmap::DashMap; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; +use tokio::sync::Mutex; + +// OpenedFileManager is a manager all the opened files. and allocate a file handle id for the opened file. +pub(crate) struct OpenedFileManager { + // file_handle_map is a map of file_handle_id to opened file. + file_handle_map: DashMap<u64, Arc<Mutex<OpenedFile>>>, + + // file_handle_id_generator is used to generate unique file handle IDs. + handle_id_generator: AtomicU64, +} + +impl OpenedFileManager { + pub fn new() -> Self { + Self { + file_handle_map: Default::default(), + handle_id_generator: AtomicU64::new(1), + } + } + + pub(crate) fn next_handle_id(&self) -> u64 { + self.handle_id_generator + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + } + + pub(crate) fn put_file(&self, mut file: OpenedFile) -> Arc<Mutex<OpenedFile>> { + let file_handle_id = self.next_handle_id(); + file.handle_id = file_handle_id; + let file_handle = Arc::new(Mutex::new(file)); + self.file_handle_map + .insert(file_handle_id, file_handle.clone()); + file_handle + } + + pub(crate) fn get_file(&self, handle_id: u64) -> Option<Arc<Mutex<OpenedFile>>> { Review Comment: get_opened_file ########## clients/filesystem-fuse/src/filesystem.rs: ########## @@ -239,3 +364,412 @@ pub trait FileWriter: Sync + Send { Ok(()) } } + +/// SimpleFileSystem is a simple implementation for the file system. +/// it is used to manage the file metadata and file handle. 
+/// The operations of the file system are implemented by the PathFileSystem. +/// Note: This class is not use in the production code, it is used for the demo and testing +pub struct DefaultRawFileSystem<T: PathFileSystem> { + /// file entries + file_entry_manager: RwLock<FileEntryManager>, + /// opened files + opened_file_manager: OpenedFileManager, + /// inode id generator + file_id_generator: AtomicU64, + + /// real filesystem + fs: T, +} + +impl<T: PathFileSystem> DefaultRawFileSystem<T> { + const INITIAL_FILE_ID: u64 = 10000; + const ROOT_DIR_PARENT_FILE_ID: u64 = 1; + const ROOT_DIR_FILE_ID: u64 = 1; + const ROOT_DIR_NAME: &'static str = ""; + + pub(crate) fn new(fs: T) -> Self { + Self { + file_entry_manager: RwLock::new(FileEntryManager::new()), + opened_file_manager: OpenedFileManager::new(), + file_id_generator: AtomicU64::new(Self::INITIAL_FILE_ID), + fs, + } + } + + fn next_file_id(&self) -> u64 { + self.file_id_generator + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + } + + async fn get_file_entry(&self, file_id: u64) -> Result<FileEntry> { + self.file_entry_manager + .read() + .await + .get_file_by_id(file_id) + .ok_or(Errno::from(libc::ENOENT)) + } + + async fn get_file_entry_by_path(&self, path: &str) -> Option<FileEntry> { + self.file_entry_manager.read().await.get_file_by_path(path) + } + + async fn resolve_file_id_to_filestat(&self, file_stat: &mut FileStat, parent_file_id: u64) { + let mut file_manager = self.file_entry_manager.write().await; + let file_entry = file_manager.get_file_by_path(&file_stat.path); + match file_entry { + None => { + // allocate new file id + file_stat.set_file_id(parent_file_id, self.next_file_id()); + file_manager.insert(file_stat.parent_file_id, file_stat.file_id, &file_stat.path); + } + Some(file) => { + // use the exist file id + file_stat.set_file_id(file.parent_file_id, file.file_id); + } + } + } + + async fn open_file_internal( + &self, + file_id: u64, + flags: u32, + kind: FileType, + ) -> 
Result<FileHandle> { + let file_entry = self.get_file_entry(file_id).await?; + + let mut opened_file = { + match kind { + FileType::Directory => { + self.fs + .open_dir(&file_entry.path, OpenFileFlags(flags)) + .await? + } + FileType::RegularFile => { + self.fs + .open_file(&file_entry.path, OpenFileFlags(flags)) + .await? + } + _ => return Err(Errno::from(libc::EINVAL)), + } + }; + // set the exists file id + opened_file.set_file_id(file_entry.parent_file_id, file_id); + let file = self.opened_file_manager.put_file(opened_file); + let file = file.lock().await; + Ok(file.file_handle()) + } +} + +#[async_trait] +impl<T: PathFileSystem> RawFileSystem for DefaultRawFileSystem<T> { + async fn init(&self) -> Result<()> { + // init root directory + self.file_entry_manager.write().await.insert( + Self::ROOT_DIR_PARENT_FILE_ID, + Self::ROOT_DIR_FILE_ID, + Self::ROOT_DIR_NAME, + ); + self.fs.init().await + } + + async fn get_file_path(&self, file_id: u64) -> String { + let file_entry = self.get_file_entry(file_id).await; + file_entry + .map(|x| x.path) + .unwrap_or_else(|_| "".to_string()) + } + + async fn valid_file_handle_id(&self, file_id: u64, fh: u64) -> Result<()> { + let fh_file_id = self + .opened_file_manager + .get_file(fh) + .ok_or(Errno::from(libc::EBADF))? 
+ .lock() + .await + .file_stat + .file_id; + + (file_id == fh_file_id) + .then_some(()) + .ok_or(Errno::from(libc::EBADF)) + } + + async fn stat(&self, file_id: u64) -> Result<FileStat> { + let file_entry = self.get_file_entry(file_id).await?; + let mut file_stat = self.fs.stat(&file_entry.path).await?; + file_stat.set_file_id(file_entry.parent_file_id, file_entry.file_id); + Ok(file_stat) + } + + async fn lookup(&self, parent_file_id: u64, name: &str) -> Result<FileStat> { + let parent_file_entry = self.get_file_entry(parent_file_id).await?; + let mut file_stat = self.fs.lookup(&parent_file_entry.path, name).await?; + // fill the file id to file stat + self.resolve_file_id_to_filestat(&mut file_stat, parent_file_id) + .await; + Ok(file_stat) + } + + async fn read_dir(&self, file_id: u64) -> Result<Vec<FileStat>> { + let file_entry = self.get_file_entry(file_id).await?; + let mut child_filestats = self.fs.read_dir(&file_entry.path).await?; + for file in child_filestats.iter_mut() { + self.resolve_file_id_to_filestat(file, file.file_id).await; + } + Ok(child_filestats) + } + + async fn open_file(&self, file_id: u64, flags: u32) -> Result<FileHandle> { + self.open_file_internal(file_id, flags, FileType::RegularFile) + .await + } + + async fn open_dir(&self, file_id: u64, flags: u32) -> Result<FileHandle> { + self.open_file_internal(file_id, flags, FileType::Directory) + .await + } + + async fn create_file(&self, parent_file_id: u64, name: &str, flags: u32) -> Result<FileHandle> { + let parent_file_entry = self.get_file_entry(parent_file_id).await?; + let mut opened_file = self + .fs + .create_file(&parent_file_entry.path, name, OpenFileFlags(flags)) + .await?; + + opened_file.set_file_id(parent_file_id, self.next_file_id()); + + // insert the new file to file entry manager + { + let mut file_manager = self.file_entry_manager.write().await; + file_manager.insert( + parent_file_id, + opened_file.file_stat.file_id, + &opened_file.file_stat.path, + ); + } + + // put the 
file to the opened file manager + let opened_file = self.opened_file_manager.put_file(opened_file); + let opened_file = opened_file.lock().await; + Ok(opened_file.file_handle()) + } + + async fn create_dir(&self, parent_file_id: u64, name: &str) -> Result<u64> { + let parent_file_entry = self.get_file_entry(parent_file_id).await?; + let mut filestat = self.fs.create_dir(&parent_file_entry.path, name).await?; + + filestat.set_file_id(parent_file_id, self.next_file_id()); + + // insert the new file to file entry manager + { + let mut file_manager = self.file_entry_manager.write().await; + file_manager.insert(filestat.parent_file_id, filestat.file_id, &filestat.path); + } + Ok(filestat.file_id) + } + + async fn set_attr(&self, file_id: u64, file_stat: &FileStat) -> Result<()> { + let file_entry = self.get_file_entry(file_id).await?; + self.fs.set_attr(&file_entry.path, file_stat, true).await + } + + async fn remove_file(&self, parent_file_id: u64, name: &str) -> Result<()> { + let parent_file_entry = self.get_file_entry(parent_file_id).await?; + self.fs.remove_file(&parent_file_entry.path, name).await?; + + // remove the file from file entry manager + { + let mut file_manager = self.file_entry_manager.write().await; + file_manager.remove(&join_file_path(&parent_file_entry.path, name)); + } + Ok(()) + } + + async fn remove_dir(&self, parent_file_id: u64, name: &str) -> Result<()> { + let parent_file_entry = self.get_file_entry(parent_file_id).await?; + self.fs.remove_dir(&parent_file_entry.path, name).await?; + + // remove the dir from file entry manager + { + let mut file_manager = self.file_entry_manager.write().await; + file_manager.remove(&join_file_path(&parent_file_entry.path, name)); + } + Ok(()) + } + + async fn close_file(&self, _file_id: u64, fh: u64) -> Result<()> { + let opened_file = self + .opened_file_manager + .remove_file(fh) + .ok_or(Errno::from(libc::EBADF))?; + let mut file = opened_file.lock().await; + file.close().await + } + + async fn 
read(&self, _file_id: u64, fh: u64, offset: u64, size: u32) -> Result<Bytes> { + let file_stat: FileStat; + let data = { + let opened_file = self + .opened_file_manager + .get_file(fh) + .ok_or(Errno::from(libc::EBADF))?; + let mut opened_file = opened_file.lock().await; + file_stat = opened_file.file_stat.clone(); + opened_file.read(offset, size).await + }; + + // update the file atime + self.fs.set_attr(&file_stat.path, &file_stat, false).await?; + + data + } + + async fn write(&self, _file_id: u64, fh: u64, offset: u64, data: &[u8]) -> Result<u32> { + let (len, file_stat) = { + let opened_file = self + .opened_file_manager + .get_file(fh) + .ok_or(Errno::from(libc::EBADF))?; + let mut opened_file = opened_file.lock().await; + let len = opened_file.write(offset, data).await; + (len, opened_file.file_stat.clone()) + }; + + // update the file size, mtime and atime + self.fs.set_attr(&file_stat.path, &file_stat, false).await?; + + len + } +} + +/// File entry is represent the abstract file. +#[derive(Debug, Clone)] +struct FileEntry { + file_id: u64, + parent_file_id: u64, + path: String, +} + +/// FileEntryManager is manage all the file entries in memory. it is used manger the file relationship and name mapping. +struct FileEntryManager { + // file_id_map is a map of file_id to file entry. + file_id_map: HashMap<u64, FileEntry>, + + // file_path_map is a map of file path to file entry. 
+ file_path_map: HashMap<String, FileEntry>, +} + +impl FileEntryManager { + fn new() -> Self { + Self { + file_id_map: HashMap::new(), + file_path_map: HashMap::new(), + } + } + + fn get_file_by_id(&self, file_id: u64) -> Option<FileEntry> { + self.file_id_map.get(&file_id).cloned() + } + + fn get_file_by_path(&self, path: &str) -> Option<FileEntry> { Review Comment: get_file_entry_by_path ########## clients/filesystem-fuse/src/filesystem.rs: ########## @@ -201,6 +246,86 @@ pub(crate) struct OpenedFile { pub writer: Option<Box<dyn FileWriter>>, } +impl OpenedFile { + pub fn new(file_stat: FileStat) -> Self { + OpenedFile { + file_stat: file_stat, + handle_id: 0, + reader: None, + writer: None, + } + } + + async fn read(&mut self, offset: u64, size: u32) -> Result<Bytes> { + let reader = self.reader.as_mut().ok_or(Errno::from(libc::EBADF))?; + let result = reader.read(offset, size).await?; + + // update the atime + self.file_stat.atime = Timestamp::from(SystemTime::now()); + + Ok(result) + } + + async fn write(&mut self, offset: u64, data: &[u8]) -> Result<u32> { + let writer = self.writer.as_mut().ok_or(Errno::from(libc::EBADF))?; + let written = writer.write(offset, data).await?; Review Comment: please handle the case where `written` is less than `data.len()` ########## clients/filesystem-fuse/src/utils.rs: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// join the parent and name to a path +pub fn join_file_path(parent: &str, name: &str) -> String { + if parent.is_empty() { + name.to_string() + } else { + format!("{}/{}", parent, name) Review Comment: is it possible that `parent` ends with '/'? ########## clients/filesystem-fuse/src/filesystem.rs: ########## @@ -239,3 +364,412 @@ pub trait FileWriter: Sync + Send { Ok(()) } } + +/// SimpleFileSystem is a simple implementation for the file system. +/// it is used to manage the file metadata and file handle. +/// The operations of the file system are implemented by the PathFileSystem. 
+/// Note: This class is not use in the production code, it is used for the demo and testing +pub struct DefaultRawFileSystem<T: PathFileSystem> { + /// file entries + file_entry_manager: RwLock<FileEntryManager>, + /// opened files + opened_file_manager: OpenedFileManager, + /// inode id generator + file_id_generator: AtomicU64, + + /// real filesystem + fs: T, +} + +impl<T: PathFileSystem> DefaultRawFileSystem<T> { + const INITIAL_FILE_ID: u64 = 10000; + const ROOT_DIR_PARENT_FILE_ID: u64 = 1; + const ROOT_DIR_FILE_ID: u64 = 1; + const ROOT_DIR_NAME: &'static str = ""; + + pub(crate) fn new(fs: T) -> Self { + Self { + file_entry_manager: RwLock::new(FileEntryManager::new()), + opened_file_manager: OpenedFileManager::new(), + file_id_generator: AtomicU64::new(Self::INITIAL_FILE_ID), + fs, + } + } + + fn next_file_id(&self) -> u64 { + self.file_id_generator + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + } + + async fn get_file_entry(&self, file_id: u64) -> Result<FileEntry> { + self.file_entry_manager + .read() + .await + .get_file_by_id(file_id) + .ok_or(Errno::from(libc::ENOENT)) + } + + async fn get_file_entry_by_path(&self, path: &str) -> Option<FileEntry> { + self.file_entry_manager.read().await.get_file_by_path(path) + } + + async fn resolve_file_id_to_filestat(&self, file_stat: &mut FileStat, parent_file_id: u64) { + let mut file_manager = self.file_entry_manager.write().await; + let file_entry = file_manager.get_file_by_path(&file_stat.path); + match file_entry { + None => { + // allocate new file id + file_stat.set_file_id(parent_file_id, self.next_file_id()); + file_manager.insert(file_stat.parent_file_id, file_stat.file_id, &file_stat.path); + } + Some(file) => { + // use the exist file id + file_stat.set_file_id(file.parent_file_id, file.file_id); + } + } + } + + async fn open_file_internal( + &self, + file_id: u64, + flags: u32, + kind: FileType, + ) -> Result<FileHandle> { + let file_entry = self.get_file_entry(file_id).await?; + + let mut 
opened_file = { + match kind { + FileType::Directory => { + self.fs + .open_dir(&file_entry.path, OpenFileFlags(flags)) + .await? + } + FileType::RegularFile => { + self.fs + .open_file(&file_entry.path, OpenFileFlags(flags)) + .await? + } + _ => return Err(Errno::from(libc::EINVAL)), + } + }; + // set the exists file id + opened_file.set_file_id(file_entry.parent_file_id, file_id); + let file = self.opened_file_manager.put_file(opened_file); + let file = file.lock().await; + Ok(file.file_handle()) + } +} + +#[async_trait] +impl<T: PathFileSystem> RawFileSystem for DefaultRawFileSystem<T> { + async fn init(&self) -> Result<()> { + // init root directory + self.file_entry_manager.write().await.insert( + Self::ROOT_DIR_PARENT_FILE_ID, + Self::ROOT_DIR_FILE_ID, + Self::ROOT_DIR_NAME, + ); + self.fs.init().await + } + + async fn get_file_path(&self, file_id: u64) -> String { + let file_entry = self.get_file_entry(file_id).await; + file_entry + .map(|x| x.path) + .unwrap_or_else(|_| "".to_string()) + } + + async fn valid_file_handle_id(&self, file_id: u64, fh: u64) -> Result<()> { + let fh_file_id = self + .opened_file_manager + .get_file(fh) + .ok_or(Errno::from(libc::EBADF))? 
+ .lock() + .await + .file_stat + .file_id; + + (file_id == fh_file_id) + .then_some(()) + .ok_or(Errno::from(libc::EBADF)) + } + + async fn stat(&self, file_id: u64) -> Result<FileStat> { + let file_entry = self.get_file_entry(file_id).await?; + let mut file_stat = self.fs.stat(&file_entry.path).await?; + file_stat.set_file_id(file_entry.parent_file_id, file_entry.file_id); + Ok(file_stat) + } + + async fn lookup(&self, parent_file_id: u64, name: &str) -> Result<FileStat> { + let parent_file_entry = self.get_file_entry(parent_file_id).await?; + let mut file_stat = self.fs.lookup(&parent_file_entry.path, name).await?; + // fill the file id to file stat + self.resolve_file_id_to_filestat(&mut file_stat, parent_file_id) + .await; + Ok(file_stat) + } + + async fn read_dir(&self, file_id: u64) -> Result<Vec<FileStat>> { + let file_entry = self.get_file_entry(file_id).await?; + let mut child_filestats = self.fs.read_dir(&file_entry.path).await?; + for file in child_filestats.iter_mut() { + self.resolve_file_id_to_filestat(file, file.file_id).await; + } + Ok(child_filestats) + } + + async fn open_file(&self, file_id: u64, flags: u32) -> Result<FileHandle> { + self.open_file_internal(file_id, flags, FileType::RegularFile) + .await + } + + async fn open_dir(&self, file_id: u64, flags: u32) -> Result<FileHandle> { + self.open_file_internal(file_id, flags, FileType::Directory) + .await + } + + async fn create_file(&self, parent_file_id: u64, name: &str, flags: u32) -> Result<FileHandle> { + let parent_file_entry = self.get_file_entry(parent_file_id).await?; + let mut opened_file = self + .fs + .create_file(&parent_file_entry.path, name, OpenFileFlags(flags)) + .await?; + + opened_file.set_file_id(parent_file_id, self.next_file_id()); + + // insert the new file to file entry manager + { + let mut file_manager = self.file_entry_manager.write().await; + file_manager.insert( + parent_file_id, + opened_file.file_stat.file_id, + &opened_file.file_stat.path, + ); + } + + // put the 
file to the opened file manager + let opened_file = self.opened_file_manager.put_file(opened_file); + let opened_file = opened_file.lock().await; + Ok(opened_file.file_handle()) + } + + async fn create_dir(&self, parent_file_id: u64, name: &str) -> Result<u64> { + let parent_file_entry = self.get_file_entry(parent_file_id).await?; + let mut filestat = self.fs.create_dir(&parent_file_entry.path, name).await?; + + filestat.set_file_id(parent_file_id, self.next_file_id()); + + // insert the new file to file entry manager + { + let mut file_manager = self.file_entry_manager.write().await; + file_manager.insert(filestat.parent_file_id, filestat.file_id, &filestat.path); + } + Ok(filestat.file_id) + } + + async fn set_attr(&self, file_id: u64, file_stat: &FileStat) -> Result<()> { + let file_entry = self.get_file_entry(file_id).await?; + self.fs.set_attr(&file_entry.path, file_stat, true).await + } + + async fn remove_file(&self, parent_file_id: u64, name: &str) -> Result<()> { + let parent_file_entry = self.get_file_entry(parent_file_id).await?; + self.fs.remove_file(&parent_file_entry.path, name).await?; + + // remove the file from file entry manager + { + let mut file_manager = self.file_entry_manager.write().await; + file_manager.remove(&join_file_path(&parent_file_entry.path, name)); + } + Ok(()) + } + + async fn remove_dir(&self, parent_file_id: u64, name: &str) -> Result<()> { + let parent_file_entry = self.get_file_entry(parent_file_id).await?; + self.fs.remove_dir(&parent_file_entry.path, name).await?; + + // remove the dir from file entry manager + { + let mut file_manager = self.file_entry_manager.write().await; + file_manager.remove(&join_file_path(&parent_file_entry.path, name)); + } + Ok(()) + } + + async fn close_file(&self, _file_id: u64, fh: u64) -> Result<()> { + let opened_file = self + .opened_file_manager + .remove_file(fh) + .ok_or(Errno::from(libc::EBADF))?; + let mut file = opened_file.lock().await; + file.close().await + } + + async fn 
read(&self, _file_id: u64, fh: u64, offset: u64, size: u32) -> Result<Bytes> { + let file_stat: FileStat; + let data = { + let opened_file = self + .opened_file_manager + .get_file(fh) + .ok_or(Errno::from(libc::EBADF))?; + let mut opened_file = opened_file.lock().await; + file_stat = opened_file.file_stat.clone(); + opened_file.read(offset, size).await + }; + + // update the file atime + self.fs.set_attr(&file_stat.path, &file_stat, false).await?; + + data + } + + async fn write(&self, _file_id: u64, fh: u64, offset: u64, data: &[u8]) -> Result<u32> { + let (len, file_stat) = { + let opened_file = self + .opened_file_manager + .get_file(fh) + .ok_or(Errno::from(libc::EBADF))?; + let mut opened_file = opened_file.lock().await; + let len = opened_file.write(offset, data).await; + (len, opened_file.file_stat.clone()) + }; + + // update the file size, mtime and atime + self.fs.set_attr(&file_stat.path, &file_stat, false).await?; + + len + } +} + +/// File entry is represent the abstract file. +#[derive(Debug, Clone)] +struct FileEntry { + file_id: u64, + parent_file_id: u64, + path: String, +} + +/// FileEntryManager is manage all the file entries in memory. it is used manger the file relationship and name mapping. +struct FileEntryManager { + // file_id_map is a map of file_id to file entry. + file_id_map: HashMap<u64, FileEntry>, + + // file_path_map is a map of file path to file entry. + file_path_map: HashMap<String, FileEntry>, +} + +impl FileEntryManager { + fn new() -> Self { + Self { + file_id_map: HashMap::new(), + file_path_map: HashMap::new(), + } + } + + fn get_file_by_id(&self, file_id: u64) -> Option<FileEntry> { Review Comment: get_file_entry_by_id ########## clients/filesystem-fuse/src/filesystem.rs: ########## @@ -239,3 +364,412 @@ pub trait FileWriter: Sync + Send { Ok(()) } } + +/// SimpleFileSystem is a simple implementation for the file system. Review Comment: please rewrite the comment, remove Note line? 
########## clients/filesystem-fuse/src/filesystem.rs: ########## @@ -239,3 +364,412 @@ pub trait FileWriter: Sync + Send { Ok(()) } } + +/// SimpleFileSystem is a simple implementation for the file system. +/// it is used to manage the file metadata and file handle. +/// The operations of the file system are implemented by the PathFileSystem. +/// Note: This class is not use in the production code, it is used for the demo and testing +pub struct DefaultRawFileSystem<T: PathFileSystem> { + /// file entries + file_entry_manager: RwLock<FileEntryManager>, + /// opened files + opened_file_manager: OpenedFileManager, + /// inode id generator + file_id_generator: AtomicU64, + + /// real filesystem + fs: T, +} + +impl<T: PathFileSystem> DefaultRawFileSystem<T> { + const INITIAL_FILE_ID: u64 = 10000; + const ROOT_DIR_PARENT_FILE_ID: u64 = 1; + const ROOT_DIR_FILE_ID: u64 = 1; + const ROOT_DIR_NAME: &'static str = ""; + + pub(crate) fn new(fs: T) -> Self { + Self { + file_entry_manager: RwLock::new(FileEntryManager::new()), + opened_file_manager: OpenedFileManager::new(), + file_id_generator: AtomicU64::new(Self::INITIAL_FILE_ID), + fs, + } + } + + fn next_file_id(&self) -> u64 { + self.file_id_generator + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + } + + async fn get_file_entry(&self, file_id: u64) -> Result<FileEntry> { + self.file_entry_manager + .read() + .await + .get_file_by_id(file_id) + .ok_or(Errno::from(libc::ENOENT)) + } + + async fn get_file_entry_by_path(&self, path: &str) -> Option<FileEntry> { + self.file_entry_manager.read().await.get_file_by_path(path) + } + + async fn resolve_file_id_to_filestat(&self, file_stat: &mut FileStat, parent_file_id: u64) { + let mut file_manager = self.file_entry_manager.write().await; + let file_entry = file_manager.get_file_by_path(&file_stat.path); + match file_entry { + None => { + // allocate new file id + file_stat.set_file_id(parent_file_id, self.next_file_id()); + file_manager.insert(file_stat.parent_file_id, 
file_stat.file_id, &file_stat.path); + } + Some(file) => { + // use the exist file id + file_stat.set_file_id(file.parent_file_id, file.file_id); + } + } + } + + async fn open_file_internal( + &self, + file_id: u64, + flags: u32, + kind: FileType, + ) -> Result<FileHandle> { + let file_entry = self.get_file_entry(file_id).await?; + + let mut opened_file = { + match kind { + FileType::Directory => { + self.fs + .open_dir(&file_entry.path, OpenFileFlags(flags)) + .await? + } + FileType::RegularFile => { + self.fs + .open_file(&file_entry.path, OpenFileFlags(flags)) + .await? + } + _ => return Err(Errno::from(libc::EINVAL)), + } + }; + // set the exists file id + opened_file.set_file_id(file_entry.parent_file_id, file_id); + let file = self.opened_file_manager.put_file(opened_file); Review Comment: why not using file handler in open_file directly? ########## clients/filesystem-fuse/src/opened_file_manager.rs: ########## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::filesystem::OpenedFile; +use dashmap::DashMap; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; +use tokio::sync::Mutex; + +// OpenedFileManager is a manager all the opened files. 
and allocate a file handle id for the opened file. +pub(crate) struct OpenedFileManager { + // file_handle_map is a map of file_handle_id to opened file. + file_handle_map: DashMap<u64, Arc<Mutex<OpenedFile>>>, + + // file_handle_id_generator is used to generate unique file handle IDs. + handle_id_generator: AtomicU64, +} + +impl OpenedFileManager { + pub fn new() -> Self { + Self { + file_handle_map: Default::default(), + handle_id_generator: AtomicU64::new(1), + } + } + + pub(crate) fn next_handle_id(&self) -> u64 { + self.handle_id_generator + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + } + + pub(crate) fn put_file(&self, mut file: OpenedFile) -> Arc<Mutex<OpenedFile>> { + let file_handle_id = self.next_handle_id(); + file.handle_id = file_handle_id; + let file_handle = Arc::new(Mutex::new(file)); + self.file_handle_map + .insert(file_handle_id, file_handle.clone()); + file_handle + } + + pub(crate) fn get_file(&self, handle_id: u64) -> Option<Arc<Mutex<OpenedFile>>> { + self.file_handle_map + .get(&handle_id) + .map(|x| x.value().clone()) + } + + pub(crate) fn remove_file(&self, handle_id: u64) -> Option<Arc<Mutex<OpenedFile>>> { Review Comment: remove_opened_file -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@gravitino.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org