Copilot commented on code in PR #375:
URL: https://github.com/apache/fluss-rust/pull/375#discussion_r2852956180


##########
bindings/cpp/src/lib.rs:
##########
@@ -243,6 +248,9 @@ mod ffi {
         type LookupResultInner;
 
         // Connection
+        // TODO: all Result<*mut T> methods lose server error codes (mapped to CLIENT_ERROR).
+        // Fix by introducing some struct like { result: FfiResult, ptr: i64 } to preserve error
+        // codes from the server, matching how Rust and Python bindings handle errors.
         fn new_connection(config: &FfiConfig) -> Result<*mut Connection>;

Review Comment:
   The C++ error code TODO is accurate: the Result type for `new_connection` loses the server error code when authentication fails. Authentication errors will appear as CLIENT_ERROR (-2) instead of AUTHENTICATE_EXCEPTION (46). The workaround in the test (checking for "Authentication failed" in the error message) is brittle. This is a known limitation documented in the code, but users may experience degraded error diagnostics. Consider prioritizing the fix suggested in the TODO comment to preserve error codes through FFI boundaries.
   ```suggestion
        fn new_connection(config: &FfiConfig, out_conn: &mut *mut Connection) -> FfiResult;
   ```
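   For illustration, the result-plus-pointer shape proposed in the TODO could look like the sketch below. The struct layout, field names, and numeric codes (46 for AUTHENTICATE_EXCEPTION, -2 for CLIENT_ERROR) follow this review discussion and are not the actual fluss-rust FFI definitions:

   ```rust
   // Sketch only: illustrates the TODO's proposed fix. Names and codes are
   // taken from the review discussion, not from the real fluss-rust bindings.
   #[repr(C)]
   pub struct FfiResult {
       /// 0 on success; otherwise the error code exactly as sent by the server.
       pub error_code: i32,
   }

   #[repr(C)]
   pub struct ConnectionResult {
       pub result: FfiResult,
       /// Opaque connection handle (0 on failure).
       pub ptr: i64,
   }

   // A failing authentication path can now surface the server's code instead
   // of collapsing every error to CLIENT_ERROR (-2).
   fn connect(auth_ok: bool) -> ConnectionResult {
       if auth_ok {
           ConnectionResult { result: FfiResult { error_code: 0 }, ptr: 0x1 }
       } else {
           ConnectionResult { result: FfiResult { error_code: 46 }, ptr: 0 }
       }
   }
   ```

   A C++ caller would then branch on `result.error_code` and only use `ptr` when the code is 0, rather than string-matching the error message.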



##########
crates/fluss/tests/integration/fluss_cluster.rs:
##########
@@ -210,42 +344,99 @@ impl FlussTestingClusterBuilder {
 
 /// Provides an easy way to launch a Fluss cluster with coordinator and tablet servers.
 #[derive(Clone)]
+#[allow(dead_code)] // Fields held for RAII (keeping Docker containers alive).
 pub struct FlussTestingCluster {
     zookeeper: Arc<ContainerAsync<GenericImage>>,
     coordinator_server: Arc<ContainerAsync<GenericImage>>,
     tablet_servers: HashMap<i32, Arc<ContainerAsync<GenericImage>>>,
+    /// Bootstrap servers for plaintext connections.
+    /// When dual listeners are configured, this points to the PLAIN_CLIENT listener.
     bootstrap_servers: String,
+    /// Bootstrap servers for SASL connections (only set when dual listeners are configured).
+    sasl_bootstrap_servers: Option<String>,
     remote_data_dir: Option<std::path::PathBuf>,
+    sasl_users: Vec<(String, String)>,
+    container_names: Vec<String>,
 }
 
 impl FlussTestingCluster {
-    pub async fn stop(&self) {
-        for tablet_server in self.tablet_servers.values() {
-            tablet_server.stop().await.unwrap()
+    /// Synchronously stops and removes all Docker containers and cleans up the
+    /// remote data directory. Safe to call from non-async contexts (e.g. atexit).
+    #[allow(dead_code)]
+    pub fn stop(&self) {
+        for name in &self.container_names {
+            let _ = std::process::Command::new("docker")
+                .args(["rm", "-f", name])
+                .output();
         }
-        self.coordinator_server.stop().await.unwrap();
-        self.zookeeper.stop().await.unwrap();
-        if let Some(remote_data_dir) = &self.remote_data_dir {
-            // Try to clean up the remote data directory, but don't fail if it can't be deleted.
-            // This can happen in CI environments or if Docker containers are still using the directory.
-            // The directory will be cleaned up by the CI system or OS eventually.
-            if let Err(e) = tokio::fs::remove_dir_all(remote_data_dir).await {
-                eprintln!(
-                    "Warning: Failed to delete remote data directory: {:?}, error: {:?}. \
-                     This is non-fatal and the directory may be cleaned up later.",
-                    remote_data_dir, e
-                );
-            }
+        if let Some(ref dir) = self.remote_data_dir {
+            let _ = std::fs::remove_dir_all(dir);
         }
     }

Review Comment:
   The cluster cleanup uses synchronous Docker commands in an atexit handler, which is appropriate for process termination. However, if `stop()` is called explicitly during test execution (e.g., in teardown), it won't wait for containers to fully stop before returning, potentially causing issues if tests run immediately after. Consider adding a small delay or polling mechanism to verify containers have stopped, or document that this is intentional for fast test execution.
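   A polling variant of the synchronous cleanup could look like this sketch; the helper names and the `docker ps` filter are illustrative, not part of the PR:

   ```rust
   use std::process::Command;
   use std::time::{Duration, Instant};

   /// Generic polling helper: returns true once `done()` holds, false on timeout.
   fn wait_until<F: FnMut() -> bool>(mut done: F, timeout: Duration, interval: Duration) -> bool {
       let start = Instant::now();
       loop {
           if done() {
               return true;
           }
           if start.elapsed() >= timeout {
               return false;
           }
           std::thread::sleep(interval);
       }
   }

   /// True once `docker ps` no longer lists a container with this name.
   /// If docker itself is unavailable, there is nothing left to wait for.
   fn container_gone(name: &str) -> bool {
       Command::new("docker")
           .args(["ps", "-q", "--filter"])
           .arg(format!("name={name}"))
           .output()
           .map(|o| o.stdout.is_empty())
           .unwrap_or(true)
   }
   ```

   `stop()` could then call `wait_until(|| self.container_names.iter().all(|n| container_gone(n)), ...)` after the `docker rm -f` loop to guarantee containers are gone before the next test starts.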



##########
crates/fluss/tests/integration/fluss_cluster.rs:
##########
@@ -25,17 +25,29 @@ use testcontainers::core::ContainerPort;
 use testcontainers::runners::AsyncRunner;
 use testcontainers::{ContainerAsync, GenericImage, ImageExt};
 
-const FLUSS_VERSION: &str = "0.7.0";
+const FLUSS_VERSION: &str = "0.8.0-incubating";

Review Comment:
   The Fluss server version was updated from 0.7.0 to 0.8.0-incubating to support SASL authentication. Ensure that the 0.8.0-incubating Docker image (apache/fluss:0.8.0-incubating) is available in the Docker registry before merging, as CI and local development will pull this image. If the image is not yet published, tests will fail with image pull errors.
   ```suggestion
   const FLUSS_VERSION: &str = "0.7.0";
   ```
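   Availability can be checked before merging without downloading any layers, sketched here with a hypothetical helper around `docker manifest inspect` (which fetches only registry metadata):

   ```rust
   use std::process::Command;

   /// Hypothetical pre-merge check: true if the tag resolves in the registry.
   /// Gracefully returns false when docker is not installed or the daemon is down.
   fn image_available(image: &str) -> bool {
       Command::new("docker")
           .args(["manifest", "inspect", image])
           .output()
           .map(|o| o.status.success())
           .unwrap_or(false)
   }
   ```

   Running `image_available("apache/fluss:0.8.0-incubating")` locally or as a CI pre-step would surface a missing image before the integration tests fail with pull errors.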



##########
crates/fluss/tests/integration/utils.rs:
##########
@@ -18,20 +18,97 @@
 use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder};
 use fluss::client::FlussAdmin;
 use fluss::metadata::{PartitionSpec, TableDescriptor, TablePath};
-use parking_lot::RwLock;
 use std::collections::HashMap;
 use std::sync::Arc;
+use std::sync::LazyLock;
 use std::time::Duration;
 
-/// Polls the cluster until CoordinatorEventProcessor is initialized and tablet server is available.
-/// Times out after 20 seconds.
-pub async fn wait_for_cluster_ready(cluster: &FlussTestingCluster) {
-    let timeout = Duration::from_secs(20);
+extern "C" fn cleanup_on_exit() {
+    SHARED_CLUSTER.stop();
+}
+
+/// Shared cluster with dual listeners: PLAIN_CLIENT (plaintext) on port 9223
+/// and CLIENT (SASL) on port 9123. Includes remote storage config so
+/// table_remote_scan can also use this cluster.
+static SHARED_CLUSTER: LazyLock<FlussTestingCluster> = LazyLock::new(|| {
+    std::thread::spawn(|| {
+        let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
+        rt.block_on(async {
+            let temp_dir = std::env::current_dir()
+                .unwrap_or_else(|_| std::path::PathBuf::from("."))
+                .join("target")
+                .join(format!("test-remote-data-{}", uuid::Uuid::new_v4()));
+            let _ = std::fs::remove_dir_all(&temp_dir);
+            std::fs::create_dir_all(&temp_dir)
+                .expect("Failed to create temporary directory for remote data");
+            let temp_dir = temp_dir
+                .canonicalize()
+                .expect("Failed to canonicalize remote data directory path");
+
+            let mut cluster_conf = HashMap::new();
+            cluster_conf.insert("log.segment.file-size".to_string(), "120b".to_string());
+            cluster_conf.insert(
+                "remote.log.task-interval-duration".to_string(),
+                "1s".to_string(),
+            );
+
+            let cluster =
+                FlussTestingClusterBuilder::new_with_cluster_conf("shared-test", &cluster_conf)
+                    .with_sasl(vec![
+                        ("admin".to_string(), "admin-secret".to_string()),
+                        ("alice".to_string(), "alice-secret".to_string()),
+                    ])
+                    .with_remote_data_dir(temp_dir)
+                    .build()
+                    .await;
+            wait_for_cluster_ready_with_sasl(&cluster).await;
+
+            // Register cleanup so containers are removed on process exit.
+            unsafe {
+                unsafe extern "C" {
+                    fn atexit(f: extern "C" fn()) -> std::os::raw::c_int;
+                }
+                atexit(cleanup_on_exit);
+            }
+
+            cluster
+        })
+    })
+    .join()
+    .expect("Failed to initialize shared cluster")
+});

Review Comment:
   The shared cluster is initialized in a LazyLock with a thread spawn and runtime creation. If initialization fails (e.g., Docker not available), the panic message from `expect("Failed to initialize shared cluster")` may not be clear about the root cause. Consider improving error messages throughout the initialization chain to help developers diagnose issues like "Docker not found" or "Port 9123 already in use".
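   As a sketch of the suggested diagnostics, hypothetical pre-flight checks run before the cluster build could fail fast with the root cause; the function names and error messages here are illustrative, not part of the PR:

   ```rust
   use std::net::TcpListener;
   use std::process::Command;

   /// True if nothing is currently listening on this local port.
   /// The probe listener is dropped immediately after the check.
   fn port_free(port: u16) -> bool {
       TcpListener::bind(("127.0.0.1", port)).is_ok()
   }

   /// Hypothetical pre-flight check: surfaces "Docker not found" or
   /// "Port N already in use" before the LazyLock initializer runs,
   /// instead of the generic "Failed to initialize shared cluster" panic.
   fn preflight(ports: &[u16]) -> Result<(), String> {
       match Command::new("docker").arg("info").output() {
           Ok(out) if out.status.success() => {}
           Ok(out) => {
               return Err(format!(
                   "Docker daemon not reachable: {}",
                   String::from_utf8_lossy(&out.stderr).trim()
               ))
           }
           Err(e) => return Err(format!("Docker not found on PATH: {e}")),
       }
       for &port in ports {
           if !port_free(port) {
               return Err(format!("Port {port} already in use"));
           }
       }
       Ok(())
   }
   ```

   Calling something like `preflight(&[9123, 9223]).expect("cluster preconditions")` at the top of the LazyLock closure would turn an opaque join failure into an actionable message.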



##########
crates/fluss/src/rpc/server_connection.rs:
##########
@@ -100,7 +134,74 @@ impl RpcClient {
             self.max_message_size,
             self.client_id.clone(),
         );
-        Ok(ServerConnection::new(messenger))
+        let connection = ServerConnection::new(messenger);
+
+        if let Some(ref sasl) = self.sasl_config {
+            Self::authenticate(&connection, &sasl.username, &sasl.password).await?;
+        }
+
+        Ok(connection)
+    }
+
+    /// Perform SASL/PLAIN authentication handshake.
+    ///
+    /// Retries on `RetriableAuthenticateException` with exponential backoff
+    /// (matching Java's unbounded retry behaviour). Non-retriable errors
+    /// (wrong password, unknown user) propagate immediately as
+    /// `Error::FlussAPIError` with the original error code.
+    async fn authenticate(
+        connection: &ServerConnection,
+        username: &str,
+        password: &str,
+    ) -> Result<(), Error> {
+        use crate::rpc::fluss_api_error::FlussError;
+        use crate::rpc::message::AuthenticateRequest;
+        use rand::Rng;
+
+        let initial_request = AuthenticateRequest::new_plain(username, password);
+        let mut retry_count: u32 = 0;
+
+        loop {
+            let request = initial_request.clone();
+            let result = connection.request(request).await;
+
+            match result {
+                Ok(response) => {
+                    // Check for server challenge (multi-round auth).
+                    // PLAIN mechanism never sends a challenge, but we handle it
+                    // for protocol correctness matching Java's handleAuthenticateResponse.
+                    if let Some(challenge) = response.challenge {
+                        let challenge_req = AuthenticateRequest::from_challenge("PLAIN", challenge);
+                        connection.request(challenge_req).await?;
+                    }
+                    return Ok(());
+                }
+                Err(Error::FlussAPIError { ref api_error })
+                    if FlussError::for_code(api_error.code)
+                        == FlussError::RetriableAuthenticateException =>
+                {
+                    retry_count += 1;
+                    // Cap the exponent like Java's ExponentialBackoff.expMax so that
+                    // jitter still produces a range at steady state instead of being
+                    // clamped to AUTH_MAX_BACKOFF_MS.
+                    let exp_max = (AUTH_MAX_BACKOFF_MS / AUTH_INITIAL_BACKOFF_MS).log2();
+                    let exp = ((retry_count as f64) - 1.0).min(exp_max);
+                    let term = AUTH_INITIAL_BACKOFF_MS * AUTH_BACKOFF_MULTIPLIER.powf(exp);
+                    let jitter_factor =
+                        1.0 - AUTH_JITTER + rand::rng().random::<f64>() * (2.0 * AUTH_JITTER);

Review Comment:
   The jitter calculation uses `rand::rng().random::<f64>()`, which returns a value in [0, 1), and the formula applies it as `1.0 - AUTH_JITTER + random * (2.0 * AUTH_JITTER)`. With AUTH_JITTER = 0.2, this produces a range of [0.8, 1.2). However, the similar backoff implementation in `remote_log.rs` uses `random_range(0.75..=1.25)`, which is more explicit. For consistency and clarity, consider using `random_range` instead of the manual calculation, or document why this specific formula is used to match Java's implementation.
   ```suggestion
                        rand::rng().random_range(1.0 - AUTH_JITTER..1.0 + AUTH_JITTER);
   ```
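   To confirm the two formulations span the same interval, the manual formula can be checked at its endpoints (a quick sketch; `r` stands in for the uniform sample from `rand::rng().random::<f64>()`):

   ```rust
   // With AUTH_JITTER = 0.2, both the manual formula and
   // random_range(1.0 - AUTH_JITTER..1.0 + AUTH_JITTER) cover [0.8, 1.2).
   const AUTH_JITTER: f64 = 0.2;

   /// The PR's manual jitter formula, with `r` replacing the random sample in [0, 1).
   fn manual_jitter(r: f64) -> f64 {
       1.0 - AUTH_JITTER + r * (2.0 * AUTH_JITTER)
   }
   ```

   Since `manual_jitter(0.0) == 0.8` and `manual_jitter(r) < 1.2` for all `r < 1.0`, the suggested `random_range` call is a drop-in equivalent, differing only in readability.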



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
