On Tue, May 19, 2020 at 6:01 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Fri, May 15, 2020 at 2:48 PM Dilip Kumar <dilipbal...@gmail.com> wrote:
> >
I have further reviewed v22 and below are my comments:

v22-0005-Implement-streaming-mode-in-ReorderBuffer
--------------------------------------------------------------------------
1.
+ * Note: We never do both stream and serialize a transaction (we only spill
+ * to disk when streaming is not supported by the plugin), so only one of
+ * those two flags may be set at any given time.
+ */
+#define rbtxn_is_streamed(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
+)

The above 'Note' is not correct as per the latest implementation.

v22-0006-Add-support-for-streaming-to-built-in-replicatio
----------------------------------------------------------------------------
2.
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -14,7 +14,6 @@
  *
  *-------------------------------------------------------------------------
  */
-
 #include "postgres.h"

Spurious line removal.

3.
+void
+logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
+                               XLogRecPtr commit_lsn)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, 'c'); /* action STREAM COMMIT */
+
+ Assert(TransactionIdIsValid(txn->xid));
+
+ /* transaction ID (we're starting to stream, so must be valid) */
+ pq_sendint32(out, txn->xid);

The part of the comment "we're starting to stream, so must be valid" is
not correct, as we are not at the start of the stream here. The patch
has used the same incorrect sentence in a few places; kindly fix those
as well.

4.
+ * XXX Do we need to allocate it in TopMemoryContext?
+ */
+static void
+subxact_info_add(TransactionId xid)
{
..

For this and other places in the patch, like in function
stream_open_file(), instead of using TopMemoryContext, can we consider
using a new memory context, LogicalStreamingContext or something like
that? We can create LogicalStreamingContext under TopMemoryContext. I
don't see any need to use TopMemoryContext here.

5.
+static void
+subxact_info_add(TransactionId xid)

This function assumes valid values for global variables like stream_fd
and stream_xid. I think it is better to have Asserts for those in this
function before using them. The Asserts for those are present in
handle_streamed_transaction, but I feel they should be in
subxact_info_add.

6.
+subxact_info_add(TransactionId xid)

+ /*
+ * In most cases we're checking the same subxact as we've already seen in
+ * the last call, so make ure just ignore it (this change comes later).
+ */
+ if (subxact_last == xid)
+ return;

Typo and minor correction, /ure just/sure to

7.
+subxact_info_write(Oid subid, TransactionId xid)
{
..
+ /*
+ * But we free the memory allocated for subxact info. There might be one
+ * exceptional transaction with many subxacts, and we don't want to keep
+ * the memory allocated forewer.
+ *
+ */

a. Typo, /forewer/forever
b. The extra line at the end of the comment is not required.

8.
+ * XXX Maybe we should only include the checksum when the cluster is
+ * initialized with checksums?
+ */
+static void
+subxact_info_write(Oid subid, TransactionId xid)

Do we really need to have the checksum for temporary files? I have
checked a few other similar cases, like the SharedFileSet stuff for
parallel hash join, but didn't find them using checksums. Can you also
once look at other usages of temporary files and then let us decide if
we see any reason to have checksums for this? Another point is that we
don't seem to be doing this for the 'changes' file, see
stream_write_change. So I am not sure there is any sense in writing a
checksum for the subxact file. Tomas, do you see any reason for the
same?

9.
+subxact_filename(char *path, Oid subid, TransactionId xid)
+{
+ char tempdirpath[MAXPGPATH];
+
+ TempTablespacePath(tempdirpath, DEFAULTTABLESPACE_OID);
+
+ /*
+ * We might need to create the tablespace's tempfile directory, if no
+ * one has yet done so.
+ */
+ if ((MakePGDirectory(tempdirpath) < 0) && errno != EEXIST)
+ ereport(ERROR,
+         (errcode_for_file_access(),
+          errmsg("could not create directory \"%s\": %m",
+                 tempdirpath)));
+
+ snprintf(path, MAXPGPATH, "%s/logical-%u-%u.subxacts",
+          tempdirpath, subid, xid);
+}

Temporary files created in PGDATA/base/pgsql_tmp follow a certain
naming convention (see docs[1]) which is not followed here. You can
also refer to SharedFileSetPath and OpenTemporaryFile. I think we can
just follow that convention and then additionally append subid, xid,
and .subxacts. Also, a similar change is required for changes_filename.
I would like to know if there is a reason why we want to use a
different naming convention here?

10.
+ * This can only be called at the beginning of a "streaming" block, i.e.
+ * between stream_start/stream_stop messages from the upstream.
+ */
+static void
+stream_close_file(void)

The comment seems to be wrong. I think this can only be called at the
stream end, so it should be: "This can only be called at the end of a
"streaming" block, i.e. at the stream_stop message from the upstream."

11.
+ * the order the transactions are sent in. So streamed trasactions are
+ * handled separately by using schema_sent flag in ReorderBufferTXN.
+ *
  * For partitions, 'pubactions' considers not only the table's own
  * publications, but also those of all of its ancestors.
  */
 typedef struct RelationSyncEntry
 {
  Oid relid; /* relation oid */
-
+ TransactionId xid; /* transaction that created the record */

  /*
   * Did we send the schema? If ancestor relid is set, its schema must also
   * have been sent for this to be true.
   */
  bool schema_sent;
+ List *streamed_txns; /* streamed toplevel transactions with this
+                       * schema */

The part of the comment "So streamed trasactions are handled separately
by using schema_sent flag in ReorderBufferTXN." doesn't seem to match
what we are doing in the latest version of the patch.

12.
maybe_send_schema()
{
..
+ if (in_streaming)
+ {
+ /*
+ * TOCHECK: We have to send schema after each catalog change and it may
+ * occur when streaming already started, so we have to track new catalog
+ * changes somehow.
+ */
+ schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
..
..
}

I think it is good to once verify/test what this comment says, but as
per the code we should be sending the schema after each catalog change,
as we invalidate the streamed_txns list in rel_sync_cache_relation_cb,
which must be called during relcache invalidation. Do we see any
problem with that mechanism?

13.
+/*
+ * Notify downstream to discard the streamed transaction (along with all
+ * it's subtransactions, if it's a toplevel transaction).
+ */
+static void
+pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
+                       ReorderBufferTXN *txn,
+                       XLogRecPtr commit_lsn)

This comment is copied from pgoutput_stream_abort, so it doesn't match
what this function is doing.

[1] - https://www.postgresql.org/docs/devel/storage-file-layout.html

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com