numinnex commented on code in PR #3068:
URL: https://github.com/apache/iggy/pull/3068#discussion_r3091904395


##########
core/common/src/types/segment_storage/messages_writer/direct.rs:
##########
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{
+    IggyByteSize, IggyError, IggyMessagesBatch,
+    alloc::memory_pool::ALIGNMENT,
+    types::segment_storage::direct_file::{DirectFile, SharedTail},
+};
+use bytes::Bytes;
+use compio::io::AsyncReadAtExt;
+use std::{
+    cell::RefCell,
+    rc::Rc,
+    sync::atomic::{AtomicU64, Ordering},
+};
+use tracing::{error, trace};
+
+/// A dedicated struct for writing to the messages file.
+#[derive(Debug)]
+pub struct DirectMessagesWriter {
+    file_path: String,
+    file: RefCell<DirectFile>,
+    messages_size_bytes: Rc<AtomicU64>,
+    shared_tail: Rc<SharedTail>,
+}
+
+// Safety: We are guaranteeing that MessagesWriter will never be used from 
multiple threads
+unsafe impl Send for DirectMessagesWriter {}
+
+impl DirectMessagesWriter {
+    /// Opens the messages file in write mode.
+    ///
+    /// If the server confirmation is set to `NoWait`, the file handle is 
transferred to the
+    /// persister task (and stored in `persister_task`) so that writes are 
done asynchronously.
+    /// Otherwise, the file is retained in `self.file` for synchronous writes.
+    pub async fn new(
+        file_path: &str,
+        messages_size_bytes: Rc<AtomicU64>,
+        file_exists: bool,
+        dsync: bool,
+    ) -> Result<Self, IggyError> {
+        let initial_position = if file_exists {
+            let metadata = compio::fs::metadata(file_path).await.map_err(|err| 
{
+                error!("Failed to get metadata of messages file: {file_path}, 
error: {err}");
+                IggyError::CannotReadFileMetadata
+            })?;
+
+            let actual_size = metadata.len();
+            messages_size_bytes.store(actual_size, Ordering::Relaxed);
+
+            // DirectFile requires aligned initial position
+            // Align down to ALIGNMENT boundary
+            actual_size & !(ALIGNMENT as u64 - 1)
+        } else {
+            0
+        };
+
+        let mut direct_file =
+            DirectFile::open(file_path, initial_position, file_exists, 
dsync).await?;
+
+        if file_exists {
+            let actual_size = messages_size_bytes.load(Ordering::Relaxed);
+            let tail_bytes = (actual_size - initial_position) as usize;
+            if tail_bytes > 0 {
+                trace!(
+                    "Recovering {} tail bytes from previous session for {}",
+                    tail_bytes, file_path
+                );
+
+                let read_file = 
compio::fs::File::open(file_path).await.map_err(|err| {
+                    error!("Failed to open file for tail recovery: 
{file_path}, error: {err}");
+                    IggyError::CannotReadFile
+                })?;
+
+                let mut tail_buf = vec![0u8; tail_bytes];
+                let (result, buf) = read_file
+                    .read_exact_at(tail_buf, initial_position)
+                    .await
+                    .into();
+
+                result.map_err(|err| {
+                    error!("Failed to read tail bytes at pos 
{initial_position}: {file_path}, error: {err}");
+                    IggyError::CannotReadFile
+                })?;
+
+                tail_buf = buf;
+                direct_file.set_tail(&tail_buf);
+            }
+        }
+
+        trace!(
+            "Opened DirectFile messages writer: {file_path}, pos: {}, size: 
{}",
+            direct_file.position(),
+            messages_size_bytes.load(Ordering::Acquire)
+        );
+
+        Ok(Self {
+            file_path: file_path.to_string(),
+            file: RefCell::new(direct_file),
+            messages_size_bytes,
+            shared_tail: Rc::new(SharedTail::new()),
+        })
+    }
+
+    pub fn shared_tail(&self) -> Rc<SharedTail> {
+        Rc::clone(&self.shared_tail)
+    }
+
+    fn update_shared_tail(&self) {
+        let df = self.file.borrow_mut();
+        self.shared_tail
+            .update(df.position(), &df.tail_buffer()[..], df.tail_len());
+    }
+
+    #[allow(clippy::await_holding_refcell_ref)]
+    // SAFETY: compio is a single-threaded runtime — no other task runs on
+    // this thread during .await, so the RefCell borrow cannot conflict
+    pub async fn save_frozen_batches(

Review Comment:
   The safety comment isn't necessary true, we've learned it the hard way 
during the migration to thread-per-core shared nothing architecture. Single 
threaded runtime doesn't imply lack of concurrency and concurrent  execution is 
something that can trigger the runtime panic on that `borrow` there. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to