Copilot commented on code in PR #351:
URL: https://github.com/apache/fluss-rust/pull/351#discussion_r2820755275
##########
website/docs/user-guide/rust/example/log-tables.md:
##########
@@ -63,6 +63,21 @@ log_scanner.subscribe(0, 0).await?;
// Poll for records
let records = log_scanner.poll(Duration::from_secs(10)).await?;
+// Per-bucket access
+for (bucket, bucket_records) in records.records_by_buckets() {
+ println!("Bucket {}: {} records", bucket.bucket, bucket_records.len());
Review Comment:
The Rust example code uses `bucket.bucket` to access the bucket ID, but the
TableBucket struct has a private field named `bucket` and a public accessor
method `bucket_id()`. The documentation should use `bucket.bucket_id()` instead
of `bucket.bucket` to match the public API.
```suggestion
println!("Bucket {}: {} records", bucket.bucket_id(), bucket_records.len());
```
##########
bindings/python/src/table.rs:
##########
@@ -155,6 +153,247 @@ impl RecordBatch {
}
}
+/// A collection of scan records grouped by bucket.
+///
+/// Returned by `LogScanner.poll()`. Records are grouped by `TableBucket`.
+#[pyclass]
+pub struct ScanRecords {
+ records_by_bucket: HashMap<TableBucket, Vec<Py<ScanRecord>>>,
+ total_count: usize,
+}
+
+#[pymethods]
+impl ScanRecords {
+ /// List of distinct buckets that have records in this result.
+ pub fn buckets(&self) -> Vec<TableBucket> {
+ self.records_by_bucket.keys().cloned().collect()
+ }
+
+ /// Get records for a specific bucket.
+ ///
+ /// Returns an empty list if the bucket is not present (matches Rust/Java behavior).
+ pub fn records(&self, py: Python, bucket: &TableBucket) -> Vec<Py<ScanRecord>> {
+ self.records_by_bucket
+ .get(bucket)
+ .map(|recs| recs.iter().map(|r| r.clone_ref(py)).collect())
+ .unwrap_or_default()
+ }
+
+ /// Total number of records across all buckets.
+ pub fn count(&self) -> usize {
+ self.total_count
+ }
+
+ /// Whether the result set is empty.
+ pub fn is_empty(&self) -> bool {
+ self.total_count == 0
+ }
+
+ fn __len__(&self) -> usize {
+ self.total_count
+ }
+
+ /// Type-dispatched indexing:
+ /// records[0] → ScanRecord (flat index)
+ /// records[-1] → ScanRecord (negative index)
+ /// records[1:3] → list[ScanRecord] (slice)
+ /// records[bucket] → list[ScanRecord] (by bucket)
+ fn __getitem__(&self, py: Python, key: &Bound<'_, pyo3::PyAny>) -> PyResult<Py<pyo3::PyAny>> {
+ // Try integer index first
+ if let Ok(mut idx) = key.extract::<isize>() {
+ let len = self.total_count as isize;
+ if idx < 0 {
+ idx += len;
+ }
+ if idx < 0 || idx >= len {
+ return Err(pyo3::exceptions::PyIndexError::new_err(format!(
+ "index {idx} out of range for ScanRecords of size {len}"
+ )));
+ }
+ let idx = idx as usize;
+ let mut offset = 0;
+ for recs in self.records_by_bucket.values() {
+ if idx < offset + recs.len() {
+ return Ok(recs[idx - offset].clone_ref(py).into_any());
+ }
+ offset += recs.len();
+ }
+ return Err(pyo3::exceptions::PyRuntimeError::new_err(
+ "internal error: total_count out of sync with records",
+ ));
+ }
+ // Try slice
+ if let Ok(slice) = key.downcast::<pyo3::types::PySlice>() {
+ let indices = slice.indices(self.total_count as isize)?;
+ let mut result: Vec<Py<ScanRecord>> = Vec::new();
+ let mut i = indices.start;
+ while (indices.step > 0 && i < indices.stop) || (indices.step < 0 && i > indices.stop) {
+ let idx = i as usize;
+ let mut offset = 0;
+ for recs in self.records_by_bucket.values() {
+ if idx < offset + recs.len() {
+ result.push(recs[idx - offset].clone_ref(py));
+ break;
+ }
+ offset += recs.len();
+ }
+ i += indices.step;
+ }
Review Comment:
The slice implementation doesn't correctly handle negative steps. When
`indices.step < 0`, the code should iterate backwards (from high indices to low
indices), but the inner loop (lines 232-238) always iterates forward through
buckets. This means slices like `scan_records[::-1]` or `scan_records[10:0:-1]`
will not return records in the correct order. The implementation needs to
either convert each negative index to the corresponding record lookup, or
maintain a consistent bucket ordering that allows proper reverse iteration.
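The behavior the comment asks for can be sketched in plain Python (illustrative only, not the binding's actual API: `slice_records` and the dict layout here are assumptions). Flattening the records once in a fixed bucket order and then applying `slice.indices()` handles negative steps for free, because `range()` reproduces Python's slice semantics exactly:

```python
# Sketch: correct slice handling, including negative steps, by flattening
# into one stable list and delegating to slice.indices() / range().
# Names (records_by_bucket dict, string "records") are illustrative.

def slice_records(records_by_bucket: dict, key: slice) -> list:
    # Flatten once in a fixed bucket order so every slice sees the same order.
    flat = [rec
            for bucket in sorted(records_by_bucket)
            for rec in records_by_bucket[bucket]]
    start, stop, step = key.indices(len(flat))
    # range() yields indices high-to-low when step is negative, which is
    # exactly the reverse iteration the Rust loop above gets wrong.
    return [flat[i] for i in range(start, stop, step)]

records = {1: ["a", "b"], 0: ["x"]}
print(slice_records(records, slice(None, None, -1)))  # → ['b', 'a', 'x']
```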
##########
bindings/python/src/table.rs:
##########
@@ -155,6 +153,247 @@ impl RecordBatch {
}
}
+/// A collection of scan records grouped by bucket.
+///
+/// Returned by `LogScanner.poll()`. Records are grouped by `TableBucket`.
+#[pyclass]
+pub struct ScanRecords {
+ records_by_bucket: HashMap<TableBucket, Vec<Py<ScanRecord>>>,
+ total_count: usize,
+}
+
+#[pymethods]
+impl ScanRecords {
+ /// List of distinct buckets that have records in this result.
+ pub fn buckets(&self) -> Vec<TableBucket> {
+ self.records_by_bucket.keys().cloned().collect()
+ }
+
+ /// Get records for a specific bucket.
+ ///
+ /// Returns an empty list if the bucket is not present (matches Rust/Java behavior).
+ pub fn records(&self, py: Python, bucket: &TableBucket) -> Vec<Py<ScanRecord>> {
+ self.records_by_bucket
+ .get(bucket)
+ .map(|recs| recs.iter().map(|r| r.clone_ref(py)).collect())
+ .unwrap_or_default()
+ }
+
+ /// Total number of records across all buckets.
+ pub fn count(&self) -> usize {
+ self.total_count
+ }
+
+ /// Whether the result set is empty.
+ pub fn is_empty(&self) -> bool {
+ self.total_count == 0
+ }
+
+ fn __len__(&self) -> usize {
+ self.total_count
+ }
+
+ /// Type-dispatched indexing:
+ /// records[0] → ScanRecord (flat index)
+ /// records[-1] → ScanRecord (negative index)
+ /// records[1:3] → list[ScanRecord] (slice)
+ /// records[bucket] → list[ScanRecord] (by bucket)
+ fn __getitem__(&self, py: Python, key: &Bound<'_, pyo3::PyAny>) -> PyResult<Py<pyo3::PyAny>> {
+ // Try integer index first
+ if let Ok(mut idx) = key.extract::<isize>() {
+ let len = self.total_count as isize;
+ if idx < 0 {
+ idx += len;
+ }
+ if idx < 0 || idx >= len {
+ return Err(pyo3::exceptions::PyIndexError::new_err(format!(
+ "index {idx} out of range for ScanRecords of size {len}"
+ )));
+ }
+ let idx = idx as usize;
+ let mut offset = 0;
+ for recs in self.records_by_bucket.values() {
+ if idx < offset + recs.len() {
+ return Ok(recs[idx - offset].clone_ref(py).into_any());
+ }
+ offset += recs.len();
+ }
Review Comment:
The flat indexing (`__getitem__` with int/slice) and iteration (`__iter__`)
use different mechanisms to traverse buckets. Indexing directly iterates
`self.records_by_bucket.values()` on each call (lines 215, 233), while
iteration captures keys once in a Vec (line 305). This means the bucket
traversal order can differ between indexing and iteration operations on the
same ScanRecords instance. For example, `scan_records[0]` might return a
different record than `next(iter(scan_records))` due to HashMap iteration
non-determinism. Consider capturing bucket keys in a consistent order (e.g.,
sorted or insertion-ordered) to ensure that all access patterns on the same
ScanRecords instance use the same bucket ordering.
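The suggested fix can be sketched in Python (an illustrative class, not the binding's real implementation): capture the bucket order once at construction time, then route both indexing and iteration through that single captured order so they can never disagree.

```python
# Sketch: fixing one bucket traversal order at construction time so that
# __getitem__ and __iter__ always agree. ScanRecordsSketch is a hypothetical
# stand-in for the PyO3 class under review.

class ScanRecordsSketch:
    def __init__(self, records_by_bucket: dict):
        self._by_bucket = records_by_bucket
        # Capture the bucket order ONCE; every access pattern reuses it,
        # instead of re-walking an unordered map per call.
        self._bucket_order = sorted(records_by_bucket)

    def __len__(self):
        return sum(len(v) for v in self._by_bucket.values())

    def __iter__(self):
        for bucket in self._bucket_order:
            yield from self._by_bucket[bucket]

    def __getitem__(self, idx: int):
        offset = 0
        for bucket in self._bucket_order:  # same order as __iter__
            recs = self._by_bucket[bucket]
            if idx < offset + len(recs):
                return recs[idx - offset]
            offset += len(recs)
        raise IndexError(idx)

sr = ScanRecordsSketch({2: ["c"], 1: ["a", "b"]})
assert sr[0] == next(iter(sr))  # indexing and iteration now always agree
```

Sorting is one choice; an insertion-ordered list captured when the poll result is built would work equally well. The point is only that a single order is fixed once and shared by all access paths.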
##########
website/docs/user-guide/python/api-reference.md:
##########
@@ -137,17 +137,69 @@ Builder for creating a `Lookuper`. Obtain via `FlussTable.new_lookup()`.
| `.subscribe_partition_buckets(partition_bucket_offsets)` | Subscribe to multiple partition+bucket combos (`{(part_id, bucket_id): offset}`) |
| `.unsubscribe(bucket_id)` | Unsubscribe from a bucket (non-partitioned tables) |
| `.unsubscribe_partition(partition_id, bucket_id)` | Unsubscribe from a partition bucket |
-| `.poll(timeout_ms) -> list[ScanRecord]` | Poll individual records (record scanner only) |
+| `.poll(timeout_ms) -> ScanRecords` | Poll individual records (record scanner only) |
| `.poll_arrow(timeout_ms) -> pa.Table` | Poll as Arrow Table (batch scanner only) |
| `.poll_record_batch(timeout_ms) -> list[RecordBatch]` | Poll batches with metadata (batch scanner only) |
| `.to_arrow() -> pa.Table` | Read all subscribed data as Arrow Table (batch scanner only) |
| `.to_pandas() -> pd.DataFrame` | Read all subscribed data as DataFrame (batch scanner only) |
+## `ScanRecords`
+
+Returned by `LogScanner.poll()`. Records are grouped by bucket.
+
+> **Note:** Flat iteration and integer indexing traverse buckets in an arbitrary order that is consistent within a single `ScanRecords` instance but may differ between `poll()` calls. Use per-bucket access (`.items()`, `.records(bucket)`) when bucket ordering matters.
Review Comment:
The note states that flat iteration and integer indexing "traverse buckets
in an arbitrary order that is consistent within a single ScanRecords instance".
However, this is not accurate. Due to HashMap iteration non-determinism in the
implementation, the bucket traversal order can actually differ between
different operations on the same ScanRecords instance. For example,
`scan_records[0]` (which calls `__getitem__`) and `next(iter(scan_records))`
(which calls `__iter__`) may return different records because they iterate over
the HashMap differently. The note should clarify that the order is consistent
within a single iteration/access operation but may vary between different
operations on the same instance.
```suggestion
> **Note:** Flat iteration and integer indexing traverse buckets in an arbitrary, implementation-dependent order. The order is only consistent within a single iteration or indexing operation and may differ between different operations on the same `ScanRecords` instance (for example, `scan_records[0]` vs `next(iter(scan_records))`) and between `poll()` calls. Use per-bucket access (`.items()`, `.records(bucket)`) when bucket ordering matters.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]