paleolimbot commented on code in PR #471: URL: https://github.com/apache/sedona-db/pull/471#discussion_r2665398137
########## examples/sedonadb-rust-pointcloud/src/main.rs: ########## @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// Because a number of methods only return Err() for not implemented, +// the compiler doesn't know how to guess which impl RecordBatchReader +// will be returned. When we implement the methods, we can remove this. + +use datafusion::common::Result; +use sedona::context::{SedonaContext, SedonaDataFrame}; + +#[tokio::main] +async fn main() -> Result<()> { + let ctx = SedonaContext::new_local_interactive().await?; + let url = "https://basisdata.nl/hwh-ahn/ahn4/01_LAZ/C_69AZ1.LAZ"; + + // let _ = ctx.sql(&format!("SET laz.point_encoding = 'plain'")).await.unwrap(); + // let df = ctx.sql(&format!("SELECT x, y, z FROM \"{url}\"")).await.unwrap(); Review Comment: ```suggestion ``` Can these be removed? ########## examples/sedonadb-rust-pointcloud/src/main.rs: ########## @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// Because a number of methods only return Err() for not implemented, +// the compiler doesn't know how to guess which impl RecordBatchReader +// will be returned. When we implement the methods, we can remove this. + +use datafusion::common::Result; +use sedona::context::{SedonaContext, SedonaDataFrame}; + +#[tokio::main] +async fn main() -> Result<()> { + let ctx = SedonaContext::new_local_interactive().await?; + let url = "https://basisdata.nl/hwh-ahn/ahn4/01_LAZ/C_69AZ1.LAZ"; Review Comment: As a follow-up we can add a few .laz/.las files to https://github.com/apache/sedona-testing (which we have as a submodule here for test data). ########## rust/sedona-pointcloud/src/laz/opener.rs: ########## @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use datafusion_common::{error::DataFusionError, pruning::PrunableStatistics}; +use datafusion_datasource::{ + file_meta::FileMeta, + file_stream::{FileOpenFuture, FileOpener}, + PartitionedFile, +}; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_pruning::PruningPredicate; +use futures::StreamExt; + +use crate::laz::{ + options::LazTableOptions, + reader::{LazFileReader, LazFileReaderFactory}, + schema::schema_from_header, +}; + +pub struct LazOpener { + /// Column indexes in `table_schema` needed by the query + pub projection: Arc<[usize]>, + /// Optional limit on the number of rows to read + pub limit: Option<usize>, + pub predicate: Option<Arc<dyn PhysicalExpr>>, + /// Factory for instantiating laz reader + pub laz_file_reader_factory: Arc<LazFileReaderFactory>, + /// Table options + pub options: LazTableOptions, +} + +impl FileOpener for LazOpener { + fn open( + &self, + _file_meta: FileMeta, + file: PartitionedFile, + ) -> Result<FileOpenFuture, DataFusionError> { + let projection = self.projection.clone(); + let limit = self.limit; + + let predicate = self.predicate.clone(); + + let laz_reader: Box<LazFileReader> = self + .laz_file_reader_factory + .create_reader(file.clone(), self.options.clone())?; + + Ok(Box::pin(async move { + let metadata = laz_reader.get_metadata().await?; + let schema = Arc::new(schema_from_header( + &metadata.header, + laz_reader.options.point_encoding, + laz_reader.options.extra_bytes, + )); + + let pruning_predicate = predicate.and_then(|physical_expr| { + PruningPredicate::try_new(physical_expr, schema.clone()).ok() + }); + + // file pruning + if let Some(pruning_predicate) = &pruning_predicate { + if let Some(statistics) = file.statistics { + let prunable_statistics = PrunableStatistics::new(vec![statistics], schema); + if let Ok(filter) = pruning_predicate.prune(&prunable_statistics) { + if !filter[0] { + return Ok(futures::stream::empty().boxed()); + } + } + } + } + + // map chunk table + let chunk_table: Vec<_> = metadata + .chunk_table + .iter() + .filter(|chunk_meta| { + file.range.as_ref().is_none_or(|range| { + let offset = chunk_meta.byte_range.start; + offset >= range.start as u64 && offset < range.end as u64 + }) + }) + .cloned() + .collect(); + + let mut row_count = 0; + + let stream = async_stream::try_stream! { + for chunk_meta in chunk_table.into_iter() { + // limit + if let Some(limit) = limit { + if row_count >= limit { + break; + } + } + + // fetch batch + let record_batch = laz_reader.get_batch(&chunk_meta).await?; + row_count += record_batch.num_rows(); + + // project + let record_batch = record_batch + .project(&projection) Review Comment: Follow-up territory, but this could probably save some decoding effort if it pushed the projection into the builder. 
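To illustrate what I mean (a standalone sketch, not the PR's `RowBuilder` — the names here are made up): if the builder only materializes the projected columns, the post-hoc `RecordBatch::project(...)` in the opener becomes unnecessary and the skipped columns never need to be appended or decoded at all.

```rust
use std::sync::Arc;

use arrow_array::{builder::Float64Builder, ArrayRef, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

/// Sketch only: keep builders for the projected coordinate columns and never
/// append values the query did not ask for, instead of building everything
/// and projecting the finished RecordBatch afterwards.
struct ProjectedCoordBuilder {
    // (index into [x, y, z], output field, builder) for each projected column
    columns: Vec<(usize, Field, Float64Builder)>,
}

impl ProjectedCoordBuilder {
    fn new(projection: &[usize], capacity: usize) -> Self {
        let names = ["x", "y", "z"];
        let columns = projection
            .iter()
            .map(|&i| {
                (
                    i,
                    Field::new(names[i], DataType::Float64, false),
                    Float64Builder::with_capacity(capacity),
                )
            })
            .collect();
        Self { columns }
    }

    fn append(&mut self, xyz: [f64; 3]) {
        // Only the requested coordinates are ever appended
        for (i, _, builder) in self.columns.iter_mut() {
            builder.append_value(xyz[*i]);
        }
    }

    fn finish(&mut self) -> RecordBatch {
        let fields: Vec<Field> = self.columns.iter().map(|(_, f, _)| f.clone()).collect();
        let arrays: Vec<ArrayRef> = self
            .columns
            .iter_mut()
            .map(|(_, _, b)| Arc::new(b.finish()) as ArrayRef)
            .collect();
        RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays)
            .expect("fields match finished arrays")
    }
}
```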
########## rust/sedona-pointcloud/src/laz/metadata.rs: ########## @@ -0,0 +1,403 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, collections::HashMap, error::Error, io::Cursor, ops::Range, sync::Arc}; + +use arrow_schema::{DataType, Schema, SchemaRef}; +use datafusion_common::{ + error::DataFusionError, scalar::ScalarValue, stats::Precision, ColumnStatistics, Statistics, +}; +use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache}; +use las::{ + raw::{Header as RawHeader, Vlr as RawVlr}, + Builder, Header, Vlr, +}; +use laz::laszip::ChunkTable; +use object_store::{ObjectMeta, ObjectStore}; + +use crate::laz::{options::LazTableOptions, schema::schema_from_header}; + +/// Laz chunk metadata +#[derive(Debug, Clone)] +pub struct ChunkMeta { + pub num_points: u64, + pub point_offset: u64, + pub byte_range: Range<u64>, +} + +/// Laz metadata +#[derive(Debug, Clone)] +pub struct LazMetadata { + pub header: Arc<Header>, + pub chunk_table: Vec<ChunkMeta>, + pub extra_attributes: Arc<Vec<ExtraAttribute>>, +} + +impl FileMetadata for LazMetadata { + fn as_any(&self) -> &dyn Any { + self + } + + fn memory_size(&self) -> usize { + // TODO: proper size + std::mem::size_of_val(self) + } + + fn extra_info(&self) -> HashMap<String, String> { + HashMap::new() + } +} + +/// Reader for laz file metadata in object storage. 
+pub struct LazMetadataReader<'a> { + store: &'a dyn ObjectStore, + object_meta: &'a ObjectMeta, + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, + options: LazTableOptions, +} + +impl<'a> LazMetadataReader<'a> { + pub fn new(store: &'a dyn ObjectStore, object_meta: &'a ObjectMeta) -> Self { + Self { + store, + object_meta, + file_metadata_cache: None, + options: Default::default(), + } + } + + /// set file metadata cache + pub fn with_file_metadata_cache( + mut self, + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, + ) -> Self { + self.file_metadata_cache = file_metadata_cache; + self + } + + /// set table options + pub fn with_options(mut self, options: LazTableOptions) -> Self { + self.options = options; + self + } + + /// Fetch header + pub async fn fetch_header(&self) -> Result<Header, DataFusionError> { + fetch_header(self.store, self.object_meta, false) + .await + .map_err(DataFusionError::External) + } + + /// Fetch laz metadata from the remote object store + pub async fn fetch_metadata(&self) -> Result<Arc<LazMetadata>, DataFusionError> { + let Self { + store, + object_meta, + file_metadata_cache, + options: _, + } = self; + + if let Some(las_file_metadata) = file_metadata_cache + .as_ref() + .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta)) + .and_then(|file_metadata| { + file_metadata + .as_any() + .downcast_ref::<LazMetadata>() + .map(|laz_file_metadata| Arc::new(laz_file_metadata.to_owned())) + }) + { + return Ok(las_file_metadata); + } + + let header = self.fetch_header().await?; + let extra_attributes = extra_bytes_attributes(&header); + let chunk_table = chunk_table(*store, object_meta, &header).await.unwrap(); + + let metadata = Arc::new(LazMetadata { + header: Arc::new(header), + chunk_table, + extra_attributes: Arc::new(extra_attributes), + }); + + if let Some(file_metadata_cache) = file_metadata_cache { + file_metadata_cache.put(object_meta, metadata.clone()); + } + + Ok(metadata) + } + + /// Read and parse the schema of the laz file + pub async fn fetch_schema(&mut self) -> Result<Schema, DataFusionError> { + let metadata = self.fetch_metadata().await?; + + let schema = schema_from_header( + &metadata.header, + self.options.point_encoding, + self.options.extra_bytes, + ); + + Ok(schema) + } + + /// Fetch the metadata from the laz file via [`Self::fetch_metadata`] and extracts + /// the statistics in the metadata + pub async fn fetch_statistics( + &self, + table_schema: &SchemaRef, + ) -> Result<Statistics, DataFusionError> { + let metadata = self.fetch_metadata().await?; + + let mut statistics = Statistics::new_unknown(table_schema) + .with_num_rows(Precision::Exact(metadata.header.number_of_points() as usize)) + .with_total_byte_size(Precision::Exact( + metadata + .chunk_table + .iter() + .map(|meta| meta.byte_range.end - meta.byte_range.start) + .sum::<u64>() as usize, + )); + + let bounds = metadata.header.bounds(); + for field in table_schema.fields() { + let cs = match field.name().as_str() { Review Comment: For `"geometry"` we haven't currently fudged `ColumnStatistics` to store a bbox, but in an early version of the GeoParquet reader I serialized `sedona_expr::GeoStatistics` as JSON as the "sum". Perhaps it's just a separate method here to return `GeoStatistics`, which is what you'll need to prune using `SpatialFilter`. ########## rust/sedona-pointcloud/src/laz/opener.rs: ########## @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use datafusion_common::{error::DataFusionError, pruning::PrunableStatistics}; +use datafusion_datasource::{ + file_meta::FileMeta, + file_stream::{FileOpenFuture, FileOpener}, + PartitionedFile, +}; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_pruning::PruningPredicate; +use futures::StreamExt; + +use crate::laz::{ + options::LazTableOptions, + reader::{LazFileReader, LazFileReaderFactory}, + schema::schema_from_header, +}; + +pub struct LazOpener { + /// Column indexes in `table_schema` needed by the query + pub projection: Arc<[usize]>, + /// Optional limit on the number of rows to read + pub limit: Option<usize>, + pub predicate: Option<Arc<dyn PhysicalExpr>>, + /// Factory for instantiating laz reader + pub laz_file_reader_factory: Arc<LazFileReaderFactory>, + /// Table options + pub options: LazTableOptions, +} + +impl FileOpener for LazOpener { + fn open( + &self, + _file_meta: FileMeta, + file: PartitionedFile, + ) -> Result<FileOpenFuture, DataFusionError> { + let projection = self.projection.clone(); + let limit = self.limit; + + let predicate = self.predicate.clone(); + + let laz_reader: Box<LazFileReader> = self + .laz_file_reader_factory + .create_reader(file.clone(), self.options.clone())?; + + Ok(Box::pin(async move { + let metadata = laz_reader.get_metadata().await?; + let schema = Arc::new(schema_from_header( + &metadata.header, + laz_reader.options.point_encoding, + laz_reader.options.extra_bytes, + )); + + let pruning_predicate = predicate.and_then(|physical_expr| { + PruningPredicate::try_new(physical_expr, schema.clone()).ok() + }); + + // file pruning + if let Some(pruning_predicate) = &pruning_predicate { + if let Some(statistics) = file.statistics { + let prunable_statistics = PrunableStatistics::new(vec![statistics], schema); + if let Ok(filter) = pruning_predicate.prune(&prunable_statistics) { + if !filter[0] { + return Ok(futures::stream::empty().boxed()); + } + } + } + } + + // map chunk table + let chunk_table: Vec<_> = metadata + .chunk_table + .iter() + .filter(|chunk_meta| { + file.range.as_ref().is_none_or(|range| { + let offset = chunk_meta.byte_range.start; + offset >= range.start as u64 && offset < range.end as u64 + }) + }) + .cloned() + .collect(); + + let mut row_count = 0; + + let stream = async_stream::try_stream! { + for chunk_meta in chunk_table.into_iter() { + // limit + if let Some(limit) = limit { + if row_count >= limit { + break; + } + } + + // fetch batch + let record_batch = laz_reader.get_batch(&chunk_meta).await?; Review Comment: Does this fetch respect the requested batch size? 
(I think it's in the partitioned file/meta, which merged in DataFusion 51) ########## rust/sedona-pointcloud/src/laz/builder.rs: ########## @@ -0,0 +1,569 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{fmt::Debug, sync::Arc}; + +use arrow_array::{ + builder::{ + ArrayBuilder, BooleanBuilder, FixedSizeBinaryBuilder, Float32Builder, Float64Builder, + Int16Builder, Int32Builder, Int64Builder, Int8Builder, UInt16Builder, UInt32Builder, + UInt64Builder, UInt8Builder, + }, + Array, ArrayRef, BooleanArray, FixedSizeBinaryArray, Float32Array, Float64Array, StructArray, + UInt16Array, UInt8Array, +}; +use arrow_buffer::ScalarBuffer; +use arrow_schema::DataType; +use geoarrow_array::{ + array::{CoordBuffer, PointArray, SeparatedCoordBuffer}, + builder::WkbBuilder, + capacity::WkbCapacity, + GeoArrowArray, +}; +use geoarrow_schema::{Dimension, WkbType}; +use las::{Header, Point}; + +use crate::laz::{ + metadata::ExtraAttribute, + options::{LasExtraBytes, LasPointEncoding}, + schema::schema_from_header, +}; + +#[derive(Debug)] +pub struct RowBuilder { + x: Float64Builder, + y: Float64Builder, + z: Float64Builder, + intensity: UInt16Builder, + return_number: UInt8Builder, + number_of_returns: UInt8Builder, + is_synthetic: BooleanBuilder, + is_key_point: BooleanBuilder, + is_withheld: BooleanBuilder, + is_overlap: BooleanBuilder, + scanner_channel: UInt8Builder, + scan_direction: UInt8Builder, + is_edge_of_flight_line: BooleanBuilder, + classification: UInt8Builder, + user_data: UInt8Builder, + scan_angle: Float32Builder, + point_source_id: UInt16Builder, + gps_time: Float64Builder, + red: UInt16Builder, + green: UInt16Builder, + blue: UInt16Builder, + nir: UInt16Builder, + extra: FixedSizeBinaryBuilder, + header: Arc<Header>, + point_encoding: LasPointEncoding, + extra_bytes: LasExtraBytes, + extra_attributes: Arc<Vec<ExtraAttribute>>, +} + +impl RowBuilder { + pub fn new(capacity: usize, header: Arc<Header>) -> Self { + Self { + x: Float64Array::builder(capacity), + y: Float64Array::builder(capacity), + z: Float64Array::builder(capacity), + intensity: UInt16Array::builder(capacity), + return_number: UInt8Array::builder(capacity), + number_of_returns: UInt8Array::builder(capacity), + is_synthetic: BooleanArray::builder(capacity), + is_key_point: BooleanArray::builder(capacity), + is_withheld: BooleanArray::builder(capacity), + is_overlap: BooleanArray::builder(capacity), + scanner_channel: UInt8Array::builder(capacity), + scan_direction: UInt8Array::builder(capacity), + is_edge_of_flight_line: BooleanArray::builder(capacity), + classification: UInt8Array::builder(capacity), + user_data: UInt8Array::builder(capacity), + scan_angle: Float32Array::builder(capacity), + point_source_id: UInt16Array::builder(capacity), + gps_time: 
Float64Array::builder(capacity), + red: UInt16Array::builder(capacity), + green: UInt16Array::builder(capacity), + blue: UInt16Array::builder(capacity), + nir: UInt16Array::builder(capacity), + extra: FixedSizeBinaryBuilder::with_capacity( + capacity, + header.point_format().extra_bytes as i32, + ), + + header, + point_encoding: Default::default(), + extra_bytes: Default::default(), + extra_attributes: Arc::new(Vec::new()), + } + } + + pub fn with_point_encoding(mut self, point_encoding: LasPointEncoding) -> Self { + self.point_encoding = point_encoding; + self + } + + pub fn with_extra_attributes( + mut self, + attributes: Arc<Vec<ExtraAttribute>>, + extra_bytes: LasExtraBytes, + ) -> Self { + self.extra_attributes = attributes; + self.extra_bytes = extra_bytes; + self + } + + pub fn append(&mut self, p: Point) { + self.x.append_value(p.x); + self.y.append_value(p.y); + self.z.append_value(p.z); + self.intensity.append_option(Some(p.intensity)); + self.return_number.append_value(p.return_number); + self.number_of_returns.append_value(p.number_of_returns); + self.is_synthetic.append_value(p.is_synthetic); + self.is_key_point.append_value(p.is_key_point); + self.is_withheld.append_value(p.is_withheld); + self.is_overlap.append_value(p.is_overlap); + self.scanner_channel.append_value(p.scanner_channel); + self.scan_direction.append_value(p.scan_direction as u8); + self.is_edge_of_flight_line + .append_value(p.is_edge_of_flight_line); + self.classification.append_value(u8::from(p.classification)); + self.user_data.append_value(p.user_data); + self.scan_angle.append_value(p.scan_angle); + self.point_source_id.append_value(p.point_source_id); + if self.header.point_format().has_gps_time { + self.gps_time.append_value(p.gps_time.unwrap()); + } + if self.header.point_format().has_color { + let color = p.color.unwrap(); + self.red.append_value(color.red); + self.green.append_value(color.green); + self.blue.append_value(color.blue); + } + if self.header.point_format().has_nir { + self.nir.append_value(p.nir.unwrap()); + } + if self.header.point_format().extra_bytes > 0 { + self.extra.append_value(p.extra_bytes).unwrap(); + } + } + + /// Note: returns StructArray to allow nesting within another array if desired + pub fn finish(&mut self) -> StructArray { + let schema = schema_from_header(&self.header, self.point_encoding, self.extra_bytes); + + let mut columns = match self.point_encoding { + LasPointEncoding::Plain => vec![ + Arc::new(self.x.finish()) as ArrayRef, + Arc::new(self.y.finish()) as ArrayRef, + Arc::new(self.z.finish()) as ArrayRef, + ], + LasPointEncoding::Wkb => { + const POINT_SIZE: usize = 29; + + let n: usize = self.x.len(); + + let capacity = WkbCapacity::new(POINT_SIZE * n, 4 * n); + let mut builder = WkbBuilder::<i32>::with_capacity(WkbType::default(), capacity); + + let x = self.x.finish(); + let y = self.y.finish(); + let z = self.z.finish(); + + let mut wkb_bytes = [0_u8; POINT_SIZE]; + wkb_bytes[0] = 0x01; // Little-endian + wkb_bytes[1..5].copy_from_slice(&[0xE9, 0x03, 0x00, 0x00]); // Point Z type (1001) + + for i in 0..n { + let x = unsafe { x.value_unchecked(i) }; + let y = unsafe { y.value_unchecked(i) }; + let z = unsafe { z.value_unchecked(i) }; + + wkb_bytes[5..13].copy_from_slice(x.to_le_bytes().as_slice()); + wkb_bytes[13..21].copy_from_slice(y.to_le_bytes().as_slice()); + wkb_bytes[21..29].copy_from_slice(z.to_le_bytes().as_slice()); + + unsafe { builder.push_wkb_unchecked(Some(&wkb_bytes)) }; + } Review Comment: This can probably use an `arrow_array` 
`BinaryBuilder` since all we're doing is pushing raw bytes into an array ########## rust/sedona-pointcloud/src/laz/options.rs: ########## @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{fmt::Display, str::FromStr}; + +use datafusion_common::{ + config::{ConfigExtension, ConfigField, Visit}, + error::DataFusionError, + extensions_options, +}; + +/// Geometry representation +#[derive(Clone, Copy, Default, PartialEq, Eq, Debug)] +pub enum LasPointEncoding { Review Comment: This would benefit from some docstrings to explain how this option affects the output schema. For the purposes of SedonaDB, it might make sense to have the default be `Wkb` so that it works with `ST_Intersects()` pruning when doing `SELECT * FROM 'foofy.laz' WHERE ST_Intersects(...)`. This can also be something we apply at a higher level (e.g., as a default in the Python bindings). ########## Cargo.toml: ########## @@ -83,12 +85,14 @@ datafusion = { version = "50.2.0", default-features = false } datafusion-catalog = { version = "50.2.0" } datafusion-common = { version = "50.2.0", default-features = false } datafusion-common-runtime = { version = "50.2.0", default-features = false } +datafusion-datasource = { version = "50.2.0", default-features = false } Review Comment: Apologies that I updated to DataFusion 51 in a recent PR 😬 (there are a few datasource related changes that may force some minor changes here) ########## rust/sedona-pointcloud/src/laz/opener.rs: ########## @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use datafusion_common::{error::DataFusionError, pruning::PrunableStatistics}; +use datafusion_datasource::{ + file_meta::FileMeta, + file_stream::{FileOpenFuture, FileOpener}, + PartitionedFile, +}; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_pruning::PruningPredicate; +use futures::StreamExt; + +use crate::laz::{ + options::LazTableOptions, + reader::{LazFileReader, LazFileReaderFactory}, + schema::schema_from_header, +}; + +pub struct LazOpener { + /// Column indexes in `table_schema` needed by the query + pub projection: Arc<[usize]>, + /// Optional limit on the number of rows to read + pub limit: Option<usize>, + pub predicate: Option<Arc<dyn PhysicalExpr>>, + /// Factory for instantiating laz reader + pub laz_file_reader_factory: Arc<LazFileReaderFactory>, + /// Table options + pub options: LazTableOptions, +} + +impl FileOpener for LazOpener { + fn open( + &self, + _file_meta: FileMeta, + file: PartitionedFile, + ) -> Result<FileOpenFuture, DataFusionError> { + let projection = self.projection.clone(); + let limit = self.limit; + + let predicate = self.predicate.clone(); + + let laz_reader: Box<LazFileReader> = self + .laz_file_reader_factory + .create_reader(file.clone(), self.options.clone())?; + + Ok(Box::pin(async move { + let metadata = laz_reader.get_metadata().await?; + let schema = Arc::new(schema_from_header( + &metadata.header, + laz_reader.options.point_encoding, + laz_reader.options.extra_bytes, + )); + + let pruning_predicate = predicate.and_then(|physical_expr| { + PruningPredicate::try_new(physical_expr, schema.clone()).ok() + }); + + // file pruning + if let Some(pruning_predicate) = &pruning_predicate { + if let Some(statistics) = file.statistics { + let prunable_statistics = PrunableStatistics::new(vec![statistics], schema); + if let Ok(filter) = pruning_predicate.prune(&prunable_statistics) { + if !filter[0] { + return Ok(futures::stream::empty().boxed()); + } + } + } + } Review Comment: Does this handle file pruning based on `st_intersects()` and friends? Our helper for that is: https://github.com/apache/sedona-db/blob/e80fb37b2a15a9708420fef0cc575b9e5274972e/rust/sedona-geoparquet/src/file_opener.rs#L128 ########## rust/sedona-pointcloud/src/laz/schema.rs: ########## @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use arrow_schema::{DataType, Field, Schema}; +use geoarrow_schema::{CoordType, Crs, Dimension, Metadata, PointType, WkbType}; +use las::Header; + +use crate::laz::options::{LasExtraBytes, LasPointEncoding}; + +// Arrow schema for LAS points +pub fn schema_from_header( + header: &Header, + point_encoding: LasPointEncoding, + extra_bytes: LasExtraBytes, +) -> Schema { + let crs = header + .get_wkt_crs_bytes() + .and_then(|b| String::from_utf8(b.to_vec()).ok().map(Crs::from_wkt2_2019)) + .unwrap_or_default(); Review Comment: I'll have to add WKT2 CRS support to SedonaDB's CRS helper (I think right now it will error). ########## rust/sedona-pointcloud/src/laz/opener.rs: ########## @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use datafusion_common::{error::DataFusionError, pruning::PrunableStatistics}; +use datafusion_datasource::{ + file_meta::FileMeta, + file_stream::{FileOpenFuture, FileOpener}, + PartitionedFile, +}; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_pruning::PruningPredicate; +use futures::StreamExt; + +use crate::laz::{ + options::LazTableOptions, + reader::{LazFileReader, LazFileReaderFactory}, + schema::schema_from_header, +}; + +pub struct LazOpener { + /// Column indexes in `table_schema` needed by the query + pub projection: Arc<[usize]>, + /// Optional limit on the number of rows to read + pub limit: Option<usize>, + pub predicate: Option<Arc<dyn PhysicalExpr>>, + /// Factory for instantiating laz reader + pub laz_file_reader_factory: Arc<LazFileReaderFactory>, + /// Table options + pub options: LazTableOptions, +} + +impl FileOpener for LazOpener { + fn open( + &self, + _file_meta: FileMeta, + file: PartitionedFile, + ) -> Result<FileOpenFuture, DataFusionError> { + let projection = self.projection.clone(); + let limit = self.limit; + + let predicate = self.predicate.clone(); + + let laz_reader: Box<LazFileReader> = self + .laz_file_reader_factory + .create_reader(file.clone(), self.options.clone())?; + + Ok(Box::pin(async move { + let metadata = laz_reader.get_metadata().await?; + let schema = Arc::new(schema_from_header( + &metadata.header, + laz_reader.options.point_encoding, + laz_reader.options.extra_bytes, + )); + + let pruning_predicate = predicate.and_then(|physical_expr| { + PruningPredicate::try_new(physical_expr, schema.clone()).ok() + }); + + // file pruning + if let Some(pruning_predicate) = &pruning_predicate { + if let Some(statistics) = file.statistics { + let prunable_statistics = PrunableStatistics::new(vec![statistics], schema); + if let Ok(filter) = pruning_predicate.prune(&prunable_statistics) { + if !filter[0] { + return Ok(futures::stream::empty().boxed()); + } + } + } + } + + // map chunk table + let 
chunk_table: Vec<_> = metadata + .chunk_table + .iter() + .filter(|chunk_meta| { + file.range.as_ref().is_none_or(|range| { + let offset = chunk_meta.byte_range.start; + offset >= range.start as u64 && offset < range.end as u64 + }) + }) + .cloned() + .collect(); + + let mut row_count = 0; + + let stream = async_stream::try_stream! { + for chunk_meta in chunk_table.into_iter() { + // limit + if let Some(limit) = limit { + if row_count >= limit { + break; + } + } + + // fetch batch + let record_batch = laz_reader.get_batch(&chunk_meta).await?; + row_count += record_batch.num_rows(); + + // project + let record_batch = record_batch + .project(&projection) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + + yield record_batch + } + + }; + + // let stream = futures::stream::iter(batches); + Review Comment: ```suggestion ``` ########## rust/sedona-pointcloud/src/laz/format.rs: ########## @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, collections::HashMap, fmt, sync::Arc}; + +use arrow_schema::{Schema, SchemaRef}; +use datafusion_catalog::{memory::DataSourceExec, Session}; +use datafusion_common::{ + config::ExtensionOptions, error::DataFusionError, internal_err, + parsers::CompressionTypeVariant, GetExt, Statistics, +}; +use datafusion_datasource::{ + file::FileSource, + file_compression_type::FileCompressionType, + file_format::{FileFormat, FileFormatFactory}, + file_scan_config::FileScanConfig, +}; +use datafusion_physical_plan::ExecutionPlan; +use futures::{StreamExt, TryStreamExt}; +use object_store::{ObjectMeta, ObjectStore}; + +use crate::laz::{metadata::LazMetadataReader, options::LazTableOptions, source::LazSource}; + +const DEFAULT_LAZ_EXTENSION: &str = ".laz"; + +/// Factory struct used to create [LazFormat] +#[derive(Default)] +pub struct LazFormatFactory { + // inner options for LAZ + pub options: Option<LazTableOptions>, +} + +impl LazFormatFactory { + /// Creates an instance of [LazFormatFactory] + pub fn new() -> Self { + Self { options: None } + } + + /// Creates an instance of [LazFormatFactory] with customized default options + pub fn new_with(options: LazTableOptions) -> Self { + Self { + options: Some(options), + } + } +} + +impl FileFormatFactory for LazFormatFactory { + fn create( + &self, + state: &dyn Session, + format_options: &HashMap<String, String>, + ) -> Result<Arc<dyn FileFormat>, DataFusionError> { + let mut laz_options = state + .config_options() + .extensions + .get::<LazTableOptions>() + .or_else(|| state.table_options().extensions.get::<LazTableOptions>()) + .cloned() + .or(self.options.clone()) + .unwrap_or_default(); + + for (k, v) in format_options { + laz_options.set(k, v)?; + } + + Ok(Arc::new(LazFormat::default().with_options(laz_options))) + } + 
+ fn default(&self) -> Arc<dyn FileFormat> { + Arc::new(LazFormat::default()) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl GetExt for LazFormatFactory { + fn get_ext(&self) -> String { + // Removes the dot, i.e. ".laz" -> "laz" + DEFAULT_LAZ_EXTENSION[1..].to_string() + } +} + +impl fmt::Debug for LazFormatFactory { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("LazFormatFactory") + .field("LazFormatFactory", &self.options) + .finish() + } +} + +/// The LAZ `FileFormat` implementation +#[derive(Debug, Default)] +pub struct LazFormat { + pub options: LazTableOptions, +} + +impl LazFormat { + pub fn with_options(mut self, options: LazTableOptions) -> Self { + self.options = options; + self + } +} + +#[async_trait::async_trait] +impl FileFormat for LazFormat { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_ext(&self) -> String { + LazFormatFactory::new().get_ext() + // DEFAULT_LAZ_EXTENSION[1..].to_string() Review Comment: ```suggestion LazFormatFactory::new().get_ext() ``` ########## rust/sedona-pointcloud/src/laz/options.rs: ########## @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{fmt::Display, str::FromStr}; + +use datafusion_common::{ + config::{ConfigExtension, ConfigField, Visit}, + error::DataFusionError, + extensions_options, +}; + +/// Geometry representation +#[derive(Clone, Copy, Default, PartialEq, Eq, Debug)] +pub enum LasPointEncoding { + #[default] + Plain, + Wkb, + Nativ, Review Comment: ```suggestion Native, ``` ########## rust/sedona-pointcloud/src/laz/metadata.rs: ########## @@ -0,0 +1,403 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{any::Any, collections::HashMap, error::Error, io::Cursor, ops::Range, sync::Arc}; + +use arrow_schema::{DataType, Schema, SchemaRef}; +use datafusion_common::{ + error::DataFusionError, scalar::ScalarValue, stats::Precision, ColumnStatistics, Statistics, +}; +use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache}; +use las::{ + raw::{Header as RawHeader, Vlr as RawVlr}, + Builder, Header, Vlr, +}; +use laz::laszip::ChunkTable; +use object_store::{ObjectMeta, ObjectStore}; + +use crate::laz::{options::LazTableOptions, schema::schema_from_header}; + +/// Laz chunk metadata +#[derive(Debug, Clone)] +pub struct ChunkMeta { + pub num_points: u64, + pub point_offset: u64, + pub byte_range: Range<u64>, +} + +/// Laz metadata +#[derive(Debug, Clone)] +pub struct LazMetadata { + pub header: Arc<Header>, + pub chunk_table: Vec<ChunkMeta>, + pub extra_attributes: Arc<Vec<ExtraAttribute>>, +} + +impl FileMetadata for LazMetadata { + fn as_any(&self) -> &dyn Any { + self + } + + fn memory_size(&self) -> usize { + // TODO: proper size + std::mem::size_of_val(self) Review Comment: + `chunk_table` capacity in bytes + `extra_attributes` capacity in bytes? ########## rust/sedona-pointcloud/src/laz/schema.rs: ########## @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow_schema::{DataType, Field, Schema}; +use geoarrow_schema::{CoordType, Crs, Dimension, Metadata, PointType, WkbType}; +use las::Header; + +use crate::laz::options::{LasExtraBytes, LasPointEncoding}; + +// Arrow schema for LAS points +pub fn schema_from_header( + header: &Header, + point_encoding: LasPointEncoding, + extra_bytes: LasExtraBytes, +) -> Schema { + let crs = header + .get_wkt_crs_bytes() + .and_then(|b| String::from_utf8(b.to_vec()).ok().map(Crs::from_wkt2_2019)) + .unwrap_or_default(); + + let mut fields = match point_encoding { + LasPointEncoding::Plain => vec![ + Field::new("x", DataType::Float64, false), + Field::new("y", DataType::Float64, false), + Field::new("z", DataType::Float64, false), + ], + LasPointEncoding::Wkb => { + let point_type = WkbType::new(Arc::new(Metadata::new(crs, None))); + vec![Field::new("geometry", DataType::Binary, false).with_extension_type(point_type)] + } + LasPointEncoding::Nativ => { + let point_type = PointType::new(Dimension::XYZ, Arc::new(Metadata::new(crs, None))) + .with_coord_type(CoordType::Separated); + vec![point_type.to_field("geometry", false)] Review Comment: No need to change this, but I'll have to add `SedonaType::GeoArrowNative` or `SedonaType::GenericExtension` for this to work with SedonaDB (or else it will give an unsupported type error). 
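Re the `memory_size()` comment above: roughly what I had in mind is the sketch below, written against the quoted `LazMetadata` fields as a drop-in for the trait method. Using `Vec::capacity()` times the element size is only an approximation (e.g., `ExtraAttribute` values may own additional heap data), but it's closer than the struct size alone.

```rust
// Sketch only: also account for the Vec-backed members, not just the struct
// itself. Field names follow the quoted LazMetadata definition.
fn memory_size(&self) -> usize {
    std::mem::size_of_val(self)
        + self.chunk_table.capacity() * std::mem::size_of::<ChunkMeta>()
        + self.extra_attributes.capacity() * std::mem::size_of::<ExtraAttribute>()
}
```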
########## rust/sedona-pointcloud/src/laz/metadata.rs: ########## @@ -0,0 +1,403 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, collections::HashMap, error::Error, io::Cursor, ops::Range, sync::Arc}; + +use arrow_schema::{DataType, Schema, SchemaRef}; +use datafusion_common::{ + error::DataFusionError, scalar::ScalarValue, stats::Precision, ColumnStatistics, Statistics, +}; +use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache}; +use las::{ + raw::{Header as RawHeader, Vlr as RawVlr}, + Builder, Header, Vlr, +}; +use laz::laszip::ChunkTable; +use object_store::{ObjectMeta, ObjectStore}; + +use crate::laz::{options::LazTableOptions, schema::schema_from_header}; + +/// Laz chunk metadata +#[derive(Debug, Clone)] +pub struct ChunkMeta { + pub num_points: u64, + pub point_offset: u64, + pub byte_range: Range<u64>, +} + +/// Laz metadata +#[derive(Debug, Clone)] +pub struct LazMetadata { + pub header: Arc<Header>, + pub chunk_table: Vec<ChunkMeta>, + pub extra_attributes: Arc<Vec<ExtraAttribute>>, +} + +impl FileMetadata for LazMetadata { + fn as_any(&self) -> &dyn Any { + self + } + + fn memory_size(&self) -> usize { + // TODO: proper size + std::mem::size_of_val(self) + } + + fn extra_info(&self) -> HashMap<String, String> { + HashMap::new() + } +} + +/// Reader for laz file metadata in object storage. 
+pub struct LazMetadataReader<'a> { + store: &'a dyn ObjectStore, + object_meta: &'a ObjectMeta, + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, + options: LazTableOptions, +} + +impl<'a> LazMetadataReader<'a> { + pub fn new(store: &'a dyn ObjectStore, object_meta: &'a ObjectMeta) -> Self { + Self { + store, + object_meta, + file_metadata_cache: None, + options: Default::default(), + } + } + + /// set file metadata cache + pub fn with_file_metadata_cache( + mut self, + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, + ) -> Self { + self.file_metadata_cache = file_metadata_cache; + self + } + + /// set table options + pub fn with_options(mut self, options: LazTableOptions) -> Self { + self.options = options; + self + } + + /// Fetch header + pub async fn fetch_header(&self) -> Result<Header, DataFusionError> { + fetch_header(self.store, self.object_meta, false) + .await + .map_err(DataFusionError::External) + } + + /// Fetch laz metadata from the remote object store + pub async fn fetch_metadata(&self) -> Result<Arc<LazMetadata>, DataFusionError> { + let Self { + store, + object_meta, + file_metadata_cache, + options: _, + } = self; + + if let Some(las_file_metadata) = file_metadata_cache + .as_ref() + .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta)) + .and_then(|file_metadata| { + file_metadata + .as_any() + .downcast_ref::<LazMetadata>() + .map(|laz_file_metadata| Arc::new(laz_file_metadata.to_owned())) + }) + { + return Ok(las_file_metadata); + } + + let header = self.fetch_header().await?; + let extra_attributes = extra_bytes_attributes(&header); + let chunk_table = chunk_table(*store, object_meta, &header).await.unwrap(); + + let metadata = Arc::new(LazMetadata { + header: Arc::new(header), + chunk_table, + extra_attributes: Arc::new(extra_attributes), + }); + + if let Some(file_metadata_cache) = file_metadata_cache { + file_metadata_cache.put(object_meta, metadata.clone()); + } + + Ok(metadata) + } + + /// Read and parse the schema of the laz file + pub async fn fetch_schema(&mut self) -> Result<Schema, DataFusionError> { + let metadata = self.fetch_metadata().await?; + + let schema = schema_from_header( + &metadata.header, + self.options.point_encoding, + self.options.extra_bytes, + ); + + Ok(schema) + } + + /// Fetch the metadata from the laz file via [`Self::fetch_metadata`] and extracts + /// the statistics in the metadata + pub async fn fetch_statistics( + &self, + table_schema: &SchemaRef, + ) -> Result<Statistics, DataFusionError> { + let metadata = self.fetch_metadata().await?; + + let mut statistics = Statistics::new_unknown(table_schema) + .with_num_rows(Precision::Exact(metadata.header.number_of_points() as usize)) + .with_total_byte_size(Precision::Exact( + metadata + .chunk_table + .iter() + .map(|meta| meta.byte_range.end - meta.byte_range.start) + .sum::<u64>() as usize, + )); + + let bounds = metadata.header.bounds(); + for field in table_schema.fields() { + let cs = match field.name().as_str() { + "x" => ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.x)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.x)))) + .with_null_count(Precision::Exact(0)), + "y" => ColumnStatistics::new_unknown() + .with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.y)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.y)))) + .with_null_count(Precision::Exact(0)), + "z" => ColumnStatistics::new_unknown() + 
.with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.z)))) + .with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.z)))) + .with_null_count(Precision::Exact(0)), + _ => ColumnStatistics::new_unknown(), + }; + + statistics = statistics.add_column_statistics(cs); + } + + Ok(statistics) + } +} + +pub(crate) async fn fetch_header( + store: &(impl ObjectStore + ?Sized), + object_meta: &ObjectMeta, + include_evlr: bool, +) -> Result<Header, Box<dyn Error + Send + Sync>> { + let location = &object_meta.location; + + // Header + let bytes = store.get_range(location, 0..375).await?; + let reader = Cursor::new(bytes); + let raw_header = RawHeader::read_from(reader)?; + + let header_size = raw_header.header_size as u64; + let num_vlr = raw_header.number_of_variable_length_records; + let offset_to_point_data = raw_header.offset_to_point_data as u64; + let evlr = raw_header.evlr; + + // VLRs + let bytes = store + .get_range(location, header_size..offset_to_point_data) + .await?; + let mut reader = Cursor::new(bytes); + + let mut builder = Builder::new(raw_header)?; + let mut position = 0; + for _ in 0..num_vlr { + let vlr = RawVlr::read_from(&mut reader, false).map(Vlr::new)?; + position += vlr.len(false) as u64; + builder.vlrs.push(vlr); + } + + builder.padding = vec![0; (offset_to_point_data - (header_size + position)) as usize]; + + // EVLRs + if include_evlr { + if let Some(evlr) = evlr { + let mut start = evlr.start_of_first_evlr; + + for _ in 0..evlr.number_of_evlrs { + let mut end = start + 60; + + let bytes = store.get_range(location, start..end).await?; + + end += u64::from_le_bytes(bytes[20..28].try_into()?); + + let bytes = store.get_range(location, start..end).await?; + let mut reader = Cursor::new(bytes); + let evlr = RawVlr::read_from(&mut reader, true).map(Vlr::new)?; + + builder.evlrs.push(evlr); + + start = end; + } + } + } + + Ok(builder.into_header()?) +} + +#[derive(Debug, Clone, PartialEq)] +pub struct ExtraAttribute { + pub data_type: DataType, + pub no_data: Option<[u8; 8]>, + pub scale: Option<f64>, + pub offset: Option<f64>, +} + +pub(crate) fn extra_bytes_attributes(header: &Header) -> Vec<ExtraAttribute> { + let mut attributes = Vec::new(); + + for vlr in header.all_vlrs() { + if !(vlr.user_id == "LASF_Spec" && vlr.record_id == 4) { + continue; + } + + for bytes in vlr.data.chunks(192) { + // data type + let data_type = match bytes[2] { + 0 => DataType::FixedSizeBinary(bytes[3] as i32), + 1 => DataType::UInt8, + 2 => DataType::Int8, + 3 => DataType::UInt16, + 4 => DataType::Int16, + 5 => DataType::UInt32, + 6 => DataType::Int32, + 7 => DataType::UInt64, + 8 => DataType::Int64, + 9 => DataType::Float32, + 10 => DataType::Float64, + 11..=30 => panic!("deprecated extra bytes data type"), + 31..=255 => panic!("reserved extra butes data type"), Review Comment: These panics (and the unwraps elsewhere in this file) should probably return errors instead of panics (they could occur in normal usage when reading a fuzzed/corrupted file). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
