Changeset: 8f8320c3b327 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/8f8320c3b327 Branch: default Log Message:
Merge with insertonly branch. diffs (truncated from 574 to 300 lines): diff --git a/clients/Tests/MAL-signatures-hge.test b/clients/Tests/MAL-signatures-hge.test --- a/clients/Tests/MAL-signatures-hge.test +++ b/clients/Tests/MAL-signatures-hge.test @@ -49764,6 +49764,21 @@ pattern sql.percent_rank(X_0:any_1, X_1: SQLpercent_rank; return the percentage into the total number of groups for each row sql +persist_unlogged +unsafe pattern sql.persist_unlogged() (X_0:bat[:str], X_1:bat[:int], X_2:bat[:lng]) +SQLpersist_unlogged; +Persist deltas on append only tables in current schema +sql +persist_unlogged +unsafe pattern sql.persist_unlogged(X_0:str) (X_1:bat[:str], X_2:bat[:int], X_3:bat[:lng]) +SQLpersist_unlogged; +Persist deltas on append only tables in schema s +sql +persist_unlogged +unsafe pattern sql.persist_unlogged(X_0:str, X_1:str) (X_2:bat[:str], X_3:bat[:int], X_4:bat[:lng]) +SQLpersist_unlogged; +Persist deltas on append only table in schema s table t +sql predicate unsafe pattern sql.predicate(X_0:str, X_1:str, X_2:str):void mvc_add_column_predicate; diff --git a/clients/Tests/MAL-signatures.test b/clients/Tests/MAL-signatures.test --- a/clients/Tests/MAL-signatures.test +++ b/clients/Tests/MAL-signatures.test @@ -38169,6 +38169,21 @@ pattern sql.percent_rank(X_0:any_1, X_1: SQLpercent_rank; return the percentage into the total number of groups for each row sql +persist_unlogged +unsafe pattern sql.persist_unlogged() (X_0:bat[:str], X_1:bat[:int], X_2:bat[:lng]) +SQLpersist_unlogged; +Persist deltas on append only tables in current schema +sql +persist_unlogged +unsafe pattern sql.persist_unlogged(X_0:str) (X_1:bat[:str], X_2:bat[:int], X_3:bat[:lng]) +SQLpersist_unlogged; +Persist deltas on append only tables in schema s +sql +persist_unlogged +unsafe pattern sql.persist_unlogged(X_0:str, X_1:str) (X_2:bat[:str], X_3:bat[:int], X_4:bat[:lng]) +SQLpersist_unlogged; +Persist deltas on append only table in schema s table t +sql predicate unsafe pattern sql.predicate(X_0:str, X_1:str, X_2:str):void mvc_add_column_predicate; diff --git a/sql/backends/monet5/CMakeLists.txt b/sql/backends/monet5/CMakeLists.txt --- a/sql/backends/monet5/CMakeLists.txt +++ b/sql/backends/monet5/CMakeLists.txt @@ -113,6 +113,7 @@ set(include_sql_files 58_hot_snapshot 75_storagemodel 76_dump + 77_storage 80_statistics 81_tracer 91_information_schema) diff --git a/sql/backends/monet5/Tests/All b/sql/backends/monet5/Tests/All --- a/sql/backends/monet5/Tests/All +++ b/sql/backends/monet5/Tests/All @@ -33,3 +33,5 @@ limithack shutdown HAVE_HGE?int_notation_1e5 + +!NOWAL?persist_unlogged diff --git a/sql/backends/monet5/Tests/persist_unlogged.SQL.py b/sql/backends/monet5/Tests/persist_unlogged.SQL.py new file mode 100644 --- /dev/null +++ b/sql/backends/monet5/Tests/persist_unlogged.SQL.py @@ -0,0 +1,38 @@ +import os, tempfile + +from MonetDBtesting.sqltest import SQLTestCase +try: + from MonetDBtesting import process +except ImportError: + import process + +with tempfile.TemporaryDirectory() as farm_dir: + os.mkdir(os.path.join(farm_dir, 'db1')) + + with process.server(mapiport='0', dbname='db1', + dbfarm=os.path.join(farm_dir, 'db1'), + stdin=process.PIPE, + stdout=process.PIPE, stderr=process.PIPE) as s: + with SQLTestCase() as tc: + tc.connect(username="monetdb", password="monetdb", port=s.dbport, database='db1') + tc.execute("CREATE UNLOGGED TABLE foo (x INT)").assertSucceeded() + tc.execute("ALTER TABLE foo SET INSERT ONLY").assertSucceeded() + tc.execute("INSERT INTO foo SELECT * FROM generate_series(0,500)") + tc.execute("SELECT count(*) FROM foo").assertSucceeded().assertDataResultMatch([(500,)]) + tc.execute("SELECT table, rowcount FROM persist_unlogged()").assertSucceeded().assertDataResultMatch([('foo', 0)]) + + # Simulate some work in order to trigger WAL flush(note that Mtests runs with --forcemito) + tc.execute("CREATE TABLE bar (x INT)").assertSucceeded() + tc.execute("INSERT INTO bar SELECT * FROM generate_series(0,100000)").assertSucceeded() + + tc.execute("SELECT table, rowcount FROM persist_unlogged()").assertSucceeded().assertDataResultMatch([('foo', 500)]) + s.communicate() + + with process.server(mapiport='0', dbname='db1', + dbfarm=os.path.join(farm_dir, 'db1'), + stdin=process.PIPE, + stdout=process.PIPE, stderr=process.PIPE) as s: + with SQLTestCase() as tc: + tc.connect(username="monetdb", password="monetdb", port=s.dbport, database='db1') + tc.execute("SELECT COUNT(*) FROM foo").assertSucceeded().assertDataResultMatch([(500,)]) + s.communicate() diff --git a/sql/backends/monet5/sql.c b/sql/backends/monet5/sql.c --- a/sql/backends/monet5/sql.c +++ b/sql/backends/monet5/sql.c @@ -54,6 +54,18 @@ #include "mal_authorize.h" #include "gdk_cand.h" +static inline void +BBPnreclaim(int nargs, ...) +{ + va_list valist; + va_start(valist, nargs); + for (int i = 0; i < nargs; i++) { + BAT *b = va_arg(valist, BAT *); + BBPreclaim(b); + } + va_end(valist); +} + static int rel_is_table(sql_rel *rel) { @@ -4310,6 +4322,178 @@ end: } str +SQLpersist_unlogged(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ + (void)stk; + (void)pci; + + bool schema_wide = (pci->argc == 3 || pci->argc == 4), + bat_exists = false; + + bat *o0 = getArgReference_bat(stk, pci, 0), + *o1 = getArgReference_bat(stk, pci, 1), + *o2 = getArgReference_bat(stk, pci, 2); + + str i1 = schema_wide && pci->argc == 4 ? *getArgReference_str(stk, pci, 3) : NULL; + str i2 = !schema_wide ? *getArgReference_str(stk, pci, 4) : NULL; + + str msg = MAL_SUCCEED; + sqlstore *store = NULL; + mvc *m = NULL; + sql_trans *tr = NULL; + node *ncol; + storage *t_del = NULL; + + BAT *b = NULL, *d = NULL, *tables = NULL, *sqlids = NULL, *rowcounts = NULL; + + msg = getSQLContext(cntxt, mb, &m, NULL); + + if (msg) + return msg; + + store = m->session->tr->store; + tr = m->session->tr; + + sql_schema *s = NULL; + if (i1) { + s = mvc_bind_schema(m, i1); + if (s == NULL) + throw(SQL, "sql.persist_unlogged", SQLSTATE(3F000) "Schema missing %s.", i1); + } else { + s = m->session->schema; + } + + if (pci->argc != 3 && !mvc_schema_privs(m, s)) + throw(SQL, "sql.persist_unlogged", SQLSTATE(42000) "Access denied for %s to schema '%s'.", + get_string_global_var(m, "current_user"), s->base.name); + + int n = 100; + bat *commit_list = GDKzalloc(sizeof(bat) * (n + 1)); + BUN *sizes = GDKzalloc(sizeof(BUN) * (n + 1)); + + tables = COLnew(0, TYPE_str, 0, TRANSIENT); + sqlids = COLnew(0, TYPE_int, 0, TRANSIENT); + rowcounts = COLnew(0, TYPE_lng, 0, TRANSIENT); + + if (commit_list == NULL || sizes == NULL || tables == NULL || sqlids == NULL || rowcounts == NULL) { + GDKfree(commit_list); + GDKfree(sizes); + BBPnreclaim(3, tables, sqlids, rowcounts); + throw(SQL, "sql.persist_unlogged", SQLSTATE(HY001)); + } + + commit_list[0] = 0; + sizes[0] = 0; + int i = 1; + + MT_lock_set(&store->commit); + + if (s->tables) { + struct os_iter oi; + os_iterator(&oi, s->tables, tr, NULL); + + for (sql_base *bt = oi_next(&oi); bt; bt = oi_next(&oi)) { + sql_table *t = (sql_table *) bt; + + if (!schema_wide && strcmp(i2, t->base.name) != 0) + continue; + + if (isTable(t) && t->access == TABLE_APPENDONLY && isUnloggedTable(t)) { + str t_name = t->base.name; + sqlid t_id = t->base.id; + t_del = bind_del_data(tr, t, NULL); + + if (t_del == NULL || (d = BATdescriptor(t_del->cs.bid)) == NULL) { + MT_lock_unset(&store->commit); + GDKfree(commit_list); + GDKfree(sizes); + BBPnreclaim(3, tables, sqlids, rowcounts); + throw(SQL, "sql.persist_unlogged", "Cannot access %s column storage.", t_name); + } + + if (ol_first_node(t->columns)) { + + for (ncol = ol_first_node((t)->columns); ncol; ncol = ncol->next) { + sql_column *c = (sql_column *) ncol->data; + b = store->storage_api.bind_col(tr, c, RDONLY); + + if (b == NULL) { + MT_lock_unset(&store->commit); + GDKfree(commit_list); + GDKfree(sizes); + BBPnreclaim(3, tables, sqlids, rowcounts); + throw(SQL, "sql.persist_unlogged", "Cannot access column descriptor."); + } + + if (isVIEW(b)) + b = BATdescriptor(VIEWtparent(b)); + + if (i == n && ncol->next) { + n = n * 2; + commit_list = GDKrealloc(commit_list, sizeof(bat) * n); + sizes = GDKrealloc(sizes, sizeof(BUN) * n); + } + + if (commit_list == NULL || sizes == NULL) { + MT_lock_unset(&store->commit); + GDKfree(commit_list); + GDKfree(sizes); + BBPnreclaim(3, tables, sqlids, rowcounts); + throw(SQL, "sql.persist_unlogged", SQLSTATE(HY001)); + } + + if (BBP_status(b->batCacheid) & BBPEXISTING) { + commit_list[i] = b->batCacheid; + sizes[i] = BATcount(b); + i++; + bat_exists = true; + } + + if (BUNappend(tables, t_name, false) != GDK_SUCCEED || + BUNappend(sqlids, &t_id, false) != GDK_SUCCEED || + BUNappend(rowcounts, bat_exists ? sizes + (i - 1) : sizes, false) != GDK_SUCCEED) { + MT_lock_unset(&store->commit); + GDKfree(commit_list); + GDKfree(sizes); + BBPnreclaim(3, tables, sqlids, rowcounts); + throw(SQL, "sql.persist_unlogged", SQLSTATE(HY001)); + } + } + } + + if (bat_exists) { + commit_list[i] = d->batCacheid; + sizes[i] = BATcount(d); + i++; + } + + bat_exists = false; + } + } + } + + MT_lock_unset(&store->commit); + + if (commit_list[1] > 0 && TMsubcommit_list(commit_list, sizes, i, -1, -1) != GDK_SUCCEED) + msg = createException(SQL, "sql.persist_unlogged", GDK_EXCEPTION); + + GDKfree(commit_list); + GDKfree(sizes); + + if (msg) { + BBPnreclaim(3, tables, sqlids, rowcounts); + } else { _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org