Hi all, During the 9.6 development cycle, we discussed the configuration syntax for a long time while considering how to extend it. As a result, we got a new dedicated language for multiple synchronous replication, but it supports only the priority method. We know that quorum commit is very useful for many users, and that the dedicated language can easily be extended to support it. So I'd like to propose quorum commit for multiple synchronous replication here.
The attached patches make the following changes. - Add new syntax 'Any N ( node1, node2, ... )' to synchronous_standby_names for quorum commit. - In quorum commit, the master can return commit to the client after receiving ACKs from *at least* any N of the listed standbys. - The sync_priority of all listed servers is the same, 1. - Add a regression test for quorum commit. I was thinking that the syntax for the quorum method would use '[ ... ]', but it would be confused with the '( ... )' used for the priority method. The 001 patch adds the 'Any N ( ... )' style syntax, but I know that we might still need to discuss a better syntax; discussion is very welcome. A draft patch is attached; please give me feedback. Regards, -- Masahiko Sawada
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 67249d8..0ce5399 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -76,9 +76,9 @@ char *SyncRepStandbyNames; #define SyncStandbysDefined() \ (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0') -static bool announce_next_takeover = true; +SyncRepConfigData *SyncRepConfig = NULL; -static SyncRepConfigData *SyncRepConfig = NULL; +static bool announce_next_takeover = true; static int SyncRepWaitMode = SYNC_REP_NO_WAIT; static void SyncRepQueueInsert(int mode); @@ -89,7 +89,12 @@ static bool SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr, bool *am_sync); +static bool SyncRepGetNNewestSyncRecPtr(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + int pos, bool *am_sync); static int SyncRepGetStandbyPriority(void); +static int cmp_lsn(const void *a, const void *b); #ifdef USE_ASSERT_CHECKING static bool SyncRepQueueIsOrderedByLSN(int mode); @@ -391,7 +396,7 @@ SyncRepReleaseWaiters(void) XLogRecPtr writePtr; XLogRecPtr flushPtr; XLogRecPtr applyPtr; - bool got_oldest; + bool got_recptr; bool am_sync; int numwrite = 0; int numflush = 0; @@ -418,11 +423,16 @@ SyncRepReleaseWaiters(void) LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); /* - * Check whether we are a sync standby or not, and calculate the oldest - * positions among all sync standbys. + * Check whether we are a sync standby or not, and calculate the synced + * positions among all sync standbys using method. 
*/ - got_oldest = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr, - &applyPtr, &am_sync); + if (SyncRepConfig->sync_method == SYNC_REP_PRIORITY) + got_recptr = SyncRepGetOldestSyncRecPtr(&writePtr, &flushPtr, + &applyPtr, &am_sync); + else /* SYNC_REP_QUORUM */ + got_recptr = SyncRepGetNNewestSyncRecPtr(&writePtr, &flushPtr, + &applyPtr, SyncRepConfig->num_sync, + &am_sync); /* * If we are managing a sync standby, though we weren't prior to this, @@ -440,7 +450,7 @@ SyncRepReleaseWaiters(void) * If the number of sync standbys is less than requested or we aren't * managing a sync standby then just leave. */ - if (!got_oldest || !am_sync) + if (!got_recptr || !am_sync) { LWLockRelease(SyncRepLock); announce_next_takeover = !am_sync; @@ -476,6 +486,88 @@ SyncRepReleaseWaiters(void) } /* + * Calculate the 'pos' newest Write, Flush and Apply positions among sync standbys. + * + * Return false if the number of sync standbys is less than + * synchronous_standby_names specifies. Otherwise return true and + * store the 'pos' newest positions into *writePtr, *flushPtr, *applyPtr. + * + * On return, *am_sync is set to true if this walsender is connecting to + * sync standby. Otherwise it's set to false. + */ +static bool +SyncRepGetNNewestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, int pos, bool *am_sync) +{ + XLogRecPtr *write_array; + XLogRecPtr *flush_array; + XLogRecPtr *apply_array; + List *sync_standbys; + ListCell *cell; + int len; + int i = 0; + + *writePtr = InvalidXLogRecPtr; + *flushPtr = InvalidXLogRecPtr; + *applyPtr = InvalidXLogRecPtr; + *am_sync = false; + + /* Get standbys that are considered as synchronous at this moment */ + sync_standbys = SyncRepGetSyncStandbys(am_sync); + + /* + * Quick exit if we are not managing a sync standby or there are not + * enough synchronous standbys. 
+ */ + if (!(*am_sync) || + SyncRepConfig == NULL || + list_length(sync_standbys) < SyncRepConfig->num_sync) + { + list_free(sync_standbys); + return false; + } + + len = list_length(sync_standbys); + write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); + flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); + apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len); + + /* + * Scan through all sync standbys and calculate 'pos' Newest + * Write, Flush and Apply positions. + */ + foreach (cell, sync_standbys) + { + WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; + + SpinLockAcquire(&walsnd->mutex); + write_array[i] = walsnd->write; + flush_array[i] = walsnd->flush; + apply_array[i] = walsnd->apply; + SpinLockRelease(&walsnd->mutex); + + i++; + } + + /* Sort each array in descending order to get 'pos' newest element */ + qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn); + qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn); + qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn); + + /* Get 'pos' newest Write, Flush, Apply positions */ + *writePtr = write_array[pos - 1]; + *flushPtr = flush_array[pos - 1]; + *applyPtr = apply_array[pos - 1]; + + pfree(write_array); + pfree(flush_array); + pfree(apply_array); + list_free(sync_standbys); + + return true; +} + +/* * Calculate the oldest Write, Flush and Apply positions among sync standbys. * * Return false if the number of sync standbys is less than @@ -513,12 +605,12 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, } /* - * Scan through all sync standbys and calculate the oldest Write, Flush - * and Apply positions. + * Scan through all sync standbys and calculate the oldest + * Write, Flush and Apply positions. 
*/ - foreach(cell, sync_standbys) + foreach (cell, sync_standbys) { - WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; + WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)]; XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; @@ -542,17 +634,88 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, } /* - * Return the list of sync standbys, or NIL if no sync standby is connected. + * Return the list of sync standbys using according to synchronous method, + * or NIL if no sync standby is connected. The caller must hold SyncRepLock. * - * If there are multiple standbys with the same priority, - * the first one found is selected preferentially. - * The caller must hold SyncRepLock. + * On return, *am_sync is set to true if this walsender is connecting to + * sync standby. Otherwise it's set to false. + */ +List * +SyncRepGetSyncStandbys(bool *am_sync) +{ + /* Set default result */ + if (am_sync != NULL) + *am_sync = false; + + /* Quick exit if sync replication is not requested */ + if (SyncRepConfig == NULL) + return NIL; + + if (SyncRepConfig->sync_method == SYNC_REP_PRIORITY) + return SyncRepGetSyncStandbysPriority(am_sync); + else /* SYNC_REP_QUORUM */ + return SyncRepGetSyncStandbysQuorum(am_sync); +} + +/* + * Return the list of sync standbys using quorum method, or + * NIL if no sync standby is connected. In quorum method, all standby + * priorities are same, that is 1. So this function returns the list of + * standbys except for the standbys which are not active, or connected + * as async. + * + * On return, *am_sync is set to true if this walsender is connecting to + * sync standby. Otherwise it's set to false. 
+ */ +List * +SyncRepGetSyncStandbysQuorum(bool *am_sync) +{ + List *result = NIL; + int i; + + for (i = 0; i < max_wal_senders; i++) + { + volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + /* Must be active */ + if (walsnd->pid == 0) + continue; + + /* Must be streaming */ + if (walsnd->state != WALSNDSTATE_STREAMING) + continue; + + /* Must be synchronous */ + if (walsnd->sync_standby_priority == 0) + continue; + + /* Must have a valid flush position */ + if (XLogRecPtrIsInvalid(walsnd->flush)) + continue; + + /* + * Consider this standby as candidate of sync and append + * it to the result. + */ + result = lappend_int(result, i); + if (am_sync != NULL && walsnd == MyWalSnd) + *am_sync = true; + } + + return result; +} + +/* + * Return the list of sync standbys using priority method, or + * NIL if no sync standby is connected. In priority method, + * if there are multiple standbys with the same priority, + * the first one found is selected perferentially. * * On return, *am_sync is set to true if this walsender is connecting to * sync standby. Otherwise it's set to false. */ List * -SyncRepGetSyncStandbys(bool *am_sync) +SyncRepGetSyncStandbysPriority(bool *am_sync) { List *result = NIL; List *pending = NIL; @@ -565,14 +728,6 @@ SyncRepGetSyncStandbys(bool *am_sync) volatile WalSnd *walsnd; /* Use volatile pointer to prevent code * rearrangement */ - /* Set default result */ - if (am_sync != NULL) - *am_sync = false; - - /* Quick exit if sync replication is not requested */ - if (SyncRepConfig == NULL) - return NIL; - lowest_priority = SyncRepConfig->nmembers; next_highest_priority = lowest_priority + 1; @@ -754,6 +909,10 @@ SyncRepGetStandbyPriority(void) standby_name += strlen(standby_name) + 1; } + /* In quroum method, all sync standby priorities are always 1 */ + if (found && SyncRepConfig->sync_method == SYNC_REP_QUORUM) + priority = 1; + return (found ? 
priority : 0); } @@ -897,6 +1056,23 @@ SyncRepQueueIsOrderedByLSN(int mode) #endif /* + * Compare lsn in order to sort array in descending order. + */ +static int +cmp_lsn(const void *a, const void *b) +{ + XLogRecPtr lsn1 = *((const XLogRecPtr *) a); + XLogRecPtr lsn2 = *((const XLogRecPtr *) b); + + if (lsn1 > lsn2) + return -1; + else if (lsn1 == lsn2) + return 0; + else + return 1; +} + +/* * =========================================================== * Synchronous Replication functions executed by any process * =========================================================== diff --git a/src/backend/replication/syncrep_gram.y b/src/backend/replication/syncrep_gram.y index 35c2776..7026a96 100644 --- a/src/backend/replication/syncrep_gram.y +++ b/src/backend/replication/syncrep_gram.y @@ -21,7 +21,7 @@ SyncRepConfigData *syncrep_parse_result; char *syncrep_parse_error_msg; static SyncRepConfigData *create_syncrep_config(const char *num_sync, - List *members); + List *members, int sync_method); /* * Bison doesn't allocate anything that needs to live across parser calls, @@ -46,7 +46,7 @@ static SyncRepConfigData *create_syncrep_config(const char *num_sync, SyncRepConfigData *config; } -%token <str> NAME NUM JUNK +%token <str> NAME NUM JUNK ANY %type <config> result standby_config %type <list> standby_list @@ -60,8 +60,9 @@ result: ; standby_config: - standby_list { $$ = create_syncrep_config("1", $1); } - | NUM '(' standby_list ')' { $$ = create_syncrep_config($1, $3); } + standby_list { $$ = create_syncrep_config("1", $1, SYNC_REP_PRIORITY); } + | NUM '(' standby_list ')' { $$ = create_syncrep_config($1, $3, SYNC_REP_PRIORITY); } + | ANY NUM '(' standby_list ')' { $$ = create_syncrep_config($2, $4, SYNC_REP_QUORUM); } ; standby_list: @@ -77,7 +78,7 @@ standby_name: static SyncRepConfigData * -create_syncrep_config(const char *num_sync, List *members) +create_syncrep_config(const char *num_sync, List *members, int sync_method) { SyncRepConfigData *config; int size; 
@@ -98,6 +99,7 @@ create_syncrep_config(const char *num_sync, List *members) config->config_size = size; config->num_sync = atoi(num_sync); + config->sync_method = sync_method; config->nmembers = list_length(members); ptr = config->member_names; foreach(lc, members) diff --git a/src/backend/replication/syncrep_scanner.l b/src/backend/replication/syncrep_scanner.l index d20662e..e229663 100644 --- a/src/backend/replication/syncrep_scanner.l +++ b/src/backend/replication/syncrep_scanner.l @@ -54,6 +54,7 @@ digit [0-9] ident_start [A-Za-z\200-\377_] ident_cont [A-Za-z\200-\377_0-9\$] identifier {ident_start}{ident_cont}* +any_ident any|ANY|Any dquote \" xdstart {dquote} @@ -64,6 +65,10 @@ xdinside [^"]+ %% {space}+ { /* ignore */ } +{any_ident} { + yylval.str = pstrdup(yytext); + return ANY; + } {xdstart} { initStringInfo(&xdbuf); BEGIN(xd); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index a0dba19..16ad2f8 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2861,7 +2861,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) if (priority == 0) values[7] = CStringGetTextDatum("async"); else if (list_member_int(sync_standbys, i)) - values[7] = CStringGetTextDatum("sync"); + values[7] = SyncRepConfig->sync_method == SYNC_REP_PRIORITY ? + CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); else values[7] = CStringGetTextDatum("potential"); } diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index e4e0e27..4ec1e47 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -32,6 +32,10 @@ #define SYNC_REP_WAITING 1 #define SYNC_REP_WAIT_COMPLETE 2 +/* sync_method of SyncRepConfigData */ +#define SYNC_REP_PRIORITY 0 +#define SYNC_REP_QUORUM 1 + /* * Struct for the configuration of synchronous replication. 
* @@ -45,10 +49,13 @@ typedef struct SyncRepConfigData int num_sync; /* number of sync standbys that we need to * wait for */ int nmembers; /* number of members in the following list */ + int sync_method; /* synchronous method */ /* member_names contains nmembers consecutive nul-terminated C strings */ char member_names[FLEXIBLE_ARRAY_MEMBER]; } SyncRepConfigData; +extern SyncRepConfigData *SyncRepConfig; + /* communication variables for parsing synchronous_standby_names GUC */ extern SyncRepConfigData *syncrep_parse_result; extern char *syncrep_parse_error_msg; @@ -68,6 +75,8 @@ extern void SyncRepReleaseWaiters(void); /* called by wal sender and user backend */ extern List *SyncRepGetSyncStandbys(bool *am_sync); +extern List *SyncRepGetSyncStandbysPriority(bool *am_sync); +extern List *SyncRepGetSyncStandbysQuorum(bool *am_sync); /* called by checkpointer */ extern void SyncRepUpdateSyncStandbysDefined(void);
diff --git a/src/test/recovery/t/007_sync_rep.pl b/src/test/recovery/t/007_sync_rep.pl index baf4477..6fa5522 100644 --- a/src/test/recovery/t/007_sync_rep.pl +++ b/src/test/recovery/t/007_sync_rep.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 8; +use Test::More tests => 10; # Query checking sync_priority and sync_state of each standby my $check_sql = @@ -172,3 +172,25 @@ test_sync_state( standby2|1|sync standby4|1|potential), 'potential standby found earlier in array is promoted to sync'); + +# Check that the state of standbys listed as a voter are having +# same priority when synchronous_standby_names uses quorum method. +test_sync_state( + $node_master, qq(standby1|1|quorum +standby2|1|quorum +standby4|0|async), + '2 quorum and 1 async', + 'Any 2(standby1, standby2)'); + +# Start Standby3 which will be considered in 'quorum' state. +$node_standby_3->start; + +# Check that set setting of 'Any 2(*)' chooses all standbys as +# voter. +test_sync_state( + $node_master, qq(standby1|1|quorum +standby2|1|quorum +standby3|1|quorum +standby4|1|quorum), + 'all standbys are considered as voter for quorum commit', + 'Any 2(*)');
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers