Changeset: a6b49786e839 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/a6b49786e839
Modified Files:
        sql/backends/monet5/sql.c
        sql/storage/bat/bat_storage.c
        sql/storage/sql_storage.h
Branch: github_7273
Log Message:

Fix locked access to update BAT pair.


diffs (truncated from 315 to 300 lines):

diff --git a/sql/backends/monet5/sql.c b/sql/backends/monet5/sql.c
--- a/sql/backends/monet5/sql.c
+++ b/sql/backends/monet5/sql.c
@@ -1209,15 +1209,16 @@ str
 mvc_bind_wrap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
        int upd = (pci->argc == 7 || pci->argc == 9);
-       BAT *b = NULL, *bn;
+       BAT *b = NULL;
        bat *bid = getArgReference_bat(stk, pci, 0);
-       int coltype = getBatType(getArgType(mb, pci, 0));
        mvc *m = NULL;
        str msg;
-       const char *sname = *getArgReference_str(stk, pci, 2 + upd);
-       const char *tname = *getArgReference_str(stk, pci, 3 + upd);
-       const char *cname = *getArgReference_str(stk, pci, 4 + upd);
-       int access = *getArgReference_int(stk, pci, 5 + upd);
+       const char *sname       = *getArgReference_str(stk, pci, 2 + upd);
+       const char *tname       = *getArgReference_str(stk, pci, 3 + upd);
+       const char *cname       = *getArgReference_str(stk, pci, 4 + upd);
+       const int       access  = *getArgReference_int(stk, pci, 5 + upd);
+
+       const bool partitioned_access = pci->argc == (8 + upd) && getArgType(mb, pci, 6 + upd) == TYPE_int;
 
        /* This doesn't work with quick access for now... */
        assert(access != QUICK);
@@ -1232,103 +1233,71 @@ mvc_bind_wrap(Client cntxt, MalBlkPtr mb
                throw(SQL, "sql.bind", SQLSTATE(42000) "%s '%s' is not persistent",
                          TABLE_TYPE_DESCRIPTION(t->type, t->properties), t->base.name);
        sql_column *c = mvc_bind_column(m, t, cname);
-       b = mvc_bind(m, sname, tname, cname, access);
-       if (b && b->ttype && b->ttype != coltype) {
-               BBPunfix(b->batCacheid);
-               throw(SQL,"sql.bind",SQLSTATE(42000) "Column type mismatch %s.%s.%s",sname,tname,cname);
-       }
-       if (b) {
-               if (pci->argc == (8 + upd) && getArgType(mb, pci, 6 + upd) == TYPE_int) {
-                       BUN cnt = store->storage_api.count_col(m->session->tr, c, 0), psz;
-                       /* partitioned access */
-                       int part_nr = *getArgReference_int(stk, pci, 6 + upd);
-                       int nr_parts = *getArgReference_int(stk, pci, 7 + upd);
-
-                       if (access == 0) {
-                               BUN l, h;
-                               psz = cnt ? (cnt / nr_parts) : 0;
-                               l = part_nr * psz;
-                               if (l > cnt)
-                                       l = cnt;
-                               h = (part_nr + 1 == nr_parts) ? cnt : ((part_nr + 1) * psz);
-                               if (h > cnt)
-                                       h = cnt;
-                               bn = BATslice(b, l, h);
-                               if(bn == NULL) {
-                                       BBPunfix(b->batCacheid);
-                                       throw(SQL, "sql.bind", GDK_EXCEPTION);
-                               }
-                               BAThseqbase(bn, l);
-                       } else {
-                               /* BAT b holds the UPD_ID bat */
-                               oid l, h;
-                               cnt = store->storage_api.count_col(m->session->tr, c, 0);
-                               psz = cnt ? (cnt / nr_parts) : 0;
-                               l = part_nr * psz;
-                               if (l > cnt)
-                                       l = cnt;
-                               h = (part_nr + 1 == nr_parts) ? cnt : ((part_nr + 1) * psz);
-                               if (h > cnt)
-                                       h = cnt;
-                               h--;
-                               bn = BATselect(b, NULL, &l, &h, true, true, false);
-                               if(bn == NULL) {
-                                       BBPunfix(b->batCacheid);
-                                       throw(SQL, "sql.bind", GDK_EXCEPTION);
-                               }
+
+       if (partitioned_access) {
+               /* partitioned access */
+               int part_nr = *getArgReference_int(stk, pci, 6 + upd);
+               int nr_parts = *getArgReference_int(stk, pci, 7 + upd);
+               BUN cnt = store->storage_api.count_col(m->session->tr, c, 0), psz;
+               oid l, h;
+               psz = cnt ? (cnt / nr_parts) : 0;
+               l = part_nr * psz;
+               if (l > cnt)
+                       l = cnt;
+               h = (part_nr + 1 == nr_parts) ? cnt : ((part_nr + 1) * psz);
+               if (h > cnt)
+                       h = cnt;
+
+               if (upd) {
+                       sql_updates* updates = store->storage_api.bind_updates(m->session->tr, c);
+
+                       if (!updates)
+                               throw(SQL,"sql.bind",SQLSTATE(HY005) "Cannot access the update columns");
+                       BAT *ui = updates->ui;
+                       BAT *uv = updates->uv;
+                       GDKfree(updates);
+
+                       h--;
+                       BAT* bn = BATselect(ui, NULL, &l, &h, true, true, false);
+                       if(bn == NULL) {
+                               BBPunfix(ui->batCacheid);
+                               BBPunfix(uv->batCacheid);
+                               throw(SQL, "sql.bind", GDK_EXCEPTION);
                        }
-                       BBPunfix(b->batCacheid);
-                       b = bn;
-               } else if (upd) {
-                       BAT *uv = mvc_bind(m, sname, tname, cname, RD_UPD_VAL);
+
                        bat *uvl = getArgReference_bat(stk, pci, 1);
 
-                       if (uv == NULL) {
-                               BBPunfix(b->batCacheid);
-                               throw(SQL,"sql.bind",SQLSTATE(HY005) "Cannot access the update column %s.%s.%s",
-                                       sname,tname,cname);
-                       }
-                       *bid = b->batCacheid;
-                       BBPkeepref(b);
-                       *uvl = uv->batCacheid;
-                       BBPkeepref(uv);
-                       return MAL_SUCCEED;
-               }
-               if (upd) {
-                       bat *uvl = getArgReference_bat(stk, pci, 1);
-
-                       if (BATcount(b)) {
-                               BAT *uv = mvc_bind(m, sname, tname, cname, RD_UPD_VAL);
-                               BAT *ui = mvc_bind(m, sname, tname, cname, RD_UPD_ID);
+                       if (BATcount(bn)) {
                                BAT *id;
                                BAT *vl;
                                if (ui == NULL || uv == NULL) {
                                        bat_destroy(uv);
                                        bat_destroy(ui);
-                                       BBPunfix(b->batCacheid);
+                                       BBPunfix(bn->batCacheid);
                                        throw(SQL,"sql.bind",SQLSTATE(HY005) "Cannot access the insert column %s.%s.%s",
                                                sname, tname, cname);
                                }
-                               id = BATproject(b, ui);
-                               vl = BATproject(b, uv);
+                               assert(uv->batCount == ui->batCount);
+                               id = BATproject(bn, ui);
+                               vl = BATproject(bn, uv);
                                bat_destroy(ui);
                                bat_destroy(uv);
                                if (id == NULL || vl == NULL) {
-                                       BBPunfix(b->batCacheid);
+                                       BBPunfix(bn->batCacheid);
                                        bat_destroy(id);
                                        bat_destroy(vl);
                                        throw(SQL, "sql.bind", GDK_EXCEPTION);
                                }
                                if ( BATcount(id) != BATcount(vl)){
-                                       BBPunfix(b->batCacheid);
+                                       BBPunfix(bn->batCacheid);
                                        bat_destroy(id);
                                        bat_destroy(vl);
                                        throw(SQL, "sql.bind", SQLSTATE(0000) "Inconsistent BAT count");
                                }
+                               BBPkeepref(id);
+                               BBPkeepref(vl);
                                *bid = id->batCacheid;
-                               BBPkeepref(id);
                                *uvl = vl->batCacheid;
-                               BBPkeepref(vl);
                        } else {
                                *bid = e_bat(TYPE_oid);
                                *uvl = e_bat(c->type.type->localtype);
@@ -1341,16 +1310,51 @@ mvc_bind_wrap(Client cntxt, MalBlkPtr mb
                                        throw(SQL, "sql.bind", SQLSTATE(HY013) MAL_MALLOC_FAIL);
                                }
                        }
+               } else {
+                       int coltype = getBatType(getArgType(mb, pci, 0));
+                       b = store->storage_api.bind_col(m->session->tr, c, access);
+
+                       if (b && b->ttype && b->ttype != coltype) {
+                               BBPunfix(b->batCacheid);
+                               throw(SQL,"sql.bind",SQLSTATE(42000) "Column type mismatch %s.%s.%s",sname,tname,cname);
+                       }
+
+                       BAT* bn = BATslice(b, l, h);
+                       if(bn == NULL) {
+                               BBPunfix(b->batCacheid);
+                               throw(SQL, "sql.bind", GDK_EXCEPTION);
+                       }
+                       BAThseqbase(bn, l);
                        BBPunfix(b->batCacheid);
-               } else {
-                       *bid = b->batCacheid;
-                       BBPkeepref(b);
+                       BBPkeepref(bn);
+                       *bid = bn->batCacheid;
                }
-               return MAL_SUCCEED;
-       }
-       if (!strNil(sname))
-               throw(SQL, "sql.bind", SQLSTATE(42000) "unable to find %s.%s(%s)", sname, tname, cname);
-       throw(SQL, "sql.bind", SQLSTATE(42000) "unable to find %s(%s)", tname, cname);
+       }
+       else if (upd) { /*unpartitioned access to update bats*/
+               sql_updates* updates = store->storage_api.bind_updates(m->session->tr, c);
+
+               if (!updates)
+                       throw(SQL,"sql.bind",SQLSTATE(HY005) "Cannot access the update columns");
+
+               bat *uvl = getArgReference_bat(stk, pci, 1);
+               BBPkeepref(updates->ui);
+               BBPkeepref(updates->uv);
+               *bid = updates->ui->batCacheid;
+               *uvl = updates->uv->batCacheid;
+               GDKfree(updates);
+       }
+       else { /*unpartitioned access to base column*/
+               int coltype = getBatType(getArgType(mb, pci, 0));
+               b = store->storage_api.bind_col(m->session->tr, c, access);
+
+               if (b && b->ttype && b->ttype != coltype) {
+                       BBPunfix(b->batCacheid);
+                       throw(SQL,"sql.bind",SQLSTATE(42000) "Column type mismatch %s.%s.%s",sname,tname,cname);
+               }
+               BBPkeepref(b);
+               *bid = b->batCacheid;
+       }
+       return MAL_SUCCEED;
 }
 
 /* The output of this function are 7 columns:
diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -996,6 +996,42 @@ cs_bind_bat( column_storage *cs, int acc
        return s;
 }
 
+static void*
+bind_updates(sql_trans *tr, sql_column *c) {
+       sql_updates* upd = GDKmalloc(sizeof(sql_updates));
+       if (!upd)
+               return NULL;
+
+       lock_column(tr->store, c->base.id);
+       size_t cnt = count_col(tr, c, 0);
+       sql_delta *d = col_timestamp_delta(tr, c);
+       int type = c->type.type->localtype;
+
+       if (!d) {
+               unlock_column(tr->store, c->base.id);
+               GDKfree(upd);
+               return NULL;
+       }
+       if (d->cs.st == ST_DICT) {
+               BAT *b = quick_descriptor(d->cs.bid);
+
+               type = b->ttype;
+       }
+
+       upd->ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
+       upd->uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
+
+       unlock_column(tr->store, c->base.id);
+
+       if (upd->ui == NULL || upd->uv == NULL) {
+               bat_destroy(upd->ui);
+               bat_destroy(upd->uv);
+               GDKfree(upd);
+               return NULL;
+       }
+       return upd;
+}
+
 static void *                                  /* BAT * */
 bind_col(sql_trans *tr, sql_column *c, int access)
 {
@@ -4827,6 +4863,7 @@ void
 bat_storage_init( store_functions *sf)
 {
        sf->bind_col = &bind_col;
+       sf->bind_updates = &bind_updates;
        sf->bind_idx = &bind_idx;
        sf->bind_cands = &bind_cands;
 
diff --git a/sql/storage/sql_storage.h b/sql/storage/sql_storage.h
--- a/sql/storage/sql_storage.h
+++ b/sql/storage/sql_storage.h
@@ -69,6 +69,11 @@ typedef struct subrids {
        void *rids;
 } subrids;
 
+typedef struct sql_updates {
+       BAT* ui;
+       BAT* uv;
+} sql_updates;
+
 /* returns table rids, for the given select ranges */
 typedef rids *(*rids_select_fptr)( sql_trans *tr, sql_column *key, const void *key_value_low, const void *key_value_high, ...);
 
@@ -131,6 +136,7 @@ typedef struct table_functions {
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to