Changeset: 78f1b9687303 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=78f1b9687303
Added Files:
        sql/test/wlcr/Tests/wlr110.py
        sql/test/wlcr/Tests/wlr110.stable.err
        sql/test/wlcr/Tests/wlr110.stable.out
        sql/test/wlcr/Tests/wlr35.py
        sql/test/wlcr/Tests/wlr35.stable.err
        sql/test/wlcr/Tests/wlr35.stable.out
Modified Files:
        clients/Tests/exports.stable.out
        monetdb5/modules/mal/wlc.c
        sql/backends/monet5/wlr.c
        sql/backends/monet5/wlr.h
        sql/scripts/61_wlcr.sql
        sql/test/wlcr/Tests/All
        sql/test/wlcr/Tests/wlr10.py
        sql/test/wlcr/Tests/wlr100.py
        sql/test/wlcr/Tests/wlr100.stable.err
        sql/test/wlcr/Tests/wlr100.stable.out
        sql/test/wlcr/Tests/wlr20.py
        sql/test/wlcr/Tests/wlr30.py
        sql/test/wlcr/Tests/wlr40.py
        sql/test/wlcr/Tests/wlr40.stable.err
        sql/test/wlcr/Tests/wlr40.stable.out
        sql/test/wlcr/Tests/wlr50.py
        sql/test/wlcr/Tests/wlr50.stable.err
        sql/test/wlcr/Tests/wlr50.stable.out
        sql/test/wlcr/Tests/wlr70.py
        sql/test/wlcr/Tests/wlr80.py
Branch: gdk_tracer
Log Message:

Cleanup of the replication code to focus on either a background thread
or repetitive calls to wlr.replicate(limit).
The code is more robust against interrupting the replication stream.
Exceptions are now properly returned to the client. Any error blocks
further rolling forward. Manual corrections can be applied, after which
wlr.accept() skips the offending replication transaction.


diffs (truncated from 1621 to 300 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -286,6 +286,20 @@ ssize_t GDKstrFromStr(unsigned char *res
 str GDKstrdup(const char *s) __attribute__((__warn_unused_result__));
 str GDKstrndup(const char *s, size_t n) 
__attribute__((__warn_unused_result__));
 void GDKsyserror(_In_z_ _Printf_format_string_ const char *format, ...) 
__attribute__((__format__(__printf__, 1, 2)));
+gdk_return GDKtracer_flush_buffer(void);
+char *GDKtracer_get_timestamp(char *fmt);
+gdk_return GDKtracer_init(void);
+gdk_return GDKtracer_log(LOG_LEVEL level, char *fmt, ...) 
__attribute__((format(printf, 2, 3)));
+gdk_return GDKtracer_reset_adapter(void);
+gdk_return GDKtracer_reset_component_level(int *comp);
+gdk_return GDKtracer_reset_flush_level(void);
+gdk_return GDKtracer_reset_layer_level(int *layer);
+gdk_return GDKtracer_set_adapter(int *adapter);
+gdk_return GDKtracer_set_component_level(int *comp, int *lvl);
+gdk_return GDKtracer_set_flush_level(int *lvl);
+gdk_return GDKtracer_set_layer_level(int *layer, int *lvl);
+gdk_return GDKtracer_show_info(void);
+gdk_return GDKtracer_stop(void);
 size_t GDKuniqueid(size_t offset);
 gdk_return GDKupgradevarheap(BAT *b, var_t v, bool copyall, bool mayshare) 
__attribute__((__warn_unused_result__));
 lng GDKusec(void);
@@ -304,6 +318,7 @@ size_t HEAPmemsize(Heap *h);
 size_t HEAPvmsize(Heap *h);
 void IMPSdestroy(BAT *b);
 lng IMPSimprintsize(BAT *b);
+LOG_LEVEL LVL_PER_COMPONENT[];
 int MT_check_nr_cores(void);
 int MT_create_thread(MT_Id *t, void (*function)(void *), void *arg, enum 
MT_thr_detach d, const char *threadname);
 void MT_exiting_thread(void);
diff --git a/monetdb5/modules/mal/wlc.c b/monetdb5/modules/mal/wlc.c
--- a/monetdb5/modules/mal/wlc.c
+++ b/monetdb5/modules/mal/wlc.c
@@ -537,7 +537,7 @@ WLCsettime(Client cntxt, InstrPtr pci, I
 #else
        ctm = *localtime(&clk);
 #endif
-       strftime(wlc_time, sizeof(wlc_time), "%Y-%m-%dT%H:%M:%S.000",&ctm);
+       strftime(wlc_time, sizeof(wlc_time), "%Y-%m-%d %H:%M:%S.000",&ctm);
        if (pushStr(cntxt->wlc, p, wlc_time) == NULL)
                throw(MAL, fcn, MAL_MALLOC_FAIL);
        return MAL_SUCCEED;
@@ -580,7 +580,7 @@ WLCpreparewrite(Client cntxt)
                }
                
                MT_lock_set(&wlc_lock);
-               printFunction(wlc_fd, cntxt->wlc, 0, LIST_MAL_DEBUG );
+               printFunction(wlc_fd, cntxt->wlc, 0, LIST_MAL_CALL );
                (void) mnstr_flush(wlc_fd);
                // close file if no delay is allowed
                if( wlc_beat == 0 )
@@ -898,7 +898,7 @@ WLCdelete(Client cntxt, MalBlkPtr mb, Ma
                last = o + BATcount(b);
                if( b->ttype == TYPE_void){
                        for( ; o < last; o++, k++){
-                               if( k%32 == 31){
+                               if( k % 32 == 31){
                                        p = newStmt(cntxt->wlc, "wlr","delete");
                                        p = pushStr(cntxt->wlc, p, 
getVarConstant(mb, getArg(pci,1)).val.sval);
                                        p = pushStr(cntxt->wlc, p, 
getVarConstant(mb, getArg(pci,2)).val.sval);
@@ -907,8 +907,8 @@ WLCdelete(Client cntxt, MalBlkPtr mb, Ma
                        }
                } else {
                        ol = (oid*) Tloc(b,0);
-                       for( ; o < last; o++, k++){
-                               if( k%32 == 31){
+                       for( ; o < last; o++, k++, ol++){
+                               if( k % 32 == 31){
                                        p = newStmt(cntxt->wlc, "wlr","delete");
                                        p = pushStr(cntxt->wlc, p, 
getVarConstant(mb, getArg(pci,1)).val.sval);
                                        p = pushStr(cntxt->wlc, p, 
getVarConstant(mb, getArg(pci,2)).val.sval);
diff --git a/sql/backends/monet5/wlr.c b/sql/backends/monet5/wlr.c
--- a/sql/backends/monet5/wlr.c
+++ b/sql/backends/monet5/wlr.c
@@ -15,8 +15,7 @@
  * The replicator copies all of them unto and including wlc_limit.
  * This leads to the wlr_tag from -1 .. wlc_limit, wlr_tag,..., INT64_MAX
  *
- * Replication start after setting the master id and giving an (optional)
- * wlr_limit.
+ * Replication start after setting the master id and giving an (optional) 
wlr_limit.
  * Any error encountered in replaying the log stops the process, because then
  * no guarantee can be given on the consistency with the master database.
  * A manual fix for an exceptional case is allowed, whereafter a call
@@ -51,47 +50,43 @@ MT_Lock     wlr_lock = MT_LOCK_INITIALIZ
  */
 static char wlr_master[IDLENGTH];
 static int     wlr_batches;                            // the next file to be 
processed
-static lng     wlr_tag = -1;                                   // the last 
transaction id being processed
+static lng     wlr_tag = -1;                           // the last transaction 
id being processed
+static char wlr_read[26];                              // last record read
 static char wlr_timelimit[26];                 // stop re-processing 
transactions when time limit is reached
-static char wlr_read[26];                              // stop re-processing 
transactions when time limit is reached
 static int     wlr_beat;                                       // period 
between successive synchronisations with master
-static char wlr_error[FILENAME_MAX];   // errors should stop the process
+static char wlr_error[BUFSIZ]; // error that stopped the replication process
 
-static MT_Id wlr_thread = 0;                   // The single replicator thread
+static MT_Id wlr_thread = 0;                   // The single replicator thread 
is active
 static int     wlr_state = WLR_WAIT;           // which state WAIT/RUN
 static lng     wlr_limit = -1;                         // stop re-processing 
after transaction id 'wlr_limit' is processed
 
 #define MAXLINE 2048
 
 /* Simple read the replica configuration status file */
-static int
+static str
 WLRgetConfig(void){
        char *path;
        char line[MAXLINE];
        FILE *fd;
        int len;
+       str msg= MAL_SUCCEED;
 
-       if((path = GDKfilepath(0, 0, "wlr.config", 0)) == NULL){
-               TRC_ERROR(SQL_WLR, "Could not create wlr.config file path\n");
-               return -1;
-       }
+       if((path = GDKfilepath(0, 0, "wlr.config", 0)) == NULL)
+               throw(MAL,"wlr.getConfig", "Could not create wlr.config file 
path\n");
        fd = fopen(path,"r");
        GDKfree(path);
-       if( fd == NULL){
-               // during start of the replicator it need not be there
-               return 1;
-       }
+       if( fd == NULL)
+               throw(MAL,"wlr.getConfig", "Could not access wlr.config file 
\n");
        while( fgets(line, MAXLINE, fd) ){
                line[strlen(line)-1]= 0;
-               TRC_DEBUG(SQL_WLR, "%s\n", line);
                if( strncmp("master=", line,7) == 0) {
                        len = snprintf(wlr_master, IDLENGTH, "%s", line + 7);
                        if (len == -1 || len >= IDLENGTH) {
-                               TRC_ERROR(SQL_WLR, "Master config value is too 
large\n");
+                               msg= createException(SQL,"wlr.getConfig", 
"Master config value is too large\n");
                                goto bailout;
                        } else
                        if (len  == 0) {
-                               TRC_ERROR(SQL_WLR, "Master config path is 
missing\n");
+                               msg = createException(SQL,"wlr.getConfig", 
"Master config path is missing\n");
                                goto bailout;
                        }
                } else
@@ -104,58 +99,51 @@ WLRgetConfig(void){
                if( strncmp("beat=", line, 5) == 0)
                        wlr_beat = atoi(line+ 5);
                else
-               if( strncmp("timelimit=", line, 10) == 0)
-                       strcpy(wlr_timelimit, line + 10);
+               if( strncmp("time=", line, 5) == 0)
+                       strcpy(wlr_read, line + 5);
                else
                if( strncmp("error=", line, 6) == 0) {
                        char *s;
-                       len = snprintf(wlr_error, FILENAME_MAX, "%s", line + 6);
-                       if (len == -1 || len >= FILENAME_MAX) {
-                               TRC_ERROR(SQL_WLR, "Config value is too 
large\n");
+                       len = snprintf(wlr_error, BUFSIZ, "%s", line + 6);
+                       if (len == -1 || len >= BUFSIZ) {
+                               msg = createException(SQL, "wlr.getConfig", 
"Config value is too large\n");
                                goto bailout;
                        }
                        s = strchr(wlr_error, (int) '\n');
                        if ( s) *s = 0;
-               } else{
-                               TRC_ERROR(SQL_WLR, "Unknown configuration item 
'%s'\n", line);
-                               goto bailout;
-               }
+               } 
        }
-       return 0;
+       fclose(fd);
+       return msg;
 bailout:
        fclose(fd);
-       return -1;
+       return msg;
 }
 
 /* Keep the current status in the configuration status file */
-static void
+static str
 WLRputConfig(void){
        char *path;
        stream *fd;
+       str msg = MAL_SUCCEED;
 
-       if((path = GDKfilepath(0,0,"wlr.config",0)) == NULL){
-               TRC_ERROR(SQL_WLR, "Could not access wlr.config file\n");
-               return ;
-       }
+       if((path = GDKfilepath(0,0,"wlr.config",0)) == NULL)
+               throw(SQL, "wlr.putConfig", "Could not access wlr.config 
file\n");
        fd = open_wastream(path);
        GDKfree(path);
-       if( fd == NULL){
-               TRC_ERROR(SQL_WLR, "Could not create wlr.config file\n");
-               return;
-       }
+       if( fd == NULL)
+               throw(SQL,"wlr.putConfig", "Could not create wlr.config 
file\n");
 
        mnstr_printf(fd,"master=%s\n", wlr_master);
        mnstr_printf(fd,"batches=%d\n", wlr_batches);
        mnstr_printf(fd,"tag="LLFMT"\n", wlr_tag);
        mnstr_printf(fd,"beat=%d\n", wlr_beat);
        if( wlr_timelimit[0])
-               mnstr_printf(fd,"timelimit=%s\n", wlr_timelimit);
+               mnstr_printf(fd,"time=%s\n", wlr_read);
        if( wlr_error[0])
-               mnstr_printf(fd,"error=%s\n", wlr_error);
+               mnstr_printf(fd,"error=%s", wlr_error);
        close_stream(fd);
-
-       TRC_DEBUG(SQL_WLR, "Batches %d tag " LLFMT " limit "LLFMT " beat %d 
timelimit %s\n",
-                                       wlr_batches, wlr_tag, wlr_limit, 
wlr_beat, wlr_timelimit);
+       return msg;
 }
 
 /*
@@ -187,18 +175,17 @@ WLRgetMaster(void)
        len = snprintf(path, FILENAME_MAX, "..%c%s", DIR_SEP, wlr_master);
        if (len == -1 || len >= FILENAME_MAX)
                throw(MAL, "wlr.getMaster", "wlc.config filename path is too 
large");
-       if((dir = GDKfilepath(0,path,"wlc.config",0)) == NULL)
+       if((dir = GDKfilepath(0, path, "wlc.config", 0)) == NULL)
                throw(MAL,"wlr.getMaster","Could not access wlc.config file 
%s/wlc.config\n", path);
 
        fd = fopen(dir,"r");
        GDKfree(dir);
-       if( fd ){
-               WLCreadConfig(fd);
-               if( ! wlr_master[0] )
-                       throw(MAL,"wlr.getMaster","Master not identified\n");
-               wlc_state = WLC_CLONE; // not used as master
-       } else
+       if( fd == NULL )
                throw(MAL,"wlr.getMaster","Could not get read access to 
'%s'config file\n", wlr_master);
+       WLCreadConfig(fd);
+       if( !wlr_master[0] )
+               throw(MAL,"wlr.getMaster","Master not identified\n");
+       wlc_state = WLC_CLONE; // not used as master
        return MAL_SUCCEED;
 }
 
@@ -211,10 +198,9 @@ WLRgetMaster(void)
        trimMalVariables(mb, NULL);\
        }
 
-static void
-WLRprocessBatch(void *arg)
+static str
+WLRprocessBatch(Client cntxt)
 {
-       Client cntxt = (Client) arg;
        int i, len;
        char path[FILENAME_MAX];
        stream *fd = NULL;
@@ -222,32 +208,35 @@ WLRprocessBatch(void *arg)
        size_t sz;
        MalBlkPtr mb;
        InstrPtr q;
-       str msg, other;
+       str other;
        mvc *sql;
        Symbol prev = NULL;
-       lng tag = wlr_tag;
+       lng tag;
        char tag_read[26];                      // stop re-processing 
transactions when time limit is reached
+       str action= NULL;
+       str msg= MAL_SUCCEED, msg2= MAL_SUCCEED;
+
+       WLRgetConfig();
+       tag = wlr_tag;
+       if( wlr_error[0])
+               return GDKstrdup(wlr_error);
 
        c =MCforkClient(cntxt);
-       if( c == 0){
-               TRC_ERROR(SQL_WLR, "Could not create user for WLR process\n"); 
-               return;
-       }
+       if( c == 0)
+               throw(MAL, "wlr.batch", "Could not create user for WLR 
process\n"); 
        c->promptlength = 0;
        c->listing = 0;
        c->fdout = open_wastream(".wlr");
        if(c->fdout == NULL) {
                MCcloseClient(c);
-               TRC_ERROR(SQL_WLR, "Could not create user for WLR process\n");
-               return;
+               throw(MAL,"wlr.batch", "Could not create user for WLR 
process\n");
        }
 
        /* Cook a log file into a concreate MAL function for multiple 
transactions */
        prev = newFunction(putName("user"), putName("wlr"), FUNCTIONsymbol);
        if(prev == NULL) {
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to