Changeset: f097aee982a1 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f097aee982a1 Added Files: sql/backends/monet5/sql_wlcr.c sql/backends/monet5/sql_wlcr.h sql/backends/monet5/sql_wlcr.mal sql/scripts/60_wlcr.sql Modified Files: monetdb5/modules/mal/wlcr.c monetdb5/modules/mal/wlcr.h monetdb5/modules/mal/wlcr.mal sql/backends/monet5/40_sql.mal sql/backends/monet5/Makefile.ag sql/scripts/Makefile.ag Branch: wlcr Log Message:
Basic handling of WLCR logs writes diffs (truncated from 550 to 300 lines): diff --git a/monetdb5/modules/mal/wlcr.c b/monetdb5/modules/mal/wlcr.c --- a/monetdb5/modules/mal/wlcr.c +++ b/monetdb5/modules/mal/wlcr.c @@ -7,62 +7,129 @@ */ /* - * (c) Martin Kersten + * (c) 2017 Martin Kersten * This module collects the workload-capture-replay statements during transaction execution. + * It is used primarily for replication management and workload replay + * + * The goal is to maintain a replica of a master database. All data of the master + * is basically only available for read only access. Accidental corruption of this + * data is avoided by setting ownership and access properties at the SQL level in the replica. + * + * + * IMPLEMENTATION + * + * The replica directory should be on a shared (global) file system. + * As default we use dbfarm/master. + * + * The binary dump for the database snapshot should be stored there in master/bat. + * The associated log files are stored as master/wlcr_<number>. + * Creation and restore of a snapshot should be a monetdb option. TODO + * + * Replication management starts when you run the command + * CALL wlcr.master("(full)path to snapshot dir") + * It can also be passed as a command line parameter + * --set gdk_wlcrdir="(full)path to snapshot dir" * * Each wlcr log file contains a serial log for a transaction batch. - * Each job is identified by the original owner of the query, the snapshot identity (name+nr) against - * which it was ran, an indication of the kind of transaction and commit/rollback, its runtime (in ms) and starting time. + * Each job is identified by the owner of the query, the snapshot tag, + * commit/rollback, its starting time and runtime (in ms). * - * Replaying the wlcr against another server based on the same snapshot should produce a perfect copy. - * Each job should be executed using the credentials of the user issuing the transaction. - * Any failuer encountered terminates the replication process.
+ * Logging of queries can be further limited to those that satisfy a threshold. + * CALL wlcr.master("(full)path to snapshot dir", threshold) + * The threshold is given in milliseconds. A negative threshold leads to ignoring all queries. + * + * A replica server should issue the matching call + * CALL wlcr.synchronize("(full)path to snapshot dir") + * + * During synchronization only updates are executed for the user responsible for the call. + * Queries are simply ignored unless needed as replacement for update actions. + * + * The alternative is to replay the log + * CALL wlcr.replay("(full)path to snapshot dir") + * In this mode all queries are executed under the credentials of the query owner, including those that lead to updates. + * + * Any failure encountered terminates the synchronization process, leaving a message in the merovingian log. + * + * The replay progress can be inspected using the functions wlcr.drift() and wlcr.synced(). + * The latter is true if all accessible log files have been processed. * - * All wlcr files should be stored on a shared file system for all replicas to access. - * The default is a subdirectory of the database and act as a secondary database rebuild log. - * The location can be overruled using a full path to a shared disk as GDKenvironemnt variable (wlcr_dir) + * The wlcr files purposely have a textual format derived from the MAL statements. + * This creates some overhead for COPY INTO statements. * - * The wlcr files have a textual format derived from the MAL statements. - * This can be used to ease the implementation of the wlreplay + * The integrity of the wlcr directories is critical. For now we assume that all batches are available. + * We should detect that wlcr.master() is issued after updates have taken place on the snapshot TODO.
* - * The logs may only be removed after a new snapshot has been taken or wlcr is disabled */ #include "monetdb_config.h" +#include <time.h> #include "mal_builder.h" #include "wlcr.h" static MT_Lock wlcr_lock MT_LOCK_INITIALIZER("wlcr_lock"); -int wlcr_duration = INT_MAX; // how long to capture default= 0 int wlcr_threshold = 0; // threshold (milliseconds) for keeping readonly queries -int wlcr_deltas = 1; // sent the delta values -int wlcr_all = 1; // also ship failed transaction -str wlcr_snapshot= "baseline"; // name assigned to the snapshot -int wlcr_unit = 0; // last job executed +str wlcr_snapshot= 0; // name assigned to the snapshot +int wlcr_batch = 0; // last job executed static char *wlcr_name[]= {"","query","update","catalog"}; static stream *wlcr_fd = 0; -static str wlcr_log = "/tmp/wlcr"; +static str wlcr_dir = 0; + +/* The database snapshots are binary copies of the dbfarm/database/bat + * New snapshots are created currently using the 'monetdb snapshot <db>' command + * or a SQL procedure. + * It requires a database halt. 
+ * + * The wlcr logs are stored in the snapshot directory as a time-stamped list + */ +str +WLCRmaster(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +{ + char path[PATHLENGTH]; + FILE *fd; + int i = 1; + (void) stk; + (void) pci; + + wlcr_dir = GDKgetenv("gdk_wlcrdir"); + if (i< pci->argc+1 && getArgType(mb, pci, i) == TYPE_str){ + wlcr_dir = *getArgReference_str(stk,pci,i); + wlcr_dir = GDKfilepath(0,wlcr_dir,"master",0); + i++; + } + // if the master director does not exit, create it + if ( wlcr_dir == NULL) + wlcr_dir = GDKfilepath(0,0,"master",0); + + if ( i < pci->argc+1 && getArgType(mb, pci, i) == TYPE_int) + wlcr_threshold = *getArgReference_int(stk,pci,i); + + snprintf(path, PATHLENGTH,"%s%cwlcr",wlcr_dir, DIR_SEP); + if( GDKcreatedir(path) == GDK_FAIL) + mnstr_printf(cntxt->fdout,"#Could not create %s\n",wlcr_dir); + mnstr_printf(cntxt->fdout,"#Snapshot directory '%s'\n", wlcr_dir); + + fd = fopen(path,"w"); + if ( fd == NULL) + return createException(MAL,"wlcr.master","Unable to initialize WLCR %s", path); + if( fscanf(fd,"%d %d", &wlcr_batch, &wlcr_threshold) != 3) + fprintf(fd,"0 %d\n", wlcr_threshold); + fclose(fd); + mnstr_printf(cntxt->fdout,"#master wlcr_batch %d\n",wlcr_batch, wlcr_threshold); + return MAL_SUCCEED; +} static InstrPtr WLCRaddtime(Client cntxt, InstrPtr pci, InstrPtr p) { - char *tbuf; - char ctm[26]; + char tbuf[26]; time_t clk = pci->clock.tv_sec; + struct tm ctm; -#ifdef HAVE_CTIME_R3 - tbuf = ctime_r(&clk, ctm, sizeof(ctm)); -#else -#ifdef HAVE_CTIME_R - tbuf = ctime_r(&clk, ctm); -#else - tbuf = ctime(&clk); -#endif -#endif - tbuf[19]=0; + ctm = *localtime(&clk); + strftime(tbuf, 26, "%Y-%m-%dT%H:%M:%S",&ctm); return pushStr(cntxt->wlcr, p, tbuf); } @@ -77,36 +144,13 @@ WLCRaddtime(Client cntxt, InstrPtr pci, if( cntxt->wlcr->stop == 0){\ p = newStmt(cntxt->wlcr,"wlreplay","job");\ p = pushStr(cntxt->wlcr,p, cntxt->username);\ - p = pushStr(cntxt->wlcr,p, wlcr_snapshot);\ - p = pushInt(cntxt->wlcr,p, wlcr_unit);\ + 
p = pushStr(cntxt->wlcr,p, wlcr_snapshot? wlcr_snapshot:"dummy");\ + p = pushInt(cntxt->wlcr,p, wlcr_batch);\ p = WLCRaddtime(cntxt,pci, p); \ p->ticks = GDKms();\ } } str -WLCRproperties (Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) -{ - int i; - - (void) cntxt; - (void) mb; - - i = *getArgReference_int(stk,pci,1); - if ( i < 0) - throw(MAL,"wlcr.properties","Duration must be a possitive number"); - wlcr_duration = i; - - i = *getArgReference_int(stk,pci,2); - if ( i < 0) - throw(MAL,"wlcr.properties","Duration must be a possitive number"); - wlcr_threshold = i; - - wlcr_deltas = *getArgReference_int(stk,pci,3) != 0; - wlcr_all = *getArgReference_int(stk,pci,4) != 0; - return MAL_SUCCEED; -} - -str WLCRjob(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { str snapshot = *getArgReference_str(stk,pci,1); @@ -117,7 +161,7 @@ WLCRjob(Client cntxt, MalBlkPtr mb, MalS if ( strcmp(snapshot, wlcr_snapshot)) throw(MAL,"wlcr.job","Incompatible snapshot identifier"); - if ( tid < wlcr_unit) + if ( tid < wlcr_batch) throw(MAL,"wlcr.job","Work unit identifier is before last one executed"); return MAL_SUCCEED; } @@ -366,36 +410,51 @@ WLCRclear_table(Client cntxt, MalBlkPtr return MAL_SUCCEED; } +// creation of file and updating the version file should be atomic TODO!!! 
static str -WLCRnewlogger(Client cntxt) +WLCRloggerfile(Client cntxt) { + char path[PATHLENGTH]; + FILE *fd; + (void) cntxt; - // find next available file - return GDKstrdup(wlcr_log); + snprintf(path,PATHLENGTH,"%s%cwlcr",wlcr_dir, DIR_SEP); + mnstr_printf(cntxt->fdout,"#WLCRloggerfile %s\n",wlcr_dir); + fd = fopen(path,"w"); + if( fd == NULL) + return "unknown"; + fprintf(fd,"%d %d\n", wlcr_batch, wlcr_threshold); + fclose(fd); + snprintf(path,PATHLENGTH,"%s%cwlcr_%06d",wlcr_dir,DIR_SEP,wlcr_batch); + wlcr_batch++; + return GDKstrdup(path); } static str WLCRwrite(Client cntxt) { str fname; // save the wlcr record on a file and ship it to registered slaves - if ( wlcr_fd == NULL){ - fname= WLCRnewlogger(cntxt); - wlcr_fd = open_wastream(fname); + if( wlcr_dir ){ + if ( wlcr_fd == NULL){ + fname= WLCRloggerfile(cntxt); + mnstr_printf(cntxt->fdout,"#WLCRloggerfile batch %s\n",fname); + wlcr_fd = open_wastream(fname); + } + // Limit the size of the log files + + if ( wlcr_fd == NULL) + throw(MAL,"wlcr.write","WLCR log file not accessible"); + + if(cntxt->wlcr->stop == 0) + return MAL_SUCCEED; + + newStmt(cntxt->wlcr,"wlreplay","fin"); + MT_lock_set(&wlcr_lock); + printFunction(wlcr_fd, cntxt->wlcr, 0, LIST_MAL_DEBUG ); + (void) mnstr_flush(wlcr_fd); + wlcr_batch++; + MT_lock_unset(&wlcr_lock); } - // Limit the size of the log files - - if ( wlcr_fd == NULL) - throw(MAL,"wlcr.write","WLCR log file not accessible"); - - if(cntxt->wlcr->stop == 0) - return MAL_SUCCEED; - - newStmt(cntxt->wlcr,"wlreplay","fin"); - MT_lock_set(&wlcr_lock); - printFunction(wlcr_fd, cntxt->wlcr, 0, LIST_MAL_DEBUG ); - (void) mnstr_flush(wlcr_fd); - wlcr_unit++; - MT_lock_unset(&wlcr_lock); #ifdef _DEBUG_WLCR_ printFunction(cntxt->fdout, cntxt->wlcr, 0, LIST_MAL_ALL ); @@ -437,7 +496,7 @@ WLCRrollback(Client cntxt) InstrPtr p; if( cntxt->wlcr){ - if (wlcr_all && cntxt->wlcr->stop){ + if (cntxt->wlcr->stop){ p= getInstrPtr(cntxt->wlcr,0); p = pushStr(cntxt->wlcr,p,"rollback"); p = 
pushStr(cntxt->wlcr, p, wlcr_name[cntxt->wlcr_kind]); diff --git a/monetdb5/modules/mal/wlcr.h b/monetdb5/modules/mal/wlcr.h --- a/monetdb5/modules/mal/wlcr.h +++ b/monetdb5/modules/mal/wlcr.h @@ -21,14 +21,11 @@ #define WLCR_UPDATE 2 #define WLCR_CATALOG 3 -mal_export int wlcr_duration; // how long to capture default= 0 _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list