ozankabak commented on code in PR #13412:
URL: https://github.com/apache/datafusion/pull/13412#discussion_r1844912389


##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -168,6 +172,164 @@ pub enum FilePushdownSupport {
     Supported,
 }
 
+/// Possible outputs of a [`BatchDeserializer`], reported by its `next`
+/// method to describe deserialization progress.
+#[derive(Debug, PartialEq)]
+pub enum DeserializerOutput {
+    /// A successfully deserialized [`RecordBatch`].
+    RecordBatch(RecordBatch),
+    /// The deserializer requires more data to make progress.
+    RequiresMoreData,
+    /// The input data has been exhausted.
+    InputExhausted,
+}
+
+/// Trait defining a scheme for deserializing byte streams into structured data.
+/// Implementors of this trait are responsible for converting raw bytes into
+/// `RecordBatch` objects.
+pub trait BatchDeserializer<T>: Send + Debug {
+    /// Feeds a message for deserialization, updating the internal state of
+    /// this `BatchDeserializer`. Note that one can call this function multiple
+    /// times before calling `next`, which will queue multiple messages for
+    /// deserialization. Returns the number of bytes consumed.
+    fn digest(&mut self, message: T) -> usize;
+
+    /// Attempts to deserialize any pending messages and returns a
+    /// [`DeserializerOutput`] to indicate progress.
+    fn next(&mut self) -> Result<DeserializerOutput, ArrowError>;
+
+    /// Informs the deserializer that no more messages will be provided for
+    /// deserialization. Subsequent calls to `next` may still drain any
+    /// already-buffered data before reporting exhaustion.
+    fn finish(&mut self);
+}
+
+/// A general interface for decoders such as [`arrow::json::reader::Decoder`] and
+/// [`arrow::csv::reader::Decoder`]. Defines an interface similar to
+/// [`Decoder::decode`] and [`Decoder::flush`] methods, but also includes
+/// a method to check if the decoder can flush early. Intended to be used in
+/// conjunction with [`DecoderDeserializer`].
+///
+/// [`arrow::json::reader::Decoder`]: ::arrow::json::reader::Decoder
+/// [`arrow::csv::reader::Decoder`]: ::arrow::csv::reader::Decoder
+/// [`Decoder::decode`]: ::arrow::json::reader::Decoder::decode
+/// [`Decoder::flush`]: ::arrow::json::reader::Decoder::flush
+pub(crate) trait Decoder: Send + Debug {
+    /// Consumes bytes from `buf`, returning the number of bytes read.
+    /// See [`arrow::json::reader::Decoder::decode`].
+    ///
+    /// [`arrow::json::reader::Decoder::decode`]: ::arrow::json::reader::Decoder::decode
+    fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>;
+
+    /// Produces a `RecordBatch` from the rows decoded so far, if any.
+    /// See [`arrow::json::reader::Decoder::flush`].
+    ///
+    /// [`arrow::json::reader::Decoder::flush`]: ::arrow::json::reader::Decoder::flush
+    fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>;
+
+    /// Whether the decoder can flush early in its current state.
+    fn can_flush_early(&self) -> bool;
+}
+
+/// Manual `Debug` impl: only `buffered_queue` and `finalized` are shown.
+// NOTE(review): the inner `decoder` is omitted even though `Decoder: Debug`
+// would allow printing it, and the struct label "Deserializer" differs from
+// the type name `DecoderDeserializer` — confirm both are intentional.
+impl<T: Decoder> Debug for DecoderDeserializer<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Deserializer")
+            .field("buffered_queue", &self.buffered_queue)
+            .field("finalized", &self.finalized)
+            .finish()
+    }
+}
+
+impl<T: Decoder> BatchDeserializer<Bytes> for DecoderDeserializer<T> {
+    /// Queues `message` for later decoding. Empty messages are ignored.
+    /// Returns the number of bytes accepted (`message.len()`, or `0`).
+    fn digest(&mut self, message: Bytes) -> usize {
+        if message.is_empty() {
+            return 0;
+        }
+
+        let consumed = message.len();
+        self.buffered_queue.push_back(message);
+        consumed
+    }
+
+    /// Drains the buffered queue through the inner decoder, flushing a batch
+    /// when the decoder stops consuming bytes or signals it can flush early.
+    fn next(&mut self) -> Result<DeserializerOutput, ArrowError> {
+        while let Some(buffered) = self.buffered_queue.front_mut() {
+            let decoded = self.decoder.decode(buffered)?;
+            buffered.advance(decoded);
+
+            // Fully-consumed chunks are dropped so the next iteration moves on.
+            if buffered.is_empty() {
+                self.buffered_queue.pop_front();
+            }
+
+            // Flush when the stream ends or batch size is reached
+            // Certain implementations can flush early
+            if decoded == 0 || self.decoder.can_flush_early() {
+                return match self.decoder.flush() {
+                    Ok(Some(batch)) => Ok(DeserializerOutput::RecordBatch(batch)),
+                    // Nothing pending in the decoder; keep decoding the queue.
+                    Ok(None) => continue,
+                    Err(e) => Err(e),
+                };
+            }
+        }
+        // Queue is empty: report exhaustion only after `finish` was called.
+        if self.finalized {
+            Ok(DeserializerOutput::InputExhausted)
+        } else {
+            Ok(DeserializerOutput::RequiresMoreData)
+        }
+    }
+
+    fn finish(&mut self) {
+        self.finalized = true;
+        // Ensure the decoder is flushed: the empty chunk makes `next` take the
+        // `decoded == 0` path and flush any rows still held by the decoder.
+        self.buffered_queue.push_back(Bytes::new());
+    }
+}
+
+/// A generic, decoder-based deserialization scheme for processing encoded 
data.
+///
+/// This struct is responsible for converting a stream of bytes, which 
represent
+/// encoded data, into a stream of `RecordBatch` objects, following the 
specified
+/// schema and formatting options.

Review Comment:
   Done - thank you for pointing it out



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to