On Fri, 30 Oct 2020 at 09:16, Amit Kapila <amit.kapil...@gmail.com> wrote:
> > 1. > + LOGICAL_REP_MSG_STREAM_ABORT = 'A', > +} LogicalRepMsgType; > > There is no need for a comma after the last message. > > Done. Thanks for noticing it. > 2. > +/* > + * Logical message types > + * > + * Used by logical replication wire protocol. > + * > + * Note: though this is an enum it should fit a single byte and should be a > + * printable character. > + */ > +typedef enum > +{ > > I think we can expand the comments to probably say why we need these > to fit in a single byte or what problem it can cause if that rule is > disobeyed. This is to make the next person clear why we are imposing > such a rule. > Done. Please check. > > 3. > +typedef enum > +{ > .. > +} LogicalRepMsgType; > > There are places in code where we use the enum name > (LogicalRepMsgType) both in the start and end. See TypeCat, > CoercionMethod, CoercionCodes, etc. I see places where we use the way > you have in the code. I would prefer the way we have used at places > like TypeCat as that makes it easier to read. > Not my favourite style since changing the type name requires changing enum name to keep those consistent. But anyway done. > > 4. > switch (action) > { > - /* BEGIN */ > - case 'B': > + case LOGICAL_REP_MSG_BEGIN: > apply_handle_begin(s); > - break; > - /* COMMIT */ > - case 'C': > + return; > > I think we can simply use 'return apply_handle_begin;' instead of > adding return in another line. Again, I think we changed this handling > in apply_dispatch() to improve the case where we can detect at the > compile time any missing enum but at this stage it is not clear to me > if that is true. > I don't see much value in writing it like "return apply_handle_begin()"; it gives an impression that apply_handle_begin() and apply_dispatch() are returning something which they are not. I would prefer return on a separate line unless there's something more than style improvement. I have added rationale behind Enum in the commit message as you suggested in one of the later mails. 
PFA patch addressing your comments. -- Best Wishes, Ashutosh
From f71a600d3fd49756926deec1b593472a9fd8a8cc Mon Sep 17 00:00:00 2001 From: Ashutosh Bapat <ashutosh.ba...@2ndquadrant.com> Date: Fri, 16 Oct 2020 12:31:35 +0530 Subject: [PATCH] Enumize top level logical replication actions Logical replication protocol uses single byte character to identify a message type in logical replication protocol. The code uses string literals for the same. Enumize those so that 1. All the string literals used can be found at a single place. This makes it easy to add more types without the risk of conflicts. 2. It's easy to locate the code handling a given type. 3. When used with switch statements, it is easy to identify the missing cases using -Wswitch. Ashutosh Bapat --- src/backend/replication/logical/proto.c | 26 +++---- src/backend/replication/logical/worker.c | 87 ++++++++++++------------ src/include/replication/logicalproto.h | 27 ++++++++ 3 files changed, 83 insertions(+), 57 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index eb19142b48..fdb31182d7 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -44,7 +44,7 @@ static const char *logicalrep_read_namespace(StringInfo in); void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn) { - pq_sendbyte(out, 'B'); /* BEGIN */ + pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN); /* fixed fields */ pq_sendint64(out, txn->final_lsn); @@ -76,7 +76,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn, { uint8 flags = 0; - pq_sendbyte(out, 'C'); /* sending COMMIT */ + pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT); /* send the flags field (unused for now) */ pq_sendbyte(out, flags); @@ -112,7 +112,7 @@ void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn) { - pq_sendbyte(out, 'O'); /* ORIGIN */ + pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN); /* fixed fields */ pq_sendint64(out, origin_lsn); @@ -141,7 +141,7 @@ void
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, bool binary) { - pq_sendbyte(out, 'I'); /* action INSERT */ + pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -185,7 +185,7 @@ void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, HeapTuple newtuple, bool binary) { - pq_sendbyte(out, 'U'); /* action UPDATE */ + pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -263,7 +263,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX); - pq_sendbyte(out, 'D'); /* action DELETE */ + pq_sendbyte(out, LOGICAL_REP_MSG_DELETE); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -317,7 +317,7 @@ logicalrep_write_truncate(StringInfo out, int i; uint8 flags = 0; - pq_sendbyte(out, 'T'); /* action TRUNCATE */ + pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -369,7 +369,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) { char *relname; - pq_sendbyte(out, 'R'); /* sending RELATION */ + pq_sendbyte(out, LOGICAL_REP_MSG_RELATION); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -425,7 +425,7 @@ logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid) HeapTuple tup; Form_pg_type typtup; - pq_sendbyte(out, 'Y'); /* sending TYPE */ + pq_sendbyte(out, LOGICAL_REP_MSG_TYPE); /* transaction ID (if not valid, we're not streaming) */ if (TransactionIdIsValid(xid)) @@ -755,7 +755,7 @@ void logicalrep_write_stream_start(StringInfo out, TransactionId xid, bool first_segment) { - 
pq_sendbyte(out, 'S'); /* action STREAM START */ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START); Assert(TransactionIdIsValid(xid)); @@ -788,7 +788,7 @@ logicalrep_read_stream_start(StringInfo in, bool *first_segment) void logicalrep_write_stream_stop(StringInfo out) { - pq_sendbyte(out, 'E'); /* action STREAM END */ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_END); } /* @@ -800,7 +800,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn, { uint8 flags = 0; - pq_sendbyte(out, 'c'); /* action STREAM COMMIT */ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT); Assert(TransactionIdIsValid(txn->xid)); @@ -849,7 +849,7 @@ void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid) { - pq_sendbyte(out, 'A'); /* action STREAM ABORT */ + pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT); Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid)); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b0f27e0af8..04684912de 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1896,67 +1896,66 @@ apply_handle_truncate(StringInfo s) static void apply_dispatch(StringInfo s) { - char action = pq_getmsgbyte(s); + LogicalRepMsgType action = pq_getmsgbyte(s); switch (action) { - /* BEGIN */ - case 'B': + case LOGICAL_REP_MSG_BEGIN: apply_handle_begin(s); - break; - /* COMMIT */ - case 'C': + return; + + case LOGICAL_REP_MSG_COMMIT: apply_handle_commit(s); - break; - /* INSERT */ - case 'I': + return; + + case LOGICAL_REP_MSG_INSERT: apply_handle_insert(s); - break; - /* UPDATE */ - case 'U': + return; + + case LOGICAL_REP_MSG_UPDATE: apply_handle_update(s); - break; - /* DELETE */ - case 'D': + return; + + case LOGICAL_REP_MSG_DELETE: apply_handle_delete(s); - break; - /* TRUNCATE */ - case 'T': + return; + + case LOGICAL_REP_MSG_TRUNCATE: apply_handle_truncate(s); - break; - /* RELATION */ - case 'R': + return; + + case 
LOGICAL_REP_MSG_RELATION: apply_handle_relation(s); - break; - /* TYPE */ - case 'Y': + return; + + case LOGICAL_REP_MSG_TYPE: apply_handle_type(s); - break; - /* ORIGIN */ - case 'O': + return; + + case LOGICAL_REP_MSG_ORIGIN: apply_handle_origin(s); - break; - /* STREAM START */ - case 'S': + return; + + case LOGICAL_REP_MSG_STREAM_START: apply_handle_stream_start(s); - break; - /* STREAM END */ - case 'E': + return; + + case LOGICAL_REP_MSG_STREAM_END: apply_handle_stream_stop(s); - break; - /* STREAM ABORT */ - case 'A': + return; + + case LOGICAL_REP_MSG_STREAM_ABORT: apply_handle_stream_abort(s); - break; - /* STREAM COMMIT */ - case 'c': + return; + + case LOGICAL_REP_MSG_STREAM_COMMIT: apply_handle_stream_commit(s); - break; - default: - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("invalid logical replication message type \"%c\"", action))); + return; } + + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid logical replication message type \"%c\"", action))); } /* diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 0c2cda264e..cca13dae96 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -33,6 +33,33 @@ #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2 #define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM +/* + * Logical message types + * + * Used by logical replication wire protocol. + * + * Note: though this is an enum, the values are used to identify message types + * in logical replication protocol, which uses a single byte to identify a + * message type. Hence the values should be single byte wide and preferably + * human readable characters. 
+ */ +typedef enum LogicalRepMsgType +{ + LOGICAL_REP_MSG_BEGIN = 'B', + LOGICAL_REP_MSG_COMMIT = 'C', + LOGICAL_REP_MSG_ORIGIN = 'O', + LOGICAL_REP_MSG_INSERT = 'I', + LOGICAL_REP_MSG_UPDATE = 'U', + LOGICAL_REP_MSG_DELETE = 'D', + LOGICAL_REP_MSG_TRUNCATE = 'T', + LOGICAL_REP_MSG_RELATION = 'R', + LOGICAL_REP_MSG_TYPE = 'Y', + LOGICAL_REP_MSG_STREAM_START = 'S', + LOGICAL_REP_MSG_STREAM_END = 'E', + LOGICAL_REP_MSG_STREAM_COMMIT = 'c', + LOGICAL_REP_MSG_STREAM_ABORT = 'A' +} LogicalRepMsgType; + /* * This struct stores a tuple received via logical replication. * Keep in mind that the columns correspond to the *remote* table. -- 2.17.1