Dear Hou,

Thanks for updating the patch! The following are my comments.

===
01. applyparallelworker.c - SIZE_STATS_MESSAGE

```
/*
 * There are three fields in each message received by the parallel apply
 * worker: start_lsn, end_lsn and send_time. Because we have updated these
 * statistics in the leader apply worker, we can ignore these fields in the
 * parallel apply worker (see function LogicalRepApplyLoop).
 */
#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
```

Following the style of the other comments, it seems that the first sentence
of the comment should describe the data type and its usage, not the detailed
reason.
For example, for ParallelApplyWorkersList you wrote "A list ...". How about
adding a leading sentence like the following:
The message size that can be skipped by the parallel apply worker


~~~
02. applyparallelworker.c - parallel_apply_start_subtrans

```
        if (current_xid != top_xid &&
                !list_member_xid(subxactlist, current_xid))
```

A macro TransactionIdEquals is defined in access/transam.h. Should we use it, 
or is it too trivial?
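
To show what I mean, here is a minimal sketch. The typedef and macro are
stand-ins that mirror access/transam.h so that the snippet compiles on its own,
and is_subxact_of() is a hypothetical helper that exists only for this example:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Stand-ins mirroring access/transam.h, so the sketch is self-contained. */
typedef uint32_t TransactionId;
#define TransactionIdEquals(id1, id2) ((id1) == (id2))

/*
 * Hypothetical helper: returns true when current_xid differs from the
 * top-level transaction ID, i.e. it may be a subtransaction.
 */
static bool
is_subxact_of(TransactionId current_xid, TransactionId top_xid)
{
	return !TransactionIdEquals(current_xid, top_xid);
}
```

The behavior is identical to the plain != comparison; the macro would only
make the intent more explicit.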


~~~
03. applyparallelworker.c - LogicalParallelApplyLoop

```
                        case SHM_MQ_WOULD_BLOCK:
                                {
                                        int                     rc;

                                        if (!in_streamed_transaction)
                                        {
                                                /*
                                                 * If we didn't get any transactions for a while there might be
                                                 * unconsumed invalidation messages in the queue, consume them
                                                 * now.
                                                 */
                                                AcceptInvalidationMessages();
                                                maybe_reread_subscription();
                                        }

                                        MemoryContextReset(ApplyMessageContext);
```

Is MemoryContextReset() needed here? IIUC nothing has used
ApplyMessageContext when we reach this point.


~~~
04. applyparallelworker.c - HandleParallelApplyMessages

```
                else if (res == SHM_MQ_SUCCESS)
                {
                        StringInfoData msg;

                        initStringInfo(&msg);
                        appendBinaryStringInfo(&msg, data, nbytes);
                        HandleParallelApplyMessage(winfo, &msg);
                        pfree(msg.data);
                }
```

In LogicalParallelApplyLoop(), appendBinaryStringInfo() is not used; the
StringInfoData is initialized directly. Why is there a difference?
appendBinaryStringInfo() does repalloc() and memcpy(), so it may be
inefficient.
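
As a rough sketch of the copy-free alternative: the struct below is only a
stand-in for StringInfoData from lib/stringinfo.h, wrap_buffer_readonly() is a
hypothetical helper, and using maxlen = -1 to mark a borrowed buffer is an
assumption of this sketch. It is valid only if the callee never appends to or
frees the message:

```c
#include <assert.h>
#include <stddef.h>

/* Minimal stand-in for PostgreSQL's StringInfoData (lib/stringinfo.h). */
typedef struct StringInfoData
{
	char	   *data;
	int			len;
	int			maxlen;
	int			cursor;
} StringInfoData;

/*
 * Hypothetical helper: point msg at an existing buffer without copying.
 * The buffer stays owned by the caller (e.g. the shared-memory queue), so
 * it must not be pfree'd or enlarged through msg.
 */
static void
wrap_buffer_readonly(StringInfoData *msg, void *data, size_t nbytes)
{
	assert(msg != NULL && data != NULL);

	msg->data = (char *) data;
	msg->len = (int) nbytes;
	msg->maxlen = -1;			/* assumption: marks the buffer as borrowed */
	msg->cursor = 0;
}
```

With this approach the pfree(msg.data) call would have to be dropped as well.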


~~~
05. applyparallelworker.c - parallel_apply_send_data

```
        if (result != SHM_MQ_SUCCESS)
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 errmsg("could not send data to shared-memory queue")));

```

I checked the shm_mq_result enumeration, and it seems that shm_mq_send() with
nowait = false fails only when the process on the other end has exited.
How about adding a hint or detail message like "lost connection to parallel
apply worker"?


===
06. worker.c - nchanges

```
/*
 * The number of changes sent to parallel apply workers during one streaming
 * block.
 */
static uint32 nchanges = 0;
```

I found that the name "nchanges" is already used in apply_spooled_messages().
It works because the local variable takes precedence when a local and a global
variable share the same name, but I think it may be confusing.
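
A small stand-alone illustration of the shadowing (count_locally() and
global_nchanges() are hypothetical names, not functions from the patch):

```c
#include <assert.h>
#include <stdint.h>

/* File-level counter, analogous to the new static variable in worker.c. */
static uint32_t nchanges = 0;

/*
 * The local variable shadows the file-level one, so the increments below
 * never touch the static counter.
 */
static uint32_t
count_locally(void)
{
	uint32_t	nchanges = 0;	/* shadows the file-level nchanges */

	nchanges += 3;
	return nchanges;
}

/* Reads the file-level counter, which count_locally() leaves untouched. */
static uint32_t
global_nchanges(void)
{
	return nchanges;
}
```

GCC reports this pattern when -Wshadow is enabled, so renaming one of the two
variables would also avoid such warnings.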


~~~
07. worker.c - apply_handle_commit_internal

I think we can add an assertion like Assert(replorigin_session_origin_lsn !=
InvalidXLogRecPtr && replorigin_session_origin != InvalidRepOriginId)
to catch a missing replorigin_session_setup(). Previously it was set at the
entry point and never reset.


~~~
08. worker.c - apply_handle_prepare_internal

Same as above.


~~~
09. worker.c - maybe_reread_subscription

```
        /*
         * Exit if any parameter that affects the remote connection was changed.
         * The launcher will start a new worker.
         */
        if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
                strcmp(newsub->name, MySubscription->name) != 0 ||
                strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
                newsub->binary != MySubscription->binary ||
                newsub->stream != MySubscription->stream ||
                strcmp(newsub->origin, MySubscription->origin) != 0 ||
                newsub->owner != MySubscription->owner ||
                !equal(newsub->publications, MySubscription->publications))
        {
                ereport(LOG,
                                (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
                                                MySubscription->name)));

                proc_exit(0);
        }
```

When a parallel apply worker has been launched and a subscription option is
then modified, the same message appears twice.
But if the "streaming" option is changed from "parallel" to "on", one of them
will not restart.
Should we modify the message?


===
10. general

IIUC parallel apply workers cannot detect a deadlock automatically, right?
I thought we might be able to use a heartbeat protocol between the leader
worker and the parallel workers.
 
You have already implemented a mechanism to send and receive messages between
workers.
My idea is that each parallel apply worker records the timestamp at which it
last received a message from the leader, and if a certain time (30s?) has
passed it sends a heartbeat message like 'H'.
The leader consumes 'H' and sends a reply like LOGICAL_REP_MSG_KEEPALIVE in
HandleParallelApplyMessage().
If the parallel apply worker does not receive any message for more than one
minute, it assumes that a deadlock has occurred, sets the retry flag, and
exits.

The above assumes that the leader cannot reply to the message while waiting
for the lock.
Moreover, it may have notable overhead, and we would need a new logical
replication message type.

What do you think? Have you already considered it?

Best Regards,
Hayato Kuroda
FUJITSU LIMITED
