Changeset: f097aee982a1 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f097aee982a1
Added Files:
        sql/backends/monet5/sql_wlcr.c
        sql/backends/monet5/sql_wlcr.h
        sql/backends/monet5/sql_wlcr.mal
        sql/scripts/60_wlcr.sql
Modified Files:
        monetdb5/modules/mal/wlcr.c
        monetdb5/modules/mal/wlcr.h
        monetdb5/modules/mal/wlcr.mal
        sql/backends/monet5/40_sql.mal
        sql/backends/monet5/Makefile.ag
        sql/scripts/Makefile.ag
Branch: wlcr
Log Message:

Basic handling of WLCR logs writes


diffs (truncated from 550 to 300 lines):

diff --git a/monetdb5/modules/mal/wlcr.c b/monetdb5/modules/mal/wlcr.c
--- a/monetdb5/modules/mal/wlcr.c
+++ b/monetdb5/modules/mal/wlcr.c
@@ -7,62 +7,129 @@
  */
 
 /*
- * (c) Martin Kersten
+ * (c) 2017 Martin Kersten
  * This module collects the workload-capture-replay statements during 
transaction execution. 
+ * It is used primarily for replication management and workload replay
+ *
+ * The goal is to maintain a replica of a master database. All data of the 
master
+ * is basically only available for read only access. Accidental corruption of 
this
+ * data is avoided by setting ownership and access properties at the SQL level 
in the replica.
+ *
+ *
+ * IMPLEMENTATION
+ *
+ * The replica directory should be on a shared (global) file system.
+ * As default we use dbfarm/master.
+ *
+ * The binary dump for the database snapshot should be stored there in  
master/bat.
+ * The associated log files are stored as master/wlcr<number>.
+ * Creation and restore of a snapshot should be a  monetdb option. TODO
+ *
+ * Replication management starts when you run the command 
+ * CALL wlcr.master("(full)path to snapshot dir")
+ * It can also be passed as a command line parameter 
+ * --set wlcr_dir="(full)path to snapshot dir"
  *
  * Each wlcr log file contains a serial log for a transaction batch. 
- * Each job is identified by the original owner of the query, the snapshot 
identity (name+nr) against
- * which it was ran, an indication of the kind of transaction and 
commit/rollback, its runtime (in ms) and starting time.
+ * Each job is identified by the owner of the query, the snapshot tag,
+ * commit/rollback, its starting time and runtime (in ms).
  *
- * Replaying the wlcr against another server based on the same snapshot should 
produce a perfect copy.
- * Each job should be executed using the credentials of the user issuing the 
transaction.
- * Any failuer encountered terminates the replication process.
+ * Logging of queries can be further limited to those that satisfy a threshold.
+ * CALL wlcr.master("(full)path to snapshot dir", threshold)
+ * The threshold is given in milliseconds. A negative threshold leads to 
ignoring all queries.
+ *
+ * A replica server should issue the matching call
+ * CALL wlcr.synchronize("(full)path to snapshot dir")
+ *
+ * During synchronization only updates are executed for the user responsible 
for the call. 
+ * Queries are simply ignored unless needed as replacement for update actions.
+ *
+ * The alternative is to replay the log
+ * CALL wlcr.replay("(full)path to snapshot dir")
+ * In this mode all queries are executed under the credentials of the query 
owner, including those that lead to updates.
+ *
+ * Any failure encountered terminates the synchronization process, leaving a 
message in the merovingian log.
+ *
+ * The replay progress can be inspected using the functions wlcr.drift() and 
wlcr.synced().
+ * The latter is true if all accessible log files have been processed.
  * 
- * All wlcr files should be stored on a shared file system for all replicas to 
access.
- * The default is a subdirectory of the database and act as a secondary 
database rebuild log.
- * The location can be overruled using a full path to a shared disk as 
GDKenvironemnt variable (wlcr_dir)
+ * The wlcr files purposely have a textual format derived from the MAL 
statements.
+ * It creates some overhead for copy into situations.
  *
- * The wlcr files have a textual format derived from the MAL statements.
- * This can be used to ease the implementation of the wlreplay
+ * The integrity of the wlcr directories is critical. For now we assume that 
all batches are available. 
+ * We should detect that wlcr.master() is issued after updates have taken 
place on the snapshot TODO.
  *
- * The logs may only be removed after a new snapshot has been taken or wlcr is 
disabled
  */
 #include "monetdb_config.h"
+#include <time.h>
 #include "mal_builder.h"
 #include "wlcr.h"
 
 static MT_Lock     wlcr_lock MT_LOCK_INITIALIZER("wlcr_lock");
 
 
-int wlcr_duration = INT_MAX; // how long to capture default= 0
 int wlcr_threshold = 0; // threshold (milliseconds) for keeping readonly 
queries
-int wlcr_deltas = 1;  // sent the delta values
-int wlcr_all = 1;      // also ship failed transaction
-str wlcr_snapshot= "baseline"; // name assigned to the snapshot
-int wlcr_unit = 0;     // last job executed
+str wlcr_snapshot= 0;  // name assigned to the snapshot
+int wlcr_batch = 0;    // last job executed
 
 static char *wlcr_name[]= {"","query","update","catalog"};
 
 static stream *wlcr_fd = 0;
-static str wlcr_log = "/tmp/wlcr";
+static str wlcr_dir = 0;
+
+/* The database snapshots are binary copies of the dbfarm/database/bat
+ * New snapshots are created currently using the 'monetdb snapshot <db>' 
command
+ * or a SQL procedure.
+ * It requires a database halt.
+ *
+ * The wlcr logs are stored in the snapshot directory as a time-stamped list
+ */
+str 
+WLCRmaster(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       char path[PATHLENGTH];
+       FILE *fd;
+       int i = 1;
+       (void) stk;
+       (void) pci;
+
+       wlcr_dir = GDKgetenv("gdk_wlcrdir");
+       if (i< pci->argc+1 && getArgType(mb, pci, i) == TYPE_str){
+               wlcr_dir =  *getArgReference_str(stk,pci,i);
+               wlcr_dir = GDKfilepath(0,wlcr_dir,"master",0);
+               i++;
+       }
+       // if the master directory does not exist, create it
+       if ( wlcr_dir == NULL)
+               wlcr_dir = GDKfilepath(0,0,"master",0);
+
+       if ( i < pci->argc+1 && getArgType(mb, pci, i) == TYPE_int)
+               wlcr_threshold = *getArgReference_int(stk,pci,i);
+
+       snprintf(path, PATHLENGTH,"%s%cwlcr",wlcr_dir, DIR_SEP);
+       if( GDKcreatedir(path) == GDK_FAIL)
+               mnstr_printf(cntxt->fdout,"#Could not create %s\n",wlcr_dir);
+       mnstr_printf(cntxt->fdout,"#Snapshot directory '%s'\n", wlcr_dir);
+
+       fd = fopen(path,"w");
+       if ( fd == NULL)
+               return createException(MAL,"wlcr.master","Unable to initialize 
WLCR %s", path);
+       if( fscanf(fd,"%d %d", &wlcr_batch, &wlcr_threshold) != 3)
+               fprintf(fd,"0 %d\n", wlcr_threshold);
+       fclose(fd);
+       mnstr_printf(cntxt->fdout,"#master wlcr_batch %d\n",wlcr_batch, 
wlcr_threshold);
+       return MAL_SUCCEED;
+}
 
 static InstrPtr
 WLCRaddtime(Client cntxt, InstrPtr pci, InstrPtr p)
 {
-       char *tbuf;
-    char ctm[26];
+       char tbuf[26];
     time_t clk = pci->clock.tv_sec;
+       struct tm ctm;
 
-#ifdef HAVE_CTIME_R3
-    tbuf = ctime_r(&clk, ctm, sizeof(ctm));
-#else
-#ifdef HAVE_CTIME_R
-    tbuf = ctime_r(&clk, ctm);
-#else
-    tbuf = ctime(&clk);
-#endif
-#endif
-    tbuf[19]=0;
+       ctm = *localtime(&clk);
+       strftime(tbuf, 26, "%Y-%m-%dT%H:%M:%S",&ctm);
        return pushStr(cntxt->wlcr, p, tbuf);
 }
 
@@ -77,36 +144,13 @@ WLCRaddtime(Client cntxt, InstrPtr pci, 
        if( cntxt->wlcr->stop == 0){\
                p = newStmt(cntxt->wlcr,"wlreplay","job");\
                p = pushStr(cntxt->wlcr,p, cntxt->username);\
-               p = pushStr(cntxt->wlcr,p, wlcr_snapshot);\
-               p = pushInt(cntxt->wlcr,p, wlcr_unit);\
+               p = pushStr(cntxt->wlcr,p, wlcr_snapshot? 
wlcr_snapshot:"dummy");\
+               p = pushInt(cntxt->wlcr,p, wlcr_batch);\
                p = WLCRaddtime(cntxt,pci, p); \
                p->ticks = GDKms();\
 }      }
 
 str
-WLCRproperties (Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
-{
-       int i;
-
-       (void) cntxt;
-       (void) mb;
-
-       i = *getArgReference_int(stk,pci,1);
-       if ( i < 0)
-               throw(MAL,"wlcr.properties","Duration must be a possitive 
number");
-       wlcr_duration = i;
-
-       i = *getArgReference_int(stk,pci,2);
-       if ( i < 0)
-               throw(MAL,"wlcr.properties","Duration must be a possitive 
number");
-       wlcr_threshold = i;
-
-       wlcr_deltas = *getArgReference_int(stk,pci,3) != 0;
-       wlcr_all = *getArgReference_int(stk,pci,4) != 0;
-       return MAL_SUCCEED;
-}
-
-str
 WLCRjob(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
        str snapshot = *getArgReference_str(stk,pci,1);
@@ -117,7 +161,7 @@ WLCRjob(Client cntxt, MalBlkPtr mb, MalS
 
        if ( strcmp(snapshot, wlcr_snapshot))
                throw(MAL,"wlcr.job","Incompatible snapshot identifier");
-       if ( tid < wlcr_unit)
+       if ( tid < wlcr_batch)
                throw(MAL,"wlcr.job","Work unit identifier is before last one 
executed");
        return MAL_SUCCEED;
 }
@@ -366,36 +410,51 @@ WLCRclear_table(Client cntxt, MalBlkPtr 
        return MAL_SUCCEED;
 }
 
+// creation of file and updating the version file should be atomic TODO!!!
 static str
-WLCRnewlogger(Client cntxt)
+WLCRloggerfile(Client cntxt)
 {
+       char path[PATHLENGTH];
+       FILE *fd;
+
        (void) cntxt;
-       // find next available file
-       return GDKstrdup(wlcr_log);
+       snprintf(path,PATHLENGTH,"%s%cwlcr",wlcr_dir, DIR_SEP);
+       mnstr_printf(cntxt->fdout,"#WLCRloggerfile %s\n",wlcr_dir);
+       fd = fopen(path,"w");
+       if( fd == NULL)
+               return "unknown";
+       fprintf(fd,"%d %d\n", wlcr_batch, wlcr_threshold);
+       fclose(fd);
+       snprintf(path,PATHLENGTH,"%s%cwlcr_%06d",wlcr_dir,DIR_SEP,wlcr_batch);
+       wlcr_batch++;
+       return GDKstrdup(path);
 }
 
 static str
 WLCRwrite(Client cntxt)
 {      str fname;
        // save the wlcr record on a file and ship it to registered slaves
-       if ( wlcr_fd == NULL){
-               fname= WLCRnewlogger(cntxt);
-               wlcr_fd = open_wastream(fname);
+       if( wlcr_dir ){
+               if ( wlcr_fd == NULL){
+                       fname= WLCRloggerfile(cntxt);
+                       mnstr_printf(cntxt->fdout,"#WLCRloggerfile batch 
%s\n",fname);
+                       wlcr_fd = open_wastream(fname);
+               }
+               // Limit the size of the log files
+               
+               if ( wlcr_fd == NULL)
+                       throw(MAL,"wlcr.write","WLCR log file not accessible");
+
+               if(cntxt->wlcr->stop == 0)
+                       return MAL_SUCCEED;
+
+               newStmt(cntxt->wlcr,"wlreplay","fin");
+               MT_lock_set(&wlcr_lock);
+               printFunction(wlcr_fd, cntxt->wlcr, 0, LIST_MAL_DEBUG );
+               (void) mnstr_flush(wlcr_fd);
+               wlcr_batch++;
+               MT_lock_unset(&wlcr_lock);
        }
-       // Limit the size of the log files
-       
-       if ( wlcr_fd == NULL)
-               throw(MAL,"wlcr.write","WLCR log file not accessible");
-
-       if(cntxt->wlcr->stop == 0)
-               return MAL_SUCCEED;
-
-       newStmt(cntxt->wlcr,"wlreplay","fin");
-       MT_lock_set(&wlcr_lock);
-       printFunction(wlcr_fd, cntxt->wlcr, 0, LIST_MAL_DEBUG );
-       (void) mnstr_flush(wlcr_fd);
-       wlcr_unit++;
-       MT_lock_unset(&wlcr_lock);
 
 #ifdef _DEBUG_WLCR_
        printFunction(cntxt->fdout, cntxt->wlcr, 0, LIST_MAL_ALL );
@@ -437,7 +496,7 @@ WLCRrollback(Client cntxt)
        InstrPtr p;
        
        if( cntxt->wlcr){
-               if (wlcr_all && cntxt->wlcr->stop){
+               if (cntxt->wlcr->stop){
                        p= getInstrPtr(cntxt->wlcr,0);
                        p = pushStr(cntxt->wlcr,p,"rollback");
                        p = pushStr(cntxt->wlcr, p, 
wlcr_name[cntxt->wlcr_kind]);
diff --git a/monetdb5/modules/mal/wlcr.h b/monetdb5/modules/mal/wlcr.h
--- a/monetdb5/modules/mal/wlcr.h
+++ b/monetdb5/modules/mal/wlcr.h
@@ -21,14 +21,11 @@
 #define WLCR_UPDATE    2
 #define WLCR_CATALOG   3
 
-mal_export int wlcr_duration; // how long to capture default= 0
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to