Thanks, Ashutosh, for your review. My comments are inline.
On Fri, Jun 19, 2020 at 5:41 PM Ashutosh Sharma <ashu.coe...@gmail.com> wrote:
>
> Hi,
>
> I just got some time to review the first patch in the list i.e. 
> 0001-Copy-code-readjustment-to-support-parallel-copy.patch. As the patch name 
> suggests, it is just trying to reshuffle the existing code for COPY command 
> here and there. There is no extra changes added in the patch as such, but 
> still I do have some review comments, please have a look:
>
> 1) Can you please add some comments atop the new function 
> PopulateAttributes() describing its functionality in detail. Further, this 
> new function contains the code from BeginCopy() to set attribute level 
> options used with COPY FROM such as FORCE_QUOTE, FORCE_NOT_NULL, FORCE_NULL 
> etc. in cstate and along with that it also copies the code from BeginCopy() 
> to set other infos such as client encoding type, encoding conversion etc. 
> Hence, I think it would be good to give it some better name, basically 
> something that matches with what actually it is doing.
>

No new code is added in this function. Part of the code from BeginCopy
was moved into a new function because the parallel copy workers will
also need it before they start the actual copy operation; making it a
function avoids duplicating the code. I have changed the function name
to PopulateGlobalsForCopyFrom and added a few comments.

> 2) Again, the name for the new function CheckCopyFromValidity() doesn't look 
> good to me. From the function name it appears as if it does the sanity check 
> of the entire COPY FROM command, but actually it is just doing the sanity 
> check for the target relation specified with COPY FROM. So, probably 
> something like CheckTargetRelValidity would look more sensible, I think? TBH, 
> I am not good at naming the functions so you can always ignore my suggestions 
> about function and variable names :)
>

Changed as suggested.

> 3) Any reason for not making CheckCopyFromValidity as a macro instead of a 
> new function. It is just doing the sanity check for the target relation.
>

The function has a reasonable number of lines and it is not in a
performance-critical path, so I preferred a function over a macro.
Your thoughts?
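
To illustrate the trade-off, here is a minimal standalone sketch. It is
not code from the patch: DemoRel, demo_check_target_rel and
DEMO_CHECK_TARGET_REL are hypothetical names, and the message strings
are only illustrative. It shows the same check written both ways.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the target relation; not the real Relation. */
typedef struct DemoRel
{
	bool		is_view;
	bool		is_sequence;
} DemoRel;

/*
 * Function form: the argument is evaluated once, the body is type checked,
 * and a debugger can step into it. Returns NULL if the relation is a valid
 * COPY FROM target, else a message describing the problem.
 */
static const char *
demo_check_target_rel(const DemoRel *rel)
{
	if (rel->is_view)
		return "cannot copy to view";
	if (rel->is_sequence)
		return "cannot copy to sequence";
	return NULL;
}

/*
 * Macro form of the same check. It needs the do { } while (0) wrapper to
 * behave as one statement, writes its result through a second argument,
 * and expands "rel" more than once.
 */
#define DEMO_CHECK_TARGET_REL(rel, msg) \
	do { \
		(msg) = NULL; \
		if ((rel)->is_view) \
			(msg) = "cannot copy to view"; \
		else if ((rel)->is_sequence) \
			(msg) = "cannot copy to sequence"; \
	} while (0)
```

Since the check runs once per COPY command rather than once per row,
the macro would not buy any measurable speed, while the function form
is easier to read and debug.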

> 4) Earlier in CopyReadLine() function while trying to clear the EOL marker 
> from cstate->line_buf.data (copied data), we were not checking if the line 
> read by CopyReadLineText() function is a header line or not, but I can see 
> that your patch checks that before clearing the EOL marker. Any reason for 
> this extra check?
>

If you look at the caller of CopyReadLine, NextCopyFromRawFields does
nothing with the header line; the server simply calls CopyReadLine
again to read the first data line. So this is a small optimization:
since the server never uses the contents of the header line, I felt
there is no need to clear the EOL marker for it. The relevant code in
NextCopyFromRawFields is:

	/* on input just throw the header line away */
	if (cstate->cur_lineno == 0 && cstate->header_line)
	{
		cstate->cur_lineno++;
		if (CopyReadLine(cstate))
			return false;	/* done */
	}

	cstate->cur_lineno++;

	/* Actually read the line into memory here */
	done = CopyReadLine(cstate);

I don't think a fix is needed here. Your thoughts?
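
To make the effect of the extra check concrete, here is a minimal
standalone sketch. It is not code from the patch: DemoCopyState and the
demo_* functions are made-up stand-ins, and only the EOL_NL case is
modelled. The header test mirrors IsHeaderLine() from the patch, where
cur_lineno is already 1 while the header line is being read.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical, cut-down stand-in for CopyStateData. */
typedef struct DemoCopyState
{
	bool		header_line;	/* input starts with a header line */
	int			cur_lineno;		/* number of the line being read */
	char		line[64];		/* line as read, EOL still attached */
	size_t		len;			/* length of line, excluding the '\0' */
} DemoCopyState;

/* Same condition as the IsHeaderLine() macro in the patch. */
static bool
demo_is_header_line(const DemoCopyState *cs)
{
	return cs->header_line && cs->cur_lineno == 1;
}

/* Load a line into the state; text must be shorter than 64 bytes. */
static void
demo_set_line(DemoCopyState *cs, int lineno, const char *text)
{
	cs->cur_lineno = lineno;
	cs->len = strlen(text);
	memcpy(cs->line, text, cs->len + 1);
}

/*
 * Strip the trailing '\n' unless this is the header line, which the
 * caller throws away anyway. Returns true if the EOL was removed.
 */
static bool
demo_clear_eol(DemoCopyState *cs)
{
	if (demo_is_header_line(cs))
		return false;

	assert(cs->len >= 1);
	assert(cs->line[cs->len - 1] == '\n');
	cs->len--;
	cs->line[cs->len] = '\0';
	return true;
}
```

With header_line set, calling demo_clear_eol() on line 1 leaves the
buffer untouched, while line 2 and later lose their trailing newline.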

> 5) I noticed the below spurious line removal in the patch.
>
> @@ -3839,7 +3953,6 @@ static bool
>  CopyReadLine(CopyState cstate)
>  {
>     bool        result;
> -
>

Fixed.

I have attached an updated patch with these fixes.
Thoughts?

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com
From 4455d3e067bda56316bb292e5d010bdf40254fec Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Tue, 23 Jun 2020 06:49:22 +0530
Subject: [PATCH 1/6] Copy code readjustment to support parallel copy.

This patch slightly readjusts the copy code so that the common code is
separated into functions/macros; these will be used by the workers in the
parallel copy code of the upcoming patches. EOL removal is moved from
CopyReadLine to CopyReadLineText. This change is required because, in the case
of parallel copy, record identification and record update are done in
CopyReadLineText, and the newline characters must be removed before the record
information is updated in shared memory.
---
 src/backend/commands/copy.c | 372 ++++++++++++++++++++++++++------------------
 1 file changed, 221 insertions(+), 151 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 6b1fd6d..65a504f 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -95,6 +95,9 @@ typedef enum CopyInsertMethod
 	CIM_MULTI_CONDITIONAL		/* use table_multi_insert only if valid */
 } CopyInsertMethod;
 
+#define RAW_BUF_SIZE 65536		/* we palloc RAW_BUF_SIZE+1 bytes */
+#define IsHeaderLine()			(cstate->header_line && cstate->cur_lineno == 1)
+
 /*
  * This struct contains all the state variables used throughout a COPY
  * operation. For simplicity, we use the same struct for all variants of COPY,
@@ -219,7 +222,6 @@ typedef struct CopyStateData
 	 * converts it.  Note: we guarantee that there is a \0 at
 	 * raw_buf[raw_buf_len].
 	 */
-#define RAW_BUF_SIZE 65536		/* we palloc RAW_BUF_SIZE+1 bytes */
 	char	   *raw_buf;
 	int			raw_buf_index;	/* next byte to process */
 	int			raw_buf_len;	/* total # of bytes stored */
@@ -347,6 +349,88 @@ if (1) \
 	goto not_end_of_copy; \
 } else ((void) 0)
 
+/*
+ * CONVERT_TO_SERVER_ENCODING - convert contents to server encoding.
+ */
+#define CONVERT_TO_SERVER_ENCODING(cstate) \
+{ \
+	/* Done reading the line.  Convert it to server encoding. */ \
+	if (cstate->need_transcoding) \
+	{ \
+		char	   *cvt; \
+		cvt = pg_any_to_server(cstate->line_buf.data, \
+							   cstate->line_buf.len, \
+							   cstate->file_encoding); \
+		if (cvt != cstate->line_buf.data) \
+		{ \
+			/* transfer converted data back to line_buf */ \
+			resetStringInfo(&cstate->line_buf); \
+			appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt)); \
+			pfree(cvt); \
+		} \
+	} \
+	/* Now it's safe to use the buffer in error messages */ \
+	cstate->line_buf_converted = true; \
+}
+
+/*
+ * CLEAR_EOL_FROM_COPIED_DATA - Clear EOL from the copied data.
+ */
+#define CLEAR_EOL_FROM_COPIED_DATA(copy_line_data, copy_line_pos, copy_line_size) \
+{ \
+	/* \
+	 * If we didn't hit EOF, then we must have transferred the EOL marker \
+	 * to line_buf along with the data.  Get rid of it. \
+	 */ \
+   switch (cstate->eol_type) \
+   { \
+	   case EOL_NL: \
+		   Assert(copy_line_size >= 1); \
+		   Assert(copy_line_data[copy_line_pos - 1] == '\n'); \
+		   copy_line_data[copy_line_pos - 1] = '\0'; \
+		   copy_line_size--; \
+		   break; \
+	   case EOL_CR: \
+		   Assert(copy_line_size >= 1); \
+		   Assert(copy_line_data[copy_line_pos - 1] == '\r'); \
+		   copy_line_data[copy_line_pos - 1] = '\0'; \
+		   copy_line_size--; \
+		   break; \
+	   case EOL_CRNL: \
+		   Assert(copy_line_size >= 2); \
+		   Assert(copy_line_data[copy_line_pos - 2] == '\r'); \
+		   Assert(copy_line_data[copy_line_pos - 1] == '\n'); \
+		   copy_line_data[copy_line_pos - 2] = '\0'; \
+		   copy_line_size -= 2; \
+		   break; \
+	   case EOL_UNKNOWN: \
+		   /* shouldn't get here */ \
+		   Assert(false); \
+		   break; \
+   } \
+}
+
+/*
+ * CLEAR_EOL_LINE - Wrapper for clearing EOL.
+ */
+#define CLEAR_EOL_LINE() \
+if (!result && !IsHeaderLine()) \
+	CLEAR_EOL_FROM_COPIED_DATA(cstate->line_buf.data, \
+								cstate->line_buf.len, \
+								cstate->line_buf.len) \
+
+/*
+ * INCREMENTPROCESSED - Increment the lines processed.
+ */
+#define INCREMENTPROCESSED(processed)  \
+processed++;
+
+/*
+ * GETPROCESSED - Get the lines processed.
+ */
+#define GETPROCESSED(processed) \
+return processed;
+
 static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
 
@@ -392,6 +476,8 @@ static bool CopyGetInt32(CopyState cstate, int32 *val);
 static void CopySendInt16(CopyState cstate, int16 val);
 static bool CopyGetInt16(CopyState cstate, int16 *val);
 
+static void PopulateGlobalsForCopyFrom(CopyState cstate, TupleDesc	tup_desc,
+							   List *attnamelist);
 
 /*
  * Send copy start/stop messages for frontend copies.  These have changed
@@ -793,6 +879,7 @@ CopyLoadRawBuf(CopyState cstate)
 {
 	int			nbytes;
 	int			inbytes;
+	int         minread = 1;
 
 	if (cstate->raw_buf_index < cstate->raw_buf_len)
 	{
@@ -804,8 +891,11 @@ CopyLoadRawBuf(CopyState cstate)
 	else
 		nbytes = 0;				/* no data need be saved */
 
+	if (cstate->copy_dest == COPY_NEW_FE)
+		minread = RAW_BUF_SIZE - nbytes;
+
 	inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
-						  1, RAW_BUF_SIZE - nbytes);
+						  minread, RAW_BUF_SIZE - nbytes);
 	nbytes += inbytes;
 	cstate->raw_buf[nbytes] = '\0';
 	cstate->raw_buf_index = 0;
@@ -1463,7 +1553,6 @@ BeginCopy(ParseState *pstate,
 {
 	CopyState	cstate;
 	TupleDesc	tupDesc;
-	int			num_phys_attrs;
 	MemoryContext oldcontext;
 
 	/* Allocate workspace and zero all fields */
@@ -1629,6 +1718,24 @@ BeginCopy(ParseState *pstate,
 		tupDesc = cstate->queryDesc->tupDesc;
 	}
 
+	PopulateGlobalsForCopyFrom(cstate, tupDesc, attnamelist);
+	cstate->copy_dest = COPY_FILE;	/* default */
+
+	MemoryContextSwitchTo(oldcontext);
+
+	return cstate;
+}
+
+/*
+ * PopulateGlobalsForCopyFrom - Populates the common variables required for copy
+ * from operation. This is a helper function for BeginCopy function.
+ */
+static void
+PopulateGlobalsForCopyFrom(CopyState cstate, TupleDesc	tupDesc,
+							List *attnamelist)
+{
+	int			num_phys_attrs;
+
 	/* Generate or convert list of attributes to process */
 	cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
 
@@ -1748,12 +1855,6 @@ BeginCopy(ParseState *pstate,
 		 pg_database_encoding_max_length() > 1);
 	/* See Multibyte encoding comment above */
 	cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
-
-	cstate->copy_dest = COPY_FILE;	/* default */
-
-	MemoryContextSwitchTo(oldcontext);
-
-	return cstate;
 }
 
 /*
@@ -2646,32 +2747,11 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 }
 
 /*
- * Copy FROM file to relation.
+ * Check if the relation specified in copy from is valid.
  */
-uint64
-CopyFrom(CopyState cstate)
+static void
+CheckTargetRelValidity(CopyState cstate)
 {
-	ResultRelInfo *resultRelInfo;
-	ResultRelInfo *target_resultRelInfo;
-	ResultRelInfo *prevResultRelInfo = NULL;
-	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
-	ModifyTableState *mtstate;
-	ExprContext *econtext;
-	TupleTableSlot *singleslot = NULL;
-	MemoryContext oldcontext = CurrentMemoryContext;
-
-	PartitionTupleRouting *proute = NULL;
-	ErrorContextCallback errcallback;
-	CommandId	mycid = GetCurrentCommandId(true);
-	int			ti_options = 0; /* start with default options for insert */
-	BulkInsertState bistate = NULL;
-	CopyInsertMethod insertMethod;
-	CopyMultiInsertInfo multiInsertInfo = {0};	/* pacify compiler */
-	uint64		processed = 0;
-	bool		has_before_insert_row_trig;
-	bool		has_instead_insert_row_trig;
-	bool		leafpart_use_multi_insert = false;
-
 	Assert(cstate->rel);
 
 	/*
@@ -2708,27 +2788,6 @@ CopyFrom(CopyState cstate)
 							RelationGetRelationName(cstate->rel))));
 	}
 
-	/*
-	 * If the target file is new-in-transaction, we assume that checking FSM
-	 * for free space is a waste of time.  This could possibly be wrong, but
-	 * it's unlikely.
-	 */
-	if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
-		(cstate->rel->rd_createSubid != InvalidSubTransactionId ||
-		 cstate->rel->rd_firstRelfilenodeSubid != InvalidSubTransactionId))
-		ti_options |= TABLE_INSERT_SKIP_FSM;
-
-	/*
-	 * Optimize if new relfilenode was created in this subxact or one of its
-	 * committed children and we won't see those rows later as part of an
-	 * earlier scan or command. The subxact test ensures that if this subxact
-	 * aborts then the frozen rows won't be visible after xact cleanup.  Note
-	 * that the stronger test of exactly which subtransaction created it is
-	 * crucial for correctness of this optimization. The test for an earlier
-	 * scan or command tolerates false negatives. FREEZE causes other sessions
-	 * to see rows they would not see under MVCC, and a false negative merely
-	 * spreads that anomaly to the current session.
-	 */
 	if (cstate->freeze)
 	{
 		/*
@@ -2766,9 +2825,61 @@ CopyFrom(CopyState cstate)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 					 errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
+	}
+}
+
+/*
+ * Copy FROM file to relation.
+ */
+uint64
+CopyFrom(CopyState cstate)
+{
+	ResultRelInfo *resultRelInfo;
+	ResultRelInfo *target_resultRelInfo;
+	ResultRelInfo *prevResultRelInfo = NULL;
+	EState	   *estate = CreateExecutorState(); /* for ExecConstraints() */
+	ModifyTableState *mtstate;
+	ExprContext *econtext;
+	TupleTableSlot *singleslot = NULL;
+	MemoryContext oldcontext = CurrentMemoryContext;
+
+	PartitionTupleRouting *proute = NULL;
+	ErrorContextCallback errcallback;
+	CommandId	mycid = GetCurrentCommandId(true);
+	int			ti_options = 0; /* start with default options for insert */
+	BulkInsertState bistate = NULL;
+	CopyInsertMethod insertMethod;
+	CopyMultiInsertInfo multiInsertInfo = {0};	/* pacify compiler */
+	uint64		processed = 0;
+	bool		has_before_insert_row_trig;
+	bool		has_instead_insert_row_trig;
+	bool		leafpart_use_multi_insert = false;
+
+	CheckTargetRelValidity(cstate);
 
+	/*
+	 * If the target file is new-in-transaction, we assume that checking FSM
+	 * for free space is a waste of time.  This could possibly be wrong, but
+	 * it's unlikely.
+	 */
+	if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
+		(cstate->rel->rd_createSubid != InvalidSubTransactionId ||
+		 cstate->rel->rd_firstRelfilenodeSubid != InvalidSubTransactionId))
+		ti_options |= TABLE_INSERT_SKIP_FSM;
+
+	/*
+	 * Optimize if new relfilenode was created in this subxact or one of its
+	 * committed children and we won't see those rows later as part of an
+	 * earlier scan or command. The subxact test ensures that if this subxact
+	 * aborts then the frozen rows won't be visible after xact cleanup.  Note
+	 * that the stronger test of exactly which subtransaction created it is
+	 * crucial for correctness of this optimization. The test for an earlier
+	 * scan or command tolerates false negatives. FREEZE causes other sessions
+	 * to see rows they would not see under MVCC, and a false negative merely
+	 * spreads that anomaly to the current session.
+	 */
+	if (cstate->freeze)
 		ti_options |= TABLE_INSERT_FROZEN;
-	}
 
 	/*
 	 * We need a ResultRelInfo so we can use the regular executor's
@@ -3261,7 +3372,7 @@ CopyFrom(CopyState cstate)
 			 * or FDW; this is the same definition used by nodeModifyTable.c
 			 * for counting tuples inserted by an INSERT command.
 			 */
-			processed++;
+			INCREMENTPROCESSED(processed)
 		}
 	}
 
@@ -3316,30 +3427,15 @@ CopyFrom(CopyState cstate)
 
 	FreeExecutorState(estate);
 
-	return processed;
+	GETPROCESSED(processed)
 }
 
 /*
- * Setup to read tuples from a file for COPY FROM.
- *
- * 'rel': Used as a template for the tuples
- * 'filename': Name of server-local file to read
- * 'attnamelist': List of char *, columns to include. NIL selects all cols.
- * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
- *
- * Returns a CopyState, to be passed to NextCopyFrom and related functions.
+ * PopulateCatalogInformation - populate the catalog information.
  */
-CopyState
-BeginCopyFrom(ParseState *pstate,
-			  Relation rel,
-			  const char *filename,
-			  bool is_program,
-			  copy_data_source_cb data_source_cb,
-			  List *attnamelist,
-			  List *options)
+static void
+PopulateCatalogInformation(CopyState cstate)
 {
-	CopyState	cstate;
-	bool		pipe = (filename == NULL);
 	TupleDesc	tupDesc;
 	AttrNumber	num_phys_attrs,
 				num_defaults;
@@ -3349,31 +3445,8 @@ BeginCopyFrom(ParseState *pstate,
 	Oid			in_func_oid;
 	int		   *defmap;
 	ExprState **defexprs;
-	MemoryContext oldcontext;
 	bool		volatile_defexprs;
 
-	cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
-	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
-
-	/* Initialize state variables */
-	cstate->reached_eof = false;
-	cstate->eol_type = EOL_UNKNOWN;
-	cstate->cur_relname = RelationGetRelationName(cstate->rel);
-	cstate->cur_lineno = 0;
-	cstate->cur_attname = NULL;
-	cstate->cur_attval = NULL;
-
-	/* Set up variables to avoid per-attribute overhead. */
-	initStringInfo(&cstate->attribute_buf);
-	initStringInfo(&cstate->line_buf);
-	cstate->line_buf_converted = false;
-	cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
-	cstate->raw_buf_index = cstate->raw_buf_len = 0;
-
-	/* Assign range table, we'll need it in CopyFrom. */
-	if (pstate)
-		cstate->range_table = pstate->p_rtable;
-
 	tupDesc = RelationGetDescr(cstate->rel);
 	num_phys_attrs = tupDesc->natts;
 	num_defaults = 0;
@@ -3451,6 +3524,54 @@ BeginCopyFrom(ParseState *pstate,
 	cstate->defexprs = defexprs;
 	cstate->volatile_defexprs = volatile_defexprs;
 	cstate->num_defaults = num_defaults;
+}
+
+/*
+ * Setup to read tuples from a file for COPY FROM.
+ *
+ * 'rel': Used as a template for the tuples
+ * 'filename': Name of server-local file to read
+ * 'attnamelist': List of char *, columns to include. NIL selects all cols.
+ * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
+ *
+ * Returns a CopyState, to be passed to NextCopyFrom and related functions.
+ */
+CopyState
+BeginCopyFrom(ParseState *pstate,
+			  Relation rel,
+			  const char *filename,
+			  bool is_program,
+			  copy_data_source_cb data_source_cb,
+			  List *attnamelist,
+			  List *options)
+{
+	CopyState	cstate;
+	bool		pipe = (filename == NULL);
+	MemoryContext oldcontext;
+
+	cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
+	oldcontext = MemoryContextSwitchTo(cstate->copycontext);
+
+	/* Initialize state variables */
+	cstate->reached_eof = false;
+	cstate->eol_type = EOL_UNKNOWN;
+	cstate->cur_relname = RelationGetRelationName(cstate->rel);
+	cstate->cur_lineno = 0;
+	cstate->cur_attname = NULL;
+	cstate->cur_attval = NULL;
+
+	/* Set up variables to avoid per-attribute overhead. */
+	initStringInfo(&cstate->attribute_buf);
+	initStringInfo(&cstate->line_buf);
+	cstate->line_buf_converted = false;
+	cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
+	cstate->raw_buf_index = cstate->raw_buf_len = 0;
+
+	/* Assign range table, we'll need it in CopyFrom. */
+	if (pstate)
+		cstate->range_table = pstate->p_rtable;
+
+	PopulateCatalogInformation(cstate);
 	cstate->is_program = is_program;
 
 	if (data_source_cb)
@@ -3860,60 +3981,8 @@ CopyReadLine(CopyState cstate)
 			} while (CopyLoadRawBuf(cstate));
 		}
 	}
-	else
-	{
-		/*
-		 * If we didn't hit EOF, then we must have transferred the EOL marker
-		 * to line_buf along with the data.  Get rid of it.
-		 */
-		switch (cstate->eol_type)
-		{
-			case EOL_NL:
-				Assert(cstate->line_buf.len >= 1);
-				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
-				cstate->line_buf.len--;
-				cstate->line_buf.data[cstate->line_buf.len] = '\0';
-				break;
-			case EOL_CR:
-				Assert(cstate->line_buf.len >= 1);
-				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
-				cstate->line_buf.len--;
-				cstate->line_buf.data[cstate->line_buf.len] = '\0';
-				break;
-			case EOL_CRNL:
-				Assert(cstate->line_buf.len >= 2);
-				Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
-				Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
-				cstate->line_buf.len -= 2;
-				cstate->line_buf.data[cstate->line_buf.len] = '\0';
-				break;
-			case EOL_UNKNOWN:
-				/* shouldn't get here */
-				Assert(false);
-				break;
-		}
-	}
-
-	/* Done reading the line.  Convert it to server encoding. */
-	if (cstate->need_transcoding)
-	{
-		char	   *cvt;
-
-		cvt = pg_any_to_server(cstate->line_buf.data,
-							   cstate->line_buf.len,
-							   cstate->file_encoding);
-		if (cvt != cstate->line_buf.data)
-		{
-			/* transfer converted data back to line_buf */
-			resetStringInfo(&cstate->line_buf);
-			appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
-			pfree(cvt);
-		}
-	}
-
-	/* Now it's safe to use the buffer in error messages */
-	cstate->line_buf_converted = true;
 
+	CONVERT_TO_SERVER_ENCODING(cstate)
 	return result;
 }
 
@@ -4277,6 +4346,7 @@ not_end_of_copy:
 	 * Transfer any still-uncopied data to line_buf.
 	 */
 	REFILL_LINEBUF;
+	CLEAR_EOL_LINE()
 
 	return result;
 }
-- 
1.8.3.1
