leekeiabstraction commented on code in PR #360:
URL: https://github.com/apache/fluss-rust/pull/360#discussion_r2836312402


##########
crates/fluss/src/client/write/bucket_assigner.rs:
##########
@@ -106,6 +106,41 @@ impl BucketAssigner for StickyBucketAssigner {
     }
 }
 
+/// Unlike [StickyBucketAssigner], each record is assigned to the next bucket
+/// in a rotating sequence, providing even data distribution across all buckets.
+pub struct RoundRobinBucketAssigner {
+    table_path: Arc<PhysicalTablePath>,
+    counter: AtomicI32,

Review Comment:
   We could also store the number of buckets in the assigner, as the Java side
   does, instead of relying on the Cluster's method.
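
A minimal sketch of what storing the bucket count could look like. It is an assumption, not the PR's code: the `num_buckets` field and the `assign_bucket` signature are hypothetical, and `table_path` is left out so the snippet compiles on its own.

```rust
use std::sync::atomic::{AtomicI32, Ordering};

/// Hypothetical variant of the assigner that caches the bucket count
/// at construction time instead of asking the cluster for it.
pub struct RoundRobinBucketAssigner {
    num_buckets: i32,
    counter: AtomicI32,
}

impl RoundRobinBucketAssigner {
    pub fn new(num_buckets: i32) -> Self {
        assert!(num_buckets > 0, "a table needs at least one bucket");
        Self {
            num_buckets,
            counter: AtomicI32::new(0),
        }
    }

    /// Returns the next bucket in the rotation.
    pub fn assign_bucket(&self) -> i32 {
        // fetch_add wraps on overflow, so clear the sign bit before
        // taking the remainder to keep the result non-negative.
        let next = self.counter.fetch_add(1, Ordering::Relaxed);
        (next & i32::MAX) % self.num_buckets
    }
}
```

With three buckets, successive calls would cycle through 0, 1, 2 and then start over.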



##########
crates/fluss/src/client/write/writer_client.rs:
##########
@@ -125,8 +125,12 @@ impl WriterClient {
             if let Some(assigner) = self.bucket_assigners.get(table_path) {
                 assigner.clone()
             } else {
-                let assigner =
-                    Self::create_bucket_assigner(table_info, Arc::clone(table_path), bucket_key)?;
+                let assigner = Self::create_bucket_assigner(
+                    table_info,
+                    Arc::clone(table_path),
+                    bucket_key,
+                    &self.config,
+                )?;
                 self.bucket_assigners
                     .insert(Arc::clone(table_path), Arc::clone(&assigner.clone()));

Review Comment:
   Just spotted an unnecessary clone here. I would appreciate it if you could
   amend this line to: 🙏
   
   ```rust
   .insert(Arc::clone(table_path), Arc::clone(&assigner));
   ```



##########
crates/fluss/src/client/write/bucket_assigner.rs:
##########
@@ -106,6 +106,41 @@ impl BucketAssigner for StickyBucketAssigner {
     }
 }
 
+/// Unlike [StickyBucketAssigner], each record is assigned to the next bucket
+/// in a rotating sequence, providing even data distribution across all buckets.
+pub struct RoundRobinBucketAssigner {
+    table_path: Arc<PhysicalTablePath>,
+    counter: AtomicI32,
+}
+
+impl RoundRobinBucketAssigner {
+    pub fn new(table_path: Arc<PhysicalTablePath>) -> Self {
+        Self {
+            table_path,
+            counter: AtomicI32::new(0),

Review Comment:
   The Java side uses a random initial value. That helps when multiple writers
   are in use, because they do not all write to the same bucket at the same
   time. It also helps if users misuse the API and create a writer per write:
   with a fixed initial value of 0, every write would go to bucket 0.
   
   ```java
   
   public class RoundRobinBucketAssigner extends DynamicBucketAssigner {
   ...
       private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());
   }
   ```
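
A rough Rust equivalent, as a sketch under assumptions rather than a proposed patch: `SeededRoundRobin`, `random_seed`, and `with_seed` are hypothetical names. The seed comes from the standard library's randomly keyed `RandomState` hasher, which avoids adding a dependency; a crate such as `rand` would be the more conventional choice.

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
use std::sync::atomic::{AtomicI32, Ordering};

/// Derives a pseudo-random i32 from the randomly keyed std hasher.
/// This is a dependency-free stand-in for a real random number generator.
fn random_seed() -> i32 {
    RandomState::new().build_hasher().finish() as i32
}

/// Hypothetical round-robin counter that starts at a random offset.
pub struct SeededRoundRobin {
    counter: AtomicI32,
}

impl SeededRoundRobin {
    /// Starts the rotation at a random position.
    pub fn new() -> Self {
        Self::with_seed(random_seed())
    }

    /// Starts the rotation at a caller-chosen position (useful in tests).
    pub fn with_seed(seed: i32) -> Self {
        Self {
            counter: AtomicI32::new(seed),
        }
    }

    pub fn assign_bucket(&self, num_buckets: i32) -> i32 {
        // The seed may be negative and fetch_add wraps on overflow,
        // so clear the sign bit before taking the remainder.
        let next = self.counter.fetch_add(1, Ordering::Relaxed);
        (next & i32::MAX) % num_buckets
    }
}
```

Masking with `i32::MAX` keeps the bucket index non-negative for any seed, including after the counter wraps around.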



##########
crates/fluss/src/client/write/writer_client.rs:
##########
@@ -173,8 +178,15 @@ impl WriterClient {
                 function,
             )))
         } else {
-            // TODO: Wire up toi use round robin/sticky according to ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER
-            Ok(Arc::new(StickyBucketAssigner::new(table_path)))
+            match config.writer_bucket_no_key_assigner.as_str() {
+                "sticky" => Ok(Arc::new(StickyBucketAssigner::new(table_path))),
+                "round_robin" => Ok(Arc::new(RoundRobinBucketAssigner::new(table_path))),

Review Comment:
   These could be enums as well, like on the Java side. Can we also ensure that
   the documentation is updated to describe these options?
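
One possible shape for such an enum, sketched under assumptions: the name `NoKeyAssigner` and the `String` error type are hypothetical, and only the two string values shown in the diff are covered.

```rust
use std::str::FromStr;

/// Hypothetical enum for the no-key bucket assigner option.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NoKeyAssigner {
    /// Keep writing to one bucket for a while before switching.
    Sticky,
    /// Rotate through the buckets record by record.
    RoundRobin,
}

impl FromStr for NoKeyAssigner {
    type Err = String;

    /// Parses the config string once, so later code can match on the enum
    /// instead of comparing strings.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "sticky" => Ok(Self::Sticky),
            "round_robin" => Ok(Self::RoundRobin),
            other => Err(format!("unknown no-key bucket assigner: {other}")),
        }
    }
}
```

Parsing at config load time would surface an invalid value early, and a `match` on the enum is checked for exhaustiveness by the compiler.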



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
