paleolimbot commented on code in PR #471:
URL: https://github.com/apache/sedona-db/pull/471#discussion_r2694991774
########## rust/sedona-pointcloud/src/laz/metadata.rs: ########## @@ -0,0 +1,451 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + any::Any, + collections::HashMap, + error::Error, + io::{Cursor, Read}, + ops::Range, + sync::Arc, +}; + +use arrow_schema::{DataType, Schema, SchemaRef}; +use datafusion_common::{ + error::DataFusionError, scalar::ScalarValue, stats::Precision, ColumnStatistics, Statistics, +}; +use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache}; +use las::{ + raw::{Header as RawHeader, Vlr as RawVlr}, + Builder, Header, Vlr, +}; +use laz::laszip::ChunkTable; +use object_store::{ObjectMeta, ObjectStore}; + +use crate::laz::{options::LazTableOptions, schema::try_schema_from_header}; + +/// Laz chunk metadata +#[derive(Debug, Clone)] +pub struct ChunkMeta { + pub num_points: u64, + pub point_offset: u64, + pub byte_range: Range<u64>, +} + +/// Laz metadata +#[derive(Debug, Clone)] +pub struct LazMetadata { + pub header: Arc<Header>, + pub chunk_table: Vec<ChunkMeta>, + pub extra_attributes: Arc<Vec<ExtraAttribute>>, +} + +impl FileMetadata for LazMetadata { + fn as_any(&self) -> &dyn Any { + self + } + + fn memory_size(&self) -> usize { + self.chunk_table.capacity() * std::mem::size_of::<ChunkMeta>() + + self.extra_attributes.capacity() * std::mem::size_of::<ExtraAttribute>() + } + + fn extra_info(&self) -> HashMap<String, String> { + HashMap::new() + } +} + +/// Reader for laz file metadata in object storage. 
+pub struct LazMetadataReader<'a> { + store: &'a dyn ObjectStore, + object_meta: &'a ObjectMeta, + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, + options: LazTableOptions, +} + +impl<'a> LazMetadataReader<'a> { + pub fn new(store: &'a dyn ObjectStore, object_meta: &'a ObjectMeta) -> Self { + Self { + store, + object_meta, + file_metadata_cache: None, + options: Default::default(), + } + } + + /// set file metadata cache + pub fn with_file_metadata_cache( + mut self, + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, + ) -> Self { + self.file_metadata_cache = file_metadata_cache; + self + } + + /// set table options + pub fn with_options(mut self, options: LazTableOptions) -> Self { + self.options = options; + self + } + + /// Fetch header + pub async fn fetch_header(&self) -> Result<Header, DataFusionError> { + fetch_header(self.store, self.object_meta) + .await + .map_err(DataFusionError::External) + } + + /// Fetch laz metadata from the remote object store + pub async fn fetch_metadata(&self) -> Result<Arc<LazMetadata>, DataFusionError> { + let Self { + store, + object_meta, + file_metadata_cache, + options: _, + } = self; + + if let Some(las_file_metadata) = file_metadata_cache + .as_ref() + .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta)) + .and_then(|file_metadata| { + file_metadata + .as_any() + .downcast_ref::<LazMetadata>() + .map(|laz_file_metadata| Arc::new(laz_file_metadata.to_owned())) + }) + { + return Ok(las_file_metadata); + } + + let header = self.fetch_header().await?; + let extra_attributes = extra_bytes_attributes(&header)?; + let chunk_table = chunk_table(*store, object_meta, &header).await?; + + let metadata = Arc::new(LazMetadata { + header: Arc::new(header), + chunk_table, + extra_attributes: Arc::new(extra_attributes), + }); + + if let Some(file_metadata_cache) = file_metadata_cache { + file_metadata_cache.put(object_meta, metadata.clone()); + } + + Ok(metadata) + } + + /// Read and parse the schema of the laz file + pub async fn fetch_schema(&mut self) -> Result<Schema, DataFusionError> { + let metadata = self.fetch_metadata().await?; + + let schema = try_schema_from_header( + &metadata.header, + self.options.point_encoding, + self.options.extra_bytes, + )?; + + Ok(schema) + } + + /// Fetch the metadata from the laz file via [`Self::fetch_metadata`] and extracts + /// the statistics in the metadata + pub async fn fetch_statistics( + &self, + table_schema: &SchemaRef, + ) -> Result<Statistics, DataFusionError> { + let metadata = self.fetch_metadata().await?; + + let mut statistics = Statistics::new_unknown(table_schema) + .with_num_rows(Precision::Exact(metadata.header.number_of_points() as usize)) + .with_total_byte_size(Precision::Exact( + metadata + .chunk_table + .iter() + .map(|meta| meta.byte_range.end - meta.byte_range.start) + .sum::<u64>() as usize, + )); + + let bounds = metadata.header.bounds(); + for field in table_schema.fields() { + let cs = match field.name().as_str() { + "x" => ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.x)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.x)))) + .with_null_count(Precision::Exact(0)), + "y" => ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.y)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.y)))) + .with_null_count(Precision::Exact(0)), + "z" => ColumnStatistics::new_unknown() + 
.with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.z)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.z)))) + .with_null_count(Precision::Exact(0)), + _ => ColumnStatistics::new_unknown(), + }; + + statistics = statistics.add_column_statistics(cs); + } + + Ok(statistics) + } +} + +pub(crate) async fn fetch_header( + store: &(impl ObjectStore + ?Sized), + object_meta: &ObjectMeta, +) -> Result<Header, Box<dyn Error + Send + Sync>> { + let location = &object_meta.location; + + // Header + let bytes = store.get_range(location, 0..375).await?; + let reader = Cursor::new(bytes); + let raw_header = RawHeader::read_from(reader)?; + + let header_size = raw_header.header_size as u64; + let offset_to_point_data = raw_header.offset_to_point_data as u64; + let num_vlr = raw_header.number_of_variable_length_records; + let evlr = raw_header.evlr; + + let mut builder = Builder::new(raw_header)?; + + // VLRs + let bytes = store + .get_range(location, header_size..offset_to_point_data) + .await?; + let mut reader = Cursor::new(bytes); + + for _ in 0..num_vlr { + let vlr = RawVlr::read_from(&mut reader, false).map(Vlr::new)?; + builder.vlrs.push(vlr); + } + + reader.read_to_end(&mut builder.vlr_padding)?; + + // EVLRs + if let Some(evlr) = evlr { + let mut start = evlr.start_of_first_evlr; + + for _ in 0..evlr.number_of_evlrs { + let mut end = start + 60; + + let bytes = store.get_range(location, start..end).await?; + + end += u64::from_le_bytes(bytes[20..28].try_into()?); + + let bytes = store.get_range(location, start..end).await?; + let mut reader = Cursor::new(bytes); + let evlr = RawVlr::read_from(&mut reader, true).map(Vlr::new)?; + + builder.evlrs.push(evlr); + + start = end; + } + } + + Ok(builder.into_header()?) +} + +#[derive(Debug, Clone, PartialEq)] +pub struct ExtraAttribute { + pub data_type: DataType, + pub no_data: Option<[u8; 8]>, + pub scale: Option<f64>, + pub offset: Option<f64>, +} + +pub(crate) fn extra_bytes_attributes( + header: &Header, +) -> Result<Vec<ExtraAttribute>, Box<dyn Error + Send + Sync>> { + let mut attributes = Vec::new(); + + for vlr in header.all_vlrs() { + if !(vlr.user_id == "LASF_Spec" && vlr.record_id == 4) { + continue; + } + + for bytes in vlr.data.chunks(192) { + // data type + let data_type = match bytes[2] { + 0 => DataType::FixedSizeBinary(bytes[3] as i32), + 1 => DataType::UInt8, + 2 => DataType::Int8, + 3 => DataType::UInt16, + 4 => DataType::Int16, + 5 => DataType::UInt32, + 6 => DataType::Int32, + 7 => DataType::UInt64, + 8 => DataType::Int64, + 9 => DataType::Float32, + 10 => DataType::Float64, + 11..=30 => return Err("deprecated extra bytes data type".into()), + 31..=255 => return Err("reserved extra butes data type".into()), + }; + + // no data + let no_data = if bytes[2] != 0 && bytes[3] & 1 == 1 { + Some(bytes[40..48].try_into().unwrap()) + } else { + None + }; + + // scale + let scale = if bytes[2] != 0 && bytes[3] >> 3 & 1 == 1 { + Some(f64::from_le_bytes(bytes[112..120].try_into().unwrap())) + } else { + None + }; + + // offset + let offset = if bytes[2] != 0 && bytes[3] >> 4 & 1 == 1 { + Some(f64::from_le_bytes(bytes[136..144].try_into().unwrap())) + } else { + None + }; + + let attribute = ExtraAttribute { + data_type, + no_data, + scale, + offset, + }; + + attributes.push(attribute); + } + } + + Ok(attributes) +} + +pub(crate) async fn chunk_table( + store: &(impl ObjectStore + ?Sized), + object_meta: &ObjectMeta, + header: &Header, +) -> Result<Vec<ChunkMeta>, Box<dyn Error + Send + Sync>> { + let 
num_points = header.number_of_points(); + let mut point_offset = 0; + + let vlr_len = header.vlrs().iter().map(|v| v.len(false)).sum::<usize>(); + let header_size = header.version().header_size() as usize + header.padding().len(); + let mut byte_offset = (header_size + vlr_len + header.vlr_padding().len()) as u64; + + let laz_vlr = header.laz_vlr()?; + + let ranges = [ + byte_offset..byte_offset + 8, + object_meta.size - 8..object_meta.size, + ]; + let bytes = store.get_ranges(&object_meta.location, &ranges).await?; + let mut table_offset = None; + + let table_offset1 = i64::from_le_bytes(bytes[0].to_vec().try_into().unwrap()) as u64; + let table_offset2 = i64::from_le_bytes(bytes[1].to_vec().try_into().unwrap()) as u64; + + if table_offset1 > byte_offset { + table_offset = Some(table_offset1); + } else if table_offset2 > byte_offset { + table_offset = Some(table_offset2); + } + + let Some(table_offset) = table_offset else { + return Err("LAZ files without chunk table not supported (yet)".into()); + }; + + if table_offset > object_meta.size { + return Err("LAZ file chunk table position is missing/bad".into()); + } + + let bytes = store + .get_range(&object_meta.location, table_offset..table_offset + 8) + .await?; + + let num_chunks = u32::from_le_bytes(bytes[4..].to_vec().try_into().unwrap()) as u64; + let range = table_offset..table_offset + 8 + 8 * num_chunks; + let bytes = store.get_range(&object_meta.location, range).await?; + + let mut reader = Cursor::new(bytes); + let variable_size = laz_vlr.uses_variable_size_chunks(); + let chunk_table = ChunkTable::read(&mut reader, variable_size)?; + assert_eq!(chunk_table.len(), num_chunks as usize); + + let mut chunks = Vec::with_capacity(num_chunks as usize); + let chunk_size = laz_vlr.chunk_size() as u64; + byte_offset += 8; + + for chunk_table_entry in &chunk_table { + let point_count = if variable_size { + chunk_table_entry.point_count + } else { + chunk_size.min(num_points - point_offset) + }; + + let chunk = ChunkMeta { + num_points: point_count, + point_offset, + byte_range: byte_offset..byte_offset + chunk_table_entry.byte_count, + }; + chunks.push(chunk); + point_offset += point_count; + byte_offset += chunk_table_entry.byte_count; + } + + Ok(chunks) +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use las::{point::Format, Builder, Reader, Writer}; + use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload}; + + use crate::laz::metadata::LazMetadataReader; + + #[allow(static_mut_refs)] + #[tokio::test] + async fn header_basic_e2e() { + // create laz file + static mut LAZ: Vec<u8> = Vec::new(); Review Comment: Maybe also just a tempfile unless this really needs to be static/mutable? ########## rust/sedona-pointcloud/src/laz/reader.rs: ########## @@ -0,0 +1,232 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + io::{Cursor, Read, Seek}, + sync::Arc, +}; + +use arrow_array::RecordBatch; +use datafusion_common::error::DataFusionError; +use datafusion_datasource::PartitionedFile; +use datafusion_execution::cache::cache_manager::FileMetadataCache; +use futures::{future::BoxFuture, FutureExt}; +use las::{raw::Point as RawPoint, Point}; +use laz::{ + record::{ + LayeredPointRecordDecompressor, RecordDecompressor, SequentialPointRecordDecompressor, + }, + DecompressionSelection, LasZipError, LazItem, +}; +use object_store::ObjectStore; + +use crate::laz::{ + builder::RowBuilder, + metadata::{ChunkMeta, LazMetadata, LazMetadataReader}, + options::LazTableOptions, +}; + +/// Laz file reader factory +#[derive(Debug)] +pub struct LazFileReaderFactory { + store: Arc<dyn ObjectStore>, + metadata_cache: Option<Arc<dyn FileMetadataCache>>, +} + +impl LazFileReaderFactory { + /// Create a new `LazFileReaderFactory`. + pub fn new( + store: Arc<dyn ObjectStore>, + metadata_cache: Option<Arc<dyn FileMetadataCache>>, + ) -> Self { + Self { + store, + metadata_cache, + } + } + + pub fn create_reader( + &self, + partitioned_file: PartitionedFile, + options: LazTableOptions, + ) -> Result<Box<LazFileReader>, DataFusionError> { + Ok(Box::new(LazFileReader { + partitioned_file, + store: self.store.clone(), + metadata_cache: self.metadata_cache.clone(), + options, + })) + } +} + +/// Reader for a laz file in object storage. +pub struct LazFileReader { + partitioned_file: PartitionedFile, + store: Arc<dyn ObjectStore>, + metadata_cache: Option<Arc<dyn FileMetadataCache>>, + pub options: LazTableOptions, +} + +impl LazFileReader { + pub fn get_metadata<'a>(&'a self) -> BoxFuture<'a, Result<Arc<LazMetadata>, DataFusionError>> { + let object_meta = self.partitioned_file.object_meta.clone(); + let metadata_cache = self.metadata_cache.clone(); + + async move { + LazMetadataReader::new(&self.store, &object_meta) + .with_file_metadata_cache(metadata_cache) + .with_options(self.options.clone()) + .fetch_metadata() + .await + } + .boxed() + } + + pub async fn get_batch(&self, chunk_meta: &ChunkMeta) -> Result<RecordBatch, DataFusionError> { + let metadata = self.get_metadata().await?; + let header = metadata.header.clone(); + + // fetch bytes + let location = &self.partitioned_file.object_meta.location; + let bytes = self + .store + .get_range(location, chunk_meta.byte_range.clone()) + .await?; + + // laz decompressor + let laz_vlr = header + .laz_vlr() + .map_err(|e| DataFusionError::External(Box::new(e)))?; + let reader = Cursor::new(bytes); + let mut decompressor = record_decompressor(laz_vlr.items(), reader) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + // record batch builder + let num_points = chunk_meta.num_points as usize; + let mut builder = RowBuilder::new(num_points, header.clone()) + .with_point_encoding(self.options.point_encoding) + .with_extra_attributes(metadata.extra_attributes.clone(), self.options.extra_bytes); + + // transform + let format = header.point_format(); + let transforms = header.transforms(); + + let out = vec![0; format.len() as usize]; + let mut buffer = Cursor::new(out); + + for _ in 0..chunk_meta.num_points { + buffer.set_position(0); + decompressor.decompress_next(buffer.get_mut())?; + + let point = RawPoint::read_from(&mut buffer, format) + .map(|raw_point| Point::new(raw_point, transforms)) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + 
builder.append(point); + } + + let struct_array = builder.finish()?; + + Ok(RecordBatch::from(struct_array)) + } +} + +pub(super) fn record_decompressor<'a, R: Read + Seek + Send + Sync + 'a>( + items: &Vec<LazItem>, + input: R, +) -> Result<Box<dyn RecordDecompressor<R> + Send + Sync + 'a>, LasZipError> { + let first_item = items + .first() + .expect("There should be at least one LazItem to be able to create a RecordDecompressor"); + + let mut decompressor = match first_item.version() { + 1 | 2 => { + let decompressor = SequentialPointRecordDecompressor::new(input); + Box::new(decompressor) as Box<dyn RecordDecompressor<R> + Send + Sync> + } + 3 | 4 => { + let decompressor = LayeredPointRecordDecompressor::new(input); + Box::new(decompressor) as Box<dyn RecordDecompressor<R> + Send + Sync> + } + _ => { + return Err(LasZipError::UnsupportedLazItemVersion( + first_item.item_type(), + first_item.version(), + )); + } + }; + + decompressor.set_fields_from(items)?; + decompressor.set_selection(DecompressionSelection::all()); + + Ok(decompressor) +} + +#[cfg(test)] +mod tests { + use std::{io::Cursor, sync::Arc}; + + use datafusion_datasource::PartitionedFile; + use las::{point::Format, Builder, Point, Writer}; + use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload}; + + use crate::laz::reader::LazFileReaderFactory; + + #[allow(static_mut_refs)] + #[tokio::test] + async fn reader_basic_e2e() { + // create laz file + static mut LAZ: Vec<u8> = Vec::new(); Review Comment: Thanks for adding this! Does this really need to be static mut? For some other tests we use a temporary directory, which would also let you check a multi file read. https://github.com/apache/sedona-db/blob/90658675a6b849356599e2ae86477c0f8e60545c/rust/sedona/src/context.rs#L622 ########## rust/sedona-pointcloud/src/laz/metadata.rs: ########## @@ -0,0 +1,451 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
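A sketch of the reviewer's `static mut` suggestion (it applies to both `header_basic_e2e` and `reader_basic_e2e`): the `static mut LAZ` buffer only exists so `PutPayload::from_static` can be handed a `'static` slice. Assuming `PutPayload` can also be built from an owned `Vec<u8>`, the test can keep the buffer as a local and scope the writer's borrow. This is illustrative only, not the PR's code.

```rust
use std::io::Cursor;

use las::{point::Format, Builder, Writer};
use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload};

#[tokio::test]
async fn header_basic_e2e_without_static_mut() {
    // Build the LAZ file into a plain, locally owned buffer.
    let mut laz: Vec<u8> = Vec::new();
    {
        let mut builder = Builder::from((1, 4));
        builder.point_format = Format::new(1).unwrap();
        builder.point_format.is_compressed = true;
        let header = builder.into_header().unwrap();
        // The writer borrows `laz` mutably only inside this scope.
        let mut writer = Writer::new(Cursor::new(&mut laz), header).unwrap();
        writer.close().unwrap();
    }

    // Hand an owned copy to the in-memory store; `laz` remains available for a
    // local `las::Reader` comparison or for `PartitionedFile::new(_, laz.len() as u64)`.
    let store = InMemory::new();
    let location = Path::parse("test.laz").unwrap();
    store
        .put(&location, PutPayload::from(laz.clone()))
        .await
        .unwrap();
}
```

Alternatively, writing the bytes into a temporary directory and reading them back through `object_store::local::LocalFileSystem` would, as suggested, also make a multi-file read test possible, at the cost of touching disk.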
+ +use std::{ + any::Any, + collections::HashMap, + error::Error, + io::{Cursor, Read}, + ops::Range, + sync::Arc, +}; + +use arrow_schema::{DataType, Schema, SchemaRef}; +use datafusion_common::{ + error::DataFusionError, scalar::ScalarValue, stats::Precision, ColumnStatistics, Statistics, +}; +use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache}; +use las::{ + raw::{Header as RawHeader, Vlr as RawVlr}, + Builder, Header, Vlr, +}; +use laz::laszip::ChunkTable; +use object_store::{ObjectMeta, ObjectStore}; + +use crate::laz::{options::LazTableOptions, schema::try_schema_from_header}; + +/// Laz chunk metadata +#[derive(Debug, Clone)] +pub struct ChunkMeta { + pub num_points: u64, + pub point_offset: u64, + pub byte_range: Range<u64>, +} + +/// Laz metadata +#[derive(Debug, Clone)] +pub struct LazMetadata { + pub header: Arc<Header>, + pub chunk_table: Vec<ChunkMeta>, + pub extra_attributes: Arc<Vec<ExtraAttribute>>, +} + +impl FileMetadata for LazMetadata { + fn as_any(&self) -> &dyn Any { + self + } + + fn memory_size(&self) -> usize { + self.chunk_table.capacity() * std::mem::size_of::<ChunkMeta>() + + self.extra_attributes.capacity() * std::mem::size_of::<ExtraAttribute>() + } + + fn extra_info(&self) -> HashMap<String, String> { + HashMap::new() + } +} + +/// Reader for laz file metadata in object storage. +pub struct LazMetadataReader<'a> { + store: &'a dyn ObjectStore, + object_meta: &'a ObjectMeta, + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, + options: LazTableOptions, +} + +impl<'a> LazMetadataReader<'a> { + pub fn new(store: &'a dyn ObjectStore, object_meta: &'a ObjectMeta) -> Self { + Self { + store, + object_meta, + file_metadata_cache: None, + options: Default::default(), + } + } + + /// set file metadata cache + pub fn with_file_metadata_cache( + mut self, + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, + ) -> Self { + self.file_metadata_cache = file_metadata_cache; + self + } + + /// set table options + pub fn with_options(mut self, options: LazTableOptions) -> Self { + self.options = options; + self + } + + /// Fetch header + pub async fn fetch_header(&self) -> Result<Header, DataFusionError> { + fetch_header(self.store, self.object_meta) + .await + .map_err(DataFusionError::External) + } + + /// Fetch laz metadata from the remote object store + pub async fn fetch_metadata(&self) -> Result<Arc<LazMetadata>, DataFusionError> { + let Self { + store, + object_meta, + file_metadata_cache, + options: _, + } = self; + + if let Some(las_file_metadata) = file_metadata_cache + .as_ref() + .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta)) + .and_then(|file_metadata| { + file_metadata + .as_any() + .downcast_ref::<LazMetadata>() + .map(|laz_file_metadata| Arc::new(laz_file_metadata.to_owned())) + }) + { + return Ok(las_file_metadata); + } + + let header = self.fetch_header().await?; + let extra_attributes = extra_bytes_attributes(&header)?; + let chunk_table = chunk_table(*store, object_meta, &header).await?; + + let metadata = Arc::new(LazMetadata { + header: Arc::new(header), + chunk_table, + extra_attributes: Arc::new(extra_attributes), + }); + + if let Some(file_metadata_cache) = file_metadata_cache { + file_metadata_cache.put(object_meta, metadata.clone()); + } + + Ok(metadata) + } + + /// Read and parse the schema of the laz file + pub async fn fetch_schema(&mut self) -> Result<Schema, DataFusionError> { + let metadata = self.fetch_metadata().await?; + + let schema = try_schema_from_header( + 
&metadata.header, + self.options.point_encoding, + self.options.extra_bytes, + )?; + + Ok(schema) + } + + /// Fetch the metadata from the laz file via [`Self::fetch_metadata`] and extracts + /// the statistics in the metadata + pub async fn fetch_statistics( + &self, + table_schema: &SchemaRef, + ) -> Result<Statistics, DataFusionError> { + let metadata = self.fetch_metadata().await?; + + let mut statistics = Statistics::new_unknown(table_schema) + .with_num_rows(Precision::Exact(metadata.header.number_of_points() as usize)) + .with_total_byte_size(Precision::Exact( + metadata + .chunk_table + .iter() + .map(|meta| meta.byte_range.end - meta.byte_range.start) + .sum::<u64>() as usize, + )); + + let bounds = metadata.header.bounds(); + for field in table_schema.fields() { + let cs = match field.name().as_str() { + "x" => ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.x)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.x)))) + .with_null_count(Precision::Exact(0)), + "y" => ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.y)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.y)))) + .with_null_count(Precision::Exact(0)), + "z" => ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.z)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.z)))) + .with_null_count(Precision::Exact(0)), + _ => ColumnStatistics::new_unknown(), + }; + + statistics = statistics.add_column_statistics(cs); + } + + Ok(statistics) + } +} + +pub(crate) async fn fetch_header( + store: &(impl ObjectStore + ?Sized), + object_meta: &ObjectMeta, +) -> Result<Header, Box<dyn Error + Send + Sync>> { + let location = &object_meta.location; + + // Header + let bytes = store.get_range(location, 0..375).await?; + let reader = Cursor::new(bytes); + let raw_header = RawHeader::read_from(reader)?; + + let header_size = raw_header.header_size as u64; + let offset_to_point_data = raw_header.offset_to_point_data as u64; + let num_vlr = raw_header.number_of_variable_length_records; + let evlr = raw_header.evlr; + + let mut builder = Builder::new(raw_header)?; + + // VLRs + let bytes = store + .get_range(location, header_size..offset_to_point_data) + .await?; + let mut reader = Cursor::new(bytes); + + for _ in 0..num_vlr { + let vlr = RawVlr::read_from(&mut reader, false).map(Vlr::new)?; + builder.vlrs.push(vlr); + } + + reader.read_to_end(&mut builder.vlr_padding)?; + + // EVLRs + if let Some(evlr) = evlr { + let mut start = evlr.start_of_first_evlr; + + for _ in 0..evlr.number_of_evlrs { + let mut end = start + 60; + + let bytes = store.get_range(location, start..end).await?; + + end += u64::from_le_bytes(bytes[20..28].try_into()?); + + let bytes = store.get_range(location, start..end).await?; + let mut reader = Cursor::new(bytes); + let evlr = RawVlr::read_from(&mut reader, true).map(Vlr::new)?; + + builder.evlrs.push(evlr); + + start = end; + } + } + + Ok(builder.into_header()?) +} + +#[derive(Debug, Clone, PartialEq)] +pub struct ExtraAttribute { + pub data_type: DataType, + pub no_data: Option<[u8; 8]>, + pub scale: Option<f64>, + pub offset: Option<f64>, +} + +pub(crate) fn extra_bytes_attributes( + header: &Header, +) -> Result<Vec<ExtraAttribute>, Box<dyn Error + Send + Sync>> { Review Comment: Maybe? 
```suggestion ) -> Result<Vec<ExtraAttribute>, DataFusionError> { ``` ########## rust/sedona-pointcloud/src/laz/reader.rs: ########## @@ -0,0 +1,232 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + io::{Cursor, Read, Seek}, + sync::Arc, +}; + +use arrow_array::RecordBatch; +use datafusion_common::error::DataFusionError; +use datafusion_datasource::PartitionedFile; +use datafusion_execution::cache::cache_manager::FileMetadataCache; +use futures::{future::BoxFuture, FutureExt}; +use las::{raw::Point as RawPoint, Point}; +use laz::{ + record::{ + LayeredPointRecordDecompressor, RecordDecompressor, SequentialPointRecordDecompressor, + }, + DecompressionSelection, LasZipError, LazItem, +}; +use object_store::ObjectStore; + +use crate::laz::{ + builder::RowBuilder, + metadata::{ChunkMeta, LazMetadata, LazMetadataReader}, + options::LazTableOptions, +}; + +/// Laz file reader factory +#[derive(Debug)] +pub struct LazFileReaderFactory { + store: Arc<dyn ObjectStore>, + metadata_cache: Option<Arc<dyn FileMetadataCache>>, +} + +impl LazFileReaderFactory { + /// Create a new `LazFileReaderFactory`. + pub fn new( + store: Arc<dyn ObjectStore>, + metadata_cache: Option<Arc<dyn FileMetadataCache>>, + ) -> Self { + Self { + store, + metadata_cache, + } + } + + pub fn create_reader( + &self, + partitioned_file: PartitionedFile, + options: LazTableOptions, + ) -> Result<Box<LazFileReader>, DataFusionError> { + Ok(Box::new(LazFileReader { + partitioned_file, + store: self.store.clone(), + metadata_cache: self.metadata_cache.clone(), + options, + })) + } +} + +/// Reader for a laz file in object storage. 
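A sketch of what the suggested error-type change could look like, using `plan_err!` (which a later comment in this review also suggests for `chunk_table`). The byte layout mirrors the snippet under review; the helper name is hypothetical and this is not the PR's implementation.

```rust
use arrow_schema::DataType;
use datafusion_common::{plan_err, DataFusionError};

/// Map one "extra bytes" descriptor to an Arrow type, reporting unsupported
/// type codes as planning errors instead of boxed `dyn Error`s.
fn extra_bytes_data_type(descriptor: &[u8]) -> Result<DataType, DataFusionError> {
    match descriptor[2] {
        0 => Ok(DataType::FixedSizeBinary(descriptor[3] as i32)),
        1 => Ok(DataType::UInt8),
        2 => Ok(DataType::Int8),
        3 => Ok(DataType::UInt16),
        4 => Ok(DataType::Int16),
        5 => Ok(DataType::UInt32),
        6 => Ok(DataType::Int32),
        7 => Ok(DataType::UInt64),
        8 => Ok(DataType::Int64),
        9 => Ok(DataType::Float32),
        10 => Ok(DataType::Float64),
        11..=30 => plan_err!("deprecated extra bytes data type: {}", descriptor[2]),
        31..=255 => plan_err!("reserved extra bytes data type: {}", descriptor[2]),
    }
}
```

With `extra_bytes_attributes` (and `chunk_table`) returning `DataFusionError` directly, callers in `fetch_metadata` could still propagate with `?` unchanged, while failures would surface as plan errors rather than opaque `External` ones.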
+pub struct LazFileReader { + partitioned_file: PartitionedFile, + store: Arc<dyn ObjectStore>, + metadata_cache: Option<Arc<dyn FileMetadataCache>>, + pub options: LazTableOptions, +} + +impl LazFileReader { + pub fn get_metadata<'a>(&'a self) -> BoxFuture<'a, Result<Arc<LazMetadata>, DataFusionError>> { + let object_meta = self.partitioned_file.object_meta.clone(); + let metadata_cache = self.metadata_cache.clone(); + + async move { + LazMetadataReader::new(&self.store, &object_meta) + .with_file_metadata_cache(metadata_cache) + .with_options(self.options.clone()) + .fetch_metadata() + .await + } + .boxed() + } + + pub async fn get_batch(&self, chunk_meta: &ChunkMeta) -> Result<RecordBatch, DataFusionError> { + let metadata = self.get_metadata().await?; + let header = metadata.header.clone(); + + // fetch bytes + let location = &self.partitioned_file.object_meta.location; + let bytes = self + .store + .get_range(location, chunk_meta.byte_range.clone()) + .await?; + + // laz decompressor + let laz_vlr = header + .laz_vlr() + .map_err(|e| DataFusionError::External(Box::new(e)))?; + let reader = Cursor::new(bytes); + let mut decompressor = record_decompressor(laz_vlr.items(), reader) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + // record batch builder + let num_points = chunk_meta.num_points as usize; + let mut builder = RowBuilder::new(num_points, header.clone()) + .with_point_encoding(self.options.point_encoding) + .with_extra_attributes(metadata.extra_attributes.clone(), self.options.extra_bytes); + + // transform + let format = header.point_format(); + let transforms = header.transforms(); + + let out = vec![0; format.len() as usize]; + let mut buffer = Cursor::new(out); + + for _ in 0..chunk_meta.num_points { + buffer.set_position(0); + decompressor.decompress_next(buffer.get_mut())?; + + let point = RawPoint::read_from(&mut buffer, format) + .map(|raw_point| Point::new(raw_point, transforms)) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + builder.append(point); + } + + let struct_array = builder.finish()?; + + Ok(RecordBatch::from(struct_array)) + } +} + +pub(super) fn record_decompressor<'a, R: Read + Seek + Send + Sync + 'a>( + items: &Vec<LazItem>, + input: R, +) -> Result<Box<dyn RecordDecompressor<R> + Send + Sync + 'a>, LasZipError> { + let first_item = items + .first() + .expect("There should be at least one LazItem to be able to create a RecordDecompressor"); + + let mut decompressor = match first_item.version() { + 1 | 2 => { + let decompressor = SequentialPointRecordDecompressor::new(input); + Box::new(decompressor) as Box<dyn RecordDecompressor<R> + Send + Sync> + } + 3 | 4 => { + let decompressor = LayeredPointRecordDecompressor::new(input); + Box::new(decompressor) as Box<dyn RecordDecompressor<R> + Send + Sync> + } + _ => { + return Err(LasZipError::UnsupportedLazItemVersion( + first_item.item_type(), + first_item.version(), + )); + } + }; + + decompressor.set_fields_from(items)?; + decompressor.set_selection(DecompressionSelection::all()); + + Ok(decompressor) +} + +#[cfg(test)] +mod tests { + use std::{io::Cursor, sync::Arc}; + + use datafusion_datasource::PartitionedFile; + use las::{point::Format, Builder, Point, Writer}; + use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload}; + + use crate::laz::reader::LazFileReaderFactory; + + #[allow(static_mut_refs)] + #[tokio::test] + async fn reader_basic_e2e() { + // create laz file + static mut LAZ: Vec<u8> = Vec::new(); + + let mut builder = Builder::from((1, 4)); + 
builder.point_format = Format::new(1).unwrap(); + builder.point_format.is_compressed = true; + let header = builder.into_header().unwrap(); + let write = unsafe { Cursor::new(&mut LAZ) }; + let mut writer = Writer::new(write, header).unwrap(); + + let point = Point { + gps_time: Some(Default::default()), + ..Default::default() + }; + writer.write_point(point).unwrap(); + + writer.close().unwrap(); + + // put to object store + let store = InMemory::new(); + let location = Path::parse("test.laz").unwrap(); + let payload = unsafe { PutPayload::from_static(&LAZ) }; + store.put(&location, payload).await.unwrap(); + + // read batch with `LazFileReader` + let laz_file_reader = LazFileReaderFactory::new(Arc::new(store), None) + .create_reader( + PartitionedFile::new(location, unsafe { LAZ.len() as u64 }), + Default::default(), + ) + .unwrap(); + let metadata = laz_file_reader.get_metadata().await.unwrap(); + + let batch = laz_file_reader + .get_batch(&metadata.chunk_table[0]) + .await + .unwrap(); + + assert_eq!(batch.num_rows(), 1); Review Comment: This level of granularity is OK if all the lower-level pieces are tested, although in this case they are mostly not. ########## rust/sedona-pointcloud/src/laz/opener.rs: ########## @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use datafusion_common::{error::DataFusionError, pruning::PrunableStatistics}; +use datafusion_datasource::{ + file_meta::FileMeta, + file_stream::{FileOpenFuture, FileOpener}, + PartitionedFile, +}; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_pruning::PruningPredicate; +use futures::StreamExt; + +use crate::laz::{ + options::LazTableOptions, + reader::{LazFileReader, LazFileReaderFactory}, + schema::schema_from_header, +}; + +pub struct LazOpener { + /// Column indexes in `table_schema` needed by the query + pub projection: Arc<[usize]>, + /// Optional limit on the number of rows to read + pub limit: Option<usize>, + pub predicate: Option<Arc<dyn PhysicalExpr>>, + /// Factory for instantiating laz reader + pub laz_file_reader_factory: Arc<LazFileReaderFactory>, + /// Table options + pub options: LazTableOptions, +} + +impl FileOpener for LazOpener { + fn open( + &self, + _file_meta: FileMeta, + file: PartitionedFile, + ) -> Result<FileOpenFuture, DataFusionError> { + let projection = self.projection.clone(); + let limit = self.limit; + + let predicate = self.predicate.clone(); + + let laz_reader: Box<LazFileReader> = self + .laz_file_reader_factory + .create_reader(file.clone(), self.options.clone())?; + + Ok(Box::pin(async move { + let metadata = laz_reader.get_metadata().await?; + let schema = Arc::new(schema_from_header( + &metadata.header, + laz_reader.options.point_encoding, + laz_reader.options.extra_bytes, + )); + + let pruning_predicate = predicate.and_then(|physical_expr| { + PruningPredicate::try_new(physical_expr, schema.clone()).ok() + }); + + // file pruning + if let Some(pruning_predicate) = &pruning_predicate { + if let Some(statistics) = file.statistics { + let prunable_statistics = PrunableStatistics::new(vec![statistics], schema); + if let Ok(filter) = pruning_predicate.prune(&prunable_statistics) { + if !filter[0] { + return Ok(futures::stream::empty().boxed()); + } + } + } + } + + // map chunk table + let chunk_table: Vec<_> = metadata + .chunk_table + .iter() + .filter(|chunk_meta| { + file.range.as_ref().is_none_or(|range| { + let offset = chunk_meta.byte_range.start; + offset >= range.start as u64 && offset < range.end as u64 + }) + }) + .cloned() + .collect(); + + let mut row_count = 0; + + let stream = async_stream::try_stream! { + for chunk_meta in chunk_table.into_iter() { + // limit + if let Some(limit) = limit { + if row_count >= limit { + break; + } + } + + // fetch batch + let record_batch = laz_reader.get_batch(&chunk_meta).await?; + row_count += record_batch.num_rows(); + + // project + let record_batch = record_batch + .project(&projection) Review Comment: My guess is that most queries will select a small handful of columns and that decoding them all is sort of waste most of the time (but definitely a future concern as that requires reworking the approach here). ########## rust/sedona-pointcloud/src/laz/metadata.rs: ########## @@ -0,0 +1,451 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + any::Any, + collections::HashMap, + error::Error, + io::{Cursor, Read}, + ops::Range, + sync::Arc, +}; + +use arrow_schema::{DataType, Schema, SchemaRef}; +use datafusion_common::{ + error::DataFusionError, scalar::ScalarValue, stats::Precision, ColumnStatistics, Statistics, +}; +use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache}; +use las::{ + raw::{Header as RawHeader, Vlr as RawVlr}, + Builder, Header, Vlr, +}; +use laz::laszip::ChunkTable; +use object_store::{ObjectMeta, ObjectStore}; + +use crate::laz::{options::LazTableOptions, schema::try_schema_from_header}; + +/// Laz chunk metadata +#[derive(Debug, Clone)] +pub struct ChunkMeta { + pub num_points: u64, + pub point_offset: u64, + pub byte_range: Range<u64>, +} + +/// Laz metadata +#[derive(Debug, Clone)] +pub struct LazMetadata { + pub header: Arc<Header>, + pub chunk_table: Vec<ChunkMeta>, + pub extra_attributes: Arc<Vec<ExtraAttribute>>, +} + +impl FileMetadata for LazMetadata { + fn as_any(&self) -> &dyn Any { + self + } + + fn memory_size(&self) -> usize { + self.chunk_table.capacity() * std::mem::size_of::<ChunkMeta>() + + self.extra_attributes.capacity() * std::mem::size_of::<ExtraAttribute>() + } + + fn extra_info(&self) -> HashMap<String, String> { + HashMap::new() + } +} + +/// Reader for laz file metadata in object storage. 
+pub struct LazMetadataReader<'a> { + store: &'a dyn ObjectStore, + object_meta: &'a ObjectMeta, + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, + options: LazTableOptions, +} + +impl<'a> LazMetadataReader<'a> { + pub fn new(store: &'a dyn ObjectStore, object_meta: &'a ObjectMeta) -> Self { + Self { + store, + object_meta, + file_metadata_cache: None, + options: Default::default(), + } + } + + /// set file metadata cache + pub fn with_file_metadata_cache( + mut self, + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, + ) -> Self { + self.file_metadata_cache = file_metadata_cache; + self + } + + /// set table options + pub fn with_options(mut self, options: LazTableOptions) -> Self { + self.options = options; + self + } + + /// Fetch header + pub async fn fetch_header(&self) -> Result<Header, DataFusionError> { + fetch_header(self.store, self.object_meta) + .await + .map_err(DataFusionError::External) + } + + /// Fetch laz metadata from the remote object store + pub async fn fetch_metadata(&self) -> Result<Arc<LazMetadata>, DataFusionError> { + let Self { + store, + object_meta, + file_metadata_cache, + options: _, + } = self; + + if let Some(las_file_metadata) = file_metadata_cache + .as_ref() + .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta)) + .and_then(|file_metadata| { + file_metadata + .as_any() + .downcast_ref::<LazMetadata>() + .map(|laz_file_metadata| Arc::new(laz_file_metadata.to_owned())) + }) + { + return Ok(las_file_metadata); + } + + let header = self.fetch_header().await?; + let extra_attributes = extra_bytes_attributes(&header)?; + let chunk_table = chunk_table(*store, object_meta, &header).await?; + + let metadata = Arc::new(LazMetadata { + header: Arc::new(header), + chunk_table, + extra_attributes: Arc::new(extra_attributes), + }); + + if let Some(file_metadata_cache) = file_metadata_cache { + file_metadata_cache.put(object_meta, metadata.clone()); + } + + Ok(metadata) + } + + /// Read and parse the schema of the laz file + pub async fn fetch_schema(&mut self) -> Result<Schema, DataFusionError> { + let metadata = self.fetch_metadata().await?; + + let schema = try_schema_from_header( + &metadata.header, + self.options.point_encoding, + self.options.extra_bytes, + )?; + + Ok(schema) + } + + /// Fetch the metadata from the laz file via [`Self::fetch_metadata`] and extracts + /// the statistics in the metadata + pub async fn fetch_statistics( + &self, + table_schema: &SchemaRef, + ) -> Result<Statistics, DataFusionError> { + let metadata = self.fetch_metadata().await?; + + let mut statistics = Statistics::new_unknown(table_schema) + .with_num_rows(Precision::Exact(metadata.header.number_of_points() as usize)) + .with_total_byte_size(Precision::Exact( + metadata + .chunk_table + .iter() + .map(|meta| meta.byte_range.end - meta.byte_range.start) + .sum::<u64>() as usize, + )); + + let bounds = metadata.header.bounds(); + for field in table_schema.fields() { + let cs = match field.name().as_str() { + "x" => ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.x)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.x)))) + .with_null_count(Precision::Exact(0)), + "y" => ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.y)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.y)))) + .with_null_count(Precision::Exact(0)), + "z" => ColumnStatistics::new_unknown() + 
.with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.z)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.z)))) + .with_null_count(Precision::Exact(0)), + _ => ColumnStatistics::new_unknown(), + }; + + statistics = statistics.add_column_statistics(cs); + } + + Ok(statistics) + } +} + +pub(crate) async fn fetch_header( + store: &(impl ObjectStore + ?Sized), + object_meta: &ObjectMeta, +) -> Result<Header, Box<dyn Error + Send + Sync>> { + let location = &object_meta.location; + + // Header + let bytes = store.get_range(location, 0..375).await?; + let reader = Cursor::new(bytes); + let raw_header = RawHeader::read_from(reader)?; + + let header_size = raw_header.header_size as u64; + let offset_to_point_data = raw_header.offset_to_point_data as u64; + let num_vlr = raw_header.number_of_variable_length_records; + let evlr = raw_header.evlr; + + let mut builder = Builder::new(raw_header)?; + + // VLRs + let bytes = store + .get_range(location, header_size..offset_to_point_data) + .await?; + let mut reader = Cursor::new(bytes); + + for _ in 0..num_vlr { + let vlr = RawVlr::read_from(&mut reader, false).map(Vlr::new)?; + builder.vlrs.push(vlr); + } + + reader.read_to_end(&mut builder.vlr_padding)?; + + // EVLRs + if let Some(evlr) = evlr { + let mut start = evlr.start_of_first_evlr; + + for _ in 0..evlr.number_of_evlrs { + let mut end = start + 60; + + let bytes = store.get_range(location, start..end).await?; + + end += u64::from_le_bytes(bytes[20..28].try_into()?); + + let bytes = store.get_range(location, start..end).await?; + let mut reader = Cursor::new(bytes); + let evlr = RawVlr::read_from(&mut reader, true).map(Vlr::new)?; + + builder.evlrs.push(evlr); + + start = end; + } + } + + Ok(builder.into_header()?) +} + +#[derive(Debug, Clone, PartialEq)] +pub struct ExtraAttribute { + pub data_type: DataType, + pub no_data: Option<[u8; 8]>, + pub scale: Option<f64>, + pub offset: Option<f64>, +} + +pub(crate) fn extra_bytes_attributes( + header: &Header, +) -> Result<Vec<ExtraAttribute>, Box<dyn Error + Send + Sync>> { + let mut attributes = Vec::new(); + + for vlr in header.all_vlrs() { + if !(vlr.user_id == "LASF_Spec" && vlr.record_id == 4) { + continue; + } + + for bytes in vlr.data.chunks(192) { + // data type + let data_type = match bytes[2] { + 0 => DataType::FixedSizeBinary(bytes[3] as i32), + 1 => DataType::UInt8, + 2 => DataType::Int8, + 3 => DataType::UInt16, + 4 => DataType::Int16, + 5 => DataType::UInt32, + 6 => DataType::Int32, + 7 => DataType::UInt64, + 8 => DataType::Int64, + 9 => DataType::Float32, + 10 => DataType::Float64, + 11..=30 => return Err("deprecated extra bytes data type".into()), + 31..=255 => return Err("reserved extra butes data type".into()), + }; + + // no data + let no_data = if bytes[2] != 0 && bytes[3] & 1 == 1 { + Some(bytes[40..48].try_into().unwrap()) + } else { + None + }; + + // scale + let scale = if bytes[2] != 0 && bytes[3] >> 3 & 1 == 1 { + Some(f64::from_le_bytes(bytes[112..120].try_into().unwrap())) + } else { + None + }; + + // offset + let offset = if bytes[2] != 0 && bytes[3] >> 4 & 1 == 1 { + Some(f64::from_le_bytes(bytes[136..144].try_into().unwrap())) + } else { + None + }; + + let attribute = ExtraAttribute { + data_type, + no_data, + scale, + offset, + }; + + attributes.push(attribute); + } + } + + Ok(attributes) +} + +pub(crate) async fn chunk_table( + store: &(impl ObjectStore + ?Sized), + object_meta: &ObjectMeta, + header: &Header, +) -> Result<Vec<ChunkMeta>, Box<dyn Error + Send + Sync>> { + let 
num_points = header.number_of_points(); + let mut point_offset = 0; + + let vlr_len = header.vlrs().iter().map(|v| v.len(false)).sum::<usize>(); + let header_size = header.version().header_size() as usize + header.padding().len(); + let mut byte_offset = (header_size + vlr_len + header.vlr_padding().len()) as u64; + + let laz_vlr = header.laz_vlr()?; + + let ranges = [ + byte_offset..byte_offset + 8, + object_meta.size - 8..object_meta.size, + ]; + let bytes = store.get_ranges(&object_meta.location, &ranges).await?; + let mut table_offset = None; + + let table_offset1 = i64::from_le_bytes(bytes[0].to_vec().try_into().unwrap()) as u64; + let table_offset2 = i64::from_le_bytes(bytes[1].to_vec().try_into().unwrap()) as u64; + + if table_offset1 > byte_offset { + table_offset = Some(table_offset1); + } else if table_offset2 > byte_offset { + table_offset = Some(table_offset2); + } + + let Some(table_offset) = table_offset else { + return Err("LAZ files without chunk table not supported (yet)".into()); + }; + + if table_offset > object_meta.size { + return Err("LAZ file chunk table position is missing/bad".into()); + } + + let bytes = store + .get_range(&object_meta.location, table_offset..table_offset + 8) + .await?; + + let num_chunks = u32::from_le_bytes(bytes[4..].to_vec().try_into().unwrap()) as u64; + let range = table_offset..table_offset + 8 + 8 * num_chunks; + let bytes = store.get_range(&object_meta.location, range).await?; + + let mut reader = Cursor::new(bytes); + let variable_size = laz_vlr.uses_variable_size_chunks(); + let chunk_table = ChunkTable::read(&mut reader, variable_size)?; + assert_eq!(chunk_table.len(), num_chunks as usize); + + let mut chunks = Vec::with_capacity(num_chunks as usize); + let chunk_size = laz_vlr.chunk_size() as u64; + byte_offset += 8; + + for chunk_table_entry in &chunk_table { + let point_count = if variable_size { + chunk_table_entry.point_count + } else { + chunk_size.min(num_points - point_offset) + }; + + let chunk = ChunkMeta { + num_points: point_count, + point_offset, + byte_range: byte_offset..byte_offset + chunk_table_entry.byte_count, + }; + chunks.push(chunk); + point_offset += point_count; + byte_offset += chunk_table_entry.byte_count; + } + + Ok(chunks) +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use las::{point::Format, Builder, Reader, Writer}; + use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload}; + + use crate::laz::metadata::LazMetadataReader; + + #[allow(static_mut_refs)] + #[tokio::test] + async fn header_basic_e2e() { + // create laz file + static mut LAZ: Vec<u8> = Vec::new(); + + let mut builder = Builder::from((1, 4)); + builder.point_format = Format::new(1).unwrap(); + builder.point_format.is_compressed = true; + let header = builder.into_header().unwrap(); + let write = unsafe { Cursor::new(&mut LAZ) }; + let mut writer = Writer::new(write, header).unwrap(); + + writer.close().unwrap(); + + // put to object store + let store = InMemory::new(); + let location = Path::parse("test.laz").unwrap(); + let payload = unsafe { PutPayload::from_static(&LAZ) }; + store.put(&location, payload).await.unwrap(); + + // read with `LazMetadataReader` + let object_meta = store.head(&location).await.unwrap(); + let metadata_reader = LazMetadataReader::new(&store, &object_meta); + + // read with las `Reader` + let read = unsafe { Cursor::new(&mut LAZ) }; + let reader = Reader::new(read).unwrap(); + + assert_eq!( + reader.header(), + &metadata_reader.fetch_header().await.unwrap() + ); + } Review Comment: With 
a dummy header (perhaps you can inline the bytes of a known test file, or perhaps you can create one using the Builder that exercises the code path for the full matrix of data types by offset/scale/nodata, or create a function that accepts `DataType`, `offset`, `scale`, `nodata`, and outputs a file with exactly one extra attribute that we can use in a parameterized test to check that it roundtrips. ########## rust/sedona-pointcloud/src/laz/metadata.rs: ########## @@ -0,0 +1,451 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + any::Any, + collections::HashMap, + error::Error, + io::{Cursor, Read}, + ops::Range, + sync::Arc, +}; + +use arrow_schema::{DataType, Schema, SchemaRef}; +use datafusion_common::{ + error::DataFusionError, scalar::ScalarValue, stats::Precision, ColumnStatistics, Statistics, +}; +use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache}; +use las::{ + raw::{Header as RawHeader, Vlr as RawVlr}, + Builder, Header, Vlr, +}; +use laz::laszip::ChunkTable; +use object_store::{ObjectMeta, ObjectStore}; + +use crate::laz::{options::LazTableOptions, schema::try_schema_from_header}; + +/// Laz chunk metadata +#[derive(Debug, Clone)] +pub struct ChunkMeta { + pub num_points: u64, + pub point_offset: u64, + pub byte_range: Range<u64>, +} + +/// Laz metadata +#[derive(Debug, Clone)] +pub struct LazMetadata { + pub header: Arc<Header>, + pub chunk_table: Vec<ChunkMeta>, + pub extra_attributes: Arc<Vec<ExtraAttribute>>, +} + +impl FileMetadata for LazMetadata { + fn as_any(&self) -> &dyn Any { + self + } + + fn memory_size(&self) -> usize { + self.chunk_table.capacity() * std::mem::size_of::<ChunkMeta>() + + self.extra_attributes.capacity() * std::mem::size_of::<ExtraAttribute>() + } + + fn extra_info(&self) -> HashMap<String, String> { + HashMap::new() + } +} + +/// Reader for laz file metadata in object storage. 
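For the parameterized-test idea above, here is a sketch of a hypothetical helper that builds a single 192-byte "extra bytes" descriptor from a type code, `no_data`, `scale`, and `offset`, and attaches it to a header via a `LASF_Spec`/record-id-4 VLR, so `extra_bytes_attributes` can be checked per data type without writing a full file. The field offsets follow the parsing code in this PR; the names and the exact `las::Vlr`/`Builder` usage are assumptions, not the PR's code.

```rust
use arrow_schema::DataType;
use las::{point::Format, Builder, Vlr};

use crate::laz::metadata::extra_bytes_attributes;

/// Hypothetical: one 192-byte descriptor laid out the way the parser reads it
/// (byte 2 = type code, byte 3 = option flags, bytes 40..48 = no_data,
/// 112..120 = scale, 136..144 = offset).
fn extra_attr_descriptor(
    type_code: u8,
    no_data: Option<[u8; 8]>,
    scale: Option<f64>,
    offset: Option<f64>,
) -> Vec<u8> {
    let mut bytes = vec![0u8; 192];
    bytes[2] = type_code;
    let mut options = 0u8;
    if let Some(no_data) = no_data {
        options |= 1;
        bytes[40..48].copy_from_slice(&no_data);
    }
    if let Some(scale) = scale {
        options |= 1 << 3;
        bytes[112..120].copy_from_slice(&scale.to_le_bytes());
    }
    if let Some(offset) = offset {
        options |= 1 << 4;
        bytes[136..144].copy_from_slice(&offset.to_le_bytes());
    }
    bytes[3] = options;
    bytes
}

#[test]
fn extra_bytes_attribute_roundtrip() {
    // One UInt32 attribute (type code 5) with scale and offset but no no-data value.
    let mut builder = Builder::from((1, 4));
    builder.point_format = Format::new(1).unwrap();
    builder.point_format.extra_bytes = 4;
    builder.vlrs.push(Vlr {
        user_id: "LASF_Spec".to_string(),
        record_id: 4,
        description: "extra bytes".to_string(),
        data: extra_attr_descriptor(5, None, Some(0.01), Some(100.0)),
    });
    let header = builder.into_header().unwrap();

    let attrs = extra_bytes_attributes(&header).unwrap();
    assert_eq!(attrs.len(), 1);
    assert_eq!(attrs[0].data_type, DataType::UInt32);
    assert_eq!(attrs[0].scale, Some(0.01));
    assert_eq!(attrs[0].offset, Some(100.0));
    assert_eq!(attrs[0].no_data, None);
}
```

Running the same body over a table of `(type_code, expected DataType, no_data, scale, offset)` tuples would cover the full matrix mentioned in the comment.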
+pub struct LazMetadataReader<'a> { + store: &'a dyn ObjectStore, + object_meta: &'a ObjectMeta, + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, + options: LazTableOptions, +} + +impl<'a> LazMetadataReader<'a> { + pub fn new(store: &'a dyn ObjectStore, object_meta: &'a ObjectMeta) -> Self { + Self { + store, + object_meta, + file_metadata_cache: None, + options: Default::default(), + } + } + + /// set file metadata cache + pub fn with_file_metadata_cache( + mut self, + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, + ) -> Self { + self.file_metadata_cache = file_metadata_cache; + self + } + + /// set table options + pub fn with_options(mut self, options: LazTableOptions) -> Self { + self.options = options; + self + } + + /// Fetch header + pub async fn fetch_header(&self) -> Result<Header, DataFusionError> { + fetch_header(self.store, self.object_meta) + .await + .map_err(DataFusionError::External) + } + + /// Fetch laz metadata from the remote object store + pub async fn fetch_metadata(&self) -> Result<Arc<LazMetadata>, DataFusionError> { + let Self { + store, + object_meta, + file_metadata_cache, + options: _, + } = self; + + if let Some(las_file_metadata) = file_metadata_cache + .as_ref() + .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta)) + .and_then(|file_metadata| { + file_metadata + .as_any() + .downcast_ref::<LazMetadata>() + .map(|laz_file_metadata| Arc::new(laz_file_metadata.to_owned())) + }) + { + return Ok(las_file_metadata); + } + + let header = self.fetch_header().await?; + let extra_attributes = extra_bytes_attributes(&header)?; + let chunk_table = chunk_table(*store, object_meta, &header).await?; + + let metadata = Arc::new(LazMetadata { + header: Arc::new(header), + chunk_table, + extra_attributes: Arc::new(extra_attributes), + }); + + if let Some(file_metadata_cache) = file_metadata_cache { + file_metadata_cache.put(object_meta, metadata.clone()); + } + + Ok(metadata) + } + + /// Read and parse the schema of the laz file + pub async fn fetch_schema(&mut self) -> Result<Schema, DataFusionError> { + let metadata = self.fetch_metadata().await?; + + let schema = try_schema_from_header( + &metadata.header, + self.options.point_encoding, + self.options.extra_bytes, + )?; + + Ok(schema) + } + + /// Fetch the metadata from the laz file via [`Self::fetch_metadata`] and extracts + /// the statistics in the metadata + pub async fn fetch_statistics( + &self, + table_schema: &SchemaRef, + ) -> Result<Statistics, DataFusionError> { + let metadata = self.fetch_metadata().await?; + + let mut statistics = Statistics::new_unknown(table_schema) + .with_num_rows(Precision::Exact(metadata.header.number_of_points() as usize)) + .with_total_byte_size(Precision::Exact( + metadata + .chunk_table + .iter() + .map(|meta| meta.byte_range.end - meta.byte_range.start) + .sum::<u64>() as usize, + )); + + let bounds = metadata.header.bounds(); + for field in table_schema.fields() { + let cs = match field.name().as_str() { + "x" => ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.x)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.x)))) + .with_null_count(Precision::Exact(0)), + "y" => ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.y)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.y)))) + .with_null_count(Precision::Exact(0)), + "z" => ColumnStatistics::new_unknown() + 
.with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.z)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.z)))) + .with_null_count(Precision::Exact(0)), + _ => ColumnStatistics::new_unknown(), + }; + + statistics = statistics.add_column_statistics(cs); + } + + Ok(statistics) + } +} + +pub(crate) async fn fetch_header( + store: &(impl ObjectStore + ?Sized), + object_meta: &ObjectMeta, +) -> Result<Header, Box<dyn Error + Send + Sync>> { + let location = &object_meta.location; + + // Header + let bytes = store.get_range(location, 0..375).await?; + let reader = Cursor::new(bytes); + let raw_header = RawHeader::read_from(reader)?; + + let header_size = raw_header.header_size as u64; + let offset_to_point_data = raw_header.offset_to_point_data as u64; + let num_vlr = raw_header.number_of_variable_length_records; + let evlr = raw_header.evlr; + + let mut builder = Builder::new(raw_header)?; + + // VLRs + let bytes = store + .get_range(location, header_size..offset_to_point_data) + .await?; + let mut reader = Cursor::new(bytes); + + for _ in 0..num_vlr { + let vlr = RawVlr::read_from(&mut reader, false).map(Vlr::new)?; + builder.vlrs.push(vlr); + } + + reader.read_to_end(&mut builder.vlr_padding)?; + + // EVLRs + if let Some(evlr) = evlr { + let mut start = evlr.start_of_first_evlr; + + for _ in 0..evlr.number_of_evlrs { + let mut end = start + 60; + + let bytes = store.get_range(location, start..end).await?; + + end += u64::from_le_bytes(bytes[20..28].try_into()?); + + let bytes = store.get_range(location, start..end).await?; + let mut reader = Cursor::new(bytes); + let evlr = RawVlr::read_from(&mut reader, true).map(Vlr::new)?; + + builder.evlrs.push(evlr); + + start = end; + } + } + + Ok(builder.into_header()?) +} + +#[derive(Debug, Clone, PartialEq)] +pub struct ExtraAttribute { + pub data_type: DataType, + pub no_data: Option<[u8; 8]>, + pub scale: Option<f64>, + pub offset: Option<f64>, +} + +pub(crate) fn extra_bytes_attributes( + header: &Header, +) -> Result<Vec<ExtraAttribute>, Box<dyn Error + Send + Sync>> { + let mut attributes = Vec::new(); + + for vlr in header.all_vlrs() { + if !(vlr.user_id == "LASF_Spec" && vlr.record_id == 4) { + continue; + } + + for bytes in vlr.data.chunks(192) { + // data type + let data_type = match bytes[2] { + 0 => DataType::FixedSizeBinary(bytes[3] as i32), + 1 => DataType::UInt8, + 2 => DataType::Int8, + 3 => DataType::UInt16, + 4 => DataType::Int16, + 5 => DataType::UInt32, + 6 => DataType::Int32, + 7 => DataType::UInt64, + 8 => DataType::Int64, + 9 => DataType::Float32, + 10 => DataType::Float64, + 11..=30 => return Err("deprecated extra bytes data type".into()), + 31..=255 => return Err("reserved extra butes data type".into()), + }; + + // no data + let no_data = if bytes[2] != 0 && bytes[3] & 1 == 1 { + Some(bytes[40..48].try_into().unwrap()) + } else { + None + }; + + // scale + let scale = if bytes[2] != 0 && bytes[3] >> 3 & 1 == 1 { + Some(f64::from_le_bytes(bytes[112..120].try_into().unwrap())) + } else { + None + }; + + // offset + let offset = if bytes[2] != 0 && bytes[3] >> 4 & 1 == 1 { + Some(f64::from_le_bytes(bytes[136..144].try_into().unwrap())) + } else { + None + }; + + let attribute = ExtraAttribute { + data_type, + no_data, + scale, + offset, + }; + + attributes.push(attribute); + } + } + + Ok(attributes) +} + +pub(crate) async fn chunk_table( + store: &(impl ObjectStore + ?Sized), + object_meta: &ObjectMeta, + header: &Header, +) -> Result<Vec<ChunkMeta>, Box<dyn Error + Send + Sync>> { 
Review Comment: ```suggestion ) -> Result<Vec<ChunkMeta>, DataFusionError> { ``` Probably use `plan_err!()` for these? ########## rust/sedona-pointcloud/src/laz/builder.rs: ########## @@ -0,0 +1,571 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{fmt::Debug, sync::Arc}; + +use arrow_array::{ + builder::{ + ArrayBuilder, BinaryBuilder, BooleanBuilder, FixedSizeBinaryBuilder, Float32Builder, + Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, UInt16Builder, + UInt32Builder, UInt64Builder, UInt8Builder, + }, + Array, ArrayRef, BooleanArray, FixedSizeBinaryArray, Float32Array, Float64Array, StructArray, + UInt16Array, UInt8Array, +}; +use arrow_buffer::ScalarBuffer; +use arrow_schema::{ArrowError, DataType}; +use geoarrow_array::{ + array::{CoordBuffer, PointArray, SeparatedCoordBuffer}, + GeoArrowArray, +}; +use geoarrow_schema::Dimension; +use las::{Header, Point}; + +use crate::laz::{ + metadata::ExtraAttribute, + options::{LasExtraBytes, LasPointEncoding}, + schema::try_schema_from_header, +}; + +#[derive(Debug)] +pub struct RowBuilder { + x: Float64Builder, + y: Float64Builder, + z: Float64Builder, + intensity: UInt16Builder, + return_number: UInt8Builder, + number_of_returns: UInt8Builder, + is_synthetic: BooleanBuilder, + is_key_point: BooleanBuilder, + is_withheld: BooleanBuilder, + is_overlap: BooleanBuilder, + scanner_channel: UInt8Builder, + scan_direction: UInt8Builder, + is_edge_of_flight_line: BooleanBuilder, + classification: UInt8Builder, + user_data: UInt8Builder, + scan_angle: Float32Builder, + point_source_id: UInt16Builder, + gps_time: Float64Builder, + red: UInt16Builder, + green: UInt16Builder, + blue: UInt16Builder, + nir: UInt16Builder, + extra: FixedSizeBinaryBuilder, + header: Arc<Header>, + point_encoding: LasPointEncoding, + extra_bytes: LasExtraBytes, + extra_attributes: Arc<Vec<ExtraAttribute>>, +} + +impl RowBuilder { + pub fn new(capacity: usize, header: Arc<Header>) -> Self { + Self { + x: Float64Array::builder(capacity), + y: Float64Array::builder(capacity), + z: Float64Array::builder(capacity), + intensity: UInt16Array::builder(capacity), + return_number: UInt8Array::builder(capacity), + number_of_returns: UInt8Array::builder(capacity), + is_synthetic: BooleanArray::builder(capacity), + is_key_point: BooleanArray::builder(capacity), + is_withheld: BooleanArray::builder(capacity), + is_overlap: BooleanArray::builder(capacity), + scanner_channel: UInt8Array::builder(capacity), + scan_direction: UInt8Array::builder(capacity), + is_edge_of_flight_line: BooleanArray::builder(capacity), + classification: UInt8Array::builder(capacity), + user_data: UInt8Array::builder(capacity), + scan_angle: Float32Array::builder(capacity), + point_source_id: UInt16Array::builder(capacity), + 
gps_time: Float64Array::builder(capacity), + red: UInt16Array::builder(capacity), + green: UInt16Array::builder(capacity), + blue: UInt16Array::builder(capacity), + nir: UInt16Array::builder(capacity), + extra: FixedSizeBinaryBuilder::with_capacity( + capacity, + header.point_format().extra_bytes as i32, + ), + + header, + point_encoding: Default::default(), + extra_bytes: Default::default(), + extra_attributes: Arc::new(Vec::new()), + } + } + + pub fn with_point_encoding(mut self, point_encoding: LasPointEncoding) -> Self { + self.point_encoding = point_encoding; + self + } + + pub fn with_extra_attributes( + mut self, + attributes: Arc<Vec<ExtraAttribute>>, + extra_bytes: LasExtraBytes, + ) -> Self { + self.extra_attributes = attributes; + self.extra_bytes = extra_bytes; + self + } + + pub fn append(&mut self, p: Point) { + self.x.append_value(p.x); + self.y.append_value(p.y); + self.z.append_value(p.z); + self.intensity.append_option(Some(p.intensity)); + self.return_number.append_value(p.return_number); + self.number_of_returns.append_value(p.number_of_returns); + self.is_synthetic.append_value(p.is_synthetic); + self.is_key_point.append_value(p.is_key_point); + self.is_withheld.append_value(p.is_withheld); + self.is_overlap.append_value(p.is_overlap); + self.scanner_channel.append_value(p.scanner_channel); + self.scan_direction.append_value(p.scan_direction as u8); + self.is_edge_of_flight_line + .append_value(p.is_edge_of_flight_line); + self.classification.append_value(u8::from(p.classification)); + self.user_data.append_value(p.user_data); + self.scan_angle.append_value(p.scan_angle); + self.point_source_id.append_value(p.point_source_id); + if self.header.point_format().has_gps_time { + self.gps_time.append_value(p.gps_time.unwrap()); + } + if self.header.point_format().has_color { + let color = p.color.unwrap(); + self.red.append_value(color.red); + self.green.append_value(color.green); + self.blue.append_value(color.blue); + } + if self.header.point_format().has_nir { + self.nir.append_value(p.nir.unwrap()); + } + if self.header.point_format().extra_bytes > 0 { + self.extra.append_value(p.extra_bytes).unwrap(); + } + } + + /// Note: returns StructArray to allow nesting within another array if desired + pub fn finish(&mut self) -> Result<StructArray, ArrowError> { + let schema = try_schema_from_header(&self.header, self.point_encoding, self.extra_bytes)?; + + let mut columns = match self.point_encoding { + LasPointEncoding::Plain => vec![ + Arc::new(self.x.finish()) as ArrayRef, + Arc::new(self.y.finish()) as ArrayRef, + Arc::new(self.z.finish()) as ArrayRef, + ], + LasPointEncoding::Wkb => { + const POINT_SIZE: usize = 29; + + let n: usize = self.x.len(); + + let mut builder = BinaryBuilder::with_capacity(n, n * POINT_SIZE); + + let x = self.x.finish(); + let y = self.y.finish(); + let z = self.z.finish(); + + let mut wkb_bytes = [0_u8; POINT_SIZE]; + wkb_bytes[0] = 0x01; // Little-endian + wkb_bytes[1..5].copy_from_slice(&[0xE9, 0x03, 0x00, 0x00]); // Point Z type (1001) + + for i in 0..n { + let x = unsafe { x.value_unchecked(i) }; + let y = unsafe { y.value_unchecked(i) }; + let z = unsafe { z.value_unchecked(i) }; + + wkb_bytes[5..13].copy_from_slice(x.to_le_bytes().as_slice()); + wkb_bytes[13..21].copy_from_slice(y.to_le_bytes().as_slice()); + wkb_bytes[21..29].copy_from_slice(z.to_le_bytes().as_slice()); + + builder.append_value(wkb_bytes); + } + + vec![Arc::new(builder.finish()) as ArrayRef] + } + LasPointEncoding::Native => { + let buffers = [ + 
self.x.finish().into_parts().1, + self.y.finish().into_parts().1, + self.z.finish().into_parts().1, + ScalarBuffer::from(vec![]), + ]; + let coords = CoordBuffer::Separated(SeparatedCoordBuffer::from_array( + buffers, + Dimension::XYZ, + )?); + let points = PointArray::new(coords, None, Default::default()); + vec![points.to_array_ref()] + } + }; + + columns.extend([ + Arc::new(self.intensity.finish()) as ArrayRef, + Arc::new(self.return_number.finish()) as ArrayRef, + Arc::new(self.number_of_returns.finish()) as ArrayRef, + Arc::new(self.is_synthetic.finish()) as ArrayRef, + Arc::new(self.is_key_point.finish()) as ArrayRef, + Arc::new(self.is_withheld.finish()) as ArrayRef, + Arc::new(self.is_overlap.finish()) as ArrayRef, + Arc::new(self.scanner_channel.finish()) as ArrayRef, + Arc::new(self.scan_direction.finish()) as ArrayRef, + Arc::new(self.is_edge_of_flight_line.finish()) as ArrayRef, + Arc::new(self.classification.finish()) as ArrayRef, + Arc::new(self.user_data.finish()) as ArrayRef, + Arc::new(self.scan_angle.finish()) as ArrayRef, + Arc::new(self.point_source_id.finish()) as ArrayRef, + ]); + if self.header.point_format().has_gps_time { + columns.push(Arc::new(self.gps_time.finish()) as ArrayRef); + } + if self.header.point_format().has_color { + columns.extend([ + Arc::new(self.red.finish()) as ArrayRef, + Arc::new(self.green.finish()) as ArrayRef, + Arc::new(self.blue.finish()) as ArrayRef, + ]); + } + if self.header.point_format().has_nir { + columns.push(Arc::new(self.nir.finish()) as ArrayRef); + } + + // extra bytes + let num_extra_bytes = self.header.point_format().extra_bytes as usize; + if num_extra_bytes > 0 { + match self.extra_bytes { + LasExtraBytes::Typed => { + let extra = self.extra.finish(); + + let mut pos = 0; + + for attribute in self.extra_attributes.iter() { + pos += build_attribute(attribute, pos, &extra, &mut columns)?; + } + } + LasExtraBytes::Blob => columns.push(Arc::new(self.extra.finish())), + LasExtraBytes::Ignore => (), + } + } + + Ok(StructArray::new(schema.fields.to_owned(), columns, None)) + } +} + +fn build_attribute( + attribute: &ExtraAttribute, + pos: usize, + extra: &FixedSizeBinaryArray, + columns: &mut Vec<ArrayRef>, +) -> Result<usize, ArrowError> { + let scale = attribute.scale.unwrap_or(1.0); + let offset = attribute.offset.unwrap_or(0.0); + + let width = if let DataType::FixedSizeBinary(width) = attribute.data_type { + width as usize + } else { + attribute.data_type.primitive_width().unwrap() + }; + + let iter = extra.iter().map(|b| &b.unwrap()[pos..pos + width]); + + match &attribute.data_type { + DataType::FixedSizeBinary(_) => { + let data = FixedSizeBinaryArray::try_from_iter(iter)?; + columns.push(Arc::new(data) as ArrayRef) + } + DataType::Int8 => { + let no_data = attribute.no_data.map(i64::from_le_bytes); + + let iter = iter.map(|d| { + let v = d[0] as i8; + if let Some(no_data) = no_data { + if no_data == v as i64 { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v as f64 * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = Int8Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + DataType::Int16 => { + let no_data = attribute.no_data.map(i64::from_le_bytes); + + let iter = iter.map(|d| { + let v = 
i16::from_le_bytes(d.try_into().unwrap()); + if let Some(no_data) = no_data { + if no_data == v as i64 { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v as f64 * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = Int16Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + DataType::Int32 => { + let no_data = attribute.no_data.map(i64::from_le_bytes); + + let iter = iter.map(|d| { + let v = i32::from_le_bytes(d.try_into().unwrap()); + if let Some(no_data) = no_data { + if no_data == v as i64 { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v as f64 * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = Int32Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + DataType::Int64 => { + let no_data = attribute.no_data.map(i64::from_le_bytes); + + let iter = iter.map(|d| { + let v = i64::from_le_bytes(d.try_into().unwrap()); + if let Some(no_data) = no_data { + if no_data == v { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v as f64 * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = Int64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + DataType::UInt8 => { + let no_data = attribute.no_data.map(u64::from_le_bytes); + + let iter = iter.map(|d| { + let v = d[0]; + if let Some(no_data) = no_data { + if no_data == v as u64 { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v as f64 * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = UInt8Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + DataType::UInt16 => { + let no_data = attribute.no_data.map(u64::from_le_bytes); + + let iter = iter.map(|d| { + let v = u16::from_le_bytes(d.try_into().unwrap()); + if let Some(no_data) = no_data { + if no_data == v as u64 { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v as f64 * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = UInt16Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + DataType::UInt32 => { + let no_data = attribute.no_data.map(u64::from_le_bytes); + + let iter = iter.map(|d| { 
+ let v = u32::from_le_bytes(d.try_into().unwrap()); + if let Some(no_data) = no_data { + if no_data == v as u64 { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v as f64 * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = UInt32Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + DataType::UInt64 => { + let no_data = attribute.no_data.map(u64::from_le_bytes); + + let iter = iter.map(|d| { + let v = u64::from_le_bytes(d.try_into().unwrap()); + if let Some(no_data) = no_data { + if no_data == v { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v as f64 * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = UInt64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + DataType::Float32 => { + let no_data = attribute.no_data.map(f64::from_le_bytes); + + let iter = iter.map(|d| { + let v = f32::from_le_bytes(d.try_into().unwrap()); + if let Some(no_data) = no_data { + if no_data == v as f64 { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v as f64 * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = Float32Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + DataType::Float64 => { + let no_data = attribute.no_data.map(f64::from_le_bytes); + + let iter = iter.map(|d| { + let v = f64::from_le_bytes(d.try_into().unwrap()); + if let Some(no_data) = no_data { + if no_data == v { + return None; + } + } + Some(v) + }); + + if attribute.scale.is_some() || attribute.offset.is_some() { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v.map(|v| v * scale + offset)); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } else { + let mut builder = Float64Builder::with_capacity(extra.len()); + for v in iter { + builder.append_option(v); + } + columns.push(Arc::new(builder.finish()) as ArrayRef) + } + } + + dt => { + return Err(ArrowError::ExternalError( + format!("Unsupported data type for extra bytes: `{dt}`").into(), + )) + } + } + + Ok(width) +} Review Comment: Some suggestions for tests that should live in this file: - Create a builder with zero rows, check all of the output options to ensure they give you the schema (or at least number of columns) you are expecting - The building of attributes (tests should cover each branch here). One of the nice parts about refactoring this to use GATs if you can would be that there are fewer branches to test (although probably easier to use an `rstest` parameterized test like `#[values(DataType::Int8, DataType::Int16, ...)]`). 
I don't think you'll need a test file for any of that (but you might need to create a mock header and mock attributes with/without offset, scale, and nodata).
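For the attribute branches, a parameterized test could look roughly like the sketch below (assuming `rstest` is added as a dev-dependency; it reuses `ExtraAttribute` and `build_attribute` from this file, and the mock values and assertions are only illustrative):

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use rstest::rstest;

    #[rstest]
    fn scaled_attribute_builds_float64_column(
        #[values(DataType::Int8, DataType::Int16, DataType::Int32, DataType::UInt16)]
        data_type: DataType,
    ) {
        let width = data_type.primitive_width().unwrap();

        // Two rows of zeroed extra bytes, exactly as wide as the attribute under test.
        let extra =
            FixedSizeBinaryArray::try_from_iter(vec![vec![0u8; width]; 2].into_iter()).unwrap();

        // A mock attribute with scale/offset set, so the branch that re-encodes
        // values as Float64 is exercised.
        let attribute = ExtraAttribute {
            data_type,
            no_data: None,
            scale: Some(0.5),
            offset: Some(10.0),
        };

        let mut columns: Vec<ArrayRef> = Vec::new();
        let consumed = build_attribute(&attribute, 0, &extra, &mut columns).unwrap();

        // The attribute should consume its primitive width and produce a single
        // Float64 column with one value per input row.
        assert_eq!(consumed, width);
        assert_eq!(columns.len(), 1);
        assert_eq!(columns[0].data_type(), &DataType::Float64);
        assert_eq!(columns[0].len(), 2);
    }
}
```

The unscaled and no-data branches could reuse the same shape with different `no_data`/`scale`/`offset` combinations on the mock attribute.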
