Changeset: bb09d2c561ea for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/bb09d2c561ea
Modified Files:
        gdk/gdk_logger.c
Branch: logger-cleanup
Log Message:

more cleanups


diffs (164 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -2231,9 +2231,42 @@ log_new(int debug, const char *fn, const
        return NULL;
 }
 
+/* Release the leading flush ranges whose refcount is at most 1: drop that
+ * reference and close their output streams.  The last range in the list is
+ * never released.  Returns the first range that is kept. */
+static logged_range*
+do_flush_range_cleanup(logger* lg) {
+       logged_range* first = ATOMIC_PTR_GET(&lg->flush_ranges);
+       logged_range* flast = first;
+
+       /* find the first range that is still referenced or has no successor */
+       while (flast->next && ATOMIC_GET(&flast->refcount) <= 1)
+               flast = flast->next;
+       if (first == flast)
+               return first;
+
+       if (ATOMIC_PTR_CAS(&lg->flush_ranges, &first, flast)) {
+               /* step only in the for header: a second step in the body
+                * skipped every other range and could run past flast */
+               for (logged_range *r = first; r && r != flast; r = r->next) {
+                       (void) ATOMIC_DEC(&r->refcount);
+                       close_stream(r->output_log);
+                       r->output_log = NULL;
+               }
+       }
+       /* else some other flusher is cleaning up the flush ranges */
+
+       return flast;
+}
+
 void
 log_destroy(logger *lg)
 {
+       log_close_input(lg);
+       logged_range* last = do_flush_range_cleanup(lg);
+       (void) last;
+       assert(last == lg->current && last == (logged_range*) 
ATOMIC_PTR_GET(&lg->flush_ranges));
+       log_close_output(lg);
        for (logged_range *p = lg->pending; p; ){
                logged_range *n = p->next;
                GDKfree(p);
@@ -2268,7 +2301,6 @@ log_destroy(logger *lg)
                logbat_destroy(lg->catalog_lid);
                log_unlock(lg);
        }
-       ATOMIC_DESTROY(&lg->current->refcount);
        MT_lock_destroy(&lg->lock);
        MT_lock_destroy(&lg->rotation_lock);
        MT_lock_destroy(&lg->flush_lock);
@@ -2277,8 +2309,6 @@ log_destroy(logger *lg)
        GDKfree(lg->fn);
        GDKfree(lg->dir);
        GDKfree(lg->buf);
-       log_close_input(lg);
-       log_close_output(lg);
        GDKfree(lg);
 }
 
@@ -2348,9 +2378,8 @@ log_activate(logger *lg)
 {
        gdk_return res = GDK_SUCCEED;
        rotation_lock(lg);
-       if (lg->current->drops > 100000 && ((ulng) 
ATOMIC_GET(&lg->current->end) - (ulng) ATOMIC_GET(&lg->current->pend)) > 0 && 
lg->saved_id+1 == lg->id) {
+       if (!lg->current->next && lg->current->drops > 100000 && ((ulng) 
ATOMIC_GET(&lg->current->end) - (ulng) ATOMIC_GET(&lg->current->pend)) > 0 && 
lg->saved_id+1 == lg->id) {
                lg->id++;
-               log_close_output(lg);
                /* start new file */
                res = log_open_output(lg);
        }
@@ -2504,7 +2533,6 @@ log_constant(logger *lg, int type, ptr v
        }
 
        ok = wt(val, lg->current->output_log, 1);
-       log_unlock(lg);
 
        if (lg->debug & 1)
                fprintf(stderr, "#Logged %d " LLFMT " inserts\n", id, nr);
@@ -2938,29 +2966,9 @@ do_rotate(logger *lg) {
                logged_range* pcurrent = lg->current;
                assert(ATOMIC_GET(&pcurrent->refcount) > 0);
                lg->current = lg->current->next;
-               (void) ATOMIC_DEC(&pcurrent->refcount);
        }
 }
 
-static logged_range*
-do_flush_range_cleanup(logger* lg) {
-
-       logged_range* frange = ATOMIC_PTR_GET(&lg->flush_ranges);
-       while ( frange->next) {
-               if (ATOMIC_GET(&frange->refcount))
-                       break;
-               if (ATOMIC_PTR_CAS(&lg->flush_ranges, &frange, frange->next)) {
-                       close_stream(frange->output_log);
-                       frange->output_log = NULL;
-                       frange = frange->next;
-               }
-               else /*some other flusher is cleaning up the flush ranges*/
-                       break;
-       }
-
-       return frange;
-}
-
 gdk_return
 log_tflush(logger* lg, ulng log_file_id, ulng commit_ts) {
 
@@ -2977,8 +2985,8 @@ log_tflush(logger* lg, ulng log_file_id,
                        GDKfatal("Could not create new log file\n"); // TODO: 
does not have to be fatal (yet)
                do_rotate(lg);
                (void) ATOMIC_DEC(&frange->refcount);
+               (void) do_flush_range_cleanup(lg);
                assert(ATOMIC_GET(&frange->refcount) == 0);
-               (void) do_flush_range_cleanup(lg);
                return log_commit(lg);
        }
 
@@ -3198,17 +3206,18 @@ log_tstart(logger *lg, bool flushnow, ul
                        /* I am waiting until all existing flushers are done */
                        MT_cond_wait(&lg->excl_flush_cv, &lg->rotation_lock);
                }
-               lg->flushnow = flushnow;
                assert(ATOMIC_GET(&lg->nr_flushers) == 0);
 
                ulng end = ATOMIC_GET(&current->end);
                assert(!ATOMIC_GET(&current->flushed_end) || 
ATOMIC_GET(&current->flushed_end) == end);
                if (ATOMIC_GET(&current->pend) < end) {
                        lg->id++;
-                       log_open_output(lg);
+                       if (log_open_output(lg) != GDK_SUCCEED)
+                               GDKfatal("Could not create new log file\n"); // 
TODO: does not have to be fatal (yet)
                }
+               lg->flushnow = flushnow;
        }
-       if (check_rotation_conditions(lg)) {
+       else if (check_rotation_conditions(lg)) {
                lg->id++;
                if (log_open_output(lg) != GDK_SUCCEED)
                        GDKfatal("Could not create new log file\n"); // TODO: 
does not have to be fatal (yet)
@@ -3216,8 +3225,10 @@ log_tstart(logger *lg, bool flushnow, ul
        do_rotate(lg);
        rotation_unlock(lg);
 
-       if (lg->flushnow && lg->saved_id+1 < lg->id)
+       if (lg->flushnow && lg->saved_id+1 < lg->id) {
+               (void) do_flush_range_cleanup(lg);
                log_flush(lg, (1ULL<<63));
+       }
 
        (void) ATOMIC_INC(&lg->current->refcount);
        (void) ATOMIC_INC(&lg->current->end);
@@ -3236,7 +3247,5 @@ log_tstart(logger *lg, bool flushnow, ul
                return GDK_FAIL;
        }
 
-       log_unlock(lg);
-       rotation_unlock(lg);
        return GDK_SUCCEED;
 }
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to