Changeset: 3bf5307afae1 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=3bf5307afae1 Added Files: monetdb5/modules/mal/wlcr.c monetdb5/modules/mal/wlcr.h monetdb5/modules/mal/wlcr.mal monetdb5/optimizer/opt_wlcr.c monetdb5/optimizer/opt_wlcr.h Modified Files: monetdb5/mal/mal_builder.c monetdb5/mal/mal_client.c monetdb5/mal/mal_client.h monetdb5/mal/mal_interpreter.c monetdb5/modules/mal/Makefile.ag monetdb5/modules/mal/mal_init.mal monetdb5/optimizer/Makefile.ag monetdb5/optimizer/opt_pipes.c monetdb5/optimizer/opt_prelude.c monetdb5/optimizer/opt_prelude.h monetdb5/optimizer/opt_support.c monetdb5/optimizer/opt_wrapper.c monetdb5/optimizer/optimizer.mal sql/backends/monet5/sql.c sql/backends/monet5/sql_scenario.c sql/backends/monet5/sql_transaction.c Branch: wlcr Log Message:
WorkLoad Capture and Replay First batch of extensions. See wlcr.c for more info diffs (truncated from 1215 to 300 lines): diff --git a/monetdb5/mal/mal_builder.c b/monetdb5/mal/mal_builder.c --- a/monetdb5/mal/mal_builder.c +++ b/monetdb5/mal/mal_builder.c @@ -454,11 +454,16 @@ pushStr(MalBlkPtr mb, InstrPtr q, const if (q == NULL) return NULL; cst.vtype= TYPE_str; - if ((cst.val.sval= GDKstrdup(Val)) == NULL) { - freeInstruction(q); - return NULL; + if( Val == 0){ + cst.val.sval = 0; + cst.len= 0; + } else{ + if ( Val != NULL && (cst.val.sval= GDKstrdup(Val)) == NULL) { + freeInstruction(q); + return NULL; + } + cst.len= (int) strlen(cst.val.sval); } - cst.len= (int) strlen(cst.val.sval); _t = defConstant(mb,TYPE_str,&cst); return pushArgument(mb, q, _t); } diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c --- a/monetdb5/mal/mal_client.c +++ b/monetdb5/mal/mal_client.c @@ -251,6 +251,8 @@ MCinitClientRecord(Client c, oid user, b /* create a recycler cache */ c->exception_buf_initialized = 0; c->error_row = c->error_fld = c->error_msg = c->error_input = NULL; + c->wlcr_kind = 0; + c->wlcr = NULL; #ifndef HAVE_EMBEDDED /* no authentication in embedded mode */ { str msg = AUTHgetUsername(&c->username, c); @@ -403,6 +405,10 @@ freeClient(Client c) BBPdecref(c->error_msg->batCacheid,TRUE); BBPdecref(c->error_input->batCacheid,TRUE); c->error_row = c->error_fld = c->error_msg = c->error_input = NULL; + if( c->wlcr) + freeMalBlk(c->wlcr); + c->wlcr_kind = 0; + c->wlcr = NULL; } if (t) THRdel(t); /* you may perform suicide */ diff --git a/monetdb5/mal/mal_client.h b/monetdb5/mal/mal_client.h --- a/monetdb5/mal/mal_client.h +++ b/monetdb5/mal/mal_client.h @@ -171,6 +171,11 @@ typedef struct CLIENT { */ bit active; /* processing a query or not */ Workset inprogress[THREADS]; + /* + * The workload for replication/replay is saved initially as a MAL block. + */ + int wlcr_kind; + MalBlkPtr wlcr; /* * Errors during copy into are collected in a user specific column set */ diff --git a/monetdb5/mal/mal_interpreter.c b/monetdb5/mal/mal_interpreter.c --- a/monetdb5/mal/mal_interpreter.c +++ b/monetdb5/mal/mal_interpreter.c @@ -1434,6 +1434,7 @@ void garbageCollector(Client cntxt, MalB printStack(cntxt->fdout, mb, stk, 0); } #endif + assert(mb->vtop < mb->vsize); (void) flag; for (k = 0; k < mb->vtop; k++) { // if (isVarCleanup(mb, k) ){ diff --git a/monetdb5/modules/mal/Makefile.ag b/monetdb5/modules/mal/Makefile.ag --- a/monetdb5/modules/mal/Makefile.ag +++ b/monetdb5/modules/mal/Makefile.ag @@ -36,6 +36,7 @@ lib_mal = { mkey.c mkey.h \ manifold.c manifold.h \ oltp.c oltp.h \ + wlcr.c wlcr.h \ pcre.c \ profiler.c profiler.h \ querylog.c querylog.h \ @@ -59,7 +60,7 @@ headers_mal = { inspect.mal manual.mal mal_io.mal mkey.mal manifold.mal \ iterator.mal clients.mal \ factories.mal groupby.mal mdb.mal pcre.mal mat.mal \ - transaction.mal oltp.mal \ + transaction.mal oltp.mal wlcr.mal \ mal_mapi.mal sabaoth.mal remote.mal \ txtsim.mal \ tokenizer.mal sample.mal json_util.mal \ diff --git a/monetdb5/modules/mal/mal_init.mal b/monetdb5/modules/mal/mal_init.mal --- a/monetdb5/modules/mal/mal_init.mal +++ b/monetdb5/modules/mal/mal_init.mal @@ -94,6 +94,7 @@ include srvpool; include mal_mapi; include oltp; +include wlcr; # Any extensions (MAL scripts) that should be automatically loaded upon # startup can be placed in the autoload directory. One typically finds diff --git a/monetdb5/modules/mal/wlcr.c b/monetdb5/modules/mal/wlcr.c new file mode 100644 --- /dev/null +++ b/monetdb5/modules/mal/wlcr.c @@ -0,0 +1,461 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * Copyright 1997 - July 2008 CWI, August 2008 - 2017 MonetDB B.V. + */ + +/* + * (c) Martin Kersten + * This module collects the workload-capture-replay statements during transaction execution. + * + * Each wlcr log file contains a serial log for a transaction batch. + * Each job is identified by the original owner of the query, the snapshot identity (name+nr) against + * which it was ran, an indication of the kind of transaction and commit/rollback, its runtime (in ms) and starting time. + * + * Replaying the wlcr against another server based on the same snapshot should produce a perfect copy. + * Each job should be executed using the credentials of the user issuing the transaction. + * Any failuer encountered terminates the replication process. + * + * All wlcr files should be stored on a shared file system for all replicas to access. + * The default is a subdirectory of the database and act as a secondary database rebuild log. + * The location can be overruled using a full path to a shared disk as GDKenvironemnt variable (wlcr_dir) + * + * The wlcr files have a textual format derived from the MAL statements. + * This can be used to ease the implementation of the wlreplay + * + * The logs may only be removed after a new snapshot has been taken or wlcr is disabled + */ +#include "monetdb_config.h" +#include "mal_builder.h" +#include "wlcr.h" + +static MT_Lock wlcr_lock MT_LOCK_INITIALIZER("wlcr_lock"); + + +int wlcr_duration = INT_MAX; // how long to capture default= 0 +int wlcr_threshold = 0; // threshold (milliseconds) for keeping readonly queries +int wlcr_deltas = 1; // sent the delta values +int wlcr_all = 1; // also ship failed transaction +str wlcr_snapshot= "baseline"; // name assigned to the snapshot +int wlcr_unit = 0; // last job executed + +static char *wlcr_name[]= {"","query","update","catalog"}; + +static stream *wlcr_fd = 0; +static str wlcr_log = "/tmp/wlcr"; + +static InstrPtr +WLCRaddtime(Client cntxt, InstrPtr pci, InstrPtr p) +{ + char *tbuf; + char ctm[26]; + time_t clk = pci->clock.tv_sec; + +#ifdef HAVE_CTIME_R3 + tbuf = ctime_r(&clk, ctm, sizeof(ctm)); +#else +#ifdef HAVE_CTIME_R + tbuf = ctime_r(&clk, ctm); +#else + tbuf = ctime(&clk); +#endif +#endif + tbuf[19]=0; + return pushStr(cntxt->wlcr, p, tbuf); +} + +#define WLCR_start()\ +{ Symbol s; \ + if( cntxt->wlcr == NULL){\ + s = newSymbol("wlrc", FUNCTIONsymbol);\ + cntxt->wlcr_kind = WLCR_QUERY;\ + cntxt->wlcr = s->def;\ + s->def = NULL;\ + } \ + if( cntxt->wlcr->stop == 0){\ + p = newStmt(cntxt->wlcr,"wlreplay","job");\ + p = pushStr(cntxt->wlcr,p, cntxt->username);\ + p = pushStr(cntxt->wlcr,p, wlcr_snapshot);\ + p = pushInt(cntxt->wlcr,p, wlcr_unit);\ + p = WLCRaddtime(cntxt,pci, p); \ + p->ticks = GDKms();\ +} } + +str +WLCRproperties (Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ + int i; + + (void) cntxt; + (void) mb; + + i = *getArgReference_int(stk,pci,1); + if ( i < 0) + throw(MAL,"wlcr.properties","Duration must be a possitive number"); + wlcr_duration = i; + + i = *getArgReference_int(stk,pci,2); + if ( i < 0) + throw(MAL,"wlcr.properties","Duration must be a possitive number"); + wlcr_threshold = i; + + wlcr_deltas = *getArgReference_int(stk,pci,3) != 0; + wlcr_all = *getArgReference_int(stk,pci,4) != 0; + return MAL_SUCCEED; +} + +str +WLCRjob(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ + str snapshot = *getArgReference_str(stk,pci,1); + int tid = *getArgReference_int(stk,pci,2); + + (void) cntxt; + (void) mb; + + if ( strcmp(snapshot, wlcr_snapshot)) + throw(MAL,"wlcr.job","Incompatible snapshot identifier"); + if ( tid < wlcr_unit) + throw(MAL,"wlcr.job","Work unit identifier is before last one executed"); + return MAL_SUCCEED; +} + +str +WLCRfin(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ + (void) cntxt; + (void) mb; + (void) stk; + (void) pci; + return MAL_SUCCEED; +} + +str +WLCRquery(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ InstrPtr p; + + (void) stk; + if ( strcmp("-- no query",getVarConstant(mb, getArg(pci,1)).val.sval) == 0) + return MAL_SUCCEED; + WLCR_start(); + p = newStmt(cntxt->wlcr, "wlreplay","query"); + p = pushStr(cntxt->wlcr, p, getVarConstant(mb, getArg(pci,1)).val.sval); + p = pushStr(cntxt->wlcr, p, getVarConstant(mb, getArg(pci,2)).val.sval); + p->ticks = GDKms(); + return MAL_SUCCEED; +} + +str +WLCRgeneric(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ InstrPtr p; + int i, tpe, varid; + (void) stk; + + WLCR_start(); + p = newStmt(cntxt->wlcr, "wlreplay",getFunctionId(pci)); + for( i = pci->retc; i< pci->argc; i++){ + tpe =getArgType(mb, pci, i); + switch(tpe){ + case TYPE_str: + p = pushStr(cntxt->wlcr, p, getVarConstant(mb, getArg(pci, i)).val.sval); + break; + default: + varid = defConstant(cntxt->wlcr, tpe, getArgReference(stk, pci, i)); + p = pushArgument(cntxt->wlcr, p, varid); + } + } + p->ticks = GDKms(); + cntxt->wlcr_kind = WLCR_CATALOG; + return MAL_SUCCEED; +} + +#define bulk(TPE1, TPE2)\ +{ TPE1 *p = (TPE1 *) Tloc(b,0);\ + TPE1 *q = (TPE1 *) Tloc(b, BUNlast(b));\ + int k=0; \ + for( ; p < q; p++, k++){\ + if( k % 32 == 31){\ + pci = newStmt(cntxt->wlcr, "wlreplay",getFunctionId(pci));\ + pci = pushStr(cntxt->wlcr, pci, sch);\ + pci = pushStr(cntxt->wlcr, pci, tbl);\ + pci = pushStr(cntxt->wlcr, pci, col);\ + pci->ticks = GDKms();\ + }\ + pci = push##TPE2(cntxt->wlcr, pci ,*p);\ +} } + +static void +WLCRdatashipping(Client cntxt, MalBlkPtr mb, InstrPtr pci, int bid) +{ BAT *b; _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list