This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new aeb277642 Implement `peek_next_page` and `skip_next_page` for
`InMemoryColumnChunkReader` (#2155)
aeb277642 is described below
commit aeb277642d1267673055aa0f705646db96dad340
Author: Dan Harris <[email protected]>
AuthorDate: Tue Jul 26 08:15:21 2022 -0400
Implement `peek_next_page` and `skip_next_page` for `InMemoryColumnChunkReader`
(#2155)
* Implement `peek_next_page` and `skip_next_page` for
`InMemoryColumnChunkReader`
* Conversion for parquet page header to page metadata
---
parquet/src/arrow/async_reader.rs | 155 ++++++++++++++++++++++++++++++++++++--
parquet/src/column/page.rs | 28 ++++++-
2 files changed, 177 insertions(+), 6 deletions(-)
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 19e1de9fc..640d1b81f 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -87,7 +87,7 @@ use std::task::{Context, Poll};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::Stream;
-use parquet_format::PageType;
+use parquet_format::{PageHeader, PageType};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use arrow::datatypes::SchemaRef;
@@ -520,6 +520,8 @@ struct InMemoryColumnChunkReader {
decompressor: Option<Box<dyn Codec>>,
offset: usize,
seen_num_values: i64,
+ // If the next page header has already been "peeked", we will cache it here
+ next_page_header: Option<PageHeader>,
}
impl InMemoryColumnChunkReader {
@@ -531,6 +533,7 @@ impl InMemoryColumnChunkReader {
decompressor,
offset: 0,
seen_num_values: 0,
+ next_page_header: None,
};
Ok(result)
}
@@ -548,10 +551,17 @@ impl PageReader for InMemoryColumnChunkReader {
fn get_next_page(&mut self) -> Result<Option<Page>> {
while self.seen_num_values < self.chunk.num_values {
let mut cursor =
Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
- let page_header = read_page_header(&mut cursor)?;
+ let page_header = if let Some(page_header) =
self.next_page_header.take() {
+ // The next page header has already been peeked, so use the
cached value
+ page_header
+ } else {
+ let page_header = read_page_header(&mut cursor)?;
+ self.offset += cursor.position() as usize;
+ page_header
+ };
+
let compressed_size = page_header.compressed_page_size as usize;
- self.offset += cursor.position() as usize;
let start_offset = self.offset;
let end_offset = self.offset + compressed_size;
self.offset = end_offset;
@@ -589,11 +599,47 @@ impl PageReader for InMemoryColumnChunkReader {
}
fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
- Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+        // Describe the next dictionary/data page without consuming it. The
+        // header is decoded (advancing `offset` past the header bytes only)
+        // and cached in `next_page_header`, which `get_next_page` and
+        // `skip_next_page` pick up instead of re-reading the header.
+        while self.seen_num_values < self.chunk.num_values {
+            let page_header = match self.next_page_header.take() {
+                // Already peeked: `offset` points at the start of this page's body.
+                Some(page_header) => page_header,
+                None => {
+                    let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
+                    let page_header = read_page_header(&mut cursor)?;
+                    self.offset += cursor.position() as usize;
+                    page_header
+                }
+            };
+
+            match page_header.type_ {
+                PageType::DataPage | PageType::DataPageV2 | PageType::DictionaryPage => {
+                    // A malformed header for a known page type is an error, not
+                    // a page to silently skip.
+                    let page_metadata = PageMetadata::try_from(&page_header)?;
+                    self.next_page_header = Some(page_header);
+                    return Ok(Some(page_metadata));
+                }
+                // For unknown page type (e.g., INDEX_PAGE), skip and read next.
+                // Only the header has been consumed so far, so the page body
+                // must be skipped as well; otherwise the next header would be
+                // decoded from the middle of this page's data.
+                _ => self.offset += page_header.compressed_page_size as usize,
+            }
+        }
+
+        Ok(None)
}
fn skip_next_page(&mut self) -> Result<()> {
- Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+        // Skip the next dictionary or data page without decompressing it.
+        // Like `get_next_page`, unknown page types (e.g. INDEX_PAGE) are passed
+        // over, and a skipped data page still counts towards `seen_num_values`
+        // so the reader can tell when the column chunk is exhausted.
+        // Skipping at end-of-chunk is a no-op.
+        while self.seen_num_values < self.chunk.num_values {
+            let page_header = match self.next_page_header.take() {
+                // Already peeked: `offset` points at the start of this page's body.
+                Some(page_header) => page_header,
+                None => {
+                    let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
+                    let page_header = read_page_header(&mut cursor)?;
+                    self.offset += cursor.position() as usize;
+                    page_header
+                }
+            };
+
+            // The header bytes are already consumed; jump over the page body.
+            self.offset += page_header.compressed_page_size as usize;
+
+            let num_values = match page_header.type_ {
+                PageType::DataPage => page_header.data_page_header.as_ref().map(|h| h.num_values),
+                PageType::DataPageV2 => {
+                    page_header.data_page_header_v2.as_ref().map(|h| h.num_values)
+                }
+                // Dictionary pages do not contribute to `seen_num_values`.
+                PageType::DictionaryPage => return Ok(()),
+                // For unknown page type (e.g., INDEX_PAGE), skip and read next.
+                _ => continue,
+            };
+            let num_values = num_values.ok_or_else(|| {
+                ParquetError::General(format!(
+                    "{:?} page header is missing its data page header",
+                    page_header.type_
+                ))
+            })?;
+            self.seen_num_values += num_values as i64;
+            return Ok(());
+        }
+
+        Ok(())
}
}
@@ -699,4 +745,103 @@ mod tests {
]
);
}
+
+    /// Builds an `InMemoryColumnChunkReader` over the first column of the first
+    /// row group of `alltypes_plain.parquet`. Per the assertions in the tests
+    /// below, that chunk holds a dictionary page followed by a single data page
+    /// of 8 values.
+    fn alltypes_plain_first_column_reader() -> InMemoryColumnChunkReader {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_plain.parquet", testdata);
+        let data = Bytes::from(std::fs::read(path).unwrap());
+
+        let metadata = crate::file::footer::parse_metadata(&data).unwrap();
+        let column_metadata = metadata.row_group(0).column(0);
+        let (start, length) = column_metadata.byte_range();
+        let column_data = data.slice(start as usize..(start + length) as usize);
+
+        InMemoryColumnChunkReader::new(InMemoryColumnChunk {
+            num_values: column_metadata.num_values(),
+            compression: column_metadata.compression(),
+            physical_type: column_metadata.column_type(),
+            data: column_data,
+        })
+        .expect("building reader")
+    }
+
+    #[tokio::test]
+    async fn test_in_memory_column_chunk_reader() {
+        let mut reader = alltypes_plain_first_column_reader();
+
+        let first_page = reader
+            .peek_next_page()
+            .expect("peeking first page")
+            .expect("first page is empty");
+
+        assert!(first_page.is_dict);
+        assert_eq!(first_page.num_rows, 0);
+
+        let first_page = reader
+            .get_next_page()
+            .expect("getting first page")
+            .expect("first page is empty");
+
+        assert_eq!(
+            first_page.page_type(),
+            crate::basic::PageType::DICTIONARY_PAGE
+        );
+        assert_eq!(first_page.num_values(), 8);
+
+        let second_page = reader
+            .peek_next_page()
+            .expect("peeking second page")
+            .expect("second page is empty");
+
+        assert!(!second_page.is_dict);
+        assert_eq!(second_page.num_rows, 8);
+
+        let second_page = reader
+            .get_next_page()
+            .expect("getting second page")
+            .expect("second page is empty");
+
+        assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
+        assert_eq!(second_page.num_values(), 8);
+
+        let third_page = reader.peek_next_page().expect("getting third page");
+
+        assert!(third_page.is_none());
+
+        let third_page = reader.get_next_page().expect("getting third page");
+
+        assert!(third_page.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_in_memory_column_chunk_reader_skip_page() {
+        let mut reader = alltypes_plain_first_column_reader();
+
+        reader.skip_next_page().expect("skipping first page");
+
+        let second_page = reader
+            .get_next_page()
+            .expect("getting second page")
+            .expect("second page is empty");
+
+        assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
+        assert_eq!(second_page.num_values(), 8);
+    }
+
+    #[tokio::test]
+    async fn test_in_memory_column_chunk_reader_peek_then_skip_page() {
+        let mut reader = alltypes_plain_first_column_reader();
+
+        let first_page = reader
+            .peek_next_page()
+            .expect("peeking first page")
+            .expect("first page is empty");
+        assert!(first_page.is_dict);
+
+        // Skipping must consume the header cached by the peek rather than
+        // decoding a new header at the current offset.
+        reader.skip_next_page().expect("skipping first page");
+
+        let second_page = reader
+            .peek_next_page()
+            .expect("peeking second page")
+            .expect("second page is empty");
+        assert!(!second_page.is_dict);
+        assert_eq!(second_page.num_rows, 8);
+
+        let second_page = reader
+            .get_next_page()
+            .expect("getting second page")
+            .expect("second page is empty");
+        assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
+        assert_eq!(second_page.num_values(), 8);
+    }
}
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index 78890f36a..c61e9c0b3 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -18,10 +18,11 @@
//! Contains Parquet Page definitions and page reader interface.
use crate::basic::{Encoding, PageType};
-use crate::errors::Result;
+use crate::errors::{ParquetError, Result};
use crate::file::{metadata::ColumnChunkMetaData, statistics::Statistics};
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
use crate::util::memory::ByteBufferPtr;
+use parquet_format::PageHeader;
/// Parquet Page definition.
///
@@ -196,6 +197,31 @@ pub struct PageMetadata {
pub is_dict: bool,
}
+impl TryFrom<&PageHeader> for PageMetadata {
+    type Error = ParquetError;
+
+    /// Summarises a Thrift [`PageHeader`] as [`PageMetadata`].
+    ///
+    /// # Errors
+    ///
+    /// Returns [`ParquetError::General`] when the page type has no
+    /// `PageMetadata` representation (e.g. `INDEX_PAGE`), when a data page
+    /// header lacks its type-specific sub-header, or when the header carries
+    /// a negative value/row count.
+    fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
+        match value.type_ {
+            parquet_format::PageType::DataPage => {
+                let header = value.data_page_header.as_ref().ok_or_else(|| {
+                    ParquetError::General(
+                        "DATA_PAGE page header is missing data_page_header".to_string(),
+                    )
+                })?;
+                // NOTE(review): a v1 data page header only carries `num_values`,
+                // which is reported here as the row count. For repeated columns
+                // this may exceed the actual number of rows — confirm callers
+                // account for that.
+                Ok(PageMetadata {
+                    num_rows: page_count_to_usize(header.num_values, "num_values")?,
+                    is_dict: false,
+                })
+            }
+            // Dictionary pages hold no rows of their own.
+            parquet_format::PageType::DictionaryPage => Ok(PageMetadata {
+                num_rows: 0,
+                is_dict: true,
+            }),
+            parquet_format::PageType::DataPageV2 => {
+                let header = value.data_page_header_v2.as_ref().ok_or_else(|| {
+                    ParquetError::General(
+                        "DATA_PAGE_V2 page header is missing data_page_header_v2".to_string(),
+                    )
+                })?;
+                Ok(PageMetadata {
+                    num_rows: page_count_to_usize(header.num_rows, "num_rows")?,
+                    is_dict: false,
+                })
+            }
+            other => Err(ParquetError::General(format!(
+                "page type {:?} cannot be converted to PageMetadata",
+                other
+            ))),
+        }
+    }
+}
+
+/// Converts a count read from a page header into a `usize`, rejecting negative
+/// values rather than letting an `as` cast wrap them into huge counts.
+fn page_count_to_usize(count: i32, field: &str) -> Result<usize> {
+    usize::try_from(count).map_err(|_| {
+        ParquetError::General(format!("page header has negative {}: {}", field, count))
+    })
+}
+
/// API for reading pages from a column chunk.
/// This offers a iterator like API to get the next page.
pub trait PageReader: Iterator<Item = Result<Page>> + Send {