Changeset: 428672c49b28 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/428672c49b28
Modified Files:
	gdk/gdk_logger.c
	gdk/gdk_logger.h
	gdk/gdk_logger_internals.h
	sql/storage/bat/bat_logger.c
	sql/storage/sql_storage.h
	sql/storage/store.c
Branch: pax-log
Log Message:
First working version of log_tcommit: Use the commit_queue to minimize the required WAL commit messages and second flushes. diffs (236 lines): diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c --- a/gdk/gdk_logger.c +++ b/gdk/gdk_logger.c @@ -1265,6 +1265,10 @@ log_read_transaction(logger *lg) } } break; + case LOG_COMMIT: + assert(l.id > 0); + // TODO + break; default: err = LOG_ERR; } @@ -2198,6 +2202,7 @@ log_load(int debug, const char *fn, cons MT_lock_destroy(&lg->rotation_lock); MT_lock_destroy(&lg->flush_lock); log_queue_destroy(&lg->flush_queue); + log_queue_destroy(&lg->commit_queue); GDKfree(lg->fn); GDKfree(lg->dir); GDKfree(lg->local_dir); @@ -2267,6 +2272,7 @@ log_new(int debug, const char *fn, const MT_lock_init(&lg->flush_lock, "flush_lock"); log_queue_initialize(&lg->flush_queue, "flush_queue_semaphore", "flush_queue_lock"); + log_queue_initialize(&lg->commit_queue, "commit_queue_semaphore", "commit_queue_lock"); if (log_load(debug, fn, logdir, lg, filename) == GDK_SUCCEED) { return lg; @@ -2316,6 +2322,7 @@ log_destroy(logger *lg) MT_lock_destroy(&lg->rotation_lock); MT_lock_destroy(&lg->flush_lock); log_queue_destroy(&lg->flush_queue); + log_queue_destroy(&lg->commit_queue); GDKfree(lg->fn); GDKfree(lg->dir); GDKfree(lg->buf); @@ -2937,8 +2944,8 @@ log_tend(logger *lg) l.flag = LOG_END; l.id = lg->tid; - if ((result = log_write_format(lg, &l)) != GDK_SUCCEED) - (void) ATOMIC_DEC(&lg->refcount); + result = log_write_format(lg, &l); + (void) ATOMIC_DEC(&lg->refcount); return result; } @@ -2956,7 +2963,7 @@ log_tdone(logger *lg, ulng commit_ts) } gdk_return -log_tcommit(logger *lg, ulng commit_ts) +log_tcommit(logger *lg, ulng commit_ts, unsigned int commit_queue_number) { if (lg->debug & 1) fprintf(stderr, "#log_tcommit " LLFMT "\n", commit_ts); @@ -2968,25 +2975,44 @@ log_tcommit(logger *lg, ulng commit_ts) return GDK_SUCCEED; } - gdk_return result; - logformat l; - l.flag = LOG_COMMIT; - l.id = 0; // number of transactions to be committed; - - if 
((result = log_write_format(lg, &l)) != GDK_SUCCEED) { + log_queue* commit_queue = &lg->commit_queue; + + if (log_queue_has_number(commit_queue, commit_queue_number)) { + + log_queue* cq = &lg->commit_queue; + + const int cql = log_queue_length(cq); + gdk_return result; + logformat l; + l.flag = LOG_COMMIT; + l.id = cql; // number of transactions to be committed; + + /* if the log file being rotated at the moment, + * wait for it to finish*/ + MT_lock_set(&lg->rotation_lock); + (void) ATOMIC_INC(&lg->refcount); + MT_lock_unset(&lg->rotation_lock); + + if ((result = log_write_format(lg, &l)) != GDK_SUCCEED) { + (void) ATOMIC_DEC(&lg->refcount); + return result; + } + else { + log_queue_truncate_left(cq, cql); + } + (void) ATOMIC_DEC(&lg->refcount); - return result; } return GDK_SUCCEED; } gdk_return -log_tflush(logger* lg, ulng log_file_id, ulng commit_ts) { +log_tflush(logger* lg, ulng log_file_id, ulng commit_ts, unsigned int* commit_queue_number) { if (lg->flushnow) { lg->flushnow = 0; - if (!commit_ts) log_tdone(lg, commit_ts); // TODO: check if + if (!commit_queue_number) log_tdone(lg, commit_ts); // TODO: check if return log_commit(lg); } @@ -2994,8 +3020,6 @@ log_tflush(logger* lg, ulng log_file_id, return GDK_SUCCEED; } - (void) ATOMIC_DEC(&lg->refcount); - MT_lock_set(&lg->rotation_lock); if (log_file_id == lg->id) { // TODO: introduce lg->flushed and get rid of log_file_id in signature MT_lock_unset(&lg->rotation_lock); @@ -3028,8 +3052,11 @@ log_tflush(logger* lg, ulng log_file_id, * no need to do anything */ MT_lock_unset(&lg->rotation_lock); - - if (!commit_ts) log_tdone(lg, commit_ts); + if (commit_queue_number) { + *commit_queue_number = log_queue_request_number(&lg->commit_queue); + } + + if (!commit_queue_number) log_tdone(lg, commit_ts); MT_lock_unset(&lg->flush_lock); // TODO: request number for commit message queue. 
diff --git a/gdk/gdk_logger.h b/gdk/gdk_logger.h --- a/gdk/gdk_logger.h +++ b/gdk/gdk_logger.h @@ -73,8 +73,8 @@ gdk_export gdk_return log_bat_group_end( gdk_export gdk_return log_tstart(logger *lg, bool flushnow, ulng *log_file_id); gdk_export gdk_return log_tend(logger *lg); -gdk_export gdk_return log_tflush(logger *lg, ulng log_file_id, ulng commit_ts); /* Flush the WAL to disk using group commit */ -gdk_export gdk_return log_tcommit(logger *lg, ulng commit_ts); /* Flush the WAL to disk using group commit */ +gdk_export gdk_return log_tflush(logger *lg, ulng log_file_id, ulng commit_ts, unsigned int* commit_queue_number); /* Flush the WAL to disk using group commit */ +gdk_export gdk_return log_tcommit(logger *lg, ulng commit_ts, unsigned int commit_queue_number); gdk_export gdk_return log_tsequence(logger *lg, int seq, lng id); gdk_export log_bid log_find_bat(logger *lg, log_id id); diff --git a/gdk/gdk_logger_internals.h b/gdk/gdk_logger_internals.h --- a/gdk/gdk_logger_internals.h +++ b/gdk/gdk_logger_internals.h @@ -53,7 +53,7 @@ struct logger { stream *input_log; /* current stream to flush */ lng end; /* end of pre-allocated blocks for faster f(data)sync */ // TODO: only incremen when actual files writes occur. 
- ATOMIC_TYPE refcount; /* Number of active writers and flushers in the logger */ // TODO check refcount in c->log and c->end + ATOMIC_TYPE refcount; /* Number of active writers in the logger */ // TODO check if atomicity + rotation_lock is redundant, check refcount in c->log and c->end MT_Lock rotation_lock; MT_Lock lock; MT_Lock flush_lock; /* so only one transaction can flush to disk at any given time */ @@ -79,6 +79,7 @@ struct logger { size_t bufsize; log_queue flush_queue; + log_queue commit_queue; }; struct old_logger { diff --git a/sql/storage/bat/bat_logger.c b/sql/storage/bat/bat_logger.c --- a/sql/storage/bat/bat_logger.c +++ b/sql/storage/bat/bat_logger.c @@ -3150,14 +3150,14 @@ bl_tend(sqlstore *store) } static int -bl_tflush(sqlstore *store, ulng log_file_id, ulng commit_ts) +bl_tflush(sqlstore *store, ulng log_file_id, ulng commit_ts, unsigned int* commit_queue_number) { - return log_tflush(store->logger, log_file_id, commit_ts) == GDK_SUCCEED ? LOG_OK : LOG_ERR; + return log_tflush(store->logger, log_file_id, commit_ts, commit_queue_number) == GDK_SUCCEED ? LOG_OK : LOG_ERR; } static int -bl_tcommit(sqlstore *store, ulng commit_ts) { - return log_tcommit(store->logger, commit_ts) == GDK_SUCCEED ? LOG_OK : LOG_ERR; +bl_tcommit(sqlstore *store, ulng commit_ts, unsigned int commit_queue_number) { + return log_tcommit(store->logger, commit_ts, commit_queue_number) == GDK_SUCCEED ? 
LOG_OK : LOG_ERR; } static int diff --git a/sql/storage/sql_storage.h b/sql/storage/sql_storage.h --- a/sql/storage/sql_storage.h +++ b/sql/storage/sql_storage.h @@ -282,8 +282,8 @@ typedef int (*logger_get_sequence_fptr) typedef int (*log_isnew_fptr)(struct sqlstore *store); typedef int (*log_tstart_fptr) (struct sqlstore *store, bool flush, ulng *log_file_id); typedef int (*log_tend_fptr) (struct sqlstore *store); -typedef int (*log_tflush_fptr) (struct sqlstore *store, ulng log_file_id, ulng commit_ts); -typedef int (*log_tcommit_fptr) (struct sqlstore *store, ulng commit_ts); +typedef int (*log_tflush_fptr) (struct sqlstore *store, ulng log_file_id, ulng commit_ts, unsigned int* commit_queue_number); +typedef int (*log_tcommit_fptr) (struct sqlstore *store, ulng commit_ts, unsigned int commit_queue_number); typedef lng (*log_save_id_fptr) (struct sqlstore *store); typedef int (*log_tsequence_fptr) (struct sqlstore *store, int seq, lng id); diff --git a/sql/storage/store.c b/sql/storage/store.c --- a/sql/storage/store.c +++ b/sql/storage/store.c @@ -3986,17 +3986,23 @@ sql_trans_commit(sql_trans *tr) store_unlock(store); /* flush the log structure */ if (log) { + unsigned int commit_queue_number; if (!flush) MT_lock_unset(&store->commit); /* release the commit log when flushing to disk */ if (ok == LOG_OK) - ok = store->logger_api.log_tflush(store, log_file_id, commit_ts); /* first flush/sync */ + ok = store->logger_api.log_tflush(store, log_file_id, commit_ts, &commit_queue_number); /* first flush/sync */ if (!flush) MT_lock_set(&store->commit); if (ok == LOG_OK) - ok = store->logger_api.log_tcommit(store, commit_ts); /* write final commit and */ - if (flush) + ok = store->logger_api.log_tcommit(store, commit_ts, commit_queue_number); /* write final commit message */ + if (!flush) + MT_lock_unset(&store->commit); + if (ok == LOG_OK) + ok = store->logger_api.log_tflush(store, log_file_id, commit_ts, NULL); /* second flush/sync */ + if (!flush) + 
MT_lock_set(&store->commit); + if (flush) MT_lock_unset(&store->flush); - } MT_lock_unset(&store->commit); list_destroy(tr->changes); _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org