Completely rewritten version of prefetch patch.
Now prefetch workers do not try to perform LR apply and then roll back the transaction. They just perform index lookups and so prefetch the index and referenced heap pages.
So no hacks are needed to prevent lock conflicts and WAL logging.

Performance results are the following (the test scenario, as well as the shell scripts used, was explained in the previous message):

update:
   prefetch (2):           5:20
   prefetch (4):           3:20
   prefetch (8):           2:05
   no prefetch:           8:30

insert:
   pk (4) prefetch:      9:55
   pk+sk(4) prefetch: 5:20
   pk+sk(8) prefetch: 3:08
   no prefetch:           9:20

The number in parentheses specifies the number of prefetch workers.
For example to spawn 8 prefetch workers I used the following settings in postgresql.conf.replica:

prefetch_replica_identity_only=off
max_worker_processes=16
max_logical_replication_workers=16
max_parallel_apply_workers_per_subscription=8
max_parallel_prefetch_workers_per_subscription=8
port=54322


Also I ran a continuous test with a long (3 hours) update workload on the publisher with logical replication to the subscriber. With 8 prefetch workers the replica is able to catch up with the primary while 10 backends are performing updates! After the end of these updates the replica was at the same state as the primary, whereas without prefetch it had processed only 1/2 of the
generated WAL and it took another 5:30 hours to catch up.



diff --git a/src/backend/executor/execReplication.c 
b/src/backend/executor/execReplication.c
index 53ddd25c42..3c50f17227 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -275,6 +275,47 @@ retry:
        return found;
 }
 
+/*
+ * Prefetch the index and heap pages that applying a change to 'rel' will
+ * touch: search the relation for the tuple identified by 'searchslot'
+ * using index 'idxoid', which pulls the visited index and heap pages into
+ * shared buffers as a side effect.
+ *
+ * Returns true if a matching tuple was found (stored in 'outslot'), false
+ * if no tuple matched or 'idxoid' is invalid (nothing to prefetch).
+ */
+bool
+RelationPrefetchIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot,
+                                         TupleTableSlot *outslot)
+{
+       ScanKeyData skey[INDEX_MAX_KEYS];
+       int                     skey_attoff;
+       IndexScanDesc scan;
+       SnapshotData snap;
+       Relation        idxrel;
+       bool            found;
+
+       /* Do not do prefetch when there is no index */
+       if (!OidIsValid(idxoid))
+               return false;
+
+       /* Open the index. */
+       idxrel = index_open(idxoid, AccessShareLock);
+
+       /* NOTE(review): dirty snapshot — in-progress tuples are visited too */
+       InitDirtySnapshot(snap);
+
+       /* Build scan key. */
+       skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
+
+       /* Start an index scan. */
+       scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
+       index_rescan(scan, skey, skey_attoff, NULL, 0);
+
+       /* Try to find the tuple */
+       found = index_getnext_slot(scan, ForwardScanDirection, outslot);
+
+       /* Cleanup */
+       index_endscan(scan);
+       index_close(idxrel, AccessShareLock);
+
+       return found;
+}
+
 /*
  * Compare the tuples in the slots by checking if they have equal values.
  */
diff --git a/src/backend/replication/logical/applyparallelworker.c 
b/src/backend/replication/logical/applyparallelworker.c
index d25085d351..a207a4acdc 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -396,6 +396,57 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
        return true;
 }
 
+/*
+ * Launch a new parallel prefetch worker for the current subscription.
+ *
+ * Returns the worker info on success, or NULL if shared-memory setup or
+ * the background worker launch failed.  On success, 'do_prefetch' is set
+ * so the worker is treated as a prefetch worker rather than an apply one.
+ */
+ParallelApplyWorkerInfo *
+pa_launch_prefetch_worker(void)
+{
+       MemoryContext oldcontext;
+       bool            launched;
+       ParallelApplyWorkerInfo *winfo;
+
+       /*
+        * Start a new parallel prefetch worker.
+        *
+        * The worker info can be used for the lifetime of the worker process, 
so
+        * create it in a permanent context.
+        */
+       oldcontext = MemoryContextSwitchTo(ApplyContext);
+
+       winfo = (ParallelApplyWorkerInfo *) 
palloc0(sizeof(ParallelApplyWorkerInfo));
+
+       /* Setup shared memory. */
+       if (!pa_setup_dsm(winfo))
+       {
+               MemoryContextSwitchTo(oldcontext);
+               pfree(winfo);
+               return NULL;
+       }
+
+       launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_PREFETCH,
+                                                                               
MyLogicalRepWorker->dbid,
+                                                                               
MySubscription->oid,
+                                                                               
MySubscription->name,
+                                                                               
MyLogicalRepWorker->userid,
+                                                                               
InvalidOid,
+                                                                               
dsm_segment_handle(winfo->dsm_seg));
+
+       if (launched)
+       {
+               winfo->do_prefetch = true;
+       }
+       else
+       {
+               pa_free_worker_info(winfo);
+               winfo = NULL;
+       }
+
+       MemoryContextSwitchTo(oldcontext);
+
+       return winfo;
+}
+
 /*
  * Try to get a parallel apply worker from the pool. If none is available then
  * start a new one.
@@ -943,20 +994,22 @@ ParallelApplyWorkerMain(Datum main_arg)
 
        InitializingApplyWorker = false;
 
-       /* Setup replication origin tracking. */
-       StartTransactionCommand();
-       ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+       if (am_parallel_apply_worker())
+       {
+               /* Setup replication origin tracking. */
+               StartTransactionCommand();
+               ReplicationOriginNameForLogicalRep(MySubscription->oid, 
InvalidOid,
                                                                           
originname, sizeof(originname));
-       originid = replorigin_by_name(originname, false);
-
-       /*
-        * The parallel apply worker doesn't need to monopolize this replication
-        * origin which was already acquired by its leader process.
-        */
-       replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
-       replorigin_session_origin = originid;
-       CommitTransactionCommand();
+               originid = replorigin_by_name(originname, false);
 
+               /*
+                * The parallel apply worker doesn't need to monopolize this 
replication
+                * origin which was already acquired by its leader process.
+                */
+               replorigin_session_setup(originid, 
MyLogicalRepWorker->leader_pid);
+               replorigin_session_origin = originid;
+               CommitTransactionCommand();
+       }
        /*
         * Setup callback for syscache so that we know when something changes in
         * the subscription relation state.
@@ -1149,8 +1202,11 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size 
nbytes, const void *data)
        shm_mq_result result;
        TimestampTz startTime = 0;
 
-       Assert(!IsTransactionState());
-       Assert(!winfo->serialize_changes);
+       if (!winfo->do_prefetch)
+       {
+               Assert(!IsTransactionState());
+               Assert(!winfo->serialize_changes);
+       }
 
        /*
         * We don't try to send data to parallel worker for 'immediate' mode. 
This
@@ -1519,6 +1575,9 @@ pa_get_fileset_state(void)
 {
        PartialFileSetState fileset_state;
 
+       if (am_parallel_prefetch_worker())
+               return FS_EMPTY;
+
        Assert(am_parallel_apply_worker());
 
        SpinLockAcquire(&MyParallelShared->mutex);
diff --git a/src/backend/replication/logical/launcher.c 
b/src/backend/replication/logical/launcher.c
index 4aed0dfceb..ed6c057cec 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -50,6 +50,7 @@
 int                    max_logical_replication_workers = 4;
 int                    max_sync_workers_per_subscription = 2;
 int                    max_parallel_apply_workers_per_subscription = 2;
+int                    max_parallel_prefetch_workers_per_subscription = 2;
 
 LogicalRepWorker *MyLogicalRepWorker = NULL;
 
@@ -257,7 +258,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool 
only_running)
                LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
                /* Skip parallel apply workers. */
-               if (isParallelApplyWorker(w))
+               if (isParallelApplyWorker(w) || isParallelPrefetchWorker(w))
                        continue;
 
                if (w->in_use && w->subid == subid && w->relid == relid &&
@@ -322,6 +323,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
        TimestampTz now;
        bool            is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
        bool            is_parallel_apply_worker = (wtype == 
WORKERTYPE_PARALLEL_APPLY);
+       bool            is_parallel_prefetch_worker = (wtype == 
WORKERTYPE_PARALLEL_PREFETCH);
 
        /*----------
         * Sanity checks:
@@ -331,7 +333,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
         */
        Assert(wtype != WORKERTYPE_UNKNOWN);
        Assert(is_tablesync_worker == OidIsValid(relid));
-       Assert(is_parallel_apply_worker == (subworker_dsm != 
DSM_HANDLE_INVALID));
+       Assert((is_parallel_apply_worker|is_parallel_prefetch_worker) == 
(subworker_dsm != DSM_HANDLE_INVALID));
 
        ereport(DEBUG1,
                        (errmsg_internal("starting logical replication worker 
for subscription \"%s\"",
@@ -452,8 +454,8 @@ retry:
        worker->relstate = SUBREL_STATE_UNKNOWN;
        worker->relstate_lsn = InvalidXLogRecPtr;
        worker->stream_fileset = NULL;
-       worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
-       worker->parallel_apply = is_parallel_apply_worker;
+       worker->leader_pid = 
(is_parallel_apply_worker|is_parallel_prefetch_worker) ? MyProcPid : InvalidPid;
+       worker->parallel_apply = 
is_parallel_apply_worker|is_parallel_prefetch_worker;
        worker->last_lsn = InvalidXLogRecPtr;
        TIMESTAMP_NOBEGIN(worker->last_send_time);
        TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -492,6 +494,16 @@ retry:
                        memcpy(bgw.bgw_extra, &subworker_dsm, 
sizeof(dsm_handle));
                        break;
 
+               case WORKERTYPE_PARALLEL_PREFETCH:
+                       snprintf(bgw.bgw_function_name, BGW_MAXLEN, 
"ParallelApplyWorkerMain");
+                       snprintf(bgw.bgw_name, BGW_MAXLEN,
+                                        "logical replication parallel prefetch 
worker for subscription %u",
+                                        subid);
+                       snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication 
parallel worker");
+
+                       memcpy(bgw.bgw_extra, &subworker_dsm, 
sizeof(dsm_handle));
+                       break;
+
                case WORKERTYPE_TABLESYNC:
                        snprintf(bgw.bgw_function_name, BGW_MAXLEN, 
"TablesyncWorkerMain");
                        snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -626,7 +638,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 
        if (worker)
        {
-               Assert(!isParallelApplyWorker(worker));
+               Assert(!isParallelApplyWorker(worker) && 
!isParallelPrefetchWorker(worker));
                logicalrep_worker_stop_internal(worker, SIGTERM);
        }
 
@@ -774,7 +786,7 @@ logicalrep_worker_detach(void)
                {
                        LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-                       if (isParallelApplyWorker(w))
+                       if (isParallelApplyWorker(w) || 
isParallelPrefetchWorker(w))
                                logicalrep_worker_stop_internal(w, SIGTERM);
                }
 
@@ -1369,6 +1381,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
                        case WORKERTYPE_PARALLEL_APPLY:
                                values[9] = CStringGetTextDatum("parallel 
apply");
                                break;
+                       case WORKERTYPE_PARALLEL_PREFETCH:
+                               values[9] = CStringGetTextDatum("parallel 
prefetch");
+                               break;
                        case WORKERTYPE_TABLESYNC:
                                values[9] = CStringGetTextDatum("table 
synchronization");
                                break;
diff --git a/src/backend/replication/logical/tablesync.c 
b/src/backend/replication/logical/tablesync.c
index c90f23ee5b..f965317529 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -681,6 +681,9 @@ process_syncing_tables(XLogRecPtr current_lsn)
                         */
                        break;
 
+               case WORKERTYPE_PARALLEL_PREFETCH:
+                       break;
+
                case WORKERTYPE_TABLESYNC:
                        process_syncing_tables_for_sync(current_lsn);
                        break;
diff --git a/src/backend/replication/logical/worker.c 
b/src/backend/replication/logical/worker.c
index fd11805a44..db1f8bcebd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -311,6 +311,18 @@ static uint32 parallel_stream_nchanges = 0;
 /* Are we initializing an apply worker? */
 bool           InitializingApplyWorker = false;
 
+#define INIT_PREFETCH_BUF_SIZE (128*1024)
+static ParallelApplyWorkerInfo* prefetch_worker[MAX_LR_PREFETCH_WORKERS];
+static int prefetch_worker_rr = 0;
+static int n_prefetch_workers;
+
+bool prefetch_replica_identity_only = false;
+
+size_t lr_prefetch_hits;
+size_t lr_prefetch_misses;
+size_t lr_prefetch_errors;
+size_t lr_prefetch_inserts;
+
 /*
  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
  * the subscription if the remote transaction's finish LSN matches the 
subskiplsn.
@@ -482,6 +494,9 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
                                        (rel->state == SUBREL_STATE_SYNCDONE &&
                                         rel->statelsn <= remote_final_lsn));
 
+               case WORKERTYPE_PARALLEL_PREFETCH:
+                       return true;
+
                case WORKERTYPE_UNKNOWN:
                        /* Should never happen. */
                        elog(ERROR, "Unknown worker type");
@@ -556,6 +571,11 @@ handle_streamed_transaction(LogicalRepMsgType action, 
StringInfo s)
        TransApplyAction apply_action;
        StringInfoData original_msg;
 
+       if (am_parallel_prefetch_worker())
+       {
+               return false;
+       }
+
        apply_action = get_transaction_apply_action(stream_xid, &winfo);
 
        /* not in streaming mode */
@@ -2487,13 +2507,36 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
                   !relinfo->ri_RelationDesc->rd_rel->relhasindex ||
                   RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
 
-       /* Caller will not have done this bit. */
-       Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
-       InitConflictIndexes(relinfo);
+       if (am_parallel_prefetch_worker())
+       {
+               Relation localrel = relinfo->ri_RelationDesc;
+               TupleTableSlot *localslot = table_slot_create(localrel, 
&estate->es_tupleTable);
+               LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+
+               if (prefetch_replica_identity_only)
+               {
+                       (void)RelationPrefetchIndex(localrel, 
relmapentry->localindexoid, remoteslot, localslot);
+               }
+               else
+               {
+                       for (int i = 0; i < relinfo->ri_NumIndices; i++)
+                       {
+                               Oid sec_index_oid = 
RelationGetRelid(relinfo->ri_IndexRelationDescs[i]);
+                               (void)RelationPrefetchIndex(localrel, 
sec_index_oid, remoteslot, localslot);
+                       }
+               }
+               lr_prefetch_inserts += 1;
+       }
+       else
+       {
+               /* Caller will not have done this bit. */
+               Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
+               InitConflictIndexes(relinfo);
 
-       /* Do the insert. */
-       TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
-       ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+               /* Do the insert. */
+               TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
+               ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+       }
 }
 
 /*
@@ -2677,6 +2720,32 @@ apply_handle_update_internal(ApplyExecutionData *edata,
        EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
        ExecOpenIndices(relinfo, false);
 
+       if (am_parallel_prefetch_worker())
+       {
+               /*
+                * While it may be reasonable to prefetch indexes for both old 
and new tuples,
+                * we do it only for one of them (old if it exists,  new 
otherwise), assuming
+                * that probability that index key is changed is quite small
+                */
+               localslot = table_slot_create(localrel, &estate->es_tupleTable);
+               found = RelationPrefetchIndex(localrel, localindexoid, 
remoteslot, localslot);
+               if (found)
+                       lr_prefetch_hits += 1;
+               else
+                       lr_prefetch_misses += 1;
+               if (!prefetch_replica_identity_only)
+               {
+                       for (int i = 0; i < relinfo->ri_NumIndices; i++)
+                       {
+                               Oid sec_index_oid = 
RelationGetRelid(relinfo->ri_IndexRelationDescs[i]);
+                               if (sec_index_oid != localindexoid)
+                               {
+                                       (void)RelationPrefetchIndex(localrel, 
sec_index_oid, remoteslot, localslot);
+                               }
+                       }
+               }
+               goto Cleanup;
+       }
        found = FindReplTupleInLocalRel(edata, localrel,
                                                                        
&relmapentry->remoterel,
                                                                        
localindexoid,
@@ -2739,7 +2808,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
                                                        remoteslot, newslot, 
list_make1(&conflicttuple));
        }
 
-       /* Cleanup. */
+  Cleanup:
        ExecCloseIndices(relinfo);
        EvalPlanQualEnd(&epqstate);
 }
@@ -2864,6 +2933,17 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
                   !localrel->rd_rel->relhasindex ||
                   RelationGetIndexList(localrel) == NIL);
 
+       if (am_parallel_prefetch_worker())
+       {
+               localslot = table_slot_create(localrel, &estate->es_tupleTable);
+               found = RelationPrefetchIndex(localrel, localindexoid, 
remoteslot, localslot);
+               if (found)
+                       lr_prefetch_hits += 1;
+               else
+                       lr_prefetch_misses += 1;
+               /* No need to prefetch other indexes because they are not 
touched during delete */
+               goto Cleanup;
+       }
        found = FindReplTupleInLocalRel(edata, localrel, remoterel, 
localindexoid,
                                                                        
remoteslot, &localslot);
 
@@ -2900,7 +2980,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
                                                        remoteslot, NULL, 
list_make1(&conflicttuple));
        }
 
-       /* Cleanup. */
+  Cleanup:
        EvalPlanQualEnd(&epqstate);
 }
 
@@ -3567,6 +3647,42 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz 
send_time, bool reply)
        }
 }
 
+/*
+ * Offset of the logical replication message code inside a 'w' (WAL data)
+ * protocol message: one message-type byte followed by three 8-byte header
+ * fields (start_lsn, end_lsn, send_time).
+ */
+#define MSG_CODE_OFFSET (1 + 8*3)
+
+/*
+ * Hand a copied 'w' protocol message to the prefetch workers.
+ *
+ * DML messages (insert/update/delete) go to a single worker chosen
+ * round-robin; type and relation metadata messages are broadcast to all
+ * workers so each keeps a complete relation map.  All other messages are
+ * ignored.  Non-'w' messages are ignored entirely.
+ */
+static void
+lr_do_prefetch(char* buf, int len)
+{
+       ParallelApplyWorkerInfo* winfo;
+
+       if (buf[0] != 'w')
+               return;
+
+       switch (buf[MSG_CODE_OFFSET])
+       {
+               case LOGICAL_REP_MSG_INSERT:
+               case LOGICAL_REP_MSG_UPDATE:
+               case LOGICAL_REP_MSG_DELETE:
+                       /* Round robin prefetch worker */
+                       winfo = prefetch_worker[prefetch_worker_rr++ % 
n_prefetch_workers];
+                       pa_send_data(winfo, len, buf);
+                       break;
+
+               case LOGICAL_REP_MSG_TYPE:
+               case LOGICAL_REP_MSG_RELATION:
+                       /* broadcast to all prefetch workers */
+                       for (int i = 0; i < n_prefetch_workers; i++)
+                       {
+                               winfo = prefetch_worker[i];
+                               pa_send_data(winfo, len, buf);
+                       }
+                       break;
+
+               default:
+                       /* Ignore other messages */
+                       break;
+       }
+}
+
 /*
  * Apply main loop.
  */
@@ -3577,6 +3693,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
        bool            ping_sent = false;
        TimeLineID      tli;
        ErrorContextCallback errcallback;
+       char* prefetch_buf = NULL;
+       size_t prefetch_buf_pos = 0;
+       size_t prefetch_buf_used = 0;
+       size_t prefetch_buf_size = INIT_PREFETCH_BUF_SIZE;
 
        /*
         * Init the ApplyMessageContext which we clean up after each replication
@@ -3594,6 +3714,23 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                                                                                
                        "LogicalStreamingContext",
                                                                                
                        ALLOCSET_DEFAULT_SIZES);
 
+       if (max_parallel_prefetch_workers_per_subscription != 0)
+       {
+               int i;
+               for (i = 0; i < max_parallel_prefetch_workers_per_subscription; 
i++)
+               {
+                       prefetch_worker[i] = pa_launch_prefetch_worker();
+                       if (!prefetch_worker[i])
+                       {
+                               elog(LOG, "Launch only %d prefetch workers from 
%d",
+                                        i, 
max_parallel_prefetch_workers_per_subscription);
+                               break;
+                       }
+               }
+               n_prefetch_workers = i;
+               prefetch_buf = palloc(prefetch_buf_size);
+       }
+
        /* mark as idle, before starting to loop */
        pgstat_report_activity(STATE_IDLE, NULL);
 
@@ -3611,9 +3748,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
        {
                pgsocket        fd = PGINVALID_SOCKET;
                int                     rc;
-               int                     len;
-               char       *buf = NULL;
+               int32           len;
+               char            *buf = NULL;
                bool            endofstream = false;
+               bool            no_more_data = false;
                long            wait_time;
 
                CHECK_FOR_INTERRUPTS();
@@ -3622,87 +3760,127 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
                len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
-               if (len != 0)
+               /* Loop to process all available data (without blocking). */
+               for (;;)
                {
-                       /* Loop to process all available data (without 
blocking). */
-                       for (;;)
-                       {
-                               CHECK_FOR_INTERRUPTS();
+                       CHECK_FOR_INTERRUPTS();
 
-                               if (len == 0)
+                       if (len > 0 && n_prefetch_workers != 0 && 
prefetch_buf_pos == prefetch_buf_used)
+                       {
+                               prefetch_buf_used = 0;
+                               do
                                {
-                                       break;
-                               }
-                               else if (len < 0)
+                                       if (prefetch_buf_used + len + 4 > 
prefetch_buf_size)
+                                       {
+                                               prefetch_buf_size *= 2;
+                                               elog(DEBUG1, "Increase prefetch 
buffer size to %ld", prefetch_buf_size);
+                                               prefetch_buf = 
repalloc(prefetch_buf, prefetch_buf_size);
+                                       }
+                                       
memcpy(&prefetch_buf[prefetch_buf_used], &len, 4);
+                                       
memcpy(&prefetch_buf[prefetch_buf_used+4], buf, len);
+                                       prefetch_buf_used += 4 + len;
+                                       if (prefetch_buf_used >= 
INIT_PREFETCH_BUF_SIZE)
+                                               break;
+                                       len = 
walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+                               } while (len > 0);
+
+                               no_more_data = len <= 0;
+
+                               for (prefetch_buf_pos = 0; prefetch_buf_pos < 
prefetch_buf_used; prefetch_buf_pos += 4 + len)
                                {
-                                       ereport(LOG,
-                                                       (errmsg("data stream 
from publisher has ended")));
-                                       endofstream = true;
-                                       break;
+                                       memcpy(&len, 
&prefetch_buf[prefetch_buf_pos], 4);
+                                       
lr_do_prefetch(&prefetch_buf[prefetch_buf_pos+4], len);
                                }
-                               else
-                               {
-                                       int                     c;
-                                       StringInfoData s;
+                               memcpy(&len, prefetch_buf, 4);
+                               buf = &prefetch_buf[4];
+                               prefetch_buf_pos = len + 4;
+                       }
 
-                                       if (ConfigReloadPending)
-                                       {
-                                               ConfigReloadPending = false;
-                                               ProcessConfigFile(PGC_SIGHUP);
-                                       }
+                       if (len == 0)
+                       {
+                               break;
+                       }
+                       else if (len < 0)
+                       {
+                               ereport(LOG,
+                                               (errmsg("data stream from 
publisher has ended")));
+                               endofstream = true;
+                               break;
+                       }
+                       else
+                       {
+                               int                     c;
+                               StringInfoData s;
 
-                                       /* Reset timeout. */
-                                       last_recv_timestamp = 
GetCurrentTimestamp();
-                                       ping_sent = false;
+                               if (ConfigReloadPending)
+                               {
+                                       ConfigReloadPending = false;
+                                       ProcessConfigFile(PGC_SIGHUP);
+                               }
 
-                                       /* Ensure we are reading the data into 
our memory context. */
-                                       
MemoryContextSwitchTo(ApplyMessageContext);
+                               /* Reset timeout. */
+                               last_recv_timestamp = GetCurrentTimestamp();
+                               ping_sent = false;
 
-                                       initReadOnlyStringInfo(&s, buf, len);
+                               /* Ensure we are reading the data into our 
memory context. */
+                               MemoryContextSwitchTo(ApplyMessageContext);
 
-                                       c = pq_getmsgbyte(&s);
+                               initReadOnlyStringInfo(&s, buf, len);
 
-                                       if (c == 'w')
-                                       {
-                                               XLogRecPtr      start_lsn;
-                                               XLogRecPtr      end_lsn;
-                                               TimestampTz send_time;
+                               c = pq_getmsgbyte(&s);
 
-                                               start_lsn = pq_getmsgint64(&s);
-                                               end_lsn = pq_getmsgint64(&s);
-                                               send_time = pq_getmsgint64(&s);
+                               if (c == 'w')
+                               {
+                                       XLogRecPtr      start_lsn;
+                                       XLogRecPtr      end_lsn;
+                                       TimestampTz send_time;
 
-                                               if (last_received < start_lsn)
-                                                       last_received = 
start_lsn;
+                                       start_lsn = pq_getmsgint64(&s);
+                                       end_lsn = pq_getmsgint64(&s);
+                                       send_time = pq_getmsgint64(&s);
 
-                                               if (last_received < end_lsn)
-                                                       last_received = end_lsn;
+                                       if (last_received < start_lsn)
+                                               last_received = start_lsn;
 
-                                               
UpdateWorkerStats(last_received, send_time, false);
+                                       if (last_received < end_lsn)
+                                               last_received = end_lsn;
 
-                                               apply_dispatch(&s);
-                                       }
-                                       else if (c == 'k')
-                                       {
-                                               XLogRecPtr      end_lsn;
-                                               TimestampTz timestamp;
-                                               bool            reply_requested;
+                                       UpdateWorkerStats(last_received, 
send_time, false);
 
-                                               end_lsn = pq_getmsgint64(&s);
-                                               timestamp = pq_getmsgint64(&s);
-                                               reply_requested = 
pq_getmsgbyte(&s);
+                                       apply_dispatch(&s);
+                               }
+                               else if (c == 'k')
+                               {
+                                       XLogRecPtr      end_lsn;
+                                       TimestampTz timestamp;
+                                       bool            reply_requested;
 
-                                               if (last_received < end_lsn)
-                                                       last_received = end_lsn;
+                                       end_lsn = pq_getmsgint64(&s);
+                                       timestamp = pq_getmsgint64(&s);
+                                       reply_requested = pq_getmsgbyte(&s);
 
-                                               send_feedback(last_received, 
reply_requested, false);
-                                               
UpdateWorkerStats(last_received, timestamp, true);
-                                       }
-                                       /* other message types are purposefully 
ignored */
+                                       if (last_received < end_lsn)
+                                               last_received = end_lsn;
 
-                                       MemoryContextReset(ApplyMessageContext);
+                                       send_feedback(last_received, 
reply_requested, false);
+                                       UpdateWorkerStats(last_received, 
timestamp, true);
                                }
+                               /* other message types are purposefully ignored 
*/
 
+                               MemoryContextReset(ApplyMessageContext);
+                       }
+                       if (prefetch_buf_pos < prefetch_buf_used)
+                       {
+                               memcpy(&len, &prefetch_buf[prefetch_buf_pos], 
4);
+                               buf = &prefetch_buf[prefetch_buf_pos + 4];
+                               prefetch_buf_pos += 4 + len;
+                       }
+                       else if (prefetch_buf_used != 0 && no_more_data)
+                       {
+                               break;
+                       }
+                       else
+                       {
                                len = walrcv_receive(LogRepWorkerWalRcvConn, 
&buf, &fd);
                        }
                }
@@ -3926,6 +4104,10 @@ send_feedback(XLogRecPtr recvpos, bool force, bool 
requestReply)
 static void
 apply_worker_exit(void)
 {
+       /* Don't restart prefetch workers */
+       if (am_parallel_prefetch_worker())
+               return;
+
        if (am_parallel_apply_worker())
        {
                /*
@@ -4729,6 +4911,10 @@ InitializeLogRepWorker(void)
                                (errmsg("logical replication table 
synchronization worker for subscription \"%s\", table \"%s\" has started",
                                                MySubscription->name,
                                                
get_rel_name(MyLogicalRepWorker->relid))));
+       else if (am_parallel_prefetch_worker())
+               ereport(LOG,
+                               (errmsg("logical replication prefetch worker 
for subscription \"%s\" has started",
+                                               MySubscription->name)));
        else
                ereport(LOG,
                                (errmsg("logical replication apply worker for 
subscription \"%s\" has started",
diff --git a/src/backend/utils/misc/guc_tables.c 
b/src/backend/utils/misc/guc_tables.c
index 511dc32d51..b3812d7e0e 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -76,6 +76,7 @@
 #include "replication/slot.h"
 #include "replication/slotsync.h"
 #include "replication/syncrep.h"
+#include "replication/worker_internal.h"
 #include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
@@ -2143,6 +2144,18 @@ struct config_bool ConfigureNamesBool[] =
                NULL, NULL, NULL
        },
 
+       {
+               {"prefetch_replica_identity_only",
+                       PGC_SIGHUP,
+                       REPLICATION_SUBSCRIBERS,
+                       gettext_noop("Whether LR prefetch 
worker should prefetch only replica identity index or all other indexes too."),
+                       NULL,
+               },
+               &prefetch_replica_identity_only,
+               false,
+               NULL, NULL, NULL
+       },
+
        /* End-of-list marker */
        {
                {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
@@ -3376,6 +3389,18 @@ struct config_int ConfigureNamesInt[] =
                NULL, NULL, NULL
        },
 
+       {
+               {"max_parallel_prefetch_workers_per_subscription",
+                       PGC_SIGHUP,
+                       REPLICATION_SUBSCRIBERS,
+                       gettext_noop("Maximum number of parallel prefetch 
workers per subscription."),
+                       NULL,
+               },
+               &max_parallel_prefetch_workers_per_subscription,
+               2, 0, MAX_LR_PREFETCH_WORKERS,
+               NULL, NULL, NULL
+       },
+
        {
                {"max_active_replication_origins",
                        PGC_POSTMASTER,
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 104b059544..3403128977 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -759,6 +759,8 @@ extern bool RelationFindReplTupleByIndex(Relation rel, Oid 
idxoid,
                                                                                
 TupleTableSlot *outslot);
 extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
                                                                         
TupleTableSlot *searchslot, TupleTableSlot *outslot);
+extern bool RelationPrefetchIndex(Relation rel, Oid idxoid, TupleTableSlot 
*searchslot,
+                                                                 
TupleTableSlot *outslot);
 
 extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
                                                                         EState 
*estate, TupleTableSlot *slot);
diff --git a/src/include/replication/logicallauncher.h 
b/src/include/replication/logicallauncher.h
index 82b202f330..19d1a8d466 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -15,6 +15,8 @@
 extern PGDLLIMPORT int max_logical_replication_workers;
 extern PGDLLIMPORT int max_sync_workers_per_subscription;
 extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
+extern PGDLLIMPORT int max_parallel_prefetch_workers_per_subscription;
 
 extern void ApplyLauncherRegister(void);
 extern void ApplyLauncherMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h 
b/src/include/replication/worker_internal.h
index 30b2775952..7f5b4fa51b 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -32,6 +32,7 @@ typedef enum LogicalRepWorkerType
        WORKERTYPE_TABLESYNC,
        WORKERTYPE_APPLY,
        WORKERTYPE_PARALLEL_APPLY,
+       WORKERTYPE_PARALLEL_PREFETCH,
 } LogicalRepWorkerType;
 
 typedef struct LogicalRepWorker
@@ -214,6 +215,12 @@ typedef struct ParallelApplyWorkerInfo
         */
        bool            in_use;
 
+
+       /*
+        * Performing prefetch of pages accessed by LR operations
+        */
+       bool            do_prefetch;
+
        ParallelApplyWorkerShared *shared;
 } ParallelApplyWorkerInfo;
 
@@ -237,6 +244,14 @@ extern PGDLLIMPORT bool in_remote_transaction;
 
 extern PGDLLIMPORT bool InitializingApplyWorker;
 
+#define MAX_LR_PREFETCH_WORKERS 128
+extern PGDLLIMPORT size_t lr_prefetch_hits;
+extern PGDLLIMPORT size_t lr_prefetch_misses;
+extern PGDLLIMPORT size_t lr_prefetch_errors;
+extern PGDLLIMPORT size_t lr_prefetch_inserts;
+
+extern PGDLLIMPORT bool prefetch_replica_identity_only;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
                                                                                
                bool only_running);
@@ -326,10 +341,13 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
                                                   XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->in_use && \
+#define isParallelApplyWorker(worker) ((worker)->in_use &&                     
\
                                                                           
(worker)->type == WORKERTYPE_PARALLEL_APPLY)
+#define isParallelPrefetchWorker(worker) ((worker)->in_use &&                  
\
+                                                                               
  (worker)->type == WORKERTYPE_PARALLEL_PREFETCH)
 #define isTablesyncWorker(worker) ((worker)->in_use && \
                                                                   
(worker)->type == WORKERTYPE_TABLESYNC)
+extern ParallelApplyWorkerInfo* pa_launch_prefetch_worker(void);
 
 static inline bool
 am_tablesync_worker(void)
@@ -351,4 +369,11 @@ am_parallel_apply_worker(void)
        return isParallelApplyWorker(MyLogicalRepWorker);
 }
 
+static inline bool
+am_parallel_prefetch_worker(void)
+{
+       Assert(MyLogicalRepWorker->in_use);
+       return isParallelPrefetchWorker(MyLogicalRepWorker);
+}
+
 #endif                                                 /* WORKER_INTERNAL_H */

Reply via email to