fresh-borzoni commented on code in PR #351:
URL: https://github.com/apache/fluss-rust/pull/351#discussion_r2821765010


##########
bindings/python/src/table.rs:
##########
@@ -155,6 +153,247 @@ impl RecordBatch {
     }
 }
 
+/// A collection of scan records grouped by bucket.
+///
+/// Returned by `LogScanner.poll()`. Records are grouped by `TableBucket`.
+#[pyclass]
+pub struct ScanRecords {
+    records_by_bucket: HashMap<TableBucket, Vec<Py<ScanRecord>>>,
+    total_count: usize,
+}
+
+#[pymethods]
+impl ScanRecords {
+    /// List of distinct buckets that have records in this result.
+    pub fn buckets(&self) -> Vec<TableBucket> {
+        self.records_by_bucket.keys().cloned().collect()
+    }
+
+    /// Get records for a specific bucket.
+    ///
+    /// Returns an empty list if the bucket is not present (matches Rust/Java 
behavior).
+    pub fn records(&self, py: Python, bucket: &TableBucket) -> 
Vec<Py<ScanRecord>> {
+        self.records_by_bucket
+            .get(bucket)
+            .map(|recs| recs.iter().map(|r| r.clone_ref(py)).collect())
+            .unwrap_or_default()
+    }
+
+    /// Total number of records across all buckets.
+    pub fn count(&self) -> usize {
+        self.total_count
+    }
+
+    /// Whether the result set is empty.
+    pub fn is_empty(&self) -> bool {
+        self.total_count == 0
+    }
+
+    fn __len__(&self) -> usize {
+        self.total_count
+    }
+
+    /// Type-dispatched indexing:
+    ///   records[0]       → ScanRecord (flat index)
+    ///   records[-1]      → ScanRecord (negative index)
+    ///   records[1:3]     → list[ScanRecord] (slice)
+    ///   records[bucket]  → list[ScanRecord] (by bucket)
+    fn __getitem__(&self, py: Python, key: &Bound<'_, pyo3::PyAny>) -> 
PyResult<Py<pyo3::PyAny>> {
+        // Try integer index first
+        if let Ok(mut idx) = key.extract::<isize>() {
+            let len = self.total_count as isize;
+            if idx < 0 {
+                idx += len;
+            }
+            if idx < 0 || idx >= len {
+                return Err(pyo3::exceptions::PyIndexError::new_err(format!(
+                    "index {idx} out of range for ScanRecords of size {len}"
+                )));
+            }
+            let idx = idx as usize;
+            let mut offset = 0;
+            for recs in self.records_by_bucket.values() {
+                if idx < offset + recs.len() {
+                    return Ok(recs[idx - offset].clone_ref(py).into_any());
+                }
+                offset += recs.len();
+            }
+            return Err(pyo3::exceptions::PyRuntimeError::new_err(
+                "internal error: total_count out of sync with records",
+            ));
+        }
+        // Try slice
+        if let Ok(slice) = key.downcast::<pyo3::types::PySlice>() {
+            let indices = slice.indices(self.total_count as isize)?;
+            let mut result: Vec<Py<ScanRecord>> = Vec::new();
+            let mut i = indices.start;
+            while (indices.step > 0 && i < indices.stop) || (indices.step < 0 
&& i > indices.stop) {
+                let idx = i as usize;
+                let mut offset = 0;
+                for recs in self.records_by_bucket.values() {
+                    if idx < offset + recs.len() {
+                        result.push(recs[idx - offset].clone_ref(py));
+                        break;
+                    }
+                    offset += recs.len();
+                }
+                i += indices.step;
+            }

Review Comment:
   No, added tests to demonstate it's correct



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to