This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-adbc.git
The following commit(s) were added to refs/heads/main by this push:
new e3b4655 [C] Validate statement behaviors in test suite (#114)
e3b4655 is described below
commit e3b4655ef2394db19cffd720f5f00a1394dcfdd5
Author: David Li <[email protected]>
AuthorDate: Thu Sep 8 11:37:51 2022 -0400
[C] Validate statement behaviors in test suite (#114)
Fixes #67.
---
c/driver_manager/adbc_driver_manager_test.cc | 5 +-
c/drivers/postgres/postgres_test.cc | 8 ++
c/drivers/postgres/statement.cc | 15 +++
c/drivers/postgres/type.cc | 2 +
c/drivers/sqlite/sqlite_test.cc | 93 +---------------
c/validation/adbc_validation.cc | 158 +++++++++++++++++++++++++++
c/validation/adbc_validation.h | 14 ++-
7 files changed, 204 insertions(+), 91 deletions(-)
diff --git a/c/driver_manager/adbc_driver_manager_test.cc
b/c/driver_manager/adbc_driver_manager_test.cc
index f6a4618..c2545ae 100644
--- a/c/driver_manager/adbc_driver_manager_test.cc
+++ b/c/driver_manager/adbc_driver_manager_test.cc
@@ -96,10 +96,13 @@ class SqliteQuirks : public adbc_validation::DriverQuirks {
res != 0) {
return res;
}
- return AdbcDatabaseSetOption(database, "filename", ":memory:", error);
+ return AdbcDatabaseSetOption(
+ database, "filename",
"file:Sqlite_Transactions?mode=memory&cache=shared", error);
}
std::string BindParameter(int index) const override { return "?"; }
+
+ bool supports_concurrent_statements() const override { return true; }
};
class SqliteDatabaseTest : public ::testing::Test, public
adbc_validation::DatabaseTest {
diff --git a/c/drivers/postgres/postgres_test.cc
b/c/drivers/postgres/postgres_test.cc
index 59e1978..8605aaf 100644
--- a/c/drivers/postgres/postgres_test.cc
+++ b/c/drivers/postgres/postgres_test.cc
@@ -103,6 +103,14 @@ class PostgresStatementTest : public ::testing::Test,
void TestSqlPrepareSelectNoParams() { GTEST_SKIP() << "Not yet implemented";
}
void TestSqlPrepareSelectParams() { GTEST_SKIP() << "Not yet implemented"; }
+ void TestConcurrentStatements() {
+ // TODO: refactor driver so that we read all the data as soon as
+ // we ExecuteQuery() since that's how libpq already works - then
+ // we can actually support concurrent statements (because there is
+ // no concurrency)
+ GTEST_SKIP() << "Not yet implemented";
+ }
+
protected:
PostgresQuirks quirks_;
};
diff --git a/c/drivers/postgres/statement.cc b/c/drivers/postgres/statement.cc
index f061169..ecca6c2 100644
--- a/c/drivers/postgres/statement.cc
+++ b/c/drivers/postgres/statement.cc
@@ -248,6 +248,10 @@ struct BindStream {
pg_type = PgType::kInt8;
param_lengths[i] = 8;
break;
+ case ArrowType::NANOARROW_TYPE_STRING:
+ pg_type = PgType::kText;
+ param_lengths[i] = 0;
+ break;
default:
// TODO: data type to string
SetError(error, "Field #", i + 1, " ('",
bind_schema->children[i]->name,
@@ -326,6 +330,14 @@ struct BindStream {
std::memcpy(param_values[col], &value, sizeof(int64_t));
break;
}
+ case ArrowType::NANOARROW_TYPE_STRING: {
+ const ArrowBufferView view =
+ ArrowArrayViewGetBytesUnsafe(array_view->children[col], row);
+ // TODO: overflow check?
+ param_lengths[col] = static_cast<int>(view.n_bytes);
+ param_values[col] = const_cast<char*>(view.data.as_char);
+ break;
+ }
default:
// TODO: data type to string
SetError(error, "Field #", col + 1, " ('",
bind_schema->children[col]->name,
@@ -692,6 +704,9 @@ AdbcStatusCode PostgresStatement::CreateBulkTable(
case ArrowType::NANOARROW_TYPE_INT64:
create += " BIGINT";
break;
+ case ArrowType::NANOARROW_TYPE_STRING:
+ create += " TEXT";
+ break;
default:
// TODO: data type to string
SetError(error, "Field #", i + 1, " ('",
source_schema.children[i]->name,
diff --git a/c/drivers/postgres/type.cc b/c/drivers/postgres/type.cc
index 0c4a731..0a77016 100644
--- a/c/drivers/postgres/type.cc
+++ b/c/drivers/postgres/type.cc
@@ -30,6 +30,8 @@ void TypeMapping::Insert(uint32_t oid, const char* typname,
const char* typrecei
if (std::strcmp(typname, "int8") == 0) {
// DCHECK_EQ(type, PgType::kInt8);
canonical_types[PgType::kInt8] = oid;
+ } else if (std::strcmp(typname, "text") == 0) {
+ canonical_types[PgType::kText] = oid;
}
// TODO: fill in remainder
}
diff --git a/c/drivers/sqlite/sqlite_test.cc b/c/drivers/sqlite/sqlite_test.cc
index 814d848..eb60833 100644
--- a/c/drivers/sqlite/sqlite_test.cc
+++ b/c/drivers/sqlite/sqlite_test.cc
@@ -235,101 +235,18 @@ TEST_F(Sqlite, MetadataGetObjectsColumns) {
batches.clear();
}
-TEST_F(Sqlite, Transactions) {
- // For this test, we explicitly want a shared DB
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionRelease(&connection, &error));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcDatabaseRelease(&database, &error));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcDatabaseNew(&database, &error));
- ADBC_ASSERT_OK_WITH_ERROR(
- error,
- AdbcDatabaseSetOption(&database, "filename",
-
"file:Sqlite_Transactions?mode=memory&cache=shared", &error));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcDatabaseInit(&database, &error));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionNew(&connection, &error));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionInit(&connection, &database,
&error));
-
- struct AdbcConnection connection2;
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionNew(&connection2, &error));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionInit(&connection2, &database,
&error));
- ASSERT_NE(connection.private_data, nullptr);
-
- AdbcStatement statement;
- std::memset(&statement, 0, sizeof(statement));
-
- const char* query = "SELECT * FROM bulk_insert";
-
- // Invalid option value
- ASSERT_NE(ADBC_STATUS_OK,
- AdbcConnectionSetOption(&connection,
ADBC_CONNECTION_OPTION_AUTOCOMMIT,
- "invalid", &error));
-
- // Can't call commit/rollback without disabling autocommit
- ASSERT_EQ(ADBC_STATUS_INVALID_STATE, AdbcConnectionCommit(&connection,
&error));
- ASSERT_EQ(ADBC_STATUS_INVALID_STATE, AdbcConnectionRollback(&connection,
&error));
- error.release(&error);
-
- // Ensure it's idempotent
- ADBC_ASSERT_OK_WITH_ERROR(
- error, AdbcConnectionSetOption(&connection,
ADBC_CONNECTION_OPTION_AUTOCOMMIT,
- ADBC_OPTION_VALUE_ENABLED, &error));
- ADBC_ASSERT_OK_WITH_ERROR(
- error, AdbcConnectionSetOption(&connection,
ADBC_CONNECTION_OPTION_AUTOCOMMIT,
- ADBC_OPTION_VALUE_ENABLED, &error));
-
- ADBC_ASSERT_OK_WITH_ERROR(
- error, AdbcConnectionSetOption(&connection,
ADBC_CONNECTION_OPTION_AUTOCOMMIT,
- ADBC_OPTION_VALUE_DISABLED, &error));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionCommit(&connection, &error));
-
- // Uncommitted change
- ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection));
-
- // SQLite prevents us from executing the query
- {
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection2,
&statement, &error));
- ASSERT_NE(ADBC_STATUS_OK, AdbcStatementSetSqlQuery(&statement, query,
&error));
- ASSERT_THAT(error.message, ::testing::HasSubstr("database schema is
locked"));
- error.release(&error);
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
- }
-
- // Rollback
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionRollback(&connection,
&error));
-
- // Now nothing's visible
- {
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection2,
&statement, &error));
- ASSERT_NE(ADBC_STATUS_OK, AdbcStatementSetSqlQuery(&statement, query,
&error));
- ASSERT_THAT(error.message, ::testing::HasSubstr("no such table"));
- error.release(&error);
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
- }
-
- // Commit, should now be visible on other connection
- ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionCommit(&connection, &error));
-
- {
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementNew(&connection2,
&statement, &error));
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementSetSqlQuery(&statement,
query, &error));
- struct ArrowArrayStream stream;
- ADBC_ASSERT_OK_WITH_ERROR(
- error, AdbcStatementExecuteQuery(&statement, &stream, nullptr,
&error));
- stream.release(&stream);
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcStatementRelease(&statement, &error));
- }
-
- ADBC_ASSERT_OK_WITH_ERROR(error, AdbcConnectionRelease(&connection2,
&error));
-}
-
class SqliteQuirks : public adbc_validation::DriverQuirks {
public:
AdbcStatusCode SetupDatabase(struct AdbcDatabase* database,
struct AdbcError* error) const override {
- return AdbcDatabaseSetOption(database, "filename", ":memory:", error);
+ // Shared DB required for transaction tests
+ return AdbcDatabaseSetOption(
+ database, "filename",
"file:Sqlite_Transactions?mode=memory&cache=shared", error);
}
std::string BindParameter(int index) const override { return "?"; }
+
+ bool supports_concurrent_statements() const override { return true; }
};
class SqliteDatabaseTest : public ::testing::Test, public
adbc_validation::DatabaseTest {
diff --git a/c/validation/adbc_validation.cc b/c/validation/adbc_validation.cc
index ad79781..8a6ef23 100644
--- a/c/validation/adbc_validation.cc
+++ b/c/validation/adbc_validation.cc
@@ -19,6 +19,7 @@
#include <cerrno>
#include <cstring>
+#include <iostream>
#include <optional>
#include <string>
#include <tuple>
@@ -192,6 +193,32 @@ struct Releaser<struct ArrowArrayView> {
}
};
+template <>
+struct Releaser<struct AdbcConnection> {
+ static void Release(struct AdbcConnection* value) {
+ if (value->private_data) {
+ struct AdbcError error = {};
+ auto status = AdbcConnectionRelease(value, &error);
+ if (status != ADBC_STATUS_OK) {
+ FAIL() << AdbcvErrorRepr(status) << ": " << AdbcvErrorRepr(&error);
+ }
+ }
+ }
+};
+
+template <>
+struct Releaser<struct AdbcStatement> {
+ static void Release(struct AdbcStatement* value) {
+ if (value->private_data) {
+ struct AdbcError error = {};
+ auto status = AdbcStatementRelease(value, &error);
+ if (status != ADBC_STATUS_OK) {
+ FAIL() << AdbcvErrorRepr(status) << ": " << AdbcvErrorRepr(&error);
+ }
+ }
+ }
+};
+
template <typename Resource>
struct Handle {
Resource value;
@@ -1775,6 +1802,137 @@ void StatementTest::TestSqlQueryErrors() {
ASSERT_NE(ADBC_STATUS_OK, code);
}
+void StatementTest::TestTransactions() {
+ ASSERT_OK(&error, quirks()->DropTable(&connection, "bulk_ingest", &error));
+
+ Handle<struct AdbcConnection> connection2;
+ ASSERT_OK(&error, AdbcConnectionNew(&connection2.value, &error));
+ ASSERT_OK(&error, AdbcConnectionInit(&connection2.value, &database, &error));
+
+ ASSERT_OK(&error,
+ AdbcConnectionSetOption(&connection,
ADBC_CONNECTION_OPTION_AUTOCOMMIT,
+ ADBC_OPTION_VALUE_DISABLED, &error));
+
+ // Uncommitted change
+ ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error));
+
+ // Query on first connection should succeed
+ {
+ Handle<struct AdbcStatement> statement;
+ StreamReader reader;
+
+ ASSERT_OK(&error, AdbcStatementNew(&connection, &statement.value, &error));
+ ASSERT_OK(&error, AdbcStatementSetSqlQuery(&statement.value,
+ "SELECT * FROM bulk_ingest",
&error));
+ ASSERT_OK(&error, AdbcStatementExecuteQuery(&statement.value,
&reader.stream.value,
+ &reader.rows_affected,
&error));
+ ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
+ }
+
+ if (error.release) error.release(&error);
+
+ // Query on second connection should fail
+ ASSERT_FAILS(&error, ([&]() -> AdbcStatusCode {
+ Handle<struct AdbcStatement> statement;
+ StreamReader reader;
+
+ CHECK_OK(AdbcStatementNew(&connection2.value, &statement.value, &error));
+ CHECK_OK(
+ AdbcStatementSetSqlQuery(&statement.value, "SELECT * FROM
bulk_ingest", &error));
+ CHECK_OK(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value,
+ &reader.rows_affected, &error));
+ return ADBC_STATUS_OK;
+ })());
+
+ if (error.release) {
+ std::cerr << "Failure message: " << error.message << std::endl;
+ error.release(&error);
+ }
+
+ // Rollback
+ ASSERT_OK(&error, AdbcConnectionRollback(&connection, &error));
+
+ // Query on first connection should fail
+ ASSERT_FAILS(&error, ([&]() -> AdbcStatusCode {
+ Handle<struct AdbcStatement> statement;
+ StreamReader reader;
+
+ CHECK_OK(AdbcStatementNew(&connection, &statement.value, &error));
+ CHECK_OK(
+ AdbcStatementSetSqlQuery(&statement.value, "SELECT * FROM
bulk_ingest", &error));
+ CHECK_OK(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value,
+ &reader.rows_affected, &error));
+ return ADBC_STATUS_OK;
+ })());
+
+ // Commit
+ ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error));
+ ASSERT_OK(&error, AdbcConnectionCommit(&connection, &error));
+
+ // Query on second connection should succeed
+ {
+ Handle<struct AdbcStatement> statement;
+ StreamReader reader;
+
+ ASSERT_OK(&error, AdbcStatementNew(&connection2.value, &statement.value,
&error));
+ ASSERT_OK(&error, AdbcStatementSetSqlQuery(&statement.value,
+ "SELECT * FROM bulk_ingest",
&error));
+ ASSERT_OK(&error, AdbcStatementExecuteQuery(&statement.value,
&reader.stream.value,
+ &reader.rows_affected,
&error));
+ ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
+ }
+}
+
+void StatementTest::TestConcurrentStatements() {
+ Handle<struct AdbcStatement> statement1;
+ Handle<struct AdbcStatement> statement2;
+
+ ASSERT_OK(&error, AdbcStatementNew(&connection, &statement1.value, &error));
+ ASSERT_OK(&error, AdbcStatementNew(&connection, &statement2.value, &error));
+
+ ASSERT_OK(&error,
+ AdbcStatementSetSqlQuery(&statement1.value, "SELECT
'SaShiSuSeSo'", &error));
+ ASSERT_OK(&error,
+ AdbcStatementSetSqlQuery(&statement2.value, "SELECT
'SaShiSuSeSo'", &error));
+
+ StreamReader reader1;
+ StreamReader reader2;
+ ASSERT_OK(&error, AdbcStatementExecuteQuery(&statement1.value,
&reader1.stream.value,
+ &reader1.rows_affected, &error));
+
+ if (quirks()->supports_concurrent_statements()) {
+ ASSERT_OK(&error, AdbcStatementExecuteQuery(&statement2.value,
&reader2.stream.value,
+ &reader2.rows_affected,
&error));
+ ASSERT_NO_FATAL_FAILURE(reader2.GetSchema());
+ } else {
+ ASSERT_FAILS(&error,
+ AdbcStatementExecuteQuery(&statement2.value,
&reader2.stream.value,
+ &reader2.rows_affected, &error));
+ ASSERT_EQ(nullptr, reader2.stream.value.release);
+ }
+ // Original stream should still be valid
+ ASSERT_NO_FATAL_FAILURE(reader1.GetSchema());
+}
+
+void StatementTest::TestResultInvalidation() {
+ // Start reading from a statement, then overwrite it
+ ASSERT_OK(&error, AdbcStatementNew(&connection, &statement, &error));
+ ASSERT_OK(&error, AdbcStatementSetSqlQuery(&statement, "SELECT 42", &error));
+
+ StreamReader reader1;
+ StreamReader reader2;
+ ASSERT_OK(&error, AdbcStatementExecuteQuery(&statement,
&reader1.stream.value,
+ &reader1.rows_affected, &error));
+ ASSERT_NO_FATAL_FAILURE(reader1.GetSchema());
+
+ ASSERT_OK(&error, AdbcStatementExecuteQuery(&statement,
&reader2.stream.value,
+ &reader2.rows_affected, &error));
+ ASSERT_NO_FATAL_FAILURE(reader2.GetSchema());
+
+ // First reader should not fail, but may give no data
+ ASSERT_NO_FATAL_FAILURE(reader1.Next());
+}
+
#undef ADBCV_CONCAT
#undef ADBCV_NAME
#undef ADBCV_FAILS_WITH_IMPL
diff --git a/c/validation/adbc_validation.h b/c/validation/adbc_validation.h
index 156634a..26e5f7f 100644
--- a/c/validation/adbc_validation.h
+++ b/c/validation/adbc_validation.h
@@ -46,6 +46,10 @@ class DriverQuirks {
/// \brief Return the SQL to reference the bind parameter of the given index
virtual std::string BindParameter(int index) const { return "?"; }
+
+ /// \brief Whether two statements can be used at the same time on a
+ /// single connection
+ virtual bool supports_concurrent_statements() const { return false; }
};
class DatabaseTest {
@@ -153,7 +157,10 @@ class StatementTest {
void TestSqlQueryErrors();
- // TODO: transactions
+ void TestTransactions();
+
+ void TestConcurrentStatements();
+ void TestResultInvalidation();
protected:
struct AdbcError error;
@@ -183,7 +190,10 @@ class StatementTest {
TEST_F(FIXTURE, SqlQueryInts) { TestSqlQueryInts(); }
\
TEST_F(FIXTURE, SqlQueryFloats) { TestSqlQueryFloats(); }
\
TEST_F(FIXTURE, SqlQueryStrings) { TestSqlQueryStrings(); }
\
- TEST_F(FIXTURE, SqlQueryErrors) { TestSqlQueryErrors(); }
+ TEST_F(FIXTURE, SqlQueryErrors) { TestSqlQueryErrors(); }
\
+ TEST_F(FIXTURE, Transactions) { TestTransactions(); }
\
+ TEST_F(FIXTURE, ConcurrentStatements) { TestConcurrentStatements(); }
\
+ TEST_F(FIXTURE, ResultInvalidation) { TestResultInvalidation(); }
} // namespace adbc_validation