Changeset: 131122ca32d6 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/131122ca32d6
Modified Files:
        gdk/gdk_logger.c
        gdk/gdk_logger.h
        sql/storage/bat/bat_logger.c
        sql/storage/sql_storage.h
        sql/storage/store.c
Branch: pax-log
Log Message:

Introducing second sync.


diffs (167 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -2885,6 +2885,51 @@ log_tend(logger *lg)
                (void) ATOMIC_DEC(&lg->refcount);
        return result;
 }
+
+static gdk_return
+log_tdone(logger *lg, ulng commit_ts)
+{
+       if (lg->debug & 1)
+               fprintf(stderr, "#log_tdone " LLFMT "\n", commit_ts);
+
+       if (lg->current) {
+               lg->current->last_ts = commit_ts;
+       }
+       return GDK_SUCCEED;
+}
+
+static gdk_return
+log_tcommit(logger *lg, ulng commit_ts)
+{
+       if (lg->debug & 1)
+               fprintf(stderr, "#log_tcommit " LLFMT "\n", commit_ts);
+
+       lg->end++;
+       if (LOG_DISABLED(lg)) {
+               return GDK_SUCCEED;
+       }
+
+       gdk_return result;
+       logformat l;
+       l.flag = LOG_COMMIT;
+       l.id = 0; // No purpose for now
+
+       if ((result = log_write_format(lg, &l)) != GDK_SUCCEED) {
+               (void) ATOMIC_DEC(&lg->refcount);
+               return result;
+       }
+
+       if (mnstr_flush(lg->output_log, MNSTR_FLUSH_DATA) ||
+                                       (!(GDKdebug & NOSYNCMASK) && 
mnstr_fsync(lg->output_log)) ||
+                                       new_logfile(lg) != GDK_SUCCEED) {
+
+       }
+
+
+
+       return GDK_SUCCEED;
+}
+
 static int
 request_number_flush_queue(logger *lg) {
        // Semaphore protects ring buffer structure in queue against overflowing
@@ -2934,18 +2979,6 @@ flush_queue_length(logger *lg) {
        return fql;
 }
 
-static gdk_return
-log_tdone(logger *lg, ulng commit_ts)
-{
-       if (lg->debug & 1)
-               fprintf(stderr, "#log_tdone " LLFMT "\n", commit_ts);
-
-       if (lg->current) {
-               lg->current->last_ts = commit_ts;
-       }
-       return GDK_SUCCEED;
-}
-
 gdk_return
 log_tflush(logger* lg, ulng log_file_id, ulng commit_ts) {
 
@@ -2959,7 +2992,7 @@ log_tflush(logger* lg, ulng log_file_id,
                return GDK_SUCCEED;
        }
 
-       if (log_file_id == lg->id) {
+       if (log_file_id == lg->id) { // TODO: this check might be a data race
                unsigned int number = request_number_flush_queue(lg);
 
                MT_lock_set(&lg->flush_lock);
@@ -2971,7 +3004,7 @@ log_tflush(logger* lg, ulng log_file_id,
                        if (mnstr_flush(lg->output_log, MNSTR_FLUSH_DATA) ||
                                        (!(GDKdebug & NOSYNCMASK) && 
mnstr_fsync(lg->output_log)) ||
                                        new_logfile(lg) != GDK_SUCCEED) {
-                               /* flush failed */
+                               /* first flush failed */
                                MT_lock_unset(&lg->flush_lock);
                                (void) ATOMIC_DEC(&lg->refcount);
                                return GDK_FAIL;
diff --git a/gdk/gdk_logger.h b/gdk/gdk_logger.h
--- a/gdk/gdk_logger.h
+++ b/gdk/gdk_logger.h
@@ -74,6 +74,7 @@ gdk_export gdk_return log_bat_group_end(
 gdk_export gdk_return log_tstart(logger *lg, bool flushnow, ulng *log_file_id);
 gdk_export gdk_return log_tend(logger *lg);
 gdk_export gdk_return log_tflush(logger *lg, ulng log_file_id, ulng 
commit_ts); /* Flush the WAL to disk using group commit */
+gdk_export gdk_return log_tcommit(logger *lg, ulng commit_ts); /* Flush the 
WAL to disk using group commit */
 
 gdk_export gdk_return log_tsequence(logger *lg, int seq, lng id);
 gdk_export log_bid log_find_bat(logger *lg, log_id id);
diff --git a/sql/storage/bat/bat_logger.c b/sql/storage/bat/bat_logger.c
--- a/sql/storage/bat/bat_logger.c
+++ b/sql/storage/bat/bat_logger.c
@@ -3156,6 +3156,11 @@ bl_tflush(sqlstore *store, ulng log_file
 }
 
 static int
+bl_tcommit(sqlstore *store, ulng commit_ts) {
+       return log_tcommit(store->logger, commit_ts) == GDK_SUCCEED ? LOG_OK : 
LOG_ERR;
+}
+
+static int
 bl_sequence(sqlstore *store, int seq, lng id)
 {
        return log_tsequence(store->logger, seq, id) == GDK_SUCCEED ? LOG_OK : 
LOG_ERR;
@@ -3553,6 +3558,7 @@ bat_logger_init( logger_functions *lf )
        lf->log_tstart = bl_tstart;
        lf->log_tend = bl_tend;
        lf->log_tflush = bl_tflush;
+       lf->log_tcommit = bl_tcommit;
        lf->log_tsequence = bl_sequence;
        lf->get_snapshot_files = bl_snapshot;
 }
diff --git a/sql/storage/sql_storage.h b/sql/storage/sql_storage.h
--- a/sql/storage/sql_storage.h
+++ b/sql/storage/sql_storage.h
@@ -282,7 +282,8 @@ typedef int (*logger_get_sequence_fptr) 
 typedef int (*log_isnew_fptr)(struct sqlstore *store);
 typedef int (*log_tstart_fptr) (struct sqlstore *store, bool flush, ulng 
*log_file_id);
 typedef int (*log_tend_fptr) (struct sqlstore *store);
-typedef int (*log_tflush_fptr) (struct sqlstore *store, ulng log_file_id, ulng 
commit_tis);
+typedef int (*log_tflush_fptr) (struct sqlstore *store, ulng log_file_id, ulng 
commit_ts);
+typedef int (*log_tcommit_fptr) (struct sqlstore *store, ulng commit_ts);
 typedef lng (*log_save_id_fptr) (struct sqlstore *store);
 typedef int (*log_tsequence_fptr) (struct sqlstore *store, int seq, lng id);
 
@@ -316,6 +317,7 @@ typedef struct logger_functions {
        log_tstart_fptr log_tstart;
        log_tend_fptr log_tend;
        log_tflush_fptr log_tflush;
+       log_tcommit_fptr log_tcommit;
        log_save_id_fptr log_save_id;
        log_tsequence_fptr log_tsequence;
 } logger_functions;
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -3989,11 +3989,14 @@ sql_trans_commit(sql_trans *tr)
                        if (!flush)
                                MT_lock_unset(&store->commit); /* release the 
commit log when flushing to disk */
                        if (ok == LOG_OK)
-                               ok = store->logger_api.log_tflush(store, 
log_file_id, commit_ts); /* flush/sync */
+                               ok = store->logger_api.log_tflush(store, 
log_file_id, commit_ts); /* first flush/sync */
                        if (!flush)
-                               MT_lock_set(&store->commit); /* release the 
commit log when flushing to disk */
+                               MT_lock_set(&store->commit);
+                       if (ok == LOG_OK)
+                               ok = store->logger_api.log_tcommit(store, 
commit_ts); /* write final commit and  */
                        if (flush)
                                MT_lock_unset(&store->flush);
+                       
                }
                MT_lock_unset(&store->commit);
                list_destroy(tr->changes);
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to