Changeset: 3bf5307afae1 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=3bf5307afae1
Added Files:
        monetdb5/modules/mal/wlcr.c
        monetdb5/modules/mal/wlcr.h
        monetdb5/modules/mal/wlcr.mal
        monetdb5/optimizer/opt_wlcr.c
        monetdb5/optimizer/opt_wlcr.h
Modified Files:
        monetdb5/mal/mal_builder.c
        monetdb5/mal/mal_client.c
        monetdb5/mal/mal_client.h
        monetdb5/mal/mal_interpreter.c
        monetdb5/modules/mal/Makefile.ag
        monetdb5/modules/mal/mal_init.mal
        monetdb5/optimizer/Makefile.ag
        monetdb5/optimizer/opt_pipes.c
        monetdb5/optimizer/opt_prelude.c
        monetdb5/optimizer/opt_prelude.h
        monetdb5/optimizer/opt_support.c
        monetdb5/optimizer/opt_wrapper.c
        monetdb5/optimizer/optimizer.mal
        sql/backends/monet5/sql.c
        sql/backends/monet5/sql_scenario.c
        sql/backends/monet5/sql_transaction.c
Branch: wlcr
Log Message:

WorkLoad Capture and Replay
First batch of extensions. See wlcr.c for more info


diffs (truncated from 1215 to 300 lines):

diff --git a/monetdb5/mal/mal_builder.c b/monetdb5/mal/mal_builder.c
--- a/monetdb5/mal/mal_builder.c
+++ b/monetdb5/mal/mal_builder.c
@@ -454,11 +454,16 @@ pushStr(MalBlkPtr mb, InstrPtr q, const 
        if (q == NULL)
                return NULL;
        cst.vtype= TYPE_str;
-       if ((cst.val.sval= GDKstrdup(Val)) == NULL) {
-               freeInstruction(q);
-               return NULL;
+       if( Val == 0){
+               cst.val.sval = 0;
+               cst.len= 0;
+       } else{
+               if ( Val != NULL && (cst.val.sval= GDKstrdup(Val)) == NULL) {
+                       freeInstruction(q);
+                       return NULL;
+               }
+               cst.len= (int) strlen(cst.val.sval);
        }
-       cst.len= (int) strlen(cst.val.sval);
        _t = defConstant(mb,TYPE_str,&cst);
        return pushArgument(mb, q, _t);
 }
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -251,6 +251,8 @@ MCinitClientRecord(Client c, oid user, b
        /* create a recycler cache */
        c->exception_buf_initialized = 0;
        c->error_row = c->error_fld = c->error_msg = c->error_input = NULL;
+       c->wlcr_kind = 0;
+       c->wlcr = NULL;
 #ifndef HAVE_EMBEDDED /* no authentication in embedded mode */
        {
                str msg = AUTHgetUsername(&c->username, c);
@@ -403,6 +405,10 @@ freeClient(Client c)
                BBPdecref(c->error_msg->batCacheid,TRUE);
                BBPdecref(c->error_input->batCacheid,TRUE);
                c->error_row = c->error_fld = c->error_msg = c->error_input = 
NULL;
+               if( c->wlcr)
+                       freeMalBlk(c->wlcr);
+               c->wlcr_kind = 0;
+               c->wlcr = NULL;
        }
        if (t)
                THRdel(t);  /* you may perform suicide */
diff --git a/monetdb5/mal/mal_client.h b/monetdb5/mal/mal_client.h
--- a/monetdb5/mal/mal_client.h
+++ b/monetdb5/mal/mal_client.h
@@ -171,6 +171,11 @@ typedef struct CLIENT {
         */
        bit             active;         /* processing a query or not */
        Workset inprogress[THREADS];
+       /*
+        * The workload for replication/replay is saved initially as a MAL 
block.
+        */
+       int wlcr_kind;  
+       MalBlkPtr wlcr;
        /*      
         *      Errors during copy into are collected in a user specific column 
set
         */
diff --git a/monetdb5/mal/mal_interpreter.c b/monetdb5/mal/mal_interpreter.c
--- a/monetdb5/mal/mal_interpreter.c
+++ b/monetdb5/mal/mal_interpreter.c
@@ -1434,6 +1434,7 @@ void garbageCollector(Client cntxt, MalB
                printStack(cntxt->fdout, mb, stk, 0);
        }
 #endif
+       assert(mb->vtop < mb->vsize);
        (void) flag;
        for (k = 0; k < mb->vtop; k++) {
        //      if (isVarCleanup(mb, k) ){
diff --git a/monetdb5/modules/mal/Makefile.ag b/monetdb5/modules/mal/Makefile.ag
--- a/monetdb5/modules/mal/Makefile.ag
+++ b/monetdb5/modules/mal/Makefile.ag
@@ -36,6 +36,7 @@ lib_mal = {
                mkey.c mkey.h \
                manifold.c manifold.h \
                oltp.c oltp.h \
+               wlcr.c wlcr.h \
                pcre.c \
                profiler.c profiler.h \
                querylog.c querylog.h \
@@ -59,7 +60,7 @@ headers_mal = {
                inspect.mal manual.mal mal_io.mal mkey.mal manifold.mal \
                iterator.mal clients.mal \
                factories.mal groupby.mal mdb.mal pcre.mal mat.mal \
-               transaction.mal oltp.mal \
+               transaction.mal oltp.mal wlcr.mal \
                mal_mapi.mal sabaoth.mal remote.mal  \
                txtsim.mal \
                tokenizer.mal sample.mal json_util.mal \
diff --git a/monetdb5/modules/mal/mal_init.mal 
b/monetdb5/modules/mal/mal_init.mal
--- a/monetdb5/modules/mal/mal_init.mal
+++ b/monetdb5/modules/mal/mal_init.mal
@@ -94,6 +94,7 @@ include srvpool;
 
 include mal_mapi;
 include oltp;
+include wlcr;
 
 # Any extensions (MAL scripts) that should be automatically loaded upon
 # startup can be placed in the autoload directory.  One typically finds
diff --git a/monetdb5/modules/mal/wlcr.c b/monetdb5/modules/mal/wlcr.c
new file mode 100644
--- /dev/null
+++ b/monetdb5/modules/mal/wlcr.c
@@ -0,0 +1,461 @@
+/*
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0.  If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright 1997 - July 2008 CWI, August 2008 - 2017 MonetDB B.V.
+ */
+
+/*
+ * (c) Martin Kersten
+ * This module collects the workload-capture-replay statements during 
transaction execution. 
+ *
+ * Each wlcr log file contains a serial log for a transaction batch.
+ * Each job is identified by the original owner of the query, the snapshot
+ * identity (name+nr) against which it was run, an indication of the kind of
+ * transaction and commit/rollback, its runtime (in ms) and starting time.
+ *
+ * Replaying the wlcr against another server based on the same snapshot should
+ * produce a perfect copy.
+ * Each job should be executed using the credentials of the user issuing the
+ * transaction.
+ * Any failure encountered terminates the replication process.
+ * 
+ * All wlcr files should be stored on a shared file system for all replicas to
+ * access.
+ * The default is a subdirectory of the database and acts as a secondary
+ * database rebuild log.
+ * The location can be overruled using a full path to a shared disk as
+ * GDK environment variable (wlcr_dir)
+ *
+ * The wlcr files have a textual format derived from the MAL statements.
+ * This can be used to ease the implementation of the wlreplay
+ *
+ * The logs may only be removed after a new snapshot has been taken or wlcr is 
disabled
+ */
+#include "monetdb_config.h"
+#include "mal_builder.h"
+#include "wlcr.h"
+
+static MT_Lock     wlcr_lock MT_LOCK_INITIALIZER("wlcr_lock"); // module lock; NOTE(review): what it guards is outside the visible code -- confirm
+
+
+int wlcr_duration = INT_MAX; // how long to capture; NOTE(review): the earlier note said "default= 0" but the initial value is INT_MAX -- confirm intent
+int wlcr_threshold = 0; // threshold (milliseconds) for keeping readonly 
queries
+int wlcr_deltas = 1;  // send the delta values
+int wlcr_all = 1;      // also ship failed transactions
+str wlcr_snapshot= "baseline"; // name assigned to the snapshot
+int wlcr_unit = 0;     // last job executed
+
+static char *wlcr_name[]= {"","query","update","catalog"}; // printable names; presumably indexed by wlcr_kind -- TODO confirm
+
+static stream *wlcr_fd = 0; // presumably the currently open log stream -- TODO confirm
+static str wlcr_log = "/tmp/wlcr"; // presumably the log location -- TODO confirm
+
+/*
+ * Append the start time of instruction pci, rendered as text, as a string
+ * argument to the wlcr statement p.
+ *
+ * The text is the ctime() rendering of pci->clock.tv_sec cut off after 19
+ * characters ("Www Mmm dd hh:mm:ss"), i.e. without the year and newline.
+ * Returns whatever pushStr() returns for the extended statement.
+ */
+static InstrPtr
+WLCRaddtime(Client cntxt, InstrPtr pci, InstrPtr p)
+{
+       char *tbuf;
+       char ctm[26];
+       time_t clk = pci->clock.tv_sec;
+
+#ifdef HAVE_CTIME_R3
+       tbuf = ctime_r(&clk, ctm, sizeof(ctm));
+#else
+#ifdef HAVE_CTIME_R
+       tbuf = ctime_r(&clk, ctm);
+#else
+       (void) ctm;     /* only needed by the reentrant variants */
+       tbuf = ctime(&clk);
+#endif
+#endif
+       /* the conversion may fail and yield NULL; record an empty timestamp
+        * rather than writing through a NULL pointer */
+       if (tbuf == NULL)
+               return pushStr(cntxt->wlcr, p, "");
+       tbuf[19] = 0;
+       return pushStr(cntxt->wlcr, p, tbuf);
+}
+
+/*
+ * Make sure the client has a wlcr MAL block and, when that block is still
+ * empty (stop == 0), open it with a "wlreplay.job" statement carrying the
+ * user name, the snapshot name, the current unit number and the start time
+ * of the instruction being logged.
+ *
+ * Expands in a scope that provides cntxt (Client), pci (InstrPtr) and a
+ * writable InstrPtr p.
+ * NOTE(review): the result of newSymbol() is used unchecked and the symbol
+ * itself is not released after its def has been detached -- confirm.
+ */
+#define WLCR_start()\
+{\
+       if (cntxt->wlcr == NULL) {\
+               Symbol s = newSymbol("wlrc", FUNCTIONsymbol);\
+               cntxt->wlcr_kind = WLCR_QUERY;\
+               cntxt->wlcr = s->def;\
+               s->def = NULL;\
+       }\
+       if (cntxt->wlcr->stop == 0) {\
+               p = newStmt(cntxt->wlcr, "wlreplay", "job");\
+               p = pushStr(cntxt->wlcr, p, cntxt->username);\
+               p = pushStr(cntxt->wlcr, p, wlcr_snapshot);\
+               p = pushInt(cntxt->wlcr, p, wlcr_unit);\
+               p = WLCRaddtime(cntxt, pci, p);\
+               p->ticks = GDKms();\
+       }\
+}
+
+/*
+ * MAL wrapper: set the workload-capture properties from the integer
+ * arguments of the instruction.
+ *   arg 1: capture duration, stored in wlcr_duration
+ *   arg 2: threshold, stored in wlcr_threshold
+ *   arg 3: non-zero switches wlcr_deltas on, zero switches it off
+ *   arg 4: non-zero switches wlcr_all on, zero switches it off
+ * A negative duration or threshold raises a MAL exception; in that case
+ * none of the properties is changed.  Returns MAL_SUCCEED otherwise.
+ */
+str
+WLCRproperties (Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       int duration, threshold;
+
+       (void) cntxt;
+       (void) mb;
+
+       /* validate everything first so a bad value leaves no partial update */
+       duration = *getArgReference_int(stk,pci,1);
+       if ( duration < 0)
+               throw(MAL,"wlcr.properties","Duration must be a non-negative number");
+
+       threshold = *getArgReference_int(stk,pci,2);
+       if ( threshold < 0)
+               throw(MAL,"wlcr.properties","Threshold must be a non-negative number");
+
+       wlcr_duration = duration;
+       wlcr_threshold = threshold;
+       wlcr_deltas = *getArgReference_int(stk,pci,3) != 0;
+       wlcr_all = *getArgReference_int(stk,pci,4) != 0;
+       return MAL_SUCCEED;
+}
+
+/*
+ * MAL wrapper: check that a job may be run here.
+ * Argument 1 is compared with the current snapshot name and argument 2
+ * with the number of the last unit executed; a different snapshot or an
+ * older unit number raises a MAL exception.
+ */
+str
+WLCRjob(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       str name;
+       int unit;
+
+       (void) mb;
+       (void) cntxt;
+
+       name = *getArgReference_str(stk, pci, 1);
+       unit = *getArgReference_int(stk, pci, 2);
+
+       if (strcmp(name, wlcr_snapshot) != 0)
+               throw(MAL,"wlcr.job","Incompatible snapshot identifier");
+       if (unit < wlcr_unit)
+               throw(MAL,"wlcr.job","Work unit identifier is before last one executed");
+       return MAL_SUCCEED;
+}
+
+/*
+ * MAL wrapper that currently does nothing: all arguments are ignored and
+ * MAL_SUCCEED is returned.
+ */
+str
+WLCRfin(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       (void) pci;
+       (void) stk;
+       (void) mb;
+       (void) cntxt;
+
+       return MAL_SUCCEED;
+}
+
+/*
+ * MAL wrapper: record a query in the client's wlcr block.
+ * The constants behind arguments 1 and 2 of pci are copied into a new
+ * "wlreplay.query" statement, after WLCR_start() has opened the job if
+ * needed.  Nothing is recorded when argument 1 is the text "-- no query".
+ * Returns MAL_SUCCEED, or a MAL exception when the statement could not be
+ * built.
+ */
+str
+WLCRquery(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       InstrPtr p;
+       str qry = getVarConstant(mb, getArg(pci,1)).val.sval;
+
+       (void) stk;
+       /* guard the comparison: the constant may carry no string at all */
+       if ( qry != NULL && strcmp("-- no query", qry) == 0)
+               return MAL_SUCCEED;
+       WLCR_start();
+       p = newStmt(cntxt->wlcr, "wlreplay","query");
+       p = pushStr(cntxt->wlcr, p, qry);
+       p = pushStr(cntxt->wlcr, p, getVarConstant(mb, getArg(pci,2)).val.sval);
+       /* pushStr hands back NULL when the statement or the string copy
+        * could not be allocated; do not dereference it */
+       if ( p == NULL)
+               throw(MAL,"wlcr.query","could not allocate space");
+       p->ticks = GDKms();
+       return MAL_SUCCEED;
+}
+
+/*
+ * MAL wrapper: record an arbitrary call in the client's wlcr block.
+ * A "wlreplay.<function of pci>" statement is created and every input
+ * argument of pci (positions retc .. argc-1) is copied into it: strings
+ * are taken from the constant in mb, all other types are turned into a
+ * constant from the value on the stack.  On success the client's
+ * wlcr_kind becomes WLCR_CATALOG.
+ * Returns MAL_SUCCEED, or a MAL exception when the statement could not be
+ * built.
+ */
+str
+WLCRgeneric(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       InstrPtr p;
+       int i, tpe, varid;
+
+       WLCR_start();
+       p = newStmt(cntxt->wlcr, "wlreplay",getFunctionId(pci));
+       /* stop copying as soon as the statement has been lost */
+       for( i = pci->retc; p != NULL && i < pci->argc; i++){
+               tpe = getArgType(mb, pci, i);
+               switch(tpe){
+               case TYPE_str:
+                       p = pushStr(cntxt->wlcr, p, getVarConstant(mb, getArg(pci, i)).val.sval);
+                       break;
+               default:
+                       varid = defConstant(cntxt->wlcr, tpe, getArgReference(stk, pci, i));
+                       p = pushArgument(cntxt->wlcr, p, varid);
+                       break;
+               }
+       }
+       /* the builder calls hand back NULL on failure; do not dereference it */
+       if ( p == NULL)
+               throw(MAL,"wlcr.generic","could not allocate space");
+       p->ticks = GDKms();
+       cntxt->wlcr_kind = WLCR_CATALOG;
+       return MAL_SUCCEED;
+}
+
+/*
+ * Push the values of BAT b, read as an array of TPE1 from position 0 up to
+ * BUNlast(b), as arguments onto statement pci using push<TPE2>().
+ * Whenever the running count is 31 modulo 32, a fresh "wlreplay" statement
+ * with the same function name and the sch/tbl/col strings is started
+ * first and pci is redirected to it.
+ *
+ * Expands in a scope that provides cntxt, b, sch, tbl, col and a writable
+ * pci; pci is left pointing at the last statement used.
+ */
+#define bulk(TPE1, TPE2)\
+{\
+       TPE1 *cur = (TPE1 *) Tloc(b,0);\
+       TPE1 *end = (TPE1 *) Tloc(b, BUNlast(b));\
+       int cnt = 0;\
+       while (cur < end) {\
+               if (cnt % 32 == 31) {\
+                       pci = newStmt(cntxt->wlcr, "wlreplay",getFunctionId(pci));\
+                       pci = pushStr(cntxt->wlcr, pci, sch);\
+                       pci = pushStr(cntxt->wlcr, pci, tbl);\
+                       pci = pushStr(cntxt->wlcr, pci, col);\
+                       pci->ticks = GDKms();\
+               }\
+               pci = push##TPE2(cntxt->wlcr, pci, *cur);\
+               cur++;\
+               cnt++;\
+       }\
+}
+
+static void
+WLCRdatashipping(Client cntxt, MalBlkPtr mb, InstrPtr pci, int bid)
+{      BAT *b;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to