This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git


The following commit(s) were added to refs/heads/main by this push:
     new e3b4655  [C] Validate statement behaviors in test suite (#114)
e3b4655 is described below

commit e3b4655ef2394db19cffd720f5f00a1394dcfdd5
Author: David Li <[email protected]>
AuthorDate: Thu Sep 8 11:37:51 2022 -0400

    [C] Validate statement behaviors in test suite (#114)
    
    Fixes #67.
---
 c/driver_manager/adbc_driver_manager_test.cc |   5 +-
 c/drivers/postgres/postgres_test.cc          |   8 ++
 c/drivers/postgres/statement.cc              |  15 +++
 c/drivers/postgres/type.cc                   |   2 +
 c/drivers/sqlite/sqlite_test.cc              |  93 +---------------
 c/validation/adbc_validation.cc              | 158 +++++++++++++++++++++++++++
 c/validation/adbc_validation.h               |  14 ++-
 7 files changed, 204 insertions(+), 91 deletions(-)

diff --git a/c/driver_manager/adbc_driver_manager_test.cc 
b/c/driver_manager/adbc_driver_manager_test.cc
index f6a4618..c2545ae 100644
--- a/c/driver_manager/adbc_driver_manager_test.cc
+++ b/c/driver_manager/adbc_driver_manager_test.cc
@@ -96,10 +96,13 @@ class SqliteQuirks : public adbc_validation::DriverQuirks {
         res != 0) {
       return res;
     }
-    return AdbcDatabaseSetOption(database, "filename", ":memory:", error);
+    return AdbcDatabaseSetOption(
+        database, "filename", 
"file:Sqlite_Transactions?mode=memory&cache=shared", error);
   }
 
   std::string BindParameter(int index) const override { return "?"; }
+
+  bool supports_concurrent_statements() const override { return true; }
 };
 
 class SqliteDatabaseTest : public ::testing::Test, public 
adbc_validation::DatabaseTest {
diff --git a/c/drivers/postgres/postgres_test.cc 
b/c/drivers/postgres/postgres_test.cc
index 59e1978..8605aaf 100644
--- a/c/drivers/postgres/postgres_test.cc
+++ b/c/drivers/postgres/postgres_test.cc
@@ -103,6 +103,14 @@ class PostgresStatementTest : public ::testing::Test,
   void TestSqlPrepareSelectNoParams() { GTEST_SKIP() << "Not yet implemented"; 
}
   void TestSqlPrepareSelectParams() { GTEST_SKIP() << "Not yet implemented"; }
 
+  void TestConcurrentStatements() {
+    // TODO: refactor driver so that we read all the data as soon as
+    // we ExecuteQuery() since that's how libpq already works - then
+    // we can actually support concurrent statements (because there is
+    // no concurrency)
+    GTEST_SKIP() << "Not yet implemented";
+  }
+
  protected:
   PostgresQuirks quirks_;
 };
diff --git a/c/drivers/postgres/statement.cc b/c/drivers/postgres/statement.cc
index f061169..ecca6c2 100644
--- a/c/drivers/postgres/statement.cc
+++ b/c/drivers/postgres/statement.cc
@@ -248,6 +248,10 @@ struct BindStream {
           pg_type = PgType::kInt8;
           param_lengths[i] = 8;
           break;
+        case ArrowType::NANOARROW_TYPE_STRING:
+          pg_type = PgType::kText;
+          param_lengths[i] = 0;
+          break;
         default:
           // TODO: data type to string
           SetError(error, "Field #", i + 1, " ('", 
bind_schema->children[i]->name,
@@ -326,6 +330,14 @@ struct BindStream {
               std::memcpy(param_values[col], &value, sizeof(int64_t));
               break;
             }
+            case ArrowType::NANOARROW_TYPE_STRING: {
+              const ArrowBufferView view =
+                  ArrowArrayViewGetBytesUnsafe(array_view->children[col], row);
+              // TODO: overflow check?
+              param_lengths[col] = static_cast<int>(view.n_bytes);
+              param_values[col] = const_cast<char*>(view.data.as_char);
+              break;
+            }
             default:
               // TODO: data type to string
               SetError(error, "Field #", col + 1, " ('", 
bind_schema->children[col]->name,
@@ -692,6 +704,9 @@ AdbcStatusCode PostgresStatement::CreateBulkTable(
       case ArrowType::NANOARROW_TYPE_INT64:
         create += " BIGINT";
         break;
+      case ArrowType::NANOARROW_TYPE_STRING:
+        create += " TEXT";
+        break;
       default:
         // TODO: data type to string
         SetError(error, "Field #", i + 1, " ('", 
source_schema.children[i]->name,
diff --git a/c/drivers/postgres/type.cc b/c/drivers/postgres/type.cc
index 0c4a731..0a77016 100644
--- a/c/drivers/postgres/type.cc
+++ b/c/drivers/postgres/type.cc
@@ -30,6 +30,8 @@ void TypeMapping::Insert(uint32_t oid, const char* typname, 
const char* typrecei
   if (std::strcmp(typname, "int8") == 0) {
     // DCHECK_EQ(type, PgType::kInt8);
     canonical_types[PgType::kInt8] = oid;
+  } else if (std::strcmp(typname, "text") == 0) {
+    canonical_types[PgType::kText] = oid;
   }
   // TODO: fill in remainder
 }
diff --git a/c/drivers/sqlite/sqlite_test.cc b/c/drivers/sqlite/sqlite_test.cc
index 814d848..eb60833 100644
--- a/c/drivers/sqlite/sqlite_test.cc
+++ b/c/drivers/sqlite/sqlite_test.cc
@@ -235,101 +235,18 @@ TEST_F(Sqlite, MetadataGetObjectsColumns) {
   batches.clear();
 }
 
-TEST_F(Sqlite, Transactions) {
-  // For this test, we explicitly want a shared DB
-  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionRelease(&connection, &error));
-  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcDatabaseRelease(&database, &error));
-  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcDatabaseNew(&database, &error));
-  ADBC_ASSERT_OK_WITH_ERROR(
-      error,
-      AdbcDatabaseSetOption(&database, "filename",
-                            
"file:Sqlite_Transactions?mode=memory&cache=shared", &error));
-  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcDatabaseInit(&database, &error));
-  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionNew(&connection, &error));
-  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionInit(&connection, &database, 
&error));
-
-  struct AdbcConnection connection2;
-  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionNew(&connection2, &error));
-  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionInit(&connection2, &database, 
&error));
-  ASSERT_NE(connection.private_data, nullptr);
-
-  AdbcStatement statement;
-  std::memset(&statement, 0, sizeof(statement));
-
-  const char* query = "SELECT * FROM bulk_insert";
-
-  // Invalid option value
-  ASSERT_NE(ADBC_STATUS_OK,
-            AdbcConnectionSetOption(&connection, 
ADBC_CONNECTION_OPTION_AUTOCOMMIT,
-                                    "invalid", &error));
-
-  // Can't call commit/rollback without disabling autocommit
-  ASSERT_EQ(ADBC_STATUS_INVALID_STATE, AdbcConnectionCommit(&connection, 
&error));
-  ASSERT_EQ(ADBC_STATUS_INVALID_STATE, AdbcConnectionRollback(&connection, 
&error));
-  error.release(&error);
-
-  // Ensure it's idempotent
-  ADBC_ASSERT_OK_WITH_ERROR(
-      error, AdbcConnectionSetOption(&connection, 
ADBC_CONNECTION_OPTION_AUTOCOMMIT,
-                                     ADBC_OPTION_VALUE_ENABLED, &error));
-  ADBC_ASSERT_OK_WITH_ERROR(
-      error, AdbcConnectionSetOption(&connection, 
ADBC_CONNECTION_OPTION_AUTOCOMMIT,
-                                     ADBC_OPTION_VALUE_ENABLED, &error));
-
-  ADBC_ASSERT_OK_WITH_ERROR(
-      error, AdbcConnectionSetOption(&connection, 
ADBC_CONNECTION_OPTION_AUTOCOMMIT,
-                                     ADBC_OPTION_VALUE_DISABLED, &error));
-  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionCommit(&connection, &error));
-
-  // Uncommitted change
-  ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection));
-
-  // SQLite prevents us from executing the query
-  {
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection2, 
&statement, &error));
-    ASSERT_NE(ADBC_STATUS_OK, AdbcStatementSetSqlQuery(&statement, query, 
&error));
-    ASSERT_THAT(error.message, ::testing::HasSubstr("database schema is 
locked"));
-    error.release(&error);
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
-  }
-
-  // Rollback
-  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionRollback(&connection, 
&error));
-
-  // Now nothing's visible
-  {
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection2, 
&statement, &error));
-    ASSERT_NE(ADBC_STATUS_OK, AdbcStatementSetSqlQuery(&statement, query, 
&error));
-    ASSERT_THAT(error.message, ::testing::HasSubstr("no such table"));
-    error.release(&error);
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
-  }
-
-  // Commit, should now be visible on other connection
-  ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection));
-  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionCommit(&connection, &error));
-
-  {
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection2, 
&statement, &error));
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementSetSqlQuery(&statement, 
query, &error));
-    struct ArrowArrayStream stream;
-    ADBC_ASSERT_OK_WITH_ERROR(
-        error, AdbcStatementExecuteQuery(&statement, &stream, nullptr, 
&error));
-    stream.release(&stream);
-    ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
-  }
-
-  ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionRelease(&connection2, 
&error));
-}
-
 class SqliteQuirks : public adbc_validation::DriverQuirks {
  public:
   AdbcStatusCode SetupDatabase(struct AdbcDatabase* database,
                                struct AdbcError* error) const override {
-    return AdbcDatabaseSetOption(database, "filename", ":memory:", error);
+    // Shared DB required for transaction tests
+    return AdbcDatabaseSetOption(
+        database, "filename", 
"file:Sqlite_Transactions?mode=memory&cache=shared", error);
   }
 
   std::string BindParameter(int index) const override { return "?"; }
+
+  bool supports_concurrent_statements() const override { return true; }
 };
 
 class SqliteDatabaseTest : public ::testing::Test, public 
adbc_validation::DatabaseTest {
diff --git a/c/validation/adbc_validation.cc b/c/validation/adbc_validation.cc
index ad79781..8a6ef23 100644
--- a/c/validation/adbc_validation.cc
+++ b/c/validation/adbc_validation.cc
@@ -19,6 +19,7 @@
 
 #include <cerrno>
 #include <cstring>
+#include <iostream>
 #include <optional>
 #include <string>
 #include <tuple>
@@ -192,6 +193,32 @@ struct Releaser<struct ArrowArrayView> {
   }
 };
 
+template <>
+struct Releaser<struct AdbcConnection> {
+  static void Release(struct AdbcConnection* value) {
+    if (value->private_data) {
+      struct AdbcError error = {};
+      auto status = AdbcConnectionRelease(value, &error);
+      if (status != ADBC_STATUS_OK) {
+        FAIL() << AdbcvErrorRepr(status) << ": " << AdbcvErrorRepr(&error);
+      }
+    }
+  }
+};
+
+template <>
+struct Releaser<struct AdbcStatement> {
+  static void Release(struct AdbcStatement* value) {
+    if (value->private_data) {
+      struct AdbcError error = {};
+      auto status = AdbcStatementRelease(value, &error);
+      if (status != ADBC_STATUS_OK) {
+        FAIL() << AdbcvErrorRepr(status) << ": " << AdbcvErrorRepr(&error);
+      }
+    }
+  }
+};
+
 template <typename Resource>
 struct Handle {
   Resource value;
@@ -1775,6 +1802,137 @@ void StatementTest::TestSqlQueryErrors() {
   ASSERT_NE(ADBC_STATUS_OK, code);
 }
 
+void StatementTest::TestTransactions() {
+  ASSERT_OK(&error, quirks()->DropTable(&connection, "bulk_ingest", &error));
+
+  Handle<struct AdbcConnection> connection2;
+  ASSERT_OK(&error, AdbcConnectionNew(&connection2.value, &error));
+  ASSERT_OK(&error, AdbcConnectionInit(&connection2.value, &database, &error));
+
+  ASSERT_OK(&error,
+            AdbcConnectionSetOption(&connection, 
ADBC_CONNECTION_OPTION_AUTOCOMMIT,
+                                    ADBC_OPTION_VALUE_DISABLED, &error));
+
+  // Uncommitted change
+  ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error));
+
+  // Query on first connection should succeed
+  {
+    Handle<struct AdbcStatement> statement;
+    StreamReader reader;
+
+    ASSERT_OK(&error, AdbcStatementNew(&connection, &statement.value, &error));
+    ASSERT_OK(&error, AdbcStatementSetSqlQuery(&statement.value,
+                                               "SELECT * FROM bulk_ingest", 
&error));
+    ASSERT_OK(&error, AdbcStatementExecuteQuery(&statement.value, 
&reader.stream.value,
+                                                &reader.rows_affected, 
&error));
+    ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
+  }
+
+  if (error.release) error.release(&error);
+
+  // Query on second connection should fail
+  ASSERT_FAILS(&error, ([&]() -> AdbcStatusCode {
+    Handle<struct AdbcStatement> statement;
+    StreamReader reader;
+
+    CHECK_OK(AdbcStatementNew(&connection2.value, &statement.value, &error));
+    CHECK_OK(
+        AdbcStatementSetSqlQuery(&statement.value, "SELECT * FROM 
bulk_ingest", &error));
+    CHECK_OK(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value,
+                                       &reader.rows_affected, &error));
+    return ADBC_STATUS_OK;
+  })());
+
+  if (error.release) {
+    std::cerr << "Failure message: " << error.message << std::endl;
+    error.release(&error);
+  }
+
+  // Rollback
+  ASSERT_OK(&error, AdbcConnectionRollback(&connection, &error));
+
+  // Query on first connection should fail
+  ASSERT_FAILS(&error, ([&]() -> AdbcStatusCode {
+    Handle<struct AdbcStatement> statement;
+    StreamReader reader;
+
+    CHECK_OK(AdbcStatementNew(&connection, &statement.value, &error));
+    CHECK_OK(
+        AdbcStatementSetSqlQuery(&statement.value, "SELECT * FROM 
bulk_ingest", &error));
+    CHECK_OK(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value,
+                                       &reader.rows_affected, &error));
+    return ADBC_STATUS_OK;
+  })());
+
+  // Commit
+  ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error));
+  ASSERT_OK(&error, AdbcConnectionCommit(&connection, &error));
+
+  // Query on second connection should succeed
+  {
+    Handle<struct AdbcStatement> statement;
+    StreamReader reader;
+
+    ASSERT_OK(&error, AdbcStatementNew(&connection2.value, &statement.value, 
&error));
+    ASSERT_OK(&error, AdbcStatementSetSqlQuery(&statement.value,
+                                               "SELECT * FROM bulk_ingest", 
&error));
+    ASSERT_OK(&error, AdbcStatementExecuteQuery(&statement.value, 
&reader.stream.value,
+                                                &reader.rows_affected, 
&error));
+    ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
+  }
+}
+
+void StatementTest::TestConcurrentStatements() {
+  Handle<struct AdbcStatement> statement1;
+  Handle<struct AdbcStatement> statement2;
+
+  ASSERT_OK(&error, AdbcStatementNew(&connection, &statement1.value, &error));
+  ASSERT_OK(&error, AdbcStatementNew(&connection, &statement2.value, &error));
+
+  ASSERT_OK(&error,
+            AdbcStatementSetSqlQuery(&statement1.value, "SELECT 
'SaShiSuSeSo'", &error));
+  ASSERT_OK(&error,
+            AdbcStatementSetSqlQuery(&statement2.value, "SELECT 
'SaShiSuSeSo'", &error));
+
+  StreamReader reader1;
+  StreamReader reader2;
+  ASSERT_OK(&error, AdbcStatementExecuteQuery(&statement1.value, 
&reader1.stream.value,
+                                              &reader1.rows_affected, &error));
+
+  if (quirks()->supports_concurrent_statements()) {
+    ASSERT_OK(&error, AdbcStatementExecuteQuery(&statement2.value, 
&reader2.stream.value,
+                                                &reader2.rows_affected, 
&error));
+    ASSERT_NO_FATAL_FAILURE(reader2.GetSchema());
+  } else {
+    ASSERT_FAILS(&error,
+                 AdbcStatementExecuteQuery(&statement2.value, 
&reader2.stream.value,
+                                           &reader2.rows_affected, &error));
+    ASSERT_EQ(nullptr, reader2.stream.value.release);
+  }
+  // Original stream should still be valid
+  ASSERT_NO_FATAL_FAILURE(reader1.GetSchema());
+}
+
+void StatementTest::TestResultInvalidation() {
+  // Start reading from a statement, then overwrite it
+  ASSERT_OK(&error, AdbcStatementNew(&connection, &statement, &error));
+  ASSERT_OK(&error, AdbcStatementSetSqlQuery(&statement, "SELECT 42", &error));
+
+  StreamReader reader1;
+  StreamReader reader2;
+  ASSERT_OK(&error, AdbcStatementExecuteQuery(&statement, 
&reader1.stream.value,
+                                              &reader1.rows_affected, &error));
+  ASSERT_NO_FATAL_FAILURE(reader1.GetSchema());
+
+  ASSERT_OK(&error, AdbcStatementExecuteQuery(&statement, 
&reader2.stream.value,
+                                              &reader2.rows_affected, &error));
+  ASSERT_NO_FATAL_FAILURE(reader2.GetSchema());
+
+  // First reader should not fail, but may give no data
+  ASSERT_NO_FATAL_FAILURE(reader1.Next());
+}
+
 #undef ADBCV_CONCAT
 #undef ADBCV_NAME
 #undef ADBCV_FAILS_WITH_IMPL
diff --git a/c/validation/adbc_validation.h b/c/validation/adbc_validation.h
index 156634a..26e5f7f 100644
--- a/c/validation/adbc_validation.h
+++ b/c/validation/adbc_validation.h
@@ -46,6 +46,10 @@ class DriverQuirks {
 
   /// \brief Return the SQL to reference the bind parameter of the given index
   virtual std::string BindParameter(int index) const { return "?"; }
+
+  /// \brief Whether two statements can be used at the same time on a
+  ///   single connection
+  virtual bool supports_concurrent_statements() const { return false; }
 };
 
 class DatabaseTest {
@@ -153,7 +157,10 @@ class StatementTest {
 
   void TestSqlQueryErrors();
 
-  // TODO: transactions
+  void TestTransactions();
+
+  void TestConcurrentStatements();
+  void TestResultInvalidation();
 
  protected:
   struct AdbcError error;
@@ -183,7 +190,10 @@ class StatementTest {
   TEST_F(FIXTURE, SqlQueryInts) { TestSqlQueryInts(); }                        
         \
   TEST_F(FIXTURE, SqlQueryFloats) { TestSqlQueryFloats(); }                    
         \
   TEST_F(FIXTURE, SqlQueryStrings) { TestSqlQueryStrings(); }                  
         \
-  TEST_F(FIXTURE, SqlQueryErrors) { TestSqlQueryErrors(); }
+  TEST_F(FIXTURE, SqlQueryErrors) { TestSqlQueryErrors(); }                    
         \
+  TEST_F(FIXTURE, Transactions) { TestTransactions(); }                        
         \
+  TEST_F(FIXTURE, ConcurrentStatements) { TestConcurrentStatements(); }        
         \
+  TEST_F(FIXTURE, ResultInvalidation) { TestResultInvalidation(); }
 
 }  // namespace adbc_validation
 

Reply via email to