Copilot commented on code in PR #360:
URL: https://github.com/apache/fluss-rust/pull/360#discussion_r2837014400


##########
crates/fluss/src/client/write/writer_client.rs:
##########
@@ -125,8 +125,12 @@ impl WriterClient {
             if let Some(assigner) = self.bucket_assigners.get(table_path) {
                 assigner.clone()
             } else {
-                let assigner =
-                    Self::create_bucket_assigner(table_info, Arc::clone(table_path), bucket_key)?;
+                let assigner = Self::create_bucket_assigner(
+                    table_info,
+                    Arc::clone(table_path),
+                    bucket_key,
+                    &self.config,
+                )?;

Review Comment:
   `RoundRobinBucketAssigner::abort_if_batch_full()` is never actually 
consulted by `WriterClient::send` (it always calls 
`RecordAccumulator::append(..., abort_if_batch_full = true)`), so selecting 
`round_robin` will still cause records to be retried with a new bucket when a 
batch is full. Please thread `bucket_assigner.abort_if_batch_full()` into the 
initial `append` call so round-robin can create a new batch for the same bucket 
instead of forcing a reassign.
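
   A minimal stand-alone sketch of what the comment asks for (names like `append` and `AssignerKind` are illustrative, not the real fluss-rust API): the caller passes the assigner's own `abort_if_batch_full()` flag into the first `append` instead of hard-coding `true`, so a full batch under round-robin opens a new batch for the same bucket rather than aborting and reassigning.

   ```rust
   // Toy model of the suggested wiring; not the real WriterClient/RecordAccumulator.
   trait BucketAssigner {
       fn abort_if_batch_full(&self) -> bool;
   }

   struct RoundRobin;

   impl BucketAssigner for RoundRobin {
       fn abort_if_batch_full(&self) -> bool {
           // Round-robin wants a new batch on the same bucket, not a reassign.
           false
       }
   }

   // Stand-in for RecordAccumulator::append: when the batch is full, the flag
   // decides between aborting (caller reassigns the bucket) and starting a new
   // batch for the same bucket.
   fn append(batch_full: bool, abort_if_batch_full: bool) -> &'static str {
       if batch_full && abort_if_batch_full {
           "abort-and-reassign"
       } else if batch_full {
           "new-batch-same-bucket"
       } else {
           "appended"
       }
   }

   fn main() {
       let assigner = RoundRobin;
       // With the flag threaded through, round-robin keeps the bucket on a full batch:
       assert_eq!(append(true, assigner.abort_if_batch_full()), "new-batch-same-bucket");
   }
   ```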



##########
crates/fluss/src/client/write/writer_client.rs:
##########
@@ -125,8 +125,12 @@ impl WriterClient {
             if let Some(assigner) = self.bucket_assigners.get(table_path) {
                 assigner.clone()
             } else {
-                let assigner =
-                    Self::create_bucket_assigner(table_info, Arc::clone(table_path), bucket_key)?;
+                let assigner = Self::create_bucket_assigner(
+                    table_info,
+                    Arc::clone(table_path),
+                    bucket_key,
+                    &self.config,
+                )?;
                 self.bucket_assigners
                    .insert(Arc::clone(table_path), Arc::clone(&assigner.clone()));

Review Comment:
   Redundant cloning when inserting into `bucket_assigners`: 
`Arc::clone(&assigner.clone())` clones the `Arc` twice. This can be simplified 
to a single clone (e.g., `assigner.clone()` or `Arc::clone(&assigner)`) to 
reduce unnecessary work and improve readability.
   ```suggestion
                       .insert(Arc::clone(table_path), Arc::clone(&assigner));
   ```



##########
crates/fluss/src/client/write/writer_client.rs:
##########
@@ -173,8 +178,15 @@ impl WriterClient {
                 function,
             )))
         } else {
-            // TODO: Wire up toi use round robin/sticky according to ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER
-            Ok(Arc::new(StickyBucketAssigner::new(table_path)))
+            match config.writer_bucket_no_key_assigner.as_str() {
+                "sticky" => Ok(Arc::new(StickyBucketAssigner::new(table_path))),
+                "round_robin" => Ok(Arc::new(RoundRobinBucketAssigner::new(table_path))),
+                other => Err(Error::IllegalArgument {
+                    message: format!(
                        "unknown writer_bucket_no_key_assigner '{other}', expected 'sticky' or 'round_robin'"
+                    ),
+                }),
+            }

Review Comment:
   The new `writer_bucket_no_key_assigner` wiring in `create_bucket_assigner` 
isn’t covered by tests (e.g., asserting that "round_robin" selects 
`RoundRobinBucketAssigner` and an unknown value returns `IllegalArgument`). 
Adding a small unit test around `WriterClient::create_bucket_assigner` would 
prevent regressions in config handling.
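
   A self-contained sketch of the selection logic such a test would pin down (`AssignerKind` and the free function are stand-ins, not the real fluss-rust types): config string in, assigner kind or error out.

   ```rust
   // Illustrative model of create_bucket_assigner's config dispatch.
   #[derive(Debug, PartialEq)]
   enum AssignerKind {
       Sticky,
       RoundRobin,
   }

   fn create_bucket_assigner(no_key_assigner: &str) -> Result<AssignerKind, String> {
       match no_key_assigner {
           "sticky" => Ok(AssignerKind::Sticky),
           "round_robin" => Ok(AssignerKind::RoundRobin),
           other => Err(format!(
               "unknown writer_bucket_no_key_assigner '{other}', expected 'sticky' or 'round_robin'"
           )),
       }
   }

   fn main() {
       // The three cases a unit test around the real function would assert:
       assert_eq!(create_bucket_assigner("sticky"), Ok(AssignerKind::Sticky));
       assert_eq!(create_bucket_assigner("round_robin"), Ok(AssignerKind::RoundRobin));
       assert!(create_bucket_assigner("random").is_err());
   }
   ```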



##########
crates/fluss/src/config.rs:
##########
@@ -45,6 +46,11 @@ pub struct Config {
     #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_SIZE)]
     pub writer_batch_size: i32,
 
+    /// Bucket assigner for tables without bucket keys: "sticky" or "round_robin".
+    /// It is to match Java `client.writer.bucket.no-key-assigner`.
+    #[arg(long, default_value_t = String::from(DEFAULT_BUCKET_NO_KEY_ASSIGNER))]

Review Comment:
   This config value is only validated at runtime when a record without a 
bucket key is written. Since the allowed values are fixed, consider enforcing 
them at CLI parse time (e.g., via `clap` value parser / `ValueEnum`) so invalid 
values fail fast with a clear usage error.
   ```suggestion
       #[arg(
           long,
           value_parser = ["sticky", "round_robin"],
           default_value = DEFAULT_BUCKET_NO_KEY_ASSIGNER
       )]
   ```



##########
crates/fluss/src/client/write/bucket_assigner.rs:
##########
@@ -106,6 +106,41 @@ impl BucketAssigner for StickyBucketAssigner {
     }
 }
 
+/// Unlike [StickyBucketAssigner], each record is assigned to the next bucket
+/// in a rotating sequence, providing even data distribution across all buckets.
+pub struct RoundRobinBucketAssigner {
+    table_path: Arc<PhysicalTablePath>,
+    counter: AtomicI32,
+}
+
+impl RoundRobinBucketAssigner {
+    pub fn new(table_path: Arc<PhysicalTablePath>) -> Self {
+        Self {
+            table_path,
+            counter: AtomicI32::new(0),
+        }
+    }
+}
+
+impl BucketAssigner for RoundRobinBucketAssigner {
+    fn abort_if_batch_full(&self) -> bool {
+        false
+    }
+
+    fn on_new_batch(&self, _cluster: &Cluster, _prev_bucket_id: i32) {}
+
+    fn assign_bucket(&self, _bucket_key: Option<&Bytes>, cluster: &Cluster) -> Result<i32> {
+        let available_buckets = cluster.get_available_buckets_for_table_path(&self.table_path);
+        let idx = self.counter.fetch_add(1, Ordering::Relaxed) & i32::MAX;
+        if available_buckets.is_empty() {
+            let num_buckets = cluster.get_bucket_count(self.table_path.get_table_path());

Review Comment:
   Potential panic: when `available_buckets` is empty, this does `idx % 
num_buckets` where `num_buckets` comes from `cluster.get_bucket_count(...)` and 
is not validated to be > 0. If a table is ever created/loaded with `num_buckets 
== 0`, this will panic (division by zero). Consider returning an 
`IllegalArgument`/`invalid_table` error when `num_buckets <= 0` instead of 
performing the modulo.
   ```suggestion
            let num_buckets = cluster.get_bucket_count(self.table_path.get_table_path());
            if num_buckets <= 0 {
                return Err(Error::IllegalArgument {
                    message: "Bucket count for table must be positive in RoundRobinBucketAssigner"
                        .to_string(),
                });
            }
   ```
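
   The index logic under review can be modeled stand-alone (`next_bucket` is a stand-in, not the real method): `fetch_add` returns the previous counter value and wraps on overflow, masking with `i32::MAX` clears the sign bit so the index stays non-negative after the wrap, and the guard avoids the `% 0` panic the comment describes.

   ```rust
   use std::sync::atomic::{AtomicI32, Ordering};

   // Illustrative model of the round-robin index computation with the
   // suggested zero-bucket guard added.
   fn next_bucket(counter: &AtomicI32, num_buckets: i32) -> Result<i32, String> {
       if num_buckets <= 0 {
           // Guard: modulo by zero would panic; fail with an error instead.
           return Err("bucket count must be positive".to_string());
       }
       // fetch_add returns the previous value; & i32::MAX clears the sign bit
       // so idx stays non-negative even after the counter wraps past i32::MAX.
       let idx = counter.fetch_add(1, Ordering::Relaxed) & i32::MAX;
       Ok(idx % num_buckets)
   }

   fn main() {
       let c = AtomicI32::new(i32::MAX); // counter about to wrap
       assert_eq!(next_bucket(&c, 3).unwrap(), 1); // i32::MAX % 3 == 1
       // Counter has wrapped to i32::MIN; the mask still yields a valid index.
       assert!(next_bucket(&c, 3).unwrap() >= 0);
       assert!(next_bucket(&c, 0).is_err());
   }
   ```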



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
