Changeset: 767eae9e8198 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/767eae9e8198
Modified Files:
        sql/backends/monet5/sql.c
        sql/backends/monet5/sql_upgrades.c
        sql/scripts/77_storage.sql
Branch: insertonly
Log Message:

Refactor SQLinsertonly_persist + fix upgrade code.


diffs (truncated from 329 to 300 lines):

diff --git a/sql/backends/monet5/sql.c b/sql/backends/monet5/sql.c
--- a/sql/backends/monet5/sql.c
+++ b/sql/backends/monet5/sql.c
@@ -54,6 +54,18 @@
 #include "mal_authorize.h"
 #include "gdk_cand.h"
 
+static inline void
+BBPnreclaim(int nargs, ...)
+{
+       va_list valist;
+       va_start(valist, nargs);
+       for (int i = 0; i < nargs; i++) {
+               BAT *b = va_arg(valist, BAT *);
+               BBPreclaim(b);
+       }
+       va_end(valist);
+}
+
 static int
 rel_is_table(sql_rel *rel)
 {
@@ -4315,19 +4327,24 @@ SQLinsertonly_persist(Client cntxt, MalB
        (void)stk;
        (void)pci;
 
-       bat *r0 = getArgReference_bat(stk,pci,0),
-               *r1 = getArgReference_bat(stk,pci,1),
-               *r2 = getArgReference_bat(stk,pci,2);
+       bool schema_wide = pci->argc == 4 ? true : false;
+       assert(pci->argc == 4 || pci->argc == 5);
+
+       bat *o0 = getArgReference_bat(stk, pci, 0),
+               *o1 = getArgReference_bat(stk, pci, 1),
+               *o2 = getArgReference_bat(stk, pci, 2);
+
+       str i1 = *getArgReference_str(stk, pci, 3);
+       str i2 = !schema_wide ? *getArgReference_str(stk, pci, 4) : NULL;
 
        str msg = MAL_SUCCEED;
        sqlstore *store = NULL;
        mvc *m = NULL;
        sql_trans *tr = NULL;
-       struct os_iter si = {0};
        node *ncol;
        storage *t_storage = NULL;
 
-       BAT *bs = NULL, *tables = NULL, *tables_ids = NULL, *rowcounts = NULL;
+       BAT *bs = NULL, *tables = NULL, *oids = NULL, *rowcounts = NULL;
 
        msg = getSQLContext(cntxt, mb, &m, NULL);
 
@@ -4337,133 +4354,123 @@ SQLinsertonly_persist(Client cntxt, MalB
        store = m->session->tr->store;
        tr = m->session->tr;
 
-       if (store->insertonly_nowal == false) {
-               msg = createException(SQL, "sql.insertonly_persist", "Function 
cannot be used without setting "
-                                                         "insertonly_nowal 
flag at server startup.");
-               return msg;
-       }
-
-       int n = 1000;
+       if (store->insertonly_nowal == false)
+               throw(SQL, "sql.insertonly_persist", "Function cannot be used 
without setting"
+                         " insertonly_nowal flag at server startup.");
+
+       sql_schema *s = mvc_bind_schema(m, i1);
+       if (!s)
+               throw(SQL, "sql.insertonly_persist", SQLSTATE(3F000) "Schema 
missing %s.", i1);
+
+       if (!mvc_schema_privs(m, s))
+               throw(SQL, "sql.insertonly_persist", SQLSTATE(42000) "Access 
denied for %s to schema '%s'.",
+                         get_string_global_var(m, "current_user"), 
s->base.name);
+
+       int n = 100;
        bat *commit_list = GDKmalloc(sizeof(bat) * (n + 1));
        BUN *sizes = GDKmalloc(sizeof(BUN) * (n + 1));
 
+       tables = COLnew(0, TYPE_str, 0, TRANSIENT);
+       oids = COLnew(0, TYPE_lng, 0, TRANSIENT);
+       rowcounts = COLnew(0, TYPE_lng, 0, TRANSIENT);
+
+       if (commit_list == NULL || sizes == NULL || tables == NULL || oids == 
NULL || rowcounts == NULL) {
+               GDKfree(commit_list);
+               GDKfree(sizes);
+               BBPnreclaim(3, tables, oids, rowcounts);
+               throw(SQL, "sql.insertonly_persist", SQLSTATE(HY001));
+       }
+
        commit_list[0] = 0;
        sizes[0] = 0;
        int i = 1;
 
-       tables = COLnew(0, TYPE_str, 0, TRANSIENT);
-       tables_ids = COLnew(0, TYPE_lng, 0, TRANSIENT);
-       rowcounts = COLnew(0, TYPE_lng, 0, TRANSIENT);
-
-       if (tables == NULL || tables_ids == NULL || rowcounts == NULL || 
commit_list == NULL || sizes == NULL) {
-               msg = createException(SQL, "sql.insertonly_persist", 
SQLSTATE(HY001));
-               goto exit2;
-       }
-
        MT_lock_set(&store->commit);
 
-       os_iterator(&si, tr->cat->schemas, tr, NULL);
-
-       for (sql_base *b = oi_next(&si); b; b = oi_next(&si)) {
-               sql_schema *s = (sql_schema *) b;
-               str x = s->base.name;
-
-               if (strcmp(x, "sys") == 0 || strcmp(x, "tmp") == 0 ||
-                       strcmp(x, "json") == 0 || strcmp(x, "profiler") == 0 ||
-                       strcmp(x, "logging") == 0)
-                       continue;
-
-               if (s->tables) {
-                       struct os_iter oi;
-                       os_iterator(&oi, s->tables, tr, NULL);
-
-                       for (sql_base *bt = oi_next(&oi); bt; bt = 
oi_next(&oi)) {
-                               sql_table *t = (sql_table *) bt;
-
-                               if (isTable(t) && t->access == 
TABLE_APPENDONLY) {
-                                       str t_name = t->base.name;
-                                       sqlid t_id = t->base.id;
-                                       t_storage = bind_del_data(tr, t, NULL);
-
-                                       if (ol_first_node(t->columns)) {
-
-                                               for (ncol = 
ol_first_node((t)->columns); ncol; ncol = ncol->next) {
-                                                       sql_column *c = 
(sql_column *) ncol->data;
-                                                       bs = 
store->storage_api.bind_col(tr, c, RDONLY);
-
-                                                       if (bs == NULL) {
-                                                               msg = 
createException(SQL, "insertonly_persist",
-                                                                               
                          SQLSTATE(HY005) "Cannot access column descriptor");
-                                                               goto exit1;
-                                                       }
-
-                                                       if (isVIEW(bs)) {
-                                                               bs = 
BATdescriptor(VIEWtparent(bs));
-                                                               if (bs == NULL) 
{
-                                                                       msg = 
createException(SQL, "insertonly_persist",
-                                                                               
                                  SQLSTATE(HY005) "Cannot access column 
descriptor");
-                                                                       goto 
exit1;
-                                                               }
-                                                       }
-
-                                                       if (i == n && 
ncol->next) {
-                                                               n = n * 2;
-                                                               commit_list = 
GDKrealloc(commit_list, sizeof(bat) * n);
-                                                               sizes = 
GDKrealloc(sizes, sizeof(BUN) * n);
-                                                       }
-
-                                                       if (commit_list == NULL 
|| sizes == NULL) {
-                                                               msg = 
createException(SQL, "insertonly_persist", SQLSTATE(HY001));
-                                                               goto exit1;
-                                                       }
-
-                                                       commit_list[i] = 
t_storage->cs.bid;
-                                                       commit_list[i+1] = 
bs->batCacheid;
-                                                       sizes[i] = 
!bs->batTransient ? BATcount(bs) : 0;
-                                                       sizes[i+1] = 
!bs->batTransient ? BATcount(bs) : 0;
-
-                                                       if (BUNappend(tables, 
t_name, false) != GDK_SUCCEED) {
-                                                               msg = 
createException(SQL, "insertonly_persist", "Failed to append 'table'");
-                                                               goto exit1;
-                                                       }
-
-                                                       if 
(BUNappend(tables_ids, &t_id, false) != GDK_SUCCEED) {
-                                                               msg = 
createException(SQL, "insertonly_persist", "Failed to append 'table_id'");
-                                                               goto exit1;
-                                                       }
-
-                                                       if 
(BUNappend(rowcounts, sizes + i, false) != GDK_SUCCEED) {
-                                                               msg = 
createException(SQL, "insertonly_persist", "Failed to append 'rowcount'");
-                                                               goto exit1;
-                                                       }
-
-                                                       i+=2;
+       if (s->tables) {
+               struct os_iter oi;
+               os_iterator(&oi, s->tables, tr, NULL);
+
+               for (sql_base *bt = oi_next(&oi); bt; bt = oi_next(&oi)) {
+                       sql_table *t = (sql_table *) bt;
+
+                       if (!schema_wide && strcmp(i2, t->base.name) != 0)
+                               continue;
+
+                       if (isTable(t) && t->access == TABLE_APPENDONLY) {
+                               str t_name = t->base.name;
+                               sqlid t_id = t->base.id;
+                               t_storage = bind_del_data(tr, t, NULL);
+
+                               if (ol_first_node(t->columns)) {
+
+                                       for (ncol = 
ol_first_node((t)->columns); ncol; ncol = ncol->next) {
+                                               sql_column *c = (sql_column *) 
ncol->data;
+                                               bs = 
store->storage_api.bind_col(tr, c, RDONLY);
+
+                                               if (bs && isVIEW(bs))
+                                                       bs = 
BATdescriptor(VIEWtparent(bs));
+
+                                               if (bs == NULL) {
+                                                       
MT_lock_unset(&store->commit);
+                                                       GDKfree(commit_list);
+                                                       GDKfree(sizes);
+                                                       BBPnreclaim(3, tables, 
oids, rowcounts);
+                                                       throw(SQL, 
"sql.insertonly_persist", "Cannot access column descriptor.");
                                                }
+
+                                               if (i == n && ncol->next) {
+                                                       n = n * 2;
+                                                       commit_list = 
GDKrealloc(commit_list, sizeof(bat) * n);
+                                                       sizes = 
GDKrealloc(sizes, sizeof(BUN) * n);
+                                               }
+
+                                               if (commit_list == NULL || 
sizes == NULL) {
+                                                       
MT_lock_unset(&store->commit);
+                                                       GDKfree(commit_list);
+                                                       GDKfree(sizes);
+                                                       BBPnreclaim(3, tables, 
oids, rowcounts);
+                                                       throw(SQL, 
"sql.insertonly_persist", SQLSTATE(HY001));
+                                               }
+
+                                               commit_list[i] = 
t_storage->cs.bid;
+                                               commit_list[i+1] = 
bs->batCacheid;
+                                               sizes[i] = !bs->batTransient ? 
BATcount(bs) : 0;
+                                               sizes[i+1] = !bs->batTransient 
? BATcount(bs) : 0;
+
+                                               if (BUNappend(tables, t_name, 
false) != GDK_SUCCEED ||
+                                                       BUNappend(oids, &t_id, 
false) != GDK_SUCCEED ||
+                                                       BUNappend(rowcounts, 
sizes + i, false) != GDK_SUCCEED) {
+                                                       
MT_lock_unset(&store->commit);
+                                                       GDKfree(commit_list);
+                                                       GDKfree(sizes);
+                                                       BBPnreclaim(3, tables, 
oids, rowcounts);
+                                                       throw(SQL, 
"sql.insertonly_persist", SQLSTATE(HY001));
+                                               }
+
+                                               i+=2;
                                        }
                                }
                        }
                }
        }
 
+       MT_lock_unset(&store->commit);
+
        if (TMsubcommit_list(commit_list, sizes, i, -1, -1) != GDK_SUCCEED)
-               msg = createException(SQL, "insertonly_persist", GDK_EXCEPTION);
-
- exit1:
-       MT_lock_unset(&store->commit);
-
- exit2:
+               msg = createException(SQL, "sql.insertonly_persist", 
GDK_EXCEPTION);
+
        GDKfree(commit_list);
        GDKfree(sizes);
+
        if (msg) {
-               BBPreclaim(tables);
-               BBPreclaim(tables_ids);
-               BBPreclaim(rowcounts);
+               BBPnreclaim(3, tables, oids, rowcounts);
        } else {
-               *r0 = tables->batCacheid;
-               *r1 = tables_ids->batCacheid;
-               *r2 = rowcounts->batCacheid;
+               *o0 = tables->batCacheid;
+               *o1 = oids->batCacheid;
+               *o2 = rowcounts->batCacheid;
                BBPkeepref(tables);
-               BBPkeepref(tables_ids);
+               BBPkeepref(oids);
                BBPkeepref(rowcounts);
        }
        return msg;
@@ -5221,7 +5228,8 @@ static mel_func sql_init_funcs[] = {
  pattern("sql", "resume_log_flushing", SQLresume_log_flushing, true, "Resume 
WAL log flushing", args(1,1, arg("",void))),
  pattern("sql", "suspend_log_flushing", SQLsuspend_log_flushing, true, 
"Suspend WAL log flushing", args(1,1, arg("",void))),
  pattern("sql", "hot_snapshot", SQLhot_snapshot, true, "Write db snapshot to 
the given tar(.gz/.lz4/.bz/.xz) file on either server or client", args(1,3, 
arg("",void),arg("tarfile", str),arg("onserver",bit))),
- pattern("sql", "insertonly_persist", SQLinsertonly_persist, true, "Persist 
changes to new data on append only tables.", args(3, 3, batarg("table", str), 
batarg("table_id", lng), batarg("rowcount", lng))),
+ pattern("sql", "insertonly_persist", SQLinsertonly_persist, true, "Persist 
deltas on append only tables in schema s.", args(3, 4, batarg("table", str), 
batarg("table_id", lng), batarg("rowcount", lng), arg("s", str))),
+ pattern("sql", "insertonly_persist", SQLinsertonly_persist, true, "Persist 
deltas on append only table in schema s table t.", args(3, 5, batarg("table", 
str), batarg("table_id", lng), batarg("rowcount", lng), arg("s", str), arg("t", 
str))),
  pattern("sql", "assert", SQLassert, false, "Generate an exception when 
b==true", args(1,3, arg("",void),arg("b",bit),arg("msg",str))),
  pattern("sql", "assert", SQLassertInt, false, "Generate an exception when 
b!=0", args(1,3, arg("",void),arg("b",int),arg("msg",str))),
  pattern("sql", "assert", SQLassertLng, false, "Generate an exception when 
b!=0", args(1,3, arg("",void),arg("b",lng),arg("msg",str))),
diff --git a/sql/backends/monet5/sql_upgrades.c b/sql/backends/monet5/sql_upgrades.c
--- a/sql/backends/monet5/sql_upgrades.c
+++ b/sql/backends/monet5/sql_upgrades.c
@@ -6173,7 +6173,7 @@ sql_update_default(Client c, mvc *sql, s
        }
 
        /* 77_storage.sql */
-       if (!sql_bind_func_(sql, s->base.name, "insertonly_persist", NULL, 
F_UNION, true)) {
+       if (!sql_bind_func(sql, s->base.name, "insertonly_persist", &tp, NULL, 
F_UNION, true)) {
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-leave@monetdb.org

Reply via email to