Hello,

finally I had some time to revisit the patch and all the comments from
https://www.postgresql.org/message-id/CAFp7QwqMGEi4OyyaLEK9DR0%2BE%2BoK3UtA4bEjDVCa4bNkwUY2PQ%40mail.gmail.com
and I have prepared a simple version of COPY command progress reporting.

To keep the patch as small as possible, I have introduced only a minimal
set of columns. It can be extended later if needed.

The columns are modeled on the CREATE INDEX progress reporting view
(pg_stat_progress_create_index).

pid - integer - PID of the backend
datid - oid - OID of the related database
datname - name - name of the related database (this seems redundant,
since the OID should be enough, but CREATE INDEX does the same)
relid - oid - OID of the table related to the COPY command; 0 when not
known (for example when copying from a query)
bytes_processed - bigint - number of bytes processed
bytes_total - bigint - size of the input file in bytes for COPY FROM a
file (0 otherwise)
lines_processed - bigint - number of tuples processed

Example progress output for a common use case (importing a CSV file):

First console:
yr=# COPY test FROM '/home/retro/test.csv' (FORMAT CSV);

Second console:
yr=# SELECT * FROM pg_stat_progress_copy;
  pid   | datid | datname | relid | bytes_processed | bytes_total | lines_processed
--------+-------+---------+-------+-----------------+-------------+-----------------
 803148 | 16384 | yr      | 16394 |       998965248 |  1777777796 |        56730126
(1 row)

Progress as a percentage can easily be computed, for example:

yr=# SELECT (bytes_processed/bytes_total::decimal)*100 FROM pg_stat_progress_copy WHERE pid = 803148;
        ?column?
-------------------------
 50.04287948706048525800

^ ~50% of the file has been processed so far
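One caveat with the query above: it divides by bytes_total, which the patch only fills in (from the file's size) when COPY FROM reads a regular file, so for COPY TO, COPY FROM STDIN or COPY FROM PROGRAM the value stays 0 and the query fails with a division-by-zero error. In SQL, writing the divisor as NULLIF(bytes_total, 0) yields NULL instead. The same guard is sketched below in C for a client that has already fetched both columns; copy_progress_percent() is a hypothetical helper, and returning -1.0 for "unknown" is an arbitrary choice.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Percentage of the input consumed by a COPY FROM, computed from the
 * bytes_processed and bytes_total columns of pg_stat_progress_copy.
 * Returns -1.0 when bytes_total is 0 (size unknown) instead of
 * dividing by zero.
 */
static double
copy_progress_percent(int64_t bytes_processed, int64_t bytes_total)
{
    assert(bytes_processed >= 0);
    if (bytes_total <= 0)
        return -1.0;
    return 100.0 * (double) bytes_processed / (double) bytes_total;
}
```

A negative result can then be shown as "unknown" rather than as a percentage.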

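I also did some very simple benchmarking. Each command processes 100
million tuples, and times are in seconds. The overhead is small: with
progress reporting the runs were at most about 4.7% slower than master
(COPY query TO), and COPY table TO was actually slightly faster.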

           test             with progress   master (32d6287)   difference
 ------------------------- --------------- ------------------ ------------
  COPY table TO                    46.102             47.499       -1.397
  COPY query TO                    52.168             49.822        2.346
  COPY table TO PROGRAM            52.345             51.882        0.463
  COPY query TO PROGRAM            54.141             52.763        1.378
  COPY table FROM                  88.970             85.161        3.809
  COPY table FROM PROGRAM          94.393             90.346        4.047
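The difference column is the with-progress time minus the master time; dividing it by the master time gives the relative overhead. The C sketch below recomputes both from the numbers in the table and picks out the worst case. The BenchRow type and the bench_* helpers are made up for this example.

```c
#include <assert.h>
#include <stddef.h>

/* One line of the benchmark table; times in seconds. */
typedef struct BenchRow
{
    const char *test;
    double      with_progress;
    double      master;          /* master (32d6287) */
} BenchRow;

static const BenchRow bench[] = {
    {"COPY table TO", 46.102, 47.499},
    {"COPY query TO", 52.168, 49.822},
    {"COPY table TO PROGRAM", 52.345, 51.882},
    {"COPY query TO PROGRAM", 54.141, 52.763},
    {"COPY table FROM", 88.970, 85.161},
    {"COPY table FROM PROGRAM", 94.393, 90.346},
};

#define N_BENCH (sizeof(bench) / sizeof(bench[0]))

/* Seconds added by progress reporting (negative: patched run was faster). */
static double
bench_diff(const BenchRow *r)
{
    return r->with_progress - r->master;
}

/* The same difference as a percentage of the master time. */
static double
bench_overhead_pct(const BenchRow *r)
{
    assert(r->master > 0.0);
    return 100.0 * bench_diff(r) / r->master;
}

/* Index of the row with the largest relative overhead. */
static size_t
bench_worst(void)
{
    size_t worst = 0;

    for (size_t i = 1; i < N_BENCH; i++)
        if (bench_overhead_pct(&bench[i]) > bench_overhead_pct(&bench[worst]))
            worst = i;
    return worst;
}
```

With these numbers the relative overhead ranges from about -2.9% (COPY table TO) to about +4.7% (COPY query TO); the COPY FROM cases sit at roughly 4.5%.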

A properly formatted version of the table (in case it does not render
well in your mail client) and the benchmark source are available at
https://github.com/simi/postgres/pull/6. I have also included example
output there.

I'll add this to the current commitfest as well.
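For readers skimming the patch: it follows the usual progress-reporting lifecycle. BeginCopyFrom()/BeginCopyTo() call pgstat_progress_start_command(PROGRESS_COMMAND_COPY, ...), the data paths call pgstat_progress_update_param() with a running byte count (per raw buffer load in CopyLoadRawBuf(), per row sent in CopySendEndOfRow()) and with the tuple counter after each row, and EndCopyFrom()/EndCopy() call pgstat_progress_end_command(). The single-process C model below imitates that flow for COPY FROM. The mock_* functions and MockStatus are hypothetical stand-ins, not the backend API; zeroing the slots at start is the mock's assumption about the real function, and counting each newline as one inserted row is a simplification.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define NUM_PARAMS           20  /* mirrors PGSTAT_NUM_PROGRESS_PARAM */
#define COPY_BYTES_PROCESSED 0   /* mirrors PROGRESS_COPY_BYTES_PROCESSED */
#define COPY_BYTES_TOTAL     1   /* mirrors PROGRESS_COPY_BYTES_TOTAL */
#define COPY_LINES_PROCESSED 2   /* mirrors PROGRESS_COPY_LINES_PROCESSED */

typedef enum { CMD_INVALID, CMD_COPY } MockCommand;

/* Hypothetical stand-in for one backend's progress status entry. */
typedef struct MockStatus
{
    MockCommand command;
    uint32_t    target_relid;
    int64_t     params[NUM_PARAMS];
} MockStatus;

/* Begin reporting: record the command and target, zero all slots. */
static void
mock_progress_start(MockStatus *st, MockCommand cmd, uint32_t relid)
{
    st->command = cmd;
    st->target_relid = relid;
    memset(st->params, 0, sizeof(st->params));
}

/* Overwrite one slot with the latest absolute value. */
static void
mock_progress_update(MockStatus *st, int index, int64_t value)
{
    assert(index >= 0 && index < NUM_PARAMS);
    st->params[index] = value;
}

/* Stop reporting for this backend. */
static void
mock_progress_end(MockStatus *st)
{
    st->command = CMD_INVALID;
    st->target_relid = 0;
}

/*
 * Simulate COPY FROM: load the input in chunks of chunk_size bytes,
 * adding each chunk to bytes_processed, and bump lines_processed once
 * per '\n'.  As in the patch, ending is left to the caller.
 */
static void
mock_copy_from(MockStatus *st, uint32_t relid,
               const char *data, size_t len, size_t chunk_size)
{
    uint64_t bytes_processed = 0;
    uint64_t processed = 0;

    assert(chunk_size > 0);
    mock_progress_start(st, CMD_COPY, relid);
    mock_progress_update(st, COPY_BYTES_TOTAL, (int64_t) len);

    for (size_t off = 0; off < len; off += chunk_size)
    {
        size_t n = (len - off < chunk_size) ? len - off : chunk_size;

        bytes_processed += n;
        mock_progress_update(st, COPY_BYTES_PROCESSED, (int64_t) bytes_processed);
        for (size_t i = off; i < off + n; i++)
            if (data[i] == '\n')
                mock_progress_update(st, COPY_LINES_PROCESSED, (int64_t) ++processed);
    }
}
```

Because every update stores an absolute running total rather than a delta, a concurrent reader of the view always sees a self-consistent value for each column.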
From 847b227dac1ce2e9554a32ff95b8d618f8725843 Mon Sep 17 00:00:00 2001
From: Josef Šimánek <josef.sima...@gmail.com>
Date: Fri, 1 Jan 2021 01:14:47 +0100
Subject: [PATCH] Add pg_stat_progress_copy view with COPY progress report.

---
 doc/src/sgml/monitoring.sgml             | 101 +++++++++++++++++++++++
 src/backend/catalog/system_views.sql     |  11 +++
 src/backend/commands/copyfrom.c          |  16 +++-
 src/backend/commands/copyfromparse.c     |   4 +
 src/backend/commands/copyto.c            |  21 ++++-
 src/backend/utils/adt/pgstatfuncs.c      |   2 +
 src/include/commands/copyfrom_internal.h |   1 +
 src/include/commands/progress.h          |   5 ++
 src/include/pgstat.h                     |   3 +-
 src/test/regress/expected/rules.out      |   9 ++
 10 files changed, 168 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3d6c90130677..51d261defd94 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -399,6 +399,12 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       </entry>
      </row>
 
+     <row>
+      <entry><structname>pg_stat_progress_copy</structname><indexterm><primary>pg_stat_progress_copy</primary></indexterm></entry>
+      <entry>One row for each backend running <command>COPY</command>, showing current progress.
+       See <xref linkend='copy-progress-reporting'/>.
+      </entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
@@ -5247,6 +5253,7 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
    which support progress reporting are <command>ANALYZE</command>,
    <command>CLUSTER</command>,
    <command>CREATE INDEX</command>, <command>VACUUM</command>,
+   <command>COPY</command>,
    and <xref linkend="protocol-replication-base-backup"/> (i.e., replication
    command that <xref linkend="app-pgbasebackup"/> issues to take
    a base backup).
@@ -6396,6 +6403,100 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
   </table>
 
  </sect2>
+
+ <sect2 id="copy-progress-reporting">
+  <title>COPY Progress Reporting</title>
+
+  <para>
+   Whenever <command>COPY</command> is running, the
+   <structname>pg_stat_progress_copy</structname> view will contain one row
+   for each backend that is currently running a <command>COPY</command> command.
+   The table below describes the information that will be reported and provides
+   information about how to interpret it.
+  </para>
+
+  <table id="pg-stat-progress-copy-view" xreflabel="pg_stat_progress_copy">
+   <title><structname>pg_stat_progress_copy</structname> View</title>
+   <tgroup cols="1">
+    <thead>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       Column Type
+      </para>
+      <para>
+       Description
+      </para></entry>
+     </row>
+    </thead>
+
+    <tbody>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>pid</structfield> <type>integer</type>
+      </para>
+      <para>
+       Process ID of backend.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>datid</structfield> <type>oid</type>
+      </para>
+      <para>
+       OID of the database to which this backend is connected.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>datname</structfield> <type>name</type>
+      </para>
+      <para>
+       Name of the database to which this backend is connected.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>relid</structfield> <type>oid</type>
+      </para>
+      <para>
+       OID of the table on which the <command>COPY</command> command is executed. It is set to 0 if copying from a <command>SELECT</command> query.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>bytes_processed</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of bytes already processed by the <command>COPY</command> command.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>bytes_total</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Size of the source file for the <command>COPY FROM</command> command, in bytes. It is set to 0 if not available.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>lines_processed</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of lines already processed by the <command>COPY</command> command.
+      </para></entry>
+     </row>
+    </tbody>
+   </tgroup>
+  </table>
+ </sect2>
+
  </sect1>
 
  <sect1 id="dynamic-trace">
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index b140c210bc79..d2fb40d1d076 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1117,6 +1117,17 @@ CREATE VIEW pg_stat_progress_basebackup AS
         S.param5 AS tablespaces_streamed
     FROM pg_stat_get_progress_info('BASEBACKUP') AS S;
 
+
+CREATE VIEW pg_stat_progress_copy AS
+    SELECT
+        S.pid AS pid, S.datid AS datid, D.datname AS datname,
+        S.relid AS relid,
+        S.param1 AS bytes_processed,
+        S.param2 AS bytes_total,
+        S.param3 AS lines_processed
+    FROM pg_stat_get_progress_info('COPY') AS S
+        LEFT JOIN pg_database D ON S.datid = D.oid;
+
 CREATE VIEW pg_user_mappings AS
     SELECT
         U.oid       AS umid,
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 1b14e9a6eb03..b938a55f66ad 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -25,6 +25,7 @@
 #include "access/xlog.h"
 #include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
+#include "commands/progress.h"
 #include "commands/trigger.h"
 #include "executor/execPartition.h"
 #include "executor/executor.h"
@@ -35,6 +36,7 @@
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
+#include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
 #include "tcop/tcopprot.h"
@@ -1100,9 +1102,10 @@ CopyFrom(CopyFromState cstate)
 			/*
 			 * We count only tuples not suppressed by a BEFORE INSERT trigger
 			 * or FDW; this is the same definition used by nodeModifyTable.c
-			 * for counting tuples inserted by an INSERT command.
+			 * for counting tuples inserted by an INSERT command. Update
+			 * progress as well.
 			 */
-			processed++;
+			pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
 		}
 	}
 
@@ -1415,6 +1418,11 @@ BeginCopyFrom(ParseState *pstate,
 		}
 	}
 
+
+	/* initialize progress */
+	pgstat_progress_start_command(PROGRESS_COMMAND_COPY, cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+	cstate->bytes_processed = 0;
+
 	/* We keep those variables in cstate. */
 	cstate->in_functions = in_functions;
 	cstate->typioparams = typioparams;
@@ -1479,6 +1487,8 @@ BeginCopyFrom(ParseState *pstate,
 				ereport(ERROR,
 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 						 errmsg("\"%s\" is a directory", cstate->filename)));
+
+			pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size);
 		}
 	}
 
@@ -1522,6 +1532,8 @@ EndCopyFrom(CopyFromState cstate)
 							cstate->filename)));
 	}
 
+	pgstat_progress_end_command();
+
 	MemoryContextDelete(cstate->copycontext);
 	pfree(cstate);
 }
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 34ed3cfcd5b4..e655c6ed9502 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -20,11 +20,13 @@
 
 #include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
+#include "commands/progress.h"
 #include "executor/executor.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "port/pg_bswap.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -384,6 +386,8 @@ CopyLoadRawBuf(CopyFromState cstate)
 	cstate->raw_buf[nbytes] = '\0';
 	cstate->raw_buf_index = 0;
 	cstate->raw_buf_len = nbytes;
+	cstate->bytes_processed += nbytes;
+	pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
 	return (inbytes > 0);
 }
 
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index c7e5f0444631..a6eba642457c 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -24,6 +24,7 @@
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "commands/copy.h"
+#include "commands/progress.h"
 #include "executor/execdesc.h"
 #include "executor/executor.h"
 #include "executor/tuptable.h"
@@ -32,6 +33,7 @@
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
+#include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
 #include "tcop/tcopprot.h"
@@ -95,6 +97,7 @@ typedef struct CopyToStateData
 
 	FmgrInfo   *out_functions;	/* lookup info for output functions */
 	MemoryContext rowcontext;	/* per-row evaluation context */
+	uint64      bytes_processed;/* total # of bytes processed, used for progress reporting */
 
 } CopyToStateData;
 
@@ -288,6 +291,10 @@ CopySendEndOfRow(CopyToState cstate)
 			break;
 	}
 
+	/* Update the progress */
+	cstate->bytes_processed += fe_msgbuf->len;
+	pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
+
 	resetStringInfo(fe_msgbuf);
 }
 
@@ -363,6 +370,8 @@ EndCopy(CopyToState cstate)
 							cstate->filename)));
 	}
 
+	pgstat_progress_end_command();
+
 	MemoryContextDelete(cstate->copycontext);
 	pfree(cstate);
 }
@@ -760,6 +769,10 @@ BeginCopyTo(ParseState *pstate,
 		}
 	}
 
+	/* initialize progress */
+	pgstat_progress_start_command(PROGRESS_COMMAND_COPY, cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+	cstate->bytes_processed = 0;
+
 	MemoryContextSwitchTo(oldcontext);
 
 	return cstate;
@@ -938,7 +951,9 @@ CopyTo(CopyToState cstate)
 
 			/* Format and send the data */
 			CopyOneRowTo(cstate, slot);
-			processed++;
+
+			/* Increment the number of processed tuples and update the progress */
+			pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
 		}
 
 		ExecDropSingleTupleTableSlot(slot);
@@ -1303,7 +1318,9 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 
 	/* Send the data */
 	CopyOneRowTo(cstate, slot);
-	myState->processed++;
+
+	/* Increment the number of processed tuples and update the progress */
+	pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed);
 
 	return true;
 }
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 6afe1b6f56eb..c83f47390bf4 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -494,6 +494,8 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
 		cmdtype = PROGRESS_COMMAND_CREATE_INDEX;
 	else if (pg_strcasecmp(cmd, "BASEBACKUP") == 0)
 		cmdtype = PROGRESS_COMMAND_BASEBACKUP;
+	else if (pg_strcasecmp(cmd, "COPY") == 0)
+		cmdtype = PROGRESS_COMMAND_COPY;
 	else
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index c15ea803c329..ae76be295a9b 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -154,6 +154,7 @@ typedef struct CopyFromStateData
 	char	   *raw_buf;
 	int			raw_buf_index;	/* next byte to process */
 	int			raw_buf_len;	/* total # of bytes stored */
+	uint64      bytes_processed;/* total # of bytes processed, used for progress reporting */
 	/* Shorthand for number of unconsumed bytes available in raw_buf */
 #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
 } CopyFromStateData;
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index 36b073e67757..fa0f65eb8f58 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -133,4 +133,9 @@
 #define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE		4
 #define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL			5
 
+/* Progress parameters for COPY */
+#define PROGRESS_COPY_BYTES_PROCESSED 0
+#define PROGRESS_COPY_BYTES_TOTAL 1
+#define PROGRESS_COPY_LINES_PROCESSED 2
+
 #endif
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 5954068dec53..b2a8cafc9088 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -1077,7 +1077,8 @@ typedef enum ProgressCommandType
 	PROGRESS_COMMAND_ANALYZE,
 	PROGRESS_COMMAND_CLUSTER,
 	PROGRESS_COMMAND_CREATE_INDEX,
-	PROGRESS_COMMAND_BASEBACKUP
+	PROGRESS_COMMAND_BASEBACKUP,
+	PROGRESS_COMMAND_COPY
 } ProgressCommandType;
 
 #define PGSTAT_NUM_PROGRESS_PARAM	20
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6293ab57bcf6..a687e99d1e4f 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1937,6 +1937,15 @@ pg_stat_progress_cluster| SELECT s.pid,
     s.param8 AS index_rebuild_count
    FROM (pg_stat_get_progress_info('CLUSTER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
+pg_stat_progress_copy| SELECT s.pid,
+    s.datid,
+    d.datname,
+    s.relid,
+    s.param1 AS bytes_processed,
+    s.param2 AS bytes_total,
+    s.param3 AS lines_processed
+   FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
+     LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_progress_create_index| SELECT s.pid,
     s.datid,
     d.datname,
