There is a well-known Postgres problem: a logical replication subscriber may be unable to catch up with the publisher, simply because LR changes are applied by a single worker while at the publisher the changes are made by multiple concurrent backends. The problem is not specific to logical replication: the physical replication stream is also handled by a single walreceiver. But for physical replication Postgres now implements prefetch: by looking at the blocks referenced by WAL records, it is quite easy to predict which pages will be required for redo and prefetch them. With logical replication the situation is much more complicated.

My first idea was to implement parallel apply of transactions. But to do that we need to track dependencies between transactions. Right now Postgres can apply transactions in parallel, but only if they are streamed (which is done only for large transactions), and it serializes them at commit. It is possible to enforce parallel apply of short transactions using `debug_logical_replication_streaming`, but then performance is about 2x slower than sequential apply by a single worker. By removing the serialization at commit, it is possible to speed up apply 3x and make the subscriber apply changes faster than the publisher can produce them, even with multiple clients. But this is possible only if the transactions are independent, and that can be enforced only by tracking dependencies, which seems to be quite non-trivial and invasive.
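
(For reference, a sketch of how parallel apply of short transactions can be forced today; `debug_logical_replication_streaming` is a developer GUC on the publisher side, and the subscription has to be created with parallel streaming:)

```
-- publisher: stream every transaction, not only large ones
alter system set debug_logical_replication_streaming = immediate;
select pg_reload_conf();

-- subscriber: apply streamed transactions in parallel
create subscription sub1 connection 'port=54321 dbname=postgres'
       publication pub1 with (streaming = parallel);
```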

I still have not completely given up on the dependency-tracking approach, but decided first to try a simpler solution: prefetching. It is already used for physical replication, where it is certainly much simpler, because each WAL record contains the list of accessed blocks.

In the case of logical replication, prefetching can be done either by prefetching access to the replica identity index (usually the primary key), or by executing the whole replication command in some background worker. The first approach is certainly much simpler: we just perform the index lookup in a prefetch worker, which loads the accessed index and heap pages into shared buffers, so the main apply worker does not need to read anything from disk.
But it works well only for DELETE and HOT UPDATE operations.
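
To illustrate the idea (a hypothetical example, not the actual patch code): for an incoming change with `pk = 42`, the replica-identity prefetch is roughly equivalent to a background session executing

```
-- point lookup warming the primary-key index and heap pages
select * from t where pk = 42;
```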

In the second case we normally execute the LR command in a background worker and then abort the transaction. In this case we are certainly doing the same work twice. But the assumption is the same: parallel prefetch workers should load the affected pages, speeding up the work of the main apply worker.
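
Conceptually (again just an illustration) a prefetch worker in this mode behaves like:

```
begin;
update t set counter = counter + 1 where pk = 42; -- replay the incoming change
rollback; -- discard the effect; the touched pages remain cached in shared buffers
```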

I have implemented a PoC (see attached patch) and got first results on the efficiency of such prefetching.

*** First scenario (update-only).

Publisher:
```
create table t(pk integer primary key, counter integer, filler text default repeat('x', 1000)) with (fillfactor=10);
insert into t values (generate_series(1,100000), 0);
create publication pub1 for table t;
```

Subscriber:
```
create table t(pk integer primary key, counter integer, filler text default repeat('x', 1000)) with (fillfactor=10); create subscription sub1 connection 'port=54321 dbname=postgres' publication pub1;
```

Then I wait until replication is synced, stop the subscriber, and do random updates in 10 sessions at the publisher:

```
pgbench -T 100 -c 10 -M prepared -n -f update.sql -p 54321 -d postgres
```

where update.sql is:

```
\set pk random(1, 100000)
update t set counter=counter+1 where pk=:pk;
```

Then I start the subscriber and measure how long it takes to catch up.
Results:

no prefetch: 2:00 min
prefetch (replica identity only): 0:55 min
prefetch (all): 1:10 min

This is definitely the best case for replica-identity-only prefetch (update-only workload and no other indexes).
How should these results be interpreted?

Without prefetch, applying the updates takes about twice as long at the subscriber as performing them at the publisher.
It means that under a heavy workload the subscriber has no chance of catching up.

With prefetching of the replica identity index, apply time is even smaller than the time needed to perform the updates at the publisher. Performing the whole operation and then aborting the transaction certainly adds more overhead, but the improvement is still quite significant.

Please also notice that these results were obtained on a system with a large amount of RAM (64GB) and a fast SSD. With a data set not fitting in RAM and much slower disks, the difference is expected to be even more significant. I tried to simulate this by adding an artificial 0.1ms delay to `pg_preadv` (see the pg_iovec.h hunk in the attached patch), and got the following results:

no prefetch: 7:40 min
prefetch (replica identity only): 3:10 min
prefetch (all): 3:09 min


In this case apply takes much longer than the 100 seconds during which updates are performed at the publisher. Prefetch improves speed about two times,
but it doesn't allow the subscriber to catch up.



*** Second scenario: inserts with secondary random key.


Publisher:

```
create table t(pk serial primary key, sk integer, counter integer default 0);
insert into t (sk) select random()*10000000 from generate_series(1,10000000);
create index on t(sk);
create publication pub1 for table t;
```

Subscriber:
```
create table t(pk integer primary key, sk integer, counter integer);
create index on t(sk);
create subscription sub1 connection 'port=54321 dbname=postgres' publication pub1;
```

workload:

```
pgbench -T 100 -c 10 -M prepared -n -f insert.sql -p 54321 -d postgres
```

where insert.sql is:

```
INSERT INTO t (sk) VALUES (random()*10000000);
```

Results (with the 0.1ms delay) are the following:

no prefetch: 10:10 min
prefetch (identity): 8:25 min
prefetch (full): 5:50 min

Here, as expected, prefetching only the primary key doesn't provide a big improvement. But replaying the insert command in a prefetch worker allows apply to be sped up almost twofold.

Please notice that this approach requires minimal changes in Postgres, because all the infrastructure of parallel apply workers is already present, and we can reuse the same apply code (with minimal changes) for performing prefetch. I only had to introduce extra tuple lock types (no-lock and try-lock) to minimize overhead and lock conflicts between the prefetch and main apply workers. Still, this cannot completely prevent lock conflicts and deadlocks in prefetch workers, so it looks like more work is needed here. I also set `wal_level=minimal` in prefetch workers to avoid WAL-logging overhead.

The number of prefetch workers is specified by the `max_parallel_prefetch_workers_per_subscription` GUC; if it is zero, no prefetching is performed. The prefetch mode is controlled by the `prefetch_replica_identity_only` GUC. By default it is true, which makes prefetch efficient for HOT updates, deletes, or inserts into a table with just one index (the primary key).
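
For example (using the GUC names from the attached patch; both are SIGHUP-level there):

```
alter system set max_parallel_prefetch_workers_per_subscription = 4;
alter system set prefetch_replica_identity_only = off; -- replay whole commands
select pg_reload_conf();
```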


Attached please find the patch and the two shell scripts used to produce these test results. It may also be more convenient to inspect the patch as a PR: https://github.com/knizhnik/postgres/pull/3

I wonder: is such an LR prefetching approach considered useful?
Or is it better to investigate other ways to improve LR apply speed (parallel apply)?
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 53ddd25c42d..c9c4223b22e 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -131,7 +131,7 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
  * invoking table_tuple_lock.
  */
 static bool
-should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
+should_refetch_tuple(TM_Result res, TM_FailureData *tmfd, LockTupleMode lockmode)
 {
        bool            refetch = false;
 
@@ -141,22 +141,28 @@ should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
                        break;
                case TM_Updated:
                        /* XXX: Improve handling here */
-                       if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
-                               ereport(LOG,
-                                               (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                                                errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
-                       else
-                               ereport(LOG,
-                                               (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                                                errmsg("concurrent update, retrying")));
-                       refetch = true;
+                       if (lockmode != LockTupleTryExclusive)
+                       {
+                               if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
+                                       ereport(LOG,
+                                                       (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+                                                        errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
+                               else
+                                       ereport(LOG,
+                                                       (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+                                                        errmsg("concurrent update, retrying")));
+                               refetch = true;
+                       }
                        break;
                case TM_Deleted:
-                       /* XXX: Improve handling here */
-                       ereport(LOG,
-                                       (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
-                                        errmsg("concurrent delete, retrying")));
-                       refetch = true;
+                       if (lockmode != LockTupleTryExclusive)
+                       {
+                               /* XXX: Improve handling here */
+                               ereport(LOG,
+                                               (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+                                                errmsg("concurrent delete, retrying")));
+                               refetch = true;
+                       }
                        break;
                case TM_Invisible:
                        elog(ERROR, "attempted to lock invisible tuple");
@@ -236,8 +242,16 @@ retry:
                 */
                if (TransactionIdIsValid(xwait))
                {
-                       XactLockTableWait(xwait, NULL, NULL, XLTW_None);
-                       goto retry;
+                       if (lockmode == LockTupleTryExclusive)
+                       {
+                               found = false;
+                               break;
+                       }
+                       else if (lockmode != LockTupleNoLock)
+                       {
+                               XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+                               goto retry;
+                       }
                }
 
                /* Found our tuple and it's not locked */
@@ -246,7 +260,7 @@ retry:
        }
 
        /* Found tuple, try to lock it in the lockmode. */
-       if (found)
+       if (found && lockmode != LockTupleNoLock)
        {
                TM_FailureData tmfd;
                TM_Result       res;
@@ -256,14 +270,14 @@ retry:
                res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
                                                           outslot,
                                                           GetCurrentCommandId(false),
-                                                          lockmode,
+                                                          lockmode == LockTupleTryExclusive ? LockTupleExclusive : lockmode,
                                                           LockWaitBlock,
                                                           0 /* don't follow updates */ ,
                                                           &tmfd);
 
                PopActiveSnapshot();
 
-               if (should_refetch_tuple(res, &tmfd))
+               if (should_refetch_tuple(res, &tmfd, lockmode))
                        goto retry;
        }
 
@@ -395,16 +409,23 @@ retry:
                 */
                if (TransactionIdIsValid(xwait))
                {
-                       XactLockTableWait(xwait, NULL, NULL, XLTW_None);
-                       goto retry;
+                       if (lockmode == LockTupleTryExclusive)
+                       {
+                               found = false;
+                               break;
+                       }
+                       else if (lockmode != LockTupleNoLock)
+                       {
+                               XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+                               goto retry;
+                       }
                }
-
                /* Found our tuple and it's not locked */
                break;
        }
 
        /* Found tuple, try to lock it in the lockmode. */
-       if (found)
+       if (found && lockmode != LockTupleNoLock)
        {
                TM_FailureData tmfd;
                TM_Result       res;
@@ -414,14 +435,14 @@ retry:
                res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
                                                           outslot,
                                                           GetCurrentCommandId(false),
-                                                          lockmode,
+                                                          lockmode == LockTupleTryExclusive ? LockTupleExclusive : lockmode,
                                                           LockWaitBlock,
                                                           0 /* don't follow updates */ ,
                                                           &tmfd);
 
                PopActiveSnapshot();
 
-               if (should_refetch_tuple(res, &tmfd))
+               if (should_refetch_tuple(res, &tmfd, lockmode))
                        goto retry;
        }
 
@@ -508,7 +529,7 @@ retry:
 
        PopActiveSnapshot();
 
-       if (should_refetch_tuple(res, &tmfd))
+       if (should_refetch_tuple(res, &tmfd, LockTupleShare))
                goto retry;
 
        return true;
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d25085d3515..d2c426ecab7 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -400,7 +400,7 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
  * Try to get a parallel apply worker from the pool. If none is available then
  * start a new one.
  */
-static ParallelApplyWorkerInfo *
+ParallelApplyWorkerInfo *
 pa_launch_parallel_worker(void)
 {
        MemoryContext oldcontext;
@@ -729,6 +729,43 @@ ProcessParallelApplyInterrupts(void)
        }
 }
 
+
+static void
+pa_apply_dispatch(StringInfo s)
+{
+       if (MyParallelShared->do_prefetch)
+       {
+               PG_TRY();
+               {
+                       apply_dispatch(s);
+               }
+               PG_CATCH();
+               {
+                       HOLD_INTERRUPTS();
+
+                       elog(DEBUG1, "Failed to prefetch LR operation");
+
+                       /* TODO: should we somehow dump the error or just silently ignore it? */
+                       /* EmitErrorReport(); */
+                       FlushErrorState();
+
+                       RESUME_INTERRUPTS();
+
+                       lr_prefetch_errors += 1;
+               }
+               PG_END_TRY();
+               if (!prefetch_replica_identity_only)
+               {
+                       /* We need to abort transaction to undo insert */
+                       AbortCurrentTransaction();
+               }
+       }
+       else
+       {
+               apply_dispatch(s);
+       }
+}
+
 /* Parallel apply worker main loop. */
 static void
 LogicalParallelApplyLoop(shm_mq_handle *mqh)
@@ -794,7 +831,7 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
                         */
                        s.cursor += SIZE_STATS_MESSAGE;
 
-                       apply_dispatch(&s);
+                       pa_apply_dispatch(&s);
                }
                else if (shmq_res == SHM_MQ_WOULD_BLOCK)
                {
@@ -943,20 +980,27 @@ ParallelApplyWorkerMain(Datum main_arg)
 
        InitializingApplyWorker = false;
 
-       /* Setup replication origin tracking. */
-       StartTransactionCommand();
-       ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+       if (!MyParallelShared->do_prefetch)
+       {
+               /* Setup replication origin tracking. */
+               StartTransactionCommand();
+               ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
                                                                           originname, sizeof(originname));
-       originid = replorigin_by_name(originname, false);
-
-       /*
-        * The parallel apply worker doesn't need to monopolize this replication
-        * origin which was already acquired by its leader process.
-        */
-       replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
-       replorigin_session_origin = originid;
-       CommitTransactionCommand();
+               originid = replorigin_by_name(originname, false);
 
+               /*
+                * The parallel apply worker doesn't need to monopolize this replication
+                * origin which was already acquired by its leader process.
+                */
+               replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
+               replorigin_session_origin = originid;
+               CommitTransactionCommand();
+       }
+       else
+       {
+               /* Do not write WAL for prefetch */
+               wal_level = WAL_LEVEL_MINIMAL;
+       }
        /*
        * Setup callback for syscache so that we know when something changes in
        * the subscription relation state.
@@ -1149,8 +1193,11 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
        shm_mq_result result;
        TimestampTz startTime = 0;
 
-       Assert(!IsTransactionState());
-       Assert(!winfo->serialize_changes);
+       if (!winfo->shared->do_prefetch)
+       {
+               Assert(!IsTransactionState());
+               Assert(!winfo->serialize_changes);
+       }
 
        /*
        * We don't try to send data to parallel worker for 'immediate' mode. This
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 4aed0dfcebb..ff2eaad5462 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -50,6 +50,7 @@
 int                    max_logical_replication_workers = 4;
 int                    max_sync_workers_per_subscription = 2;
 int                    max_parallel_apply_workers_per_subscription = 2;
+int                    max_parallel_prefetch_workers_per_subscription = 2;
 
 LogicalRepWorker *MyLogicalRepWorker = NULL;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fd11805a44c..8ff0076dad3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -311,6 +311,18 @@ static uint32 parallel_stream_nchanges = 0;
 /* Are we initializing an apply worker? */
 bool           InitializingApplyWorker = false;
 
+#define INIT_PREFETCH_BUF_SIZE (128*1024)
+static ParallelApplyWorkerInfo* prefetch_worker[MAX_LR_PREFETCH_WORKERS];
+static int prefetch_worker_rr = 0;
+static int n_prefetch_workers;
+
+bool prefetch_replica_identity_only = true;
+
+size_t lr_prefetch_hits;
+size_t lr_prefetch_misses;
+size_t lr_prefetch_errors;
+size_t lr_prefetch_inserts;
+
 /*
  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -329,6 +341,11 @@ bool               InitializingApplyWorker = false;
 static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
 #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
 
+/*
+ * If operation is performed by parallel prefetch worker
+ */
+#define is_prefetching()       (am_parallel_apply_worker() && MyParallelShared->do_prefetch)
+
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
@@ -556,6 +573,11 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
        TransApplyAction apply_action;
        StringInfoData original_msg;
 
+       if (is_prefetching())
+       {
+               return false;
+       }
+
        apply_action = get_transaction_apply_action(stream_xid, &winfo);
 
        /* not in streaming mode */
@@ -2380,6 +2402,27 @@ TargetPrivilegesCheck(Relation rel, AclMode mode)
                                                RelationGetRelationName(rel))));
 }
 
+#define SAFE_APPLY(call)                                                \
+       if (is_prefetching())                                            \
+       {                                                                \
+               PG_TRY();                                                \
+               {                                                        \
+                       call;                                            \
+               }                                                        \
+               PG_CATCH();                                              \
+               {                                                        \
+                       HOLD_INTERRUPTS();                               \
+                       elog(DEBUG1, "Failed to prefetch LR operation"); \
+                       FlushErrorState();                               \
+                       RESUME_INTERRUPTS();                             \
+                       lr_prefetch_errors += 1;                         \
+               }                                                        \
+               PG_END_TRY();                                            \
+       } else {                                                         \
+               call;                                                    \
+       }
+
+
 /*
  * Handle INSERT message.
  */
@@ -2453,7 +2496,7 @@ apply_handle_insert(StringInfo s)
                ResultRelInfo *relinfo = edata->targetRelInfo;
 
                ExecOpenIndices(relinfo, false);
-               apply_handle_insert_internal(edata, relinfo, remoteslot);
+               SAFE_APPLY(apply_handle_insert_internal(edata, relinfo, remoteslot));
                ExecCloseIndices(relinfo);
        }
 
@@ -2487,13 +2530,34 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
                   !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
                   RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
 
-       /* Caller will not have done this bit. */
-       Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
-       InitConflictIndexes(relinfo);
+       if (is_prefetching() && prefetch_replica_identity_only)
+       {
+               TupleTableSlot *localslot = NULL;
+               LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+               Relation                localrel = relinfo->ri_RelationDesc;
+               EPQState                epqstate;
 
-       /* Do the insert. */
-       TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
-       ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+               EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
+
+               (void) FindReplTupleInLocalRel(edata, localrel,
+                                                                          &relmapentry->remoterel,
+                                                                          relmapentry->localindexoid,
+                                                                          remoteslot, &localslot);
+       }
+       else
+       {
+               /* Caller will not have done this bit. */
+               Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
+               InitConflictIndexes(relinfo);
+
+               /* Do the insert. */
+               TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
+               ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+       }
+       if (is_prefetching())
+       {
+               lr_prefetch_inserts += 1;
+       }
 }
 
 /*
@@ -2637,8 +2701,8 @@ apply_handle_update(StringInfo s)
                apply_handle_tuple_routing(edata,
                                                                   remoteslot, &newtup, CMD_UPDATE);
        else
-               apply_handle_update_internal(edata, edata->targetRelInfo,
-                                                                        remoteslot, &newtup, rel->localindexoid);
+               SAFE_APPLY(apply_handle_update_internal(edata, edata->targetRelInfo,
+                                                                                               remoteslot, &newtup, rel->localindexoid));
 
        finish_edata(edata);
 
@@ -2682,6 +2746,16 @@ apply_handle_update_internal(ApplyExecutionData *edata,
                                                                        localindexoid,
                                                                        remoteslot, &localslot);
 
+       if (is_prefetching())
+       {
+               if (found)
+                       lr_prefetch_hits += 1;
+               else
+                       lr_prefetch_misses += 1;
+               if (prefetch_replica_identity_only)
+                       goto Cleanup;
+       }
+
        /*
        * Tuple found.
        *
@@ -2739,7 +2813,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
                                                        remoteslot, newslot, list_make1(&conflicttuple));
        }
 
-       /* Cleanup. */
+  Cleanup:
        ExecCloseIndices(relinfo);
        EvalPlanQualEnd(&epqstate);
 }
@@ -2820,8 +2894,8 @@ apply_handle_delete(StringInfo s)
                ResultRelInfo *relinfo = edata->targetRelInfo;
 
                ExecOpenIndices(relinfo, false);
-               apply_handle_delete_internal(edata, relinfo,
-                                                                        remoteslot, rel->localindexoid);
+               SAFE_APPLY(apply_handle_delete_internal(edata, relinfo,
+                                                                                               remoteslot, rel->localindexoid));
                ExecCloseIndices(relinfo);
        }
 
@@ -2867,6 +2941,15 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
        found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
                                                                        remoteslot, &localslot);
 
+       if (is_prefetching())
+       {
+               if (found)
+                       lr_prefetch_hits += 1;
+               else
+                       lr_prefetch_misses += 1;
+               goto Cleanup;
+       }
+
        /* If found delete it. */
        if (found)
        {
@@ -2900,7 +2983,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
                                                        remoteslot, NULL, list_make1(&conflicttuple));
        }
 
-       /* Cleanup. */
+  Cleanup:
        EvalPlanQualEnd(&epqstate);
 }
 
@@ -2921,6 +3004,8 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
        EState     *estate = edata->estate;
        bool            found;
 
+       LockTupleMode lockmode = is_prefetching() ? prefetch_replica_identity_only ? LockTupleNoLock : LockTupleTryExclusive : LockTupleExclusive;
+
        /*
        * Regardless of the top-level operation, we're performing a read here, so
        * check for SELECT privileges.
@@ -2946,11 +3031,11 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
 #endif
 
                found = RelationFindReplTupleByIndex(localrel, localidxoid,
-                                                                                        LockTupleExclusive,
+                                                                                        lockmode,
                                                                                         remoteslot, *localslot);
        }
        else
-               found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
+               found = RelationFindReplTupleSeq(localrel, lockmode,
                                                                                 remoteslot, *localslot);
 
        return found;
@@ -3041,14 +3126,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
        switch (operation)
        {
                case CMD_INSERT:
-                       apply_handle_insert_internal(edata, partrelinfo,
-                                                                                remoteslot_part);
+                       SAFE_APPLY(apply_handle_insert_internal(edata, partrelinfo,
+                                                                                                       remoteslot_part));
                        break;
 
                case CMD_DELETE:
-                       apply_handle_delete_internal(edata, partrelinfo,
-                                                                                remoteslot_part,
-                                                                                part_entry->localindexoid);
+                       SAFE_APPLY(apply_handle_delete_internal(edata, partrelinfo,
+                                                                                                       remoteslot_part,
+                                                                                                       part_entry->localindexoid));
                        break;
 
                case CMD_UPDATE:
@@ -3076,6 +3161,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                                {
                                        TupleTableSlot *newslot = localslot;
 
+                                       if (is_prefetching())
+                                               return;
+
                                        /* Store the new tuple for conflict reporting */
                                        slot_store_data(newslot, part_entry, newtup);
 
@@ -3101,6 +3189,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                                {
                                        TupleTableSlot *newslot;
 
+                                       if (is_prefetching())
+                                               return;
+
                                        /* Store the new tuple for conflict reporting */
                                        newslot = table_slot_create(partrel, &estate->es_tupleTable);
                                        slot_store_data(newslot, part_entry, newtup);
@@ -3217,8 +3308,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                                                slot_getallattrs(remoteslot);
                                        }
                                        MemoryContextSwitchTo(oldctx);
-                                       apply_handle_insert_internal(edata, partrelinfo_new,
-                                                                                                remoteslot_part);
+                                       SAFE_APPLY(apply_handle_insert_internal(edata, partrelinfo_new,
+                                                                                                                       remoteslot_part));
                                }
 
                                EvalPlanQualEnd(&epqstate);
@@ -3552,7 +3643,6 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
        MemoryContextSwitchTo(ApplyMessageContext);
 }
 
-
 /* Update statistics of the worker. */
 static void
 UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
@@ -3567,6 +3657,42 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
        }
 }
 
+#define MSG_CODE_OFFSET (1 + 8*3)
+
+static void
+lr_do_prefetch(char* buf, int len)
+{
+       ParallelApplyWorkerInfo* winfo;
+
+       if (buf[0] != 'w')
+               return;
+
+       switch (buf[MSG_CODE_OFFSET])
+       {
+               case LOGICAL_REP_MSG_INSERT:
+               case LOGICAL_REP_MSG_UPDATE:
+               case LOGICAL_REP_MSG_DELETE:
+                       /* Round robin prefetch worker */
+                       winfo = prefetch_worker[prefetch_worker_rr++ % n_prefetch_workers];
+                       pa_send_data(winfo, len, buf);
+                       break;
+
+               case LOGICAL_REP_MSG_TYPE:
+               case LOGICAL_REP_MSG_RELATION:
+                       /* broadcast to all prefetch workers */
+                       for (int i = 0; i < n_prefetch_workers; i++)
+                       {
+                               winfo = prefetch_worker[i];
+                               pa_send_data(winfo, len, buf);
+                       }
+                       break;
+
+               default:
+                       /* Ignore other messages */
+                       break;
+       }
+}
+
 /*
  * Apply main loop.
  */
@@ -3577,6 +3703,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
        bool            ping_sent = false;
        TimeLineID      tli;
        ErrorContextCallback errcallback;
+       char*           prefetch_buf = NULL;
+       size_t          prefetch_buf_pos = 0;
+       size_t          prefetch_buf_used = 0;
+       size_t          prefetch_buf_size = INIT_PREFETCH_BUF_SIZE;
 
        /*
        * Init the ApplyMessageContext which we clean up after each replication
@@ -3594,6 +3724,25 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                                                                                        "LogicalStreamingContext",
                                                                                        ALLOCSET_DEFAULT_SIZES);
 
+       if (max_parallel_prefetch_workers_per_subscription != 0)
+       {
+               int i;
+               for (i = 0; i < max_parallel_prefetch_workers_per_subscription; i++)
+               {
+                       prefetch_worker[i] = pa_launch_parallel_worker();
+                       if (!prefetch_worker[i])
+                       {
+                               elog(LOG, "Launch only %d prefetch workers from %d",
+                                        i, max_parallel_prefetch_workers_per_subscription);
+                               break;
+                       }
+                       prefetch_worker[i]->in_use = true;
+                       prefetch_worker[i]->shared->do_prefetch = true;
+               }
+               n_prefetch_workers = i;
+               prefetch_buf = palloc(prefetch_buf_size);
+       }
+
        /* mark as idle, before starting to loop */
        pgstat_report_activity(STATE_IDLE, NULL);
 
@@ -3611,9 +3760,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
        {
                pgsocket        fd = PGINVALID_SOCKET;
                int                     rc;
-               int                     len;
+               int32           len;
                char       *buf = NULL;
                bool            endofstream = false;
+               bool            no_more_data = false;
                long            wait_time;
 
                CHECK_FOR_INTERRUPTS();
@@ -3622,87 +3772,127 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
                len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
-               if (len != 0)
+               /* Loop to process all available data (without blocking). */
+               for (;;)
                {
-                       /* Loop to process all available data (without blocking). */
-                       for (;;)
-                       {
-                               CHECK_FOR_INTERRUPTS();
+                       CHECK_FOR_INTERRUPTS();
 
-                               if (len == 0)
+                       if (len > 0 && n_prefetch_workers != 0 && prefetch_buf_pos == prefetch_buf_used)
+                       {
+                               prefetch_buf_used = 0;
+                               do
                                {
-                                       break;
-                               }
-                               else if (len < 0)
+                                       if (prefetch_buf_used + len + 4 > prefetch_buf_size)
+                                       {
+                                               prefetch_buf_size *= 2;
+                                               elog(DEBUG1, "Increase prefetch buffer size to %ld", prefetch_buf_size);
+                                               prefetch_buf = repalloc(prefetch_buf, prefetch_buf_size);
+                                       }
+                                       memcpy(&prefetch_buf[prefetch_buf_used], &len, 4);
+                                       memcpy(&prefetch_buf[prefetch_buf_used+4], buf, len);
+                                       prefetch_buf_used += 4 + len;
+                                       if (prefetch_buf_used >= INIT_PREFETCH_BUF_SIZE)
+                                               break;
+                                       len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+                               } while (len > 0);
+
+                               no_more_data = len <= 0;
+
+                               for (prefetch_buf_pos = 0; prefetch_buf_pos < prefetch_buf_used; prefetch_buf_pos += 4 + len)
                                {
-                                       ereport(LOG,
-                                                       (errmsg("data stream from publisher has ended")));
-                                       endofstream = true;
-                                       break;
+                                       memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+                                       lr_do_prefetch(&prefetch_buf[prefetch_buf_pos+4], len);
                                }
-                               else
-                               {
-                                       int                     c;
-                                       StringInfoData s;
+                               memcpy(&len, prefetch_buf, 4);
+                               buf = &prefetch_buf[4];
+                               prefetch_buf_pos = len + 4;
+                       }
 
-                                       if (ConfigReloadPending)
-                                       {
-                                               ConfigReloadPending = false;
-                                               ProcessConfigFile(PGC_SIGHUP);
-                                       }
+                       if (len == 0)
+                       {
+                               break;
+                       }
+                       else if (len < 0)
+                       {
+                               ereport(LOG,
+                                               (errmsg("data stream from publisher has ended")));
+                               endofstream = true;
+                               break;
+                       }
+                       else
+                       {
+                               int                     c;
+                               StringInfoData s;
 
-                                       /* Reset timeout. */
-                                       last_recv_timestamp = GetCurrentTimestamp();
-                                       ping_sent = false;
+                               if (ConfigReloadPending)
+                               {
+                                       ConfigReloadPending = false;
+                                       ProcessConfigFile(PGC_SIGHUP);
+                               }
 
-                                       /* Ensure we are reading the data into our memory context. */
-                                       MemoryContextSwitchTo(ApplyMessageContext);
+                               /* Reset timeout. */
+                               last_recv_timestamp = GetCurrentTimestamp();
+                               ping_sent = false;
 
-                                       initReadOnlyStringInfo(&s, buf, len);
+                               /* Ensure we are reading the data into our memory context. */
+                               MemoryContextSwitchTo(ApplyMessageContext);
 
-                                       c = pq_getmsgbyte(&s);
+                               initReadOnlyStringInfo(&s, buf, len);
 
-                                       if (c == 'w')
-                                       {
-                                               XLogRecPtr      start_lsn;
-                                               XLogRecPtr      end_lsn;
-                                               TimestampTz send_time;
+                               c = pq_getmsgbyte(&s);
 
-                                               start_lsn = pq_getmsgint64(&s);
-                                               end_lsn = pq_getmsgint64(&s);
-                                               send_time = pq_getmsgint64(&s);
+                               if (c == 'w')
+                               {
+                                       XLogRecPtr      start_lsn;
+                                       XLogRecPtr      end_lsn;
+                                       TimestampTz send_time;
 
-                                               if (last_received < start_lsn)
-                                                       last_received = start_lsn;
+                                       start_lsn = pq_getmsgint64(&s);
+                                       end_lsn = pq_getmsgint64(&s);
+                                       send_time = pq_getmsgint64(&s);
 
-                                               if (last_received < end_lsn)
-                                                       last_received = end_lsn;
+                                       if (last_received < start_lsn)
+                                               last_received = start_lsn;
 
-                                               UpdateWorkerStats(last_received, send_time, false);
+                                       if (last_received < end_lsn)
+                                               last_received = end_lsn;
 
-                                               apply_dispatch(&s);
-                                       }
-                                       else if (c == 'k')
-                                       {
-                                               XLogRecPtr      end_lsn;
-                                               TimestampTz timestamp;
-                                               bool            reply_requested;
+                                       UpdateWorkerStats(last_received, send_time, false);
 
-                                               end_lsn = pq_getmsgint64(&s);
-                                               timestamp = pq_getmsgint64(&s);
-                                               reply_requested = pq_getmsgbyte(&s);
+                                       apply_dispatch(&s);
+                               }
+                               else if (c == 'k')
+                               {
+                                       XLogRecPtr      end_lsn;
+                                       TimestampTz timestamp;
+                                       bool            reply_requested;
 
-                                               if (last_received < end_lsn)
-                                                       last_received = end_lsn;
+                                       end_lsn = pq_getmsgint64(&s);
+                                       timestamp = pq_getmsgint64(&s);
+                                       reply_requested = pq_getmsgbyte(&s);
 
-                                               send_feedback(last_received, reply_requested, false);
-                                               UpdateWorkerStats(last_received, timestamp, true);
-                                       }
-                                       /* other message types are purposefully ignored */
+                                       if (last_received < end_lsn)
+                                               last_received = end_lsn;
 
-                                       MemoryContextReset(ApplyMessageContext);
+                                       send_feedback(last_received, reply_requested, false);
+                                       UpdateWorkerStats(last_received, timestamp, true);
                                }
+                               /* other message types are purposefully ignored */
 
+                               MemoryContextReset(ApplyMessageContext);
+                       }
+                       if (prefetch_buf_pos < prefetch_buf_used)
+                       {
+                               memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+                               buf = &prefetch_buf[prefetch_buf_pos + 4];
+                               prefetch_buf_pos += 4 + len;
+                       }
+                       else if (prefetch_buf_used != 0 && no_more_data)
+                       {
+                               break;
+                       }
+                       else
+                       {
                                len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
                        }
                }
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 511dc32d519..3b254898663 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -76,6 +76,7 @@
 #include "replication/slot.h"
 #include "replication/slotsync.h"
 #include "replication/syncrep.h"
+#include "replication/worker_internal.h"
 #include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
@@ -2143,6 +2144,18 @@ struct config_bool ConfigureNamesBool[] =
                NULL, NULL, NULL
        },
 
+       {
+               {"prefetch_replica_identity_only",
+                       PGC_SIGHUP,
+                       REPLICATION_SUBSCRIBERS,
+                       gettext_noop("Whether the LR prefetch worker should prefetch only the replica identity index or all other indexes too."),
+                       NULL,
+               },
+               &prefetch_replica_identity_only,
+               true,
+               NULL, NULL, NULL
+       },
+
        /* End-of-list marker */
        {
                {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
@@ -3376,6 +3389,18 @@ struct config_int ConfigureNamesInt[] =
                NULL, NULL, NULL
        },
 
+       {
+               {"max_parallel_prefetch_workers_per_subscription",
+                       PGC_SIGHUP,
+                       REPLICATION_SUBSCRIBERS,
+                       gettext_noop("Maximum number of parallel prefetch workers per subscription."),
+                       NULL,
+               },
+               &max_parallel_prefetch_workers_per_subscription,
+               2, 0, MAX_LR_PREFETCH_WORKERS,
+               NULL, NULL, NULL
+       },
+
        {
                {"max_active_replication_origins",
                        PGC_POSTMASTER,
diff --git a/src/include/nodes/lockoptions.h b/src/include/nodes/lockoptions.h
index 0b534e30603..88f5d2e4cc5 100644
--- a/src/include/nodes/lockoptions.h
+++ b/src/include/nodes/lockoptions.h
@@ -56,6 +56,10 @@ typedef enum LockTupleMode
        LockTupleNoKeyExclusive,
        /* SELECT FOR UPDATE, UPDATEs that modify key columns, and DELETE */
        LockTupleExclusive,
+       /* Do not lock tuple */
+       LockTupleNoLock,
+       /* Try exclusive lock, silently give up in case of conflict */
+       LockTupleTryExclusive,
 } LockTupleMode;
 
 #endif                                                 /* LOCKOPTIONS_H */
diff --git a/src/include/port/pg_iovec.h b/src/include/port/pg_iovec.h
index 90be3af449d..8fefeb8c245 100644
--- a/src/include/port/pg_iovec.h
+++ b/src/include/port/pg_iovec.h
@@ -53,6 +53,7 @@ struct iovec
 static inline ssize_t
 pg_preadv(int fd, const struct iovec *iov, int iovcnt, off_t offset)
 {
+       pg_usleep(100);         /* testing hack: simulate slow storage with a 0.1 ms delay per preadv */
 #if HAVE_DECL_PREADV
        /*
         * Avoid a small amount of argument copying overhead in the kernel if
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 82b202f3305..19d1a8d466b 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -15,6 +15,7 @@
 extern PGDLLIMPORT int max_logical_replication_workers;
 extern PGDLLIMPORT int max_sync_workers_per_subscription;
 extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
+extern PGDLLIMPORT int max_parallel_prefetch_workers_per_subscription;
 
 extern void ApplyLauncherRegister(void);
 extern void ApplyLauncherMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 30b2775952c..c6745e77efc 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -180,6 +180,11 @@ typedef struct ParallelApplyWorkerShared
         */
        PartialFileSetState fileset_state;
        FileSet         fileset;
+
+       /*
+        * Prefetch worker
+        */
+       bool            do_prefetch;
 } ParallelApplyWorkerShared;
 
 /*
@@ -237,6 +242,14 @@ extern PGDLLIMPORT bool in_remote_transaction;
 
 extern PGDLLIMPORT bool InitializingApplyWorker;
 
+#define MAX_LR_PREFETCH_WORKERS 128
+extern PGDLLIMPORT size_t lr_prefetch_hits;
+extern PGDLLIMPORT size_t lr_prefetch_misses;
+extern PGDLLIMPORT size_t lr_prefetch_errors;
+extern PGDLLIMPORT size_t lr_prefetch_inserts;
+
+extern bool prefetch_replica_identity_only;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
                                                                                                bool only_running);
@@ -326,10 +339,13 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
                                                   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->in_use && \
+extern void pa_prefetch_handle_modification(StringInfo s, LogicalRepMsgType action);
+
+#define isParallelApplyWorker(worker) ((worker)->in_use && \
                                                                           (worker)->type == WORKERTYPE_PARALLEL_APPLY)
 #define isTablesyncWorker(worker) ((worker)->in_use && \
                                                                   (worker)->type == WORKERTYPE_TABLESYNC)
+extern ParallelApplyWorkerInfo* pa_launch_parallel_worker(void);
 
 static inline bool
 am_tablesync_worker(void)

Attachment: apply-updates.sh
Description: Bourne shell script

Attachment: apply-inserts.sh
Description: Bourne shell script
