Changeset: 3bf5307afae1 for MonetDB
Added Files:
Modified Files:
Branch: wlcr
Log Message:

WorkLoad Capture and Replay
First batch of extensions. See wlcr.c for more info

diffs (truncated from 1215 to 300 lines):

diff --git a/monetdb5/mal/mal_builder.c b/monetdb5/mal/mal_builder.c
--- a/monetdb5/mal/mal_builder.c
+++ b/monetdb5/mal/mal_builder.c
@@ -454,11 +454,16 @@ pushStr(MalBlkPtr mb, InstrPtr q, const 
        if (q == NULL)
                return NULL;
        cst.vtype= TYPE_str;
-       if ((cst.val.sval= GDKstrdup(Val)) == NULL) {
-               freeInstruction(q);
-               return NULL;
+       if( Val == 0){
+               cst.val.sval = 0;
+               cst.len= 0;
+       } else{
+               if ( Val != NULL && (cst.val.sval= GDKstrdup(Val)) == NULL) {
+                       freeInstruction(q);
+                       return NULL;
+               }
+               cst.len= (int) strlen(cst.val.sval);
-       cst.len= (int) strlen(cst.val.sval);
        _t = defConstant(mb,TYPE_str,&cst);
        return pushArgument(mb, q, _t);
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -251,6 +251,8 @@ MCinitClientRecord(Client c, oid user, b
        /* create a recycler cache */
        c->exception_buf_initialized = 0;
        c->error_row = c->error_fld = c->error_msg = c->error_input = NULL;
+       c->wlcr_kind = 0;
+       c->wlcr = NULL;
 #ifndef HAVE_EMBEDDED /* no authentication in embedded mode */
                str msg = AUTHgetUsername(&c->username, c);
@@ -403,6 +405,10 @@ freeClient(Client c)
                c->error_row = c->error_fld = c->error_msg = c->error_input = 
+               if( c->wlcr)
+                       freeMalBlk(c->wlcr);
+               c->wlcr_kind = 0;
+               c->wlcr = NULL;
        if (t)
                THRdel(t);  /* you may perform suicide */
diff --git a/monetdb5/mal/mal_client.h b/monetdb5/mal/mal_client.h
--- a/monetdb5/mal/mal_client.h
+++ b/monetdb5/mal/mal_client.h
@@ -171,6 +171,11 @@ typedef struct CLIENT {
        bit             active;         /* processing a query or not */
        Workset inprogress[THREADS];
+       /*
+        * The workload for replication/replay is saved initially as a MAL 
+        */
+       int wlcr_kind;  
+       MalBlkPtr wlcr;
         *      Errors during copy into are collected in a user specific column 
diff --git a/monetdb5/mal/mal_interpreter.c b/monetdb5/mal/mal_interpreter.c
--- a/monetdb5/mal/mal_interpreter.c
+++ b/monetdb5/mal/mal_interpreter.c
@@ -1434,6 +1434,7 @@ void garbageCollector(Client cntxt, MalB
                printStack(cntxt->fdout, mb, stk, 0);
+       assert(mb->vtop < mb->vsize);
        (void) flag;
        for (k = 0; k < mb->vtop; k++) {
        //      if (isVarCleanup(mb, k) ){
diff --git a/monetdb5/modules/mal/ b/monetdb5/modules/mal/
--- a/monetdb5/modules/mal/
+++ b/monetdb5/modules/mal/
@@ -36,6 +36,7 @@ lib_mal = {
                mkey.c mkey.h \
                manifold.c manifold.h \
                oltp.c oltp.h \
+               wlcr.c wlcr.h \
                pcre.c \
                profiler.c profiler.h \
                querylog.c querylog.h \
@@ -59,7 +60,7 @@ headers_mal = {
                inspect.mal manual.mal mal_io.mal mkey.mal manifold.mal \
                iterator.mal clients.mal \
                factories.mal groupby.mal mdb.mal pcre.mal mat.mal \
-               transaction.mal oltp.mal \
+               transaction.mal oltp.mal wlcr.mal \
                mal_mapi.mal sabaoth.mal remote.mal  \
                txtsim.mal \
                tokenizer.mal sample.mal json_util.mal \
diff --git a/monetdb5/modules/mal/mal_init.mal 
--- a/monetdb5/modules/mal/mal_init.mal
+++ b/monetdb5/modules/mal/mal_init.mal
@@ -94,6 +94,7 @@ include srvpool;
 include mal_mapi;
 include oltp;
+include wlcr;
 # Any extensions (MAL scripts) that should be automatically loaded upon
 # startup can be placed in the autoload directory.  One typically finds
diff --git a/monetdb5/modules/mal/wlcr.c b/monetdb5/modules/mal/wlcr.c
new file mode 100644
--- /dev/null
+++ b/monetdb5/modules/mal/wlcr.c
@@ -0,0 +1,461 @@
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0.  If a copy of the MPL was not distributed with this
+ * file, You can obtain one at
+ *
+ * Copyright 1997 - July 2008 CWI, August 2008 - 2017 MonetDB B.V.
+ */
+ * (c) Martin Kersten
+ * This module collects the workload-capture-replay statements during
+ * transaction execution.
+ *
+ * Each wlcr log file contains a serial log for a transaction batch.
+ * Each job is identified by the original owner of the query, the snapshot
+ * identity (name+nr) against which it was run, an indication of the kind of
+ * transaction and commit/rollback, its runtime (in ms) and starting time.
+ *
+ * Replaying the wlcr against another server based on the same snapshot should
+ * produce a perfect copy.
+ * Each job should be executed using the credentials of the user issuing the
+ * Any failure encountered terminates the replication process.
+ * 
+ * All wlcr files should be stored on a shared file system for all replicas to
+ * The default is a subdirectory of the database and acts as a secondary
+ * database rebuild log.
+ * The location can be overridden using a full path to a shared disk as a
+ * GDK environment variable (wlcr_dir).
+ *
+ * The wlcr files have a textual format derived from the MAL statements.
+ * This can be used to ease the implementation of the wlreplay module.
+ *
+ * The logs may only be removed after a new snapshot has been taken or wlcr is 
+ */
+#include "monetdb_config.h"
+#include "mal_builder.h"
+#include "wlcr.h"
+/*
+ * Module-wide capture settings and state.
+ * WLCRproperties() overwrites wlcr_duration, wlcr_threshold, wlcr_deltas and
+ * wlcr_all; WLCRjob() validates incoming jobs against wlcr_snapshot and
+ * wlcr_unit. NOTE(review): wlcr_lock is presumably meant to guard this shared
+ * state, but its use is not visible in this excerpt — confirm.
+ */
+static MT_Lock     wlcr_lock MT_LOCK_INITIALIZER("wlcr_lock");
+int wlcr_duration = INT_MAX; // how long to capture; default INT_MAX (effectively unbounded), must be >= 0
+int wlcr_threshold = 0; // threshold (milliseconds) for keeping readonly queries — comment was truncated in this excerpt, confirm wording
+int wlcr_deltas = 1;  // send the delta values (boolean flag, normalised to 0/1)
+int wlcr_all = 1;      // also ship failed transactions (boolean flag, normalised to 0/1)
+str wlcr_snapshot= "baseline"; // name assigned to the snapshot; NOTE(review): points at a string literal, must never be modified or freed
+int wlcr_unit = 0;     // last job executed; WLCR_start() records it in every wlreplay.job header
+static char *wlcr_name[]= {"","query","update","catalog"}; // presumably indexed by cntxt->wlcr_kind (WLCR_QUERY, ..., WLCR_CATALOG) — confirm
+static stream *wlcr_fd = 0; // presumably the open capture-log stream; usage not visible in this excerpt
+static str wlcr_log = "/tmp/wlcr"; // NOTE(review): hard-coded default, while the file header says the default is a subdirectory of the database
+static InstrPtr
+WLCRaddtime(Client cntxt, InstrPtr pci, InstrPtr p)
+       char *tbuf;
+    char ctm[26];
+    time_t clk = pci->clock.tv_sec;
+#ifdef HAVE_CTIME_R3
+    tbuf = ctime_r(&clk, ctm, sizeof(ctm));
+#ifdef HAVE_CTIME_R
+    tbuf = ctime_r(&clk, ctm);
+    tbuf = ctime(&clk);
+    tbuf[19]=0;
+       return pushStr(cntxt->wlcr, p, tbuf);
+#define WLCR_start()\
+{ Symbol s; \
+       if( cntxt->wlcr == NULL){\
+               s = newSymbol("wlrc", FUNCTIONsymbol);\
+               cntxt->wlcr_kind = WLCR_QUERY;\
+               cntxt->wlcr = s->def;\
+               s->def = NULL;\
+       } \
+       if( cntxt->wlcr->stop == 0){\
+               p = newStmt(cntxt->wlcr,"wlreplay","job");\
+               p = pushStr(cntxt->wlcr,p, cntxt->username);\
+               p = pushStr(cntxt->wlcr,p, wlcr_snapshot);\
+               p = pushInt(cntxt->wlcr,p, wlcr_unit);\
+               p = WLCRaddtime(cntxt,pci, p); \
+               p->ticks = GDKms();\
+}      }
+WLCRproperties (Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+       int i;
+       (void) cntxt;
+       (void) mb;
+       i = *getArgReference_int(stk,pci,1);
+       if ( i < 0)
+               throw(MAL,"","Duration must be a possitive 
+       wlcr_duration = i;
+       i = *getArgReference_int(stk,pci,2);
+       if ( i < 0)
+               throw(MAL,"","Duration must be a possitive 
+       wlcr_threshold = i;
+       wlcr_deltas = *getArgReference_int(stk,pci,3) != 0;
+       wlcr_all = *getArgReference_int(stk,pci,4) != 0;
+       return MAL_SUCCEED;
+WLCRjob(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+       str snapshot = *getArgReference_str(stk,pci,1);
+       int tid = *getArgReference_int(stk,pci,2);
+       (void) cntxt;
+       (void) mb;
+       if ( strcmp(snapshot, wlcr_snapshot))
+               throw(MAL,"wlcr.job","Incompatible snapshot identifier");
+       if ( tid < wlcr_unit)
+               throw(MAL,"wlcr.job","Work unit identifier is before last one 
+       return MAL_SUCCEED;
+WLCRfin(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+       (void) cntxt;
+       (void) mb;
+       (void) stk;
+       (void) pci;
+       return MAL_SUCCEED;
+WLCRquery(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{      InstrPtr p;
+       (void) stk;
+       if ( strcmp("-- no query",getVarConstant(mb, getArg(pci,1)).val.sval) 
== 0)
+               return MAL_SUCCEED;
+       WLCR_start();
+       p = newStmt(cntxt->wlcr, "wlreplay","query");
+       p = pushStr(cntxt->wlcr, p, getVarConstant(mb, getArg(pci,1)).val.sval);
+       p = pushStr(cntxt->wlcr, p, getVarConstant(mb, getArg(pci,2)).val.sval);
+       p->ticks = GDKms();
+       return MAL_SUCCEED;
+WLCRgeneric(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{      InstrPtr p;
+       int i, tpe, varid;
+       (void) stk;
+       WLCR_start();
+       p = newStmt(cntxt->wlcr, "wlreplay",getFunctionId(pci));
+       for( i = pci->retc; i< pci->argc; i++){
+               tpe =getArgType(mb, pci, i);
+               switch(tpe){
+               case TYPE_str:
+                       p = pushStr(cntxt->wlcr, p, getVarConstant(mb, 
getArg(pci, i)).val.sval);
+                       break;
+               default:
+                       varid = defConstant(cntxt->wlcr, tpe, 
getArgReference(stk, pci, i));
+                       p = pushArgument(cntxt->wlcr, p, varid);
+               }
+       }
+       p->ticks = GDKms();
+       cntxt->wlcr_kind = WLCR_CATALOG;
+       return MAL_SUCCEED;
+#define bulk(TPE1, TPE2)\
+{      TPE1 *p = (TPE1 *) Tloc(b,0);\
+       TPE1 *q = (TPE1 *) Tloc(b, BUNlast(b));\
+       int k=0; \
+       for( ; p < q; p++, k++){\
+               if( k % 32 == 31){\
+                       pci = newStmt(cntxt->wlcr, 
+                       pci = pushStr(cntxt->wlcr, pci, sch);\
+                       pci = pushStr(cntxt->wlcr, pci, tbl);\
+                       pci = pushStr(cntxt->wlcr, pci, col);\
+                       pci->ticks = GDKms();\
+               }\
+               pci = push##TPE2(cntxt->wlcr, pci ,*p);\
+} }
+static void
+WLCRdatashipping(Client cntxt, MalBlkPtr mb, InstrPtr pci, int bid)
+{      BAT *b;
checkin-list mailing list

Reply via email to