On Tue, Aug 31, 2021 at 3:20 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Fri, Aug 27, 2021 at 12:04 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
> >
>
> Few comments on v6-0002*
> =========================
> 1.
> -BufFileDeleteFileSet(FileSet *fileset, const char *name)
> +BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
>  {
>   char segment_name[MAXPGPATH];
>   int segment = 0;
> @@ -358,7 +369,7 @@ BufFileDeleteFileSet(FileSet *fileset, const char *name)
>   for (;;)
>   {
>   FileSetSegmentName(segment_name, name, segment);
> - if (!FileSetDelete(fileset, segment_name, true))
> + if (!FileSetDelete(fileset, segment_name, !missing_ok))
>
> I don't think the usage of missing_ok is correct here. If you see
> FileSetDelete->PathNameDeleteTemporaryFile, it already tolerates that
> the file doesn't exist but gives an error only when it is unable to
> link. So, with this missing_ok users (say like worker.c) won't even
> get errors when they are not able to remove files whereas I think the
> need for the patch is to not get an error when the file doesn't exist.
> I think you don't need to change anything in the way we invoke
> FileSetDelete.

Right, fixed.

> 2.
> -static HTAB *xidhash = NULL;
> +static FileSet *stream_fileset = NULL;
>
> Can we keep this in LogicalRepWorker and initialize it accordingly?

Done

> 3.
> + /* Open the subxact file, if it does not exist, create it. */
> + fd = BufFileOpenFileSet(stream_fileset, path, O_RDWR, true);
> + if (fd == NULL)
> + fd = BufFileCreateFileSet(stream_fileset, path);
>
> I think retaining the existing comment: "Create the subxact file if it
> not already created, otherwise open the existing file." seems better
> here.

Done

> 4.
>  /*
> - * If there is no subtransaction then nothing to do, but if already have
> - * subxact file then delete that.
> + * If there are no subtransactions, there is nothing to be done, but if
> + * subxacts already exist, delete it.
>  */
>
> How about changing the above comment to something like: "Delete the
> subxacts file, if exists"?

Done

> 5. Can we slightly change the commit message as:
> Optimize fileset usage in apply worker.
>
> Use one fileset for the entire worker lifetime instead of using
> separate filesets for each streaming transaction. Now, the
> changes/subxacts files for every streaming transaction will be created
> under the same fileset and the files will be deleted after the
> transaction is completed.
>
> This patch extends the BufFileOpenFileSet and BufFileDeleteFileSet
> APIs to allow users to specify whether to give an error on missing
> files.

Done

-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
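
For anyone following the thread without the tree at hand, the missing_ok contract agreed on in comment 1 can be sketched outside PostgreSQL with plain POSIX calls. This is only an illustration under stated assumptions: SpoolStatus, spool_open, spool_delete and spool_open_or_create are hypothetical names that do not appear in the patch, and the real functions report failures through ereport()/elog() rather than through status codes. The point it tries to show is that missing_ok suppresses only the "file does not exist" case, while any other failure (for example, being unable to unlink) is still reported.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

/* Hypothetical result codes; the real APIs raise errors via ereport()/elog(). */
typedef enum
{
    SPOOL_OK,                   /* operation succeeded */
    SPOOL_MISSING,              /* file absent and the caller tolerates that */
    SPOOL_ERROR                 /* any other failure, or absent when not tolerated */
} SpoolStatus;

/*
 * Open an existing file.  Returns NULL with SPOOL_MISSING only when the file
 * does not exist and missing_ok is true; every other failure is SPOOL_ERROR.
 */
static FILE *
spool_open(const char *path, const char *mode, bool missing_ok,
           SpoolStatus *status)
{
    FILE       *fp;

    assert(path != NULL && mode != NULL && status != NULL);

    fp = fopen(path, mode);
    if (fp != NULL)
        *status = SPOOL_OK;
    else if (errno == ENOENT && missing_ok)
        *status = SPOOL_MISSING;
    else
        *status = SPOOL_ERROR;
    return fp;
}

/*
 * Delete a file.  missing_ok only excuses ENOENT; a failed unlink for any
 * other reason is always reported as SPOOL_ERROR.
 */
static SpoolStatus
spool_delete(const char *path, bool missing_ok)
{
    if (unlink(path) == 0)
        return SPOOL_OK;
    if (errno == ENOENT && missing_ok)
        return SPOOL_MISSING;
    return SPOOL_ERROR;
}

/* Open-or-create, mirroring the pattern discussed in comment 3. */
static FILE *
spool_open_or_create(const char *path)
{
    SpoolStatus status;
    FILE       *fp = spool_open(path, "r+b", true, &status);

    if (fp == NULL && status == SPOOL_MISSING)
        fp = fopen(path, "w+b");
    return fp;
}
```

In these terms, the cleanup at the end of a streamed transaction would roughly correspond to spool_delete(changes_path, false), since the changes file is expected to exist, followed by spool_delete(subxact_path, true), since the subxact file is optional.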
From fade8ae2779e8aeda0d1fee3902a93eaedb0187f Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Fri, 27 Aug 2021 11:49:12 +0530
Subject: [PATCH v7] Optimize fileset usage in apply worker

Use one fileset for the entire worker lifetime instead of using
separate filesets for each streaming transaction. Now, the
changes/subxacts files for every streaming transaction will be created
under the same fileset and the files will be deleted after the
transaction is completed.

This patch extends the BufFileOpenFileSet and BufFileDeleteFileSet
APIs to allow users to specify whether to give an error on missing
files.
---
 src/backend/replication/logical/launcher.c |   6 +-
 src/backend/replication/logical/worker.c   | 249 +++++------------------------
 src/backend/storage/file/buffile.c         |  21 ++-
 src/backend/utils/sort/logtape.c           |   2 +-
 src/backend/utils/sort/sharedtuplestore.c  |   3 +-
 src/include/replication/worker_internal.h  |  11 +-
 src/include/storage/buffile.h              |   5 +-
 7 files changed, 80 insertions(+), 217 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 8b1772d..3fb4caa 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -379,6 +379,7 @@ retry:
     worker->relid = relid;
     worker->relstate = SUBREL_STATE_UNKNOWN;
     worker->relstate_lsn = InvalidXLogRecPtr;
+    worker->stream_fileset = NULL;
     worker->last_lsn = InvalidXLogRecPtr;
     TIMESTAMP_NOBEGIN(worker->last_send_time);
     TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -648,8 +649,9 @@ logicalrep_worker_onexit(int code, Datum arg)
 
     logicalrep_worker_detach();
 
-    /* Cleanup filesets used for streaming transactions. */
-    logicalrep_worker_cleanupfileset();
+    /* Cleanup fileset used for streaming transactions. */
+    if (MyLogicalRepWorker->stream_fileset != NULL)
+        FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
 
     ApplyLauncherWakeup();
 }
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index bfb7d1a..a222cb3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -236,20 +236,6 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
     .ts = 0,
 };
 
-/*
- * Stream xid hash entry. Whenever we see a new xid we create this entry in the
- * xidhash and along with it create the streaming file and store the fileset handle.
- * The subxact file is created iff there is any subxact info under this xid. This
- * entry is used on the subsequent streams for the xid to get the corresponding
- * fileset handles, so storing them in hash makes the search faster.
- */
-typedef struct StreamXidHash
-{
-    TransactionId xid;          /* xid is the hash key and must be first */
-    FileSet    *stream_fileset; /* file set for stream data */
-    FileSet    *subxact_fileset;    /* file set for subxact info */
-} StreamXidHash;
-
 static MemoryContext ApplyMessageContext = NULL;
 MemoryContext ApplyContext = NULL;
 
@@ -269,12 +255,6 @@ static bool in_streamed_transaction = false;
 
 static TransactionId stream_xid = InvalidTransactionId;
 
-/*
- * Hash table for storing the streaming xid information along with filesets
- * for streaming and subxact files.
- */
-static HTAB *xidhash = NULL;
-
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
@@ -1118,7 +1098,6 @@ static void
 apply_handle_stream_start(StringInfo s)
 {
     bool        first_segment;
-    HASHCTL     hash_ctl;
 
     if (in_streamed_transaction)
         ereport(ERROR,
@@ -1148,17 +1127,20 @@ apply_handle_stream_start(StringInfo s)
     set_apply_error_context_xact(stream_xid, 0);
 
     /*
-     * Initialize the xidhash table if we haven't yet. This will be used for
+     * Initialize the stream_fileset if we haven't yet. This will be used for
      * the entire duration of the apply worker so create it in permanent
      * context.
      */
-    if (xidhash == NULL)
+    if (MyLogicalRepWorker->stream_fileset == NULL)
     {
-        hash_ctl.keysize = sizeof(TransactionId);
-        hash_ctl.entrysize = sizeof(StreamXidHash);
-        hash_ctl.hcxt = ApplyContext;
-        xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
-                              HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+        MemoryContext oldctx;
+
+        oldctx = MemoryContextSwitchTo(ApplyContext);
+
+        MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
+        FileSetInit(MyLogicalRepWorker->stream_fileset);
+
+        MemoryContextSwitchTo(oldctx);
     }
 
     /* open the spool file for this transaction */
@@ -1253,7 +1235,6 @@ apply_handle_stream_abort(StringInfo s)
     BufFile    *fd;
     bool        found = false;
     char        path[MAXPGPATH];
-    StreamXidHash *ent;
 
     set_apply_error_context_xact(subxid, 0);
 
@@ -1285,19 +1266,10 @@ apply_handle_stream_abort(StringInfo s)
             return;
         }
 
-        ent = (StreamXidHash *) hash_search(xidhash,
-                                            (void *) &xid,
-                                            HASH_FIND,
-                                            NULL);
-        if (!ent)
-            ereport(ERROR,
-                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                     errmsg_internal("transaction %u not found in stream XID hash table",
-                                     xid)));
-
         /* open the changes file */
         changes_filename(path, MyLogicalRepWorker->subid, xid);
-        fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
+        fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path,
+                                O_RDWR, false);
 
         /* OK, truncate the file at the right offset */
         BufFileTruncateFileSet(fd, subxact_data.subxacts[subidx].fileno,
@@ -1327,7 +1299,6 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
     int         nchanges;
     char        path[MAXPGPATH];
     char       *buffer = NULL;
-    StreamXidHash *ent;
     MemoryContext oldcxt;
     BufFile    *fd;
 
@@ -1345,17 +1316,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
     changes_filename(path, MyLogicalRepWorker->subid, xid);
     elog(DEBUG1, "replaying changes from file \"%s\"", path);
 
-    ent = (StreamXidHash *) hash_search(xidhash,
-                                        (void *) &xid,
-                                        HASH_FIND,
-                                        NULL);
-    if (!ent)
-        ereport(ERROR,
-                (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                 errmsg_internal("transaction %u not found in stream XID hash table",
-                                 xid)));
-
-    fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDONLY);
+    fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
+                            false);
 
     buffer = palloc(BLCKSZ);
     initStringInfo(&s2);
@@ -2542,30 +2504,6 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 }
 
 /*
- * Cleanup filesets.
- */
-void
-logicalrep_worker_cleanupfileset(void)
-{
-    HASH_SEQ_STATUS status;
-    StreamXidHash *hentry;
-
-    /* Remove all the pending stream and subxact filesets. */
-    if (xidhash)
-    {
-        hash_seq_init(&status, xidhash);
-        while ((hentry = (StreamXidHash *) hash_seq_search(&status)) != NULL)
-        {
-            FileSetDeleteAll(hentry->stream_fileset);
-
-            /* Delete the subxact fileset iff it is created. */
-            if (hentry->subxact_fileset)
-                FileSetDeleteAll(hentry->subxact_fileset);
-        }
-    }
-}
-
-/*
  * Apply main loop.
  */
 static void
@@ -3026,58 +2964,30 @@ subxact_info_write(Oid subid, TransactionId xid)
 {
     char        path[MAXPGPATH];
     Size        len;
-    StreamXidHash *ent;
     BufFile    *fd;
 
     Assert(TransactionIdIsValid(xid));
 
-    /* Find the xid entry in the xidhash */
-    ent = (StreamXidHash *) hash_search(xidhash,
-                                        (void *) &xid,
-                                        HASH_FIND,
-                                        NULL);
-    /* By this time we must have created the transaction entry */
-    Assert(ent);
+    /* Get the subxact filename. */
+    subxact_filename(path, subid, xid);
 
-    /*
-     * If there is no subtransaction then nothing to do, but if already have
-     * subxact file then delete that.
-     */
+    /* Delete the subxacts file, if exists. */
     if (subxact_data.nsubxacts == 0)
     {
-        if (ent->subxact_fileset)
-        {
-            cleanup_subxact_info();
-            FileSetDeleteAll(ent->subxact_fileset);
-            pfree(ent->subxact_fileset);
-            ent->subxact_fileset = NULL;
-        }
+        cleanup_subxact_info();
+        BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
+
         return;
     }
 
-    subxact_filename(path, subid, xid);
-
     /*
      * Create the subxact file if it not already created, otherwise open the
      * existing file.
      */
-    if (ent->subxact_fileset == NULL)
-    {
-        MemoryContext oldctx;
-
-        /*
-         * We need to maintain fileset across multiple stream start/stop
-         * calls. So, need to allocate it in a persistent context.
-         */
-        oldctx = MemoryContextSwitchTo(ApplyContext);
-        ent->subxact_fileset = palloc(sizeof(FileSet));
-        FileSetInit(ent->subxact_fileset);
-        MemoryContextSwitchTo(oldctx);
-
-        fd = BufFileCreateFileSet(ent->subxact_fileset, path);
-    }
-    else
-        fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDWR);
+    fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDWR,
+                            true);
+    if (fd == NULL)
+        fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset, path);
 
     len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
 
@@ -3104,34 +3014,21 @@ subxact_info_read(Oid subid, TransactionId xid)
     char        path[MAXPGPATH];
     Size        len;
     BufFile    *fd;
-    StreamXidHash *ent;
     MemoryContext oldctx;
 
     Assert(!subxact_data.subxacts);
     Assert(subxact_data.nsubxacts == 0);
     Assert(subxact_data.nsubxacts_max == 0);
 
-    /* Find the stream xid entry in the xidhash */
-    ent = (StreamXidHash *) hash_search(xidhash,
-                                        (void *) &xid,
-                                        HASH_FIND,
-                                        NULL);
-    if (!ent)
-        ereport(ERROR,
-                (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                 errmsg_internal("transaction %u not found in stream XID hash table",
-                                 xid)));
-
     /*
-     * If subxact_fileset is not valid that mean we don't have any subxact
-     * info
+     * Open the subxact file for the input streaming xid, just return if the
+     * file does not exist.
      */
-    if (ent->subxact_fileset == NULL)
-        return;
-
     subxact_filename(path, subid, xid);
-
-    fd = BufFileOpenFileSet(ent->subxact_fileset, path, O_RDONLY);
+    fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset, path, O_RDONLY,
+                            true);
+    if (fd == NULL)
+        return;
 
     /* read number of subxact items */
     if (BufFileRead(fd, &subxact_data.nsubxacts,
@@ -3267,42 +3164,20 @@ changes_filename(char *path, Oid subid, TransactionId xid)
  * Cleanup files for a subscription / toplevel transaction.
  *
  * Remove files with serialized changes and subxact info for a particular
- * toplevel transaction. Each subscription has a separate set of files.
+ * toplevel transaction. Each subscription has a separate file.
  */
 static void
 stream_cleanup_files(Oid subid, TransactionId xid)
 {
     char        path[MAXPGPATH];
-    StreamXidHash *ent;
-
-    /* Find the xid entry in the xidhash */
-    ent = (StreamXidHash *) hash_search(xidhash,
-                                        (void *) &xid,
-                                        HASH_FIND,
-                                        NULL);
-    if (!ent)
-        ereport(ERROR,
-                (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                 errmsg_internal("transaction %u not found in stream XID hash table",
-                                 xid)));
 
-    /* Delete the change file and release the stream fileset memory */
+    /* Delete the changes file. */
     changes_filename(path, subid, xid);
-    FileSetDeleteAll(ent->stream_fileset);
-    pfree(ent->stream_fileset);
-    ent->stream_fileset = NULL;
+    BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, false);
 
-    /* Delete the subxact file and release the memory, if it exist */
-    if (ent->subxact_fileset)
-    {
-        subxact_filename(path, subid, xid);
-        FileSetDeleteAll(ent->subxact_fileset);
-        pfree(ent->subxact_fileset);
-        ent->subxact_fileset = NULL;
-    }
-
-    /* Remove the xid entry from the stream xid hash */
-    hash_search(xidhash, (void *) &xid, HASH_REMOVE, NULL);
+    /* Delete the subxact file, if it exists. */
+    subxact_filename(path, subid, xid);
+    BufFileDeleteFileSet(MyLogicalRepWorker->stream_fileset, path, true);
 }
 
 /*
@@ -3312,8 +3187,8 @@ stream_cleanup_files(Oid subid, TransactionId xid)
  *
  * Open a file for streamed changes from a toplevel transaction identified
  * by stream_xid (global variable). If it's the first chunk of streamed
- * changes for this transaction, initialize the fileset and create the buffile,
- * otherwise open the previously created file.
+ * changes for this transaction, create the buffile, otherwise open the
+ * previously created file.
  *
  * This can only be called at the beginning of a "streaming" block, i.e.
  * between stream_start/stream_stop messages from the upstream.
@@ -3322,20 +3197,13 @@ static void
 stream_open_file(Oid subid, TransactionId xid, bool first_segment)
 {
     char        path[MAXPGPATH];
-    bool        found;
     MemoryContext oldcxt;
-    StreamXidHash *ent;
 
     Assert(in_streamed_transaction);
     Assert(OidIsValid(subid));
     Assert(TransactionIdIsValid(xid));
     Assert(stream_fd == NULL);
 
-    /* create or find the xid entry in the xidhash */
-    ent = (StreamXidHash *) hash_search(xidhash,
-                                        (void *) &xid,
-                                        HASH_ENTER,
-                                        &found);
     changes_filename(path, subid, xid);
     elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
 
@@ -3347,49 +3215,20 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
     oldcxt = MemoryContextSwitchTo(LogicalStreamingContext);
 
     /*
-     * If this is the first streamed segment, the file must not exist, so make
-     * sure we're the ones creating it. Otherwise just open the file for
-     * writing, in append mode.
+     * If this is the first streamed segment, create the changes file.
+     * Otherwise, just open the file for writing, in append mode.
      */
     if (first_segment)
-    {
-        MemoryContext savectx;
-        FileSet    *fileset;
-
-        if (found)
-            ereport(ERROR,
-                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                     errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
-
-        /*
-         * We need to maintain fileset across multiple stream start/stop
-         * calls. So, need to allocate it in a persistent context.
-         */
-        savectx = MemoryContextSwitchTo(ApplyContext);
-        fileset = palloc(sizeof(FileSet));
-
-        FileSetInit(fileset);
-        MemoryContextSwitchTo(savectx);
-
-        stream_fd = BufFileCreateFileSet(fileset, path);
-
-        /* Remember the fileset for the next stream of the same transaction */
-        ent->xid = xid;
-        ent->stream_fileset = fileset;
-        ent->subxact_fileset = NULL;
-    }
+        stream_fd = BufFileCreateFileSet(MyLogicalRepWorker->stream_fileset,
+                                         path);
     else
     {
-        if (!found)
-            ereport(ERROR,
-                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                     errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
-
         /*
          * Open the file and seek to the end of the file because we always
          * append the changes file.
          */
-        stream_fd = BufFileOpenFileSet(ent->stream_fileset, path, O_RDWR);
+        stream_fd = BufFileOpenFileSet(MyLogicalRepWorker->stream_fileset,
+                                       path, O_RDWR, false);
         BufFileSeek(stream_fd, 0, 0, SEEK_END);
     }
 
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index 5e5409d..5b5a6e1 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -278,10 +278,12 @@ BufFileCreateFileSet(FileSet *fileset, const char *name)
  * with BufFileCreateFileSet in the same FileSet using the same name.
  * The backend that created the file must have called BufFileClose() or
  * BufFileExportFileSet() to make sure that it is ready to be opened by other
- * backends and render it read-only.
+ * backends and render it read-only. If missing_ok is true, it will return
+ * NULL if the file does not exist otherwise, it will throw an error.
  */
 BufFile *
-BufFileOpenFileSet(FileSet *fileset, const char *name, int mode)
+BufFileOpenFileSet(FileSet *fileset, const char *name, int mode,
+                   bool missing_ok)
 {
     BufFile    *file;
     char        segment_name[MAXPGPATH];
@@ -318,10 +320,18 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode)
      * name.
      */
     if (nfiles == 0)
+    {
+        /* free the memory */
+        pfree(files);
+
+        if (missing_ok)
+            return NULL;
+
         ereport(ERROR,
                 (errcode_for_file_access(),
                  errmsg("could not open temporary file \"%s\" from BufFile \"%s\": %m",
                         segment_name, name)));
+    }
 
     file = makeBufFileCommon(nfiles);
     file->files = files;
@@ -341,10 +351,11 @@ BufFileOpenFileSet(FileSet *fileset, const char *name, int mode)
  * the FileSet to be cleaned up.
  *
  * Only one backend should attempt to delete a given name, and should know
- * that it exists and has been exported or closed.
+ * that it exists and has been exported or closed otherwise missing_ok should
+ * be passed true.
  */
 void
-BufFileDeleteFileSet(FileSet *fileset, const char *name)
+BufFileDeleteFileSet(FileSet *fileset, const char *name, bool missing_ok)
 {
     char        segment_name[MAXPGPATH];
     int         segment = 0;
@@ -366,7 +377,7 @@ BufFileDeleteFileSet(FileSet *fileset, const char *name)
         CHECK_FOR_INTERRUPTS();
     }
 
-    if (!found)
+    if (!found && !missing_ok)
         elog(ERROR, "could not delete unknown BufFile \"%s\"", name);
 }
 
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index f7994d7..debf12e1 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -564,7 +564,7 @@ ltsConcatWorkerTapes(LogicalTapeSet *lts, TapeShare *shared,
         lt = &lts->tapes[i];
 
         pg_itoa(i, filename);
-        file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY);
+        file = BufFileOpenFileSet(&fileset->fs, filename, O_RDONLY, false);
         filesize = BufFileSize(file);
 
         /*
diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c
index 504ef1c..033088f 100644
--- a/src/backend/utils/sort/sharedtuplestore.c
+++ b/src/backend/utils/sort/sharedtuplestore.c
@@ -560,7 +560,8 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
 
                 sts_filename(name, accessor, accessor->read_participant);
                 accessor->read_file =
-                    BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY);
+                    BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
+                                       false);
             }
 
             /* Seek and load the chunk header. */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index a6c9d4e..1a2437a 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -50,6 +50,16 @@ typedef struct LogicalRepWorker
     XLogRecPtr  relstate_lsn;
     slock_t     relmutex;
 
+    /*
+     * The fileset is used by the worker to create the changes and subxact
+     * files for the streaming transaction. Upon the arrival of the first
+     * streaming transaction, the fileset will be initialized, and it will be
+     * deleted when the worker exits. Under this, separate buffiles would be
+     * created for each transaction and would be deleted after the transaction
+     * is completed.
+     */
+    FileSet    *stream_fileset;
+
     /* Stats. */
     XLogRecPtr  last_lsn;
     TimestampTz last_send_time;
@@ -79,7 +89,6 @@ extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
-extern void logicalrep_worker_cleanupfileset(void);
 
 extern int  logicalrep_sync_worker_count(Oid subid);
 
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 143eada..7ae5ea2 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -49,8 +49,9 @@ extern long BufFileAppend(BufFile *target, BufFile *source);
 extern BufFile *BufFileCreateFileSet(FileSet *fileset, const char *name);
 extern void BufFileExportFileSet(BufFile *file);
 extern BufFile *BufFileOpenFileSet(FileSet *fileset, const char *name,
-                                   int mode);
-extern void BufFileDeleteFileSet(FileSet *fileset, const char *name);
+                                   int mode, bool missing_ok);
+extern void BufFileDeleteFileSet(FileSet *fileset, const char *name,
+                                 bool missing_ok);
 extern void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset);
 
 #endif                          /* BUFFILE_H */
-- 
1.8.3.1