This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new e8d4ed31 feat(rust/sedona-pointcloud): Initial LAZ format support (#471)
e8d4ed31 is described below

commit e8d4ed31067ec31b098bfe66363d3dd95ee49bb2
Author: Balthasar Teuscher <[email protected]>
AuthorDate: Tue Feb 10 18:49:22 2026 +0100

    feat(rust/sedona-pointcloud): Initial LAZ format support (#471)
    
    Co-authored-by: Dewey Dunnington <[email protected]>
    Co-authored-by: Dewey Dunnington <[email protected]>
---
 .github/workflows/examples.yml                |   3 +
 Cargo.lock                                    | 130 +++++
 Cargo.toml                                    |   5 +
 examples/sedonadb-rust-pointcloud/Cargo.toml  |  31 ++
 examples/sedonadb-rust-pointcloud/src/main.rs |  31 ++
 rust/sedona-pointcloud/Cargo.toml             |  59 +++
 rust/sedona-pointcloud/src/laz/builder.rs     | 660 ++++++++++++++++++++++++++
 rust/sedona-pointcloud/src/laz/format.rs      | 291 ++++++++++++
 rust/sedona-pointcloud/src/laz/metadata.rs    | 452 ++++++++++++++++++
 rust/sedona-pointcloud/src/laz/mod.rs         |  25 +
 rust/sedona-pointcloud/src/laz/opener.rs      | 156 ++++++
 rust/sedona-pointcloud/src/laz/options.rs     | 138 ++++++
 rust/sedona-pointcloud/src/laz/reader.rs      | 232 +++++++++
 rust/sedona-pointcloud/src/laz/schema.rs      | 160 +++++++
 rust/sedona-pointcloud/src/laz/source.rs      | 153 ++++++
 rust/sedona-pointcloud/src/lib.rs             |  19 +
 rust/sedona-pointcloud/src/options.rs         | 101 ++++
 rust/sedona-pointcloud/tests/data/extra.laz   | Bin 0 -> 7018 bytes
 rust/sedona-pointcloud/tests/data/generate.py |  91 ++++
 rust/sedona/Cargo.toml                        |   2 +
 rust/sedona/src/context.rs                    |  16 +
 21 files changed, 2755 insertions(+)
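
For a quick orientation, here is a minimal sketch of what the new crate enables, adapted from examples/sedonadb-rust-pointcloud/src/main.rs in this commit (the .laz path is the test fixture shipped with the crate; any readable LAZ file should work the same way):

    use sedona::context::{SedonaContext, SedonaDataFrame};

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Local interactive context; with the "pointcloud" feature enabled,
        // .laz files are registered as a queryable file format
        let ctx = SedonaContext::new_local_interactive().await?;
        let url = "rust/sedona-pointcloud/tests/data/extra.laz";
        // Query the file directly as a table; `geometry` carries the decoded points
        let df = ctx.sql(&format!("SELECT geometry FROM \"{url}\"")).await?;
        println!("{}", df.show_sedona(&ctx, Some(5), Default::default()).await?);
        Ok(())
    }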

diff --git a/.github/workflows/examples.yml b/.github/workflows/examples.yml
index 263e75d8..b68aa645 100644
--- a/.github/workflows/examples.yml
+++ b/.github/workflows/examples.yml
@@ -88,3 +88,6 @@ jobs:
           sed -i "s|git = \"https://github.com/apache/sedona-db.git\";|git = 
\"https://github.com/${REPO}.git\";, rev = \"${REV}\"|" Cargo.toml
           cat Cargo.toml
           cargo run
+
+      - name: Build and run examples/sedonadb-rust-pointcloud
+        run: cargo run -p sedonadb-rust-pointcloud-example
diff --git a/Cargo.lock b/Cargo.lock
index ced38880..041c4d12 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -534,6 +534,28 @@ dependencies = [
  "abi_stable",
 ]
 
+[[package]]
+name = "async-stream"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
+dependencies = [
+ "async-stream-impl",
+ "futures-core",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "async-stream-impl"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.114",
+]
+
 [[package]]
 name = "async-trait"
 version = "0.1.89"
@@ -1187,8 +1209,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2"
 dependencies = [
  "iana-time-zone",
+ "js-sys",
  "num-traits",
  "serde",
+ "wasm-bindgen",
  "windows-link",
 ]
 
@@ -2801,6 +2825,35 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "geoarrow-array"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "dc1cc4106ac0a0a512c398961ce95d8150475c84a84e17c4511c3643fa120a17"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-schema",
+ "geo-traits",
+ "geoarrow-schema",
+ "num-traits",
+ "wkb",
+ "wkt 0.14.0",
+]
+
+[[package]]
+name = "geoarrow-schema"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "e97be4e9f523f92bd6a0e0458323f4b783d073d011664decd8dbf05651704f34"
+dependencies = [
+ "arrow-schema",
+ "geo-traits",
+ "serde",
+ "serde_json",
+ "thiserror 1.0.69",
+]
+
 [[package]]
 name = "geographiclib-rs"
 version = "0.2.5"
@@ -3508,6 +3561,42 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "las"
+version = "0.9.10"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "501d35a98a0bf1e7d30cd61899741af22f853110021be78175926cc870a39dfd"
+dependencies = [
+ "byteorder",
+ "chrono",
+ "laz",
+ "log",
+ "num-traits",
+ "thiserror 2.0.17",
+ "uuid",
+]
+
+[[package]]
+name = "las-crs"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "7b25b1e192564ebf5b563e821c4924ce97bd703f445e851117db2e656499c21c"
+dependencies = [
+ "las",
+ "log",
+ "thiserror 2.0.17",
+]
+
+[[package]]
+name = "laz"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "7390b35d7f7e1f22bdbe5ef4e19a4b9f8ba44927cb55d267e42cafc180fca48c"
+dependencies = [
+ "byteorder",
+ "num-traits",
+]
+
 [[package]]
 name = "leb128fmt"
 version = "0.1.0"
@@ -5031,6 +5120,7 @@ dependencies = [
  "sedona-geometry",
  "sedona-geoparquet",
  "sedona-geos",
+ "sedona-pointcloud",
  "sedona-proj",
  "sedona-raster-functions",
  "sedona-s2geography",
@@ -5346,6 +5436,37 @@ dependencies = [
  "which",
 ]
 
+[[package]]
+name = "sedona-pointcloud"
+version = "0.3.0"
+dependencies = [
+ "arrow-array",
+ "arrow-buffer",
+ "arrow-schema",
+ "async-stream",
+ "async-trait",
+ "bytes",
+ "datafusion",
+ "datafusion-catalog",
+ "datafusion-common",
+ "datafusion-datasource",
+ "datafusion-execution",
+ "datafusion-physical-expr",
+ "datafusion-physical-plan",
+ "datafusion-pruning",
+ "futures",
+ "geoarrow-array",
+ "geoarrow-schema",
+ "las",
+ "las-crs",
+ "laz",
+ "object_store",
+ "sedona-expr",
+ "sedona-geometry",
+ "tempfile",
+ "tokio",
+]
+
 [[package]]
 name = "sedona-proj"
 version = "0.3.0"
@@ -5563,6 +5684,15 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "sedonadb-rust-pointcloud-example"
+version = "0.0.1"
+dependencies = [
+ "sedona",
+ "sedona-pointcloud",
+ "tokio",
+]
+
 [[package]]
 name = "sedonadbr"
 version = "0.3.0"
diff --git a/Cargo.toml b/Cargo.toml
index eee05333..1602ddc3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,6 +34,7 @@ members = [
     "rust/sedona-geo",
     "rust/sedona-geometry",
     "rust/sedona-geoparquet",
+    "rust/sedona-pointcloud",
     "rust/sedona-raster",
     "rust/sedona-raster-functions",
     "rust/sedona-schema",
@@ -41,6 +42,7 @@ members = [
     "rust/sedona-testing",
     "rust/sedona",
     "sedona-cli",
+    "examples/sedonadb-rust-pointcloud",
 ]
 resolver = "2"
 
@@ -81,6 +83,7 @@ datafusion = { version = "51.0.0", default-features = false }
 datafusion-catalog = { version = "51.0.0" }
 datafusion-common = { version = "51.0.0", default-features = false }
 datafusion-common-runtime = { version = "51.0.0", default-features = false }
+datafusion-datasource = { version = "51.0.0", default-features = false }
 datafusion-datasource-parquet = { version = "51.0.0" }
 datafusion-execution = { version = "51.0.0", default-features = false }
 datafusion-expr = { version = "51.0.0" }
@@ -88,6 +91,7 @@ datafusion-ffi = {  version = "51.0.0" }
 datafusion-functions-nested = { version = "51.0.0" }
 datafusion-physical-expr = { version = "51.0.0" }
 datafusion-physical-plan = { version = "51.0.0" }
+datafusion-pruning = { version = "51.0.0" }
 dirs = "6.0.0"
 env_logger = "0.11"
 log = "^0.4"
 sedona-geo-generic-alg = { version = "0.3.0", path = "rust/sedona-geo-generic-alg" }
 sedona-geo-traits-ext = { version = "0.3.0", path = "rust/sedona-geo-traits-ext" }
 sedona-geometry = { version = "0.3.0", path = "rust/sedona-geometry" }
 sedona-geoparquet = { version = "0.3.0", path = "rust/sedona-geoparquet" }
+sedona-pointcloud = { version = "0.3.0", path = "rust/sedona-pointcloud" }
 sedona-raster = { version = "0.3.0", path = "rust/sedona-raster" }
 sedona-raster-functions = { version = "0.3.0", path = "rust/sedona-raster-functions" }
 sedona-schema = { version = "0.3.0", path = "rust/sedona-schema" }
diff --git a/examples/sedonadb-rust-pointcloud/Cargo.toml b/examples/sedonadb-rust-pointcloud/Cargo.toml
new file mode 100644
index 00000000..c3186cac
--- /dev/null
+++ b/examples/sedonadb-rust-pointcloud/Cargo.toml
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "sedonadb-rust-pointcloud-example"
+version = "0.0.1"
+authors.workspace = true
+license.workspace = true
+description = "Apache SedonaDB Rust API Example"
+edition.workspace = true
+rust-version.workspace = true
+publish = false
+
+[dependencies]
+sedona = { workspace = true, features = ["pointcloud"] }
+sedona-pointcloud = { workspace = true }
+tokio = { workspace = true, features = ["rt-multi-thread"]}
diff --git a/examples/sedonadb-rust-pointcloud/src/main.rs b/examples/sedonadb-rust-pointcloud/src/main.rs
new file mode 100644
index 00000000..48ad5449
--- /dev/null
+++ b/examples/sedonadb-rust-pointcloud/src/main.rs
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use sedona::context::{SedonaContext, SedonaDataFrame};
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let ctx = SedonaContext::new_local_interactive().await?;
+    let url = "rust/sedona-pointcloud/tests/data/extra.laz";
+    let df = ctx.sql(&format!("SELECT geometry FROM \"{url}\" ")).await?;
+    let output = df.show_sedona(&ctx, Some(5), Default::default()).await?;
+    println!("{output}");
+    Ok(())
+}
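
To run it from the workspace root (this is exactly what the CI step added above does):

    cargo run -p sedonadb-rust-pointcloud-example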
diff --git a/rust/sedona-pointcloud/Cargo.toml b/rust/sedona-pointcloud/Cargo.toml
new file mode 100644
index 00000000..82aaeaeb
--- /dev/null
+++ b/rust/sedona-pointcloud/Cargo.toml
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "sedona-pointcloud"
+version.workspace = true
+authors.workspace = true
+license.workspace = true
+homepage.workspace = true
+repository.workspace = true
+description.workspace = true
+readme.workspace = true
+edition.workspace = true
+rust-version.workspace = true
+keywords.workspace = true
+categories.workspace = true
+
+[dependencies]
+arrow-array = { workspace = true }
+arrow-buffer = { workspace = true }
+arrow-schema = { workspace = true }
+async-stream = "0.3.6"
+async-trait = { workspace = true }
+bytes = { workspace = true }
+datafusion-catalog = { workspace = true }
+datafusion-common = { workspace = true }
+datafusion-datasource = { workspace = true }
+datafusion-execution = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+datafusion-physical-plan = { workspace = true }
+datafusion-pruning = { workspace = true }
+futures = { workspace = true }
+geoarrow-array = { version = "0.7.0" }
+geoarrow-schema = { version = "0.7.0" }
+las = { version = "0.9.10", features = ["laz"] }
+las-crs = { version = "1.0.0" }
+laz = "0.12.0"
+object_store = { workspace = true }
+sedona-expr = { workspace = true }
+sedona-geometry = { workspace = true }
+
+[dev-dependencies]
+datafusion = { workspace = true, features = ["sql"] }
+tempfile = { workspace = true }
+tokio = { workspace = true }
diff --git a/rust/sedona-pointcloud/src/laz/builder.rs b/rust/sedona-pointcloud/src/laz/builder.rs
new file mode 100644
index 00000000..0b39a40a
--- /dev/null
+++ b/rust/sedona-pointcloud/src/laz/builder.rs
@@ -0,0 +1,660 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{fmt::Debug, sync::Arc};
+
+use arrow_array::{
+    builder::{
+        ArrayBuilder, BinaryBuilder, BooleanBuilder, FixedSizeBinaryBuilder, Float32Builder,
+        Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, UInt16Builder,
+        UInt32Builder, UInt64Builder, UInt8Builder,
+    },
+    Array, ArrayRef, BooleanArray, FixedSizeBinaryArray, Float32Array, Float64Array, StructArray,
+    UInt16Array, UInt8Array,
+};
+use arrow_buffer::ScalarBuffer;
+use arrow_schema::{ArrowError, DataType};
+use geoarrow_array::{
+    array::{CoordBuffer, PointArray, SeparatedCoordBuffer},
+    GeoArrowArray,
+};
+use geoarrow_schema::Dimension;
+use las::{Header, Point};
+
+use crate::{
+    laz::{metadata::ExtraAttribute, options::LasExtraBytes, schema::try_schema_from_header},
+    options::GeometryEncoding,
+};
+
+#[derive(Debug)]
+pub struct RowBuilder {
+    x: Float64Builder,
+    y: Float64Builder,
+    z: Float64Builder,
+    intensity: UInt16Builder,
+    return_number: UInt8Builder,
+    number_of_returns: UInt8Builder,
+    is_synthetic: BooleanBuilder,
+    is_key_point: BooleanBuilder,
+    is_withheld: BooleanBuilder,
+    is_overlap: BooleanBuilder,
+    scanner_channel: UInt8Builder,
+    scan_direction: UInt8Builder,
+    is_edge_of_flight_line: BooleanBuilder,
+    classification: UInt8Builder,
+    user_data: UInt8Builder,
+    scan_angle: Float32Builder,
+    point_source_id: UInt16Builder,
+    gps_time: Float64Builder,
+    red: UInt16Builder,
+    green: UInt16Builder,
+    blue: UInt16Builder,
+    nir: UInt16Builder,
+    extra: FixedSizeBinaryBuilder,
+    header: Arc<Header>,
+    geometry_encoding: GeometryEncoding,
+    extra_bytes: LasExtraBytes,
+    extra_attributes: Arc<Vec<ExtraAttribute>>,
+}
+
+impl RowBuilder {
+    pub fn new(capacity: usize, header: Arc<Header>) -> Self {
+        Self {
+            x: Float64Array::builder(capacity),
+            y: Float64Array::builder(capacity),
+            z: Float64Array::builder(capacity),
+            intensity: UInt16Array::builder(capacity),
+            return_number: UInt8Array::builder(capacity),
+            number_of_returns: UInt8Array::builder(capacity),
+            is_synthetic: BooleanArray::builder(capacity),
+            is_key_point: BooleanArray::builder(capacity),
+            is_withheld: BooleanArray::builder(capacity),
+            is_overlap: BooleanArray::builder(capacity),
+            scanner_channel: UInt8Array::builder(capacity),
+            scan_direction: UInt8Array::builder(capacity),
+            is_edge_of_flight_line: BooleanArray::builder(capacity),
+            classification: UInt8Array::builder(capacity),
+            user_data: UInt8Array::builder(capacity),
+            scan_angle: Float32Array::builder(capacity),
+            point_source_id: UInt16Array::builder(capacity),
+            gps_time: Float64Array::builder(capacity),
+            red: UInt16Array::builder(capacity),
+            green: UInt16Array::builder(capacity),
+            blue: UInt16Array::builder(capacity),
+            nir: UInt16Array::builder(capacity),
+            extra: FixedSizeBinaryBuilder::with_capacity(
+                capacity,
+                header.point_format().extra_bytes as i32,
+            ),
+
+            header,
+            geometry_encoding: Default::default(),
+            extra_bytes: Default::default(),
+            extra_attributes: Arc::new(Vec::new()),
+        }
+    }
+
+    pub fn with_geometry_encoding(mut self, geometry_encoding: GeometryEncoding) -> Self {
+        self.geometry_encoding = geometry_encoding;
+        self
+    }
+
+    pub fn with_extra_attributes(
+        mut self,
+        attributes: Arc<Vec<ExtraAttribute>>,
+        extra_bytes: LasExtraBytes,
+    ) -> Self {
+        self.extra_attributes = attributes;
+        self.extra_bytes = extra_bytes;
+        self
+    }
+
+    pub fn append(&mut self, p: Point) {
+        self.x.append_value(p.x);
+        self.y.append_value(p.y);
+        self.z.append_value(p.z);
+        self.intensity.append_option(Some(p.intensity));
+        self.return_number.append_value(p.return_number);
+        self.number_of_returns.append_value(p.number_of_returns);
+        self.is_synthetic.append_value(p.is_synthetic);
+        self.is_key_point.append_value(p.is_key_point);
+        self.is_withheld.append_value(p.is_withheld);
+        self.is_overlap.append_value(p.is_overlap);
+        self.scanner_channel.append_value(p.scanner_channel);
+        self.scan_direction.append_value(p.scan_direction as u8);
+        self.is_edge_of_flight_line
+            .append_value(p.is_edge_of_flight_line);
+        self.classification.append_value(u8::from(p.classification));
+        self.user_data.append_value(p.user_data);
+        self.scan_angle.append_value(p.scan_angle);
+        self.point_source_id.append_value(p.point_source_id);
+        if self.header.point_format().has_gps_time {
+            self.gps_time.append_value(p.gps_time.unwrap());
+        }
+        if self.header.point_format().has_color {
+            let color = p.color.unwrap();
+            self.red.append_value(color.red);
+            self.green.append_value(color.green);
+            self.blue.append_value(color.blue);
+        }
+        if self.header.point_format().has_nir {
+            self.nir.append_value(p.nir.unwrap());
+        }
+        if self.header.point_format().extra_bytes > 0 {
+            self.extra.append_value(p.extra_bytes).unwrap();
+        }
+    }
+
+    /// Note: returns StructArray to allow nesting within another array if desired
+    pub fn finish(&mut self) -> Result<StructArray, ArrowError> {
+        let schema =
+            try_schema_from_header(&self.header, self.geometry_encoding, self.extra_bytes)?;
+
+        let mut columns = match self.geometry_encoding {
+            GeometryEncoding::Plain => vec![
+                Arc::new(self.x.finish()) as ArrayRef,
+                Arc::new(self.y.finish()) as ArrayRef,
+                Arc::new(self.z.finish()) as ArrayRef,
+            ],
+            GeometryEncoding::Wkb => {
+                const POINT_SIZE: usize = 29;
+
+                let n: usize = self.x.len();
+
+                let mut builder = BinaryBuilder::with_capacity(n, n * POINT_SIZE);
+
+                let x = self.x.finish();
+                let y = self.y.finish();
+                let z = self.z.finish();
+
+                let mut wkb_bytes = [0_u8; POINT_SIZE];
+                wkb_bytes[0] = 0x01; // Little-endian
+                wkb_bytes[1..5].copy_from_slice(&[0xE9, 0x03, 0x00, 0x00]); // Point Z type (1001)
+
+                for i in 0..n {
+                    let x = unsafe { x.value_unchecked(i) };
+                    let y = unsafe { y.value_unchecked(i) };
+                    let z = unsafe { z.value_unchecked(i) };
+
+                    wkb_bytes[5..13].copy_from_slice(x.to_le_bytes().as_slice());
+                    wkb_bytes[13..21].copy_from_slice(y.to_le_bytes().as_slice());
+                    wkb_bytes[21..29].copy_from_slice(z.to_le_bytes().as_slice());
+
+                    builder.append_value(wkb_bytes);
+                }
+
+                vec![Arc::new(builder.finish()) as ArrayRef]
+            }
+            GeometryEncoding::Native => {
+                let buffers = [
+                    self.x.finish().into_parts().1,
+                    self.y.finish().into_parts().1,
+                    self.z.finish().into_parts().1,
+                    ScalarBuffer::from(vec![]),
+                ];
+                let coords = CoordBuffer::Separated(SeparatedCoordBuffer::from_array(
+                    buffers,
+                    Dimension::XYZ,
+                )?);
+                let points = PointArray::new(coords, None, Default::default());
+                vec![points.to_array_ref()]
+            }
+        };
+
+        columns.extend([
+            Arc::new(self.intensity.finish()) as ArrayRef,
+            Arc::new(self.return_number.finish()) as ArrayRef,
+            Arc::new(self.number_of_returns.finish()) as ArrayRef,
+            Arc::new(self.is_synthetic.finish()) as ArrayRef,
+            Arc::new(self.is_key_point.finish()) as ArrayRef,
+            Arc::new(self.is_withheld.finish()) as ArrayRef,
+            Arc::new(self.is_overlap.finish()) as ArrayRef,
+            Arc::new(self.scanner_channel.finish()) as ArrayRef,
+            Arc::new(self.scan_direction.finish()) as ArrayRef,
+            Arc::new(self.is_edge_of_flight_line.finish()) as ArrayRef,
+            Arc::new(self.classification.finish()) as ArrayRef,
+            Arc::new(self.user_data.finish()) as ArrayRef,
+            Arc::new(self.scan_angle.finish()) as ArrayRef,
+            Arc::new(self.point_source_id.finish()) as ArrayRef,
+        ]);
+        if self.header.point_format().has_gps_time {
+            columns.push(Arc::new(self.gps_time.finish()) as ArrayRef);
+        }
+        if self.header.point_format().has_color {
+            columns.extend([
+                Arc::new(self.red.finish()) as ArrayRef,
+                Arc::new(self.green.finish()) as ArrayRef,
+                Arc::new(self.blue.finish()) as ArrayRef,
+            ]);
+        }
+        if self.header.point_format().has_nir {
+            columns.push(Arc::new(self.nir.finish()) as ArrayRef);
+        }
+
+        // extra bytes
+        let num_extra_bytes = self.header.point_format().extra_bytes as usize;
+        if num_extra_bytes > 0 {
+            match self.extra_bytes {
+                LasExtraBytes::Typed => {
+                    let extra = self.extra.finish();
+
+                    let mut pos = 0;
+
+                    for attribute in self.extra_attributes.iter() {
+                        pos += build_attribute(attribute, pos, &extra, &mut columns)?;
+                    }
+                }
+                LasExtraBytes::Blob => columns.push(Arc::new(self.extra.finish())),
+                LasExtraBytes::Ignore => (),
+            }
+        }
+
+        Ok(StructArray::new(schema.fields.to_owned(), columns, None))
+    }
+}
+
+fn build_attribute(
+    attribute: &ExtraAttribute,
+    pos: usize,
+    extra: &FixedSizeBinaryArray,
+    columns: &mut Vec<ArrayRef>,
+) -> Result<usize, ArrowError> {
+    let scale = attribute.scale.unwrap_or(1.0);
+    let offset = attribute.offset.unwrap_or(0.0);
+
+    let width = if let DataType::FixedSizeBinary(width) = attribute.data_type {
+        width as usize
+    } else {
+        attribute.data_type.primitive_width().unwrap()
+    };
+
+    let iter = extra.iter().map(|b| &b.unwrap()[pos..pos + width]);
+
+    match &attribute.data_type {
+        DataType::FixedSizeBinary(_) => {
+            let data = FixedSizeBinaryArray::try_from_iter(iter)?;
+            columns.push(Arc::new(data) as ArrayRef)
+        }
+        DataType::Int8 => {
+            let mut builder = Int8Builder::with_capacity(extra.len());
+            let no_data = attribute.no_data.map(i64::from_le_bytes);
+
+            for d in iter {
+                let mut v = i8::from_le_bytes(d.try_into().unwrap());
+                if let Some(no_data) = no_data {
+                    if no_data == v as i64 {
+                        builder.append_null();
+                        continue;
+                    }
+                }
+                if attribute.scale.is_some() || attribute.offset.is_some() {
+                    v = (v as f64 * scale + offset) as i8;
+                }
+                builder.append_value(v)
+            }
+
+            columns.push(Arc::new(builder.finish()) as ArrayRef)
+        }
+        DataType::Int16 => {
+            let mut builder = Int16Builder::with_capacity(extra.len());
+            let no_data = attribute.no_data.map(i64::from_le_bytes);
+
+            for d in iter {
+                let mut v = i16::from_le_bytes(d.try_into().unwrap());
+                if let Some(no_data) = no_data {
+                    if no_data == v as i64 {
+                        builder.append_null();
+                        continue;
+                    }
+                }
+                if attribute.scale.is_some() || attribute.offset.is_some() {
+                    v = (v as f64 * scale + offset) as i16;
+                }
+                builder.append_value(v)
+            }
+
+            columns.push(Arc::new(builder.finish()) as ArrayRef)
+        }
+        DataType::Int32 => {
+            let mut builder = Int32Builder::with_capacity(extra.len());
+            let no_data = attribute.no_data.map(i64::from_le_bytes);
+
+            for d in iter {
+                let mut v = i32::from_le_bytes(d.try_into().unwrap());
+                if let Some(no_data) = no_data {
+                    if no_data == v as i64 {
+                        builder.append_null();
+                        continue;
+                    }
+                }
+                if attribute.scale.is_some() || attribute.offset.is_some() {
+                    v = (v as f64 * scale + offset) as i32;
+                }
+                builder.append_value(v)
+            }
+
+            columns.push(Arc::new(builder.finish()) as ArrayRef)
+        }
+        DataType::Int64 => {
+            let mut builder = Int64Builder::with_capacity(extra.len());
+            let no_data = attribute.no_data.map(i64::from_le_bytes);
+
+            for d in iter {
+                let mut v = i64::from_le_bytes(d.try_into().unwrap());
+                if let Some(no_data) = no_data {
+                    if no_data == v {
+                        builder.append_null();
+                        continue;
+                    }
+                }
+                if attribute.scale.is_some() || attribute.offset.is_some() {
+                    v = (v as f64 * scale + offset) as i64;
+                }
+                builder.append_value(v)
+            }
+
+            columns.push(Arc::new(builder.finish()) as ArrayRef)
+        }
+        DataType::UInt8 => {
+            let mut builder = UInt8Builder::with_capacity(extra.len());
+            let no_data = attribute.no_data.map(u64::from_le_bytes);
+
+            for d in iter {
+                let mut v = u8::from_le_bytes(d.try_into().unwrap());
+                if let Some(no_data) = no_data {
+                    if no_data == v as u64 {
+                        builder.append_null();
+                        continue;
+                    }
+                }
+                if attribute.scale.is_some() || attribute.offset.is_some() {
+                    v = (v as f64 * scale + offset) as u8;
+                }
+                builder.append_value(v)
+            }
+
+            columns.push(Arc::new(builder.finish()) as ArrayRef)
+        }
+        DataType::UInt16 => {
+            let mut builder = UInt16Builder::with_capacity(extra.len());
+            let no_data = attribute.no_data.map(u64::from_le_bytes);
+
+            for d in iter {
+                let mut v = u16::from_le_bytes(d.try_into().unwrap());
+                if let Some(no_data) = no_data {
+                    if no_data == v as u64 {
+                        builder.append_null();
+                        continue;
+                    }
+                }
+                if attribute.scale.is_some() || attribute.offset.is_some() {
+                    v = (v as f64 * scale + offset) as u16;
+                }
+                builder.append_value(v)
+            }
+
+            columns.push(Arc::new(builder.finish()) as ArrayRef)
+        }
+        DataType::UInt32 => {
+            let mut builder = UInt32Builder::with_capacity(extra.len());
+            let no_data = attribute.no_data.map(u64::from_le_bytes);
+
+            for d in iter {
+                let mut v = u32::from_le_bytes(d.try_into().unwrap());
+                if let Some(no_data) = no_data {
+                    if no_data == v as u64 {
+                        builder.append_null();
+                        continue;
+                    }
+                }
+                if attribute.scale.is_some() || attribute.offset.is_some() {
+                    v = (v as f64 * scale + offset) as u32;
+                }
+                builder.append_value(v)
+            }
+
+            columns.push(Arc::new(builder.finish()) as ArrayRef)
+        }
+        DataType::UInt64 => {
+            let mut builder = UInt64Builder::with_capacity(extra.len());
+            let no_data = attribute.no_data.map(u64::from_le_bytes);
+
+            for d in iter {
+                let mut v = u64::from_le_bytes(d.try_into().unwrap());
+                if let Some(no_data) = no_data {
+                    if no_data == v {
+                        builder.append_null();
+                        continue;
+                    }
+                }
+                if attribute.scale.is_some() || attribute.offset.is_some() {
+                    v = (v as f64 * scale + offset) as u64;
+                }
+                builder.append_value(v)
+            }
+
+            columns.push(Arc::new(builder.finish()) as ArrayRef)
+        }
+        DataType::Float32 => {
+            let mut builder = Float32Builder::with_capacity(extra.len());
+            let no_data = attribute.no_data.map(f64::from_le_bytes);
+
+            for d in iter {
+                let mut v = f32::from_le_bytes(d.try_into().unwrap());
+                if let Some(no_data) = no_data {
+                    if no_data == v as f64 {
+                        builder.append_null();
+                        continue;
+                    }
+                }
+                if attribute.scale.is_some() || attribute.offset.is_some() {
+                    v = (v as f64 * scale + offset) as f32;
+                }
+                builder.append_value(v)
+            }
+
+            columns.push(Arc::new(builder.finish()) as ArrayRef)
+        }
+        DataType::Float64 => {
+            let mut builder = Float64Builder::with_capacity(extra.len());
+            let no_data = attribute.no_data.map(f64::from_le_bytes);
+
+            for d in iter {
+                let mut v = f64::from_le_bytes(d.try_into().unwrap());
+                if let Some(no_data) = no_data {
+                    if no_data == v {
+                        builder.append_null();
+                        continue;
+                    }
+                }
+                if attribute.scale.is_some() || attribute.offset.is_some() {
+                    v = v * scale + offset;
+                }
+                builder.append_value(v);
+            }
+
+            columns.push(Arc::new(builder.finish()) as ArrayRef)
+        }
+
+        dt => {
+            return Err(ArrowError::ExternalError(
+                format!("Unsupported data type for extra bytes: 
`{dt}`").into(),
+            ))
+        }
+    }
+
+    Ok(width)
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{fs::File, sync::Arc};
+
+    use arrow_array::{
+        cast::AsArray,
+        types::{
+            Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type,
+            UInt32Type, UInt64Type, UInt8Type,
+        },
+    };
+    use datafusion_datasource::PartitionedFile;
+    use las::{point::Format, Builder, Writer};
+    use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
+
+    use crate::{
+        laz::{options::LasExtraBytes, reader::LazFileReaderFactory},
+        options::PointcloudOptions,
+    };
+
+    #[tokio::test]
+    async fn point_formats() {
+        let tmpdir = tempfile::tempdir().unwrap();
+
+        for format in 0..=10 {
+            let tmp_path = tmpdir.path().join("format.laz");
+            let tmp_file = File::create(&tmp_path).unwrap();
+
+            // create laz file
+            let mut builder = Builder::from((1, 4));
+            builder.point_format = Format::new(format).unwrap();
+            builder.point_format.is_compressed = true;
+            let header = builder.into_header().unwrap();
+            let mut writer = Writer::new(tmp_file, header).unwrap();
+            writer.close().unwrap();
+
+            // read batch with `LazFileReader`
+            let store = LocalFileSystem::new();
+            let location = Path::from_filesystem_path(tmp_path).unwrap();
+            let object = store.head(&location).await.unwrap();
+
+            let laz_file_reader = LazFileReaderFactory::new(Arc::new(store), None)
+                .create_reader(
+                    PartitionedFile::new(location, object.size),
+                    PointcloudOptions::default(),
+                )
+                .unwrap();
+            let metadata = laz_file_reader.get_metadata().await.unwrap();
+
+            let batch = laz_file_reader
+                .get_batch(&metadata.chunk_table[0])
+                .await
+                .unwrap();
+
+            match format {
+                0 => assert_eq!(batch.num_columns(), 17),
+                1 | 4 | 6 | 9 => assert_eq!(batch.num_columns(), 18),
+                2 => assert_eq!(batch.num_columns(), 20),
+                3 | 5 | 7 => assert_eq!(batch.num_columns(), 21),
+                8 | 10 => assert_eq!(batch.num_columns(), 22),
+                _ => unreachable!(),
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn extra_attributes() {
+        // file with extra attributes generated with `tests/data/generate.py`
+        let extra_path = "tests/data/extra.laz";
+
+        // read batch with `LazFileReader`
+        let store = LocalFileSystem::new();
+        let location = Path::from_filesystem_path(extra_path).unwrap();
+        let object = store.head(&location).await.unwrap();
+
+        let laz_file_reader = LazFileReaderFactory::new(Arc::new(store), None)
+            .create_reader(
+                PartitionedFile::new(location, object.size),
+                PointcloudOptions::default().with_las_extra_bytes(LasExtraBytes::Typed),
+            )
+            .unwrap();
+        let metadata = laz_file_reader.get_metadata().await.unwrap();
+
+        let batch = laz_file_reader
+            .get_batch(&metadata.chunk_table[0])
+            .await
+            .unwrap();
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 48);
+
+        let schema = batch.schema();
+        for field in schema.fields() {
+            let name = field.name();
+
+            match name.split_once('_') {
+                Some((dt, kind)) => {
+                    if !["plain", "scaled", "nodata"].contains(&kind) {
+                        continue;
+                    }
+
+                    let array = batch.column_by_name(name).unwrap();
+
+                    if kind == "nodata" {
+                        assert_eq!(array.null_count(), 1);
+                        continue;
+                    }
+
+                    assert_eq!(array.null_count(), 0);
+
+                    match dt {
+                        "uint8" => {
+                            let array = array.as_primitive::<UInt8Type>();
+                            assert_eq!(array.value(0), 21u8);
+                        }
+                        "int8" => {
+                            let array = array.as_primitive::<Int8Type>();
+                            assert_eq!(array.value(0), 21i8);
+                        }
+                        "uint16" => {
+                            let array = array.as_primitive::<UInt16Type>();
+                            assert_eq!(array.value(0), 21u16);
+                        }
+                        "int16" => {
+                            let array = array.as_primitive::<Int16Type>();
+                            assert_eq!(array.value(0), 21i16);
+                        }
+                        "uint32" => {
+                            let array = array.as_primitive::<UInt32Type>();
+                            assert_eq!(array.value(0), 21u32);
+                        }
+                        "int32" => {
+                            let array = array.as_primitive::<Int32Type>();
+                            assert_eq!(array.value(0), 21i32);
+                        }
+                        "uint64" => {
+                            let array = array.as_primitive::<UInt64Type>();
+                            assert_eq!(array.value(0), 21u64);
+                        }
+                        "int64" => {
+                            let array = array.as_primitive::<Int64Type>();
+                            assert_eq!(array.value(0), 21i64);
+                        }
+                        "float32" => {
+                            let array = array.as_primitive::<Float32Type>();
+                            assert_eq!(array.value(0), 21f32);
+                        }
+                        "float64" => {
+                            let array = array.as_primitive::<Float64Type>();
+                            assert_eq!(array.value(0), 21f64);
+                        }
+                        _ => unreachable!("unexpected data type"),
+                    }
+                }
+                None => continue,
+            }
+        }
+    }
+}
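
As a reading aid for builder.rs above: the builder is fed one las::Point at a time and finished into a StructArray whose fields depend on the point format and the chosen GeometryEncoding (in the Wkb branch, each point becomes a fixed 29-byte little-endian WKB value: one byte-order byte, four bytes for the Point Z type code 1001, then three 8-byte f64 coordinates). A hedged sketch of a standalone driver follows; the crate's real call site is LazFileReader in reader.rs below, and the import path of RowBuilder is an assumption from this diff's module layout:

    use std::sync::Arc;
    use arrow_array::StructArray;
    use las::Reader;
    // Assumed public path for the type defined in laz/builder.rs above
    use sedona_pointcloud::laz::builder::RowBuilder;

    fn file_to_struct_array(path: &str) -> Result<StructArray, Box<dyn std::error::Error>> {
        let mut reader = Reader::from_path(path)?;
        let header = Arc::new(reader.header().clone());
        let capacity = header.number_of_points() as usize;
        // Defaults: GeometryEncoding::Plain (x/y/z columns) and extra bytes ignored
        let mut builder = RowBuilder::new(capacity, header);
        for point in reader.points() {
            builder.append(point?); // x/y/z plus whatever the point format carries
        }
        Ok(builder.finish()?) // columns match try_schema_from_header
    }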
diff --git a/rust/sedona-pointcloud/src/laz/format.rs b/rust/sedona-pointcloud/src/laz/format.rs
new file mode 100644
index 00000000..088d0ca6
--- /dev/null
+++ b/rust/sedona-pointcloud/src/laz/format.rs
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, collections::HashMap, fmt, sync::Arc};
+
+use arrow_schema::{Schema, SchemaRef};
+use datafusion_catalog::{memory::DataSourceExec, Session};
+use datafusion_common::{
+    config::ExtensionOptions, error::DataFusionError, internal_err,
+    parsers::CompressionTypeVariant, GetExt, Statistics,
+};
+use datafusion_datasource::{
+    file::FileSource,
+    file_compression_type::FileCompressionType,
+    file_format::{FileFormat, FileFormatFactory},
+    file_scan_config::FileScanConfig,
+};
+use datafusion_physical_plan::ExecutionPlan;
+use futures::{StreamExt, TryStreamExt};
+use object_store::{ObjectMeta, ObjectStore};
+
+use crate::{
+    laz::{metadata::LazMetadataReader, source::LazSource},
+    options::PointcloudOptions,
+};
+
+const DEFAULT_LAZ_EXTENSION: &str = ".laz";
+
+/// Factory struct used to create [LazFormat]
+#[derive(Default)]
+pub struct LazFormatFactory {
+    // inner options for LAZ
+    pub options: Option<PointcloudOptions>,
+}
+
+impl LazFormatFactory {
+    /// Creates an instance of [LazFormatFactory]
+    pub fn new() -> Self {
+        Self { options: None }
+    }
+
+    /// Creates an instance of [LazFormatFactory] with customized default options
+    pub fn new_with(options: PointcloudOptions) -> Self {
+        Self {
+            options: Some(options),
+        }
+    }
+}
+
+impl FileFormatFactory for LazFormatFactory {
+    fn create(
+        &self,
+        state: &dyn Session,
+        format_options: &HashMap<String, String>,
+    ) -> Result<Arc<dyn FileFormat>, DataFusionError> {
+        let mut options = state
+            .config_options()
+            .extensions
+            .get::<PointcloudOptions>()
+            .or_else(|| state.table_options().extensions.get::<PointcloudOptions>())
+            .cloned()
+            .or(self.options.clone())
+            .unwrap_or_default();
+
+        for (k, v) in format_options {
+            options.set(k, v)?;
+        }
+
+        Ok(Arc::new(LazFormat::default().with_options(options)))
+    }
+
+    fn default(&self) -> Arc<dyn FileFormat> {
+        Arc::new(LazFormat::default())
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+impl GetExt for LazFormatFactory {
+    fn get_ext(&self) -> String {
+        // Removes the dot, i.e. ".laz" -> "laz"
+        DEFAULT_LAZ_EXTENSION[1..].to_string()
+    }
+}
+
+impl fmt::Debug for LazFormatFactory {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("LazFormatFactory")
+            .field("LazFormatFactory", &self.options)
+            .finish()
+    }
+}
+
+/// The LAZ `FileFormat` implementation
+#[derive(Debug, Default)]
+pub struct LazFormat {
+    pub options: PointcloudOptions,
+}
+
+impl LazFormat {
+    pub fn with_options(mut self, options: PointcloudOptions) -> Self {
+        self.options = options;
+        self
+    }
+}
+
+#[async_trait::async_trait]
+impl FileFormat for LazFormat {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_ext(&self) -> String {
+        LazFormatFactory::new().get_ext()
+    }
+
+    fn get_ext_with_compression(
+        &self,
+        file_compression_type: &FileCompressionType,
+    ) -> Result<String, DataFusionError> {
+        let ext = self.get_ext();
+        match file_compression_type.get_variant() {
+            CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
+            _ => internal_err!("Laz FileFormat does not support compression."),
+        }
+    }
+
+    fn compression_type(&self) -> Option<FileCompressionType> {
+        Some(FileCompressionType::UNCOMPRESSED)
+    }
+
+    async fn infer_schema(
+        &self,
+        state: &dyn Session,
+        store: &Arc<dyn ObjectStore>,
+        objects: &[ObjectMeta],
+    ) -> Result<SchemaRef, DataFusionError> {
+        let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
+
+        let mut schemas: Vec<_> = futures::stream::iter(objects)
+            .map(|object_meta| async {
+                let loc_path = object_meta.location.clone();
+
+                let schema = LazMetadataReader::new(store, object_meta)
+                    .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+                    .with_options(self.options.clone())
+                    .fetch_schema()
+                    .await?;
+
+                Ok::<_, DataFusionError>((loc_path, schema))
+            })
+            .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
+            // fetch schemas concurrently, if requested
+            .buffered(state.config_options().execution.meta_fetch_concurrency)
+            .try_collect()
+            .await?;
+
+        schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));
+
+        let schemas = schemas
+            .into_iter()
+            .map(|(_, schema)| schema)
+            .collect::<Vec<_>>();
+
+        let schema = Schema::try_merge(schemas)?;
+
+        Ok(Arc::new(schema))
+    }
+
+    async fn infer_stats(
+        &self,
+        state: &dyn Session,
+        store: &Arc<dyn ObjectStore>,
+        table_schema: SchemaRef,
+        object: &ObjectMeta,
+    ) -> Result<Statistics, DataFusionError> {
+        let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
+        LazMetadataReader::new(store, object)
+            .with_options(self.options.clone())
+            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
+            .fetch_statistics(&table_schema)
+            .await
+    }
+
+    async fn create_physical_plan(
+        &self,
+        _state: &dyn Session,
+        conf: FileScanConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+        Ok(DataSourceExec::from_data_source(conf))
+    }
+
+    fn file_source(&self) -> Arc<dyn FileSource> {
+        Arc::new(LazSource::default().with_options(self.options.clone()))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::{collections::HashMap, fs::File, sync::Arc};
+
+    use datafusion::{execution::SessionStateBuilder, prelude::SessionContext};
+    use datafusion_datasource::file_format::FileFormatFactory;
+    use las::{point::Format, Builder, Writer};
+
+    use crate::laz::format::{LazFormat, LazFormatFactory};
+
+    fn setup_context() -> SessionContext {
+        let file_format = Arc::new(LazFormatFactory::new());
+
+        let mut state = SessionStateBuilder::new().build();
+        state.register_file_format(file_format, true).unwrap();
+
+        SessionContext::new_with_state(state).enable_url_table()
+    }
+
+    #[tokio::test]
+    async fn laz_format_factory() {
+        let ctx = SessionContext::new();
+        let format_factory = Arc::new(LazFormatFactory::new());
+        let dyn_format = format_factory
+            .create(&ctx.state(), &HashMap::new())
+            .unwrap();
+        assert!(dyn_format.as_any().downcast_ref::<LazFormat>().is_some());
+    }
+
+    #[tokio::test]
+    async fn projection() {
+        let ctx = setup_context();
+        let df = ctx
+            .sql("SELECT x, y, z FROM 'tests/data/extra.laz'")
+            .await
+            .unwrap();
+
+        assert_eq!(df.schema().fields().len(), 3);
+    }
+
+    #[tokio::test]
+    async fn multiple_files() {
+        let tmpdir = tempfile::tempdir().unwrap();
+
+        for i in 0..4 {
+            let tmp_path = tmpdir.path().join(format!("tmp{i}.laz"));
+            let tmp_file = File::create(&tmp_path).unwrap();
+
+            // create laz file with one point
+            let mut builder = Builder::from((1, 4));
+            builder.point_format = Format::new(0).unwrap();
+            builder.point_format.is_compressed = true;
+            let header = builder.into_header().unwrap();
+            let mut writer = Writer::new(tmp_file, header).unwrap();
+            writer.write_point(Default::default()).unwrap();
+            writer.close().unwrap();
+        }
+
+        let ctx = setup_context();
+        let table = tmpdir.path().to_str().unwrap();
+        let df = ctx.sql(&format!("SELECT * FROM '{table}'",)).await.unwrap();
+
+        assert_eq!(df.count().await.unwrap(), 4);
+    }
+
+    #[tokio::test]
+    async fn file_that_does_not_exist() {
+        let ctx = setup_context();
+        let err = ctx
+            .sql("SELECT * FROM 'nonexisting.laz'")
+            .await
+            .unwrap_err();
+        assert_eq!(
+            err.message(),
+            "Error during planning: table 'datafusion.public.nonexisting.laz' 
not found"
+        );
+    }
+}
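
The setup_context() helper in the tests above doubles as the registration recipe for plain DataFusion users. A short sketch (SedonaContext performs this wiring itself when the "pointcloud" feature is on, per the rust/sedona/src/context.rs change later in this commit; the sedona_pointcloud import path is assumed from this diff's module layout):

    use std::sync::Arc;
    use datafusion::{execution::SessionStateBuilder, prelude::SessionContext};
    use sedona_pointcloud::laz::format::LazFormatFactory;

    fn laz_enabled_context() -> SessionContext {
        let mut state = SessionStateBuilder::new().build();
        // Associate the "laz" extension with the factory; `true` replaces any
        // previously registered handler for that extension
        state
            .register_file_format(Arc::new(LazFormatFactory::new()), true)
            .unwrap();
        // enable_url_table() lets "SELECT ... FROM 'file.laz'" resolve file paths
        SessionContext::new_with_state(state).enable_url_table()
    }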
diff --git a/rust/sedona-pointcloud/src/laz/metadata.rs b/rust/sedona-pointcloud/src/laz/metadata.rs
new file mode 100644
index 00000000..3c4fbfa9
--- /dev/null
+++ b/rust/sedona-pointcloud/src/laz/metadata.rs
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    any::Any,
+    collections::HashMap,
+    error::Error,
+    io::{Cursor, Read},
+    ops::Range,
+    sync::Arc,
+};
+
+use arrow_schema::{DataType, Schema, SchemaRef};
+use datafusion_common::{
+    error::DataFusionError, scalar::ScalarValue, stats::Precision, ColumnStatistics, Statistics,
+};
+use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache};
+use las::{
+    raw::{Header as RawHeader, Vlr as RawVlr},
+    Builder, Header, Vlr,
+};
+use laz::laszip::ChunkTable;
+use object_store::{ObjectMeta, ObjectStore};
+
+use crate::{laz::schema::try_schema_from_header, options::PointcloudOptions};
+
+/// Laz chunk metadata
+#[derive(Debug, Clone)]
+pub struct ChunkMeta {
+    pub num_points: u64,
+    pub point_offset: u64,
+    pub byte_range: Range<u64>,
+}
+
+/// Laz metadata
+#[derive(Debug, Clone)]
+pub struct LazMetadata {
+    pub header: Arc<Header>,
+    pub chunk_table: Vec<ChunkMeta>,
+    pub extra_attributes: Arc<Vec<ExtraAttribute>>,
+}
+
+impl FileMetadata for LazMetadata {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn memory_size(&self) -> usize {
+        std::mem::size_of::<Header>()
+            + self
+                .header
+                .all_vlrs()
+                .map(|vlr| vlr.data.len())
+                .sum::<usize>()
+            + self.chunk_table.capacity() * std::mem::size_of::<ChunkMeta>()
+            + self.extra_attributes.capacity() * std::mem::size_of::<ExtraAttribute>()
+    }
+
+    fn extra_info(&self) -> HashMap<String, String> {
+        HashMap::new()
+    }
+}
+
+/// Reader for laz file metadata in object storage.
+pub struct LazMetadataReader<'a> {
+    store: &'a dyn ObjectStore,
+    object_meta: &'a ObjectMeta,
+    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
+    options: PointcloudOptions,
+}
+
+impl<'a> LazMetadataReader<'a> {
+    pub fn new(store: &'a dyn ObjectStore, object_meta: &'a ObjectMeta) -> Self {
+        Self {
+            store,
+            object_meta,
+            file_metadata_cache: None,
+            options: Default::default(),
+        }
+    }
+
+    /// set file metadata cache
+    pub fn with_file_metadata_cache(
+        mut self,
+        file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
+    ) -> Self {
+        self.file_metadata_cache = file_metadata_cache;
+        self
+    }
+
+    /// set table options
+    pub fn with_options(mut self, options: PointcloudOptions) -> Self {
+        self.options = options;
+        self
+    }
+
+    /// Fetch header
+    pub async fn fetch_header(&self) -> Result<Header, DataFusionError> {
+        fetch_header(self.store, self.object_meta)
+            .await
+            .map_err(DataFusionError::External)
+    }
+
+    /// Fetch laz metadata from the remote object store
+    pub async fn fetch_metadata(&self) -> Result<Arc<LazMetadata>, DataFusionError> {
+        let Self {
+            store,
+            object_meta,
+            file_metadata_cache,
+            options: _,
+        } = self;
+
+        if let Some(las_file_metadata) = file_metadata_cache
+            .as_ref()
+            .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta))
+            .and_then(|file_metadata| {
+                file_metadata
+                    .as_any()
+                    .downcast_ref::<LazMetadata>()
+                    .map(|laz_file_metadata| Arc::new(laz_file_metadata.to_owned()))
+            })
+        {
+            return Ok(las_file_metadata);
+        }
+
+        let header = self.fetch_header().await?;
+        let extra_attributes = extra_bytes_attributes(&header)?;
+        let chunk_table = chunk_table(*store, object_meta, &header).await?;
+
+        let metadata = Arc::new(LazMetadata {
+            header: Arc::new(header),
+            chunk_table,
+            extra_attributes: Arc::new(extra_attributes),
+        });
+
+        if let Some(file_metadata_cache) = file_metadata_cache {
+            file_metadata_cache.put(object_meta, metadata.clone());
+        }
+
+        Ok(metadata)
+    }
+
+    /// Read and parse the schema of the laz file
+    pub async fn fetch_schema(&mut self) -> Result<Schema, DataFusionError> {
+        let metadata = self.fetch_metadata().await?;
+
+        let schema = try_schema_from_header(
+            &metadata.header,
+            self.options.geometry_encoding,
+            self.options.las.extra_bytes,
+        )?;
+
+        Ok(schema)
+    }
+
+    /// Fetch the metadata from the laz file via [`Self::fetch_metadata`] and extract
+    /// the statistics from it
+    pub async fn fetch_statistics(
+        &self,
+        table_schema: &SchemaRef,
+    ) -> Result<Statistics, DataFusionError> {
+        let metadata = self.fetch_metadata().await?;
+
+        let mut statistics = Statistics::new_unknown(table_schema)
+            .with_num_rows(Precision::Exact(metadata.header.number_of_points() as usize))
+            .with_total_byte_size(Precision::Exact(
+                metadata
+                    .chunk_table
+                    .iter()
+                    .map(|meta| meta.byte_range.end - meta.byte_range.start)
+                    .sum::<u64>() as usize,
+            ));
+
+        let bounds = metadata.header.bounds();
+        for field in table_schema.fields() {
+            let cs = match field.name().as_str() {
+                "x" => ColumnStatistics::new_unknown()
+                    
.with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.x))))
+                    
.with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.x))))
+                    .with_null_count(Precision::Exact(0)),
+                "y" => ColumnStatistics::new_unknown()
+                    
.with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.y))))
+                    
.with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.y))))
+                    .with_null_count(Precision::Exact(0)),
+                "z" => ColumnStatistics::new_unknown()
+                    
.with_min_value(Precision::Exact(ScalarValue::Float64(Some(bounds.min.z))))
+                    
.with_max_value(Precision::Exact(ScalarValue::Float64(Some(bounds.max.z))))
+                    .with_null_count(Precision::Exact(0)),
+                _ => ColumnStatistics::new_unknown(),
+            };
+
+            statistics = statistics.add_column_statistics(cs);
+        }
+
+        Ok(statistics)
+    }
+}
+
+pub(crate) async fn fetch_header(
+    store: &(impl ObjectStore + ?Sized),
+    object_meta: &ObjectMeta,
+) -> Result<Header, Box<dyn Error + Send + Sync>> {
+    let location = &object_meta.location;
+
+    // Header
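+    // 375 bytes covers the largest (LAS 1.4) raw header; `RawHeader::read_from`
+    // only consumes the actual header size for older versions.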
+    let bytes = store.get_range(location, 0..375).await?;
+    let reader = Cursor::new(bytes);
+    let raw_header = RawHeader::read_from(reader)?;
+
+    let header_size = raw_header.header_size as u64;
+    let offset_to_point_data = raw_header.offset_to_point_data as u64;
+    let num_vlr = raw_header.number_of_variable_length_records;
+    let evlr = raw_header.evlr;
+
+    let mut builder = Builder::new(raw_header)?;
+
+    // VLRs
+    let bytes = store
+        .get_range(location, header_size..offset_to_point_data)
+        .await?;
+    let mut reader = Cursor::new(bytes);
+
+    for _ in 0..num_vlr {
+        let vlr = RawVlr::read_from(&mut reader, false).map(Vlr::new)?;
+        builder.vlrs.push(vlr);
+    }
+
+    reader.read_to_end(&mut builder.vlr_padding)?;
+
+    // EVLRs
+    if let Some(evlr) = evlr {
+        let mut start = evlr.start_of_first_evlr;
+
+        for _ in 0..evlr.number_of_evlrs {
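+            // An EVLR starts with a fixed 60-byte header; bytes 20..28 hold the
+            // length of the record payload that follows it.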
+            let mut end = start + 60;
+
+            let bytes = store.get_range(location, start..end).await?;
+
+            end += u64::from_le_bytes(bytes[20..28].try_into()?);
+
+            let bytes = store.get_range(location, start..end).await?;
+            let mut reader = Cursor::new(bytes);
+            let evlr = RawVlr::read_from(&mut reader, true).map(Vlr::new)?;
+
+            builder.evlrs.push(evlr);
+
+            start = end;
+        }
+    }
+
+    Ok(builder.into_header()?)
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct ExtraAttribute {
+    pub data_type: DataType,
+    pub no_data: Option<[u8; 8]>,
+    pub scale: Option<f64>,
+    pub offset: Option<f64>,
+}
+
+pub(crate) fn extra_bytes_attributes(
+    header: &Header,
+) -> Result<Vec<ExtraAttribute>, Box<dyn Error + Send + Sync>> {
+    let mut attributes = Vec::new();
+
+    for vlr in header.all_vlrs() {
+        if !(vlr.user_id == "LASF_Spec" && vlr.record_id == 4) {
+            continue;
+        }
+
+        for bytes in vlr.data.chunks(192) {
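+            // Each extra-bytes descriptor is a fixed 192-byte record.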
+            // data type
+            let data_type = match bytes[2] {
+                0 => DataType::FixedSizeBinary(bytes[3] as i32),
+                1 => DataType::UInt8,
+                2 => DataType::Int8,
+                3 => DataType::UInt16,
+                4 => DataType::Int16,
+                5 => DataType::UInt32,
+                6 => DataType::Int32,
+                7 => DataType::UInt64,
+                8 => DataType::Int64,
+                9 => DataType::Float32,
+                10 => DataType::Float64,
+                11..=30 => return Err("deprecated extra bytes data type".into()),
+                31..=255 => return Err("reserved extra bytes data type".into()),
+            };
+
+            // no_data value (options bit 0; only meaningful for typed attributes)
+            let no_data = if bytes[2] != 0 && bytes[3] & 1 == 1 {
+                Some(bytes[40..48].try_into().unwrap())
+            } else {
+                None
+            };
+
+            // scale (options bit 3)
+            let scale = if bytes[2] != 0 && bytes[3] >> 3 & 1 == 1 {
+                Some(f64::from_le_bytes(bytes[112..120].try_into().unwrap()))
+            } else {
+                None
+            };
+
+            // offset (options bit 4)
+            let offset = if bytes[2] != 0 && bytes[3] >> 4 & 1 == 1 {
+                Some(f64::from_le_bytes(bytes[136..144].try_into().unwrap()))
+            } else {
+                None
+            };
+
+            let attribute = ExtraAttribute {
+                data_type,
+                no_data,
+                scale,
+                offset,
+            };
+
+            attributes.push(attribute);
+        }
+    }
+
+    Ok(attributes)
+}
+
+pub(crate) async fn chunk_table(
+    store: &(impl ObjectStore + ?Sized),
+    object_meta: &ObjectMeta,
+    header: &Header,
+) -> Result<Vec<ChunkMeta>, Box<dyn Error + Send + Sync>> {
+    let num_points = header.number_of_points();
+    let mut point_offset = 0;
+
+    let vlr_len = header.vlrs().iter().map(|v| v.len(false)).sum::<usize>();
+    let header_size = header.version().header_size() as usize + header.padding().len();
+    let mut byte_offset = (header_size + vlr_len + header.vlr_padding().len()) as u64;
+
+    let laz_vlr = header.laz_vlr()?;
+
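+    // Per the LAZ spec, an i64 chunk table offset is stored just before the
+    // compressed points; streaming writers leave -1 there and store the real
+    // offset in the last 8 bytes of the file, so both candidates are fetched.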
+    let ranges = [
+        byte_offset..byte_offset + 8,
+        object_meta.size - 8..object_meta.size,
+    ];
+    let bytes = store.get_ranges(&object_meta.location, &ranges).await?;
+    let mut table_offset = None;
+
+    let table_offset1 = i64::from_le_bytes(bytes[0].to_vec().try_into().unwrap()) as u64;
+    let table_offset2 = i64::from_le_bytes(bytes[1].to_vec().try_into().unwrap()) as u64;
+
+    if table_offset1 > byte_offset {
+        table_offset = Some(table_offset1);
+    } else if table_offset2 > byte_offset {
+        table_offset = Some(table_offset2);
+    }
+
+    let Some(table_offset) = table_offset else {
+        return Err("LAZ files without a chunk table are not supported (yet)".into());
+    };
+
+    if table_offset > object_meta.size {
+        return Err("LAZ file chunk table offset is missing or invalid".into());
+    }
+
+    let bytes = store
+        .get_range(&object_meta.location, table_offset..table_offset + 8)
+        .await?;
+
+    let num_chunks = u32::from_le_bytes(bytes[4..].to_vec().try_into().unwrap()) as u64;
+    let range = table_offset..table_offset + 8 + 8 * num_chunks;
+    let bytes = store.get_range(&object_meta.location, range).await?;
+
+    let mut reader = Cursor::new(bytes);
+    let variable_size = laz_vlr.uses_variable_size_chunks();
+    let chunk_table = ChunkTable::read(&mut reader, variable_size)?;
+    assert_eq!(chunk_table.len(), num_chunks as usize);
+
+    let mut chunks = Vec::with_capacity(num_chunks as usize);
+    let chunk_size = laz_vlr.chunk_size() as u64;
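+    // Skip the 8-byte chunk table offset field; the first chunk starts right after it.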
+    byte_offset += 8;
+
+    for chunk_table_entry in &chunk_table {
+        let point_count = if variable_size {
+            chunk_table_entry.point_count
+        } else {
+            chunk_size.min(num_points - point_offset)
+        };
+
+        let chunk = ChunkMeta {
+            num_points: point_count,
+            point_offset,
+            byte_range: byte_offset..byte_offset + chunk_table_entry.byte_count,
+        };
+        chunks.push(chunk);
+        point_offset += point_count;
+        byte_offset += chunk_table_entry.byte_count;
+    }
+
+    Ok(chunks)
+}
+
+#[cfg(test)]
+mod tests {
+    use std::fs::File;
+
+    use las::{point::Format, Builder, Reader, Writer};
+    use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
+
+    use crate::laz::metadata::LazMetadataReader;
+
+    #[tokio::test]
+    async fn header_basic_e2e() {
+        let tmpdir = tempfile::tempdir().unwrap();
+
+        let tmp_path = tmpdir.path().join("tmp.laz");
+        let tmp_file = File::create(&tmp_path).unwrap();
+
+        // create laz file
+        let mut builder = Builder::from((1, 4));
+        builder.point_format = Format::new(1).unwrap();
+        builder.point_format.is_compressed = true;
+        let header = builder.into_header().unwrap();
+        let mut writer = Writer::new(tmp_file, header).unwrap();
+        writer.close().unwrap();
+
+        // read with `LazMetadataReader`
+        let store = LocalFileSystem::new();
+        let location = Path::from_filesystem_path(&tmp_path).unwrap();
+        let object_meta = store.head(&location).await.unwrap();
+        let metadata_reader = LazMetadataReader::new(&store, &object_meta);
+
+        // read with las `Reader`
+        let reader = Reader::from_path(&tmp_path).unwrap();
+
+        assert_eq!(
+            reader.header(),
+            &metadata_reader.fetch_header().await.unwrap()
+        );
+    }
+}
diff --git a/rust/sedona-pointcloud/src/laz/mod.rs b/rust/sedona-pointcloud/src/laz/mod.rs
new file mode 100644
index 00000000..b6fb48cd
--- /dev/null
+++ b/rust/sedona-pointcloud/src/laz/mod.rs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod builder;
+pub mod format;
+pub mod metadata;
+pub mod opener;
+pub mod options;
+pub mod reader;
+pub mod schema;
+pub mod source;
diff --git a/rust/sedona-pointcloud/src/laz/opener.rs b/rust/sedona-pointcloud/src/laz/opener.rs
new file mode 100644
index 00000000..fefa118c
--- /dev/null
+++ b/rust/sedona-pointcloud/src/laz/opener.rs
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion_common::{error::DataFusionError, pruning::PrunableStatistics};
+use datafusion_datasource::{
+    file_stream::{FileOpenFuture, FileOpener},
+    PartitionedFile,
+};
+use datafusion_physical_expr::PhysicalExpr;
+use datafusion_pruning::PruningPredicate;
+use futures::StreamExt;
+
+use sedona_expr::spatial_filter::SpatialFilter;
+use sedona_geometry::bounding_box::BoundingBox;
+
+use crate::{
+    laz::{
+        reader::{LazFileReader, LazFileReaderFactory},
+        schema::try_schema_from_header,
+    },
+    options::PointcloudOptions,
+};
+
+pub struct LazOpener {
+    /// Column indexes in `table_schema` needed by the query
+    pub projection: Arc<[usize]>,
+    /// Optional limit on the number of rows to read
+    pub limit: Option<usize>,
+    pub predicate: Option<Arc<dyn PhysicalExpr>>,
+    /// Factory for instantiating the LAZ file reader
+    pub laz_file_reader_factory: Arc<LazFileReaderFactory>,
+    /// Table options
+    pub options: PointcloudOptions,
+    /// Target batch size
+    pub(crate) batch_size: usize,
+}
+
+impl FileOpener for LazOpener {
+    fn open(&self, file: PartitionedFile) -> Result<FileOpenFuture, DataFusionError> {
+        let projection = self.projection.clone();
+        let limit = self.limit;
+        let batch_size = self.batch_size;
+
+        let predicate = self.predicate.clone();
+
+        let laz_reader: Box<LazFileReader> = self
+            .laz_file_reader_factory
+            .create_reader(file.clone(), self.options.clone())?;
+
+        Ok(Box::pin(async move {
+            let metadata = laz_reader.get_metadata().await?;
+            let schema = Arc::new(try_schema_from_header(
+                &metadata.header,
+                laz_reader.options.geometry_encoding,
+                laz_reader.options.las.extra_bytes,
+            )?);
+
+            let pruning_predicate = predicate.and_then(|physical_expr| {
+                PruningPredicate::try_new(physical_expr, schema.clone()).ok()
+            });
+
+            // file pruning
+            if let Some(pruning_predicate) = &pruning_predicate {
+                // based on spatial filter
+                let spatial_filter = SpatialFilter::try_from_expr(pruning_predicate.orig_expr())?;
+                let bounds = metadata.header.bounds();
+                let bbox = BoundingBox::xyzm(
+                    (bounds.min.x, bounds.max.x),
+                    (bounds.min.y, bounds.max.y),
+                    Some((bounds.min.z, bounds.max.z).into()),
+                    None,
+                );
+                if !spatial_filter.filter_bbox("geometry").intersects(&bbox) {
+                    return Ok(futures::stream::empty().boxed());
+                }
+                // based on file statistics
+                if let Some(statistics) = file.statistics {
+                    let prunable_statistics = PrunableStatistics::new(vec![statistics], schema);
+                    if let Ok(filter) = pruning_predicate.prune(&prunable_statistics) {
+                        if !filter[0] {
+                            return Ok(futures::stream::empty().boxed());
+                        }
+                    }
+                }
+            }
+
+            // keep only the chunks whose start offset falls inside this partition's byte range
+            let chunk_table: Vec<_> = metadata
+                .chunk_table
+                .iter()
+                .filter(|chunk_meta| {
+                    file.range.as_ref().is_none_or(|range| {
+                        let offset = chunk_meta.byte_range.start;
+                        offset >= range.start as u64 && offset < range.end as u64
+                    })
+                })
+                .cloned()
+                .collect();
+
+            let mut row_count = 0;
+
+            let stream = async_stream::try_stream! {
+                for chunk_meta in chunk_table.into_iter() {
+                    // limit
+                    if let Some(limit) = limit {
+                        if row_count >= limit {
+                            break;
+                        }
+                    }
+
+                    // fetch batch
+                    let record_batch = laz_reader.get_batch(&chunk_meta).await?;
+                    let num_rows = record_batch.num_rows();
+                    row_count += num_rows;
+
+                    // project
+                    let record_batch = record_batch
+                        .project(&projection)
+                        .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
+
+                    // adhere to target batch size
+                    let mut offset = 0;
+
+                    loop {
+                        let length = batch_size.min(num_rows - offset);
+                        yield record_batch.slice(offset, length);
+
+                        offset += batch_size;
+                        if offset >= num_rows {
+                            break;
+                        }
+                    }
+                }
+
+            };
+
+            Ok(Box::pin(stream) as _)
+        }))
+    }
+}
diff --git a/rust/sedona-pointcloud/src/laz/options.rs b/rust/sedona-pointcloud/src/laz/options.rs
new file mode 100644
index 00000000..124b9104
--- /dev/null
+++ b/rust/sedona-pointcloud/src/laz/options.rs
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{fmt::Display, str::FromStr};
+
+use datafusion_common::{
+    config::{ConfigField, Visit},
+    config_namespace,
+    error::DataFusionError,
+};
+
+/// LAS extra bytes handling
+#[derive(Clone, Copy, Default, PartialEq, Eq, Debug)]
+pub enum LasExtraBytes {
+    /// Resolve to typed and named attributes
+    Typed,
+    /// Keep as binary blob
+    Blob,
+    /// Drop/ignore extra bytes
+    #[default]
+    Ignore,
+}
+
+impl Display for LasExtraBytes {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            LasExtraBytes::Typed => f.write_str("typed"),
+            LasExtraBytes::Blob => f.write_str("blob"),
+            LasExtraBytes::Ignore => f.write_str("ignore"),
+        }
+    }
+}
+
+impl FromStr for LasExtraBytes {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "typed" => Ok(Self::Typed),
+            "blob" => Ok(Self::Blob),
+            "ignore" => Ok(Self::Ignore),
+            s => Err(format!("Unable to parse extra bytes handling from `{s}`; expected `typed`, `blob`, or `ignore`")),
+        }
+    }
+}
+
+impl ConfigField for LasExtraBytes {
+    fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static str) {
+        v.some(
+            &format!("{key}.extra_bytes"),
+            self,
+            "Specify extra bytes handling",
+        );
+    }
+
+    fn set(&mut self, _key: &str, value: &str) -> Result<(), DataFusionError> {
+        *self = value.parse().map_err(DataFusionError::Configuration)?;
+        Ok(())
+    }
+}
+
+config_namespace! {
+    /// The LAS config options
+    pub struct LasOptions {
+        pub extra_bytes: LasExtraBytes, default = LasExtraBytes::default()
+    }
+
+}
+
+impl LasOptions {
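+    /// Set the extra bytes handling, e.g.
+    /// `LasOptions::default().with_extra_bytes(LasExtraBytes::Blob)`.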
+    pub fn with_extra_bytes(mut self, extra_bytes: LasExtraBytes) -> Self {
+        self.extra_bytes = extra_bytes;
+        self
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::sync::Arc;
+
+    use datafusion::{
+        execution::SessionStateBuilder,
+        prelude::{SessionConfig, SessionContext},
+    };
+
+    use crate::{laz::format::LazFormatFactory, options::PointcloudOptions};
+
+    fn setup_context() -> SessionContext {
+        let file_format = Arc::new(LazFormatFactory::new());
+
+        let config = SessionConfig::new().with_option_extension(PointcloudOptions::default());
+        let mut state = SessionStateBuilder::new().with_config(config).build();
+        state.register_file_format(file_format, true).unwrap();
+
+        SessionContext::new_with_state(state).enable_url_table()
+    }
+
+    #[tokio::test]
+    async fn projection() {
+        let ctx = setup_context();
+
+        // default options
+        let df = ctx
+            .sql("SELECT x, y, z FROM 'tests/data/extra.laz'")
+            .await
+            .unwrap();
+
+        assert_eq!(df.schema().fields().len(), 3);
+
+        // overwrite options
+        ctx.sql("SET pointcloud.geometry_encoding = 'wkb'")
+            .await
+            .unwrap();
+        ctx.sql("SET pointcloud.las.extra_bytes = 'blob'")
+            .await
+            .unwrap();
+        let df = ctx
+            .sql("SELECT geometry, extra_bytes FROM 'tests/data/extra.laz'")
+            .await
+            .unwrap();
+
+        assert_eq!(df.schema().fields().len(), 2);
+    }
+}
diff --git a/rust/sedona-pointcloud/src/laz/reader.rs b/rust/sedona-pointcloud/src/laz/reader.rs
new file mode 100644
index 00000000..79c52ad1
--- /dev/null
+++ b/rust/sedona-pointcloud/src/laz/reader.rs
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    io::{Cursor, Read, Seek},
+    ops::Range,
+    sync::Arc,
+};
+
+use arrow_array::RecordBatch;
+use bytes::Bytes;
+use datafusion_common::error::DataFusionError;
+use datafusion_datasource::PartitionedFile;
+use datafusion_execution::cache::cache_manager::FileMetadataCache;
+use futures::{future::BoxFuture, FutureExt};
+use las::{raw::Point as RawPoint, Point};
+use laz::{
+    record::{
+        LayeredPointRecordDecompressor, RecordDecompressor, SequentialPointRecordDecompressor,
+    },
+    DecompressionSelection, LasZipError, LazItem,
+};
+use object_store::{Error, ObjectStore};
+
+use crate::{
+    laz::{
+        builder::RowBuilder,
+        metadata::{ChunkMeta, LazMetadata, LazMetadataReader},
+    },
+    options::PointcloudOptions,
+};
+
+/// LAZ file reader factory
+#[derive(Debug)]
+pub struct LazFileReaderFactory {
+    store: Arc<dyn ObjectStore>,
+    metadata_cache: Option<Arc<dyn FileMetadataCache>>,
+}
+
+impl LazFileReaderFactory {
+    /// Create a new `LazFileReaderFactory`.
+    pub fn new(
+        store: Arc<dyn ObjectStore>,
+        metadata_cache: Option<Arc<dyn FileMetadataCache>>,
+    ) -> Self {
+        Self {
+            store,
+            metadata_cache,
+        }
+    }
+
+    pub fn create_reader(
+        &self,
+        partitioned_file: PartitionedFile,
+        options: PointcloudOptions,
+    ) -> Result<Box<LazFileReader>, DataFusionError> {
+        Ok(Box::new(LazFileReader {
+            partitioned_file,
+            store: self.store.clone(),
+            metadata_cache: self.metadata_cache.clone(),
+            options,
+        }))
+    }
+}
+
+/// Reader for a LAZ file in object storage.
+pub struct LazFileReader {
+    partitioned_file: PartitionedFile,
+    store: Arc<dyn ObjectStore>,
+    metadata_cache: Option<Arc<dyn FileMetadataCache>>,
+    pub options: PointcloudOptions,
+}
+
+impl LazFileReader {
+    pub fn get_metadata<'a>(&'a self) -> BoxFuture<'a, Result<Arc<LazMetadata>, DataFusionError>> {
+        let object_meta = self.partitioned_file.object_meta.clone();
+        let metadata_cache = self.metadata_cache.clone();
+
+        async move {
+            LazMetadataReader::new(&self.store, &object_meta)
+                .with_file_metadata_cache(metadata_cache)
+                .with_options(self.options.clone())
+                .fetch_metadata()
+                .await
+        }
+        .boxed()
+    }
+
+    async fn get_bytes(&self, range: Range<u64>) -> Result<Bytes, Error> {
+        let location = &self.partitioned_file.object_meta.location;
+        self.store.get_range(location, range).await
+    }
+
+    pub async fn get_batch(&self, chunk_meta: &ChunkMeta) -> Result<RecordBatch, DataFusionError> {
+        let metadata = self.get_metadata().await?;
+        let header = metadata.header.clone();
+
+        // fetch bytes
+        let bytes = self.get_bytes(chunk_meta.byte_range.clone()).await?;
+
+        // laz decompressor
+        let laz_vlr = header
+            .laz_vlr()
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+        let reader = Cursor::new(bytes);
+        let mut decompressor = record_decompressor(laz_vlr.items(), reader)
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        // record batch builder
+        let num_points = chunk_meta.num_points as usize;
+        let mut builder = RowBuilder::new(num_points, header.clone())
+            .with_geometry_encoding(self.options.geometry_encoding)
+            .with_extra_attributes(
+                metadata.extra_attributes.clone(),
+                self.options.las.extra_bytes,
+            );
+
+        // transform
+        let format = header.point_format();
+        let transforms = header.transforms();
+
+        let out = vec![0; format.len() as usize];
+        let mut buffer = Cursor::new(out);
+
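+        // Decompress one point at a time into the reusable point-sized buffer,
+        // then parse and transform it into a `las::Point`.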
+        for _ in 0..chunk_meta.num_points {
+            buffer.set_position(0);
+            decompressor.decompress_next(buffer.get_mut())?;
+
+            let point = RawPoint::read_from(&mut buffer, format)
+                .map(|raw_point| Point::new(raw_point, transforms))
+                .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+            builder.append(point);
+        }
+
+        let struct_array = builder.finish()?;
+
+        Ok(RecordBatch::from(struct_array))
+    }
+}
+
+fn record_decompressor<'a, R: Read + Seek + Send + Sync + 'a>(
+    items: &Vec<LazItem>,
+    input: R,
+) -> Result<Box<dyn RecordDecompressor<R> + Send + Sync + 'a>, LasZipError> {
+    let first_item = items
+        .first()
+        .expect("There should be at least one LazItem to be able to create a RecordDecompressor");
+
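+    // LazItem versions 1 and 2 use the sequential point-wise codec; versions 3
+    // and 4 use the layered codec introduced for the LAS 1.4 point formats.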
+    let mut decompressor = match first_item.version() {
+        1 | 2 => {
+            let decompressor = SequentialPointRecordDecompressor::new(input);
+            Box::new(decompressor) as Box<dyn RecordDecompressor<R> + Send + Sync>
+        }
+        3 | 4 => {
+            let decompressor = LayeredPointRecordDecompressor::new(input);
+            Box::new(decompressor) as Box<dyn RecordDecompressor<R> + Send + Sync>
+        }
+        _ => {
+            return Err(LasZipError::UnsupportedLazItemVersion(
+                first_item.item_type(),
+                first_item.version(),
+            ));
+        }
+    };
+
+    decompressor.set_fields_from(items)?;
+    decompressor.set_selection(DecompressionSelection::all());
+
+    Ok(decompressor)
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{fs::File, sync::Arc};
+
+    use datafusion_datasource::PartitionedFile;
+    use las::{point::Format, Builder, Writer};
+    use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
+
+    use crate::laz::reader::LazFileReaderFactory;
+
+    #[tokio::test]
+    async fn reader_basic_e2e() {
+        let tmpdir = tempfile::tempdir().unwrap();
+
+        // create laz file with one point
+        let tmp_path = tmpdir.path().join("one.laz");
+        let tmp_file = File::create(&tmp_path).unwrap();
+        let mut builder = Builder::from((1, 4));
+        builder.point_format = Format::new(0).unwrap();
+        builder.point_format.is_compressed = true;
+        let header = builder.into_header().unwrap();
+        let mut writer = Writer::new(tmp_file, header).unwrap();
+        writer.write_point(Default::default()).unwrap();
+        writer.close().unwrap();
+
+        // read batch with `LazFileReader`
+        let store = LocalFileSystem::new();
+        let location = Path::from_filesystem_path(tmp_path).unwrap();
+        let object = store.head(&location).await.unwrap();
+
+        let laz_file_reader = LazFileReaderFactory::new(Arc::new(store), None)
+            .create_reader(
+                PartitionedFile::new(location, object.size),
+                Default::default(),
+            )
+            .unwrap();
+        let metadata = laz_file_reader.get_metadata().await.unwrap();
+
+        let batch = laz_file_reader
+            .get_batch(&metadata.chunk_table[0])
+            .await
+            .unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+    }
+}
diff --git a/rust/sedona-pointcloud/src/laz/schema.rs b/rust/sedona-pointcloud/src/laz/schema.rs
new file mode 100644
index 00000000..bdcca623
--- /dev/null
+++ b/rust/sedona-pointcloud/src/laz/schema.rs
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_schema::{ArrowError, DataType, Field, Schema};
+use geoarrow_schema::{CoordType, Crs, Dimension, Metadata, PointType, WkbType};
+use las::Header;
+use las_crs::{get_epsg_from_geotiff_crs, get_epsg_from_wkt_crs_bytes};
+
+use crate::{laz::options::LasExtraBytes, options::GeometryEncoding};
+
+/// Build the Arrow schema for LAS points from the file header
+pub fn try_schema_from_header(
+    header: &Header,
+    geometry_encoding: GeometryEncoding,
+    extra_bytes: LasExtraBytes,
+) -> Result<Schema, ArrowError> {
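+    // Prefer the WKT CRS when the header advertises one (mandatory for point
+    // formats 6-10); otherwise fall back to the legacy GeoTIFF keys.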
+    let epsg_crs = if header.has_wkt_crs() {
+        header
+            .get_wkt_crs_bytes()
+            .and_then(|bytes| get_epsg_from_wkt_crs_bytes(bytes).ok())
+    } else {
+        header
+            .get_geotiff_crs()
+            .map(|gtc| gtc.and_then(|gtc| get_epsg_from_geotiff_crs(&gtc).ok()))
+            .unwrap_or_default()
+    };
+
+    let crs = epsg_crs
+        .map(|epsg_crs| Crs::from_authority_code(format!("EPSG:{}", epsg_crs.get_horizontal())))
+        .unwrap_or_default();
+
+    let mut fields = match geometry_encoding {
+        GeometryEncoding::Plain => vec![
+            Field::new("x", DataType::Float64, false),
+            Field::new("y", DataType::Float64, false),
+            Field::new("z", DataType::Float64, false),
+        ],
+        GeometryEncoding::Wkb => {
+            let point_type = WkbType::new(Arc::new(Metadata::new(crs, None)));
+            vec![Field::new("geometry", DataType::Binary, false).with_extension_type(point_type)]
+        }
+        GeometryEncoding::Native => {
+            let point_type = PointType::new(Dimension::XYZ, Arc::new(Metadata::new(crs, None)))
+                .with_coord_type(CoordType::Separated);
+            vec![point_type.to_field("geometry", false)]
+        }
+    };
+    fields.extend_from_slice(&[
+        Field::new("intensity", DataType::UInt16, true),
+        Field::new("return_number", DataType::UInt8, false),
+        Field::new("number_of_returns", DataType::UInt8, false),
+        Field::new("is_synthetic", DataType::Boolean, false),
+        Field::new("is_key_point", DataType::Boolean, false),
+        Field::new("is_withheld", DataType::Boolean, false),
+        Field::new("is_overlap", DataType::Boolean, false),
+        Field::new("scanner_channel", DataType::UInt8, false),
+        Field::new("scan_direction", DataType::UInt8, false),
+        Field::new("is_edge_of_flight_line", DataType::Boolean, false),
+        Field::new("classification", DataType::UInt8, false),
+        Field::new("user_data", DataType::UInt8, false),
+        Field::new("scan_angle", DataType::Float32, false),
+        Field::new("point_source_id", DataType::UInt16, false),
+    ]);
+    if header.point_format().has_gps_time {
+        fields.push(Field::new("gps_time", DataType::Float64, false));
+    }
+    if header.point_format().has_color {
+        fields.extend([
+            Field::new("red", DataType::UInt16, false),
+            Field::new("green", DataType::UInt16, false),
+            Field::new("blue", DataType::UInt16, false),
+        ])
+    }
+    if header.point_format().has_nir {
+        fields.push(Field::new("nir", DataType::UInt16, false));
+    }
+
+    // extra bytes
+    if header.point_format().extra_bytes > 0 {
+        match extra_bytes {
+            LasExtraBytes::Typed => fields.extend(extra_bytes_fields(header)?),
+            LasExtraBytes::Blob => fields.push(Field::new(
+                "extra_bytes",
+                DataType::FixedSizeBinary(header.point_format().extra_bytes as i32),
+                false,
+            )),
+            LasExtraBytes::Ignore => (),
+        }
+    }
+
+    Ok(Schema::new(fields))
+}
+
+fn extra_bytes_fields(header: &Header) -> Result<Vec<Field>, ArrowError> {
+    let mut fields = Vec::new();
+
+    for vlr in header.all_vlrs() {
+        if !(vlr.user_id == "LASF_Spec" && vlr.record_id == 4) {
+            continue;
+        }
+
+        for bytes in vlr.data.chunks(192) {
+            // name: a null-padded 32-byte string at descriptor offset 4
+            let name = std::str::from_utf8(&bytes[4..36])?;
+            let name = name.trim_end_matches(char::from(0));
+
+            // data type
+            let data_type = match bytes[2] {
+                0 => DataType::FixedSizeBinary(bytes[3] as i32),
+                1 => DataType::UInt8,
+                2 => DataType::Int8,
+                3 => DataType::UInt16,
+                4 => DataType::Int16,
+                5 => DataType::UInt32,
+                6 => DataType::Int32,
+                7 => DataType::UInt64,
+                8 => DataType::Int64,
+                9 => DataType::Float32,
+                10 => DataType::Float64,
+                11..=30 => {
+                    return Err(ArrowError::ExternalError(
+                        "deprecated extra bytes data type".into(),
+                    ));
+                }
+                31..=255 => {
+                    return Err(ArrowError::ExternalError(
+                        "reserved extra bytes data type".into(),
+                    ));
+                }
+            };
+
+            // nullable when the descriptor's no_data bit (options bit 0) is valid and set
+            let nullable = bytes[2] != 0 && bytes[3] & 1 == 1;
+
+            fields.push(Field::new(name, data_type, nullable));
+        }
+    }
+
+    Ok(fields)
+}
diff --git a/rust/sedona-pointcloud/src/laz/source.rs b/rust/sedona-pointcloud/src/laz/source.rs
new file mode 100644
index 00000000..a8a0e720
--- /dev/null
+++ b/rust/sedona-pointcloud/src/laz/source.rs
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, sync::Arc};
+
+use datafusion_common::{config::ConfigOptions, error::DataFusionError, internal_err, Statistics};
+use datafusion_datasource::{
+    file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, TableSchema,
+};
+use datafusion_physical_expr::{conjunction, PhysicalExpr};
+use datafusion_physical_plan::{
+    filter_pushdown::{FilterPushdownPropagation, PushedDown},
+    metrics::ExecutionPlanMetricsSet,
+};
+use object_store::ObjectStore;
+
+use crate::{
+    laz::{opener::LazOpener, reader::LazFileReaderFactory},
+    options::PointcloudOptions,
+};
+
+#[derive(Clone, Default, Debug)]
+pub struct LazSource {
+    /// Execution plan metrics
+    metrics: ExecutionPlanMetricsSet,
+    /// The schema of the file.
+    pub(crate) table_schema: Option<TableSchema>,
+    /// Optional predicate used for pruning during the LAZ scan
+    pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
+    /// Laz file reader factory
+    pub(crate) laz_file_reader_factory: Option<Arc<LazFileReaderFactory>>,
+    /// Batch size configuration
+    pub(crate) batch_size: Option<usize>,
+    pub(crate) projected_statistics: Option<Statistics>,
+    pub(crate) options: PointcloudOptions,
+}
+
+impl LazSource {
+    pub fn with_options(mut self, options: PointcloudOptions) -> Self {
+        self.options = options;
+        self
+    }
+}
+
+impl FileSource for LazSource {
+    fn create_file_opener(
+        &self,
+        object_store: Arc<dyn ObjectStore>,
+        base_config: &FileScanConfig,
+        _partition: usize,
+    ) -> Arc<dyn FileOpener> {
+        let projection = base_config
+            .file_column_projection_indices()
+            .unwrap_or_else(|| (0..base_config.projected_file_schema().fields().len()).collect());
+
+        let laz_file_reader_factory = self
+            .laz_file_reader_factory
+            .clone()
+            .unwrap_or_else(|| Arc::new(LazFileReaderFactory::new(object_store, None)));
+
+        Arc::new(LazOpener {
+            projection: Arc::from(projection),
+            batch_size: self.batch_size.expect("Must be set"),
+            limit: base_config.limit,
+            predicate: self.predicate.clone(),
+            laz_file_reader_factory,
+            options: self.options.clone(),
+        })
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
+        let mut conf = self.clone();
+        conf.batch_size = Some(batch_size);
+        Arc::new(conf)
+    }
+
+    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
+        let mut conf = self.clone();
+        conf.table_schema = Some(schema);
+        Arc::new(conf)
+    }
+
+    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
+        Arc::new(Self { ..self.clone() })
+    }
+
+    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
+        let mut conf = self.clone();
+        conf.projected_statistics = Some(statistics);
+        Arc::new(conf)
+    }
+
+    fn metrics(&self) -> &ExecutionPlanMetricsSet {
+        &self.metrics
+    }
+
+    fn statistics(&self) -> Result<Statistics, DataFusionError> {
+        let Some(statistics) = &self.projected_statistics else {
+            return internal_err!("projected_statistics must be set");
+        };
+
+        if self.filter().is_some() {
+            Ok(statistics.clone().to_inexact())
+        } else {
+            Ok(statistics.clone())
+        }
+    }
+
+    fn file_type(&self) -> &str {
+        "laz"
+    }
+
+    fn try_pushdown_filters(
+        &self,
+        filters: Vec<Arc<dyn PhysicalExpr>>,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>, DataFusionError> {
+        let mut source = self.clone();
+
+        let predicate = match source.predicate {
+            Some(predicate) => conjunction(std::iter::once(predicate).chain(filters.clone())),
+            None => conjunction(filters.clone()),
+        };
+
+        source.predicate = Some(predicate);
+        let source = Arc::new(source);
+
+        // Tell our parents that they still have to handle the filters (they will only be used for stats pruning).
+        Ok(FilterPushdownPropagation::with_parent_pushdown_result(vec![
+            PushedDown::No;
+            filters.len()
+        ])
+        .with_updated_node(source))
+    }
+}
diff --git a/rust/sedona-pointcloud/src/lib.rs b/rust/sedona-pointcloud/src/lib.rs
new file mode 100644
index 00000000..5c9acc71
--- /dev/null
+++ b/rust/sedona-pointcloud/src/lib.rs
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod laz;
+pub mod options;
diff --git a/rust/sedona-pointcloud/src/options.rs b/rust/sedona-pointcloud/src/options.rs
new file mode 100644
index 00000000..ce8cc263
--- /dev/null
+++ b/rust/sedona-pointcloud/src/options.rs
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{fmt::Display, str::FromStr};
+
+use datafusion_common::{
+    config::{ConfigExtension, ConfigField, Visit},
+    error::DataFusionError,
+    extensions_options,
+};
+
+use crate::laz::options::{LasExtraBytes, LasOptions};
+
+/// Geometry representation
+#[derive(Clone, Copy, Default, PartialEq, Eq, Debug)]
+pub enum GeometryEncoding {
+    /// Use plain coordinates as three Float64 fields `x`, `y`, and `z`.
+    #[default]
+    Plain,
+    /// Resolve the coordinates to a single `geometry` field with WKB encoding.
+    Wkb,
+    /// Resolve the coordinates to a single `geometry` field with separated-coordinate GeoArrow encoding.
+    Native,
+}
+
+impl Display for GeometryEncoding {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            GeometryEncoding::Plain => f.write_str("plain"),
+            GeometryEncoding::Wkb => f.write_str("wkb"),
+            GeometryEncoding::Native => f.write_str("native"),
+        }
+    }
+}
+
+impl FromStr for GeometryEncoding {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "plain" => Ok(Self::Plain),
+            "wkb" => Ok(Self::Wkb),
+            "native" => Ok(Self::Native),
+            s => Err(format!("Unable to parse geometry encoding from `{s}`; expected `plain`, `wkb`, or `native`")),
+        }
+    }
+}
+
+impl ConfigField for GeometryEncoding {
+    fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static str) {
+        v.some(
+            &format!("{key}.geometry_encoding"),
+            self,
+            "Specify point geometry encoding",
+        );
+    }
+
+    fn set(&mut self, _key: &str, value: &str) -> Result<(), DataFusionError> {
+        *self = value.parse().map_err(DataFusionError::Configuration)?;
+        Ok(())
+    }
+}
+
+extensions_options! {
+    /// Pointcloud configuration options
+    pub struct PointcloudOptions {
+        pub geometry_encoding: GeometryEncoding, default = GeometryEncoding::default()
+        pub las: LasOptions, default = LasOptions::default()
+    }
+
+}
+
+impl ConfigExtension for PointcloudOptions {
+    const PREFIX: &'static str = "pointcloud";
+}
+
+impl PointcloudOptions {
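+    /// Set the geometry encoding, e.g.
+    /// `PointcloudOptions::default().with_geometry_encoding(GeometryEncoding::Wkb)`.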
+    pub fn with_geometry_encoding(mut self, geometry_encoding: GeometryEncoding) -> Self {
+        self.geometry_encoding = geometry_encoding;
+        self
+    }
+
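+    /// Set the extra bytes handling on the nested LAS options.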
+    pub fn with_las_extra_bytes(mut self, extra_bytes: LasExtraBytes) -> Self {
+        self.las.extra_bytes = extra_bytes;
+        self
+    }
+}
diff --git a/rust/sedona-pointcloud/tests/data/extra.laz b/rust/sedona-pointcloud/tests/data/extra.laz
new file mode 100644
index 00000000..016c6c29
Binary files /dev/null and b/rust/sedona-pointcloud/tests/data/extra.laz differ
diff --git a/rust/sedona-pointcloud/tests/data/generate.py b/rust/sedona-pointcloud/tests/data/generate.py
new file mode 100644
index 00000000..bf0af497
--- /dev/null
+++ b/rust/sedona-pointcloud/tests/data/generate.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from pathlib import Path
+import laspy
+import numpy as np
+
+
+# Some links for reference
+#
+# * ASPRS: <https://www.asprs.org/>
+# * ASPRS (GitHub): <https://github.com/ASPRSorg/LAS>
+# * OGC LAS Specification Standard: <https://www.ogc.org/standards/las/>
+# * LAZ Specification 1.4: <https://downloads.rapidlasso.de/doc/LAZ_Specification_1.4_R1.pdf>
+
+
+DATA_DIR = Path(__file__).resolve().parent
+
+
+LAS_VERSIONS = [f"1.{p}" for p in range(5)]  # 1.0 - 1.4
+POINT_FORMAT = list(range(11))  # 0 - 10 (>= 6 for LAS 1.4+)
+
+DATA_TYPES = [
+    "uint8",
+    "int8",
+    "uint16",
+    "int16",
+    "uint32",
+    "int32",
+    "uint64",
+    "int64",
+    "float32",
+    "float64",
+]
+
+# Pragmatic choice: LAS 1.4 with point format 6
+version = LAS_VERSIONS[4]
+point_format = POINT_FORMAT[6]
+
+# Header
+header = laspy.LasHeader(point_format=point_format, version=version)
+header.offsets = np.array([1.0, 1.0, 1.0])
+header.scales = np.array([0.1, 0.1, 0.1])
+
+# Extra attributes
+for dt in DATA_TYPES:
+    name = f"{dt}_plain"
+    header.add_extra_dim(laspy.point.format.ExtraBytesParams(name, dt, "", None, None))
+
+    name = f"{dt}_scaled"
+    header.add_extra_dim(
+        laspy.point.format.ExtraBytesParams(name, dt, "", [10.0], [0.1])
+    )
+
+    name = f"{dt}_nodata"
+    header.add_extra_dim(
+        laspy.point.format.ExtraBytesParams(name, dt, "", None, None, [42])
+    )
+
+# Write laz with one point
+extra_path = DATA_DIR.joinpath("extra.laz")
+with laspy.open(extra_path, mode="w", header=header, do_compress=True) as writer:
+    point_record = laspy.ScaleAwarePointRecord.zeros(point_count=1, header=header)
+    point_record.x = [0.5]
+    point_record.y = [0.5]
+    point_record.z = [0.5]
+
+    for dt in DATA_TYPES:
+        name = f"{dt}_plain"
+        point_record[name] = [21]
+
+        name = f"{dt}_scaled"
+        point_record[name] = [21]
+
+        name = f"{dt}_nodata"
+        point_record[name] = [42]
+
+    writer.write_points(point_record)
diff --git a/rust/sedona/Cargo.toml b/rust/sedona/Cargo.toml
index 1172f77a..199a783e 100644
--- a/rust/sedona/Cargo.toml
+++ b/rust/sedona/Cargo.toml
@@ -39,6 +39,7 @@ geo = ["dep:sedona-geo"]
 geos = ["dep:sedona-geos"]
 tg = ["dep:sedona-tg"]
 http = ["object_store/http"]
+pointcloud = ["dep:sedona-pointcloud"]
 proj = ["sedona-proj/proj-sys"]
 spatial-join = ["dep:sedona-spatial-join"]
 s2geography = ["dep:sedona-s2geography"]
@@ -73,6 +74,7 @@ sedona-geo = { workspace = true, optional = true }
 sedona-geometry = { workspace = true }
 sedona-geoparquet = { workspace = true }
 sedona-geos = { workspace = true, optional = true }
+sedona-pointcloud = { workspace = true, optional = true }
 sedona-proj = { workspace = true }
 sedona-raster-functions = { workspace = true }
 sedona-schema = { workspace = true }
diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs
index 6e31b855..a931bcbb 100644
--- a/rust/sedona/src/context.rs
+++ b/rust/sedona/src/context.rs
@@ -52,6 +52,11 @@ use sedona_geoparquet::{
     format::GeoParquetFormatFactory,
     provider::{geoparquet_listing_table, GeoParquetReadOptions},
 };
+#[cfg(feature = "pointcloud")]
+use sedona_pointcloud::{
+    laz::{format::LazFormatFactory, options::LasExtraBytes},
+    options::{GeometryEncoding, PointcloudOptions},
+};
 
 /// Sedona SessionContext wrapper
 ///
@@ -84,6 +89,13 @@ impl SedonaContext {
         // variables.
         let session_config = SessionConfig::from_env()?.with_information_schema(true);
         let session_config = add_sedona_option_extension(session_config);
+        #[cfg(feature = "pointcloud")]
+        let session_config = session_config.with_option_extension(
+            PointcloudOptions::default()
+                .with_geometry_encoding(GeometryEncoding::Wkb)
+                .with_las_extra_bytes(LasExtraBytes::Typed),
+        );
+
         let rt_builder = RuntimeEnvBuilder::new();
         let runtime_env = rt_builder.build_arc()?;
 
@@ -101,6 +113,10 @@ impl SedonaContext {
 
         let mut state = state_builder.build();
         state.register_file_format(Arc::new(GeoParquetFormatFactory::new()), true)?;
+        #[cfg(feature = "pointcloud")]
+        {
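+            // Register with overwrite = false so an existing factory for the
+            // same file type is kept rather than replaced.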
+            state.register_file_format(Arc::new(LazFormatFactory::new()), false)?;
+        }
 
         // Enable dynamic file query (i.e., select * from 'filename')
         let ctx = SessionContext::new_with_state(state).enable_url_table();
