Changeset: 9cfebdb5fbe1 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/9cfebdb5fbe1 Modified Files: gdk/gdk_logger.c gdk/gdk_logger_internals.h gdk/gdk_logger_old.c Branch: logger-cleanup Log Message:
fix flush range cleanup diffs (206 lines): diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c --- a/gdk/gdk_logger.c +++ b/gdk/gdk_logger.c @@ -1064,14 +1064,14 @@ static gdk_return log_open_output(logger *lg) { logged_range *new_range = (logged_range*)GDKmalloc(sizeof(logged_range)); - ATOMIC_INIT(&new_range->refcount, 1); if (!new_range) { TRC_CRITICAL(GDK, "allocation failure\n"); return GDK_FAIL; } - + ATOMIC_INIT(&new_range->refcount, 1); + ATOMIC_INIT(&new_range->last_ts, 0); ATOMIC_INIT(&new_range->end, 0); ATOMIC_INIT(&new_range->pend, 0); ATOMIC_INIT(&new_range->flushed_end, 0); @@ -1108,7 +1108,6 @@ log_open_output(logger *lg) GDKfree(filename); } new_range->id = lg->id; - new_range->last_ts = 0; new_range->next = NULL; logged_range* current = lg->current; assert(current && current->next == NULL); @@ -2323,9 +2322,9 @@ log_next_logfile(logger *lg, ulng ts) int m = (GDKdebug & FORCEMITOMASK)?1000:100; if (!lg->pending || !lg->pending->next) return 0; - if (lg->pending != lg->current && lg->pending->last_ts <= ts) { + if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && (ulng) ATOMIC_GET(&lg->pending->last_ts) <= ts) { logged_range *p = lg->pending; - for(int i = 1; i<m && p->next && p->next != lg->current && p->last_ts <= ts; i++) + for(int i = 1; i<m && ATOMIC_GET(&p->refcount) == 0 && p->next && p->next != lg->current && (ulng) ATOMIC_GET(&p->last_ts) <= ts; i++) p = p->next; return p->id; } @@ -2348,8 +2347,8 @@ gdk_return log_activate(logger *lg) { gdk_return res = GDK_SUCCEED; - rotation_lock(lg); /*protects against potential concurrent write in log_tflush*/ - if (lg->current->drops > 100000 && ATOMIC_GET(&lg->current->end) > 0 && lg->saved_id+1 == lg->id) { + rotation_lock(lg); + if (lg->current->drops > 100000 && ((ulng) ATOMIC_GET(&lg->current->end) - (ulng) ATOMIC_GET(&lg->current->pend)) > 0 && lg->saved_id+1 == lg->id) { lg->id++; log_close_output(lg); /* start new file */ @@ -2863,7 +2862,7 @@ check_rotation_conditions(logger *lg) { const lng log_large = (GDKdebug & FORCEMITOMASK)?LOG_MINI:LOG_LARGE; return (lg->saved_id+1 < lg->id && lg->current->drops > 100000) || - (p > log_large || (((lng) ATOMIC_GET(&lg->current->end) * 1024) > log_large)); + (p > log_large || (((lng) ATOMIC_GET(&lg->current->end) - (lng) ATOMIC_GET(&lg->current->pend)) * 1024) > log_large); } gdk_return @@ -2924,8 +2923,42 @@ log_tdone(logger* lg, logged_range *rang if (lg->debug & 1) fprintf(stderr, "#log_tdone " LLFMT "\n", commit_ts); - if (range->last_ts < commit_ts) - range->last_ts = commit_ts; + if ((ulng) ATOMIC_GET(&range->last_ts) < commit_ts) + ATOMIC_SET(&range->last_ts, commit_ts); +} + +static void +do_rotate(logger *lg) { + logged_range* next = lg->current->next; + if (next) { + assert(ATOMIC_GET(&next->refcount) == 1); + lng end = ATOMIC_GET(&lg->current->end); + ATOMIC_SET(&next->pend, end); + ATOMIC_SET(&next->end, end); + logged_range* pcurrent = lg->current; + assert(ATOMIC_GET(&pcurrent->refcount) > 0); + lg->current = lg->current->next; + (void) ATOMIC_DEC(&pcurrent->refcount); + } +} + +static logged_range* +do_flush_range_cleanup(logger* lg) { + + logged_range* frange = ATOMIC_PTR_GET(&lg->flush_ranges); + while ( frange->next) { + if (ATOMIC_GET(&frange->refcount)) + break; + if (ATOMIC_PTR_CAS(&lg->flush_ranges, &frange, frange->next)) { + close_stream(frange->output_log); + frange->output_log = NULL; + frange = frange->next; + } + else /*some other flusher is cleaning up the flush ranges*/ + break; + } + + return frange; } gdk_return @@ -2933,32 +2966,27 @@ log_tflush(logger* lg, ulng log_file_id, if (lg->flushnow) { logged_range* frange = ATOMIC_PTR_GET(&lg->flush_ranges); + assert(frange == lg->current); + ulng end = ATOMIC_GET(&lg->current->end); + assert(end > ATOMIC_GET(&lg->current->pend)); + ATOMIC_SET(&lg->current->flushed_end, end); + log_tdone(lg, lg->current, commit_ts); + lg->id++; + lg->flushnow = 0; + if (log_open_output(lg) != GDK_SUCCEED) + GDKfatal("Could not create new log file\n"); // TODO: does not have to be fatal (yet) + do_rotate(lg); (void) ATOMIC_DEC(&frange->refcount); assert(ATOMIC_GET(&frange->refcount) == 0); - assert(frange == lg->current); - assert(ATOMIC_GET(&lg->current->end) ==ATOMIC_GET(&lg->current->pend)); - lg->flushnow = 0; - if (log_commit(lg) == GDK_SUCCEED) - log_tdone(lg, lg->current, commit_ts); - else - return GDK_FAIL; + (void) do_flush_range_cleanup(lg); + return log_commit(lg); } if (LOG_DISABLED(lg)) { return GDK_SUCCEED; } - logged_range* frange = ATOMIC_PTR_GET(&lg->flush_ranges); - logged_range* first = frange; - while ( frange->next) { - if (ATOMIC_GET(&frange->refcount)) - break; - close_stream(frange->output_log); - frange->output_log = NULL; - frange = frange->next; - } - if (frange != first) - ATOMIC_PTR_CAS(&lg->flush_ranges, &first, frange); + logged_range* frange = do_flush_range_cleanup(lg); lng end = (lng) log_file_id; while ((lng) ATOMIC_GET(&frange->end) < end) { @@ -3156,20 +3184,6 @@ log_find_bat(logger *lg, log_id id) } -static inline void -do_rotate(logger *lg) { - logged_range* next = lg->current->next; - if (next) { - assert(ATOMIC_GET(&next->refcount) == 1); - lng end = ATOMIC_GET(&lg->current->end); - ATOMIC_SET(&next->pend, end); - ATOMIC_SET(&next->end, end); - logged_range* pcurrent = lg->current; - assert(ATOMIC_GET(&pcurrent->refcount) > 0); - lg->current = lg->current->next; - (void) ATOMIC_DEC(&pcurrent->refcount); - } -} gdk_return log_tstart(logger *lg, bool flushnow, ulng *log_file_id) @@ -3188,7 +3202,7 @@ log_tstart(logger *lg, bool flushnow, ul assert(ATOMIC_GET(&lg->nr_flushers) == 0); ulng end = ATOMIC_GET(¤t->end); - assert(ATOMIC_GET(¤t->flushed_end) == end); + assert(!ATOMIC_GET(¤t->flushed_end) || ATOMIC_GET(¤t->flushed_end) == end); if (ATOMIC_GET(¤t->pend) < end) { lg->id++; log_open_output(lg); diff --git a/gdk/gdk_logger_internals.h b/gdk/gdk_logger_internals.h --- a/gdk/gdk_logger_internals.h +++ b/gdk/gdk_logger_internals.h @@ -14,8 +14,8 @@ #define FLUSH_QUEUE_SIZE 2048 /* maximum size of the flush queue, i.e. maximum number of transactions committing simultaneously */ typedef struct logged_range_t { - ulng id; /* log file id */ - ulng last_ts; /* last stored timestamp */ + ulng id; /* log file id */ + ATOMIC_TYPE last_ts; /* last stored timestamp */ struct logged_range_t *next; ATOMIC_TYPE refcount; ATOMIC_TYPE end; diff --git a/gdk/gdk_logger_old.c b/gdk/gdk_logger_old.c --- a/gdk/gdk_logger_old.c +++ b/gdk/gdk_logger_old.c @@ -1658,7 +1658,6 @@ logger_load(const char *fn, char filenam BBPrelease(bids[p]); } logbat_destroy(lg->del); - GDKfree(lg->local_dir); GDKfree(lg); return GDK_FAIL; } _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org