Hello. Thank you for the new version.

At Thu, 1 Nov 2018 11:58:52 +0100, Alexander Kukushkin <cyberd...@gmail.com> wrote in <CAFh8B=kax0uwdyzxn3xzpgrqhhrbiowwfhwstdg0fvj4is2...@mail.gmail.com>
> Hi,
>
> Attached is the patch rebased to the current HEAD; I have also created a
> commitfest entry.
>
> On Fri, 21 Sep 2018 at 13:43, Alexander Kukushkin <cyberd...@gmail.com> wrote:
> >
> > Hi,
> >
> > On 20 September 2018 at 08:18, Kyotaro HORIGUCHI
> > <horiguchi.kyot...@lab.ntt.co.jp> wrote:
> >
> > > Instead, we can literally "reserve" connection slots for this
> > > specific use by providing ProcGlobal->walsenderFreeProcs. The
> > > slots are never stolen for other usage. Allowing additional
> > > walsenders to grab an available "normal" slot comes only after
> > > all the reserved slots are filled, so it works the same as the
> > > current behavior when walsender_reserved_connections = 0.
> > >
> > > What do you think about this?
> >
> > Sounds reasonable, please see the updated patch.
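Before going into details, to restate the quoted scheme in code: the slot
selection in InitProcess() would look roughly like the following. This is
only an illustrative sketch from memory (walsenderFreeProcs and
replication_reserved_connections are the names used in this thread), not
code taken from the attached patch.

	/*
	 * Sketch only: walsenders draw their PGPROC from a dedicated free
	 * list and fall back to the normal list once all reserved slots are
	 * in use, so replication_reserved_connections = 0 keeps the current
	 * behavior.  In the real code this runs under ProcStructLock.
	 */
	if (am_walsender && replication_reserved_connections > 0)
		procgloballist = &ProcGlobal->walsenderFreeProcs;
	else
		procgloballist = &ProcGlobal->freeProcs;

	/* all reserved walsender slots are busy; take a normal one instead */
	if (am_walsender && *procgloballist == NULL)
		procgloballist = &ProcGlobal->freeProcs;

	MyProc = *procgloballist;

With that in mind, some comments on the updated patch.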
In InitializeMaxBackends():

 	MaxBackends = MaxConnections + autovacuum_max_workers + 1 +
-		max_worker_processes;
+		max_worker_processes + replication_reserved_connections;

This means a walsender no longer consumes a connection, which is
different from the current behavior. We should instead reserve a part of
MaxConnections for walsenders. (In PostmasterMain, max_wal_senders is
counted as a part of MaxConnections.)

+	if (am_walsender && replication_reserved_connections < max_wal_senders
+		&& *procgloballist == NULL)
+		procgloballist = &ProcGlobal->freeProcs;

Currently an excessive number of walsenders is rejected in
InitWalSenderSlot, which emits the following error:

>         ereport(FATAL,
>                 (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
>                  errmsg("number of requested standby connections "
>                         "exceeds max_wal_senders (currently %d)",
>                         max_wal_senders)));

With this patch, if max_wal_senders = replication_reserved_connections = 3
and a fourth replication connection arrives, we get "FATAL: sorry, too
many clients already" instead. That should be fixed.

When r_r_conn = 2 and max_wal_senders = 3 and the three walsenders are
active, in an extreme case where a new replication connection arrives
just as another one is exiting, we could end up using two normal slots
even though one reserved slot is vacant. If we want to avoid that case,
we need to limit the number of normal slots walsenders may use. I don't
think it is acceptable as is, but basically something like the attached
would do that.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
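To be a bit more concrete about reserving a part of MaxConnections: I would
keep MaxBackends unchanged and split the first MaxConnections PGPROCs
between the normal free list and the reserved walsender list when the free
lists are built in InitProcGlobal(). The following is only a rough,
untested sketch written from memory of the existing list setup, not
something taken from any patch:

	PGPROC	  **procgloballist;		/* local helper, not in the current code */

	/*
	 * Sketch only (inside the PGPROC initialization loop of
	 * InitProcGlobal): carve the reserved walsender slots out of the
	 * MaxConnections share instead of adding them on top of MaxBackends.
	 */
	if (i < MaxConnections - replication_reserved_connections)
		procgloballist = &ProcGlobal->freeProcs;	/* normal backends */
	else if (i < MaxConnections)
		procgloballist = &ProcGlobal->walsenderFreeProcs;	/* reserved */
	else if (i < MaxConnections + autovacuum_max_workers + 1)
		procgloballist = &ProcGlobal->autovacFreeProcs;
	else
		procgloballist = &ProcGlobal->bgworkerFreeProcs;

	/* link this PGPROC onto the chosen list, as the existing code does */
	procs[i].links.next = (SHM_QUEUE *) *procgloballist;
	*procgloballist = &procs[i];
	procs[i].procgloballist = procgloballist;

Of course, with this shape replication_reserved_connections would have to
be checked against max_connections at startup, just as max_wal_senders
already is. The attached diff does not touch this; it only addresses the
last point above, the possibility of using two normal slots.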
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3b6c636077..f86c05e8e0 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2250,6 +2250,7 @@ static void
 InitWalSenderSlot(void)
 {
 	int			i;
+	bool		reject = false;
 
 	/*
 	 * WalSndCtl should be set up already (we inherit this by fork() or
@@ -2258,41 +2259,63 @@ InitWalSenderSlot(void)
 	Assert(WalSndCtl != NULL);
 	Assert(MyWalSnd == NULL);
 
+	/* limit the maximum number of non-reserved slots to use */
+	if (MyProc->procgloballist == &ProcGlobal->freeProcs)
+	{
+		int			max_normal_slots
+			= max_wal_senders - replication_reserved_connections;
+
+		if (max_normal_slots <= 0)
+		{
+			/* we mustn't use a normal slot */
+			reject = true;
+		}
+		else if (pg_atomic_add_fetch_u32(&WalSndCtl->n_using_normal_slots, 1)
+				 > max_normal_slots)
+		{
+			pg_atomic_sub_fetch_u32(&WalSndCtl->n_using_normal_slots, 1);
+			reject = true;
+		}
+	}
+
 	/*
 	 * Find a free walsender slot and reserve it. If this fails, we must be
 	 * out of WalSnd structures.
 	 */
-	for (i = 0; i < max_wal_senders; i++)
+	if (!reject)
 	{
-		WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
-
-		SpinLockAcquire(&walsnd->mutex);
-
-		if (walsnd->pid != 0)
+		for (i = 0; i < max_wal_senders; i++)
 		{
-			SpinLockRelease(&walsnd->mutex);
-			continue;
-		}
-		else
-		{
-			/*
-			 * Found a free slot. Reserve it for us.
-			 */
-			walsnd->pid = MyProcPid;
-			walsnd->sentPtr = InvalidXLogRecPtr;
-			walsnd->write = InvalidXLogRecPtr;
-			walsnd->flush = InvalidXLogRecPtr;
-			walsnd->apply = InvalidXLogRecPtr;
-			walsnd->writeLag = -1;
-			walsnd->flushLag = -1;
-			walsnd->applyLag = -1;
-			walsnd->state = WALSNDSTATE_STARTUP;
-			walsnd->latch = &MyProc->procLatch;
-			SpinLockRelease(&walsnd->mutex);
-			/* don't need the lock anymore */
-			MyWalSnd = (WalSnd *) walsnd;
+			WalSnd	   *walsnd = &WalSndCtl->walsnds[i];
 
-			break;
+			SpinLockAcquire(&walsnd->mutex);
+
+			if (walsnd->pid != 0)
+			{
+				SpinLockRelease(&walsnd->mutex);
+				continue;
+			}
+			else
+			{
+				/*
+				 * Found a free slot. Reserve it for us.
+				 */
+				walsnd->pid = MyProcPid;
+				walsnd->sentPtr = InvalidXLogRecPtr;
+				walsnd->write = InvalidXLogRecPtr;
+				walsnd->flush = InvalidXLogRecPtr;
+				walsnd->apply = InvalidXLogRecPtr;
+				walsnd->writeLag = -1;
+				walsnd->flushLag = -1;
+				walsnd->applyLag = -1;
+				walsnd->state = WALSNDSTATE_STARTUP;
+				walsnd->latch = &MyProc->procLatch;
+				SpinLockRelease(&walsnd->mutex);
+				/* don't need the lock anymore */
+				MyWalSnd = (WalSnd *) walsnd;
+
+				break;
+			}
 		}
 	}
 	if (MyWalSnd == NULL)
@@ -2322,6 +2345,10 @@ WalSndKill(int code, Datum arg)
 	/* Mark WalSnd struct as no longer being in use. */
 	walsnd->pid = 0;
 	SpinLockRelease(&walsnd->mutex);
+
+	/* decrement usage count of normal connection slots if needed */
+	if (MyProc->procgloballist == &ProcGlobal->freeProcs)
+		pg_atomic_sub_fetch_u32(&WalSndCtl->n_using_normal_slots, 1);
 }
 
 /*
@@ -3047,6 +3074,9 @@ WalSndShmemInit(void)
 
 			SpinLockInit(&walsnd->mutex);
 		}
+
+		/* set the number of non-reserved slots walsenders can use */
+		pg_atomic_init_u32(&WalSndCtl->n_using_normal_slots, 0);
 	}
 }
 
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 2d04a8204a..c461cf2f12 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -333,8 +333,7 @@ InitProcess(void)
 	 * Try to use ProcGlobal->freeProcs as a fallback when
 	 * all reserved walsender slots are already busy.
 	 */
-	if (am_walsender && replication_reserved_connections < max_wal_senders
-		&& *procgloballist == NULL)
+	if (am_walsender && *procgloballist == NULL)
 		procgloballist = &ProcGlobal->freeProcs;
 
 	MyProc = *procgloballist;
 
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 4b90477936..bc84c287a4 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -14,6 +14,7 @@
 
 #include "access/xlog.h"
 #include "nodes/nodes.h"
+#include "port/atomics.h"
 #include "replication/syncrep.h"
 #include "storage/latch.h"
 #include "storage/shmem.h"
@@ -101,6 +102,8 @@ typedef struct
 	 */
 	bool		sync_standbys_defined;
 
+	pg_atomic_uint32 n_using_normal_slots;
+
 	WalSnd		walsnds[FLEXIBLE_ARRAY_MEMBER];
 } WalSndCtlData;
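By the way, the counter logic in the attached diff is just "claim a normal
slot, then roll the claim back if it went over the cap". A standalone toy
version of the same idea, using C11 atomics instead of port/atomics so it
compiles on its own (all names here are made up for illustration):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static atomic_uint n_using_normal_slots;

static bool
claim_normal_slot(unsigned int max_normal_slots)
{
	if (max_normal_slots == 0)
		return false;			/* mustn't use a normal slot at all */

	/* atomic_fetch_add returns the old value, hence the "+ 1" */
	if (atomic_fetch_add(&n_using_normal_slots, 1) + 1 > max_normal_slots)
	{
		/* over the cap: undo our claim and reject */
		atomic_fetch_sub(&n_using_normal_slots, 1);
		return false;
	}
	return true;
}

int
main(void)
{
	unsigned int max_normal_slots = 1;	/* max_wal_senders - r_r_conn above */
	int			i;

	for (i = 0; i < 3; i++)
		printf("walsender %d on a normal slot: %s\n", i + 1,
			   claim_normal_slot(max_normal_slots) ? "accepted" : "rejected");

	return 0;
}

With max_normal_slots = 1 (the r_r_conn = 2, max_wal_senders = 3 case), the
first caller is accepted and later ones are rejected even if they race with
each other, which is the property we need to keep walsenders from eating a
second normal slot.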