On Fri, 30 Oct 2020 at 09:16, Amit Kapila <amit.kapil...@gmail.com> wrote

>
> 1.
> + LOGICAL_REP_MSG_STREAM_ABORT = 'A',
> +} LogicalRepMsgType;
>
> There is no need for a comma after the last message.
>
> Done. Thanks for noticing it.


> 2.
> +/*
> + * Logical message types
> + *
> + * Used by logical replication wire protocol.
> + *
> + * Note: though this is an enum it should fit a single byte and should be
> a
> + * printable character.
> + */
> +typedef enum
> +{
>
> I think we can expand the comments to probably say why we need these
> to fit in a single byte or what problem it can cause if that rule is
> disobeyed. This is to make the next person clear why we are imposing
> such a rule.
>

Done. Please check.


>
> 3.
> +typedef enum
> +{
> ..
> +} LogicalRepMsgType;
>
> There are places in code where we use the enum name
> (LogicalRepMsgType) both in the start and end. See TypeCat,
> CoercionMethod, CoercionCodes, etc. I see places where we use the way
> you have in the code. I would prefer the way we have used at places
> like TypeCat as that makes it easier to read.
>

Not my favourite style since changing the type name requires changing enum
name to keep those consistent. But anyway done.


>
> 4.
>   switch (action)
>   {
> - /* BEGIN */
> - case 'B':
> + case LOGICAL_REP_MSG_BEGIN:
>   apply_handle_begin(s);
> - break;
> - /* COMMIT */
> - case 'C':
> + return;
>
> I think we can simply use 'return apply_handle_begin;' instead of
> adding return in another line. Again, I think we changed this handling
> in apply_dispatch() to improve the case where we can detect at the
> compile time any missing enum but at this stage it is not clear to me
> if that is true.
>

I don't see much value in writing it like "return apply_handle_begin()";
gives an impression that apply_handle_begin() and apply_dispatch() are
returning something which they are not. I would prefer return on separate
line unless there's something more than style improvement.

I have added rationale behind Enum in the commit message as you suggested
in one of the later mails.

PFA patch addressing your comments.
-- 
Best Wishes,
Ashutosh
From f71a600d3fd49756926deec1b593472a9fd8a8cc Mon Sep 17 00:00:00 2001
From: Ashutosh Bapat <ashutosh.ba...@2ndquadrant.com>
Date: Fri, 16 Oct 2020 12:31:35 +0530
Subject: [PATCH] Enumize top level logical replication actions

Logical replication protocol uses single byte character to identify a
message type in logical repliation protocol. The code uses string
literals for the same. Enumize those so that

1. All the string literals used can be found at a single place. This
makes it easy to add more types without the risk of conflicts.

2. It's easy to locate the code handling a given type.

3. When used with switch statements, it is easy to identify the missing
cases using -Wswitch.

Ashutosh Bapat
---
 src/backend/replication/logical/proto.c  | 26 +++----
 src/backend/replication/logical/worker.c | 87 ++++++++++++------------
 src/include/replication/logicalproto.h   | 27 ++++++++
 3 files changed, 83 insertions(+), 57 deletions(-)

diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index eb19142b48..fdb31182d7 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -44,7 +44,7 @@ static const char *logicalrep_read_namespace(StringInfo in);
 void
 logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn)
 {
-	pq_sendbyte(out, 'B');		/* BEGIN */
+	pq_sendbyte(out, LOGICAL_REP_MSG_BEGIN);
 
 	/* fixed fields */
 	pq_sendint64(out, txn->final_lsn);
@@ -76,7 +76,7 @@ logicalrep_write_commit(StringInfo out, ReorderBufferTXN *txn,
 {
 	uint8		flags = 0;
 
-	pq_sendbyte(out, 'C');		/* sending COMMIT */
+	pq_sendbyte(out, LOGICAL_REP_MSG_COMMIT);
 
 	/* send the flags field (unused for now) */
 	pq_sendbyte(out, flags);
@@ -112,7 +112,7 @@ void
 logicalrep_write_origin(StringInfo out, const char *origin,
 						XLogRecPtr origin_lsn)
 {
-	pq_sendbyte(out, 'O');		/* ORIGIN */
+	pq_sendbyte(out, LOGICAL_REP_MSG_ORIGIN);
 
 	/* fixed fields */
 	pq_sendint64(out, origin_lsn);
@@ -141,7 +141,7 @@ void
 logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 						HeapTuple newtuple, bool binary)
 {
-	pq_sendbyte(out, 'I');		/* action INSERT */
+	pq_sendbyte(out, LOGICAL_REP_MSG_INSERT);
 
 	/* transaction ID (if not valid, we're not streaming) */
 	if (TransactionIdIsValid(xid))
@@ -185,7 +185,7 @@ void
 logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
 						HeapTuple oldtuple, HeapTuple newtuple, bool binary)
 {
-	pq_sendbyte(out, 'U');		/* action UPDATE */
+	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
 	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -263,7 +263,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
 		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
 
-	pq_sendbyte(out, 'D');		/* action DELETE */
+	pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
 
 	/* transaction ID (if not valid, we're not streaming) */
 	if (TransactionIdIsValid(xid))
@@ -317,7 +317,7 @@ logicalrep_write_truncate(StringInfo out,
 	int			i;
 	uint8		flags = 0;
 
-	pq_sendbyte(out, 'T');		/* action TRUNCATE */
+	pq_sendbyte(out, LOGICAL_REP_MSG_TRUNCATE);
 
 	/* transaction ID (if not valid, we're not streaming) */
 	if (TransactionIdIsValid(xid))
@@ -369,7 +369,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
 {
 	char	   *relname;
 
-	pq_sendbyte(out, 'R');		/* sending RELATION */
+	pq_sendbyte(out, LOGICAL_REP_MSG_RELATION);
 
 	/* transaction ID (if not valid, we're not streaming) */
 	if (TransactionIdIsValid(xid))
@@ -425,7 +425,7 @@ logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
 	HeapTuple	tup;
 	Form_pg_type typtup;
 
-	pq_sendbyte(out, 'Y');		/* sending TYPE */
+	pq_sendbyte(out, LOGICAL_REP_MSG_TYPE);
 
 	/* transaction ID (if not valid, we're not streaming) */
 	if (TransactionIdIsValid(xid))
@@ -755,7 +755,7 @@ void
 logicalrep_write_stream_start(StringInfo out,
 							  TransactionId xid, bool first_segment)
 {
-	pq_sendbyte(out, 'S');		/* action STREAM START */
+	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_START);
 
 	Assert(TransactionIdIsValid(xid));
 
@@ -788,7 +788,7 @@ logicalrep_read_stream_start(StringInfo in, bool *first_segment)
 void
 logicalrep_write_stream_stop(StringInfo out)
 {
-	pq_sendbyte(out, 'E');		/* action STREAM END */
+	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_END);
 }
 
 /*
@@ -800,7 +800,7 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
 {
 	uint8		flags = 0;
 
-	pq_sendbyte(out, 'c');		/* action STREAM COMMIT */
+	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_COMMIT);
 
 	Assert(TransactionIdIsValid(txn->xid));
 
@@ -849,7 +849,7 @@ void
 logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
 							  TransactionId subxid)
 {
-	pq_sendbyte(out, 'A');		/* action STREAM ABORT */
+	pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
 
 	Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b0f27e0af8..04684912de 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1896,67 +1896,66 @@ apply_handle_truncate(StringInfo s)
 static void
 apply_dispatch(StringInfo s)
 {
-	char		action = pq_getmsgbyte(s);
+	LogicalRepMsgType action = pq_getmsgbyte(s);
 
 	switch (action)
 	{
-			/* BEGIN */
-		case 'B':
+		case LOGICAL_REP_MSG_BEGIN:
 			apply_handle_begin(s);
-			break;
-			/* COMMIT */
-		case 'C':
+			return;
+
+		case LOGICAL_REP_MSG_COMMIT:
 			apply_handle_commit(s);
-			break;
-			/* INSERT */
-		case 'I':
+			return;
+
+		case LOGICAL_REP_MSG_INSERT:
 			apply_handle_insert(s);
-			break;
-			/* UPDATE */
-		case 'U':
+			return;
+
+		case LOGICAL_REP_MSG_UPDATE:
 			apply_handle_update(s);
-			break;
-			/* DELETE */
-		case 'D':
+			return;
+
+		case LOGICAL_REP_MSG_DELETE:
 			apply_handle_delete(s);
-			break;
-			/* TRUNCATE */
-		case 'T':
+			return;
+
+		case LOGICAL_REP_MSG_TRUNCATE:
 			apply_handle_truncate(s);
-			break;
-			/* RELATION */
-		case 'R':
+			return;
+
+		case LOGICAL_REP_MSG_RELATION:
 			apply_handle_relation(s);
-			break;
-			/* TYPE */
-		case 'Y':
+			return;
+
+		case LOGICAL_REP_MSG_TYPE:
 			apply_handle_type(s);
-			break;
-			/* ORIGIN */
-		case 'O':
+			return;
+
+		case LOGICAL_REP_MSG_ORIGIN:
 			apply_handle_origin(s);
-			break;
-			/* STREAM START */
-		case 'S':
+			return;
+
+		case LOGICAL_REP_MSG_STREAM_START:
 			apply_handle_stream_start(s);
-			break;
-			/* STREAM END */
-		case 'E':
+			return;
+
+		case LOGICAL_REP_MSG_STREAM_END:
 			apply_handle_stream_stop(s);
-			break;
-			/* STREAM ABORT */
-		case 'A':
+			return;
+
+		case LOGICAL_REP_MSG_STREAM_ABORT:
 			apply_handle_stream_abort(s);
-			break;
-			/* STREAM COMMIT */
-		case 'c':
+			return;
+
+		case LOGICAL_REP_MSG_STREAM_COMMIT:
 			apply_handle_stream_commit(s);
-			break;
-		default:
-			ereport(ERROR,
-					(errcode(ERRCODE_PROTOCOL_VIOLATION),
-					 errmsg("invalid logical replication message type \"%c\"", action)));
+			return;
 	}
+
+	ereport(ERROR,
+			(errcode(ERRCODE_PROTOCOL_VIOLATION),
+			 errmsg("invalid logical replication message type \"%c\"", action)));
 }
 
 /*
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 0c2cda264e..cca13dae96 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -33,6 +33,33 @@
 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
 #define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM
 
+/*
+ * Logical message types
+ *
+ * Used by logical replication wire protocol.
+ *
+ * Note: though this is an enum, the values are used to identify message types
+ * in logical replication protocol, which uses a single byte to identify a
+ * message type. Hence the values should be single byte wide and preferrably
+ * human readable characters.
+ */
+typedef enum LogicalRepMsgType
+{
+	LOGICAL_REP_MSG_BEGIN = 'B',
+	LOGICAL_REP_MSG_COMMIT = 'C',
+	LOGICAL_REP_MSG_ORIGIN = 'O',
+	LOGICAL_REP_MSG_INSERT = 'I',
+	LOGICAL_REP_MSG_UPDATE = 'U',
+	LOGICAL_REP_MSG_DELETE = 'D',
+	LOGICAL_REP_MSG_TRUNCATE = 'T',
+	LOGICAL_REP_MSG_RELATION = 'R',
+	LOGICAL_REP_MSG_TYPE = 'Y',
+	LOGICAL_REP_MSG_STREAM_START = 'S',
+	LOGICAL_REP_MSG_STREAM_END = 'E',
+	LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
+	LOGICAL_REP_MSG_STREAM_ABORT = 'A'
+} LogicalRepMsgType;
+
 /*
  * This struct stores a tuple received via logical replication.
  * Keep in mind that the columns correspond to the *remote* table.
-- 
2.17.1

Reply via email to