Changeset: 428672c49b28 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/428672c49b28
Modified Files:
        gdk/gdk_logger.c
        gdk/gdk_logger.h
        gdk/gdk_logger_internals.h
        sql/storage/bat/bat_logger.c
        sql/storage/sql_storage.h
        sql/storage/store.c
Branch: pax-log
Log Message:

First working version of log_tcommit:
Use the commit_queue to minimize the required WAL commit messages
and second flushes.


diffs (236 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -1265,6 +1265,10 @@ log_read_transaction(logger *lg)
                                }
                        }
                        break;
+               case LOG_COMMIT:
+                       assert(l.id > 0);
+                       // TODO
+                       break;
                default:
                        err = LOG_ERR;
                }
@@ -2198,6 +2202,7 @@ log_load(int debug, const char *fn, cons
        MT_lock_destroy(&lg->rotation_lock);
        MT_lock_destroy(&lg->flush_lock);
        log_queue_destroy(&lg->flush_queue);
+       log_queue_destroy(&lg->commit_queue);
        GDKfree(lg->fn);
        GDKfree(lg->dir);
        GDKfree(lg->local_dir);
@@ -2267,6 +2272,7 @@ log_new(int debug, const char *fn, const
        MT_lock_init(&lg->flush_lock, "flush_lock");
 
        log_queue_initialize(&lg->flush_queue, "flush_queue_semaphore", "flush_queue_lock");
+       log_queue_initialize(&lg->commit_queue, "commit_queue_semaphore", "commit_queue_lock");
 
        if (log_load(debug, fn, logdir, lg, filename) == GDK_SUCCEED) {
                return lg;
@@ -2316,6 +2322,7 @@ log_destroy(logger *lg)
        MT_lock_destroy(&lg->rotation_lock);
        MT_lock_destroy(&lg->flush_lock);
        log_queue_destroy(&lg->flush_queue);
+       log_queue_destroy(&lg->commit_queue);
        GDKfree(lg->fn);
        GDKfree(lg->dir);
        GDKfree(lg->buf);
@@ -2937,8 +2944,8 @@ log_tend(logger *lg)
        l.flag = LOG_END;
        l.id = lg->tid;
 
-       if ((result = log_write_format(lg, &l)) != GDK_SUCCEED)
-               (void) ATOMIC_DEC(&lg->refcount);
+       result = log_write_format(lg, &l);
+       (void) ATOMIC_DEC(&lg->refcount);
 
        return result;
 }
@@ -2956,7 +2963,7 @@ log_tdone(logger *lg, ulng commit_ts)
 }
 
 gdk_return
-log_tcommit(logger *lg, ulng commit_ts)
+log_tcommit(logger *lg, ulng commit_ts, unsigned int commit_queue_number)
 {
        if (lg->debug & 1)
                fprintf(stderr, "#log_tcommit " LLFMT "\n", commit_ts);
@@ -2968,25 +2975,44 @@ log_tcommit(logger *lg, ulng commit_ts)
                return GDK_SUCCEED;
        }
 
-       gdk_return result;
-       logformat l;
-       l.flag = LOG_COMMIT;
-       l.id = 0; // number of transactions to be committed;
-
-       if ((result = log_write_format(lg, &l)) != GDK_SUCCEED) {
+       log_queue* commit_queue = &lg->commit_queue;
+
+       if (log_queue_has_number(commit_queue, commit_queue_number)) {
+
+               log_queue* cq = &lg->commit_queue;
+
+               const int cql = log_queue_length(cq);
+               gdk_return result;
+               logformat l;
+               l.flag = LOG_COMMIT;
+               l.id = cql; // number of transactions to be committed;
+
+               /* if the log file is being rotated at the moment,
+                * wait for it to finish */
+               MT_lock_set(&lg->rotation_lock);
+               (void) ATOMIC_INC(&lg->refcount);
+               MT_lock_unset(&lg->rotation_lock);
+
+               if ((result = log_write_format(lg, &l)) != GDK_SUCCEED) {
+                       (void) ATOMIC_DEC(&lg->refcount);
+                       return result;
+               }
+               else {
+                       log_queue_truncate_left(cq, cql);
+               }
+
                (void) ATOMIC_DEC(&lg->refcount);
-               return result;
        }
 
        return GDK_SUCCEED;
 }
 
 gdk_return
-log_tflush(logger* lg, ulng log_file_id, ulng commit_ts) {
+log_tflush(logger* lg, ulng log_file_id, ulng commit_ts, unsigned int* commit_queue_number) {
 
        if (lg->flushnow) {
                lg->flushnow = 0;
-               if (!commit_ts) log_tdone(lg, commit_ts); // TODO: check if 
+               if (!commit_queue_number) log_tdone(lg, commit_ts); // TODO: check if 
                return log_commit(lg);
        }
 
@@ -2994,8 +3020,6 @@ log_tflush(logger* lg, ulng log_file_id,
                return GDK_SUCCEED;
        }
 
-       (void) ATOMIC_DEC(&lg->refcount);
-
        MT_lock_set(&lg->rotation_lock);
        if (log_file_id == lg->id) { // TODO: introduce lg->flushed and get rid of log_file_id in signature
                MT_lock_unset(&lg->rotation_lock);
@@ -3028,8 +3052,11 @@ log_tflush(logger* lg, ulng log_file_id,
                * no need to do anything */
                MT_lock_unset(&lg->rotation_lock);
 
-
-       if (!commit_ts) log_tdone(lg, commit_ts);
+       if (commit_queue_number) {
+               *commit_queue_number = log_queue_request_number(&lg->commit_queue);
+       }
+
+       if (!commit_queue_number) log_tdone(lg, commit_ts);
        MT_lock_unset(&lg->flush_lock);
        // TODO: request number for commit message queue.
 
diff --git a/gdk/gdk_logger.h b/gdk/gdk_logger.h
--- a/gdk/gdk_logger.h
+++ b/gdk/gdk_logger.h
@@ -73,8 +73,8 @@ gdk_export gdk_return log_bat_group_end(
 
 gdk_export gdk_return log_tstart(logger *lg, bool flushnow, ulng *log_file_id);
 gdk_export gdk_return log_tend(logger *lg);
-gdk_export gdk_return log_tflush(logger *lg, ulng log_file_id, ulng commit_ts); /* Flush the WAL to disk using group commit */
-gdk_export gdk_return log_tcommit(logger *lg, ulng commit_ts); /* Flush the WAL to disk using group commit */
+gdk_export gdk_return log_tflush(logger *lg, ulng log_file_id, ulng commit_ts, unsigned int* commit_queue_number); /* Flush the WAL to disk using group commit */
+gdk_export gdk_return log_tcommit(logger *lg, ulng commit_ts, unsigned int commit_queue_number);
 
 gdk_export gdk_return log_tsequence(logger *lg, int seq, lng id);
 gdk_export log_bid log_find_bat(logger *lg, log_id id);
diff --git a/gdk/gdk_logger_internals.h b/gdk/gdk_logger_internals.h
--- a/gdk/gdk_logger_internals.h
+++ b/gdk/gdk_logger_internals.h
@@ -53,7 +53,7 @@ struct logger {
        stream *input_log;      /* current stream to flush */
        lng end;                /* end of pre-allocated blocks for faster f(data)sync */ // TODO: only incremen when actual files writes occur.
 
-       ATOMIC_TYPE refcount; /* Number of active writers and flushers in the logger */ // TODO check refcount in c->log and c->end
+       ATOMIC_TYPE refcount; /* Number of active writers in the logger */ // TODO check if atomicity + rotation_lock is redundant, check refcount in c->log and c->end
        MT_Lock rotation_lock;
        MT_Lock lock;
        MT_Lock flush_lock; /* so only one transaction can flush to disk at any given time */
@@ -79,6 +79,7 @@ struct logger {
        size_t bufsize;
 
        log_queue flush_queue;
+       log_queue commit_queue;
 };
 
 struct old_logger {
diff --git a/sql/storage/bat/bat_logger.c b/sql/storage/bat/bat_logger.c
--- a/sql/storage/bat/bat_logger.c
+++ b/sql/storage/bat/bat_logger.c
@@ -3150,14 +3150,14 @@ bl_tend(sqlstore *store)
 }
 
 static int
-bl_tflush(sqlstore *store, ulng log_file_id, ulng commit_ts)
+bl_tflush(sqlstore *store, ulng log_file_id, ulng commit_ts, unsigned int* commit_queue_number)
 {
-       return log_tflush(store->logger, log_file_id, commit_ts) == GDK_SUCCEED ? LOG_OK : LOG_ERR;
+       return log_tflush(store->logger, log_file_id, commit_ts, commit_queue_number) == GDK_SUCCEED ? LOG_OK : LOG_ERR;
 }
 
 static int
-bl_tcommit(sqlstore *store, ulng commit_ts) {
-       return log_tcommit(store->logger, commit_ts) == GDK_SUCCEED ? LOG_OK : LOG_ERR;
+bl_tcommit(sqlstore *store, ulng commit_ts, unsigned int commit_queue_number) {
+       return log_tcommit(store->logger, commit_ts, commit_queue_number) == GDK_SUCCEED ? LOG_OK : LOG_ERR;
 }
 
 static int
diff --git a/sql/storage/sql_storage.h b/sql/storage/sql_storage.h
--- a/sql/storage/sql_storage.h
+++ b/sql/storage/sql_storage.h
@@ -282,8 +282,8 @@ typedef int (*logger_get_sequence_fptr) 
 typedef int (*log_isnew_fptr)(struct sqlstore *store);
 typedef int (*log_tstart_fptr) (struct sqlstore *store, bool flush, ulng *log_file_id);
 typedef int (*log_tend_fptr) (struct sqlstore *store);
-typedef int (*log_tflush_fptr) (struct sqlstore *store, ulng log_file_id, ulng commit_ts);
-typedef int (*log_tcommit_fptr) (struct sqlstore *store, ulng commit_ts);
+typedef int (*log_tflush_fptr) (struct sqlstore *store, ulng log_file_id, ulng commit_ts, unsigned int* commit_queue_number);
+typedef int (*log_tcommit_fptr) (struct sqlstore *store, ulng commit_ts, unsigned int commit_queue_number);
 typedef lng (*log_save_id_fptr) (struct sqlstore *store);
 typedef int (*log_tsequence_fptr) (struct sqlstore *store, int seq, lng id);
 
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -3986,17 +3986,23 @@ sql_trans_commit(sql_trans *tr)
                store_unlock(store);
                /* flush the log structure */
                if (log) {
+                       unsigned int commit_queue_number;
                        if (!flush)
                                MT_lock_unset(&store->commit); /* release the commit log when flushing to disk */
                        if (ok == LOG_OK)
-                               ok = store->logger_api.log_tflush(store, log_file_id, commit_ts); /* first flush/sync */
+                               ok = store->logger_api.log_tflush(store, log_file_id, commit_ts, &commit_queue_number); /* first flush/sync */
                        if (!flush)
                                MT_lock_set(&store->commit);
                        if (ok == LOG_OK)
-                               ok = store->logger_api.log_tcommit(store, commit_ts); /* write final commit and  */
-                       if (flush)
+                               ok = store->logger_api.log_tcommit(store, commit_ts, commit_queue_number); /* write final commit message */
+                       if (!flush)
+                               MT_lock_unset(&store->commit);
+                       if (ok == LOG_OK)
+                               ok = store->logger_api.log_tflush(store, log_file_id, commit_ts, NULL); /* second flush/sync */
+                       if (!flush)
+                               MT_lock_set(&store->commit);
+                        if (flush)
                                MT_lock_unset(&store->flush);
-                       
                }
                MT_lock_unset(&store->commit);
                list_destroy(tr->changes);
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to