Changeset: 7b37fdc8255d for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=7b37fdc8255d Modified Files: sql/backends/monet5/sql_wlcr.c sql/scripts/60_wlcr.sql Branch: wlcr Log Message:
Handle update propagations append, clear table and deletion. diffs (290 lines): diff --git a/sql/backends/monet5/sql_wlcr.c b/sql/backends/monet5/sql_wlcr.c --- a/sql/backends/monet5/sql_wlcr.c +++ b/sql/backends/monet5/sql_wlcr.c @@ -38,15 +38,38 @@ static int wlcr_replaybatches; static MT_Id wlcr_thread; static str -WLCRreplayinit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +CLONEgetlogfile( Client cntxt, MalBlkPtr mb) +{ + mvc *m = NULL; + str msg; + atom *a; + + if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL) + return msg; + if ((msg = checkSQLContext(cntxt)) != NULL) + return msg; + a = stack_get_var(m, "replaylog"); + if (!a) { + throw(SQL, "sql.getVariable", "variable 'replaylog' unknown"); + } + cntxt->wlcr_replaylog = GDKstrdup(a->data.val.sval); + return MAL_SUCCEED; +} + +static str +CLONEinit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { int i = 1, j,k; char path[PATHLENGTH]; str dbname,dir; FILE *fd; + str msg; (void) cntxt; (void) k; + msg = CLONEgetlogfile(cntxt, mb); + if( msg) + return msg; if (getArgType(mb, pci, i) == TYPE_str){ dbname = *getArgReference_str(stk,pci,i); @@ -144,8 +167,10 @@ WLCRprocess(void *arg) if( parseMAL(c, c->curprg, 1, 1) || mb->errors){ mnstr_printf(c->fdout,"#wlcr.process:parsing failed '%s'\n",path); } + mb = c->curprg->def; // needed q= getInstrPtr(mb, mb->stop-1); if ( getModuleId(q) == cloneRef && getFunctionId(q) ==execRef){ + pushEndInstruction(mb); printFunction(c->fdout, mb, 0, LIST_MAL_DEBUG ); // execute this block chkTypes(c->fdout,c->nspace, mb, FALSE); @@ -162,8 +187,8 @@ WLCRprocess(void *arg) mnstr_printf(c->fdout,"#wlcr.process transaction commit failed"); // cleanup + resetMalBlk(mb, 1); trimMalVariables(mb, NULL); - resetMalBlk(mb, 1); pc = 0; } } while( mb->errors == 0 && pc != mb->stop); @@ -184,7 +209,7 @@ WLCRreplay(Client cntxt, MalBlkPtr mb, M throw(SQL,"wlcr.replay","System already in replay mode"); } cntxt->wlcr_mode = WLCR_REPLAY; - msg = WLCRreplayinit(cntxt, mb, stk, pci); + msg = CLONEinit(cntxt, mb, stk, pci); if( msg) return msg; @@ -195,12 +220,12 @@ WLCRreplay(Client cntxt, MalBlkPtr mb, M } close_stream(fd); + WLCRprocess((void*) cntxt); /* - WLCRprocess((void*) cntxt); -*/ if (MT_create_thread(&wlcr_thread, WLCRprocess, (void*) cntxt, MT_THR_JOINABLE) < 0) { throw(SQL,"wlcr.replay","replay process can not be started\n"); } +*/ return MAL_SUCCEED; } @@ -213,7 +238,7 @@ WLCRclone(Client cntxt, MalBlkPtr mb, Ma if( cntxt->wlcr_mode == WLCR_CLONE || cntxt->wlcr_mode == WLCR_REPLAY){ throw(SQL,"wlcr.clone","System already in synchronization mode"); } - msg = WLCRreplayinit(cntxt, mb, stk, pci); + msg = CLONEinit(cntxt, mb, stk, pci); snprintf(path,PATHLENGTH,"%s%cwlcr", wlcr_master, DIR_SEP); fd= open_rstream(path); if( fd == NULL){ @@ -222,7 +247,7 @@ WLCRclone(Client cntxt, MalBlkPtr mb, Ma close_stream(fd); cntxt->wlcr_mode = WLCR_CLONE; - msg = WLCRreplayinit(cntxt, mb, stk, pci); + msg = CLONEinit(cntxt, mb, stk, pci); if( msg) return msg; /* @@ -262,14 +287,16 @@ CLONEexec(Client cntxt, MalBlkPtr mb, Ma str CLONEquery(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { str qry = *getArgReference_str(stk,pci,1); + str msg = MAL_SUCCEED; + lng clk = GDKms(); (void) mb; // execute the query in replay mode if( cntxt->wlcr_kind == WLCR_CATALOG || cntxt->wlcr_kind == WLCR_QUERY){ - mnstr_printf(cntxt->fdout,"#execute %s",qry); - return SQLstatementIntern(cntxt, &qry, "SQLstatement", TRUE, TRUE, NULL); + msg = SQLstatementIntern(cntxt, &qry, "SQLstatement", TRUE, TRUE, NULL); + mnstr_printf(cntxt->fdout,"# "LLFMT"ms\n",GDKms() - clk); } - return MAL_SUCCEED; + return msg; } str @@ -283,6 +310,13 @@ CLONEgeneric(Client cntxt, MalBlkPtr mb, return MAL_SUCCEED; } +#define CLONEcolumn(TPE) \ + for( i = 4; i < pci->argc; i++){\ + TPE val = *getArgReference_##TPE(stk,pci,i);\ + BUNappend(ins, (void*) &val, FALSE);\ + } + + str CLONEappend(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { str sname, tname, cname; @@ -294,9 +328,9 @@ CLONEappend(Client cntxt, MalBlkPtr mb, BAT *ins = 0; str msg; - sname = *getArgReference_str(stk,pci,2); - tname = *getArgReference_str(stk,pci,3); - cname = *getArgReference_str(stk,pci,4); + sname = *getArgReference_str(stk,pci,1); + tname = *getArgReference_str(stk,pci,2); + cname = *getArgReference_str(stk,pci,3); if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL) return msg; @@ -311,17 +345,34 @@ CLONEappend(Client cntxt, MalBlkPtr mb, throw(SQL, "sql.append", "Table missing"); // get the data into local BAT - tpe= getArgType(mb,pci,5); + tpe= getArgType(mb,pci,4); ins = COLnew(0, tpe, 0, TRANSIENT); if( ins == NULL){ throw(SQL,"CLONEappend",MAL_MALLOC_FAIL); } - for( i = 5; i < pci->argc; i++) - BATappend(ins, NULL, (void*) getArgReference(stk,pci,i), FALSE); + switch(ATOMstorage(tpe)){ + case TYPE_bit: CLONEcolumn(bit); break; + case TYPE_bte: CLONEcolumn(bte); break; + case TYPE_sht: CLONEcolumn(sht); break; + case TYPE_int: CLONEcolumn(int); break; + case TYPE_lng: CLONEcolumn(lng); break; + case TYPE_oid: CLONEcolumn(oid); break; + case TYPE_flt: CLONEcolumn(flt); break; + case TYPE_dbl: CLONEcolumn(dbl); break; +#ifdef HAVE + case TYPE_hge: CLONEcolumn(hge); break; +#endif + case TYPE_str: + for( i = 4; i < pci->argc; i++){ + str val = *getArgReference_str(stk,pci,i); + BUNappend(ins, (void*) val, FALSE); + } + break; + } if (cname[0] != '%' && (c = mvc_bind_column(m, t, cname)) != NULL) { - store_funcs.append_col(m->session->tr, c, ins, tpe); + store_funcs.append_col(m->session->tr, c, ins, TYPE_bat); } else if (cname[0] == '%') { sql_idx *i = mvc_bind_idx(m, s, cname + 1); if (i) @@ -334,13 +385,44 @@ CLONEappend(Client cntxt, MalBlkPtr mb, str CLONEdelete(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) -{ - (void) cntxt; - (void) mb; - (void) stk; - (void) pci; +{ + str sname, tname; + int i; + mvc *m=NULL; + sql_schema *s; + sql_table *t; + BAT *ins = 0; + str msg; + + sname = *getArgReference_str(stk,pci,1); + tname = *getArgReference_str(stk,pci,2); + + if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL) + return msg; + if ((msg = checkSQLContext(cntxt)) != NULL) + return msg; + + s = mvc_bind_schema(m, sname); + if (s == NULL) + throw(SQL, "sql.append", "Schema missing"); + t = mvc_bind_table(m, s, tname); + if (t == NULL) + throw(SQL, "sql.append", "Table missing"); + // get the data into local BAT + + ins = COLnew(0, TYPE_oid, 0, TRANSIENT); + if( ins == NULL){ + throw(SQL,"CLONEappend",MAL_MALLOC_FAIL); + } + + CLONEcolumn(oid); + + store_funcs.delete_tab(m->session->tr, t, ins, TYPE_bat); + BBPunfix(((BAT *) ins)->batCacheid); + return MAL_SUCCEED; } + str CLONEupdateOID(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { @@ -362,9 +444,23 @@ CLONEupdateValue(Client cntxt, MalBlkPtr str CLONEclear_table(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { - (void) cntxt; - (void) mb; - (void) stk; - (void) pci; + sql_schema *s; + sql_table *t; + mvc *m = NULL; + str msg; + str *sname = getArgReference_str(stk, pci, 1); + str *tname = getArgReference_str(stk, pci, 2); + + if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL) + return msg; + if ((msg = checkSQLContext(cntxt)) != NULL) + return msg; + s = mvc_bind_schema(m, *sname); + if (s == NULL) + throw(SQL, "sql.clear_table", "3F000!Schema missing"); + t = mvc_bind_table(m, s, *tname); + if (t == NULL) + throw(SQL, "sql.clear_table", "42S02!Table missing"); + (void) mvc_clear_table(m, t); return MAL_SUCCEED; } diff --git a/sql/scripts/60_wlcr.sql b/sql/scripts/60_wlcr.sql --- a/sql/scripts/60_wlcr.sql +++ b/sql/scripts/60_wlcr.sql @@ -6,13 +6,15 @@ -- Workload Capture and Replay - create procedure master() external name wlcr.master; create procedure master(threshold integer) external name wlcr.master; +declare replaylog string; +set replaylog = '/tmp/wlcr'; + create procedure replay() external name wlcr.replay; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list