Changeset: 31ee3d277121 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/31ee3d277121 Modified Files: sql/backends/monet5/Tests/persist_unlogged.SQL.py sql/backends/monet5/sql.c Branch: Dec2023 Log Message:
Cleanup in persist_unlogged logic + return proper rowcount. diffs (267 lines): diff --git a/sql/backends/monet5/Tests/persist_unlogged.SQL.py b/sql/backends/monet5/Tests/persist_unlogged.SQL.py --- a/sql/backends/monet5/Tests/persist_unlogged.SQL.py +++ b/sql/backends/monet5/Tests/persist_unlogged.SQL.py @@ -9,10 +9,7 @@ except ImportError: with tempfile.TemporaryDirectory() as farm_dir: os.mkdir(os.path.join(farm_dir, 'db1')) - with process.server(mapiport='0', dbname='db1', - dbfarm=os.path.join(farm_dir, 'db1'), - stdin=process.PIPE, - stdout=process.PIPE, stderr=process.PIPE) as s: + with process.server(mapiport='0', dbname='db1', dbfarm=os.path.join(farm_dir, 'db1'), stdin=process.PIPE, stdout=process.PIPE, stderr=process.PIPE) as s: with SQLTestCase() as tc: tc.connect(username="monetdb", password="monetdb", port=s.dbport, database='db1') tc.execute("CREATE SCHEMA put").assertSucceeded() @@ -30,10 +27,7 @@ with tempfile.TemporaryDirectory() as fa tc.execute("SELECT table, rowcount FROM persist_unlogged(\'put\', \'foo\')").assertSucceeded().assertDataResultMatch([('foo', 500)]) s.communicate() - with process.server(mapiport='0', dbname='db1', - dbfarm=os.path.join(farm_dir, 'db1'), - stdin=process.PIPE, - stdout=process.PIPE, stderr=process.PIPE) as s: + with process.server(mapiport='0', dbname='db1', dbfarm=os.path.join(farm_dir, 'db1'), stdin=process.PIPE, stdout=process.PIPE, stderr=process.PIPE) as s: with SQLTestCase() as tc: tc.connect(username="monetdb", password="monetdb", port=s.dbport, database='db1') tc.execute("SET SCHEMA put").assertSucceeded() diff --git a/sql/backends/monet5/sql.c b/sql/backends/monet5/sql.c --- a/sql/backends/monet5/sql.c +++ b/sql/backends/monet5/sql.c @@ -19,35 +19,23 @@ #include "sql.h" #include "mapi_prompt.h" #include "sql_result.h" -#include "sql_gencode.h" #include "sql_storage.h" #include "sql_scenario.h" #include "store_sequence.h" -#include "sql_optimizer.h" -#include "sql_datetime.h" #include "sql_partition.h" -#include "rel_unnest.h" -#include "rel_optimizer.h" -#include "rel_statistics.h" #include "rel_partition.h" -#include "rel_select.h" #include "rel_rel.h" #include "rel_exp.h" -#include "rel_dump.h" -#include "rel_bin.h" #include "rel_physical.h" #include "mal.h" #include "mal_client.h" #include "mal_interpreter.h" -#include "mal_module.h" -#include "mal_session.h" #include "mal_resolve.h" #include "mal_client.h" #include "mal_interpreter.h" #include "mal_profiler.h" #include "bat5.h" #include "opt_pipes.h" -#include "orderidx.h" #include "clients.h" #include "mal_instruction.h" #include "mal_resource.h" @@ -4329,16 +4317,12 @@ SQLpersist_unlogged(Client cntxt, MalBlk assert(pci->argc == 5); - bool bat_exists = false; - bat *o0 = getArgReference_bat(stk, pci, 0), *o1 = getArgReference_bat(stk, pci, 1), *o2 = getArgReference_bat(stk, pci, 2); - - str sname = *getArgReference_str(stk, pci, 3); - str tname = *getArgReference_str(stk, pci, 4); - - str msg = MAL_SUCCEED; + str sname = *getArgReference_str(stk, pci, 3), + tname = *getArgReference_str(stk, pci, 4), + msg = MAL_SUCCEED; mvc *m = NULL; msg = getSQLContext(cntxt, mb, &m, NULL); @@ -4358,53 +4342,47 @@ SQLpersist_unlogged(Client cntxt, MalBlk sql_table *t = mvc_bind_table(m, s, tname); if (t == NULL) - throw(SQL, "sql.claim", SQLSTATE(42S02) "Table missing %s.%s", sname, tname); - - int n = 100; - bat *commit_list = GDKzalloc(sizeof(bat) * (n + 1)); - BUN *sizes = GDKzalloc(sizeof(BUN) * (n + 1)); - BAT *b = NULL, *d = NULL, *table = NULL, *tableid = NULL, *rowcount = NULL; - - table = COLnew(0, TYPE_str, 0, TRANSIENT); - tableid = COLnew(0, TYPE_int, 0, TRANSIENT); - rowcount = COLnew(0, TYPE_lng, 0, TRANSIENT); - - if (commit_list == NULL || sizes == NULL || table == NULL || tableid == NULL || rowcount == NULL) { - GDKfree(commit_list); - GDKfree(sizes); - BBPnreclaim(3, table, tableid, rowcount); - throw(SQL, "sql.persist_unlogged", SQLSTATE(HY001)); - } - - commit_list[0] = 0; - sizes[0] = 0; - int i = 1; - - MT_lock_set(&store->commit); - - if (isTable(t) && isUnloggedTable(t) && t->access == TABLE_APPENDONLY) { - sql_trans *tr = m->session->tr; - storage *t_del = bind_del_data(tr, t, NULL); - - if (t_del == NULL || (d = BATdescriptor(t_del->cs.bid)) == NULL) { - MT_lock_unset(&store->commit); - GDKfree(commit_list); - GDKfree(sizes); - BBPnreclaim(3, table, tableid, rowcount); - throw(SQL, "sql.persist_unlogged", "Cannot access %s column storage.", tname); - } - - if (ol_first_node(t->columns)) { - - for (node *ncol = ol_first_node((t)->columns); ncol; ncol = ncol->next) { + throw(SQL, "sql.persist_unlogged", SQLSTATE(42S02) "Table missing %s.%s", sname, tname); + + if ((isUnloggedTable(t) && t->access == TABLE_APPENDONLY) == false) + throw(SQL, "sql.persist_unlogged", "Unlogged and Insert Only mode combination required for table %s.%s", sname, tname); + + lng count = 0; + + sql_trans *tr = m->session->tr; + storage *t_del = bind_del_data(tr, t, NULL); + + BAT *d = BATdescriptor(t_del->cs.bid); + + if (t_del == NULL || d == NULL) + throw(SQL, "sql.persist_unlogged", "Cannot access %s column storage.", tname); + + if (BBP_status(d->batCacheid) & BBPEXISTING) { + + if (BATcount(d) > d->batInserted) { + + int n = 100; + bat *commit_list = GDKzalloc(sizeof(bat) * (n + 1)); + BUN *sizes = GDKzalloc(sizeof(BUN) * (n + 1)); + + if (commit_list == NULL || sizes == NULL) { + GDKfree(commit_list); + GDKfree(sizes); + throw(SQL, "sql.persist_unlogged", SQLSTATE(HY001)); + } + + commit_list[0] = 0; + sizes[0] = 0; + int i = 1; + + for (node *ncol = ol_first_node(t->columns); ncol; ncol = ncol->next) { + sql_column *c = (sql_column *) ncol->data; - b = store->storage_api.bind_col(tr, c, RDONLY); + BAT *b = store->storage_api.bind_col(tr, c, RDONLY); if (b == NULL) { - MT_lock_unset(&store->commit); GDKfree(commit_list); GDKfree(sizes); - BBPnreclaim(3, table, tableid, rowcount); throw(SQL, "sql.persist_unlogged", "Cannot access column descriptor."); } @@ -4418,57 +4396,56 @@ SQLpersist_unlogged(Client cntxt, MalBlk } if (commit_list == NULL || sizes == NULL) { - MT_lock_unset(&store->commit); GDKfree(commit_list); GDKfree(sizes); - BBPnreclaim(3, table, tableid, rowcount); throw(SQL, "sql.persist_unlogged", SQLSTATE(HY001)); } - if (BBP_status(b->batCacheid) & BBPEXISTING) { - commit_list[i] = b->batCacheid; - sizes[i] = BATcount(b); - i++; - bat_exists = true; - } + commit_list[i] = b->batCacheid; + sizes[i] = BATcount(b); + i++; } - } - - if (bat_exists) { + commit_list[i] = d->batCacheid; sizes[i] = BATcount(d); i++; + + if (TMsubcommit_list(commit_list, sizes, i, -1, -1) != GDK_SUCCEED) { + GDKfree(commit_list); + GDKfree(sizes); + throw(SQL, "sql.persist_unlogged", "Lower level commit operation failed"); + } + + GDKfree(commit_list); + GDKfree(sizes); } - } - - MT_lock_unset(&store->commit); - - if (commit_list[1] > 0 && TMsubcommit_list(commit_list, sizes, i, -1, -1) != GDK_SUCCEED) - msg = createException(SQL, "sql.persist_unlogged", GDK_EXCEPTION); + + count = BATcount(d); + } + + BAT *table = COLnew(0, TYPE_str, 0, TRANSIENT), + *tableid = COLnew(0, TYPE_int, 0, TRANSIENT), + *rowcount = COLnew(0, TYPE_lng, 0, TRANSIENT); + + if (table == NULL || tableid == NULL || rowcount == NULL) { + BBPnreclaim(3, table, tableid, rowcount); + throw(SQL, "sql.persist_unlogged", SQLSTATE(HY001)); + } if (BUNappend(table, tname, false) != GDK_SUCCEED || BUNappend(tableid, &(t->base.id), false) != GDK_SUCCEED || - BUNappend(rowcount, bat_exists ? sizes + (i - 1) : sizes + 0, false) != GDK_SUCCEED) { - MT_lock_unset(&store->commit); - GDKfree(commit_list); - GDKfree(sizes); + BUNappend(rowcount, &count, false) != GDK_SUCCEED) { BBPnreclaim(3, table, tableid, rowcount); throw(SQL, "sql.persist_unlogged", SQLSTATE(HY001)); } - GDKfree(commit_list); - GDKfree(sizes); - - if (msg) - BBPnreclaim(3, table, tableid, rowcount); - else { - *o0 = table->batCacheid; - *o1 = tableid->batCacheid; - *o2 = rowcount->batCacheid; - BBPkeepref(table); - BBPkeepref(tableid); - BBPkeepref(rowcount); - } + *o0 = table->batCacheid; + *o1 = tableid->batCacheid; + *o2 = rowcount->batCacheid; + BBPkeepref(table); + BBPkeepref(tableid); + BBPkeepref(rowcount); + return msg; } _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org