On 2017-05-04 17:00:04 -0700, Andres Freund wrote: > Attached is a prototype patch for that.
Oops. Andres
>From b6eb46e376e40f3e2e9a55d16b1b37b27904564b Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Thu, 4 May 2017 16:40:52 -0700 Subject: [PATCH 1/2] WIP: Fix off-by-one around GetLastImportantRecPtr. --- src/backend/postmaster/bgwriter.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c index dcb4cf249c..d409d977c0 100644 --- a/src/backend/postmaster/bgwriter.c +++ b/src/backend/postmaster/bgwriter.c @@ -325,10 +325,11 @@ BackgroundWriterMain(void) /* * Only log if enough time has passed and interesting records have - * been inserted since the last snapshot. + * been inserted since the last snapshot (it's <= because + * last_snapshot_lsn points at the end+1 of the record). */ if (now >= timeout && - last_snapshot_lsn < GetLastImportantRecPtr()) + last_snapshot_lsn <= GetLastImportantRecPtr()) { last_snapshot_lsn = LogStandbySnapshot(); last_snapshot_ts = now; -- 2.12.0.264.gd6db3f2165.dirty
>From 7ed2aeb832029f5602566a665b3f4dbe8baedfcd Mon Sep 17 00:00:00 2001 From: Andres Freund <and...@anarazel.de> Date: Thu, 4 May 2017 16:48:00 -0700 Subject: [PATCH 2/2] WIP: Possibly more robust snapbuild approach. --- contrib/test_decoding/expected/ondisk_startup.out | 15 +- contrib/test_decoding/specs/ondisk_startup.spec | 8 +- src/backend/replication/logical/decode.c | 3 - src/backend/replication/logical/snapbuild.c | 386 +++++++++++----------- src/include/replication/snapbuild.h | 25 +- 5 files changed, 215 insertions(+), 222 deletions(-) diff --git a/contrib/test_decoding/expected/ondisk_startup.out b/contrib/test_decoding/expected/ondisk_startup.out index 65115c830a..c7b1f45b46 100644 --- a/contrib/test_decoding/expected/ondisk_startup.out +++ b/contrib/test_decoding/expected/ondisk_startup.out @@ -1,21 +1,30 @@ Parsed test spec with 3 sessions -starting permutation: s2txid s1init s3txid s2alter s2c s1insert s1checkpoint s1start s1insert s1alter s1insert s1start -step s2txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; +starting permutation: s2b s2txid s1init s3b s3txid s2alter s2c s2b s2txid s3c s2c s1insert s1checkpoint s1start s1insert s1alter s1insert s1start +step s2b: BEGIN; +step s2txid: SELECT txid_current() IS NULL; ?column? f step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); <waiting ...> -step s3txid: BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; +step s3b: BEGIN; +step s3txid: SELECT txid_current() IS NULL; ?column? f step s2alter: ALTER TABLE do_write ADD COLUMN addedbys2 int; step s2c: COMMIT; +step s2b: BEGIN; +step s2txid: SELECT txid_current() IS NULL; +?column? + +f +step s3c: COMMIT; step s1init: <... completed> ?column? 
init +step s2c: COMMIT; step s1insert: INSERT INTO do_write DEFAULT VALUES; step s1checkpoint: CHECKPOINT; step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false'); diff --git a/contrib/test_decoding/specs/ondisk_startup.spec b/contrib/test_decoding/specs/ondisk_startup.spec index 8223705639..12c57a813d 100644 --- a/contrib/test_decoding/specs/ondisk_startup.spec +++ b/contrib/test_decoding/specs/ondisk_startup.spec @@ -24,7 +24,8 @@ step "s1alter" { ALTER TABLE do_write ADD COLUMN addedbys1 int; } session "s2" setup { SET synchronous_commit=on; } -step "s2txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; } +step "s2b" { BEGIN; } +step "s2txid" { SELECT txid_current() IS NULL; } step "s2alter" { ALTER TABLE do_write ADD COLUMN addedbys2 int; } step "s2c" { COMMIT; } @@ -32,7 +33,8 @@ step "s2c" { COMMIT; } session "s3" setup { SET synchronous_commit=on; } -step "s3txid" { BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT txid_current() IS NULL; } +step "s3b" { BEGIN; } +step "s3txid" { SELECT txid_current() IS NULL; } step "s3c" { COMMIT; } # Force usage of ondisk snapshot by starting and not finishing a @@ -40,4 +42,4 @@ step "s3c" { COMMIT; } # reached. In combination with a checkpoint forcing a snapshot to be # written and a new restart point computed that'll lead to the usage # of the snapshot. 
-permutation "s2txid" "s1init" "s3txid" "s2alter" "s2c" "s1insert" "s1checkpoint" "s1start" "s1insert" "s1alter" "s1insert" "s1start" +permutation "s2b" "s2txid" "s1init" "s3b" "s3txid" "s2alter" "s2c" "s2b" "s2txid" "s3c" "s2c" "s1insert" "s1checkpoint" "s1start" "s1insert" "s1alter" "s1insert" "s1start" diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 5c13d26099..68825ef598 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -622,9 +622,6 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, { int i; - SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid, - parsed->nsubxacts, parsed->subxacts); - for (i = 0; i < parsed->nsubxacts; i++) { ReorderBufferAbort(ctx->reorder, parsed->subxacts[i], diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 068d214fa1..1176d2059b 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -56,23 +56,34 @@ * * * The snapbuild machinery is starting up in several stages, as illustrated - * by the following graph: + * by the following graph describing the SnapBuild->state transitions: + * * +-------------------------+ - * +----|SNAPBUILD_START |-------------+ + * +----| START |-------------+ * | +-------------------------+ | * | | | * | | | - * | running_xacts with running xacts | + * | running_xacts #1 | * | | | * | | | * | v | * | +-------------------------+ v - * | |SNAPBUILD_FULL_SNAPSHOT |------------>| + * | | BUILDING_SNAPSHOT |------------>| * | +-------------------------+ | + * | | | + * | | | + * | running_xacts #2, xacts from #1 finished | + * | | | + * | | | + * | v | + * | +-------------------------+ v + * | | FULL_SNAPSHOT |------------>| + * | +-------------------------+ | + * | | | * running_xacts | saved snapshot * with zero xacts | at running_xacts's lsn * | | | - * | all 
running toplevel TXNs finished | + * | running_xacts with xacts from #2 finished | * | | | * | v | * | +-------------------------+ | @@ -83,7 +94,7 @@ * record is read that is sufficiently new (above the safe xmin horizon), * there's a state transition. If there were no running xacts when the * running_xacts record was generated, we'll directly go into CONSISTENT - * state, otherwise we'll switch to the FULL_SNAPSHOT state. Having a full + * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full * snapshot means that all transactions that start henceforth can be decoded * in their entirety, but transactions that started previously can't. In * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously @@ -184,6 +195,14 @@ struct SnapBuild ReorderBuffer *reorder; /* + * When can the next state be reached? + * + * FIXME: More accurate name, possibly split into two? + * FIXME: need to be moved into ->running.xmin or such for ABI compat. + */ + TransactionId started_collection_at; + + /* * Information about initially running transactions * * When we start building a snapshot there already may be transactions in @@ -203,7 +222,7 @@ struct SnapBuild size_t xcnt; /* number of used xip entries */ size_t xcnt_space; /* allocated size of xip */ TransactionId *xip; /* running xacts array, xidComparator-sorted */ - } running; + } running_old; /* * Array of transactions which could have catalog changes that committed @@ -249,12 +268,6 @@ struct SnapBuild static ResourceOwner SavedResourceOwnerDuringExport = NULL; static bool ExportInProgress = false; -/* transaction state manipulation functions */ -static void SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid); - -/* ->running manipulation */ -static bool SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid); - /* ->committed manipulation */ static void SnapBuildPurgeCommittedTxn(SnapBuild *builder); @@ -269,6 +282,7 @@ static void 
SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr /* xlog reading helper functions for SnapBuildProcessRecord */ static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running); +static void SnapBuildWaitSnapshot(xl_running_xacts *running); /* serialization functions */ static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn); @@ -700,7 +714,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) * we got into the SNAPBUILD_FULL_SNAPSHOT state. */ if (builder->state < SNAPBUILD_CONSISTENT && - SnapBuildTxnIsRunning(builder, xid)) + TransactionIdPrecedes(xid, builder->started_collection_at)) return false; /* @@ -769,38 +783,6 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, } /* - * Check whether `xid` is currently 'running'. - * - * Running transactions in our parlance are transactions which we didn't - * observe from the start so we can't properly decode their contents. They - * only exist after we freshly started from an < CONSISTENT snapshot. - */ -static bool -SnapBuildTxnIsRunning(SnapBuild *builder, TransactionId xid) -{ - Assert(builder->state < SNAPBUILD_CONSISTENT); - Assert(TransactionIdIsNormal(builder->running.xmin)); - Assert(TransactionIdIsNormal(builder->running.xmax)); - - if (builder->running.xcnt && - NormalTransactionIdFollows(xid, builder->running.xmin) && - NormalTransactionIdPrecedes(xid, builder->running.xmax)) - { - TransactionId *search = - bsearch(&xid, builder->running.xip, builder->running.xcnt_space, - sizeof(TransactionId), xidComparator); - - if (search != NULL) - { - Assert(*search == xid); - return true; - } - } - - return false; -} - -/* * Add a new Snapshot to all transactions we're decoding that currently are * in-progress so they can see new catalog contents made by the transaction * that just committed. 
This is necessary because those in-progress @@ -922,63 +904,6 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder) } /* - * Common logic for SnapBuildAbortTxn and SnapBuildCommitTxn dealing with - * keeping track of the amount of running transactions. - */ -static void -SnapBuildEndTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid) -{ - if (builder->state == SNAPBUILD_CONSISTENT) - return; - - /* - * NB: This handles subtransactions correctly even if we started from - * suboverflowed xl_running_xacts because we only keep track of toplevel - * transactions. Since the latter are always allocated before their - * subxids and since they end at the same time it's sufficient to deal - * with them here. - */ - if (SnapBuildTxnIsRunning(builder, xid)) - { - Assert(builder->running.xcnt > 0); - - if (!--builder->running.xcnt) - { - /* - * None of the originally running transaction is running anymore, - * so our incrementally built snapshot now is consistent. - */ - ereport(LOG, - (errmsg("logical decoding found consistent point at %X/%X", - (uint32) (lsn >> 32), (uint32) lsn), - errdetail("Transaction ID %u finished; no more running transactions.", - xid))); - builder->state = SNAPBUILD_CONSISTENT; - } - } -} - -/* - * Abort a transaction, throw away all state we kept. 
- */ -void -SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn, - TransactionId xid, - int nsubxacts, TransactionId *subxacts) -{ - int i; - - for (i = 0; i < nsubxacts; i++) - { - TransactionId subxid = subxacts[i]; - - SnapBuildEndTxn(builder, lsn, subxid); - } - - SnapBuildEndTxn(builder, lsn, xid); -} - -/* * Handle everything that needs to be done when a transaction commits */ void @@ -1022,11 +947,6 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, TransactionId subxid = subxacts[nxact]; /* - * make sure txn is not tracked in running txn's anymore, switch state - */ - SnapBuildEndTxn(builder, lsn, subxid); - - /* * If we're forcing timetravel we also need visibility information * about subtransaction, so keep track of subtransaction's state. */ @@ -1055,12 +975,6 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, } } - /* - * Make sure toplevel txn is not tracked in running txn's anymore, switch - * state to consistent if possible. - */ - SnapBuildEndTxn(builder, lsn, xid); - if (forced_timetravel) { elog(DEBUG2, "forced transaction %u to do timetravel.", xid); @@ -1250,9 +1164,45 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * * a) There were no running transactions when the xl_running_xacts record * was inserted, jump to CONSISTENT immediately. We might find such a - * state we were waiting for b) or c). + * state while waiting for c) or d), e). * - * b) Wait for all toplevel transactions that were running to end. We + * b) This (in a previous run) or another decoding slot serialized a + * snapshot to disk that we can use. Can't use this method for the + * initial snapshot when slot is being created and needs full snapshot + * for export or direct use, as that snapshot will only contain catalog + * modifying transactions. 
+ * + * c) First incrementally build a snapshot for catalog tuples + * (BUILDING_SNAPSHOT), that requires all, already in-progress, + * transactions to finish. Every transaction starting after that + * (FULL_SNAPSHOT state), has enough information to be decoded. But + * for older running transactions no viable snapshot exists yet, so + * CONSISTENT will only be reached once all of those have finished. + * + * c) In BUILDING_SNAPSHOT state (see d) ), and this xl_running_xacts' + * oldestRunningXid is >= than nextXid from when we switched to + * BUILDING_SNAPSHOT. Switch to FULL_SNAPSHOT. + * + * d) In FULL_SNAPSHOT state (see d) ), and this xl_running_xacts' + * oldestRunningXid is >= than nextXid from when we switched to + * FULL_SNAPSHOT. Switch to CONSISTENT. + * + * e) In START state, and a xl_running_xacts record with running xacts is + * encountered. In that case, switch to BUILDING_SNAPSHOT state, and + * record xl_running_xacts->nextXid. Once all running xacts have + * finished (i.e. they're all >= nextXid), we have a complete snapshot. + * It might look that we could use xl_running_xact's ->xids information + * to get there quicker, but that is problematic because transactions + * marked as running, might already have inserted their commit record - + * it's infeasible to change that with locking. + + * + * d) In BUILDING_SNAPSHOT state (see c) ), and this xl_running_xacts' + * oldestRunningXid is newer than the + * + + + Wait for all toplevel transactions that were running to end. We * simply track the number of in-progress toplevel transactions and * lower it whenever one commits or aborts. When that number * (builder->running.xcnt) reaches zero, we can go from FULL_SNAPSHOT @@ -1264,11 +1214,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * subtransactions - and by extension suboverflowed xl_running_xacts - * at all. 
* * - * c) This (in a previous run) or another decoding slot serialized a - * snapshot to disk that we can use. Can't use this method for the - * initial snapshot when slot is being created and needs full snapshot - * for export or direct use, as that snapshot will only contain catalog - * modifying transactions. + * Unfortunately there's a race condition around LogStandbySnapshot(), + * where transactions might have logged their commit record before + * xl_running_xacts itself is logged. In that case the decoding logic + * would have missed that fact. Thus: + * + * d) xl_running_xacts shows us that transaction(s) assumed to be still + * running have actually already finished. Adjust their status. * --- */ @@ -1291,10 +1243,13 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn /* * a) No transaction were running, we can jump to consistent. * + * This is not affected by races: although we can miss transaction commits, + * we can't miss transactions starting (XXX: Not true if we relax locking!). + * * NB: We might have already started to incrementally assemble a snapshot, * so we need to be careful to deal with that.
*/ - if (running->xcnt == 0) + if (running->oldestRunningXid == running->nextXid) { if (builder->start_decoding_at == InvalidXLogRecPtr || builder->start_decoding_at <= lsn) @@ -1310,9 +1265,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn Assert(TransactionIdIsNormal(builder->xmax)); /* no transactions running now */ - builder->running.xcnt = 0; - builder->running.xmin = InvalidTransactionId; - builder->running.xmax = InvalidTransactionId; + builder->running_old.xcnt = 0; + builder->running_old.xmin = InvalidTransactionId; + builder->running_old.xmax = InvalidTransactionId; builder->state = SNAPBUILD_CONSISTENT; @@ -1323,30 +1278,29 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn return false; } - /* c) valid on disk state and not building full snapshot */ + /* b) valid on disk state and not building full snapshot */ else if (!builder->building_full_snapshot && SnapBuildRestore(builder, lsn)) { /* there won't be any state to cleanup */ return false; } - /* - * b) first encounter of a useable xl_running_xacts record. If we had - * found one earlier we would either track running transactions (i.e. - * builder->running.xcnt != 0) or be consistent (this function wouldn't - * get called). + * c) transition from START to BUILDING_SNAPSHOT. + * + * In START state, and an xl_running_xacts record with running xacts is + * encountered. In that case, switch to BUILDING_SNAPSHOT state, and + * record xl_running_xacts->nextXid. Once all running xacts have finished + * (i.e. they're all >= nextXid), we have a complete catalog snapshot. It + * might seem that we could use xl_running_xacts' ->xids information to + * get there quicker, but that is problematic because transactions marked + * as running might already have inserted their commit record - it's + * infeasible to prevent that with locking.
*/ - else if (!builder->running.xcnt) + else if (builder->state == SNAPBUILD_START) { - int off; - - /* - * We only care about toplevel xids as those are the ones we - * definitely see in the wal stream. As snapbuild.c tracks committed - * instead of running transactions we don't need to know anything - * about uncommitted subtransactions. - */ + builder->started_collection_at = running->nextXid; + builder->state = SNAPBUILD_BUILDING_SNAPSHOT; /* * Start with an xmin/xmax that's correct for future, when all the @@ -1360,59 +1314,59 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn Assert(TransactionIdIsNormal(builder->xmin)); Assert(TransactionIdIsNormal(builder->xmax)); - builder->running.xcnt = running->xcnt; - builder->running.xcnt_space = running->xcnt; - builder->running.xip = - MemoryContextAlloc(builder->context, - builder->running.xcnt * sizeof(TransactionId)); - memcpy(builder->running.xip, running->xids, - builder->running.xcnt * sizeof(TransactionId)); - - /* sort so we can do a binary search */ - qsort(builder->running.xip, builder->running.xcnt, - sizeof(TransactionId), xidComparator); - - builder->running.xmin = builder->running.xip[0]; - builder->running.xmax = builder->running.xip[running->xcnt - 1]; - - /* makes comparisons cheaper later */ - TransactionIdRetreat(builder->running.xmin); - TransactionIdAdvance(builder->running.xmax); - - builder->state = SNAPBUILD_FULL_SNAPSHOT; - ereport(LOG, (errmsg("logical decoding found initial starting point at %X/%X", (uint32) (lsn >> 32), (uint32) lsn), - errdetail_plural("%u transaction needs to finish.", - "%u transactions need to finish.", - builder->running.xcnt, - (uint32) builder->running.xcnt))); + errdetail("Waiting for transactions (approximately %d) older than %u to end.", + running->xcnt, running->nextXid))); - /* - * Iterate through all xids, wait for them to finish. 
- * - * This isn't required for the correctness of decoding, but to allow - * isolationtester to notice that we're currently waiting for - * something. - */ - for (off = 0; off < builder->running.xcnt; off++) - { - TransactionId xid = builder->running.xip[off]; + SnapBuildWaitSnapshot(running); + } + /* + * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT. + * + * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid + * is >= the nextXid recorded when we switched to BUILDING_SNAPSHOT. This + * means all transactions starting afterwards have enough information to + * be decoded. Switch to FULL_SNAPSHOT. + */ + else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT && + TransactionIdPrecedesOrEquals(builder->started_collection_at, + running->oldestRunningXid)) + { + builder->state = SNAPBUILD_FULL_SNAPSHOT; + builder->started_collection_at = running->nextXid; - /* - * Upper layers should prevent that we ever need to wait on - * ourselves. Check anyway, since failing to do so would either - * result in an endless wait or an Assert() failure. - */ - if (TransactionIdIsCurrentTransactionId(xid)) - elog(ERROR, "waiting for ourselves"); + SnapBuildWaitSnapshot(running); - XactLockTableWait(xid, NULL, NULL, XLTW_None); - } - - /* nothing could have built up so far, so don't perform cleanup */ - return false; + ereport(LOG, + (errmsg("logical decoding found initial consistent point at %X/%X", + (uint32) (lsn >> 32), (uint32) lsn), + errdetail("Waiting for transactions (approximately %d) older than %u to end.", + running->xcnt, running->nextXid))); + } + /* + * c) transition from FULL_SNAPSHOT to CONSISTENT. + * + * In FULL_SNAPSHOT state, and this xl_running_xacts' + * oldestRunningXid is >= the nextXid recorded when we switched to + * FULL_SNAPSHOT. This means all transactions that are currently in + * progress have a catalog snapshot, and all their changes have been + * collected. Switch to CONSISTENT.
+ */ + else if (builder->state == SNAPBUILD_FULL_SNAPSHOT && + TransactionIdPrecedesOrEquals(builder->started_collection_at, + running->oldestRunningXid)) + { + builder->state = SNAPBUILD_CONSISTENT; + ereport(LOG, + (errmsg("logical decoding found consistent point at %X/%X", + (uint32) (lsn >> 32), (uint32) lsn), + errdetail("There are no old transactions anymore."))); + } + else + { + SnapBuildWaitSnapshot(running); } /* @@ -1421,8 +1375,35 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * records so incremental cleanup can be performed. */ return true; + } +/* + * Iterate through all xids in record, wait for them to finish. + * + * This isn't required for the correctness of decoding, but to allow + * isolationtester to notice that we're currently waiting for something. + */ +static void +SnapBuildWaitSnapshot(xl_running_xacts *running) +{ + int off; + + for (off = 0; off < running->xcnt; off++) + { + TransactionId xid = running->xids[off]; + + /* + * Upper layers should prevent that we ever need to wait on + * ourselves. Check anyway, since failing to do so would either + * result in an endless wait or an Assert() failure. 
+ */ + if (TransactionIdIsCurrentTransactionId(xid)) + elog(ERROR, "waiting for ourselves"); + + XactLockTableWait(xid, NULL, NULL, XLTW_None); + } +} /* ----------------------------------- * Snapshot serialization support @@ -1572,7 +1553,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) errmsg("could not remove file \"%s\": %m", path))); needed_length = sizeof(SnapBuildOnDisk) + - sizeof(TransactionId) * builder->running.xcnt_space + + sizeof(TransactionId) * builder->running_old.xcnt_space + sizeof(TransactionId) * builder->committed.xcnt; ondisk_c = MemoryContextAllocZero(builder->context, needed_length); @@ -1591,7 +1572,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) ondisk->builder.context = NULL; ondisk->builder.snapshot = NULL; ondisk->builder.reorder = NULL; - ondisk->builder.running.xip = NULL; + ondisk->builder.running_old.xip = NULL; ondisk->builder.committed.xip = NULL; COMP_CRC32C(ondisk->checksum, @@ -1599,8 +1580,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) sizeof(SnapBuild)); /* copy running xacts */ - sz = sizeof(TransactionId) * builder->running.xcnt_space; - memcpy(ondisk_c, builder->running.xip, sz); + sz = sizeof(TransactionId) * builder->running_old.xcnt_space; + memcpy(ondisk_c, builder->running_old.xip, sz); COMP_CRC32C(ondisk->checksum, ondisk_c, sz); ondisk_c += sz; @@ -1763,10 +1744,10 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild)); /* restore running xacts information */ - sz = sizeof(TransactionId) * ondisk.builder.running.xcnt_space; - ondisk.builder.running.xip = MemoryContextAllocZero(builder->context, sz); + sz = sizeof(TransactionId) * ondisk.builder.running_old.xcnt_space; + ondisk.builder.running_old.xip = MemoryContextAllocZero(builder->context, sz); pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); - readBytes = read(fd, ondisk.builder.running.xip, sz); + readBytes = read(fd, ondisk.builder.running_old.xip, sz); 
pgstat_report_wait_end(); if (readBytes != sz) { @@ -1776,7 +1757,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) errmsg("could not read file \"%s\", read %d of %d: %m", path, readBytes, (int) sz))); } - COMP_CRC32C(checksum, ondisk.builder.running.xip, sz); + COMP_CRC32C(checksum, ondisk.builder.running_old.xip, sz); /* restore committed xacts information */ sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt; @@ -1842,11 +1823,12 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) } ondisk.builder.committed.xip = NULL; - builder->running.xcnt = ondisk.builder.running.xcnt; - if (builder->running.xip) - pfree(builder->running.xip); - builder->running.xcnt_space = ondisk.builder.running.xcnt_space; - builder->running.xip = ondisk.builder.running.xip; + /* FIXME: remove */ + builder->running_old.xcnt = ondisk.builder.running_old.xcnt; + if (builder->running_old.xip) + pfree(builder->running_old.xip); + builder->running_old.xcnt_space = ondisk.builder.running_old.xcnt_space; + builder->running_old.xip = ondisk.builder.running_old.xip; /* our snapshot is not interesting anymore, build a new one */ if (builder->snapshot != NULL) @@ -1867,8 +1849,8 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) return true; snapshot_not_interesting: - if (ondisk.builder.running.xip != NULL) - pfree(ondisk.builder.running.xip); + if (ondisk.builder.running_old.xip != NULL) + pfree(ondisk.builder.running_old.xip); if (ondisk.builder.committed.xip != NULL) pfree(ondisk.builder.committed.xip); return false; diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index 494751d70a..ccb5f831c4 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -20,24 +20,30 @@ typedef enum /* * Initial state, we can't do much yet. */ - SNAPBUILD_START, + SNAPBUILD_START = -1, + + /* + * Collecting committed transactions, to build the initial catalog + * snapshot. 
+ */ + SNAPBUILD_BUILDING_SNAPSHOT = 0, /* * We have collected enough information to decode tuples in transactions * that started after this. * * Once we reached this we start to collect changes. We cannot apply them - * yet because the might be based on transactions that were still running - * when we reached them yet. + * yet, because they might be based on transactions that were still running + * when FULL_SNAPSHOT was reached. */ - SNAPBUILD_FULL_SNAPSHOT, + SNAPBUILD_FULL_SNAPSHOT = 1, /* - * Found a point after hitting built_full_snapshot where all transactions - * that were running at that point finished. Till we reach that we hold - * off calling any commit callbacks. + * Found a point after SNAPBUILD_FULL_SNAPSHOT where all transactions that + * were running at that point finished. Till we reach that we hold off + * calling any commit callbacks. */ - SNAPBUILD_CONSISTENT + SNAPBUILD_CONSISTENT = 2 } SnapBuildState; /* forward declare so we don't have to expose the struct to the public */ @@ -73,9 +79,6 @@ extern bool SnapBuildXactNeedsSkip(SnapBuild *snapstate, XLogRecPtr ptr); extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, int nsubxacts, TransactionId *subxacts); -extern void SnapBuildAbortTxn(SnapBuild *builder, XLogRecPtr lsn, - TransactionId xid, int nsubxacts, - TransactionId *subxacts); extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn); extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, -- 2.12.0.264.gd6db3f2165.dirty
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers