Changeset: 8f8320c3b327 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/8f8320c3b327
Branch: default
Log Message:

Merge with insertonly branch.


diffs (truncated from 574 to 300 lines):

diff --git a/clients/Tests/MAL-signatures-hge.test 
b/clients/Tests/MAL-signatures-hge.test
--- a/clients/Tests/MAL-signatures-hge.test
+++ b/clients/Tests/MAL-signatures-hge.test
@@ -49764,6 +49764,21 @@ pattern sql.percent_rank(X_0:any_1, X_1:
 SQLpercent_rank;
 return the percentage into the total number of groups for each row
 sql
+persist_unlogged
+unsafe pattern sql.persist_unlogged() (X_0:bat[:str], X_1:bat[:int], 
X_2:bat[:lng]) 
+SQLpersist_unlogged;
+Persist deltas on append only tables in current schema
+sql
+persist_unlogged
+unsafe pattern sql.persist_unlogged(X_0:str) (X_1:bat[:str], X_2:bat[:int], 
X_3:bat[:lng]) 
+SQLpersist_unlogged;
+Persist deltas on append only tables in schema s
+sql
+persist_unlogged
+unsafe pattern sql.persist_unlogged(X_0:str, X_1:str) (X_2:bat[:str], 
X_3:bat[:int], X_4:bat[:lng]) 
+SQLpersist_unlogged;
+Persist deltas on append only table in schema s table t
+sql
 predicate
 unsafe pattern sql.predicate(X_0:str, X_1:str, X_2:str):void 
 mvc_add_column_predicate;
diff --git a/clients/Tests/MAL-signatures.test 
b/clients/Tests/MAL-signatures.test
--- a/clients/Tests/MAL-signatures.test
+++ b/clients/Tests/MAL-signatures.test
@@ -38169,6 +38169,21 @@ pattern sql.percent_rank(X_0:any_1, X_1:
 SQLpercent_rank;
 return the percentage into the total number of groups for each row
 sql
+persist_unlogged
+unsafe pattern sql.persist_unlogged() (X_0:bat[:str], X_1:bat[:int], 
X_2:bat[:lng]) 
+SQLpersist_unlogged;
+Persist deltas on append only tables in current schema
+sql
+persist_unlogged
+unsafe pattern sql.persist_unlogged(X_0:str) (X_1:bat[:str], X_2:bat[:int], 
X_3:bat[:lng]) 
+SQLpersist_unlogged;
+Persist deltas on append only tables in schema s
+sql
+persist_unlogged
+unsafe pattern sql.persist_unlogged(X_0:str, X_1:str) (X_2:bat[:str], 
X_3:bat[:int], X_4:bat[:lng]) 
+SQLpersist_unlogged;
+Persist deltas on append only table in schema s table t
+sql
 predicate
 unsafe pattern sql.predicate(X_0:str, X_1:str, X_2:str):void 
 mvc_add_column_predicate;
diff --git a/sql/backends/monet5/CMakeLists.txt 
b/sql/backends/monet5/CMakeLists.txt
--- a/sql/backends/monet5/CMakeLists.txt
+++ b/sql/backends/monet5/CMakeLists.txt
@@ -113,6 +113,7 @@ set(include_sql_files
   58_hot_snapshot
   75_storagemodel
   76_dump
+  77_storage
   80_statistics
   81_tracer
   91_information_schema)
diff --git a/sql/backends/monet5/Tests/All b/sql/backends/monet5/Tests/All
--- a/sql/backends/monet5/Tests/All
+++ b/sql/backends/monet5/Tests/All
@@ -33,3 +33,5 @@ limithack
 shutdown
 
 HAVE_HGE?int_notation_1e5
+
+!NOWAL?persist_unlogged
diff --git a/sql/backends/monet5/Tests/persist_unlogged.SQL.py 
b/sql/backends/monet5/Tests/persist_unlogged.SQL.py
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/Tests/persist_unlogged.SQL.py
@@ -0,0 +1,38 @@
+import os, tempfile
+
+from MonetDBtesting.sqltest import SQLTestCase
+try:
+    from MonetDBtesting import process
+except ImportError:
+    import process
+
+with tempfile.TemporaryDirectory() as farm_dir:
+    os.mkdir(os.path.join(farm_dir, 'db1'))
+
+    with process.server(mapiport='0', dbname='db1',
+                        dbfarm=os.path.join(farm_dir, 'db1'),
+                        stdin=process.PIPE,
+                        stdout=process.PIPE, stderr=process.PIPE) as s:
+        with SQLTestCase() as tc:
+            tc.connect(username="monetdb", password="monetdb", port=s.dbport, 
database='db1')
+            tc.execute("CREATE UNLOGGED TABLE foo (x INT)").assertSucceeded()
+            tc.execute("ALTER TABLE foo SET INSERT ONLY").assertSucceeded()
+            tc.execute("INSERT INTO foo SELECT * FROM generate_series(0,500)")
+            tc.execute("SELECT count(*) FROM 
foo").assertSucceeded().assertDataResultMatch([(500,)])
+            tc.execute("SELECT table, rowcount FROM 
persist_unlogged()").assertSucceeded().assertDataResultMatch([('foo', 0)])
+
+            # Simulate some work in order to trigger WAL flush(note that 
Mtests runs with --forcemito)
+            tc.execute("CREATE TABLE bar (x INT)").assertSucceeded()
+            tc.execute("INSERT INTO bar SELECT * FROM 
generate_series(0,100000)").assertSucceeded()
+
+            tc.execute("SELECT table, rowcount FROM 
persist_unlogged()").assertSucceeded().assertDataResultMatch([('foo', 500)])
+        s.communicate()
+
+    with process.server(mapiport='0', dbname='db1',
+                        dbfarm=os.path.join(farm_dir, 'db1'),
+                        stdin=process.PIPE,
+                        stdout=process.PIPE, stderr=process.PIPE) as s:
+        with SQLTestCase() as tc:
+            tc.connect(username="monetdb", password="monetdb", port=s.dbport, 
database='db1')
+            tc.execute("SELECT COUNT(*) FROM 
foo").assertSucceeded().assertDataResultMatch([(500,)])
+        s.communicate()
diff --git a/sql/backends/monet5/sql.c b/sql/backends/monet5/sql.c
--- a/sql/backends/monet5/sql.c
+++ b/sql/backends/monet5/sql.c
@@ -54,6 +54,18 @@
 #include "mal_authorize.h"
 #include "gdk_cand.h"
 
+static inline void
+BBPnreclaim(int nargs, ...)
+{
+       va_list valist;
+       va_start(valist, nargs);
+       for (int i = 0; i < nargs; i++) {
+               BAT *b = va_arg(valist, BAT *);
+               BBPreclaim(b);
+       }
+       va_end(valist);
+}
+
 static int
 rel_is_table(sql_rel *rel)
 {
@@ -4310,6 +4322,178 @@ end:
 }
 
 str
+SQLpersist_unlogged(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       (void)stk;
+       (void)pci;
+
+       bool schema_wide = (pci->argc == 3 || pci->argc == 4),
+               bat_exists = false;
+
+       bat *o0 = getArgReference_bat(stk, pci, 0),
+               *o1 = getArgReference_bat(stk, pci, 1),
+               *o2 = getArgReference_bat(stk, pci, 2);
+
+       str i1 = schema_wide && pci->argc == 4 ? *getArgReference_str(stk, pci, 
3) : NULL;
+       str i2 = !schema_wide ? *getArgReference_str(stk, pci, 4) : NULL;
+
+       str msg = MAL_SUCCEED;
+       sqlstore *store = NULL;
+       mvc *m = NULL;
+       sql_trans *tr = NULL;
+       node *ncol;
+       storage *t_del = NULL;
+
+       BAT *b = NULL, *d = NULL, *tables = NULL, *sqlids = NULL, *rowcounts = 
NULL;
+
+       msg = getSQLContext(cntxt, mb, &m, NULL);
+
+       if (msg)
+               return msg;
+
+       store = m->session->tr->store;
+       tr = m->session->tr;
+
+       sql_schema *s = NULL;
+       if (i1) {
+               s = mvc_bind_schema(m, i1);
+               if (s == NULL)
+                       throw(SQL, "sql.persist_unlogged", SQLSTATE(3F000) 
"Schema missing %s.", i1);
+       } else {
+               s = m->session->schema;
+       }
+
+       if (pci->argc != 3 && !mvc_schema_privs(m, s))
+               throw(SQL, "sql.persist_unlogged", SQLSTATE(42000) "Access 
denied for %s to schema '%s'.",
+                         get_string_global_var(m, "current_user"), 
s->base.name);
+
+       int n = 100;
+       bat *commit_list = GDKzalloc(sizeof(bat) * (n + 1));
+       BUN *sizes = GDKzalloc(sizeof(BUN) * (n + 1));
+
+       tables = COLnew(0, TYPE_str, 0, TRANSIENT);
+       sqlids = COLnew(0, TYPE_int, 0, TRANSIENT);
+       rowcounts = COLnew(0, TYPE_lng, 0, TRANSIENT);
+
+       if (commit_list == NULL || sizes == NULL || tables == NULL || sqlids == 
NULL || rowcounts == NULL) {
+               GDKfree(commit_list);
+               GDKfree(sizes);
+               BBPnreclaim(3, tables, sqlids, rowcounts);
+               throw(SQL, "sql.persist_unlogged", SQLSTATE(HY001));
+       }
+
+       commit_list[0] = 0;
+       sizes[0] = 0;
+       int i = 1;
+
+       MT_lock_set(&store->commit);
+
+       if (s->tables) {
+               struct os_iter oi;
+               os_iterator(&oi, s->tables, tr, NULL);
+
+               for (sql_base *bt = oi_next(&oi); bt; bt = oi_next(&oi)) {
+                       sql_table *t = (sql_table *) bt;
+
+                       if (!schema_wide && strcmp(i2, t->base.name) != 0)
+                               continue;
+
+                       if (isTable(t) && t->access == TABLE_APPENDONLY && 
isUnloggedTable(t)) {
+                               str t_name = t->base.name;
+                               sqlid t_id = t->base.id;
+                               t_del = bind_del_data(tr, t, NULL);
+
+                               if (t_del == NULL || (d = 
BATdescriptor(t_del->cs.bid)) == NULL) {
+                                       MT_lock_unset(&store->commit);
+                                       GDKfree(commit_list);
+                                       GDKfree(sizes);
+                                       BBPnreclaim(3, tables, sqlids, 
rowcounts);
+                                       throw(SQL, "sql.persist_unlogged", 
"Cannot access %s column storage.", t_name);
+                               }
+
+                               if (ol_first_node(t->columns)) {
+
+                                       for (ncol = 
ol_first_node((t)->columns); ncol; ncol = ncol->next) {
+                                               sql_column *c = (sql_column *) 
ncol->data;
+                                               b = 
store->storage_api.bind_col(tr, c, RDONLY);
+
+                                               if (b == NULL) {
+                                                       
MT_lock_unset(&store->commit);
+                                                       GDKfree(commit_list);
+                                                       GDKfree(sizes);
+                                                       BBPnreclaim(3, tables, 
sqlids, rowcounts);
+                                                       throw(SQL, 
"sql.persist_unlogged", "Cannot access column descriptor.");
+                                               }
+
+                                               if (isVIEW(b))
+                                                       b = 
BATdescriptor(VIEWtparent(b));
+
+                                               if (i == n && ncol->next) {
+                                                       n = n * 2;
+                                                       commit_list = 
GDKrealloc(commit_list, sizeof(bat) * n);
+                                                       sizes = 
GDKrealloc(sizes, sizeof(BUN) * n);
+                                               }
+
+                                               if (commit_list == NULL || 
sizes == NULL) {
+                                                       
MT_lock_unset(&store->commit);
+                                                       GDKfree(commit_list);
+                                                       GDKfree(sizes);
+                                                       BBPnreclaim(3, tables, 
sqlids, rowcounts);
+                                                       throw(SQL, 
"sql.persist_unlogged", SQLSTATE(HY001));
+                                               }
+
+                                               if (BBP_status(b->batCacheid) & 
BBPEXISTING) {
+                                                       commit_list[i] = 
b->batCacheid;
+                                                       sizes[i] = BATcount(b);
+                                                       i++;
+                                                       bat_exists = true;
+                                               }
+
+                                               if (BUNappend(tables, t_name, 
false) != GDK_SUCCEED ||
+                                                       BUNappend(sqlids, 
&t_id, false) != GDK_SUCCEED ||
+                                                       BUNappend(rowcounts, 
bat_exists ? sizes + (i - 1) : sizes, false) != GDK_SUCCEED) {
+                                                       
MT_lock_unset(&store->commit);
+                                                       GDKfree(commit_list);
+                                                       GDKfree(sizes);
+                                                       BBPnreclaim(3, tables, 
sqlids, rowcounts);
+                                                       throw(SQL, 
"sql.persist_unlogged", SQLSTATE(HY001));
+                                               }
+                                       }
+                               }
+
+                               if (bat_exists) {
+                                       commit_list[i] = d->batCacheid;
+                                       sizes[i] = BATcount(d);
+                                       i++;
+                               }
+
+                               bat_exists = false;
+                       }
+               }
+       }
+
+       MT_lock_unset(&store->commit);
+
+       if (commit_list[1] > 0 && TMsubcommit_list(commit_list, sizes, i, -1, 
-1) != GDK_SUCCEED)
+               msg = createException(SQL, "sql.persist_unlogged", 
GDK_EXCEPTION);
+
+       GDKfree(commit_list);
+       GDKfree(sizes);
+
+       if (msg) {
+               BBPnreclaim(3, tables, sqlids, rowcounts);
+       } else {
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to