Changeset: 9cfebdb5fbe1 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/9cfebdb5fbe1
Modified Files:
        gdk/gdk_logger.c
        gdk/gdk_logger_internals.h
        gdk/gdk_logger_old.c
Branch: logger-cleanup
Log Message:

fix flush range cleanup


diffs (206 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -1064,14 +1064,14 @@ static gdk_return
 log_open_output(logger *lg)
 {
        logged_range *new_range = (logged_range*)GDKmalloc(sizeof(logged_range));
-       ATOMIC_INIT(&new_range->refcount, 1);
 
 
        if (!new_range) {
                TRC_CRITICAL(GDK, "allocation failure\n");
                return GDK_FAIL;
        }
-
+       ATOMIC_INIT(&new_range->refcount, 1);
+       ATOMIC_INIT(&new_range->last_ts, 0);
        ATOMIC_INIT(&new_range->end, 0);
        ATOMIC_INIT(&new_range->pend, 0);
        ATOMIC_INIT(&new_range->flushed_end, 0);
@@ -1108,7 +1108,6 @@ log_open_output(logger *lg)
                GDKfree(filename);
        }
        new_range->id = lg->id;
-       new_range->last_ts = 0;
        new_range->next = NULL;
        logged_range* current = lg->current;
        assert(current && current->next == NULL);
@@ -2323,9 +2322,9 @@ log_next_logfile(logger *lg, ulng ts)
        int m = (GDKdebug & FORCEMITOMASK)?1000:100;
        if (!lg->pending || !lg->pending->next)
                return 0;
-       if (lg->pending != lg->current && lg->pending->last_ts <= ts) {
+       if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && (ulng) ATOMIC_GET(&lg->pending->last_ts) <= ts) {
                logged_range *p = lg->pending;
-               for(int i = 1; i<m && p->next && p->next != lg->current && p->last_ts <= ts; i++)
+               for(int i = 1; i<m && ATOMIC_GET(&p->refcount) == 0 && p->next && p->next != lg->current && (ulng) ATOMIC_GET(&p->last_ts) <= ts; i++)
                        p = p->next;
                return p->id;
        }
@@ -2348,8 +2347,8 @@ gdk_return
 log_activate(logger *lg)
 {
        gdk_return res = GDK_SUCCEED;
-       rotation_lock(lg); /*protects against potential concurrent write in log_tflush*/
-       if (lg->current->drops > 100000 && ATOMIC_GET(&lg->current->end) > 0 && lg->saved_id+1 == lg->id) {
+       rotation_lock(lg);
+       if (lg->current->drops > 100000 && ((ulng) ATOMIC_GET(&lg->current->end) - (ulng) ATOMIC_GET(&lg->current->pend)) > 0 && lg->saved_id+1 == lg->id) {
                lg->id++;
                log_close_output(lg);
                /* start new file */
@@ -2863,7 +2862,7 @@ check_rotation_conditions(logger *lg) {
        const lng log_large = (GDKdebug & FORCEMITOMASK)?LOG_MINI:LOG_LARGE;
        return
                (lg->saved_id+1 < lg->id && lg->current->drops > 100000) ||
-               (p > log_large || (((lng) ATOMIC_GET(&lg->current->end) * 1024) > log_large));
+               (p > log_large || (((lng) ATOMIC_GET(&lg->current->end) - (lng) ATOMIC_GET(&lg->current->pend)) * 1024) > log_large);
 }
 
 gdk_return
@@ -2924,8 +2923,42 @@ log_tdone(logger* lg, logged_range *rang
        if (lg->debug & 1)
                fprintf(stderr, "#log_tdone " LLFMT "\n", commit_ts);
 
-       if (range->last_ts < commit_ts)
-               range->last_ts = commit_ts;
+       if ((ulng) ATOMIC_GET(&range->last_ts) < commit_ts)
+               ATOMIC_SET(&range->last_ts, commit_ts);
+}
+
+static void
+do_rotate(logger *lg) {
+       logged_range* next      = lg->current->next;
+       if (next) {
+               assert(ATOMIC_GET(&next->refcount) == 1);
+               lng end = ATOMIC_GET(&lg->current->end);
+               ATOMIC_SET(&next->pend, end);
+               ATOMIC_SET(&next->end, end);
+               logged_range* pcurrent = lg->current;
+               assert(ATOMIC_GET(&pcurrent->refcount) > 0);
+               lg->current = lg->current->next;
+               (void) ATOMIC_DEC(&pcurrent->refcount);
+       }
+}
+
+static logged_range*
+do_flush_range_cleanup(logger* lg) {
+
+       logged_range* frange = ATOMIC_PTR_GET(&lg->flush_ranges);
+       while ( frange->next) {
+               if (ATOMIC_GET(&frange->refcount))
+                       break;
+               if (ATOMIC_PTR_CAS(&lg->flush_ranges, &frange, frange->next)) {
+                       close_stream(frange->output_log);
+                       frange->output_log = NULL;
+                       frange = frange->next;
+               }
+               else /*some other flusher is cleaning up the flush ranges*/
+                       break;
+       }
+
+       return frange;
 }
 
 gdk_return
@@ -2933,32 +2966,27 @@ log_tflush(logger* lg, ulng log_file_id,
 
        if (lg->flushnow) {
                logged_range* frange = ATOMIC_PTR_GET(&lg->flush_ranges);
+               assert(frange == lg->current);
+               ulng end = ATOMIC_GET(&lg->current->end);
+               assert(end > ATOMIC_GET(&lg->current->pend));
+               ATOMIC_SET(&lg->current->flushed_end, end);
+               log_tdone(lg, lg->current, commit_ts);
+               lg->id++;
+               lg->flushnow = 0;
+               if (log_open_output(lg) != GDK_SUCCEED)
+                       GDKfatal("Could not create new log file\n"); // TODO: does not have to be fatal (yet)
+               do_rotate(lg);
                (void) ATOMIC_DEC(&frange->refcount);
                assert(ATOMIC_GET(&frange->refcount) == 0);
-               assert(frange == lg->current);
-               assert(ATOMIC_GET(&lg->current->end) ==ATOMIC_GET(&lg->current->pend));
-               lg->flushnow = 0;
-               if (log_commit(lg) == GDK_SUCCEED)
-                       log_tdone(lg, lg->current, commit_ts);
-               else
-                       return GDK_FAIL;
+               (void) do_flush_range_cleanup(lg);
+               return log_commit(lg);
        }
 
        if (LOG_DISABLED(lg)) {
                return GDK_SUCCEED;
        }
 
-       logged_range* frange = ATOMIC_PTR_GET(&lg->flush_ranges);
-       logged_range* first = frange;
-       while ( frange->next) {
-               if (ATOMIC_GET(&frange->refcount))
-                       break;
-               close_stream(frange->output_log);
-               frange->output_log = NULL;
-               frange = frange->next;
-       }
-       if (frange != first)
-               ATOMIC_PTR_CAS(&lg->flush_ranges, &first, frange);
+       logged_range* frange = do_flush_range_cleanup(lg);
 
        lng end = (lng) log_file_id;
        while ((lng) ATOMIC_GET(&frange->end) < end) {
@@ -3156,20 +3184,6 @@ log_find_bat(logger *lg, log_id id)
 }
 
 
-static inline void
-do_rotate(logger *lg) {
-       logged_range* next      = lg->current->next;
-       if (next) {
-               assert(ATOMIC_GET(&next->refcount) == 1);
-               lng end = ATOMIC_GET(&lg->current->end);
-               ATOMIC_SET(&next->pend, end);
-               ATOMIC_SET(&next->end, end);
-               logged_range* pcurrent = lg->current;
-               assert(ATOMIC_GET(&pcurrent->refcount) > 0);
-               lg->current = lg->current->next;
-               (void) ATOMIC_DEC(&pcurrent->refcount);
-       }
-}
 
 gdk_return
 log_tstart(logger *lg, bool flushnow, ulng *log_file_id)
@@ -3188,7 +3202,7 @@ log_tstart(logger *lg, bool flushnow, ul
                assert(ATOMIC_GET(&lg->nr_flushers) == 0);
 
                ulng end = ATOMIC_GET(&current->end);
-               assert(ATOMIC_GET(&current->flushed_end) == end);
+               assert(!ATOMIC_GET(&current->flushed_end) || ATOMIC_GET(&current->flushed_end) == end);
                if (ATOMIC_GET(&current->pend) < end) {
                        lg->id++;
                        log_open_output(lg);
diff --git a/gdk/gdk_logger_internals.h b/gdk/gdk_logger_internals.h
--- a/gdk/gdk_logger_internals.h
+++ b/gdk/gdk_logger_internals.h
@@ -14,8 +14,8 @@
 #define FLUSH_QUEUE_SIZE 2048 /* maximum size of the flush queue, i.e. maximum number of transactions committing simultaneously */
 
 typedef struct logged_range_t {
-       ulng id;                        /* log file id */
-       ulng last_ts;           /* last stored timestamp */
+       ulng id;                                /* log file id */
+       ATOMIC_TYPE last_ts;    /* last stored timestamp */
        struct logged_range_t *next;
        ATOMIC_TYPE refcount;
        ATOMIC_TYPE end;
diff --git a/gdk/gdk_logger_old.c b/gdk/gdk_logger_old.c
--- a/gdk/gdk_logger_old.c
+++ b/gdk/gdk_logger_old.c
@@ -1658,7 +1658,6 @@ logger_load(const char *fn, char filenam
                BBPrelease(bids[p]);
        }
        logbat_destroy(lg->del);
-       GDKfree(lg->local_dir);
        GDKfree(lg);
        return GDK_FAIL;
 }
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-leave@monetdb.org

Reply via email to