Changeset: e2da898a6a26 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/e2da898a6a26
Modified Files:
	sql/storage/store.c
Branch: nilmask
Log Message:
merged with default diffs (truncated from 690 to 300 lines): diff --git a/documentation/source/manual_pages/mclient.rst b/documentation/source/manual_pages/mclient.rst --- a/documentation/source/manual_pages/mclient.rst +++ b/documentation/source/manual_pages/mclient.rst @@ -196,7 +196,8 @@ SQL Options **--rows=**\ *nr* (**-r** *nr*) If specified, query results will be paged by an internal pager at the - specified number of lines. + specified number of lines. If set to **0** (zero), use the height of + the terminal. The default is **-1** which means no pager is used. **--width=**\ *nr* (**-w** *nr*) Specify the width of the screen. The default is the (initial) width @@ -260,7 +261,8 @@ General Commands **\\r** *rows* Use an internal pager using *rows* per page. If *rows* is **-1**, - stop using the internal pager. + stop using the internal pager, if *rows* is **0**, use the height of + the terminal. SQL Commands ------------ diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c --- a/gdk/gdk_logger.c +++ b/gdk/gdk_logger.c @@ -35,7 +35,7 @@ static gdk_return log_del_bat(logger *lg #define LOG_CREATE 5 #define LOG_DESTROY 6 #define LOG_SEQ 7 -#define LOG_CLEAR 8 // DEPRECATED +#define LOG_CLEAR 8 /* DEPRECATED */ #define LOG_BAT_GROUP 9 #ifdef NATIVE_WIN32 @@ -61,7 +61,7 @@ static const char *log_commands[] = { "LOG_CREATE", "LOG_DESTROY", "LOG_SEQ", - "", // LOG_CLEAR IS DEPRECATED + "", /* LOG_CLEAR IS DEPRECATED */ "LOG_BAT_GROUP", }; @@ -376,10 +376,10 @@ log_read_updates(logger *lg, trans *tr, if (mnstr_readLng(lg->input_log, &offset) != 1) return LOG_ERR; if (cands) { - // This const range actually represents a segment of candidates corresponding to updated bat entries + /* This const range actually represents a segment of candidates corresponding to updated bat entries */ if (BATcount(*cands) == 0 || lg->flushing) { - // when flushing, we only need the offset and count of the last segment of inserts. 
+ /* when flushing, we only need the offset and count of the last segment of inserts. */ assert((*cands)->ttype == TYPE_void); BATtseqbase(*cands, (oid) offset); BATsetcount(*cands, (BUN) nr); @@ -407,7 +407,7 @@ log_read_updates(logger *lg, trans *tr, BBPreclaim(dense); } - // We have to read the value to update the read cursor + /* We have to read the value to update the read cursor */ size_t tlen = lg->rbufsize; void *t = rt(lg->rbuf, &tlen, lg->input_log, 1); if (t == NULL) { @@ -573,7 +573,7 @@ log_read_updates(logger *lg, trans *tr, if (tr_grow(tr) == GDK_SUCCEED) { tr->changes[tr->nr].type = l->flag; if (l->flag==LOG_UPDATE_BULK && offset == -1) { - assert(cands); // bat r is part of a group of bats logged together. + assert(cands); /* bat r is part of a group of bats logged together. */ struct canditer ci; canditer_init(&ci, NULL, *cands); const oid first = canditer_peek(&ci); @@ -1077,15 +1077,14 @@ log_open_output(logger *lg) } ATOMIC_INIT(&new_range->refcount, 1); ATOMIC_INIT(&new_range->last_ts, 0); - ATOMIC_INIT(&new_range->end, 0); - ATOMIC_INIT(&new_range->pend, 0); - ATOMIC_INIT(&new_range->flushed_end, 0); - ATOMIC_INIT(&new_range->drops, 0); + ATOMIC_INIT(&new_range->flushed_ts, 0); + new_range->drops = 0; new_range->id = lg->id; new_range->next = NULL; logged_range* current = lg->current; assert(current && current->next == NULL); current->next = new_range; + ATOMIC_INC(&lg->nr_open_files); return GDK_SUCCEED; } @@ -1107,6 +1106,7 @@ log_close_output(logger *lg) close_stream(lg->current->output_log); } lg->current->output_log = NULL; + ATOMIC_INC(&lg->nr_open_files); } static gdk_return @@ -1154,7 +1154,7 @@ log_read_transaction(logger *lg) if (!lg->flushing) ATOMIC_AND(&GDKdebug, ~CHECKMASK); - BAT* cands = NULL; // used in case of LOG_BAT_GROUP + BAT* cands = NULL; /* used in case of LOG_BAT_GROUP */ while (err == LOG_OK && (ok=log_read_format(lg, &l))) { if (l.flag == 0 && l.id == 0) { @@ -1225,7 +1225,7 @@ log_read_transaction(logger *lg) err 
= LOG_EOF; else { if (l.id > 0) { - // START OF LOG_BAT_GROUP + /* START OF LOG_BAT_GROUP */ cands = COLnew(0, TYPE_void, 0, SYSTRANS); if (!cands) err = LOG_ERR; @@ -1234,7 +1234,7 @@ log_read_transaction(logger *lg) * above option earlier */ err = LOG_ERR; } else { - // END OF LOG_BAT_GROUP + /* END OF LOG_BAT_GROUP */ BBPunfix(cands->batCacheid); cands = NULL; } @@ -1315,7 +1315,6 @@ log_readlog(logger *lg, const char *file * (even if we would log aborts in the logs). So we simply * abort and move to the next log file */ return err == LOG_ERR ? GDK_FAIL : GDK_SUCCEED; - //return GDK_SUCCEED; } /* @@ -1380,8 +1379,8 @@ check_version(logger *lg, FILE *fp, cons } /* old_logger_load always closes fp */ if (old_logger_load(lg, fn, logdir, fp, version, filename) != GDK_SUCCEED) { - //loads drop no longer needed catalog, snapshots bats - //convert catalog_oid -> catalog_id (lng->int) + /*loads drop no longer needed catalog, snapshots bats */ + /*convert catalog_oid -> catalog_id (lng->int) */ GDKerror("Incompatible database version %06d, " "this server supports version %06d.\n%s", version, lg->version, @@ -2122,6 +2121,7 @@ log_load(const char *fn, const char *log logbat_destroy(lg->dseqs); ATOMIC_DESTROY(&lg->current->refcount); ATOMIC_DESTROY(&lg->nr_flushers); + ATOMIC_DESTROY(&lg->nr_open_files); MT_lock_destroy(&lg->lock); MT_lock_destroy(&lg->rotation_lock); GDKfree(lg->fn); @@ -2194,6 +2194,7 @@ log_new(int debug, const char *fn, const MT_lock_init(&lg->flush_lock, "flush_lock"); MT_cond_init(&lg->excl_flush_cv); ATOMIC_INIT(&lg->nr_flushers, 0); + ATOMIC_INIT(&lg->nr_open_files, 0); if (log_load(fn, logdir, lg, filename) == GDK_SUCCEED) { return lg; @@ -2208,7 +2209,7 @@ do_flush_range_cleanup(logger* lg) logged_range* frange = lg->flush_ranges; logged_range* first = frange; - while ( frange->next) { + while (frange->next) { if (ATOMIC_GET(&frange->refcount) > 1) break; frange = frange->next; @@ -2229,6 +2230,7 @@ do_flush_range_cleanup(logger* lg) 
TRC_INFO(WAL, "closing output log %s", mnstr_name(frange->output_log)); close_stream(frange->output_log); frange->output_log = NULL; + ATOMIC_INC(&lg->nr_open_files); } } return flast; @@ -2246,10 +2248,7 @@ log_destroy(logger *lg) logged_range *n = p->next; ATOMIC_DESTROY(&p->refcount); ATOMIC_DESTROY(&p->last_ts); - ATOMIC_DESTROY(&p->end); - ATOMIC_DESTROY(&p->pend); - ATOMIC_DESTROY(&p->flushed_end); - ATOMIC_DESTROY(&p->drops); + ATOMIC_DESTROY(&p->flushed_ts); GDKfree(p); p = n; } @@ -2286,6 +2285,7 @@ log_destroy(logger *lg) MT_lock_destroy(&lg->rotation_lock); MT_lock_destroy(&lg->flush_lock); ATOMIC_DESTROY(&lg->nr_flushers); + ATOMIC_DESTROY(&lg->nr_open_files); GDKfree(lg->fn); GDKfree(lg->dir); GDKfree(lg->rbuf); @@ -2333,9 +2333,13 @@ log_next_logfile(logger *lg, ulng ts) int m = (ATOMIC_GET(&GDKdebug) & FORCEMITOMASK)?1000:100; if (!lg->pending || !lg->pending->next) return 0; - if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && (ulng) ATOMIC_GET(&lg->pending->last_ts) <= ts) { + if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && lg->pending != lg->flush_ranges && + (ulng) ATOMIC_GET(&lg->pending->last_ts) == (ulng) ATOMIC_GET(&lg->pending->flushed_ts) && + (ulng) ATOMIC_GET(&lg->pending->flushed_ts) <= ts) { logged_range *p = lg->pending; - for(int i = 1; i<m && ATOMIC_GET(&p->refcount) == 0 && p->next && p->next != lg->current && (ulng) ATOMIC_GET(&p->last_ts) <= ts; i++) + for(int i = 1; i<m && ATOMIC_GET(&p->refcount) == 0 && p->next && p->next != lg->current && p->next != lg->flush_ranges && + (ulng) ATOMIC_GET(&p->last_ts) == (ulng) ATOMIC_GET(&p->flushed_ts) && + (ulng) ATOMIC_GET(&p->flushed_ts) <= ts; i++) p = p->next; return p->id; } @@ -2359,10 +2363,6 @@ do_rotate(logger *lg) { logged_range* next = lg->current->next; if (next) { assert(ATOMIC_GET(&next->refcount) == 1); - ulng end = ATOMIC_GET(&lg->current->end); - ATOMIC_SET(&next->pend, end); - ATOMIC_SET(&next->end, end); - 
assert(ATOMIC_GET(&lg->current->refcount) > 0); lg->current = lg->current->next; } } @@ -2370,20 +2370,18 @@ do_rotate(logger *lg) { gdk_return log_activate(logger *lg) { - bool flush = false; + bool flush_cleanup = false; gdk_return res = GDK_SUCCEED; rotation_lock(lg); - if (!lg->flushnow && !lg->current->next && lg->current->drops > 100000 && ((ulng) ATOMIC_GET(&lg->current->end) - (ulng) ATOMIC_GET(&lg->current->pend)) > 0 && lg->saved_id+1 == lg->id) { + if (!lg->flushnow && !lg->current->next && lg->current->drops > 100000 && (ulng) ATOMIC_GET(&lg->current->last_ts) > 0 && lg->saved_id+1 == lg->id && ATOMIC_GET(&lg->current->refcount) == 1 /* no pending work on this file */) { lg->id++; /* start new file */ res = log_open_output(lg); - if(ATOMIC_GET(&lg->current->refcount) == 1) { - flush = true; - do_rotate(lg); - } + flush_cleanup = true; + do_rotate(lg); } rotation_unlock(lg); - if (flush) + if (flush_cleanup) (void) do_flush_range_cleanup(lg); return res; } @@ -2458,10 +2456,10 @@ log_flush(logger *lg, ulng ts) rotation_unlock(lg); } if (res != LOG_ERR) { - while(olid <= lid) { + while(olid < lid) { /* Try to cleanup, remove old log file, continue on failure! 
*/ + olid++; (void)log_cleanup(lg, olid); - olid++; } } if (res == LOG_OK) @@ -2604,7 +2602,6 @@ internal_log_bat(logger *lg, BAT *b, log if (LOG_DISABLED(lg) || !nr) { /* logging is switched off */ - ATOMIC_ADD(&lg->current->end, nr); if (nr) return la_bat_update_count(lg, id, offset+cnt, lg->tid); return GDK_SUCCEED; @@ -2612,7 +2609,7 @@ internal_log_bat(logger *lg, BAT *b, log gdk_return (*wt) (const void *, stream *, size_t) = BATatoms[b->ttype].atomWrite; - if (lg->total_cnt == 0) // signals single bulk message or first part of bat logged in parts + if (lg->total_cnt == 0) /* signals single bulk message or first part of bat logged in parts */ if (log_write_format(lg, &l) != GDK_SUCCEED || !mnstr_writeLng(lg->current->output_log, total_cnt?total_cnt:cnt) || mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1 || @@ -2623,7 +2620,7 @@ internal_log_bat(logger *lg, BAT *b, log if (!total_cnt) total_cnt = cnt; lg->total_cnt += cnt; - if (lg->total_cnt == total_cnt) // This is the last to be logged part of this bat, we can already reset the total_cnt + if (lg->total_cnt == total_cnt) /* This is the last to be logged part of this bat, we can already reset the total_cnt */ lg->total_cnt = 0; /* if offset is just for the log, but BAT is already sliced, reset offset */ @@ -2706,7 +2703,6 @@ log_bat_persists(logger *lg, BAT *b, log return GDK_FAIL; } } - ATOMIC_INC(&lg->current->end); TRC_DEBUG(WAL, "id (%d) bat (%d)\n", id, b->batCacheid); gdk_return r = internal_log_bat(lg, b, id, 0, BATcount(b), 0, 0); _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org