On Mon, May 30, 2022 at 2:22 PM wangw.f...@fujitsu.com <wangw.f...@fujitsu.com> wrote:
>
> Attach the new patches(only changed 0001 and 0002)
>

Few comments/suggestions for 0001 and 0003
=====================================
0001
--------
1.
+ else
+     snprintf(bgw.bgw_name, BGW_MAXLEN,
+              "logical replication apply worker for subscription %u", subid);

Can we slightly change the message to: "logical replication background
apply worker for subscription %u"?

2. Can we think of separating the new logic for applying the xact by
bgworker into a new file like applybgworker or applyparallel? We have
previously done the same in the case of vacuum (see vacuumparallel.c).

3.
+ /*
+  * XXX The publisher side doesn't always send relation update messages
+  * after the streaming transaction, so update the relation in main
+  * apply worker here.
+  */
+ if (action == LOGICAL_REP_MSG_RELATION)
+ {
+     LogicalRepRelation *rel = logicalrep_read_rel(s);
+     logicalrep_relmap_update(rel);
+ }

I think the publisher side won't send the relation update message
after a streaming transaction only if it has already been sent for a
non-streaming transaction, in which case we don't need to update the
local cache here. This is as per my understanding of
maybe_send_schema(), do let me know if I am missing something? If my
understanding is correct then we don't need this change.

4.
+ * For the main apply worker, if in streaming mode (receiving a block of
+ * streamed transaction), we send the data to the apply background worker.
  *
- * If in streaming mode (receiving a block of streamed transaction), we
- * simply redirect it to a file for the proper toplevel transaction.

This comment is slightly confusing. Can we change it to something
like: "In streaming case (receiving a block of streamed transaction),
for SUBSTREAM_ON mode, we simply redirect it to a file for the proper
toplevel transaction, and for SUBSTREAM_APPLY mode, we send the
changes to background apply worker."?

5.
+apply_handle_stream_abort(StringInfo s)
 {
...
...
+ /*
+  * If the two XIDs are the same, it's in fact abort of toplevel xact,
+  * so just free the subxactlist.
+  */
+ if (subxid == xid)
+ {
+     set_apply_error_context_xact(subxid, InvalidXLogRecPtr);

- fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
-                         false);
+     AbortCurrentTransaction();

- buffer = palloc(BLCKSZ);
+     EndTransactionBlock(false);
+     CommitTransactionCommand();
+
+     in_remote_transaction = false;
...
...
 }

Here, can we update the replication origin as we are doing in
apply_handle_rollback_prepared? Currently, we don't do it because we
are just cleaning up temporary files for which we don't even have a
transaction. Also, we don't have the required infrastructure to
advance origins for aborts as we have for abort prepared. See commits
[1eb6d6527a][8a812e5106]. If we think it is a good idea then I think
we need to send abort_lsn and abort_time from the publisher, and we
need to be careful to make it work with lower subscriber versions that
don't have the facility to process these additional values.

0003
--------
6.
+ /*
+  * If any unique index exist, check that they are same as remoterel.
+  */
+ if (!rel->sameunique)
+     ereport(ERROR,
+             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+              errmsg("cannot replicate relation with different unique index"),
+              errhint("Please change the streaming option to 'on' instead of 'apply'.")));

I think we can do better here. Instead of simply erroring out and
asking the user to change the streaming mode, we can remember this in
the system catalog, probably in pg_subscription, and then on restart we
can change the streaming mode to 'on', perform the transaction, and
again change the streaming mode to 'apply'. I am not sure whether we
want to do it in the first version or not, so if you agree with this,
developing it as a separate patch would be a good idea.

Also, please update comments here as to why we don't handle such cases.

-- 
With Regards,
Amit Kapila.
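
For reference, the routing described by the comment wording suggested in point 4 can be sketched as below. This is only an illustration under stated assumptions, not code from the patch: the enum values SUBSTREAM_ON and SUBSTREAM_APPLY are taken from the review text, while SUBSTREAM_OFF, SubStreamMode, ChangeDest, and route_change() are hypothetical names introduced here, and the real patch may structure this decision quite differently.

```c
#include <stdbool.h>

/*
 * Hypothetical stand-in for the subscription's streaming option.
 * SUBSTREAM_ON and SUBSTREAM_APPLY are named in the review;
 * SUBSTREAM_OFF is assumed here for completeness.
 */
typedef enum SubStreamMode
{
    SUBSTREAM_OFF,
    SUBSTREAM_ON,
    SUBSTREAM_APPLY
} SubStreamMode;

/* Hypothetical destinations for a change received by the main apply worker. */
typedef enum ChangeDest
{
    DEST_APPLY_DIRECTLY,    /* not inside a streamed block */
    DEST_SPOOL_TO_FILE,     /* redirect to a file for the toplevel xact */
    DEST_SEND_TO_BGWORKER   /* hand over to the background apply worker */
} ChangeDest;

/*
 * Decide where a change goes, following the proposed comment:
 * in the streaming case, SUBSTREAM_ON spools to a file and
 * SUBSTREAM_APPLY sends the change to the background apply worker.
 */
static ChangeDest
route_change(SubStreamMode mode, bool in_streamed_block)
{
    if (!in_streamed_block)
        return DEST_APPLY_DIRECTLY;

    switch (mode)
    {
        case SUBSTREAM_ON:
            return DEST_SPOOL_TO_FILE;
        case SUBSTREAM_APPLY:
            return DEST_SEND_TO_BGWORKER;
        case SUBSTREAM_OFF:
            break;
    }

    /* Assumption: with streaming off no streamed block should arrive. */
    return DEST_APPLY_DIRECTLY;
}
```

Keeping the decision in one small function like this is just one way to make the two streaming modes easy to describe in a single comment.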