fresh-borzoni commented on code in PR #407:
URL: https://github.com/apache/fluss-rust/pull/407#discussion_r2881058266


##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -386,12 +411,76 @@ where
             ConnectionState::Poison(e) => return 
Err(RpcError::Poisoned(Arc::clone(e)).into()),
         }
 
-        self.send_message(buf).await?;
+        // Guard to decrement in-flight on cancellation, panic, or any exit 
without
+        // explicitly calling record_completion_metrics. Prevents gauge drift 
when
+        // the request future is dropped (e.g. tokio::select! timeout).
+        let mut in_flight_guard = if let Some(label) = api_label {
+            metrics::counter!(
+                crate::metrics::CLIENT_REQUESTS_TOTAL,
+                crate::metrics::LABEL_API_KEY => label
+            )
+            .increment(1);
+            metrics::counter!(
+                crate::metrics::CLIENT_BYTES_SENT_TOTAL,

Review Comment:
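   Why do we increment these counters before the message is actually sent? If the send fails, they would count a request (and bytes) that never went out.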



##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -561,3 +650,594 @@ impl Drop for CleanupRequestStateOnCancel {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::error::Error;
+    use crate::rpc::ApiKey;
+    use crate::rpc::api_version::ApiVersion;
+    use crate::rpc::frame::{ReadError, WriteError};
+    use crate::rpc::message::{ReadVersionedType, RequestBody, 
WriteVersionedType};
+    use metrics_util::debugging::DebuggingRecorder;
+    use std::sync::OnceLock;
+    use tokio::io::{AsyncReadExt, AsyncWriteExt, BufStream};
+    use tokio::sync::Mutex as AsyncMutex;
+
+    // -- Test-only request/response types --------------------------------
+
+    struct TestProduceRequest;
+    struct TestProduceResponse;
+
+    impl RequestBody for TestProduceRequest {
+        type ResponseBody = TestProduceResponse;
+        const API_KEY: ApiKey = ApiKey::ProduceLog;
+        const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+    }
+
+    impl WriteVersionedType<Vec<u8>> for TestProduceRequest {
+        fn write_versioned(&self, _w: &mut Vec<u8>, _v: ApiVersion) -> 
Result<(), WriteError> {
+            Ok(())
+        }
+    }
+
+    impl ReadVersionedType<Cursor<Vec<u8>>> for TestProduceResponse {
+        fn read_versioned(_r: &mut Cursor<Vec<u8>>, _v: ApiVersion) -> 
Result<Self, ReadError> {
+            Ok(TestProduceResponse)
+        }
+    }
+
+    struct TestMetadataRequest;
+    struct TestMetadataResponse;
+
+    impl RequestBody for TestMetadataRequest {
+        type ResponseBody = TestMetadataResponse;
+        const API_KEY: ApiKey = ApiKey::MetaData;
+        const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+    }
+
+    impl WriteVersionedType<Vec<u8>> for TestMetadataRequest {
+        fn write_versioned(&self, _w: &mut Vec<u8>, _v: ApiVersion) -> 
Result<(), WriteError> {
+            Ok(())
+        }
+    }
+
+    impl ReadVersionedType<Cursor<Vec<u8>>> for TestMetadataResponse {
+        fn read_versioned(_r: &mut Cursor<Vec<u8>>, _v: ApiVersion) -> 
Result<Self, ReadError> {
+            Ok(TestMetadataResponse)
+        }
+    }
+
+    // -- Mock server -----------------------------------------------------
+
+    /// Reads framed requests and echoes back minimal success responses.
+    async fn mock_echo_server(mut stream: tokio::io::DuplexStream) {
+        loop {
+            let mut len_buf = [0u8; 4];
+            if stream.read_exact(&mut len_buf).await.is_err() {
+                return;
+            }
+            let len = i32::from_be_bytes(len_buf) as usize;
+
+            let mut payload = vec![0u8; len];
+            if stream.read_exact(&mut payload).await.is_err() {
+                return;
+            }
+
+            // Header layout: api_key(2) + api_version(2) + request_id(4)
+            let request_id = i32::from_be_bytes([payload[4], payload[5], 
payload[6], payload[7]]);
+
+            // Response: resp_type(1, 0=success) + request_id(4)
+            let mut resp = Vec::with_capacity(5);
+            resp.push(0u8);
+            resp.extend_from_slice(&request_id.to_be_bytes());
+
+            let resp_len = (resp.len() as i32).to_be_bytes();
+            if stream.write_all(&resp_len).await.is_err()
+                || stream.write_all(&resp).await.is_err()
+                || stream.flush().await.is_err()
+            {
+                return;
+            }
+        }
+    }
+
+    /// Reads framed requests and echoes back error responses (resp_type=1).
+    async fn mock_error_server(mut stream: tokio::io::DuplexStream) {
+        use prost::Message;
+
+        loop {
+            let mut len_buf = [0u8; 4];
+            if stream.read_exact(&mut len_buf).await.is_err() {
+                return;
+            }
+            let len = i32::from_be_bytes(len_buf) as usize;
+
+            let mut payload = vec![0u8; len];
+            if stream.read_exact(&mut payload).await.is_err() {
+                return;
+            }
+
+            let request_id = i32::from_be_bytes([payload[4], payload[5], 
payload[6], payload[7]]);
+
+            let err = crate::proto::ErrorResponse {
+                error_code: 1,
+                error_message: Some("test error".to_string()),
+            };
+            let mut err_buf = Vec::new();
+            err.encode(&mut err_buf).expect("ErrorResponse encode");
+
+            let mut resp = Vec::with_capacity(5 + err_buf.len());
+            resp.push(1u8); // ERROR_RESPONSE
+            resp.extend_from_slice(&request_id.to_be_bytes());
+            resp.extend(err_buf);
+
+            let resp_len = (resp.len() as i32).to_be_bytes();
+            if stream.write_all(&resp_len).await.is_err()
+                || stream.write_all(&resp).await.is_err()
+                || stream.flush().await.is_err()
+            {
+                return;
+            }
+        }
+    }
+
+    // -- Recorder setup --------------------------------------------------
+
+    /// Shared test recorder (installed once per test binary).
+    static TEST_SNAPSHOTTER: OnceLock<metrics_util::debugging::Snapshotter> = 
OnceLock::new();
+    static TEST_LOCK: OnceLock<AsyncMutex<()>> = OnceLock::new();
+
+    fn test_snapshotter() -> &'static metrics_util::debugging::Snapshotter {
+        TEST_SNAPSHOTTER.get_or_init(|| {
+            let recorder = DebuggingRecorder::new();
+            let snapshotter = recorder.snapshotter();
+            recorder
+                .install()
+                .expect("debugging recorder install should succeed in this 
test binary");
+            snapshotter
+        })
+    }
+
+    fn test_lock() -> &'static AsyncMutex<()> {
+        TEST_LOCK.get_or_init(|| AsyncMutex::new(()))
+    }
+
+    // -- Tests -----------------------------------------------------------
+
+    #[tokio::test]
+    async fn request_records_metrics_for_reportable_api_key() {
+        let _test_guard = test_lock().lock().await;
+        let snapshotter = test_snapshotter();
+
+        let (client, server) = tokio::io::duplex(4096);
+        tokio::spawn(mock_echo_server(server));
+
+        let conn = ServerConnectionInner::new(BufStream::new(client), 
usize::MAX, Arc::from("t"));
+
+        let before: Vec<_> = snapshotter.snapshot().into_vec();
+        let request_before = before
+            .iter()
+            .find_map(|(key, _, _, value)| {

Review Comment:
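   Can we extract this pattern (taking a snapshot and looking up a metric's value by key) into a helper function? It is repeated across the tests.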



##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -386,12 +411,76 @@ where
             ConnectionState::Poison(e) => return 
Err(RpcError::Poisoned(Arc::clone(e)).into()),
         }
 
-        self.send_message(buf).await?;
+        // Guard to decrement in-flight on cancellation, panic, or any exit 
without
+        // explicitly calling record_completion_metrics. Prevents gauge drift 
when
+        // the request future is dropped (e.g. tokio::select! timeout).
+        let mut in_flight_guard = if let Some(label) = api_label {

Review Comment:
   The scopeguard only covers cancellation, so every other exit path still needs a manual disarm plus an explicit call to record_completion_metrics. A custom Drop struct that handles all exits in one place might be better — no disarm, and no per-path cleanup to forget.
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to