My first idea was to implement parallel apply of transactions. But to do it we need to track dependencies between transactions. Right now Postgres can apply transactions in parallel, but only if they are streamed (which is done only for large transactions) and serialize them by commits. It is possible to enforce parallel apply of short transactions using `debug_logical_replication_streaming` but then performance is ~2x times slower than in case of sequential apply by single worker. By removing serialization by commits, it is possible to speedup apply 3x times and make subscriber apply changes faster then producer can produce them even with multiple clients. But it is possible only if transactions are independent and it can be enforced only by tracking dependencies which seems to be very non-trivial and invasive.
I still do not completely give up with tracking dependencies approach, but decided first to try more simple solution - prefetching. It is already used for physical replication. Certainly in case of physical replication it is much simpler, because each WAL record contains list of accessed blocks.
In case of logical replication prefetching can be done either by prefetching access to replica identity index (usually primary key), either by executing replication command by some background worker Certainly first case is much more easy. We just perform index lookup in prefetch worker and it loads accessed index and heap pages in shared buffer, so main apply worker does not need to read something from disk.
But it works well only for DELETE and HOT UPDATE operations.In the second case we normally execute the LR command in background worker and then abort transaction. Certainly in this case we are doing the same work twice. But assumption is the same: parallel prefetch workers should load affected pages, speeding up work of the main apply worker.
I have implemented some PoC (see attached patch). And get first results of efficiency of such prefetching.
*** First scenario (update-only). Publisher: ```create table t(pk integer primary key, counter integer, filler text default repeat('x', 1000)) with (fillfactor=10);
insert into t values (generate_series(1,100000), 0); create publication pub1 for table t; ``` Subscriber: ```create table t(pk integer primary key, counter integer, filler text default repeat('x', 1000)) with (fillfactor=10); create subscription sub1 connection 'port=54321 dbname=postgres' publication pub1;
```Then I wait until replication is synced, stop subscriber and do random dot updates in 10 sessions at publisher:
``` pgbench -T 100 -c 10 -M prepared -n -f update.sql -p 54321 -d postgres ``` where update.sql is: ``` \set pk random(1, 100000) update t set counter=counter+1 where pk=:pk; ```Then I start subscriber and measure how much time is needed for it to caught up.
Results: no prefetch: 2:00 min prefetch (replica identity only): 0:55 min prefetch (all): 1:10 minThis is definitely the best case for replica-identity index only prefetch (update-only and no other indexes).
How to interpret this results?Without prefetch applying updates takes about two times more at subscriber than performing this updates at publisher.
It means that under huge workload subscriber has no chances to caught up.With prefetching replica identity index, apply time is even smaller than time needed to perform updates at publisher. Performing the whole operation and transaction abort certainly adds more overhead. But still improvement is quite significant.
Please also notice that this results were obtains at the system with larger amount of RAM (64Gb) and fast SSD. With data set not fitting in RAM and much slower disks, the difference is expected to be more significant.
I have tried to simulate it be adding 0.1msec delay to pg_preadv. When I add artificial 0.1msec `preadv` delay, I got the following results: no prefetch: 7:40 prefetch (replica identity only): 3:10 min prefetch (all): 3:09In this case apply takes much more time than 100 seconds during which updates are performed at publisher. Prefetch can improve speed about two times,
but it doesn't allow subcriber to caught-up. *** Second scenario: inserts with secondary random key. Publisher: ``` create table t(pk serial primary key, sk integer, counter integer default 0) insert into t (sk) select random()*10000000 from generate_series(1,10000000) create index on t(sk) create publication pub1 for table t ``` Subscriber: ``` create table t(pk integer primary key, sk integer, counter integer) create index on t(sk)create subscription sub1 connection 'port=54321 dbname=postgres' publication pub1
``` workload: ``` pgbench -T 100 -c 10 -M prepared -n -f insert.sql -p 54321 -d postgres ``` where insert.sql: ``` INSERT INTO t (sk) VALUES (random()*10000000); ``` Results (with 0.1msec delay) are the followingL no prefetch: 10:10 min prefetch (identity): 8:25 min prefetch (full): 5:50minHere as expected prefetching only primary key doesn't provide some big improvement. But replaying insert command in prefetch worker allows to speedup apply almost twice.
Please notice that this approach requires minimal changes in Postgres, because all infrastructure of parallel apply workers is already present and we can reuse the same apply code (with minimal changes) for performing prefetch. I only have to introduce extra tuple lock types (no-lock and try-lock) to minimize overhead and lock conflicts between prefetch and main apply workers. Still it can not completely prevent locks conflicts and deadlocks in prefetch workers. Looks like more work is needed here. Also I set `wal_level=minimal` in prefetch workers to avoid WAL-logging overhead.
Number of prefetch workers is specified by `max_parallel_prefetch_workers_per_subscription` GUC. If it is zero (default) then no prefetching is performed. Prefetch mode is controlled by `prefetch_replica_identity_only` GUC . By default it is true which makes prefetch efficient for hot updates, deletes or inserts in table with just one index (primary key).
Attached please find patch and two shell scripts used to produce this test results. Also it may be more convenient to inspect this patch as PR: https://github.com/knizhnik/postgres/pull/3
I wonder if such LR prefetching approach is considered to be useful?Or it is better to investigate other ways to improve LR apply speed (parallel apply)?
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 53ddd25c42d..c9c4223b22e 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -131,7 +131,7 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel, * invoking table_tuple_lock. */ static bool -should_refetch_tuple(TM_Result res, TM_FailureData *tmfd) +should_refetch_tuple(TM_Result res, TM_FailureData *tmfd, LockTupleMode lockmode) { bool refetch = false; @@ -141,22 +141,28 @@ should_refetch_tuple(TM_Result res, TM_FailureData *tmfd) break; case TM_Updated: /* XXX: Improve handling here */ - if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid)) - ereport(LOG, - (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), - errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying"))); - else - ereport(LOG, - (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), - errmsg("concurrent update, retrying"))); - refetch = true; + if (lockmode != LockTupleTryExclusive) + { + if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid)) + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying"))); + else + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("concurrent update, retrying"))); + refetch = true; + } break; case TM_Deleted: - /* XXX: Improve handling here */ - ereport(LOG, - (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), - errmsg("concurrent delete, retrying"))); - refetch = true; + if (lockmode != LockTupleTryExclusive) + { + /* XXX: Improve handling here */ + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("concurrent delete, retrying"))); + refetch = true; + } break; case TM_Invisible: elog(ERROR, "attempted to lock invisible tuple"); @@ -236,8 +242,16 @@ retry: */ if (TransactionIdIsValid(xwait)) { - XactLockTableWait(xwait, NULL, NULL, XLTW_None); - goto retry; + if (lockmode == LockTupleTryExclusive) + { + found = false; + break; + } + else if (lockmode != LockTupleNoLock) + { + XactLockTableWait(xwait, NULL, NULL, XLTW_None); + goto retry; + } } /* Found our tuple and it's not locked */ @@ -246,7 +260,7 @@ retry: } /* Found tuple, try to lock it in the lockmode. */ - if (found) + if (found && lockmode != LockTupleNoLock) { TM_FailureData tmfd; TM_Result res; @@ -256,14 +270,14 @@ retry: res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(), outslot, GetCurrentCommandId(false), - lockmode, + lockmode == LockTupleTryExclusive ? LockTupleExclusive : lockmode, LockWaitBlock, 0 /* don't follow updates */ , &tmfd); PopActiveSnapshot(); - if (should_refetch_tuple(res, &tmfd)) + if (should_refetch_tuple(res, &tmfd, lockmode)) goto retry; } @@ -395,16 +409,23 @@ retry: */ if (TransactionIdIsValid(xwait)) { - XactLockTableWait(xwait, NULL, NULL, XLTW_None); - goto retry; + if (lockmode == LockTupleTryExclusive) + { + found = false; + break; + } + else if (lockmode != LockTupleNoLock) + { + XactLockTableWait(xwait, NULL, NULL, XLTW_None); + goto retry; + } } - /* Found our tuple and it's not locked */ break; } /* Found tuple, try to lock it in the lockmode. */ - if (found) + if (found && lockmode != LockTupleNoLock) { TM_FailureData tmfd; TM_Result res; @@ -414,14 +435,14 @@ retry: res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(), outslot, GetCurrentCommandId(false), - lockmode, + lockmode == LockTupleTryExclusive ? LockTupleExclusive : lockmode, LockWaitBlock, 0 /* don't follow updates */ , &tmfd); PopActiveSnapshot(); - if (should_refetch_tuple(res, &tmfd)) + if (should_refetch_tuple(res, &tmfd, lockmode)) goto retry; } @@ -508,7 +529,7 @@ retry: PopActiveSnapshot(); - if (should_refetch_tuple(res, &tmfd)) + if (should_refetch_tuple(res, &tmfd, LockTupleShare)) goto retry; return true; diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index d25085d3515..d2c426ecab7 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -400,7 +400,7 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo) * Try to get a parallel apply worker from the pool. If none is available then * start a new one. */ -static ParallelApplyWorkerInfo * +ParallelApplyWorkerInfo * pa_launch_parallel_worker(void) { MemoryContext oldcontext; @@ -729,6 +729,43 @@ ProcessParallelApplyInterrupts(void) } } + +static void +pa_apply_dispatch(StringInfo s) +{ + if (MyParallelShared->do_prefetch) + { + PG_TRY(); + { + apply_dispatch(s); + } + PG_CATCH(); + { + HOLD_INTERRUPTS(); + + elog(DEBUG1, "Failed to prefetch LR operation"); + + /* TODO: should we somehow dump the error or just silently ignore it? */ + /* EmitErrorReport(); */ + FlushErrorState(); + + RESUME_INTERRUPTS(); + + lr_prefetch_errors += 1; + } + PG_END_TRY(); + if (!prefetch_replica_identity_only) + { + /* We need to abort transaction to undo insert */ + AbortCurrentTransaction(); + } + } + else + { + apply_dispatch(s); + } +} + /* Parallel apply worker main loop. */ static void LogicalParallelApplyLoop(shm_mq_handle *mqh) @@ -794,7 +831,7 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) */ s.cursor += SIZE_STATS_MESSAGE; - apply_dispatch(&s); + pa_apply_dispatch(&s); } else if (shmq_res == SHM_MQ_WOULD_BLOCK) { @@ -943,20 +980,27 @@ ParallelApplyWorkerMain(Datum main_arg) InitializingApplyWorker = false; - /* Setup replication origin tracking. */ - StartTransactionCommand(); - ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, + if (!MyParallelShared->do_prefetch) + { + /* Setup replication origin tracking. */ + StartTransactionCommand(); + ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, originname, sizeof(originname)); - originid = replorigin_by_name(originname, false); - - /* - * The parallel apply worker doesn't need to monopolize this replication - * origin which was already acquired by its leader process. - */ - replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid); - replorigin_session_origin = originid; - CommitTransactionCommand(); + originid = replorigin_by_name(originname, false); + /* + * The parallel apply worker doesn't need to monopolize this replication + * origin which was already acquired by its leader process. + */ + replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid); + replorigin_session_origin = originid; + CommitTransactionCommand(); + } + else + { + /* Do not write WAL for prefetch */ + wal_level = WAL_LEVEL_MINIMAL; + } /* * Setup callback for syscache so that we know when something changes in * the subscription relation state. @@ -1149,8 +1193,11 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) shm_mq_result result; TimestampTz startTime = 0; - Assert(!IsTransactionState()); - Assert(!winfo->serialize_changes); + if (!winfo->shared->do_prefetch) + { + Assert(!IsTransactionState()); + Assert(!winfo->serialize_changes); + } /* * We don't try to send data to parallel worker for 'immediate' mode. This diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 4aed0dfcebb..ff2eaad5462 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -50,6 +50,7 @@ int max_logical_replication_workers = 4; int max_sync_workers_per_subscription = 2; int max_parallel_apply_workers_per_subscription = 2; +int max_parallel_prefetch_workers_per_subscription = 2; LogicalRepWorker *MyLogicalRepWorker = NULL; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index fd11805a44c..8ff0076dad3 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -311,6 +311,18 @@ static uint32 parallel_stream_nchanges = 0; /* Are we initializing an apply worker? */ bool InitializingApplyWorker = false; +#define INIT_PREFETCH_BUF_SIZE (128*1024) +static ParallelApplyWorkerInfo* prefetch_worker[MAX_LR_PREFETCH_WORKERS]; +static int prefetch_worker_rr = 0; +static int n_prefetch_workers; + +bool prefetch_replica_identity_only = true; + +size_t lr_prefetch_hits; +size_t lr_prefetch_misses; +size_t lr_prefetch_errors; +size_t lr_prefetch_inserts; + /* * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for * the subscription if the remote transaction's finish LSN matches the subskiplsn. @@ -329,6 +341,11 @@ bool InitializingApplyWorker = false; static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn))) +/* + * If operation is performed by parallel prefetch worker + */ +#define is_prefetching() (am_parallel_apply_worker() && MyParallelShared->do_prefetch) + /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; @@ -556,6 +573,11 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s) TransApplyAction apply_action; StringInfoData original_msg; + if (is_prefetching()) + { + return false; + } + apply_action = get_transaction_apply_action(stream_xid, &winfo); /* not in streaming mode */ @@ -2380,6 +2402,27 @@ TargetPrivilegesCheck(Relation rel, AclMode mode) RelationGetRelationName(rel)))); } +#define SAFE_APPLY(call) \ + if (is_prefetching()) \ + { \ + PG_TRY(); \ + { \ + call; \ + } \ + PG_CATCH(); \ + { \ + HOLD_INTERRUPTS(); \ + elog(DEBUG1, "Failed to prefetch LR operation");\ + FlushErrorState(); \ + RESUME_INTERRUPTS(); \ + lr_prefetch_errors += 1; \ + } \ + PG_END_TRY(); \ + } else { \ + call; \ + } + + /* * Handle INSERT message. */ @@ -2453,7 +2496,7 @@ apply_handle_insert(StringInfo s) ResultRelInfo *relinfo = edata->targetRelInfo; ExecOpenIndices(relinfo, false); - apply_handle_insert_internal(edata, relinfo, remoteslot); + SAFE_APPLY(apply_handle_insert_internal(edata, relinfo, remoteslot)); ExecCloseIndices(relinfo); } @@ -2487,13 +2530,34 @@ apply_handle_insert_internal(ApplyExecutionData *edata, !relinfo->ri_RelationDesc->rd_rel->relhasindex || RelationGetIndexList(relinfo->ri_RelationDesc) == NIL); - /* Caller will not have done this bit. */ - Assert(relinfo->ri_onConflictArbiterIndexes == NIL); - InitConflictIndexes(relinfo); + if (is_prefetching() && prefetch_replica_identity_only) + { + TupleTableSlot *localslot = NULL; + LogicalRepRelMapEntry *relmapentry = edata->targetRel; + Relation localrel = relinfo->ri_RelationDesc; + EPQState epqstate; - /* Do the insert. */ - TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT); - ExecSimpleRelationInsert(relinfo, estate, remoteslot); + EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL); + + (void)FindReplTupleInLocalRel(edata, localrel, + &relmapentry->remoterel, + relmapentry->localindexoid, + remoteslot, &localslot); + } + else + { + /* Caller will not have done this bit. */ + Assert(relinfo->ri_onConflictArbiterIndexes == NIL); + InitConflictIndexes(relinfo); + + /* Do the insert. */ + TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT); + ExecSimpleRelationInsert(relinfo, estate, remoteslot); + } + if (is_prefetching()) + { + lr_prefetch_inserts += 1; + } } /* @@ -2637,8 +2701,8 @@ apply_handle_update(StringInfo s) apply_handle_tuple_routing(edata, remoteslot, &newtup, CMD_UPDATE); else - apply_handle_update_internal(edata, edata->targetRelInfo, - remoteslot, &newtup, rel->localindexoid); + SAFE_APPLY(apply_handle_update_internal(edata, edata->targetRelInfo, + remoteslot, &newtup, rel->localindexoid)); finish_edata(edata); @@ -2682,6 +2746,16 @@ apply_handle_update_internal(ApplyExecutionData *edata, localindexoid, remoteslot, &localslot); + if (is_prefetching()) + { + if (found) + lr_prefetch_hits += 1; + else + lr_prefetch_misses += 1; + if (prefetch_replica_identity_only) + goto Cleanup; + } + /* * Tuple found. * @@ -2739,7 +2813,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, remoteslot, newslot, list_make1(&conflicttuple)); } - /* Cleanup. */ + Cleanup: ExecCloseIndices(relinfo); EvalPlanQualEnd(&epqstate); } @@ -2820,8 +2894,8 @@ apply_handle_delete(StringInfo s) ResultRelInfo *relinfo = edata->targetRelInfo; ExecOpenIndices(relinfo, false); - apply_handle_delete_internal(edata, relinfo, - remoteslot, rel->localindexoid); + SAFE_APPLY(apply_handle_delete_internal(edata, relinfo, + remoteslot, rel->localindexoid)); ExecCloseIndices(relinfo); } @@ -2867,6 +2941,15 @@ apply_handle_delete_internal(ApplyExecutionData *edata, found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid, remoteslot, &localslot); + if (is_prefetching()) + { + if (found) + lr_prefetch_hits += 1; + else + lr_prefetch_misses += 1; + goto Cleanup; + } + /* If found delete it. */ if (found) { @@ -2900,7 +2983,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata, remoteslot, NULL, list_make1(&conflicttuple)); } - /* Cleanup. */ + Cleanup: EvalPlanQualEnd(&epqstate); } @@ -2921,6 +3004,8 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, EState *estate = edata->estate; bool found; + LockTupleMode lockmode = is_prefetching() ? prefetch_replica_identity_only ? LockTupleNoLock : LockTupleTryExclusive : LockTupleExclusive; + /* * Regardless of the top-level operation, we're performing a read here, so * check for SELECT privileges. @@ -2946,11 +3031,11 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel, #endif found = RelationFindReplTupleByIndex(localrel, localidxoid, - LockTupleExclusive, + lockmode, remoteslot, *localslot); } else - found = RelationFindReplTupleSeq(localrel, LockTupleExclusive, + found = RelationFindReplTupleSeq(localrel, lockmode, remoteslot, *localslot); return found; @@ -3041,14 +3126,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, switch (operation) { case CMD_INSERT: - apply_handle_insert_internal(edata, partrelinfo, - remoteslot_part); + SAFE_APPLY(apply_handle_insert_internal(edata, partrelinfo, + remoteslot_part)); break; case CMD_DELETE: - apply_handle_delete_internal(edata, partrelinfo, - remoteslot_part, - part_entry->localindexoid); + SAFE_APPLY(apply_handle_delete_internal(edata, partrelinfo, + remoteslot_part, + part_entry->localindexoid)); break; case CMD_UPDATE: @@ -3076,6 +3161,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, { TupleTableSlot *newslot = localslot; + if (is_prefetching()) + return; + /* Store the new tuple for conflict reporting */ slot_store_data(newslot, part_entry, newtup); @@ -3101,6 +3189,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, { TupleTableSlot *newslot; + if (is_prefetching()) + return; + /* Store the new tuple for conflict reporting */ newslot = table_slot_create(partrel, &estate->es_tupleTable); slot_store_data(newslot, part_entry, newtup); @@ -3217,8 +3308,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, slot_getallattrs(remoteslot); } MemoryContextSwitchTo(oldctx); - apply_handle_insert_internal(edata, partrelinfo_new, - remoteslot_part); + SAFE_APPLY(apply_handle_insert_internal(edata, partrelinfo_new, + remoteslot_part)); } EvalPlanQualEnd(&epqstate); @@ -3552,7 +3643,6 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn) MemoryContextSwitchTo(ApplyMessageContext); } - /* Update statistics of the worker. */ static void UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) @@ -3567,6 +3657,42 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) } } +#define MSG_CODE_OFFSET (1 + 8*3) + +static void +lr_do_prefetch(char* buf, int len) +{ + ParallelApplyWorkerInfo* winfo; + + if (buf[0] != 'w') + return; + + switch (buf[MSG_CODE_OFFSET]) + { + case LOGICAL_REP_MSG_INSERT: + case LOGICAL_REP_MSG_UPDATE: + case LOGICAL_REP_MSG_DELETE: + /* Round robin prefetch worker */ + winfo = prefetch_worker[prefetch_worker_rr++ % n_prefetch_workers]; + pa_send_data(winfo, len, buf); + break; + + case LOGICAL_REP_MSG_TYPE: + case LOGICAL_REP_MSG_RELATION: + /* broadcast to all prefetch workers */ + for (int i = 0; i < n_prefetch_workers; i++) + { + winfo = prefetch_worker[i]; + pa_send_data(winfo, len, buf); + } + break; + + default: + /* Ignore other messages */ + break; + } +} + /* * Apply main loop. */ @@ -3577,6 +3703,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received) bool ping_sent = false; TimeLineID tli; ErrorContextCallback errcallback; + char* prefetch_buf = NULL; + size_t prefetch_buf_pos = 0; + size_t prefetch_buf_used = 0; + size_t prefetch_buf_size = INIT_PREFETCH_BUF_SIZE; /* * Init the ApplyMessageContext which we clean up after each replication @@ -3594,6 +3724,25 @@ LogicalRepApplyLoop(XLogRecPtr last_received) "LogicalStreamingContext", ALLOCSET_DEFAULT_SIZES); + if (max_parallel_prefetch_workers_per_subscription != 0) + { + int i; + for (i = 0; i < max_parallel_prefetch_workers_per_subscription; i++) + { + prefetch_worker[i] = pa_launch_parallel_worker(); + if (!prefetch_worker[i]) + { + elog(LOG, "Launch only %d prefetch worklers from %d", + i, max_parallel_prefetch_workers_per_subscription); + break; + } + prefetch_worker[i]->in_use = true; + prefetch_worker[i]->shared->do_prefetch = true; + } + n_prefetch_workers = i; + prefetch_buf = palloc(prefetch_buf_size); + } + /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); @@ -3611,9 +3760,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received) { pgsocket fd = PGINVALID_SOCKET; int rc; - int len; + int32 len; char *buf = NULL; bool endofstream = false; + bool no_more_data = false; long wait_time; CHECK_FOR_INTERRUPTS(); @@ -3622,87 +3772,127 @@ LogicalRepApplyLoop(XLogRecPtr last_received) len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); - if (len != 0) + /* Loop to process all available data (without blocking). */ + for (;;) { - /* Loop to process all available data (without blocking). */ - for (;;) - { - CHECK_FOR_INTERRUPTS(); + CHECK_FOR_INTERRUPTS(); - if (len == 0) + if (len > 0 && n_prefetch_workers != 0 && prefetch_buf_pos == prefetch_buf_used) + { + prefetch_buf_used = 0; + do { - break; - } - else if (len < 0) + if (prefetch_buf_used + len + 4 > prefetch_buf_size) + { + prefetch_buf_size *= 2; + elog(DEBUG1, "Increase prefetch buffer size to %ld", prefetch_buf_size); + prefetch_buf = repalloc(prefetch_buf, prefetch_buf_size); + } + memcpy(&prefetch_buf[prefetch_buf_used], &len, 4); + memcpy(&prefetch_buf[prefetch_buf_used+4], buf, len); + prefetch_buf_used += 4 + len; + if (prefetch_buf_used >= INIT_PREFETCH_BUF_SIZE) + break; + len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); + } while (len > 0); + + no_more_data = len <= 0; + + for (prefetch_buf_pos = 0; prefetch_buf_pos < prefetch_buf_used; prefetch_buf_pos += 4 + len) { - ereport(LOG, - (errmsg("data stream from publisher has ended"))); - endofstream = true; - break; + memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4); + lr_do_prefetch(&prefetch_buf[prefetch_buf_pos+4], len); } - else - { - int c; - StringInfoData s; + memcpy(&len, prefetch_buf, 4); + buf = &prefetch_buf[4]; + prefetch_buf_pos = len + 4; + } - if (ConfigReloadPending) - { - ConfigReloadPending = false; - ProcessConfigFile(PGC_SIGHUP); - } + if (len == 0) + { + break; + } + else if (len < 0) + { + ereport(LOG, + (errmsg("data stream from publisher has ended"))); + endofstream = true; + break; + } + else + { + int c; + StringInfoData s; - /* Reset timeout. */ - last_recv_timestamp = GetCurrentTimestamp(); - ping_sent = false; + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } - /* Ensure we are reading the data into our memory context. */ - MemoryContextSwitchTo(ApplyMessageContext); + /* Reset timeout. */ + last_recv_timestamp = GetCurrentTimestamp(); + ping_sent = false; - initReadOnlyStringInfo(&s, buf, len); + /* Ensure we are reading the data into our memory context. */ + MemoryContextSwitchTo(ApplyMessageContext); - c = pq_getmsgbyte(&s); + initReadOnlyStringInfo(&s, buf, len); - if (c == 'w') - { - XLogRecPtr start_lsn; - XLogRecPtr end_lsn; - TimestampTz send_time; + c = pq_getmsgbyte(&s); - start_lsn = pq_getmsgint64(&s); - end_lsn = pq_getmsgint64(&s); - send_time = pq_getmsgint64(&s); + if (c == 'w') + { + XLogRecPtr start_lsn; + XLogRecPtr end_lsn; + TimestampTz send_time; - if (last_received < start_lsn) - last_received = start_lsn; + start_lsn = pq_getmsgint64(&s); + end_lsn = pq_getmsgint64(&s); + send_time = pq_getmsgint64(&s); - if (last_received < end_lsn) - last_received = end_lsn; + if (last_received < start_lsn) + last_received = start_lsn; - UpdateWorkerStats(last_received, send_time, false); + if (last_received < end_lsn) + last_received = end_lsn; - apply_dispatch(&s); - } - else if (c == 'k') - { - XLogRecPtr end_lsn; - TimestampTz timestamp; - bool reply_requested; + UpdateWorkerStats(last_received, send_time, false); - end_lsn = pq_getmsgint64(&s); - timestamp = pq_getmsgint64(&s); - reply_requested = pq_getmsgbyte(&s); + apply_dispatch(&s); + } + else if (c == 'k') + { + XLogRecPtr end_lsn; + TimestampTz timestamp; + bool reply_requested; - if (last_received < end_lsn) - last_received = end_lsn; + end_lsn = pq_getmsgint64(&s); + timestamp = pq_getmsgint64(&s); + reply_requested = pq_getmsgbyte(&s); - send_feedback(last_received, reply_requested, false); - UpdateWorkerStats(last_received, timestamp, true); - } - /* other message types are purposefully ignored */ + if (last_received < end_lsn) + last_received = end_lsn; - MemoryContextReset(ApplyMessageContext); + send_feedback(last_received, reply_requested, false); + UpdateWorkerStats(last_received, timestamp, true); } + /* other message types are purposefully ignored */ + MemoryContextReset(ApplyMessageContext); + } + if (prefetch_buf_pos < prefetch_buf_used) + { + memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4); + buf = &prefetch_buf[prefetch_buf_pos + 4]; + prefetch_buf_pos += 4 + len; + } + else if (prefetch_buf_used != 0 && no_more_data) + { + break; + } + else + { len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); } } diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 511dc32d519..3b254898663 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -76,6 +76,7 @@ #include "replication/slot.h" #include "replication/slotsync.h" #include "replication/syncrep.h" +#include "replication/worker_internal.h" #include "storage/aio.h" #include "storage/bufmgr.h" #include "storage/bufpage.h" @@ -2143,6 +2144,18 @@ struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"prefetch_replica_identity_only", + PGC_SIGHUP, + REPLICATION_SUBSCRIBERS, + gettext_noop("Whether LR prefetch work should prefetch only replica identity index or all other indexes too."), + NULL, + }, + &prefetch_replica_identity_only, + true, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL @@ -3376,6 +3389,18 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"max_parallel_prefetch_workers_per_subscription", + PGC_SIGHUP, + REPLICATION_SUBSCRIBERS, + gettext_noop("Maximum number of parallel prefetch workers per subscription."), + NULL, + }, + &max_parallel_prefetch_workers_per_subscription, + 2, 0, MAX_LR_PREFETCH_WORKERS, + NULL, NULL, NULL + }, + { {"max_active_replication_origins", PGC_POSTMASTER, diff --git a/src/include/nodes/lockoptions.h b/src/include/nodes/lockoptions.h index 0b534e30603..88f5d2e4cc5 100644 --- a/src/include/nodes/lockoptions.h +++ b/src/include/nodes/lockoptions.h @@ -56,6 +56,10 @@ typedef enum LockTupleMode LockTupleNoKeyExclusive, /* SELECT FOR UPDATE, UPDATEs that modify key columns, and DELETE */ LockTupleExclusive, + /* Do not lock tuple */ + LockTupleNoLock, + /* Try explusive lock, silent give up in case of conflict */ + LockTupleTryExclusive, } LockTupleMode; #endif /* LOCKOPTIONS_H */ diff --git a/src/include/port/pg_iovec.h b/src/include/port/pg_iovec.h index 90be3af449d..8fefeb8c245 100644 --- a/src/include/port/pg_iovec.h +++ b/src/include/port/pg_iovec.h @@ -53,6 +53,7 @@ struct iovec static inline ssize_t pg_preadv(int fd, const struct iovec *iov, int iovcnt, off_t offset) { + pg_usleep(100); #if HAVE_DECL_PREADV /* * Avoid a small amount of argument copying overhead in the kernel if diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index 82b202f3305..19d1a8d466b 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -15,6 +15,8 @@ extern PGDLLIMPORT int max_logical_replication_workers; extern PGDLLIMPORT int max_sync_workers_per_subscription; extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription; +extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription; +extern PGDLLIMPORT int max_parallel_prefetch_workers_per_subscription; extern void ApplyLauncherRegister(void); extern void ApplyLauncherMain(Datum main_arg); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 30b2775952c..c6745e77efc 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -180,6 +180,11 @@ typedef struct ParallelApplyWorkerShared */ PartialFileSetState fileset_state; FileSet fileset; + + /* + * Prefetch worker + */ + bool do_prefetch; } ParallelApplyWorkerShared; /* @@ -237,6 +242,14 @@ extern PGDLLIMPORT bool in_remote_transaction; extern PGDLLIMPORT bool InitializingApplyWorker; +#define MAX_LR_PREFETCH_WORKERS 128 +extern PGDLLIMPORT size_t lr_prefetch_hits; +extern PGDLLIMPORT size_t lr_prefetch_misses; +extern PGDLLIMPORT size_t lr_prefetch_errors; +extern PGDLLIMPORT size_t lr_prefetch_inserts; + +extern bool prefetch_replica_identity_only; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); @@ -326,10 +339,13 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); -#define isParallelApplyWorker(worker) ((worker)->in_use && \ +extern void pa_prefetch_handle_modification(StringInfo s, LogicalRepMsgType action); + +#define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) #define isTablesyncWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_TABLESYNC) +extern ParallelApplyWorkerInfo* pa_launch_parallel_worker(void); static inline bool am_tablesync_worker(void)
apply-updates.sh
Description: Bourne shell script
apply-inserts.sh
Description: Bourne shell script