On Fri, Dec 25, 2015 at 7:21 AM, Thomas Munro <thomas.mu...@enterprisedb.com> wrote:
> On Fri, Dec 25, 2015 at 8:50 AM, Masahiko Sawada <sawada.m...@gmail.com>
> wrote:
>> On Wed, Dec 23, 2015 at 8:45 AM, Thomas Munro
>> <thomas.mu...@enterprisedb.com> wrote:
>>> On Wed, Dec 23, 2015 at 3:50 PM, Thomas Munro
>>> <thomas.mu...@enterprisedb.com> wrote:
>>>> If you got rid of SyncRepGetSyncStandbysOnePriority as suggested
>>>> above, then this function could be renamed to SyncRepGetSyncStandbys.
>>>> I think it would be a tiny bit nicer if it also took a Size n argument
>>>> along with the output buffer pointer.
>>
>> Sorry, I didn't get your point. The SyncRepGetSyncStandbysPriority()
>> function uses synchronous_standby_num, which is a global variable.
>> Do you mean that the number of synchronous standbys should be given as
>> a function argument?
>
> Yeah, I was thinking of it as the output buffer size which I would be
> inclined to make more explicit (I am still coming to terms with the
> use of global variables in Postgres) but it doesn't matter, please
> disregard that suggestion.
>
>>>> As for the body of that function (which I won't paste here), it
>>>> contains an algorithm to find the top K elements in an array of N
>>>> elements. It does that with a linear search through the top K seen so
>>>> far for each value in the input array, so its worst case is O(KN)
>>>> comparisons. Some of the sorting gurus on this list might have
>>>> something to say about that but my take is that it seems fine for the
>>>> tiny values of K and N that we're dealing with here, and it's nice
>>>> that it doesn't need any space other than the output buffer, unlike
>>>> some other top-K algorithms which would win for larger inputs.
>>
>> Yeah, that's a point for improvement.
>> But I assumed that the number of synchronous standbys would not be
>> large, so I used this algorithm for the first version.
>> And I think that its worst case is O(K(N-K)). Am I missing something?
>
> You're right, I was dropping that detail, in the tradition of the
> hand-wavy school of big-O notation. (I suppose you could skip the
> inner loop when the priority is lower than the current lowest
> priority, giving a O(N) best case when the walsenders are perfectly
> ordered by coincidence. Probably a bad idea or just not worth
> worrying about.)
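For reference, the bounded top-K selection discussed in this exchange, including the inner-loop skip Thomas mentions, could look roughly like the following stand-alone C sketch. It is not the patch's SyncRepGetSyncStandbysPriority(). The function name select_top_k and the plain int array of priorities are hypothetical. As with sync_standby_priority, 0 means "not a synchronous candidate" and lower non-zero values are preferred.

```c
/*
 * Hypothetical sketch (not the patch's code): write into out[] the indexes
 * of up to k entries of prio[0..n-1] with the lowest non-zero values.
 * A value of 0 means "not a synchronous candidate"; lower non-zero values
 * are preferred.  The order of results is unspecified.  out[] must have
 * room for k ints.  Returns the number of indexes written.
 */
static int
select_top_k(const int *prio, int n, int *out, int k)
{
	int			count = 0;	/* entries currently in out[] */
	int			worst = 0;	/* slot in out[] holding the largest value */
	int			i,
				j;

	if (k <= 0)
		return 0;

	for (i = 0; i < n; i++)
	{
		if (prio[i] == 0)
			continue;			/* not a candidate */

		if (count < k)
		{
			/* Buffer not full yet: append and keep 'worst' up to date. */
			out[count] = i;
			if (prio[i] > prio[out[worst]])
				worst = count;
			count++;
			continue;
		}

		/*
		 * Buffer full.  A single comparison rejects anything that is not
		 * strictly better than the current worst, so no inner loop runs.
		 */
		if (prio[i] >= prio[out[worst]])
			continue;

		/* Replace the worst entry, then rescan the K slots for the new worst. */
		out[worst] = i;
		worst = 0;
		for (j = 1; j < count; j++)
		{
			if (prio[out[j]] > prio[out[worst]])
				worst = j;
		}
	}

	return count;
}
```

Each replacement costs one O(K) rescan, and at most N-K inputs can trigger one, which gives the O(K(N-K)) worst case Masahiko describes. When candidates happen to arrive in ascending priority order, the first K fill the buffer and every later entry is rejected by one comparison, which gives the O(N) best case. Because ties are rejected, the earliest walsender among equal priorities is kept.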
Thank you for reviewing the patch.
Yeah, I added logic that skips the inner loop.

>
>> Attached latest version patch.
>
> +/*
> + * Obtain currently synced LSN location: write and flush, using priority
> - * In 9.1 we support only a single synchronous standby, chosen from a
> - * priority list of synchronous_standby_names. Before it can become the
> + * In 9.6 we support multiple synchronous standby, chosen from a priority
>
> s/standby/standbys/
>
> + * list of synchronous_standby_names. Before it can become the
>
> s/Before it can become the/Before any standby can become a/
>
> * synchronous standby it must have caught up with the primary; that may
> * take some time. Once caught up, the current highest priority standby
>
> s/standby/standbys/
>
> * will release waiters from the queue.
>
> +bool
> +SyncRepGetSyncLsnsPriority(XLogRecPtr *write_pos, XLogRecPtr *flush_pos)
> +{
> + int sync_standbys[synchronous_standby_num];
>
> I think this should be sync_standbys[SYNC_REP_MAX_SYNC_STANDBY_NUM].
> (Variable sized arrays are a feature of C99 and PostgreSQL is written
> in C89.)
>
> +/*
> + * Populate a caller-supplied array which much have enough space for
> + * synchronous_standby_num. Returns position of standbys currently
> + * considered as synchronous, and its length.
> + */
> +int
> +SyncRepGetSyncStandbys(int *sync_standbys)
>
> s/much/must/ (my bad, in previous email).
>
> + ereport(ERROR,
> + (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> + errmsg("The number of synchronous standbys must be smaller than the
> number of listed : %d",
> + synchronous_standby_num)));
>
> How about "the number of synchronous standbys exceeds the length of
> the standby list: %d"? Error messages usually start with lower case,
> ':' is not usually preceded by a space.
>
> + ereport(ERROR,
> + (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> + errmsg("The number of synchronous standbys must be between 1 and %d : %d",
>
> s/The/the/, s/ : /: /

I have fixed the points you mentioned.
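Two of the review points above affect how the number at the head of synchronous_standby_names is validated in the 'priority' method. The first is bounding the buffer by a compile-time constant instead of a C99 variable-length array. The second is wording the errors in PostgreSQL's message style. The following stand-alone sketch assumes the setting has already been split into an array of strings; in the backend, SplitIdentifierString() does that split. The name check_sync_standby_num and the constant MAX_SYNC_STANDBYS are hypothetical stand-ins. The sketch uses snprintf() in place of ereport() so that it compiles outside the server.

```c
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stand-in for SYNC_REP_MAX_SYNC_STANDBY_NUM from the patch. */
#define MAX_SYNC_STANDBYS 256

/*
 * Validate a pre-split setting of the form "N, name1, name2, ...":
 * elems[0] is N and elems[1..nelems-1] are standby names.  Returns N on
 * success; on failure returns -1 and writes a message in PostgreSQL's
 * style (lower case, no space before ':') into errbuf.
 */
static int
check_sync_standby_num(const char *const *elems, int nelems,
					   char *errbuf, size_t errlen)
{
	char	   *end;
	long		num;

	if (nelems < 1)
	{
		snprintf(errbuf, errlen, "synchronous standby list is empty");
		return -1;
	}

	num = strtol(elems[0], &end, 10);
	if (end == elems[0] || *end != '\0')
	{
		snprintf(errbuf, errlen,
				 "invalid number of synchronous standbys: \"%s\"", elems[0]);
		return -1;
	}

	/* Range check first, so a fixed-size array bound can never be exceeded. */
	if (num < 1 || num > MAX_SYNC_STANDBYS)
	{
		snprintf(errbuf, errlen,
				 "the number of synchronous standbys must be between 1 and %d: %ld",
				 MAX_SYNC_STANDBYS, num);
		return -1;
	}

	/* elems[0] is the number itself, so there are nelems - 1 names. */
	if (num > nelems - 1)
	{
		snprintf(errbuf, errlen,
				 "the number of synchronous standbys exceeds the length of the standby list: %ld",
				 num);
		return -1;
	}

	errbuf[0] = '\0';
	return (int) num;
}
```

The patch checks the list length before the range. Checking the range first means an out-of-range value is reported as such even when the list is also too short. It also guarantees that a caller-side array such as `int sync_standbys[SYNC_REP_MAX_SYNC_STANDBY_NUM]` cannot be overrun.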
Attached is the latest (v5) patch. Please review it.

Regards,

--
Masahiko Sawada
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 7f85b88..0c78919 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -29,10 +29,10 @@ * single ordered queue of waiting backends, so that we can avoid * searching the through all waiters each time we receive a reply. * - * In 9.1 we support only a single synchronous standby, chosen from a - * priority list of synchronous_standby_names. Before it can become the - * synchronous standby it must have caught up with the primary; that may - * take some time. Once caught up, the current highest priority standby + * In 9.6 we support multiple synchronous standbys, chosen from a priority + * list of synchronous_standby_names. Before any standby can become a + * synchronous standbys it must have caught up with the primary; that may + * take some time. Once caught up, the current highest priority standbys * will release waiters from the queue. * * Portions Copyright (c) 2010-2016, PostgreSQL Global Development Group @@ -59,9 +59,15 @@ /* User-settable parameters for sync rep */ char *SyncRepStandbyNames; +int synchronous_replication_method; +int synchronous_standby_num; #define SyncStandbysDefined() \ - (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0') + (SyncRepStandbyNames != NULL && \ + SyncRepStandbyNames[0] != '\0' && \ + synchronous_standby_num > 0) + +#define SYNC_REP_MAX_SYNC_STANDBY_NUM 256 static bool announce_next_takeover = true; @@ -349,57 +355,185 @@ SyncRepInitConfig(void) } /* - * Find the WAL sender servicing the synchronous standby with the lowest - * priority value, or NULL if no synchronous standby is connected. If there - * are multiple standbys with the same lowest priority value, the first one - * found is selected. The caller must hold SyncRepLock. + * Is this wal sender managing a standby that is streaming and + * listed as a synchronous standby? 
*/ -WalSnd * -SyncRepGetSynchronousStandby(void) +bool +SyncRepActiveListedWalSender(int num) { - WalSnd *result = NULL; - int result_priority = 0; - int i; + volatile WalSnd *walsnd = &WalSndCtl->walsnds[num]; + + /* Must be active */ + if (walsnd->pid == 0) + return false; + + /* Must be streaming */ + if (walsnd->state != WALSNDSTATE_STREAMING) + return false; + + /* Must be synchronous */ + if (walsnd->sync_standby_priority == 0) + return false; + + /* Must have a valid flush position */ + if (XLogRecPtrIsInvalid(walsnd->flush)) + return false; + + return true; +} + +/* + * Get both LSNs: write and flush, according to replication method. + * And confirm whether we have advanced to LSN or not. + */ +bool +SyncRepSyncedLsnAdvancedTo(XLogRecPtr *write_pos, XLogRecPtr *flush_pos) +{ + XLogRecPtr tmp_write_pos; + XLogRecPtr tmp_flush_pos; + bool ret = false; + + /* '1-priority' and 'priority' method */ + if (synchronous_replication_method == SYNC_REP_METHOD_1_PRIORITY || + synchronous_replication_method == SYNC_REP_METHOD_PRIORITY) + ret = SyncRepGetSyncLsnsPriority(&tmp_write_pos, &tmp_flush_pos); + + /* Have we advanced LSN? */ + if (ret) + { + if (MyWalSnd->write >= tmp_write_pos) + *write_pos = tmp_write_pos; + if (MyWalSnd->flush >= tmp_flush_pos) + *flush_pos = tmp_flush_pos; + + return true; + } + + return false; +} + +/* + * Populate a caller-supplied array which must have enough space for + * synchronous_standby_num. Returns position of standbys currently + * considered as synchronous, and its length. 
+ */ +int +SyncRepGetSyncStandbys(int *sync_standbys) +{ + int num_sync = 0; + + + /* '1-priority' and 'priority' method */ + if (synchronous_replication_method == SYNC_REP_METHOD_1_PRIORITY || + synchronous_replication_method == SYNC_REP_METHOD_PRIORITY) + num_sync = SyncRepGetSyncStandbysPriority(sync_standbys); + + return num_sync; +} + +/* + * Populates a caller-supplied buffer with the walsnds indexes of the + * highest priority active synchronous standbys, up to the a limit of + * 'synchronous_standby_num'. The order of the results is undefined. + * Return the number of results actually written. + */ +int +SyncRepGetSyncStandbysPriority(int *sync_standbys) +{ + int priority = 0; + int num_sync = 0; + int i; for (i = 0; i < max_wal_senders; i++) { /* Use volatile pointer to prevent code rearrangement */ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; - int this_priority; + int j; - /* Must be active */ - if (walsnd->pid == 0) + /* Is this wal sender considerable one? */ + if (!SyncRepActiveListedWalSender(i)) continue; - /* Must be streaming */ - if (walsnd->state != WALSNDSTATE_STREAMING) - continue; + if (num_sync == synchronous_standby_num && + walsnd->sync_standby_priority < priority) + { + for (j = 0; j < num_sync; j++) + { + volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[j]]; + + /* Found lowest priority standby, so replace it */ + if (walsndloc->sync_standby_priority == priority && + walsnd->sync_standby_priority < priority) + sync_standbys[j] = i; + + /* Update highest priority standby */ + if (priority < walsndloc->sync_standby_priority) + priority = walsndloc->sync_standby_priority; + } + } + else + { + sync_standbys[num_sync] = i; + num_sync++; - /* Must be synchronous */ - this_priority = walsnd->sync_standby_priority; - if (this_priority == 0) - continue; + /* Keep track highest priority standby */ + if (priority < walsnd->sync_standby_priority) + priority = walsnd->sync_standby_priority; + } + } - /* Must have a lower priority 
value than any previous ones */ - if (result != NULL && result_priority <= this_priority) - continue; + return num_sync; +} - /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(walsnd->flush)) +/* + * Obtain currently synced LSN location: write and flush, using priority + * method. + */ +bool +SyncRepGetSyncLsnsPriority(XLogRecPtr *write_pos, XLogRecPtr *flush_pos) +{ + int sync_standbys[SYNC_REP_MAX_SYNC_STANDBY_NUM]; + int num_sync; + int i; + XLogRecPtr synced_write = InvalidXLogRecPtr; + XLogRecPtr synced_flush = InvalidXLogRecPtr; + + num_sync = SyncRepGetSyncStandbysPriority(sync_standbys); + + /* Just return, if sync standby is not enough */ + if (num_sync < synchronous_standby_num) + { + return false; + } + + for (i = 0; i < num_sync; i++) + { + volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]]; + + SpinLockAcquire(&walsndloc->mutex); + + /* Store first candidate */ + if (XLogRecPtrIsInvalid(synced_write) && XLogRecPtrIsInvalid(synced_flush)) + { + synced_write = walsndloc->write; + synced_flush = walsndloc->flush; + SpinLockRelease(&walsndloc->mutex); continue; + } - result = (WalSnd *) walsnd; - result_priority = this_priority; + /* Keep/Collect the earliest write and flush LSNs among prioritized standbys */ + if (synced_write > walsndloc->write) + synced_write = walsndloc->write; + if (synced_flush > walsndloc->flush) + synced_flush = walsndloc->flush; - /* - * If priority is equal to 1, there cannot be any other WAL senders - * with a lower priority, so we're done. 
- */ - if (this_priority == 1) - return result; + SpinLockRelease(&walsndloc->mutex); } - return result; + *write_pos = synced_write; + *flush_pos = synced_flush; + + return true; } /* @@ -413,9 +547,9 @@ void SyncRepReleaseWaiters(void) { volatile WalSndCtlData *walsndctl = WalSndCtl; - WalSnd *syncWalSnd; - int numwrite = 0; - int numflush = 0; + XLogRecPtr write_pos = InvalidXLogRecPtr; + XLogRecPtr flush_pos = InvalidXLogRecPtr; + int numwrite, numflush; /* * If this WALSender is serving a standby that is not on the list of @@ -428,23 +562,12 @@ SyncRepReleaseWaiters(void) XLogRecPtrIsInvalid(MyWalSnd->flush)) return; - /* - * We're a potential sync standby. Release waiters if we are the highest - * priority standby. - */ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); - syncWalSnd = SyncRepGetSynchronousStandby(); - - /* We should have found ourselves at least */ - Assert(syncWalSnd != NULL); - /* - * If we aren't managing the highest priority standby then just leave. - */ - if (syncWalSnd != MyWalSnd) + /* Get currently synced LSNs according to replication method */ + if (!(SyncRepSyncedLsnAdvancedTo(&write_pos, &flush_pos))) { LWLockRelease(SyncRepLock); - announce_next_takeover = true; return; } @@ -452,14 +575,14 @@ SyncRepReleaseWaiters(void) * Set the lsn first so that when we wake backends they will release up to * this location. 
*/ - if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write) + if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < write_pos) { - walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write; + walsndctl->lsn[SYNC_REP_WAIT_WRITE] = write_pos; numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); } - if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush) + if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flush_pos) { - walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; + walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flush_pos; numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); } @@ -498,6 +621,7 @@ SyncRepGetStandbyPriority(void) ListCell *l; int priority = 0; bool found = false; + int num = 0; /* * Since synchronous cascade replication is not allowed, we always set the @@ -506,6 +630,10 @@ SyncRepGetStandbyPriority(void) if (am_cascading_walsender) return 0; + /* If no synchronous standby allowed, no cake for this WAL sender */ + if (!SyncStandbysDefined()) + return 0; + /* Need a modifiable copy of string */ rawstring = pstrdup(SyncRepStandbyNames); @@ -521,8 +649,16 @@ SyncRepGetStandbyPriority(void) foreach(l, elemlist) { - char *standby_name = (char *) lfirst(l); + char *standby_name; + + if (num == 0 && + synchronous_replication_method != SYNC_REP_METHOD_1_PRIORITY) + { + num = lfirst_int(l); + continue; + } + standby_name = (char *) lfirst(l); priority++; if (pg_strcasecmp(standby_name, application_name) == 0 || @@ -683,7 +819,6 @@ SyncRepQueueIsOrderedByLSN(int mode) * Synchronous Replication functions executed by any process * =========================================================== */ - bool check_synchronous_standby_names(char **newval, void **extra, GucSource source) { @@ -733,3 +868,42 @@ assign_synchronous_commit(int newval, void *extra) break; } } + +void +ProcessSynchronousReplicationConfig(void) +{ + char *rawstring; + List *elemlist; + + + /* Need a modifiable copy of string */ + rawstring = pstrdup(SyncRepStandbyNames); + + /* Parse string into list of 
identifiers */ + SplitIdentifierString(rawstring, ',', &elemlist); + + /* Store them into globl variables */ + if (synchronous_replication_method == SYNC_REP_METHOD_1_PRIORITY) + synchronous_standby_num = 1; + else + synchronous_standby_num = pg_atoi(lfirst(list_head(elemlist)), sizeof(int), 0); + + /* In 'priority' method, additional validation checks for synchronous_standby_num */ + if (synchronous_replication_method == SYNC_REP_METHOD_PRIORITY) + { + if ((list_length(elemlist) - 1) < synchronous_standby_num) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("the number of synchronous standbys exceed the length of the standby list: %d", + synchronous_standby_num))); + + if (1 > synchronous_standby_num || + synchronous_standby_num > SYNC_REP_MAX_SYNC_STANDBY_NUM) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("the number of synchronous standbys must be between 1 and %d: %d", + SYNC_REP_MAX_SYNC_STANDBY_NUM, synchronous_standby_num))); + } + + return; +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index c03e045..8586af4 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2749,9 +2749,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; - WalSnd *sync_standby; + int *sync_standbys; + int num_sync; int i; + /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) ereport(ERROR, @@ -2777,11 +2779,13 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); + sync_standbys = (int *) palloc(sizeof(int) * synchronous_standby_num); + /* - * Get the currently active synchronous standby. + * Get the currently active synchronous standbys. 
*/ LWLockAcquire(SyncRepLock, LW_SHARED); - sync_standby = SyncRepGetSynchronousStandby(); + num_sync = SyncRepGetSyncStandbys(sync_standbys); LWLockRelease(SyncRepLock); for (i = 0; i < max_wal_senders; i++) @@ -2854,18 +2858,34 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ if (priority == 0) values[7] = CStringGetTextDatum("async"); - else if (walsnd == sync_standby) - values[7] = CStringGetTextDatum("sync"); else - values[7] = CStringGetTextDatum("potential"); + { + int j; + bool found = false; + + for (j = 0; j < num_sync; j++) + { + /* Found sync standby */ + if (i == sync_standbys[j]) + { + values[7] = CStringGetTextDatum("sync"); + found = true; + break; + } + } + if (!found) + values[7] = CStringGetTextDatum("potential"); + } } tuplestore_putvalues(tupstore, tupdesc, values, nulls); } /* clean up and return the tuplestore */ + pfree(sync_standbys); tuplestore_donestoring(tupstore); + return (Datum) 0; } diff --git a/src/backend/utils/misc/guc-file.l b/src/backend/utils/misc/guc-file.l index 48052f9..c2e5812 100644 --- a/src/backend/utils/misc/guc-file.l +++ b/src/backend/utils/misc/guc-file.l @@ -18,6 +18,7 @@ #include "miscadmin.h" #include "storage/fd.h" #include "utils/guc.h" +#include "replication/syncrep.h" /* @@ -155,6 +156,12 @@ ProcessConfigFile(GucContext context) */ (void) ProcessConfigFileInternal(context, true, elevel); + /* + * After read all synchronous replication configuration parameter, we apply + * settings according to replication method. 
+ */ + ProcessSynchronousReplicationConfig(); + /* Clean up */ MemoryContextSwitchTo(caller_cxt); MemoryContextDelete(config_cxt); diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 38ba82f..1d6f530 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -310,6 +310,12 @@ static const struct config_enum_entry xmloption_options[] = { {NULL, 0, false} }; +static const struct config_enum_entry synchronous_replication_method_options[] = { + {"1-priority", SYNC_REP_METHOD_1_PRIORITY, false}, + {"priority", SYNC_REP_METHOD_PRIORITY, false}, + {NULL, 0, false} +}; + /* * Although only "on", "off", and "safe_encoding" are documented, we * accept all the likely variants of "on" and "off". @@ -3672,6 +3678,16 @@ static struct config_enum ConfigureNamesEnum[] = NULL, NULL, NULL }, + { + {"synchronous_replication_method", PGC_SIGHUP, REPLICATION_MASTER, + gettext_noop("Method for multiple synchronous replication."), + NULL + }, + &synchronous_replication_method, + SYNC_REP_METHOD_1_PRIORITY, synchronous_replication_method_options, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 029114f..b79b965 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -241,6 +241,8 @@ #synchronous_standby_names = '' # standby servers that provide sync rep # comma-separated list of application_name # from standby(s); '*' = all +#synchronous_standby_num = 0 # number of standby servers using sync rep +#synchronous_replication_method = '1-priority' # 1-priority, priority #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed # - Standby Servers - diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 96e059b..a9f816c 100644 --- a/src/include/replication/syncrep.h 
+++ b/src/include/replication/syncrep.h @@ -31,8 +31,14 @@ #define SYNC_REP_WAITING 1 #define SYNC_REP_WAIT_COMPLETE 2 +/* SyncRepMethod */ +#define SYNC_REP_METHOD_1_PRIORITY 0 +#define SYNC_REP_METHOD_PRIORITY 1 + /* user-settable parameters for synchronous replication */ extern char *SyncRepStandbyNames; +extern int synchronous_replication_method; +extern int synchronous_standby_num; /* called by user backend */ extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN); @@ -49,9 +55,19 @@ extern void SyncRepUpdateSyncStandbysDefined(void); /* forward declaration to avoid pulling in walsender_private.h */ struct WalSnd; -extern struct WalSnd *SyncRepGetSynchronousStandby(void); + +extern int SyncRepGetSyncStandbys(int *sync_standbys); +extern bool SyncRepSyncedLsnAdvancedTo(XLogRecPtr *write_pos, XLogRecPtr *flush_pos); +extern bool SyncRepActiveListedWalSender(int num); + +/* '1-priority' and 'priority' method */ +extern int SyncRepGetSyncStandbysPriority(int *sync_standbys); +extern bool SyncRepGetSyncLsnsPriority(XLogRecPtr *write_pos, XLogRecPtr *flush_pos); extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source); extern void assign_synchronous_commit(int newval, void *extra); +/* Process configuration parameter related to synchronous replication */ +extern void ProcessSynchronousReplicationConfig(void); + #endif /* _SYNCREP_H */
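In the patch above, SyncRepGetSyncLsnsPriority() releases waiters only up to the oldest write position and the oldest flush position among the selected synchronous standbys. Each of those minimums is taken independently. The following minimal sketch shows that rule in isolation, under some assumptions. StandbyPos and synced_lsns are hypothetical names. The positions are assumed to have already been copied out of each WalSnd under its spinlock. XLogRecPtr is redeclared as uint64_t to match PostgreSQL's definition.

```c
#include <stdint.h>

/* PostgreSQL defines XLogRecPtr as an unsigned 64-bit integer. */
typedef uint64_t XLogRecPtr;

/* Hypothetical snapshot of one standby's reported positions. */
typedef struct StandbyPos
{
	XLogRecPtr	write;
	XLogRecPtr	flush;
} StandbyPos;

/*
 * Given the indexes of the selected synchronous standbys, compute the
 * write and flush LSNs that all of them have reached.  Returns 0 and leaves
 * the outputs untouched if fewer than 'required' standbys were selected.
 */
static int
synced_lsns(const StandbyPos *pos, const int *selected, int nselected,
			int required, XLogRecPtr *write_pos, XLogRecPtr *flush_pos)
{
	XLogRecPtr	min_write;
	XLogRecPtr	min_flush;
	int			i;

	/* Not enough synchronous standbys connected: release nobody. */
	if (required < 1 || nselected < required)
		return 0;

	/* Seed from the first candidate, then take independent minimums. */
	min_write = pos[selected[0]].write;
	min_flush = pos[selected[0]].flush;
	for (i = 1; i < nselected; i++)
	{
		if (pos[selected[i]].write < min_write)
			min_write = pos[selected[i]].write;
		if (pos[selected[i]].flush < min_flush)
			min_flush = pos[selected[i]].flush;
	}

	*write_pos = min_write;
	*flush_pos = min_flush;
	return 1;
}
```

This sketch seeds both minimums from the first selected standby. The patch instead tests for InvalidXLogRecPtr inside the loop. The two approaches should give the same result, because SyncRepActiveListedWalSender() already requires every selected standby to have a valid flush position.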
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers