This is an automated email from the ASF dual-hosted git repository.

prantogg pushed a commit to branch pranav/add-s3-write-support
in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git

commit 51b63a1fe0ba0d7bc1d2e20ab8c6b42242b4f36e
Author: Pranav Toggi <[email protected]>
AuthorDate: Fri Jan 2 23:49:03 2026 -0800

    feat: add s3 write support
---
 README.md                           |  20 ++++
 spatialbench-cli/Cargo.toml         |   3 +-
 spatialbench-cli/src/generate.rs    |  83 +++++++++++++++
 spatialbench-cli/src/main.rs        |  37 +++++++
 spatialbench-cli/src/output_plan.rs |  38 +++++--
 spatialbench-cli/src/parquet.rs     |  92 +++++++++++++++++
 spatialbench-cli/src/plan.rs        |  10 ++
 spatialbench-cli/src/runner.rs      |  22 +++-
 spatialbench-cli/src/s3_writer.rs   | 201 ++++++++++++++++++++++++++++++++++++
 9 files changed, 491 insertions(+), 15 deletions(-)

diff --git a/README.md b/README.md
index 9020cf3..dcedb75 100644
--- a/README.md
+++ b/README.md
@@ -147,6 +147,26 @@ spatialbench-cli --scale-factor 1 --mb-per-file 256 
--output-dir sf1-parquet
 spatialbench-cli --scale-factor 10 --mb-per-file 256 --output-dir sf10-parquet
 ```
 
+#### Generate Data Directly to S3
+
+You can generate data directly to Amazon S3 or S3-compatible storage by 
providing an S3 URI as the output directory:
+
+```bash
+# Set AWS credentials
+export AWS_ACCESS_KEY_ID="your-access-key"
+export AWS_SECRET_ACCESS_KEY="your-secret-key"
+export AWS_REGION="us-west-2"  # Must match your bucket's region
+
+# Generate to S3
+spatialbench-cli --scale-factor 10 --mb-per-file 256 --output-dir 
s3://my-bucket/spatialbench/sf10
+
+# For S3-compatible services (MinIO, etc.)
+export AWS_ENDPOINT="http://localhost:9000";
+spatialbench-cli --scale-factor 1 --output-dir s3://my-bucket/data
+```
+
+The S3 writer uses streaming multipart upload, buffering data in 32MB chunks 
before uploading parts. This ensures memory-efficient generation even for large 
datasets. All output formats (Parquet, CSV, TBL) are supported, and the 
generated files are byte-for-byte identical to local generation.
+
 #### Custom Spider Configuration
 
 You can override these defaults at runtime by passing a YAML file via the 
`--config` flag:
diff --git a/spatialbench-cli/Cargo.toml b/spatialbench-cli/Cargo.toml
index a3c8e87..34fb346 100644
--- a/spatialbench-cli/Cargo.toml
+++ b/spatialbench-cli/Cargo.toml
@@ -24,10 +24,11 @@ serde = { version = "1.0.219", features = ["derive"] }
 anyhow = "1.0.99"
 serde_yaml = "0.9.33"
 datafusion = "50.2"
-object_store = { version = "0.12.4", features = ["http"] }
+object_store = { version = "0.12.4", features = ["http", "aws"] }
 arrow-array = "56"
 arrow-schema = "56"
 url = "2.5.7"
+bytes = "1.10.1"
 
 [dev-dependencies]
 assert_cmd = "2.0"
diff --git a/spatialbench-cli/src/generate.rs b/spatialbench-cli/src/generate.rs
index bbe3cef..2d4cba3 100644
--- a/spatialbench-cli/src/generate.rs
+++ b/spatialbench-cli/src/generate.rs
@@ -36,6 +36,15 @@ pub trait Sink: Send {
     fn flush(self) -> Result<(), io::Error>;
 }
 
+/// Async version of Sink for writers that need async finalization (like 
S3Writer)
+pub trait AsyncSink: Send {
+    /// Write all data from the buffer to the sink
+    fn sink(&mut self, buffer: &[u8]) -> Result<(), io::Error>;
+
+    /// Complete and flush any remaining data from the sink (async)
+    fn async_flush(self) -> impl std::future::Future<Output = Result<(), 
io::Error>> + Send;
+}
+
 /// Generates data in parallel from a series of [`Source`] and writes to a 
[`Sink`]
 ///
 /// Each [`Source`] is a data generator that generates data directly into an in
@@ -135,6 +144,80 @@ where
     writer_task.await.expect("writer task panicked")
 }
 
+/// Generates data in parallel from a series of [`Source`] and writes to an 
[`AsyncSink`]
+///
+/// This is similar to generate_in_chunks but handles async finalization for 
S3Writer
+pub async fn generate_in_chunks_async<G, I, S>(
+    mut sink: S,
+    sources: I,
+    num_threads: usize,
+) -> Result<(), io::Error>
+where
+    G: Source + 'static,
+    I: Iterator<Item = G>,
+    S: AsyncSink + 'static,
+{
+    let recycler = BufferRecycler::new();
+    let mut sources = sources.peekable();
+
+    debug!("Using {num_threads} threads (async sink)");
+
+    let (tx, mut rx) = tokio::sync::mpsc::channel(num_threads);
+
+    // write the header
+    let Some(first) = sources.peek() else {
+        return Ok(()); // no sources
+    };
+    let header = first.header(Vec::new());
+    tx.send(header)
+        .await
+        .expect("tx just created, it should not be closed");
+
+    let sources_and_recyclers = sources.map(|generator| (generator, 
recycler.clone()));
+
+    let mut stream = futures::stream::iter(sources_and_recyclers)
+        .map(async |(source, recycler)| {
+            let buffer = recycler.new_buffer(1024 * 1024 * 8);
+            let mut join_set = JoinSet::new();
+            join_set.spawn(async move { source.create(buffer) });
+            join_set
+                .join_next()
+                .await
+                .expect("had one item")
+                .expect("join_next join is infallible unless task panics")
+        })
+        .buffered(num_threads)
+        .map(async |buffer| {
+            if let Err(e) = tx.send(buffer).await {
+                debug!("Error sending buffer to writer: {e}");
+            }
+        });
+
+    let captured_recycler = recycler.clone();
+    let writer_task = tokio::task::spawn(async move {
+        while let Some(buffer) = rx.recv().await {
+            sink.sink(&buffer)?;
+            captured_recycler.return_buffer(buffer);
+        }
+        // No more input, flush the sink asynchronously
+        sink.async_flush().await
+    });
+
+    // drive the stream to completion
+    while let Some(write_task) = stream.next().await {
+        if writer_task.is_finished() {
+            debug!("writer task is done early, stopping writer");
+            break;
+        }
+        write_task.await;
+    }
+    drop(stream);
+    drop(tx);
+
+    debug!("waiting for writer task to complete");
+    writer_task.await.expect("writer task panicked")
+}
+
 /// A simple buffer recycler to avoid allocating new buffers for each part
 ///
 /// Clones share the same underlying recycler, so it is not thread safe
diff --git a/spatialbench-cli/src/main.rs b/spatialbench-cli/src/main.rs
index e1b5c4c..8994652 100644
--- a/spatialbench-cli/src/main.rs
+++ b/spatialbench-cli/src/main.rs
@@ -11,6 +11,7 @@ mod output_plan;
 mod parquet;
 mod plan;
 mod runner;
+mod s3_writer;
 mod spatial_config_file;
 mod statistics;
 mod tbl;
@@ -384,6 +385,13 @@ impl IntoSize for BufWriter<File> {
     }
 }
 
+impl IntoSize for s3_writer::S3Writer {
+    fn into_size(self) -> Result<usize, io::Error> {
+        // Return the buffer size before finishing
+        Ok(self.buffer_size())
+    }
+}
+
 /// Wrapper around a buffer writer that counts the number of buffers and bytes 
written
 struct WriterSink<W: Write> {
     statistics: WriteStatistics,
@@ -410,3 +418,32 @@ impl<W: Write + Send> Sink for WriterSink<W> {
         self.inner.flush()
     }
 }
+
+/// Async wrapper for S3Writer to handle async finalization
+pub struct AsyncWriterSink {
+    statistics: WriteStatistics,
+    inner: s3_writer::S3Writer,
+}
+
+impl AsyncWriterSink {
+    pub fn new(inner: s3_writer::S3Writer) -> Self {
+        Self {
+            inner,
+            statistics: WriteStatistics::new("buffers"),
+        }
+    }
+}
+
+impl generate::AsyncSink for AsyncWriterSink {
+    fn sink(&mut self, buffer: &[u8]) -> Result<(), io::Error> {
+        self.statistics.increment_chunks(1);
+        self.statistics.increment_bytes(buffer.len());
+        self.inner.write_all(buffer)
+    }
+
+    async fn async_flush(mut self) -> Result<(), io::Error> {
+        self.inner.flush()?;
+        self.inner.finish().await?;
+        Ok(())
+    }
+}
diff --git a/spatialbench-cli/src/output_plan.rs 
b/spatialbench-cli/src/output_plan.rs
index 75e441b..5e8a969 100644
--- a/spatialbench-cli/src/output_plan.rs
+++ b/spatialbench-cli/src/output_plan.rs
@@ -18,6 +18,8 @@ pub enum OutputLocation {
     File(PathBuf),
     /// Output to stdout
     Stdout,
+    /// Output to S3
+    S3(String),
 }
 
 impl Display for OutputLocation {
@@ -31,6 +33,10 @@ impl Display for OutputLocation {
                 write!(f, "{}", file.to_string_lossy())
             }
             OutputLocation::Stdout => write!(f, "Stdout"),
+            OutputLocation::S3(uri) => {
+                // Display the S3 URI
+                write!(f, "{}", uri)
+            }
         }
     }
 }
@@ -265,17 +271,31 @@ impl OutputPlanGenerator {
                 OutputFormat::Parquet => "parquet",
             };
 
-            let mut output_path = self.output_dir.clone();
-            if let Some(part) = part {
-                // If a partition is specified, create a subdirectory for it
-                output_path.push(table.to_string());
-                self.ensure_directory_exists(&output_path)?;
-                output_path.push(format!("{table}.{part}.{extension}"));
+            // Check if output_dir is an S3 URI
+            let output_dir_str = self.output_dir.to_string_lossy();
+            if output_dir_str.starts_with("s3://") {
+                // Handle S3 path
+                let base_uri = output_dir_str.trim_end_matches('/');
+                let s3_uri = if let Some(part) = part {
+                    format!("{base_uri}/{table}/{table}.{part}.{extension}")
+                } else {
+                    format!("{base_uri}/{table}.{extension}")
+                };
+                Ok(OutputLocation::S3(s3_uri))
             } else {
-                // No partition specified, output to a single file
-                output_path.push(format!("{table}.{extension}"));
+                // Handle local filesystem path
+                let mut output_path = self.output_dir.clone();
+                if let Some(part) = part {
+                    // If a partition is specified, create a subdirectory for 
it
+                    output_path.push(table.to_string());
+                    self.ensure_directory_exists(&output_path)?;
+                    output_path.push(format!("{table}.{part}.{extension}"));
+                } else {
+                    // No partition specified, output to a single file
+                    output_path.push(format!("{table}.{extension}"));
+                }
+                Ok(OutputLocation::File(output_path))
             }
-            Ok(OutputLocation::File(output_path))
         }
     }
 
diff --git a/spatialbench-cli/src/parquet.rs b/spatialbench-cli/src/parquet.rs
index 53b60f4..d9f7222 100644
--- a/spatialbench-cli/src/parquet.rs
+++ b/spatialbench-cli/src/parquet.rs
@@ -1,5 +1,6 @@
 //! Parquet output format
 
+use crate::s3_writer::S3Writer;
 use crate::statistics::WriteStatistics;
 use arrow::datatypes::SchemaRef;
 use futures::StreamExt;
@@ -124,6 +125,97 @@ where
     Ok(())
 }
 
+/// Converts a set of RecordBatchIterators into a Parquet file for S3Writer
+///
+/// This is a specialized version that handles S3Writer's async finalization
+pub async fn generate_parquet_s3<I>(
+    writer: S3Writer,
+    iter_iter: I,
+    num_threads: usize,
+    parquet_compression: Compression,
+) -> Result<(), io::Error>
+where
+    I: Iterator<Item: RecordBatchIterator> + 'static,
+{
+    debug!(
+        "Generating Parquet for S3 with {num_threads} threads, using 
{parquet_compression} compression"
+    );
+    let mut iter_iter = iter_iter.peekable();
+
+    // get schema from the first iterator
+    let Some(first_iter) = iter_iter.peek() else {
+        return Ok(());
+    };
+    let schema = Arc::clone(first_iter.schema());
+
+    // Compute the parquet schema
+    let writer_properties = WriterProperties::builder()
+        .set_compression(parquet_compression)
+        .build();
+    let writer_properties = Arc::new(writer_properties);
+    let parquet_schema = Arc::new(
+        ArrowSchemaConverter::new()
+            .with_coerce_types(writer_properties.coerce_types())
+            .convert(&schema)
+            .unwrap(),
+    );
+
+    // create a stream that computes the data for each row group
+    let mut row_group_stream = futures::stream::iter(iter_iter)
+        .map(async |iter| {
+            let parquet_schema = Arc::clone(&parquet_schema);
+            let writer_properties = Arc::clone(&writer_properties);
+            let schema = Arc::clone(&schema);
+            tokio::task::spawn(async move {
+                encode_row_group(parquet_schema, writer_properties, schema, 
iter)
+            })
+            .await
+            .expect("Inner task panicked")
+        })
+        .buffered(num_threads);
+
+    let root_schema = parquet_schema.root_schema_ptr();
+    let writer_properties_captured = Arc::clone(&writer_properties);
+    let (tx, mut rx): (
+        Sender<Vec<ArrowColumnChunk>>,
+        Receiver<Vec<ArrowColumnChunk>>,
+    ) = tokio::sync::mpsc::channel(num_threads);
+
+    let writer_task = tokio::task::spawn_blocking(move || {
+        let mut statistics = WriteStatistics::new("row groups");
+        let mut writer =
+            SerializedFileWriter::new(writer, root_schema, 
writer_properties_captured).unwrap();
+
+        while let Some(chunks) = rx.blocking_recv() {
+            let mut row_group_writer = writer.next_row_group().unwrap();
+            for chunk in chunks {
+                chunk.append_to_row_group(&mut row_group_writer).unwrap();
+            }
+            row_group_writer.close().unwrap();
+            statistics.increment_chunks(1);
+        }
+        // Return the S3Writer for async upload
+        let s3_writer = writer.into_inner()?;
+        Ok((s3_writer, statistics)) as Result<(S3Writer, WriteStatistics), 
io::Error>
+    });
+
+    // Drive the input stream
+    while let Some(chunks) = row_group_stream.next().await {
+        if let Err(e) = tx.send(chunks).await {
+            debug!("Error sending chunks to writer: {e}");
+            break;
+        }
+    }
+    drop(tx);
+
+    // Wait for writer task and upload to S3
+    let (s3_writer, mut statistics) = writer_task.await??;
+    let size = s3_writer.finish().await?;
+    statistics.increment_bytes(size);
+
+    Ok(())
+}
+
 /// Creates the data for a particular row group
 ///
 /// Note at the moment it does not use multiple tasks/threads but it could
diff --git a/spatialbench-cli/src/plan.rs b/spatialbench-cli/src/plan.rs
index fceace9..1a38c1d 100644
--- a/spatialbench-cli/src/plan.rs
+++ b/spatialbench-cli/src/plan.rs
@@ -60,6 +60,16 @@ pub struct GenerationPlan {
 
 pub const DEFAULT_PARQUET_ROW_GROUP_BYTES: i64 = 128 * 1024 * 1024;
 
+/// Buffer size for Parquet writing (32MB)
+///
+/// This buffer size is used for:
+/// - Local file writing with BufWriter
+/// - S3 multipart upload parts
+///
+/// The 32MB size provides good performance and is well above the AWS S3
+/// minimum part size requirement of 5MB for multipart uploads.
+pub const PARQUET_BUFFER_SIZE: usize = 32 * 1024 * 1024;
+
 impl GenerationPlan {
     /// Returns a GenerationPlan number of parts to generate
     ///
diff --git a/spatialbench-cli/src/runner.rs b/spatialbench-cli/src/runner.rs
index 882d1cf..8ab881c 100644
--- a/spatialbench-cli/src/runner.rs
+++ b/spatialbench-cli/src/runner.rs
@@ -1,11 +1,12 @@
 //! [`PlanRunner`] for running [`OutputPlan`]s.
 
 use crate::csv::*;
-use crate::generate::{generate_in_chunks, Source};
+use crate::generate::{generate_in_chunks, generate_in_chunks_async, Source};
 use crate::output_plan::{OutputLocation, OutputPlan};
-use crate::parquet::generate_parquet;
+use crate::parquet::{generate_parquet, generate_parquet_s3};
+use crate::s3_writer::S3Writer;
 use crate::tbl::*;
-use crate::{OutputFormat, Table, WriterSink};
+use crate::{AsyncWriterSink, OutputFormat, Table, WriterSink};
 use log::{debug, info};
 use spatialbench::generators::{
     BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator, 
VehicleGenerator,
@@ -201,6 +202,12 @@ where
             })?;
             Ok(())
         }
+        OutputLocation::S3(uri) => {
+            info!("Writing to S3: {}", uri);
+            let s3_writer = S3Writer::new(uri)?;
+            let sink = AsyncWriterSink::new(s3_writer);
+            generate_in_chunks_async(sink, sources, num_threads).await
+        }
     }
 }
 
@@ -211,7 +218,7 @@ where
 {
     match plan.output_location() {
         OutputLocation::Stdout => {
-            let writer = BufWriter::with_capacity(32 * 1024 * 1024, 
io::stdout()); // 32MB buffer
+            let writer = 
BufWriter::with_capacity(crate::plan::PARQUET_BUFFER_SIZE, io::stdout());
             generate_parquet(writer, sources, num_threads, 
plan.parquet_compression()).await
         }
         OutputLocation::File(path) => {
@@ -225,7 +232,7 @@ where
             let file = std::fs::File::create(&temp_path).map_err(|err| {
                 io::Error::other(format!("Failed to create {temp_path:?}: 
{err}"))
             })?;
-            let writer = BufWriter::with_capacity(32 * 1024 * 1024, file); // 
32MB buffer
+            let writer = 
BufWriter::with_capacity(crate::plan::PARQUET_BUFFER_SIZE, file);
             generate_parquet(writer, sources, num_threads, 
plan.parquet_compression()).await?;
             // rename the temp file to the final path
             std::fs::rename(&temp_path, path).map_err(|e| {
@@ -235,6 +242,11 @@ where
             })?;
             Ok(())
         }
+        OutputLocation::S3(uri) => {
+            info!("Writing parquet to S3: {}", uri);
+            let s3_writer = S3Writer::new(uri)?;
+            generate_parquet_s3(s3_writer, sources, num_threads, 
plan.parquet_compression()).await
+        }
     }
 }
 
diff --git a/spatialbench-cli/src/s3_writer.rs 
b/spatialbench-cli/src/s3_writer.rs
new file mode 100644
index 0000000..9d8ea40
--- /dev/null
+++ b/spatialbench-cli/src/s3_writer.rs
@@ -0,0 +1,201 @@
+//! S3 writer support for writing generated data directly to S3
+
+use crate::plan::PARQUET_BUFFER_SIZE;
+use bytes::Bytes;
+use log::{debug, info};
+use object_store::aws::AmazonS3Builder;
+use object_store::path::Path as ObjectPath;
+use object_store::ObjectStore;
+use std::io::{self, Write};
+use std::sync::Arc;
+use url::Url;
+
+/// Minimum part size enforced by AWS S3 for multipart uploads (except last 
part)
+const S3_MIN_PART_SIZE: usize = 5 * 1024 * 1024; // 5MB
+
+/// A writer that buffers data parts in memory and uploads to S3 when finished
+///
+/// This implementation avoids nested runtime issues by deferring all async
+/// operations to the finish() method. Parts are accumulated in memory during
+/// write() calls and uploaded in a batch during finish().
+pub struct S3Writer {
+    /// The S3 client
+    client: Arc<dyn ObjectStore>,
+    /// The path in S3 to write to
+    path: ObjectPath,
+    /// Current buffer for accumulating data
+    buffer: Vec<u8>,
+    /// Completed parts ready for upload (each is at least MIN_PART_SIZE)
+    parts: Vec<Bytes>,
+    /// Total bytes written
+    total_bytes: usize,
+}
+
+impl S3Writer {
+    /// Create a new S3 writer for the given S3 URI
+    ///
+    /// The URI should be in the format: s3://bucket/path/to/object
+    ///
+    /// Authentication is handled through AWS environment variables:
+    /// - AWS_ACCESS_KEY_ID
+    /// - AWS_SECRET_ACCESS_KEY
+    /// - AWS_REGION (optional, defaults to us-east-1)
+    /// - AWS_SESSION_TOKEN (optional, for temporary credentials)
+    /// - AWS_ENDPOINT (optional, for S3-compatible services)
+    pub fn new(uri: &str) -> Result<Self, io::Error> {
+        let url = Url::parse(uri).map_err(|e| {
+            io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!("Invalid S3 URI: {}", e),
+            )
+        })?;
+
+        if url.scheme() != "s3" {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!("Expected s3:// URI, got: {}", url.scheme()),
+            ));
+        }
+
+        let bucket = url.host_str().ok_or_else(|| {
+            io::Error::new(io::ErrorKind::InvalidInput, "S3 URI missing bucket 
name")
+        })?;
+
+        let path = url.path().trim_start_matches('/');
+
+        debug!(
+            "Creating S3 streaming writer for bucket: {}, path: {}",
+            bucket, path
+        );
+
+        // Build the S3 client using environment variables
+        let mut builder = AmazonS3Builder::new().with_bucket_name(bucket);
+
+        // Try to get credentials from environment variables
+        if let Ok(access_key) = std::env::var("AWS_ACCESS_KEY_ID") {
+            builder = builder.with_access_key_id(access_key);
+        }
+
+        if let Ok(secret_key) = std::env::var("AWS_SECRET_ACCESS_KEY") {
+            builder = builder.with_secret_access_key(secret_key);
+        }
+
+        if let Ok(region) = std::env::var("AWS_REGION") {
+            builder = builder.with_region(region);
+        }
+
+        if let Ok(session_token) = std::env::var("AWS_SESSION_TOKEN") {
+            builder = builder.with_token(session_token);
+        }
+
+        if let Ok(endpoint) = std::env::var("AWS_ENDPOINT") {
+            builder = builder.with_endpoint(endpoint);
+        }
+
+        let client = builder
+            .build()
+            .map_err(|e| io::Error::other(format!("Failed to create S3 client: 
{}", e)))?;
+
+        info!(
+            "S3 streaming writer created successfully for bucket: {}",
+            bucket
+        );
+
+        Ok(Self {
+            client: Arc::new(client),
+            path: ObjectPath::from(path),
+            buffer: Vec::with_capacity(S3_MIN_PART_SIZE),
+            parts: Vec::new(),
+            total_bytes: 0,
+        })
+    }
+
+    /// Complete the upload by sending all buffered data to S3
+    ///
+    /// This method performs all async operations, uploading parts and 
completing
+    /// the multipart upload. It must be called from an async context.
+    pub async fn finish(mut self) -> Result<usize, io::Error> {
+        debug!("Completing S3 upload: {} bytes total", self.total_bytes);
+
+        // Add any remaining buffer data as the final part
+        if !self.buffer.is_empty() {
+            self.parts
+                .push(Bytes::from(std::mem::take(&mut self.buffer)));
+        }
+
+        // Handle small files with simple PUT
+        if self.parts.len() == 1 && self.parts[0].len() < S3_MIN_PART_SIZE {
+            debug!(
+                "Using simple PUT for small file: {} bytes",
+                self.total_bytes
+            );
+            let data = self.parts.into_iter().next().unwrap();
+            self.client
+                .put(&self.path, data.into())
+                .await
+                .map_err(|e| io::Error::other(format!("Failed to upload to S3: 
{}", e)))?;
+            info!("Successfully uploaded {} bytes to S3", self.total_bytes);
+            return Ok(self.total_bytes);
+        }
+
+        // Use multipart upload for larger files
+        debug!("Starting multipart upload for {} parts", self.parts.len());
+        let mut upload =
+            self.client.put_multipart(&self.path).await.map_err(|e| {
+                io::Error::other(format!("Failed to start multipart upload: 
{}", e))
+            })?;
+
+        // Upload all parts
+        for (i, part_data) in self.parts.into_iter().enumerate() {
+            debug!("Uploading part {} ({} bytes)", i + 1, part_data.len());
+            upload
+                .put_part(part_data.into())
+                .await
+                .map_err(|e| io::Error::other(format!("Failed to upload part 
{}: {}", i + 1, e)))?;
+        }
+
+        // Complete the multipart upload
+        upload
+            .complete()
+            .await
+            .map_err(|e| io::Error::other(format!("Failed to complete 
multipart upload: {}", e)))?;
+
+        info!(
+            "Successfully uploaded {} bytes to S3 using multipart upload",
+            self.total_bytes
+        );
+        Ok(self.total_bytes)
+    }
+
+    /// Get the total bytes written so far
+    pub fn total_bytes(&self) -> usize {
+        self.total_bytes
+    }
+
+    /// Get the buffer size (for compatibility)
+    pub fn buffer_size(&self) -> usize {
+        self.total_bytes
+    }
+}
+
+impl Write for S3Writer {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.total_bytes += buf.len();
+        self.buffer.extend_from_slice(buf);
+
+        // When buffer reaches our target part size (32MB), save it as a 
completed part
+        // No async operations here - we just move data to the parts vec
+        if self.buffer.len() >= PARQUET_BUFFER_SIZE {
+            let part_data =
+                std::mem::replace(&mut self.buffer, 
Vec::with_capacity(PARQUET_BUFFER_SIZE));
+            self.parts.push(Bytes::from(part_data));
+        }
+
+        Ok(buf.len())
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        // No-op: all data will be uploaded in finish()
+        Ok(())
+    }
+}

Reply via email to