Changeset: 7b37fdc8255d for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=7b37fdc8255d
Modified Files:
        sql/backends/monet5/sql_wlcr.c
        sql/scripts/60_wlcr.sql
Branch: wlcr
Log Message:

Handle update propagations
append, clear table and deletion.


diffs (290 lines):

diff --git a/sql/backends/monet5/sql_wlcr.c b/sql/backends/monet5/sql_wlcr.c
--- a/sql/backends/monet5/sql_wlcr.c
+++ b/sql/backends/monet5/sql_wlcr.c
@@ -38,15 +38,38 @@ static int wlcr_replaybatches;
 static MT_Id wlcr_thread;
 
 static str
-WLCRreplayinit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+CLONEgetlogfile( Client cntxt, MalBlkPtr mb)
+{
+       mvc *m = NULL;
+       str msg;
+       atom *a;
+
+       if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL)
+               return msg;
+       if ((msg = checkSQLContext(cntxt)) != NULL)
+               return msg;
+       a = stack_get_var(m, "replaylog");
+       if (!a) {
+               throw(SQL, "sql.getVariable", "variable 'replaylog' unknown");
+       }
+       cntxt->wlcr_replaylog = GDKstrdup(a->data.val.sval);
+       return MAL_SUCCEED;
+}
+
+static str
+CLONEinit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
     int i = 1, j,k;
        char path[PATHLENGTH];
        str dbname,dir;
        FILE *fd;
+       str msg;
 
        (void) cntxt;
        (void) k;
+       msg = CLONEgetlogfile(cntxt, mb);
+       if( msg)
+               return msg;
 
     if (getArgType(mb, pci, i) == TYPE_str){
         dbname =  *getArgReference_str(stk,pci,i);
@@ -144,8 +167,10 @@ WLCRprocess(void *arg)
                        if( parseMAL(c, c->curprg, 1, 1)  || mb->errors){
                                mnstr_printf(c->fdout,"#wlcr.process:parsing 
failed '%s'\n",path);
                        }
+                       mb = c->curprg->def; // needed
                        q= getInstrPtr(mb, mb->stop-1);
                        if ( getModuleId(q) == cloneRef && getFunctionId(q) 
==execRef){
+                               pushEndInstruction(mb);
                                printFunction(c->fdout, mb, 0, LIST_MAL_DEBUG );
                                // execute this block
                                chkTypes(c->fdout,c->nspace, mb, FALSE);
@@ -162,8 +187,8 @@ WLCRprocess(void *arg)
                                        mnstr_printf(c->fdout,"#wlcr.process 
transaction commit failed");
 
                                // cleanup
+                               resetMalBlk(mb, 1);
                                trimMalVariables(mb, NULL);
-                               resetMalBlk(mb, 1);
                                pc = 0;
                        }
                } while( mb->errors == 0 && pc != mb->stop);
@@ -184,7 +209,7 @@ WLCRreplay(Client cntxt, MalBlkPtr mb, M
                throw(SQL,"wlcr.replay","System already in replay mode");
        }
        cntxt->wlcr_mode = WLCR_REPLAY;
-       msg = WLCRreplayinit(cntxt, mb, stk, pci);
+       msg = CLONEinit(cntxt, mb, stk, pci);
        if( msg)
                return msg;
 
@@ -195,12 +220,12 @@ WLCRreplay(Client cntxt, MalBlkPtr mb, M
        }
        close_stream(fd);
 
+       WLCRprocess((void*) cntxt);
 /*
-       WLCRprocess((void*) cntxt);
-*/
     if (MT_create_thread(&wlcr_thread, WLCRprocess, (void*) cntxt, 
MT_THR_JOINABLE) < 0) {
                        throw(SQL,"wlcr.replay","replay process can not be 
started\n");
        }
+*/
        return MAL_SUCCEED;
 }
 
@@ -213,7 +238,7 @@ WLCRclone(Client cntxt, MalBlkPtr mb, Ma
        if( cntxt->wlcr_mode == WLCR_CLONE || cntxt->wlcr_mode == WLCR_REPLAY){
                throw(SQL,"wlcr.clone","System already in synchronization 
mode");
        }
-       msg = WLCRreplayinit(cntxt, mb, stk, pci);
+       msg = CLONEinit(cntxt, mb, stk, pci);
        snprintf(path,PATHLENGTH,"%s%cwlcr", wlcr_master, DIR_SEP);
        fd= open_rstream(path);
        if( fd == NULL){
@@ -222,7 +247,7 @@ WLCRclone(Client cntxt, MalBlkPtr mb, Ma
        close_stream(fd);
 
        cntxt->wlcr_mode = WLCR_CLONE;
-       msg = WLCRreplayinit(cntxt, mb, stk, pci);
+       msg = CLONEinit(cntxt, mb, stk, pci);
        if( msg)
                return msg;
 /*
@@ -262,14 +287,16 @@ CLONEexec(Client cntxt, MalBlkPtr mb, Ma
 str
 CLONEquery(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {      str qry =  *getArgReference_str(stk,pci,1);
+       str msg = MAL_SUCCEED;
+       lng clk = GDKms();
 
        (void) mb;
        // execute the query in replay mode
        if( cntxt->wlcr_kind == WLCR_CATALOG || cntxt->wlcr_kind == WLCR_QUERY){
-               mnstr_printf(cntxt->fdout,"#execute %s",qry);
-               return SQLstatementIntern(cntxt, &qry, "SQLstatement", TRUE, 
TRUE, NULL);
+               msg =  SQLstatementIntern(cntxt, &qry, "SQLstatement", TRUE, 
TRUE, NULL);
+               mnstr_printf(cntxt->fdout,"# "LLFMT"ms\n",GDKms() - clk);
        }
-       return MAL_SUCCEED;
+       return msg;
 }
 
 str
@@ -283,6 +310,13 @@ CLONEgeneric(Client cntxt, MalBlkPtr mb,
        return MAL_SUCCEED;
 }
 
+#define CLONEcolumn(TPE) \
+               for( i = 4; i < pci->argc; i++){\
+                       TPE val = *getArgReference_##TPE(stk,pci,i);\
+                       BUNappend(ins, (void*) &val, FALSE);\
+               }
+
+
 str
 CLONEappend(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {      str sname, tname, cname;
@@ -294,9 +328,9 @@ CLONEappend(Client cntxt, MalBlkPtr mb, 
        BAT *ins = 0;
        str msg;
 
-       sname = *getArgReference_str(stk,pci,2);
-       tname = *getArgReference_str(stk,pci,3);
-       cname = *getArgReference_str(stk,pci,4);
+       sname = *getArgReference_str(stk,pci,1);
+       tname = *getArgReference_str(stk,pci,2);
+       cname = *getArgReference_str(stk,pci,3);
 
        if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL)
                return msg;
@@ -311,17 +345,34 @@ CLONEappend(Client cntxt, MalBlkPtr mb, 
                throw(SQL, "sql.append", "Table missing");
        // get the data into local BAT
 
-       tpe= getArgType(mb,pci,5);
+       tpe= getArgType(mb,pci,4);
        ins = COLnew(0, tpe, 0, TRANSIENT);
        if( ins == NULL){
                throw(SQL,"CLONEappend",MAL_MALLOC_FAIL);
        }
 
-       for( i = 5; i < pci->argc; i++)
-               BATappend(ins, NULL, (void*) getArgReference(stk,pci,i), FALSE);
+       switch(ATOMstorage(tpe)){
+       case TYPE_bit: CLONEcolumn(bit); break;
+       case TYPE_bte: CLONEcolumn(bte); break;
+       case TYPE_sht: CLONEcolumn(sht); break;
+       case TYPE_int: CLONEcolumn(int); break;
+       case TYPE_lng: CLONEcolumn(lng); break;
+       case TYPE_oid: CLONEcolumn(oid); break;
+       case TYPE_flt: CLONEcolumn(flt); break;
+       case TYPE_dbl: CLONEcolumn(dbl); break;
+#ifdef HAVE
+       case TYPE_hge: CLONEcolumn(hge); break;
+#endif
+       case TYPE_str:
+               for( i = 4; i < pci->argc; i++){
+                       str val = *getArgReference_str(stk,pci,i);
+                       BUNappend(ins, (void*) val, FALSE);
+               }
+               break;
+       }
 
        if (cname[0] != '%' && (c = mvc_bind_column(m, t, cname)) != NULL) {
-               store_funcs.append_col(m->session->tr, c, ins, tpe);
+               store_funcs.append_col(m->session->tr, c, ins, TYPE_bat);
        } else if (cname[0] == '%') {
                sql_idx *i = mvc_bind_idx(m, s, cname + 1);
                if (i)
@@ -334,13 +385,44 @@ CLONEappend(Client cntxt, MalBlkPtr mb, 
 
 str
 CLONEdelete(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
-{
-       (void) cntxt;
-       (void) mb;
-       (void) stk;
-       (void) pci;
+{      
+       str sname, tname;
+    int i;
+       mvc *m=NULL;
+       sql_schema *s;
+       sql_table *t;
+       BAT *ins = 0;
+       str msg;
+
+       sname = *getArgReference_str(stk,pci,1);
+       tname = *getArgReference_str(stk,pci,2);
+
+       if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL)
+               return msg;
+       if ((msg = checkSQLContext(cntxt)) != NULL)
+               return msg;
+
+       s = mvc_bind_schema(m, sname);
+       if (s == NULL)
+               throw(SQL, "sql.append", "Schema missing");
+       t = mvc_bind_table(m, s, tname);
+       if (t == NULL)
+               throw(SQL, "sql.append", "Table missing");
+       // get the data into local BAT
+
+       ins = COLnew(0, TYPE_oid, 0, TRANSIENT);
+       if( ins == NULL){
+               throw(SQL,"CLONEappend",MAL_MALLOC_FAIL);
+       }
+
+       CLONEcolumn(oid); 
+
+    store_funcs.delete_tab(m->session->tr, t, ins, TYPE_bat);
+       BBPunfix(((BAT *) ins)->batCacheid);
+
        return MAL_SUCCEED;
 }
+
 str
 CLONEupdateOID(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
@@ -362,9 +444,23 @@ CLONEupdateValue(Client cntxt, MalBlkPtr
 str
 CLONEclear_table(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
-       (void) cntxt;
-       (void) mb;
-       (void) stk;
-       (void) pci;
+       sql_schema *s;
+       sql_table *t;
+       mvc *m = NULL;
+       str msg;
+       str *sname = getArgReference_str(stk, pci, 1);
+       str *tname = getArgReference_str(stk, pci, 2);
+
+       if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL)
+               return msg;
+       if ((msg = checkSQLContext(cntxt)) != NULL)
+               return msg;
+       s = mvc_bind_schema(m, *sname);
+       if (s == NULL)
+               throw(SQL, "sql.clear_table", "3F000!Schema missing");
+       t = mvc_bind_table(m, s, *tname);
+       if (t == NULL)
+               throw(SQL, "sql.clear_table", "42S02!Table missing");
+       (void) mvc_clear_table(m, t);
        return MAL_SUCCEED;
 }
diff --git a/sql/scripts/60_wlcr.sql b/sql/scripts/60_wlcr.sql
--- a/sql/scripts/60_wlcr.sql
+++ b/sql/scripts/60_wlcr.sql
@@ -6,13 +6,15 @@
 
 -- Workload Capture and Replay
 
-
 create procedure master()
 external name wlcr.master;
 
 create procedure master(threshold integer)
 external name wlcr.master;
 
+declare replaylog string;
+set replaylog = '/tmp/wlcr';
+
 create procedure replay()
 external name wlcr.replay;
 
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to