Thanks Andres for your review. Thanks Li, Horiguchi-san and Amit for your comments.
On Tue, 20 Oct 2020 at 04:57, Andres Freund <and...@anarazel.de> wrote: > Hi, > > On 2020-10-16 12:55:26 +0530, Ashutosh Bapat wrote: > > Here's a patch simplifying that for top level logical replication > > messages. > > I think that's a good plan. One big benefit for me is that it's much > easier to search for an enum than for a single letter > constant. Including searching for all the places that deal with any sort > of logical rep message type. > > > void > > logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn) > > { > > - pq_sendbyte(out, 'B'); /* BEGIN */ > > + pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN); /* BEGIN */ > > I think if we have the LOGICAL_REP_MSG_BEGIN we don't need the /* BEGIN */. > Yes. Fixed all places. I have attached two places - 0001 which is previous 0001 patch with your comments addressed. 0002 adds wrappers on top of pq_sendbyte() and pq_getmsgbyte() to send and receive a logical replication message type respectively. These wrappers add more protection to make sure that the enum definitions fit one byte. This also removes the default case from apply_dispatch() so that we can detect any LogicalRepMsgType not handled by that function. These two patches are intended to be committed together as a single commit. For now the second one is separate so that it's easy to remove the changes if they are not acceptable. -- Best Wishes, Ashutosh
From 8da06210710033946de1541d6587ecc783fa3649 Mon Sep 17 00:00:00 2001 From: Ashutosh Bapat <ashutosh.ba...@2ndquadrant.com> Date: Fri, 16 Oct 2020 12:31:35 +0530 Subject: [PATCH 1/2] Enumize top level logical replication actions Logical replication protocol uses single byte character to identify different chunks of logical repliation messages. The code uses string literals for the same. Enumize those so that 1. All the string literals used can be found at a single place. This makes it easy to add more actions without the risk of conflicts. 2. It's easy to locate the code handling a given action. Ashutosh Bapat --- src/backend/replication/logical/proto.c | 26 ++++++------ src/backend/replication/logical/worker.c | 54 ++++++++++++------------ src/include/replication/logicalproto.h | 25 +++++++++++ 3 files changed, 65 insertions(+), 40 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index eb19142b48..fdb31182d7 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -44,7 +44,7 @@ static const char *logicalrep_read_namespace(StringInfo in); void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn) { - pq_sendbyte(out, 'B'); /* BEGIN */ + pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN); /* fixed fields */ pq_sendint64(out, txn->final_lsn); @@ -76,7 +76,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, { uint8 flags = 0; - pq_sendbyte(out, 'C'); /* sending COMMIT */ + pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT); /* send the flags field (unused for now) */ pq_sendbyte(out, flags); @@ -112,7 +112,7 @@ void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn) { - pq_sendbyte(out, 'O'); /* ORIGIN */ + pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN); /* fixed fields */ pq_sendint64(out, origin_lsn); @@ -141,7 +141,7 @@ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, bool binary) { - pq_sendbyte(out, 'I'); /* action INSERT */ + pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -185,7 +185,7 @@ void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary) { - pq_sendbyte(out, 'U'); /* action UPDATE */ + pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -263,7 +263,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); - pq_sendbyte(out, 'D'); /* action DELETE */ + pq_sendbyte(out, LOGICAL_REP_MSG_DELETE); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -317,7 +317,7 @@ logicalrep_write_truncate(StringInfo out, int i; uint8 flags = 0; - pq_sendbyte(out, 'T'); /* action TRUNCATE */ + pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -369,7 +369,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) { char *relname; - pq_sendbyte(out, 'R'); /* sending RELATION */ + pq_sendbyte(out, LOGICAL_REP_MSG_RELATION); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -425,7 +425,7 @@ logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid) HeapTuple tup; Form_pg_type typtup; - pq_sendbyte(out, 'Y'); /* sending TYPE */ + pq_sendbyte(out, LOGICAL_REP_MSG_TYPE); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -755,7 +755,7 @@ void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment) { - pq_sendbyte(out, 'S'); /* action STREAM START */ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START); Assert(TransactionIdIsValid(xid)); @@ -788,7 +788,7 @@ logicalrep_read_stream_start(StringInfo in, bool *first_segment) void logicalrep_write_stream_stop(StringInfo out) { - pq_sendbyte(out, 'E'); /* action STREAM END */ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_END); } /* @@ -800,7 +800,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, { uint8 flags = 0; - pq_sendbyte(out, 'c'); /* action STREAM COMMIT */ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT); Assert(TransactionIdIsValid(txn->xid)); @@ -849,7 +849,7 @@ void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid) { - pq_sendbyte(out, 'A'); /* action STREAM ABORT */ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT); Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid)); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 3a5b733ee3..ec21cc55e5 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1897,62 +1897,62 @@ apply_handle_truncate(StringInfo s) static void apply_dispatch(StringInfo s) { - char action = pq_getmsgbyte(s); + LogicalRepMsgType action = pq_getmsgbyte(s); switch (action) { - /* BEGIN */ - case 'B': + case LOGICAL_REP_MSG_BEGIN: apply_handle_begin(s); break; - /* COMMIT */ - case 'C': + + case LOGICAL_REP_MSG_COMMIT: apply_handle_commit(s); break; - /* INSERT */ - case 'I': + + case LOGICAL_REP_MSG_INSERT: apply_handle_insert(s); break; - /* UPDATE */ - case 'U': + + case LOGICAL_REP_MSG_UPDATE: apply_handle_update(s); break; - /* DELETE */ - case 'D': + + case LOGICAL_REP_MSG_DELETE: apply_handle_delete(s); break; - /* TRUNCATE */ - case 'T': + + case LOGICAL_REP_MSG_TRUNCATE: apply_handle_truncate(s); break; - /* RELATION */ - case 'R': + + case LOGICAL_REP_MSG_RELATION: apply_handle_relation(s); break; - /* TYPE */ - case 'Y': + + case LOGICAL_REP_MSG_TYPE: apply_handle_type(s); break; - /* ORIGIN */ - case 'O': + + case LOGICAL_REP_MSG_ORIGIN: apply_handle_origin(s); break; - /* STREAM START */ - case 'S': + + case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); break; - /* STREAM END */ - case 'E': + + case LOGICAL_REP_MSG_STREAM_END: apply_handle_stream_stop(s); break; - /* STREAM ABORT */ - case 'A': + + case LOGICAL_REP_MSG_STREAM_ABORT: apply_handle_stream_abort(s); break; - /* STREAM COMMIT */ - case 'c': + + case LOGICAL_REP_MSG_STREAM_COMMIT: apply_handle_stream_commit(s); break; + default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 0c2cda264e..15ee2304c8 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -33,6 +33,31 @@ #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2 #define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM +/* + * Logical message types + * + * Used by logical replication wire protocol. + * + * Note: though this is an enum it should fit a single byte and should be a + * printable character. + */ +typedef enum +{ + LOGICAL_REP_MSG_BEGIN = 'B', + LOGICAL_REP_MSG_COMMIT = 'C', + LOGICAL_REP_MSG_ORIGIN = 'O', + LOGICAL_REP_MSG_INSERT = 'I', + LOGICAL_REP_MSG_UPDATE = 'U', + LOGICAL_REP_MSG_DELETE = 'D', + LOGICAL_REP_MSG_TRUNCATE = 'T', + LOGICAL_REP_MSG_RELATION = 'R', + LOGICAL_REP_MSG_TYPE = 'Y', + LOGICAL_REP_MSG_STREAM_START = 'S', + LOGICAL_REP_MSG_STREAM_END = 'E', + LOGICAL_REP_MSG_STREAM_COMMIT = 'c', + LOGICAL_REP_MSG_STREAM_ABORT = 'A', +} LogicalRepMsgType; + /* * This struct stores a tuple received via logical replication. * Keep in mind that the columns correspond to the *remote* table. -- 2.17.1
From ecd692fbefa352ebe5b9698153f47102a93b7e16 Mon Sep 17 00:00:00 2001 From: Ashutosh Bapat <ashutosh.ba...@2ndquadrant.com> Date: Thu, 22 Oct 2020 11:52:24 +0530 Subject: [PATCH 2/2] Functions to send and receive LogicalRepMsgType Add wrappers on top of pq_sendbyte() and pq_getmsgbyte() to send and receive a logical replication message type respectively. This also removes default case from apply_dispatch() so that we can detect any LogicalRepMsgType not handled by that function. Ashutosh Bapat --- src/backend/replication/logical/proto.c | 37 +++++++++++++-------- src/backend/replication/logical/worker.c | 41 ++++++++++++++++++++---- 2 files changed, 59 insertions(+), 19 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index fdb31182d7..8c57ff03ec 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -38,13 +38,24 @@ static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); static void logicalrep_write_namespace(StringInfo out, Oid nspid); static const char *logicalrep_read_namespace(StringInfo in); +/* + * Wrapper around pq_sendbyte to send logical replication message type. + */ +static void +pq_send_logicalrep_msg_type(StringInfo out, LogicalRepMsgType msgtype) +{ + /* A logical message type should fit a single byte */ + Assert((char) msgtype == msgtype); + pq_sendbyte(out, (char) msgtype); +} + /* * Write BEGIN to the output stream. */ void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn) { - pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN); + pq_send_logicalrep_msg_type(out, LOGICAL_REP_MSG_BEGIN); /* fixed fields */ pq_sendint64(out, txn->final_lsn); @@ -76,7 +87,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, { uint8 flags = 0; - pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT); + pq_send_logicalrep_msg_type(out, LOGICAL_REP_MSG_COMMIT); /* send the flags field (unused for now) */ pq_sendbyte(out, flags); @@ -112,7 +123,7 @@ void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn) { - pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN); + pq_send_logicalrep_msg_type(out, LOGICAL_REP_MSG_ORIGIN); /* fixed fields */ pq_sendint64(out, origin_lsn); @@ -141,7 +152,7 @@ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, bool binary) { - pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); + pq_send_logicalrep_msg_type(out, LOGICAL_REP_MSG_INSERT); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -185,7 +196,7 @@ void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary) { - pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); + pq_send_logicalrep_msg_type(out, LOGICAL_REP_MSG_UPDATE); Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -263,7 +274,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); - pq_sendbyte(out, LOGICAL_REP_MSG_DELETE); + pq_send_logicalrep_msg_type(out, LOGICAL_REP_MSG_DELETE); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -317,7 +328,7 @@ logicalrep_write_truncate(StringInfo out, int i; uint8 flags = 0; - pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE); + pq_send_logicalrep_msg_type(out, LOGICAL_REP_MSG_TRUNCATE); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -369,7 +380,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) { char *relname; - pq_sendbyte(out, LOGICAL_REP_MSG_RELATION); + pq_send_logicalrep_msg_type(out, LOGICAL_REP_MSG_RELATION); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -425,7 +436,7 @@ logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid) HeapTuple tup; Form_pg_type typtup; - pq_sendbyte(out, LOGICAL_REP_MSG_TYPE); + pq_send_logicalrep_msg_type(out, LOGICAL_REP_MSG_TYPE); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -755,7 +766,7 @@ void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment) { - pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START); + pq_send_logicalrep_msg_type(out, LOGICAL_REP_MSG_STREAM_START); Assert(TransactionIdIsValid(xid)); @@ -788,7 +799,7 @@ logicalrep_read_stream_start(StringInfo in, bool *first_segment) void logicalrep_write_stream_stop(StringInfo out) { - pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_END); + pq_send_logicalrep_msg_type(out, LOGICAL_REP_MSG_STREAM_END); } /* @@ -800,7 +811,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, { uint8 flags = 0; - pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT); + pq_send_logicalrep_msg_type(out, LOGICAL_REP_MSG_STREAM_COMMIT); Assert(TransactionIdIsValid(txn->xid)); @@ -849,7 +860,7 @@ void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid) { - pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT); + pq_send_logicalrep_msg_type(out, LOGICAL_REP_MSG_STREAM_ABORT); Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid)); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ec21cc55e5..f516c056a1 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1890,6 +1890,40 @@ apply_handle_truncate(StringInfo s) CommandCounterIncrement(); } +/* + * Wrapper around pq_getmsgbyte() to extract logical replication message type + * from stream. + */ +static LogicalRepMsgType +pq_get_logicalrep_msg_type(StringInfo s) +{ + LogicalRepMsgType msgtype = pq_getmsgbyte(s); + + switch (msgtype) + { + case LOGICAL_REP_MSG_BEGIN: + case LOGICAL_REP_MSG_COMMIT: + case LOGICAL_REP_MSG_INSERT: + case LOGICAL_REP_MSG_UPDATE: + case LOGICAL_REP_MSG_DELETE: + case LOGICAL_REP_MSG_TRUNCATE: + case LOGICAL_REP_MSG_RELATION: + case LOGICAL_REP_MSG_TYPE: + case LOGICAL_REP_MSG_ORIGIN: + case LOGICAL_REP_MSG_STREAM_START: + case LOGICAL_REP_MSG_STREAM_END: + case LOGICAL_REP_MSG_STREAM_ABORT: + case LOGICAL_REP_MSG_STREAM_COMMIT: + return msgtype; + } + + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid logical replication message type \"%c\"", msgtype))); + + /* Unreachable, keep compiler happy */ + return msgtype; +} /* * Logical replication protocol message dispatcher. @@ -1897,7 +1931,7 @@ apply_handle_truncate(StringInfo s) static void apply_dispatch(StringInfo s) { - LogicalRepMsgType action = pq_getmsgbyte(s); + LogicalRepMsgType action = pq_get_logicalrep_msg_type(s); switch (action) { @@ -1952,11 +1986,6 @@ apply_dispatch(StringInfo s) case LOGICAL_REP_MSG_STREAM_COMMIT: apply_handle_stream_commit(s); break; - - default: - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("invalid logical replication message type \"%c\"", action))); } } -- 2.17.1