Changeset: 767eae9e8198 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/767eae9e8198 Modified Files: sql/backends/monet5/sql.c sql/backends/monet5/sql_upgrades.c sql/scripts/77_storage.sql Branch: insertonly Log Message:
Refactor SQLinsertonly_persist + fix upgrade code. diffs (truncated from 329 to 300 lines): diff --git a/sql/backends/monet5/sql.c b/sql/backends/monet5/sql.c --- a/sql/backends/monet5/sql.c +++ b/sql/backends/monet5/sql.c @@ -54,6 +54,18 @@ #include "mal_authorize.h" #include "gdk_cand.h" +static inline void +BBPnreclaim(int nargs, ...) +{ + va_list valist; + va_start(valist, nargs); + for (int i = 0; i < nargs; i++) { + BAT *b = va_arg(valist, BAT *); + BBPreclaim(b); + } + va_end(valist); +} + static int rel_is_table(sql_rel *rel) { @@ -4315,19 +4327,24 @@ SQLinsertonly_persist(Client cntxt, MalB (void)stk; (void)pci; - bat *r0 = getArgReference_bat(stk,pci,0), - *r1 = getArgReference_bat(stk,pci,1), - *r2 = getArgReference_bat(stk,pci,2); + bool schema_wide = pci->argc == 4 ? true : false; + assert(pci->argc == 4 || pci->argc == 5); + + bat *o0 = getArgReference_bat(stk, pci, 0), + *o1 = getArgReference_bat(stk, pci, 1), + *o2 = getArgReference_bat(stk, pci, 2); + + str i1 = *getArgReference_str(stk, pci, 3); + str i2 = !schema_wide ? *getArgReference_str(stk, pci, 4) : NULL; str msg = MAL_SUCCEED; sqlstore *store = NULL; mvc *m = NULL; sql_trans *tr = NULL; - struct os_iter si = {0}; node *ncol; storage *t_storage = NULL; - BAT *bs = NULL, *tables = NULL, *tables_ids = NULL, *rowcounts = NULL; + BAT *bs = NULL, *tables = NULL, *oids = NULL, *rowcounts = NULL; msg = getSQLContext(cntxt, mb, &m, NULL); @@ -4337,133 +4354,123 @@ SQLinsertonly_persist(Client cntxt, MalB store = m->session->tr->store; tr = m->session->tr; - if (store->insertonly_nowal == false) { - msg = createException(SQL, "sql.insertonly_persist", "Function cannot be used without setting " - "insertonly_nowal flag at server startup."); - return msg; - } - - int n = 1000; + if (store->insertonly_nowal == false) + throw(SQL, "sql.insertonly_persist", "Function cannot be used without setting" + " insertonly_nowal flag at server startup."); + + sql_schema *s = mvc_bind_schema(m, i1); + if (!s) + throw(SQL, "sql.insertonly_persist", SQLSTATE(3F000) "Schema missing %s.", i1); + + if (!mvc_schema_privs(m, s)) + throw(SQL, "sql.insertonly_persist", SQLSTATE(42000) "Access denied for %s to schema '%s'.", + get_string_global_var(m, "current_user"), s->base.name); + + int n = 100; bat *commit_list = GDKmalloc(sizeof(bat) * (n + 1)); BUN *sizes = GDKmalloc(sizeof(BUN) * (n + 1)); + tables = COLnew(0, TYPE_str, 0, TRANSIENT); + oids = COLnew(0, TYPE_lng, 0, TRANSIENT); + rowcounts = COLnew(0, TYPE_lng, 0, TRANSIENT); + + if (commit_list == NULL || sizes == NULL || tables == NULL || oids == NULL || rowcounts == NULL) { + GDKfree(commit_list); + GDKfree(sizes); + BBPnreclaim(3, tables, oids, rowcounts); + throw(SQL, "sql.insertonly_persist", SQLSTATE(HY001)); + } + commit_list[0] = 0; sizes[0] = 0; int i = 1; - tables = COLnew(0, TYPE_str, 0, TRANSIENT); - tables_ids = COLnew(0, TYPE_lng, 0, TRANSIENT); - rowcounts = COLnew(0, TYPE_lng, 0, TRANSIENT); - - if (tables == NULL || tables_ids == NULL || rowcounts == NULL || commit_list == NULL || sizes == NULL) { - msg = createException(SQL, "sql.insertonly_persist", SQLSTATE(HY001)); - goto exit2; - } - MT_lock_set(&store->commit); - os_iterator(&si, tr->cat->schemas, tr, NULL); - - for (sql_base *b = oi_next(&si); b; b = oi_next(&si)) { - sql_schema *s = (sql_schema *) b; - str x = s->base.name; - - if (strcmp(x, "sys") == 0 || strcmp(x, "tmp") == 0 || - strcmp(x, "json") == 0 || strcmp(x, "profiler") == 0 || - strcmp(x, "logging") == 0) - continue; - - if (s->tables) { - struct os_iter oi; - os_iterator(&oi, s->tables, tr, NULL); - - for (sql_base *bt = oi_next(&oi); bt; bt = oi_next(&oi)) { - sql_table *t = (sql_table *) bt; - - if (isTable(t) && t->access == TABLE_APPENDONLY) { - str t_name = t->base.name; - sqlid t_id = t->base.id; - t_storage = bind_del_data(tr, t, NULL); - - if (ol_first_node(t->columns)) { - - for (ncol = ol_first_node((t)->columns); ncol; ncol = ncol->next) { - sql_column *c = (sql_column *) ncol->data; - bs = store->storage_api.bind_col(tr, c, RDONLY); - - if (bs == NULL) { - msg = createException(SQL, "insertonly_persist", - SQLSTATE(HY005) "Cannot access column descriptor"); - goto exit1; - } - - if (isVIEW(bs)) { - bs = BATdescriptor(VIEWtparent(bs)); - if (bs == NULL) { - msg = createException(SQL, "insertonly_persist", - SQLSTATE(HY005) "Cannot access column descriptor"); - goto exit1; - } - } - - if (i == n && ncol->next) { - n = n * 2; - commit_list = GDKrealloc(commit_list, sizeof(bat) * n); - sizes = GDKrealloc(sizes, sizeof(BUN) * n); - } - - if (commit_list == NULL || sizes == NULL) { - msg = createException(SQL, "insertonly_persist", SQLSTATE(HY001)); - goto exit1; - } - - commit_list[i] = t_storage->cs.bid; - commit_list[i+1] = bs->batCacheid; - sizes[i] = !bs->batTransient ? BATcount(bs) : 0; - sizes[i+1] = !bs->batTransient ? BATcount(bs) : 0; - - if (BUNappend(tables, t_name, false) != GDK_SUCCEED) { - msg = createException(SQL, "insertonly_persist", "Failed to append 'table'"); - goto exit1; - } - - if (BUNappend(tables_ids, &t_id, false) != GDK_SUCCEED) { - msg = createException(SQL, "insertonly_persist", "Failed to append 'table_id'"); - goto exit1; - } - - if (BUNappend(rowcounts, sizes + i, false) != GDK_SUCCEED) { - msg = createException(SQL, "insertonly_persist", "Failed to append 'rowcount'"); - goto exit1; - } - - i+=2; + if (s->tables) { + struct os_iter oi; + os_iterator(&oi, s->tables, tr, NULL); + + for (sql_base *bt = oi_next(&oi); bt; bt = oi_next(&oi)) { + sql_table *t = (sql_table *) bt; + + if (!schema_wide && strcmp(i2, t->base.name) != 0) + continue; + + if (isTable(t) && t->access == TABLE_APPENDONLY) { + str t_name = t->base.name; + sqlid t_id = t->base.id; + t_storage = bind_del_data(tr, t, NULL); + + if (ol_first_node(t->columns)) { + + for (ncol = ol_first_node((t)->columns); ncol; ncol = ncol->next) { + sql_column *c = (sql_column *) ncol->data; + bs = store->storage_api.bind_col(tr, c, RDONLY); + + if (bs && isVIEW(bs)) + bs = BATdescriptor(VIEWtparent(bs)); + + if (bs == NULL) { + MT_lock_unset(&store->commit); + GDKfree(commit_list); + GDKfree(sizes); + BBPnreclaim(3, tables, oids, rowcounts); + throw(SQL, "sql.insertonly_persist", "Cannot access column descriptor."); } + + if (i == n && ncol->next) { + n = n * 2; + commit_list = GDKrealloc(commit_list, sizeof(bat) * n); + sizes = GDKrealloc(sizes, sizeof(BUN) * n); + } + + if (commit_list == NULL || sizes == NULL) { + MT_lock_unset(&store->commit); + GDKfree(commit_list); + GDKfree(sizes); + BBPnreclaim(3, tables, oids, rowcounts); + throw(SQL, "sql.insertonly_persist", SQLSTATE(HY001)); + } + + commit_list[i] = t_storage->cs.bid; + commit_list[i+1] = bs->batCacheid; + sizes[i] = !bs->batTransient ? BATcount(bs) : 0; + sizes[i+1] = !bs->batTransient ? BATcount(bs) : 0; + + if (BUNappend(tables, t_name, false) != GDK_SUCCEED || + BUNappend(oids, &t_id, false) != GDK_SUCCEED || + BUNappend(rowcounts, sizes + i, false) != GDK_SUCCEED) { + MT_lock_unset(&store->commit); + GDKfree(commit_list); + GDKfree(sizes); + BBPnreclaim(3, tables, oids, rowcounts); + throw(SQL, "sql.insertonly_persist", SQLSTATE(HY001)); + } + + i+=2; } } } } } + MT_lock_unset(&store->commit); + if (TMsubcommit_list(commit_list, sizes, i, -1, -1) != GDK_SUCCEED) - msg = createException(SQL, "insertonly_persist", GDK_EXCEPTION); - - exit1: - MT_lock_unset(&store->commit); - - exit2: + msg = createException(SQL, "sql.insertonly_persist", GDK_EXCEPTION); + GDKfree(commit_list); GDKfree(sizes); + if (msg) { - BBPreclaim(tables); - BBPreclaim(tables_ids); - BBPreclaim(rowcounts); + BBPnreclaim(3, tables, oids, rowcounts); } else { - *r0 = tables->batCacheid; - *r1 = tables_ids->batCacheid; - *r2 = rowcounts->batCacheid; + *o0 = tables->batCacheid; + *o1 = oids->batCacheid; + *o2 = rowcounts->batCacheid; BBPkeepref(tables); - BBPkeepref(tables_ids); + BBPkeepref(oids); BBPkeepref(rowcounts); } return msg; @@ -5221,7 +5228,8 @@ static mel_func sql_init_funcs[] = { pattern("sql", "resume_log_flushing", SQLresume_log_flushing, true, "Resume WAL log flushing", args(1,1, arg("",void))), pattern("sql", "suspend_log_flushing", SQLsuspend_log_flushing, true, "Suspend WAL log flushing", args(1,1, arg("",void))), pattern("sql", "hot_snapshot", SQLhot_snapshot, true, "Write db snapshot to the given tar(.gz/.lz4/.bz/.xz) file on either server or client", args(1,3, arg("",void),arg("tarfile", str),arg("onserver",bit))), - pattern("sql", "insertonly_persist", SQLinsertonly_persist, true, "Persist changes to new data on append only tables.", args(3, 3, batarg("table", str), batarg("table_id", lng), batarg("rowcount", lng))), + pattern("sql", "insertonly_persist", SQLinsertonly_persist, true, "Persist deltas on append only tables in schema s.", args(3, 4, batarg("table", str), batarg("table_id", lng), batarg("rowcount", lng), arg("s", str))), + pattern("sql", "insertonly_persist", SQLinsertonly_persist, true, "Persist deltas on append only table in schema s table t.", args(3, 5, batarg("table", str), batarg("table_id", lng), batarg("rowcount", lng), arg("s", str), arg("t", str))), pattern("sql", "assert", SQLassert, false, "Generate an exception when b==true", args(1,3, arg("",void),arg("b",bit),arg("msg",str))), pattern("sql", "assert", SQLassertInt, false, "Generate an exception when b!=0", args(1,3, arg("",void),arg("b",int),arg("msg",str))), pattern("sql", "assert", SQLassertLng, false, "Generate an exception when b!=0", args(1,3, arg("",void),arg("b",lng),arg("msg",str))), diff --git a/sql/backends/monet5/sql_upgrades.c b/sql/backends/monet5/sql_upgrades.c --- a/sql/backends/monet5/sql_upgrades.c +++ b/sql/backends/monet5/sql_upgrades.c @@ -6173,7 +6173,7 @@ sql_update_default(Client c, mvc *sql, s } /* 77_storage.sql */ - if (!sql_bind_func_(sql, s->base.name, "insertonly_persist", NULL, F_UNION, true)) { + if (!sql_bind_func(sql, s->base.name, "insertonly_persist", &tp, NULL, F_UNION, true)) { _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org