Thanks for the feedback!

Robert Haas <robertmh...@gmail.com> wrote:
> However, this is also quite invasive. It changes a lot of code and it
> doesn't do so in a very predictable way. It's not like you went
> through and replaced every call to write() with a call to
> SpecialEncyptionMagicWrite(). Rather, there are new #ifdef
> USE_ENCRYPTION blocks all over the code base. I think it will be
> important to look for ways to refactor this functionality to reduce
> that sort of thing to the absolute minimum possible. Encryption needs
> to be something that the code needs to know about here and there, but
> not everywhere.

The point of #ifdef USE_ENCRYPTION is that we rely on OpenSSL, so the code needs to compile even without OpenSSL (in which case encryption simply cannot be enabled). I'll try to limit this construct to the code blocks that actually call OpenSSL functions.

> Some of the changes are things that could perhaps be done as separate,
> preparatory patches. For instance, pgstat.c gets a heavy refactoring
> to make it do I/O in bigger chunks. There's no reason that you
> couldn't separate some of those changes out, clean them up, and get
> them committed separately. It would be more efficient even as things
> stand. OK, well, there is one reason: we may be getting a
> shared-memory stats collector soon, and if we do, then the need for
> all of this will go away. But the general point is valid: whatever is
> a useful cleanup apart from this patch should be done separately from
> this patch.

We'll leave out the stats-collector-related changes for now, because the shared-memory stats collector patch [1] is under active development and we would have to rebase our patch often. And if the stats files eventually go away, we won't need to encrypt the statistics at all.

> As far as possible, we need to try to do things in the same way in the
> encrypted and non-encrypted cases.
> For example, it's pretty hard to
> feel good about code like this:
>
> +		sz_hdr = sizeof(ReorderBufferDiskChange);
> +		if (data_encrypted)
> +#ifdef USE_ENCRYPTION
> +			sz_hdr = TYPEALIGN(ENCRYPTION_BLOCK, sz_hdr);
> +#else
> +			ENCRYPTION_NOT_SUPPORTED_MSG;
> +#endif	/* USE_ENCRYPTION */
>
> You won't be able to have much confidence that this code works both
> with and without encryption unless you test it both ways, because the
> file format is different in those two cases, and not just by being
> encrypted. That means that anyone who modifies this code in the
> future has two ways of breaking it. They can break the normal case,
> or they can break the encrypted case. If encryption were available as
> a sort of service that everyone could use, then that would probably be
> fine, but like I said above, I think something as invasive as this
> currently is will lead to a lot of complaints.

The problem I was trying to solve here is that a whole encryption block (16 bytes) has to be read and decrypted even if only part of its data is needed. The snippet above tries to ensure that whole blocks are always read, but I agree that this approach is fragile. Since buffile.c has to handle this problem anyway (and it already does in the last version of the patch), I think this module could handle files other than temporary ones as well. The patch below adds BufFileOpenTransient(), BufFileWriteTransient(), etc., which can replace OpenTransientFile(), write(), etc., respectively. Once encryption is implemented, these functions will also hide the handling of encryption blocks from the caller. As an example, in this (preliminary) patch I applied the approach to ReorderBufferSerializeChange(), and the function now seems a bit simpler. (ReorderBufferRestoreChange() would need more work to adopt this approach.)

> The code needs a lot more documentation, not only SGML documentation
> but also code comments and probably a README file someplace.

OK, we'll do that.
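Here is the block-size issue in isolation. With a 16-byte cipher block, any read that covers only part of a block has to be widened to whole blocks before decryption. The helpers below are a hypothetical sketch, not code from the patch; ENC_BLOCK_SIZE, enc_align_up(), enc_align_down() and enc_read_range() are made-up names. enc_align_up() does the same rounding as TYPEALIGN() in the quoted snippet, but it divides instead of masking, so it does not assume a power-of-two block size.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical constant: a 16-byte cipher block, as with AES. */
#define ENC_BLOCK_SIZE 16

/* Round len up to a multiple of ENC_BLOCK_SIZE (the rounding TYPEALIGN does). */
size_t
enc_align_up(size_t len)
{
	return (len + ENC_BLOCK_SIZE - 1) / ENC_BLOCK_SIZE * ENC_BLOCK_SIZE;
}

/* Round offset down to the start of the block that contains it. */
size_t
enc_align_down(size_t offset)
{
	return offset / ENC_BLOCK_SIZE * ENC_BLOCK_SIZE;
}

/*
 * A caller wants bytes [offset, offset + len).  Compute the block-aligned
 * range [*start, *end) that has to be read and decrypted to serve it.
 */
void
enc_read_range(size_t offset, size_t len, size_t *start, size_t *end)
{
	*start = enc_align_down(offset);
	*end = enc_align_up(offset + len);

	/* The widened range always covers the requested one. */
	assert(*start <= offset && offset + len <= *end);
}
```

For instance, a 5-byte read at offset 14 spans two blocks, so enc_read_range(14, 5, &start, &end) yields start = 0 and end = 32. Once encryption is added, the TransientBufFile functions are meant to keep this widening inside buffile.c instead of in callers such as ReorderBufferSerializeChange().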
> The interaction of this capability with certain tricks that PostgreSQL
> plays needs some thought -- and documentation, at least developer
> documentation, maybe user documentation. One area is recovery.
> Writing WAL relies on the fact that if you are in the midst of
> rewriting a block and the power fails, bytes that are the same in both
> the old and new block images will be undisturbed; encryption will make
> that false. How's that going to work?

Yes: if even a single bit differs, decryption turns the whole encryption block (16 bytes) into garbage. So we need to enforce full_page_writes=on when encryption is enabled. The last version of the patch does not do that.

> Hint bits also rely on this principle. I thought there might be some
> interaction between this work and wal_log_hints for this reason, but I see
> nothing of that sort in the patch.

I'm not sure hint bits are still a problem if we first copy the shared buffer to backend-local memory and encrypt it there, which is what the patch does.

> Full page writes seem related too; I don't know how something like this can
> be safe without both full_page_writes and wal_log_hints forced on. I don't
> know whether using encryption should forcibly override those settings or
> whether it should just refuse to start unless they are set properly, but I
> think it probably needs to do one or the other.

Maybe it's more polite to refuse to start, so that the user knows what's going on. I'm not sure PostgreSQL ever changes a configuration variable forcibly.

> Hope this helps in some way. I think it would be good if this effort
> went forward in some way.

Sure, it helps! Thanks.

[1] https://commitfest.postgresql.org/22/1708/

-- 
Antonin Houska
https://www.cybertec-postgresql.com
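A refuse-to-start policy amounts to a small check at startup. The sketch below is minimal and uses hypothetical names: check_encryption_settings() and its boolean parameters are not from the patch. Following Robert's suggestion, it requires both full_page_writes and wal_log_hints. Inside the server the message would presumably be reported via ereport() rather than returned as a string.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical startup check (not from the patch): with encryption enabled,
 * refuse to start unless full_page_writes and wal_log_hints are both on,
 * instead of silently overriding the user's settings.  Returns NULL when the
 * configuration is acceptable, otherwise a message for the user.
 */
const char *
check_encryption_settings(bool encryption_enabled, bool full_page_writes,
						  bool wal_log_hints)
{
	if (!encryption_enabled)
		return NULL;			/* nothing to enforce */
	if (!full_page_writes)
		return "encryption requires full_page_writes = on";
	if (!wal_log_hints)
		return "encryption requires wal_log_hints = on";
	return NULL;
}
```

Returning a message instead of flipping the settings keeps the decision visible to the user, which is the reason given above for preferring refusal.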
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 2fbfadd9f0..9fc00bde9e 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3874,12 +3874,6 @@ pgstat_get_wait_io(WaitEventIO w)
 		case WAIT_EVENT_RELATION_MAP_WRITE:
 			event_name = "RelationMapWrite";
 			break;
-		case WAIT_EVENT_REORDER_BUFFER_READ:
-			event_name = "ReorderBufferRead";
-			break;
-		case WAIT_EVENT_REORDER_BUFFER_WRITE:
-			event_name = "ReorderBufferWrite";
-			break;
 		case WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ:
 			event_name = "ReorderLogicalMappingRead";
 			break;
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 2cfdf1c9ac..f958d27761 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -71,6 +71,7 @@
 #include "replication/slot.h"
 #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
 #include "storage/bufmgr.h"
+#include "storage/buffile.h"
 #include "storage/fd.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
@@ -109,7 +110,7 @@ typedef struct ReorderBufferIterTXNEntry
 	XLogRecPtr	lsn;
 	ReorderBufferChange *change;
 	ReorderBufferTXN *txn;
-	int			fd;
+	TransientBufFile *file;
 	XLogSegNo	segno;
 } ReorderBufferIterTXNEntry;
 
@@ -192,9 +193,11 @@ static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTX
 static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
-							 int fd, ReorderBufferChange *change);
+							 TransientBufFile *file, ReorderBufferChange *change);
+static void ReorderBufferWriteData(TransientBufFile *file, void *ptr, size_t size,
+					   ReorderBufferTXN *txn);
 static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
-							int *fd, XLogSegNo *segno);
+							TransientBufFile **file, XLogSegNo *segno);
 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 						   char *change);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
@@ -468,8 +471,8 @@ ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
 Oid *
 ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids)
 {
-	Oid		   *relids;
-	Size		alloc_len;
+	Oid        *relids;
+	Size        alloc_len;
 
 	alloc_len = sizeof(Oid) * nrelids;
 
@@ -988,7 +991,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 	for (off = 0; off < state->nr_txns; off++)
 	{
-		state->entries[off].fd = -1;
+		state->entries[off].file = NULL;
 		state->entries[off].segno = 0;
 	}
 
@@ -1013,7 +1016,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		{
 			/* serialize remaining changes */
 			ReorderBufferSerializeTXN(rb, txn);
-			ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
+			ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
 										&state->entries[off].segno);
 		}
 
@@ -1043,7 +1046,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 					/* serialize remaining changes */
 					ReorderBufferSerializeTXN(rb, cur_txn);
 					ReorderBufferRestoreChanges(rb, cur_txn,
-												&state->entries[off].fd,
+												&state->entries[off].file,
 												&state->entries[off].segno);
 				}
 				cur_change = dlist_head_element(ReorderBufferChange, node,
@@ -1124,7 +1127,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 		dlist_delete(&change->node);
 		dlist_push_tail(&state->old_change, &change->node);
 
-		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
+		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
 										&state->entries[off].segno))
 		{
 			/* successfully restored changes from disk */
@@ -1163,8 +1166,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
 
 	for (off = 0; off < state->nr_txns; off++)
 	{
-		if (state->entries[off].fd != -1)
-			CloseTransientFile(state->entries[off].fd);
+		if (state->entries[off].file)
+			BufFileCloseTransient(state->entries[off].file);
 	}
 
 	/* free memory we might have "leaked" in the last *Next call */
@@ -1327,8 +1330,8 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		else
 		{
 			/*
-			 * Maybe we already saw this tuple before in this transaction,
-			 * but if so it must have the same cmin.
+			 * Maybe we already saw this tuple before in this transaction, but
+			 * if so it must have the same cmin.
 			 */
 			Assert(ent->cmin == change->data.tuplecid.cmin);
 
@@ -2254,7 +2257,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
 	dlist_iter	subtxn_i;
 	dlist_mutable_iter change_i;
-	int			fd = -1;
+	TransientBufFile *file = NULL;
 	XLogSegNo	curOpenSegNo = 0;
 	Size		spilled = 0;
 
@@ -2281,13 +2284,13 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		 * store in segment in which it belongs by start lsn, don't split over
 		 * multiple segments tho
 		 */
-		if (fd == -1 ||
+		if (file == NULL ||
 			!XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
 		{
 			char		path[MAXPGPATH];
 
-			if (fd != -1)
-				CloseTransientFile(fd);
+			if (file)
+				BufFileCloseTransient(file);
 
 			XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
 
@@ -2299,16 +2302,11 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 										curOpenSegNo);
 
 			/* open segment, create it if necessary */
-			fd = OpenTransientFile(path,
-								   O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
-
-			if (fd < 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not open file \"%s\": %m", path)));
+			file = BufFileOpenTransient(path,
+										O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
 		}
 
-		ReorderBufferSerializeChange(rb, txn, fd, change);
+		ReorderBufferSerializeChange(rb, txn, file, change);
 		dlist_delete(&change->node);
 		ReorderBufferReturnChange(rb, change);
 
@@ -2320,8 +2318,8 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	txn->nentries_mem = 0;
 	txn->serialized = true;
 
-	if (fd != -1)
-		CloseTransientFile(fd);
+	if (file)
+		BufFileCloseTransient(file);
 }
 
 /*
@@ -2329,15 +2327,13 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
  */
 static void
 ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
-							 int fd, ReorderBufferChange *change)
+							 TransientBufFile *file, ReorderBufferChange *change)
 {
-	ReorderBufferDiskChange *ondisk;
+	ReorderBufferDiskChange hdr;
 	Size		sz = sizeof(ReorderBufferDiskChange);
 
-	ReorderBufferSerializeReserve(rb, sz);
-
-	ondisk = (ReorderBufferDiskChange *) rb->outbuf;
-	memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
+	memcpy((char *) &hdr + offsetof(ReorderBufferDiskChange, change),
+		   change, sizeof(ReorderBufferChange));
 
 	switch (change->action)
 	{
@@ -2347,7 +2343,6 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_DELETE:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
 			{
-				char	   *data;
 				ReorderBufferTupleBuf *oldtup,
 						   *newtup;
 				Size		oldlen = 0;
@@ -2370,66 +2365,55 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					sz += newlen;
 				}
 
-				/* make sure we have enough space */
-				ReorderBufferSerializeReserve(rb, sz);
-
-				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
-				/* might have been reallocated above */
-				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+				hdr.size = sz;
+				ReorderBufferWriteData(file, &hdr, sizeof(ReorderBufferDiskChange),
+									   txn);
 
 				if (oldlen)
 				{
-					memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
-					data += sizeof(HeapTupleData);
-
-					memcpy(data, oldtup->tuple.t_data, oldlen);
-					data += oldlen;
+					ReorderBufferWriteData(file, &oldtup->tuple,
+										   sizeof(HeapTupleData), txn);
+					ReorderBufferWriteData(file, oldtup->tuple.t_data, oldlen,
+										   txn);
 				}
 
 				if (newlen)
 				{
-					memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
-					data += sizeof(HeapTupleData);
-
-					memcpy(data, newtup->tuple.t_data, newlen);
-					data += newlen;
+					ReorderBufferWriteData(file, &newtup->tuple,
+										   sizeof(HeapTupleData), txn);
+					ReorderBufferWriteData(file, newtup->tuple.t_data, newlen,
+										   txn);
 				}
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_MESSAGE:
 			{
-				char	   *data;
 				Size		prefix_size = strlen(change->data.msg.prefix) + 1;
 
 				sz += prefix_size + change->data.msg.message_size +
 					sizeof(Size) + sizeof(Size);
-				ReorderBufferSerializeReserve(rb, sz);
-
-				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
-				/* might have been reallocated above */
-				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+				hdr.size = sz;
+				ReorderBufferWriteData(file, &hdr,
+									   sizeof(ReorderBufferDiskChange),
+									   txn);
 
 				/* write the prefix including the size */
-				memcpy(data, &prefix_size, sizeof(Size));
-				data += sizeof(Size);
-				memcpy(data, change->data.msg.prefix,
-					   prefix_size);
-				data += prefix_size;
+				ReorderBufferWriteData(file, &prefix_size, sizeof(Size), txn);
+				ReorderBufferWriteData(file, change->data.msg.prefix,
+									   prefix_size, txn);
 
 				/* write the message including the size */
-				memcpy(data, &change->data.msg.message_size, sizeof(Size));
-				data += sizeof(Size);
-				memcpy(data, change->data.msg.message,
-					   change->data.msg.message_size);
-				data += change->data.msg.message_size;
+				ReorderBufferWriteData(file, &change->data.msg.message_size,
+									   sizeof(Size), txn);
+				ReorderBufferWriteData(file, change->data.msg.message,
+									   change->data.msg.message_size, txn);
 
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 			{
 				Snapshot	snap;
-				char	   *data;
 
 				snap = change->data.snapshot;
 
@@ -2438,49 +2422,37 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					sizeof(TransactionId) * snap->subxcnt
 					;
 
-				/* make sure we have enough space */
-				ReorderBufferSerializeReserve(rb, sz);
-				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
-				/* might have been reallocated above */
-				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+				hdr.size = sz;
+				ReorderBufferWriteData(file, &hdr,
+									   sizeof(ReorderBufferDiskChange), txn);
 
-				memcpy(data, snap, sizeof(SnapshotData));
-				data += sizeof(SnapshotData);
+				ReorderBufferWriteData(file, snap, sizeof(SnapshotData), txn);
 
 				if (snap->xcnt)
-				{
-					memcpy(data, snap->xip,
-						   sizeof(TransactionId) * snap->xcnt);
-					data += sizeof(TransactionId) * snap->xcnt;
-				}
+					ReorderBufferWriteData(file, snap->xip,
+										   sizeof(TransactionId) * snap->xcnt,
+										   txn);
 
 				if (snap->subxcnt)
-				{
-					memcpy(data, snap->subxip,
-						   sizeof(TransactionId) * snap->subxcnt);
-					data += sizeof(TransactionId) * snap->subxcnt;
-				}
+					ReorderBufferWriteData(file, snap->subxip,
+										   sizeof(TransactionId) * snap->subxcnt,
+										   txn);
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_TRUNCATE:
 			{
-				Size		size;
-				char	   *data;
+				Size size;
 
 				/* account for the OIDs of truncated relations */
 				size = sizeof(Oid) * change->data.truncate.nrelids;
 				sz += size;
 
-				/* make sure we have enough space */
-				ReorderBufferSerializeReserve(rb, sz);
-
-				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
-				/* might have been reallocated above */
-				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
-
-				memcpy(data, change->data.truncate.relids, size);
-				data += size;
+				hdr.size = sz;
+				ReorderBufferWriteData(file, &hdr, sizeof(ReorderBufferDiskChange),
+									   txn);
+				ReorderBufferWriteData(file, change->data.truncate.relids, size,
+									   txn);
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -2489,27 +2461,21 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			/* ReorderBufferChange contains everything important */
 			break;
 	}
+}
 
-	ondisk->size = sz;
-
-	errno = 0;
-	pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
-	if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
-	{
-		int			save_errno = errno;
-
-		CloseTransientFile(fd);
-
-		/* if write didn't set errno, assume problem is no disk space */
-		errno = save_errno ? save_errno : ENOSPC;
+/*
+ * Wrapper for BufFileWriteTransient() that raises ERROR if the whole chunk
+ * was not written. XXX Should this be a macro?
+ */
+static void
+ReorderBufferWriteData(TransientBufFile *file, void *ptr, size_t size,
+					   ReorderBufferTXN *txn)
+{
+	if (BufFileWriteTransient(file, ptr, size) != size)
 		ereport(ERROR,
 				(errcode_for_file_access(),
 				 errmsg("could not write to data file for XID %u: %m",
 						txn->xid)));
-	}
-	pgstat_report_wait_end();
-
-	Assert(ondisk->change.action == change->action);
 }
 
 /*
@@ -2517,7 +2483,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
  */
 static Size
 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
-							int *fd, XLogSegNo *segno)
+							TransientBufFile **file, XLogSegNo *segno)
 {
 	Size		restored = 0;
 	XLogSegNo	last_segno;
@@ -2545,7 +2511,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		int			readBytes;
 		ReorderBufferDiskChange *ondisk;
 
-		if (*fd == -1)
+		if (*file == NULL)
 		{
 			char		path[MAXPGPATH];
 
@@ -2562,18 +2528,13 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
 										*segno);
 
-			*fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
-			if (*fd < 0 && errno == ENOENT)
+			*file = BufFileOpenTransient(path, O_RDONLY | PG_BINARY);
+			if (*file == NULL)
 			{
-				*fd = -1;
+				Assert(errno == ENOENT);
 				(*segno)++;
 				continue;
 			}
-			else if (*fd < 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not open file \"%s\": %m",
-								path)));
 		}
 
 		/*
@@ -2582,22 +2543,16 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		 * end of this file.
 		 */
 		ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
-		pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
-		readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
-		pgstat_report_wait_end();
+		readBytes = BufFileReadTransient(*file, rb->outbuf, sizeof(ReorderBufferDiskChange));
 
 		/* eof */
 		if (readBytes == 0)
 		{
-			CloseTransientFile(*fd);
-			*fd = -1;
+			BufFileCloseTransient(*file);
+			*file = NULL;
 			(*segno)++;
 			continue;
 		}
-		else if (readBytes < 0)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from reorderbuffer spill file: %m")));
 		else if (readBytes != sizeof(ReorderBufferDiskChange))
 			ereport(ERROR,
 					(errcode_for_file_access(),
@@ -2611,16 +2566,10 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 									sizeof(ReorderBufferDiskChange) + ondisk->size);
 		ondisk = (ReorderBufferDiskChange *) rb->outbuf;
 
-		pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
-		readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
-						 ondisk->size - sizeof(ReorderBufferDiskChange));
-		pgstat_report_wait_end();
+		readBytes = BufFileReadTransient(*file, rb->outbuf + sizeof(ReorderBufferDiskChange),
+										 ondisk->size - sizeof(ReorderBufferDiskChange));
 
-		if (readBytes < 0)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from reorderbuffer spill file: %m")));
-		else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
+		if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
 			ereport(ERROR,
 					(errcode_for_file_access(),
 					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
@@ -2767,7 +2716,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			/* the base struct contains all the data, easy peasy */
 		case REORDER_BUFFER_CHANGE_TRUNCATE:
 			{
-				Oid		   *relids;
+				Oid *relids;
 
 				relids = ReorderBufferGetRelids(rb,
 												change->data.truncate.nrelids);
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index c2c445dbf4..6c426c0509 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -41,6 +41,8 @@
 
 #include "postgres.h"
 
+#include <unistd.h>
+
 #include "executor/instrument.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -93,6 +95,27 @@ struct BufFile
 	PGAlignedBlock buffer;
 };
 
+/*
+ * Buffered variant of a transient file. Unlike BufFile this is simpler in
+ * several ways: 1) it's not split into segments, 2) there's no need of seek,
+ * 3) there's no need to combine read and write access.
+ */
+struct TransientBufFile
+{
+	/* The underlying file. */
+	char	   *path;
+	int			fileFlags;
+	int			fd;
+
+	off_t		offset;			/* next read/write position in file */
+	int			pos;			/* next read/write position in buffer */
+	int			nbytes;			/* total # of valid bytes in buffer */
+
+	bool		dirty;			/* unsaved data in the buffer? */
+
+	PGAlignedBlock buffer;
+};
+
 static BufFile *makeBufFileCommon(int nfiles);
 static BufFile *makeBufFile(File firstfile);
 static void extendBufFile(BufFile *file);
@@ -101,6 +124,9 @@ static void BufFileDumpBuffer(BufFile *file);
 static int	BufFileFlush(BufFile *file);
 static File MakeNewSharedSegment(BufFile *file, int segment);
 
+static void BufFileLoadBufferTransient(TransientBufFile *file);
+static void BufFileDumpBufferTransient(TransientBufFile *file);
+
 /*
  * Create BufFile and perform the common initialization.
  */
@@ -831,3 +857,267 @@ BufFileAppend(BufFile *target, BufFile *source)
 
 	return startBlock;
 }
+
+/*
+ * Open TransientBufFile at given path or create one if it does not
+ * exist. User will be allowed either to write to the file or to read from it,
+ * according to fileFlags, but not both.
+ */
+TransientBufFile *
+BufFileOpenTransient(const char *path, int fileFlags)
+{
+	TransientBufFile *file;
+	int			fd;
+
+	/* Either read or write mode, but not both.
+	 */
+	Assert((fileFlags & O_RDWR) == 0);
+
+	fd = OpenTransientFile(path, fileFlags);
+	if (fd < 0)
+	{
+		/*
+		 * If caller wants to read from file and the file is not there, he
+		 * should be able to handle the condition on his own.
+		 *
+		 * XXX Shouldn't we always let caller evaluate errno?
+		 */
+		if (errno == ENOENT && (fileFlags & O_RDONLY))
+			return NULL;
+
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m", path)));
+	}
+
+	file = (TransientBufFile *) palloc(sizeof(TransientBufFile));
+
+	file->path = pstrdup(path);
+	file->fileFlags = fileFlags;
+	file->fd = fd;
+
+	file->dirty = false;
+
+	file->pos = 0;
+	file->nbytes = 0;
+
+	if (file->fileFlags & O_APPEND)
+	{
+		/* Position the buffer at the end of the file. */
+		errno = 0;
+		file->offset = lseek(file->fd, 0, SEEK_END);
+	}
+	else
+	{
+		/* Load the initial part of the file. */
+		file->offset = 0L;
+		BufFileLoadBufferTransient(file);
+	}
+
+	if (errno > 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not initialize TransientBufFile for file \"%s\": %m",
+						path)));
+
+	return file;
+}
+
+/*
+ * Close a TransientBufFile.
+ */
+void
+BufFileCloseTransient(TransientBufFile *file)
+{
+	/* Flush any unwritten data. */
+	if (file->fileFlags & O_WRONLY && file->dirty && file->nbytes > 0)
+	{
+		BufFileDumpBufferTransient(file);
+
+		/*
+		 * Caller of BufFileWriteTransient() recognizes the failure to flush
+		 * buffer by the returned value, however this function has no return
+		 * code.
+		 */
+		if (file->dirty)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not flush file \"%s\": %m", file->path)));
+	}
+
+	/*
+	 * XXX Should we raise ERROR if non-zero code is returned here, or is it
+	 * enough if the failure cause WARNING during transaction commit?
+	 */
+	CloseTransientFile(file->fd);
+
+	pfree(file->path);
+	pfree(file);
+}
+
+/*
+ * Like BufFileWrite() except it receives pointer to TransientBufFile.
+ *
+ * TODO Reuse the code of BufFileWrite() in a better way than copy & paste.
+ */
+size_t
+BufFileWriteTransient(TransientBufFile *file, void *ptr, size_t size)
+{
+	size_t		nwritten = 0;
+	size_t		nthistime;
+
+	Assert((file->fileFlags & O_WRONLY));
+
+	while (size > 0)
+	{
+		if (file->pos >= BLCKSZ)
+		{
+			Assert(file->dirty);
+
+			/* Buffer full, dump it out */
+			BufFileDumpBufferTransient(file);
+			if (file->dirty)
+				break;			/* I/O error */
+		}
+
+		nthistime = BLCKSZ - file->pos;
+		if (nthistime > size)
+			nthistime = size;
+		Assert(nthistime > 0);
+
+		memcpy(file->buffer.data + file->pos, ptr, nthistime);
+
+		file->dirty = true;
+		file->pos += nthistime;
+		if (file->nbytes < file->pos)
+			file->nbytes = file->pos;
+		ptr = (void *) ((char *) ptr + nthistime);
+		size -= nthistime;
+		nwritten += nthistime;
+	}
+
+	return nwritten;
+}
+
+/*
+ * Like BufFileRead() except it receives pointer to TransientBufFile.
+ *
+ * TODO Reuse the code of BufFileRead() in a better way than copy & paste.
+ */
+size_t
+BufFileReadTransient(TransientBufFile *file, void *ptr, size_t size)
+{
+	size_t		nread = 0;
+	size_t		nthistime;
+
+	Assert((file->fileFlags & O_WRONLY) == 0);
+
+	while (size > 0)
+	{
+		if (file->pos >= file->nbytes)
+		{
+			/* Try to load more data into buffer. */
+			file->offset += file->pos;
+			file->pos = 0;
+			file->nbytes = 0;
+			BufFileLoadBufferTransient(file);
+			if (file->nbytes <= 0)
+				break;			/* no more data available */
+		}
+
+		nthistime = file->nbytes - file->pos;
+		if (nthistime > size)
+			nthistime = size;
+		Assert(nthistime > 0);
+
+		memcpy(ptr, file->buffer.data + file->pos, nthistime);
+
+		file->pos += nthistime;
+		ptr = (void *) ((char *) ptr + nthistime);
+		size -= nthistime;
+		nread += nthistime;
+	}
+
+	return nread;
+}
+
+/*
+ * Load some data into buffer, if possible, starting from file->offset. At
+ * call, must have dirty = false, pos and nbytes = 0. On exit, nbytes is
+ * number of bytes loaded.
+ */
+static void
+BufFileLoadBufferTransient(TransientBufFile *file)
+{
+	Assert(!file->dirty);
+	Assert(file->pos == 0 && file->nbytes == 0);
+
+retry:
+
+	/*
+	 * Read whatever we can get, up to a full bufferload.
+	 */
+	errno = 0;
+	pgstat_report_wait_start(WAIT_EVENT_BUFFILE_READ);
+	file->nbytes = pg_pread(file->fd, file->buffer.data,
+							sizeof(file->buffer), file->offset);
+	pgstat_report_wait_end();
+
+	if (file->nbytes < 0)
+	{
+		/* TODO The W32 specific code, see FileWrite. */
+
+		/* OK to retry if interrupted */
+		if (errno == EINTR)
+			goto retry;
+
+		return;					/* failed to write */
+	}
+
+	if (file->nbytes < 0)
+		file->nbytes = 0;
+	/* we choose not to advance offset here */
+}
+
+/*
+ * Write buffer contents to disk.
+ */
+static void
+BufFileDumpBufferTransient(TransientBufFile *file)
+{
+	int			nwritten;
+
+	/* This function should only be needed during write access ... */
+	Assert(file->fileFlags & O_WRONLY);
+
+	/* ... and if there's some work to do. */
+	Assert(file->dirty);
+	Assert(file->nbytes > 0);
+
+retry:
+	errno = 0;
+	pgstat_report_wait_start(WAIT_EVENT_BUFFILE_WRITE);
+	nwritten = pg_pwrite(file->fd, file->buffer.data, file->nbytes,
+						 file->offset);
+	pgstat_report_wait_end();
+
+	/* if write didn't set errno, assume problem is no disk space */
+	if (nwritten != file->nbytes && errno == 0)
+		errno = ENOSPC;
+
+	if (nwritten < 0)
+	{
+		/* TODO The W32 specific code, see FileWrite. */
+
+		/* OK to retry if interrupted */
+		if (errno == EINTR)
+			goto retry;
+
+		return;					/* failed to write */
+	}
+
+	file->dirty = false;
+
+	file->offset += nwritten;
+	file->pos = 0;
+	file->nbytes = 0;
+}
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index ea6cc8b560..cbaeccced5 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -907,8 +907,6 @@ typedef enum
 	WAIT_EVENT_RELATION_MAP_READ,
 	WAIT_EVENT_RELATION_MAP_SYNC,
 	WAIT_EVENT_RELATION_MAP_WRITE,
-	WAIT_EVENT_REORDER_BUFFER_READ,
-	WAIT_EVENT_REORDER_BUFFER_WRITE,
 	WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ,
 	WAIT_EVENT_REPLICATION_SLOT_READ,
 	WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC,
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 1fba404fe2..8b2845e33f 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -28,9 +28,13 @@
 
 #include "storage/sharedfileset.h"
 
-/* BufFile is an opaque type whose details are not known outside buffile.c. */
+/*
+ * BufFile and TransientBufFile are opaque types whose details are not known
+ * outside buffile.c.
+ */
 
 typedef struct BufFile BufFile;
+typedef struct TransientBufFile TransientBufFile;
 
 /*
  * prototypes for functions in buffile.c
@@ -51,4 +55,11 @@ extern void BufFileExportShared(BufFile *file);
 extern BufFile *BufFileOpenShared(SharedFileSet *fileset, const char *name);
 extern void BufFileDeleteShared(SharedFileSet *fileset, const char *name);
 
+extern TransientBufFile *BufFileOpenTransient(const char *path, int fileFlags);
+extern void BufFileCloseTransient(TransientBufFile *file);
+extern size_t BufFileWriteTransient(TransientBufFile *file, void *ptr,
+					  size_t size);
+extern size_t BufFileReadTransient(TransientBufFile *file, void *ptr,
+					 size_t size);
+
 #endif							/* BUFFILE_H */
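BufFileWriteTransient() and BufFileDumpBufferTransient() in the patch above follow the usual buffered-append pattern. For comparison, here is a standalone sketch of that pattern that calls plain POSIX pwrite() instead of pg_pwrite() and the wait-event functions. SketchWriter and the sketch_*() functions are hypothetical, and an 8-byte buffer stands in for BLCKSZ so that flushing is easy to exercise.

The sketch also tests the access mode with O_ACCMODE. O_RDONLY is 0 on common platforms such as Linux, so an expression like (fileFlags & O_RDONLY) is always zero there, whereas (flags & O_ACCMODE) == O_RDONLY does identify a read-only open.

```c
#define _XOPEN_SOURCE 700		/* expose pread()/pwrite() under strict C modes */

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical buffer size; the patch buffers a whole BLCKSZ block. */
#define SKETCH_BUFSZ 8

/* Hypothetical append-only writer, loosely mirroring TransientBufFile. */
typedef struct SketchWriter
{
	int			fd;
	off_t		offset;			/* file position where the buffer starts */
	size_t		nbytes;			/* number of valid bytes in buffer */
	char		buffer[SKETCH_BUFSZ];
} SketchWriter;

/* True if flags request read-only access; O_RDONLY is usually 0, hence the mask. */
bool
sketch_is_read_only(int flags)
{
	return (flags & O_ACCMODE) == O_RDONLY;
}

/*
 * Write the buffer out at w->offset.  Retries on EINTR and on short writes;
 * a write that makes no progress is reported as ENOSPC.  Returns 0 or -1.
 */
int
sketch_flush(SketchWriter *w)
{
	size_t		done = 0;

	while (done < w->nbytes)
	{
		ssize_t		n = pwrite(w->fd, w->buffer + done, w->nbytes - done,
							   w->offset + (off_t) done);

		if (n < 0)
		{
			if (errno == EINTR)
				continue;		/* interrupted, try again */
			return -1;
		}
		if (n == 0)
		{
			errno = ENOSPC;		/* no progress: assume the disk is full */
			return -1;
		}
		done += (size_t) n;
	}
	w->offset += (off_t) w->nbytes;
	w->nbytes = 0;
	return 0;
}

/* Copy size bytes into the buffer, flushing it whenever it fills up. */
size_t
sketch_write(SketchWriter *w, const void *ptr, size_t size)
{
	const char *p = ptr;
	size_t		nwritten = 0;

	while (size > 0)
	{
		size_t		n;

		if (w->nbytes == SKETCH_BUFSZ && sketch_flush(w) != 0)
			break;				/* I/O error: caller sees a short count */

		n = SKETCH_BUFSZ - w->nbytes;
		if (n > size)
			n = size;
		assert(n > 0);

		memcpy(w->buffer + w->nbytes, p, n);
		w->nbytes += n;
		p += n;
		size -= n;
		nwritten += n;
	}
	return nwritten;
}
```

The caller is expected to call sketch_flush() once more before closing the descriptor, just as BufFileCloseTransient() dumps a dirty buffer before closing the file. Looping on short writes keeps a partial pwrite() from silently dropping the rest of the buffer.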