Changeset: bb09d2c561ea for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/bb09d2c561ea Modified Files: gdk/gdk_logger.c Branch: logger-cleanup Log Message:
more cleanups diffs (164 lines): diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c --- a/gdk/gdk_logger.c +++ b/gdk/gdk_logger.c @@ -2231,9 +2231,42 @@ log_new(int debug, const char *fn, const return NULL; } +static logged_range* +do_flush_range_cleanup(logger* lg) { + + logged_range* frange = ATOMIC_PTR_GET(&lg->flush_ranges); + logged_range* first = frange; + + while ( frange->next) { + if (ATOMIC_GET(&frange->refcount) > 1) + break; + frange = frange->next; + } + if (first == frange) + return first; + + logged_range* flast = frange; + if (ATOMIC_PTR_CAS(&lg->flush_ranges, &first, flast)) { + for (frange = first; frange && frange != flast; frange = frange->next) { + (void) ATOMIC_DEC(&frange->refcount); + close_stream(frange->output_log); + frange->output_log = NULL; + frange = frange->next; + } + } + /* else some other flusher is cleaning up the flush ranges*/ + + return flast; +} + void log_destroy(logger *lg) { + log_close_input(lg); + logged_range* last = do_flush_range_cleanup(lg); + (void) last; + assert(last == lg->current && last == (logged_range*) ATOMIC_PTR_GET(&lg->flush_ranges)); + log_close_output(lg); for (logged_range *p = lg->pending; p; ){ logged_range *n = p->next; GDKfree(p); @@ -2268,7 +2301,6 @@ log_destroy(logger *lg) logbat_destroy(lg->catalog_lid); log_unlock(lg); } - ATOMIC_DESTROY(&lg->current->refcount); MT_lock_destroy(&lg->lock); MT_lock_destroy(&lg->rotation_lock); MT_lock_destroy(&lg->flush_lock); @@ -2277,8 +2309,6 @@ log_destroy(logger *lg) GDKfree(lg->fn); GDKfree(lg->dir); GDKfree(lg->buf); - log_close_input(lg); - log_close_output(lg); GDKfree(lg); } @@ -2348,9 +2378,8 @@ log_activate(logger *lg) { gdk_return res = GDK_SUCCEED; rotation_lock(lg); - if (lg->current->drops > 100000 && ((ulng) ATOMIC_GET(&lg->current->end) - (ulng) ATOMIC_GET(&lg->current->pend)) > 0 && lg->saved_id+1 == lg->id) { + if (!lg->current->next && lg->current->drops > 100000 && ((ulng) ATOMIC_GET(&lg->current->end) - (ulng) 
ATOMIC_GET(&lg->current->pend)) > 0 && lg->saved_id+1 == lg->id) { lg->id++; - log_close_output(lg); /* start new file */ res = log_open_output(lg); } @@ -2504,7 +2533,6 @@ log_constant(logger *lg, int type, ptr v } ok = wt(val, lg->current->output_log, 1); - log_unlock(lg); if (lg->debug & 1) fprintf(stderr, "#Logged %d " LLFMT " inserts\n", id, nr); @@ -2938,29 +2966,9 @@ do_rotate(logger *lg) { logged_range* pcurrent = lg->current; assert(ATOMIC_GET(&pcurrent->refcount) > 0); lg->current = lg->current->next; - (void) ATOMIC_DEC(&pcurrent->refcount); } } -static logged_range* -do_flush_range_cleanup(logger* lg) { - - logged_range* frange = ATOMIC_PTR_GET(&lg->flush_ranges); - while ( frange->next) { - if (ATOMIC_GET(&frange->refcount)) - break; - if (ATOMIC_PTR_CAS(&lg->flush_ranges, &frange, frange->next)) { - close_stream(frange->output_log); - frange->output_log = NULL; - frange = frange->next; - } - else /*some other flusher is cleaning up the flush ranges*/ - break; - } - - return frange; -} - gdk_return log_tflush(logger* lg, ulng log_file_id, ulng commit_ts) { @@ -2977,8 +2985,8 @@ log_tflush(logger* lg, ulng log_file_id, GDKfatal("Could not create new log file\n"); // TODO: does not have to be fatal (yet) do_rotate(lg); (void) ATOMIC_DEC(&frange->refcount); + (void) do_flush_range_cleanup(lg); assert(ATOMIC_GET(&frange->refcount) == 0); - (void) do_flush_range_cleanup(lg); return log_commit(lg); } @@ -3198,17 +3206,18 @@ log_tstart(logger *lg, bool flushnow, ul /* I am waiting until all existing flushers are done */ MT_cond_wait(&lg->excl_flush_cv, &lg->rotation_lock); } - lg->flushnow = flushnow; assert(ATOMIC_GET(&lg->nr_flushers) == 0); ulng end = ATOMIC_GET(&current->end); assert(!ATOMIC_GET(&current->flushed_end) || ATOMIC_GET(&current->flushed_end) == end); if (ATOMIC_GET(&current->pend) < end) { lg->id++; - log_open_output(lg); + if (log_open_output(lg) != GDK_SUCCEED) + GDKfatal("Could not create new log file\n"); // TODO: does not have to be fatal (yet) } + lg->flushnow 
= flushnow; } - if (check_rotation_conditions(lg)) { + else if (check_rotation_conditions(lg)) { lg->id++; if (log_open_output(lg) != GDK_SUCCEED) GDKfatal("Could not create new log file\n"); // TODO: does not have to be fatal (yet) @@ -3216,8 +3225,10 @@ log_tstart(logger *lg, bool flushnow, ul do_rotate(lg); rotation_unlock(lg); - if (lg->flushnow && lg->saved_id+1 < lg->id) + if (lg->flushnow && lg->saved_id+1 < lg->id) { + (void) do_flush_range_cleanup(lg); log_flush(lg, (1ULL<<63)); + } (void) ATOMIC_INC(&lg->current->refcount); (void) ATOMIC_INC(&lg->current->end); @@ -3236,7 +3247,5 @@ log_tstart(logger *lg, bool flushnow, ul return GDK_FAIL; } - log_unlock(lg); - rotation_unlock(lg); return GDK_SUCCEED; } _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org