This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 4096719cb feat: add convenience API for bulk ingest (#4116)
4096719cb is described below

commit 4096719cbf78a951d145605f54fb68485fd3d0bd
Author: Kent Wu <[email protected]>
AuthorDate: Tue Mar 17 21:36:37 2026 -0400

    feat: add convenience API for bulk ingest (#4116)
    
    This PR adds a higher-level API for bulk ingest on AdbcConnection,
    wrapping the low-level statement option + bind + execute_update pattern.
    
    ```ts
    await conn.ingest('my_table', data)
    await conn.ingest('my_table', data, { mode: IngestMode.Append })
    await conn.ingest('my_table', data, { mode: IngestMode.Replace, dbSchema: 'public' })
    ```
    
    Also fixes the underlying `bind()` implementation to use
    `AdbcStatementBindStream` instead of `AdbcStatementBind`, removing the
    previous single-batch limitation. `bind()` now accepts a `Table` with any
    number of batches.
    
    Changes
    - `AdbcConnection.ingest(tableName, data, options?)` convenience method
      - `IngestMode` and `IngestOptions` exported types
    - `AdbcStatement.bind()` now accepts `Table` only
    (was `RecordBatch | Table`)
    - `AdbcStatement.bind()` upgraded to use `AdbcStatementBindStream` in
    the Rust layer, removing the single-batch limitation
    
    
    **Test Plan**
    ```sh
    npm test
    ```
    
    
    closes #4100
---
 javascript/__test__/bind.spec.ts   |  37 +++++++++++---
 javascript/__test__/ingest.spec.ts | 101 +++++++++++++++++++++++++++++++++++++
 javascript/lib/index.ts            |  48 ++++++++++--------
 javascript/lib/types.ts            |  56 ++++++++++++++++++--
 javascript/src/client.rs           |  18 +------
 5 files changed, 213 insertions(+), 47 deletions(-)

diff --git a/javascript/__test__/bind.spec.ts b/javascript/__test__/bind.spec.ts
index 2599b5272..a4467ecd0 100644
--- a/javascript/__test__/bind.spec.ts
+++ b/javascript/__test__/bind.spec.ts
@@ -70,17 +70,40 @@ test('statement: bind and query data', async () => {
   assert.strictEqual(rowCount, 1)
 })
 
-test('statement: bind multi-batch table throws descriptive error', async () => 
{
-  // Both batches must share the same schema instance for Table to accept them
+test('statement: bind empty table inserts 0 rows', async () => {
+  const empty = tableFromArrays({ id: [] as number[], name: [] as string[] })
+  assert.strictEqual(empty.numRows, 0)
+
+  const stmt2 = await conn.createStatement()
+  await stmt2.setSqlQuery('INSERT INTO bind_test (id, name) VALUES (?, ?)')
+  await stmt2.bind(empty)
+  const affected = await stmt2.executeUpdate()
+  assert.strictEqual(affected, 0)
+  await stmt2.close()
+})
+
+test('statement: bind single-batch table', async () => {
+  const data = tableFromArrays({ id: [100], name: ['single'] })
+  assert.strictEqual(data.batches.length, 1)
+
+  const stmt2 = await conn.createStatement()
+  await stmt2.setSqlQuery('INSERT INTO bind_test (id, name) VALUES (?, ?)')
+  await stmt2.bind(data)
+  const affected = await stmt2.executeUpdate()
+  assert.strictEqual(affected, 1)
+  await stmt2.close()
+})
+
+test('statement: bind multi-batch table', async () => {
   const base = tableFromArrays({ id: [10], name: ['first'] })
   const batch1 = base.batches[0]
-  const batch2 = base.batches[0] // same schema, reused to construct a 
multi-batch Table
-  const multiTable = new Table([batch1, batch2])
+  const multiTable = new Table([batch1, batch1])
   assert.strictEqual(multiTable.batches.length, 2)
 
   const stmt2 = await conn.createStatement()
-  const error = await stmt2.bind(multiTable).catch((e) => e)
-  assert.ok(error instanceof Error)
-  assert.match(error.message, /bind\(\).*batches|batches.*bind\(\)/i)
+  await stmt2.setSqlQuery('INSERT INTO bind_test (id, name) VALUES (?, ?)')
+  await stmt2.bind(multiTable)
+  const affected = await stmt2.executeUpdate()
+  assert.strictEqual(affected, 2)
   await stmt2.close()
 })
diff --git a/javascript/__test__/ingest.spec.ts 
b/javascript/__test__/ingest.spec.ts
new file mode 100644
index 000000000..2cfc8c907
--- /dev/null
+++ b/javascript/__test__/ingest.spec.ts
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import { test, before, after } from 'node:test'
+import assert from 'node:assert/strict'
+import { createSqliteDatabase } from './test_utils'
+import { AdbcDatabase, AdbcConnection, IngestMode } from '../lib/index.js'
+import { tableFromArrays, Table } from 'apache-arrow'
+
+let db: AdbcDatabase
+let conn: AdbcConnection
+
+before(async () => {
+  db = await createSqliteDatabase()
+  conn = await db.connect()
+})
+
+after(async () => {
+  try {
+    await conn?.close()
+    await db?.close()
+  } catch {
+    // ignore
+  }
+})
+
+test('ingest: create mode inserts data into a new table', async () => {
+  const data = tableFromArrays({ id: [1, 2, 3], name: ['alice', 'bob', 
'carol'] })
+  const rowCount = await conn.ingest('ingest_create', data)
+  assert.strictEqual(rowCount, 3)
+
+  const result = await conn.query('SELECT id, name FROM ingest_create ORDER BY 
id')
+  assert.strictEqual(result.numRows, 3)
+  assert.strictEqual(result.getChildAt(0)?.get(0), 1)
+  assert.strictEqual(result.getChildAt(1)?.get(0), 'alice')
+})
+
+test('ingest: create mode fails if table already exists', async () => {
+  const data = tableFromArrays({ id: [1] })
+  await conn.ingest('ingest_create_dup', data)
+  await assert.rejects(() => conn.ingest('ingest_create_dup', data))
+})
+
+test('ingest: append mode adds rows to an existing table', async () => {
+  const initial = tableFromArrays({ id: [1], name: ['alice'] })
+  await conn.ingest('ingest_append', initial)
+
+  const more = tableFromArrays({ id: [2], name: ['bob'] })
+  const rowCount = await conn.ingest('ingest_append', more, { mode: 
IngestMode.Append })
+  assert.strictEqual(rowCount, 1)
+
+  const result = await conn.query('SELECT id FROM ingest_append ORDER BY id')
+  assert.strictEqual(result.numRows, 2)
+})
+
+test('ingest: replace mode drops and recreates the table', async () => {
+  const initial = tableFromArrays({ id: [1, 2, 3] })
+  await conn.ingest('ingest_replace', initial)
+
+  const replacement = tableFromArrays({ id: [99] })
+  await conn.ingest('ingest_replace', replacement, { mode: IngestMode.Replace 
})
+
+  const result = await conn.query('SELECT id FROM ingest_replace')
+  assert.strictEqual(result.numRows, 1)
+  assert.strictEqual(result.getChildAt(0)?.get(0), 99)
+})
+
+test('ingest: multi-batch table inserts all batches', async () => {
+  const batch = tableFromArrays({ id: [1], name: ['alice'] }).batches[0]
+  const data = new Table([batch, batch])
+  assert.strictEqual(data.batches.length, 2)
+
+  const rowCount = await conn.ingest('ingest_multi_batch', data)
+  assert.strictEqual(rowCount, 2)
+
+  const result = await conn.query('SELECT id FROM ingest_multi_batch')
+  assert.strictEqual(result.numRows, 2)
+})
+
+test('ingest: create_append mode creates table if not exists then appends', 
async () => {
+  const data = tableFromArrays({ id: [1] })
+  await conn.ingest('ingest_create_append', data, { mode: 
IngestMode.CreateAppend })
+  await conn.ingest('ingest_create_append', data, { mode: 
IngestMode.CreateAppend })
+
+  const result = await conn.query('SELECT id FROM ingest_create_append')
+  assert.strictEqual(result.numRows, 2)
+})
diff --git a/javascript/lib/index.ts b/javascript/lib/index.ts
index 0215b104a..baa84326a 100644
--- a/javascript/lib/index.ts
+++ b/javascript/lib/index.ts
@@ -23,10 +23,11 @@ import type {
   AdbcStatement as AdbcStatementInterface,
   ConnectOptions,
   GetObjectsOptions,
+  IngestOptions,
 } from './types.js'
-import { LoadFlags, ObjectDepth, InfoCode } from './types.js'
+import { LoadFlags, ObjectDepth, InfoCode, IngestMode } from './types.js'
 
-import { RecordBatchReader, RecordBatch, Table, tableToIPC, Schema } from 
'apache-arrow'
+import { RecordBatch, RecordBatchReader, Table, tableToIPC, Schema } from 
'apache-arrow'
 import { AdbcError } from './error.js'
 
 // Safely define Symbol.asyncDispose for compatibility with Node.js 
environments older than v21.
@@ -75,8 +76,8 @@ async function iteratorToReader(iterator: NativeIterator): 
Promise<RecordBatchRe
 }
 
 // Export Options types, constants, and Error class
-export type { ConnectOptions, GetObjectsOptions }
-export { AdbcError, LoadFlags, ObjectDepth, InfoCode }
+export type { ConnectOptions, GetObjectsOptions, IngestOptions }
+export { AdbcError, LoadFlags, ObjectDepth, InfoCode, IngestMode }
 
 /**
  * Represents an ADBC Database.
@@ -214,6 +215,27 @@ export class AdbcConnection implements 
AdbcConnectionInterface {
     }
   }
 
+  async ingest(tableName: string, data: Table, options?: IngestOptions): 
Promise<number> {
+    const stmt = await this.createStatement()
+    try {
+      stmt.setOption('adbc.ingest.target_table', tableName)
+      stmt.setOption('adbc.ingest.mode', options?.mode ?? IngestMode.Create)
+      if (options?.catalog !== undefined) {
+        stmt.setOption('adbc.ingest.target_catalog', options.catalog)
+      }
+      if (options?.dbSchema !== undefined) {
+        stmt.setOption('adbc.ingest.target_db_schema', options.dbSchema)
+      }
+      if (options?.temporary === true) {
+        stmt.setOption('adbc.ingest.temporary', 'true')
+      }
+      await stmt.bind(data)
+      return await stmt.executeUpdate()
+    } finally {
+      await stmt.close()
+    }
+  }
+
   async execute(sql: string, params?: Table): Promise<number> {
     const stmt = await this.createStatement()
     try {
@@ -306,23 +328,9 @@ export class AdbcStatement implements 
AdbcStatementInterface {
     }
   }
 
-  async bind(data: RecordBatch | Table): Promise<void> {
+  async bind(data: Table): Promise<void> {
     try {
-      let table: Table
-      if (data instanceof Table) {
-        table = data
-      } else {
-        table = new Table(data)
-      }
-
-      if (table.batches.length > 1) {
-        throw new Error(
-          `bind() requires a single-batch Table or RecordBatch, but received 
${table.batches.length} batches. ` +
-            `Concatenate the table into one batch first (e.g. 
tableFromArrays(...)).`,
-        )
-      }
-
-      const ipcBytes = tableToIPC(table, 'stream')
+      const ipcBytes = tableToIPC(data, 'stream')
       await this._inner.bind(Buffer.from(ipcBytes))
     } catch (e) {
       throw AdbcError.fromError(e)
diff --git a/javascript/lib/types.ts b/javascript/lib/types.ts
index 524d98d73..59fb7487e 100644
--- a/javascript/lib/types.ts
+++ b/javascript/lib/types.ts
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-import { RecordBatch, RecordBatchReader, Table, Schema } from 'apache-arrow'
+import { RecordBatchReader, Table, Schema } from 'apache-arrow'
 
 /**
  * Bitmask flags controlling how the driver manager resolves a driver name.
@@ -144,6 +144,41 @@ export interface ConnectOptions {
   databaseOptions?: Record<string, string>
 }
 
+/**
+ * Ingestion modes for the `ingest` convenience method.
+ *
+ * These correspond to the `adbc.ingest.mode.*` option values in the ADBC spec.
+ *
+ * @example
+ * await conn.ingest('my_table', data, { mode: IngestMode.Append })
+ */
+export const IngestMode = {
+  /** Append to an existing table. Fails if the table does not exist. */
+  Append: 'adbc.ingest.mode.append',
+  /** Create a new table and insert. Fails if the table already exists. */
+  Create: 'adbc.ingest.mode.create',
+  /** Create the table if it does not exist, then append. */
+  CreateAppend: 'adbc.ingest.mode.create_append',
+  /** Drop the existing table (if any) and recreate it, then insert. */
+  Replace: 'adbc.ingest.mode.replace',
+} as const
+export type IngestMode = (typeof IngestMode)[keyof typeof IngestMode]
+
+/** Options for the `ingest` convenience method. */
+export interface IngestOptions {
+  /**
+   * How to handle an existing table.
+   * Defaults to {@link IngestMode.Create}.
+   */
+  mode?: IngestMode
+  /** The catalog to create/locate the target table in (optional). */
+  catalog?: string
+  /** The database schema to create/locate the target table in (optional). */
+  dbSchema?: string
+  /** Whether to ingest into a temporary table (optional). */
+  temporary?: boolean
+}
+
 /** Options for getObjects metadata call. */
 export interface GetObjectsOptions {
   /**
@@ -282,6 +317,20 @@ export interface AdbcConnection {
    */
   queryStream(sql: string, params?: Table): Promise<RecordBatchReader>
 
+  /**
+   * Ingest Arrow data into a database table.
+   *
+   * Convenience method that sets the ingestion options, binds the data, and
+   * calls executeUpdate. Depending on the driver, this can avoid per-row
+   * overhead compared to a prepare-bind-insert loop.
+   *
+   * @param tableName The target table name.
+   * @param data Arrow Table to ingest.
+   * @param options Ingestion options (mode, catalog, dbSchema, temporary).
+   * @returns A Promise resolving to the number of rows ingested, or -1 if 
unknown.
+   */
+  ingest(tableName: string, data: Table, options?: IngestOptions): 
Promise<number>
+
   /**
    * Execute a SQL statement (INSERT, UPDATE, DELETE, DDL) and return the row 
count.
    *
@@ -352,12 +401,11 @@ export interface AdbcStatement {
   /**
    * Bind parameters or data for ingestion.
    *
-   * This binds an Arrow RecordBatch or Table to the statement.
    * This is used for bulk ingestion or parameterized queries.
    *
-   * @param data Arrow RecordBatch or Table containing the data to bind.
+   * @param data Arrow Table containing the data to bind.
    */
-  bind(data: RecordBatch | Table): Promise<void>
+  bind(data: Table): Promise<void>
 
   /**
    * Close the statement and release resources.
diff --git a/javascript/src/client.rs b/javascript/src/client.rs
index 3bc2e8dbb..b6bc9ac1e 100644
--- a/javascript/src/client.rs
+++ b/javascript/src/client.rs
@@ -282,23 +282,9 @@ impl AdbcStatementCore {
   }
 
   pub fn bind(&mut self, c_data: Vec<u8>) -> Result<()> {
-    let mut reader =
+    let reader =
       StreamReader::try_new(std::io::Cursor::new(c_data), 
None).map_err(ClientError::Arrow)?;
-    let batch = match reader.next() {
-      Some(Ok(b)) => b,
-      Some(Err(e)) => return Err(ClientError::Arrow(e)),
-      None => {
-        return Err(ClientError::Other(
-          "bind() received an empty record batch stream".to_string(),
-        ))
-      }
-    };
-    if reader.next().is_some() {
-      return Err(ClientError::Other(
-        "bind() received multiple record batches; concatenate into one batch 
first".to_string(),
-      ));
-    }
-    self.inner.bind(batch)?;
+    self.inner.bind_stream(Box::new(reader))?;
     Ok(())
   }
 }

Reply via email to