Changeset: 78f1b9687303 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=78f1b9687303 Added Files: sql/test/wlcr/Tests/wlr110.py sql/test/wlcr/Tests/wlr110.stable.err sql/test/wlcr/Tests/wlr110.stable.out sql/test/wlcr/Tests/wlr35.py sql/test/wlcr/Tests/wlr35.stable.err sql/test/wlcr/Tests/wlr35.stable.out Modified Files: clients/Tests/exports.stable.out monetdb5/modules/mal/wlc.c sql/backends/monet5/wlr.c sql/backends/monet5/wlr.h sql/scripts/61_wlcr.sql sql/test/wlcr/Tests/All sql/test/wlcr/Tests/wlr10.py sql/test/wlcr/Tests/wlr100.py sql/test/wlcr/Tests/wlr100.stable.err sql/test/wlcr/Tests/wlr100.stable.out sql/test/wlcr/Tests/wlr20.py sql/test/wlcr/Tests/wlr30.py sql/test/wlcr/Tests/wlr40.py sql/test/wlcr/Tests/wlr40.stable.err sql/test/wlcr/Tests/wlr40.stable.out sql/test/wlcr/Tests/wlr50.py sql/test/wlcr/Tests/wlr50.stable.err sql/test/wlcr/Tests/wlr50.stable.out sql/test/wlcr/Tests/wlr70.py sql/test/wlcr/Tests/wlr80.py Branch: gdk_tracer Log Message:
Cleanup of the replication code to focus on either a background thread or repetitive calls to wlr.replicate(limit). The code is more robust against interrupting the replication stream. Exceptions are now properly returned to the client. Any error blocks further rolling forward. Manual corrections can be applied, after which wlr.accept() skips the offending replication transaction. diffs (truncated from 1621 to 300 lines): diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out --- a/clients/Tests/exports.stable.out +++ b/clients/Tests/exports.stable.out @@ -286,6 +286,20 @@ ssize_t GDKstrFromStr(unsigned char *res str GDKstrdup(const char *s) __attribute__((__warn_unused_result__)); str GDKstrndup(const char *s, size_t n) __attribute__((__warn_unused_result__)); void GDKsyserror(_In_z_ _Printf_format_string_ const char *format, ...) __attribute__((__format__(__printf__, 1, 2))); +gdk_return GDKtracer_flush_buffer(void); +char *GDKtracer_get_timestamp(char *fmt); +gdk_return GDKtracer_init(void); +gdk_return GDKtracer_log(LOG_LEVEL level, char *fmt, ...) 
__attribute__((format(printf, 2, 3))); +gdk_return GDKtracer_reset_adapter(void); +gdk_return GDKtracer_reset_component_level(int *comp); +gdk_return GDKtracer_reset_flush_level(void); +gdk_return GDKtracer_reset_layer_level(int *layer); +gdk_return GDKtracer_set_adapter(int *adapter); +gdk_return GDKtracer_set_component_level(int *comp, int *lvl); +gdk_return GDKtracer_set_flush_level(int *lvl); +gdk_return GDKtracer_set_layer_level(int *layer, int *lvl); +gdk_return GDKtracer_show_info(void); +gdk_return GDKtracer_stop(void); size_t GDKuniqueid(size_t offset); gdk_return GDKupgradevarheap(BAT *b, var_t v, bool copyall, bool mayshare) __attribute__((__warn_unused_result__)); lng GDKusec(void); @@ -304,6 +318,7 @@ size_t HEAPmemsize(Heap *h); size_t HEAPvmsize(Heap *h); void IMPSdestroy(BAT *b); lng IMPSimprintsize(BAT *b); +LOG_LEVEL LVL_PER_COMPONENT[]; int MT_check_nr_cores(void); int MT_create_thread(MT_Id *t, void (*function)(void *), void *arg, enum MT_thr_detach d, const char *threadname); void MT_exiting_thread(void); diff --git a/monetdb5/modules/mal/wlc.c b/monetdb5/modules/mal/wlc.c --- a/monetdb5/modules/mal/wlc.c +++ b/monetdb5/modules/mal/wlc.c @@ -537,7 +537,7 @@ WLCsettime(Client cntxt, InstrPtr pci, I #else ctm = *localtime(&clk); #endif - strftime(wlc_time, sizeof(wlc_time), "%Y-%m-%dT%H:%M:%S.000",&ctm); + strftime(wlc_time, sizeof(wlc_time), "%Y-%m-%d %H:%M:%S.000",&ctm); if (pushStr(cntxt->wlc, p, wlc_time) == NULL) throw(MAL, fcn, MAL_MALLOC_FAIL); return MAL_SUCCEED; @@ -580,7 +580,7 @@ WLCpreparewrite(Client cntxt) } MT_lock_set(&wlc_lock); - printFunction(wlc_fd, cntxt->wlc, 0, LIST_MAL_DEBUG ); + printFunction(wlc_fd, cntxt->wlc, 0, LIST_MAL_CALL ); (void) mnstr_flush(wlc_fd); // close file if no delay is allowed if( wlc_beat == 0 ) @@ -898,7 +898,7 @@ WLCdelete(Client cntxt, MalBlkPtr mb, Ma last = o + BATcount(b); if( b->ttype == TYPE_void){ for( ; o < last; o++, k++){ - if( k%32 == 31){ + if( k % 32 == 31){ p = newStmt(cntxt->wlc, 
"wlr","delete"); p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval); p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,2)).val.sval); @@ -907,8 +907,8 @@ WLCdelete(Client cntxt, MalBlkPtr mb, Ma } } else { ol = (oid*) Tloc(b,0); - for( ; o < last; o++, k++){ - if( k%32 == 31){ + for( ; o < last; o++, k++, ol++){ + if( k % 32 == 31){ p = newStmt(cntxt->wlc, "wlr","delete"); p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,1)).val.sval); p = pushStr(cntxt->wlc, p, getVarConstant(mb, getArg(pci,2)).val.sval); diff --git a/sql/backends/monet5/wlr.c b/sql/backends/monet5/wlr.c --- a/sql/backends/monet5/wlr.c +++ b/sql/backends/monet5/wlr.c @@ -15,8 +15,7 @@ * The replicator copies all of them unto and including wlc_limit. * This leads to the wlr_tag from -1 .. wlc_limit, wlr_tag,..., INT64_MAX * - * Replication start after setting the master id and giving an (optional) - * wlr_limit. + * Replication start after setting the master id and giving an (optional) wlr_limit. * Any error encountered in replaying the log stops the process, because then * no guarantee can be given on the consistency with the master database. 
* A manual fix for an exceptional case is allowed, whereafter a call @@ -51,47 +50,43 @@ MT_Lock wlr_lock = MT_LOCK_INITIALIZ */ static char wlr_master[IDLENGTH]; static int wlr_batches; // the next file to be processed -static lng wlr_tag = -1; // the last transaction id being processed +static lng wlr_tag = -1; // the last transaction id being processed +static char wlr_read[26]; // last record read static char wlr_timelimit[26]; // stop re-processing transactions when time limit is reached -static char wlr_read[26]; // stop re-processing transactions when time limit is reached static int wlr_beat; // period between successive synchronisations with master -static char wlr_error[FILENAME_MAX]; // errors should stop the process +static char wlr_error[BUFSIZ]; // error that stopped the replication process -static MT_Id wlr_thread = 0; // The single replicator thread +static MT_Id wlr_thread = 0; // The single replicator thread is active static int wlr_state = WLR_WAIT; // which state WAIT/RUN static lng wlr_limit = -1; // stop re-processing after transaction id 'wlr_limit' is processed #define MAXLINE 2048 /* Simple read the replica configuration status file */ -static int +static str WLRgetConfig(void){ char *path; char line[MAXLINE]; FILE *fd; int len; + str msg= MAL_SUCCEED; - if((path = GDKfilepath(0, 0, "wlr.config", 0)) == NULL){ - TRC_ERROR(SQL_WLR, "Could not create wlr.config file path\n"); - return -1; - } + if((path = GDKfilepath(0, 0, "wlr.config", 0)) == NULL) + throw(MAL,"wlr.getConfig", "Could not create wlr.config file path\n"); fd = fopen(path,"r"); GDKfree(path); - if( fd == NULL){ - // during start of the replicator it need not be there - return 1; - } + if( fd == NULL) + throw(MAL,"wlr.getConfig", "Could not access wlr.config file \n"); while( fgets(line, MAXLINE, fd) ){ line[strlen(line)-1]= 0; - TRC_DEBUG(SQL_WLR, "%s\n", line); if( strncmp("master=", line,7) == 0) { len = snprintf(wlr_master, IDLENGTH, "%s", line + 7); if (len == -1 || len >= 
IDLENGTH) { - TRC_ERROR(SQL_WLR, "Master config value is too large\n"); + msg= createException(SQL,"wlr.getConfig", "Master config value is too large\n"); goto bailout; } else if (len == 0) { - TRC_ERROR(SQL_WLR, "Master config path is missing\n"); + msg = createException(SQL,"wlr.getConfig", "Master config path is missing\n"); goto bailout; } } else @@ -104,58 +99,51 @@ WLRgetConfig(void){ if( strncmp("beat=", line, 5) == 0) wlr_beat = atoi(line+ 5); else - if( strncmp("timelimit=", line, 10) == 0) - strcpy(wlr_timelimit, line + 10); + if( strncmp("time=", line, 5) == 0) + strcpy(wlr_read, line + 5); else if( strncmp("error=", line, 6) == 0) { char *s; - len = snprintf(wlr_error, FILENAME_MAX, "%s", line + 6); - if (len == -1 || len >= FILENAME_MAX) { - TRC_ERROR(SQL_WLR, "Config value is too large\n"); + len = snprintf(wlr_error, BUFSIZ, "%s", line + 6); + if (len == -1 || len >= BUFSIZ) { + msg = createException(SQL, "wlr.getConfig", "Config value is too large\n"); goto bailout; } s = strchr(wlr_error, (int) '\n'); if ( s) *s = 0; - } else{ - TRC_ERROR(SQL_WLR, "Unknown configuration item '%s'\n", line); - goto bailout; - } + } } - return 0; + fclose(fd); + return msg; bailout: fclose(fd); - return -1; + return msg; } /* Keep the current status in the configuration status file */ -static void +static str WLRputConfig(void){ char *path; stream *fd; + str msg = MAL_SUCCEED; - if((path = GDKfilepath(0,0,"wlr.config",0)) == NULL){ - TRC_ERROR(SQL_WLR, "Could not access wlr.config file\n"); - return ; - } + if((path = GDKfilepath(0,0,"wlr.config",0)) == NULL) + throw(SQL, "wlr.putConfig", "Could not access wlr.config file\n"); fd = open_wastream(path); GDKfree(path); - if( fd == NULL){ - TRC_ERROR(SQL_WLR, "Could not create wlr.config file\n"); - return; - } + if( fd == NULL) + throw(SQL,"wlr.putConfig", "Could not create wlr.config file\n"); mnstr_printf(fd,"master=%s\n", wlr_master); mnstr_printf(fd,"batches=%d\n", wlr_batches); mnstr_printf(fd,"tag="LLFMT"\n", 
wlr_tag); mnstr_printf(fd,"beat=%d\n", wlr_beat); if( wlr_timelimit[0]) - mnstr_printf(fd,"timelimit=%s\n", wlr_timelimit); + mnstr_printf(fd,"time=%s\n", wlr_read); if( wlr_error[0]) - mnstr_printf(fd,"error=%s\n", wlr_error); + mnstr_printf(fd,"error=%s", wlr_error); close_stream(fd); - - TRC_DEBUG(SQL_WLR, "Batches %d tag " LLFMT " limit "LLFMT " beat %d timelimit %s\n", - wlr_batches, wlr_tag, wlr_limit, wlr_beat, wlr_timelimit); + return msg; } /* @@ -187,18 +175,17 @@ WLRgetMaster(void) len = snprintf(path, FILENAME_MAX, "..%c%s", DIR_SEP, wlr_master); if (len == -1 || len >= FILENAME_MAX) throw(MAL, "wlr.getMaster", "wlc.config filename path is too large"); - if((dir = GDKfilepath(0,path,"wlc.config",0)) == NULL) + if((dir = GDKfilepath(0, path, "wlc.config", 0)) == NULL) throw(MAL,"wlr.getMaster","Could not access wlc.config file %s/wlc.config\n", path); fd = fopen(dir,"r"); GDKfree(dir); - if( fd ){ - WLCreadConfig(fd); - if( ! wlr_master[0] ) - throw(MAL,"wlr.getMaster","Master not identified\n"); - wlc_state = WLC_CLONE; // not used as master - } else + if( fd == NULL ) throw(MAL,"wlr.getMaster","Could not get read access to '%s'config file\n", wlr_master); + WLCreadConfig(fd); + if( !wlr_master[0] ) + throw(MAL,"wlr.getMaster","Master not identified\n"); + wlc_state = WLC_CLONE; // not used as master return MAL_SUCCEED; } @@ -211,10 +198,9 @@ WLRgetMaster(void) trimMalVariables(mb, NULL);\ } -static void -WLRprocessBatch(void *arg) +static str +WLRprocessBatch(Client cntxt) { - Client cntxt = (Client) arg; int i, len; char path[FILENAME_MAX]; stream *fd = NULL; @@ -222,32 +208,35 @@ WLRprocessBatch(void *arg) size_t sz; MalBlkPtr mb; InstrPtr q; - str msg, other; + str other; mvc *sql; Symbol prev = NULL; - lng tag = wlr_tag; + lng tag; char tag_read[26]; // stop re-processing transactions when time limit is reached + str action= NULL; + str msg= MAL_SUCCEED, msg2= MAL_SUCCEED; + + WLRgetConfig(); + tag = wlr_tag; + if( wlr_error[0]) + return 
GDKstrdup(wlr_error); c =MCforkClient(cntxt); - if( c == 0){ - TRC_ERROR(SQL_WLR, "Could not create user for WLR process\n"); - return; - } + if( c == 0) + throw(MAL, "wlr.batch", "Could not create user for WLR process\n"); c->promptlength = 0; c->listing = 0; c->fdout = open_wastream(".wlr"); if(c->fdout == NULL) { MCcloseClient(c); - TRC_ERROR(SQL_WLR, "Could not create user for WLR process\n"); - return; + throw(MAL,"wlr.batch", "Could not create user for WLR process\n"); } /* Cook a log file into a concreate MAL function for multiple transactions */ prev = newFunction(putName("user"), putName("wlr"), FUNCTIONsymbol); if(prev == NULL) { _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list