Copilot commented on code in PR #360:
URL: https://github.com/apache/fluss-rust/pull/360#discussion_r2837014400
##########
crates/fluss/src/client/write/writer_client.rs:
##########
@@ -125,8 +125,12 @@ impl WriterClient {
         if let Some(assigner) = self.bucket_assigners.get(table_path) {
             assigner.clone()
         } else {
-            let assigner =
-                Self::create_bucket_assigner(table_info, Arc::clone(table_path), bucket_key)?;
+            let assigner = Self::create_bucket_assigner(
+                table_info,
+                Arc::clone(table_path),
+                bucket_key,
+                &self.config,
+            )?;
Review Comment:
`RoundRobinBucketAssigner::abort_if_batch_full()` is never actually
consulted by `WriterClient::send` (it always calls
`RecordAccumulator::append(..., abort_if_batch_full = true)`), so selecting
`round_robin` will still cause records to be retried with a new bucket when a
batch is full. Please thread `bucket_assigner.abort_if_batch_full()` into the
initial `append` call so round-robin can create a new batch for the same bucket
instead of forcing a reassign.
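A minimal sketch of the suggested wiring. The types below are simplified stand-ins for `WriterClient`, `RecordAccumulator::append`, and `BucketAssigner` (not the real fluss-rust APIs); only the control flow is the point: the assigner's `abort_if_batch_full()` decides whether a full batch triggers a bucket reassignment or a new batch on the same bucket.

```rust
// Simplified stand-ins for the fluss types; only the control flow is real.
trait BucketAssigner {
    fn abort_if_batch_full(&self) -> bool;
}

struct RoundRobin;
impl BucketAssigner for RoundRobin {
    // Round-robin wants a new batch on the same bucket, not a reassignment.
    fn abort_if_batch_full(&self) -> bool {
        false
    }
}

struct Sticky;
impl BucketAssigner for Sticky {
    fn abort_if_batch_full(&self) -> bool {
        true
    }
}

struct AppendResult {
    abort_for_new_batch: bool,
}

// Mock accumulator: pretends the target batch is already full.
fn append(_bucket: i32, abort_if_batch_full: bool) -> AppendResult {
    let batch_full = true;
    AppendResult {
        abort_for_new_batch: batch_full && abort_if_batch_full,
    }
}

// The fix: pass the assigner's preference instead of hard-coding `true`.
fn send(assigner: &dyn BucketAssigner, bucket: i32) -> &'static str {
    let result = append(bucket, assigner.abort_if_batch_full());
    if result.abort_for_new_batch {
        "reassign bucket and retry"
    } else {
        "keep bucket, open new batch"
    }
}

fn main() {
    assert_eq!(send(&RoundRobin, 0), "keep bucket, open new batch");
    assert_eq!(send(&Sticky, 0), "reassign bucket and retry");
    println!("ok");
}
```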
##########
crates/fluss/src/client/write/writer_client.rs:
##########
@@ -125,8 +125,12 @@ impl WriterClient {
         if let Some(assigner) = self.bucket_assigners.get(table_path) {
             assigner.clone()
         } else {
-            let assigner =
-                Self::create_bucket_assigner(table_info, Arc::clone(table_path), bucket_key)?;
+            let assigner = Self::create_bucket_assigner(
+                table_info,
+                Arc::clone(table_path),
+                bucket_key,
+                &self.config,
+            )?;
             self.bucket_assigners
                 .insert(Arc::clone(table_path), Arc::clone(&assigner.clone()));
Review Comment:
Redundant cloning when inserting into `bucket_assigners`:
`Arc::clone(&assigner.clone())` clones the `Arc` twice. This can be simplified
to a single clone (e.g., `assigner.clone()` or `Arc::clone(&assigner)`) to
reduce unnecessary work and improve readability.
```suggestion
.insert(Arc::clone(table_path), Arc::clone(&assigner));
```
##########
crates/fluss/src/client/write/writer_client.rs:
##########
@@ -173,8 +178,15 @@ impl WriterClient {
                 function,
             )))
         } else {
-            // TODO: Wire up toi use round robin/sticky according to ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER
-            Ok(Arc::new(StickyBucketAssigner::new(table_path)))
+            match config.writer_bucket_no_key_assigner.as_str() {
+                "sticky" => Ok(Arc::new(StickyBucketAssigner::new(table_path))),
+                "round_robin" => Ok(Arc::new(RoundRobinBucketAssigner::new(table_path))),
+                other => Err(Error::IllegalArgument {
+                    message: format!(
+                        "unknown writer_bucket_no_key_assigner '{other}', expected 'sticky' or 'round_robin'"
+                    ),
+                }),
+            }
Review Comment:
The new `writer_bucket_no_key_assigner` wiring in `create_bucket_assigner`
isn’t covered by tests (e.g., asserting that "round_robin" selects
`RoundRobinBucketAssigner` and an unknown value returns `IllegalArgument`).
Adding a small unit test around `WriterClient::create_bucket_assigner` would
prevent regressions in config handling.
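A sketch of the shape such a test could take. `AssignerKind` and `pick_assigner` below are hypothetical stand-ins that replicate only the string dispatch; the real test would call `WriterClient::create_bucket_assigner` with a `Config` and downcast or inspect the returned assigner.

```rust
// Hypothetical stand-in for the assigner selection inside
// WriterClient::create_bucket_assigner.
#[derive(Debug, PartialEq)]
enum AssignerKind {
    Sticky,
    RoundRobin,
}

fn pick_assigner(name: &str) -> Result<AssignerKind, String> {
    match name {
        "sticky" => Ok(AssignerKind::Sticky),
        "round_robin" => Ok(AssignerKind::RoundRobin),
        other => Err(format!(
            "unknown writer_bucket_no_key_assigner '{other}', \
             expected 'sticky' or 'round_robin'"
        )),
    }
}

fn main() {
    // The three cases the review asks to cover.
    assert_eq!(pick_assigner("sticky"), Ok(AssignerKind::Sticky));
    assert_eq!(pick_assigner("round_robin"), Ok(AssignerKind::RoundRobin));
    assert!(pick_assigner("random").is_err());
    println!("ok");
}
```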
##########
crates/fluss/src/config.rs:
##########
@@ -45,6 +46,11 @@ pub struct Config {
     #[arg(long, default_value_t = DEFAULT_WRITER_BATCH_SIZE)]
     pub writer_batch_size: i32,
+    /// Bucket assigner for tables without bucket keys: "sticky" or "round_robin".
+    /// It is to match Java `client.writer.bucket.no-key-assigner`.
+    #[arg(long, default_value_t = String::from(DEFAULT_BUCKET_NO_KEY_ASSIGNER))]
Review Comment:
This config value is only validated at runtime when a record without a
bucket key is written. Since the allowed values are fixed, consider enforcing
them at CLI parse time (e.g., via `clap` value parser / `ValueEnum`) so invalid
values fail fast with a clear usage error.
```suggestion
    #[arg(
        long,
        value_parser = ["sticky", "round_robin"],
        default_value = DEFAULT_BUCKET_NO_KEY_ASSIGNER
    )]
```
##########
crates/fluss/src/client/write/bucket_assigner.rs:
##########
@@ -106,6 +106,41 @@ impl BucketAssigner for StickyBucketAssigner {
     }
 }
+/// Unlike [StickyBucketAssigner], each record is assigned to the next bucket
+/// in a rotating sequence, providing even data distribution across all buckets.
+pub struct RoundRobinBucketAssigner {
+    table_path: Arc<PhysicalTablePath>,
+    counter: AtomicI32,
+}
+
+impl RoundRobinBucketAssigner {
+    pub fn new(table_path: Arc<PhysicalTablePath>) -> Self {
+        Self {
+            table_path,
+            counter: AtomicI32::new(0),
+        }
+    }
+}
+
+impl BucketAssigner for RoundRobinBucketAssigner {
+    fn abort_if_batch_full(&self) -> bool {
+        false
+    }
+
+    fn on_new_batch(&self, _cluster: &Cluster, _prev_bucket_id: i32) {}
+
+    fn assign_bucket(&self, _bucket_key: Option<&Bytes>, cluster: &Cluster) -> Result<i32> {
+        let available_buckets = cluster.get_available_buckets_for_table_path(&self.table_path);
+        let idx = self.counter.fetch_add(1, Ordering::Relaxed) & i32::MAX;
+        if available_buckets.is_empty() {
+            let num_buckets = cluster.get_bucket_count(self.table_path.get_table_path());
Review Comment:
Potential panic: when `available_buckets` is empty, this does `idx %
num_buckets` where `num_buckets` comes from `cluster.get_bucket_count(...)` and
is not validated to be > 0. If a table is ever created/loaded with `num_buckets
== 0`, this will panic (division by zero). Consider returning an
`IllegalArgument`/`invalid_table` error when `num_buckets <= 0` instead of
performing the modulo.
```suggestion
            let num_buckets = cluster.get_bucket_count(self.table_path.get_table_path());
            if num_buckets <= 0 {
                return Err(Error::IllegalArgument {
                    message: "Bucket count for table must be positive in RoundRobinBucketAssigner"
                        .to_string(),
                });
            }
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]