I'm attaching the whole patch since commitfest failed to ingest the
last incremental on CI.


On Tue, Jan 5, 2021 at 2:32, Josef Šimánek <josef.sima...@gmail.com> wrote:
>
> On Tue, Jan 5, 2021 at 0:46, Tomas Vondra
> <tomas.von...@enterprisedb.com> wrote:
> >
> > Hi,
> >
> > I did take a quick look today, and I have a couple minor comments:
>
> Hi! Thanks for your time.
>
> > 1) The catalog sgml docs seem to mention bytes_processed twice (one of
> > them should be bytes_total), and line_processed (should be "lines_").
>
> Fixed in attached patch.
>
> >
> > 2) I'm not quite sure about not including any info about the command.
> > For example pg_stat_progress_create_index includes info about the
> > command, although it's not very detailed. Not sure how useful it would
> > be to show just COPY FROM / COPY TO, without more details.
> >
> > It's probably possible to extract this from pg_stat_activity, but that
> > may be rather cumbersome (parsing arbitrary SQL and all that). Also,
> > what if the COPY is called from a function etc.?
>
> Any idea where to discuss this? My use case is really simple: I would
> like to be able to see the progress of a COPY command by pid. There is a
> lot of COPY info inside the CopyToStateData and CopyFromStateData structs.
> The only limitation I see is that progress reporting columns support only
> int64 values. I'm not sure whether it is easily possible to expose, for
> example, the filename this way.
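
To illustrate the int64-only limitation mentioned above, here is a toy
model of a progress slot. It is only a sketch: ToyProgressSlot,
toy_progress_start and toy_progress_update are hypothetical names invented
for this example, not the backend's actual data structures. The only
things taken from the patch are the fixed number of parameters (20) and
the idea of updating a parameter by index.

```c
#include <stdint.h>
#include <string.h>

/* Mirrors PGSTAT_NUM_PROGRESS_PARAM from the patch; the rest is hypothetical. */
#define TOY_NUM_PROGRESS_PARAM 20

/* Simplified stand-in for one backend's progress slot (an assumption). */
typedef struct ToyProgressSlot
{
	int			command;	/* which command is reporting progress */
	uint32_t	relid;		/* table OID, or 0 when there is none */
	int64_t		params[TOY_NUM_PROGRESS_PARAM];	/* integer counters only */
} ToyProgressSlot;

/* Reset the slot and record which command and relation it belongs to. */
static void
toy_progress_start(ToyProgressSlot *slot, int command, uint32_t relid)
{
	memset(slot, 0, sizeof(*slot));
	slot->command = command;
	slot->relid = relid;
}

/* Store one counter; returns 0 on success, -1 if the index is out of range. */
static int
toy_progress_update(ToyProgressSlot *slot, int index, int64_t value)
{
	if (index < 0 || index >= TOY_NUM_PROGRESS_PARAM)
		return -1;
	slot->params[index] = value;
	return 0;
}
```

Every slot in such a layout holds a single integer, which is why a
variable-length string such as a filename has no obvious place to go.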
>
> >
> > 3) I wonder if we should have something like lines_estimated. For COPY
> > TO it's pretty simple to calculate it as
> >
> >      bytes_total / (bytes_processed / lines_processed)
> >
> > but what about
> >
> >      COPY (query) TO file
> >
> > In that case we don't know the total amount of bytes / rows, so we can't
> > calculate any estimate. So maybe we could peek into the query plan? But
> > I agree this is something we can add later.
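
The estimate quoted above can be sketched on the client side as follows.
This is only an illustration: estimate_total_lines is a hypothetical
helper, not part of the patch, and returning -1 for "no estimate" is an
assumption of this sketch. It needs a nonzero bytes_total, so it cannot
help with the COPY (query) TO case.

```c
#include <stdint.h>

/*
 * Hypothetical helper: estimate the total number of lines as
 *     bytes_total / (bytes_processed / lines_processed)
 * Returns -1 when no estimate is possible, e.g. bytes_total is 0
 * (unknown) or nothing has been processed yet.
 */
static int64_t
estimate_total_lines(int64_t bytes_processed, int64_t bytes_total,
					 int64_t lines_processed)
{
	double		avg_line_bytes;

	if (bytes_total <= 0 || bytes_processed <= 0 || lines_processed <= 0)
		return -1;

	/* Average size of the lines seen so far. */
	avg_line_bytes = (double) bytes_processed / (double) lines_processed;

	return (int64_t) ((double) bytes_total / avg_line_bytes);
}
```

The result is only as good as the assumption that the remaining lines are
about as long as the ones already processed.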
>
> If I remember correctly, one of the original ideas was to be able to pass
> a custom bytes_total (or lines_total) via COPY options, to be stored in
> the copy progress. I can add this in a follow-up patch if it is still
> welcome.
>
> For estimates I would prefer to add an additional column so as not to mix
> the two together (or at least a boolean estimated = true/false, reusing
> the bytes_total column). If query estimates are welcome, I can take a
> look at how to reach the query plan and expose those numbers, when a
> query is used, in estimated_lines and potentially estimated_bytes
> columns. It would probably be a little tricky to calculate
> estimated_bytes for some column types.
>
> Also, currently only COPY FROM file supports bytes_total (it is 0 in
> all other scenarios). I think it should be possible for PostgreSQL to
> know the actual number of lines a query returns for some kinds of
> queries, but I have no idea where to look for this. If that's possible
> to get, one of the next steps would be to introduce an additional
> lines_total column.
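
Because bytes_total is 0 whenever the size is unknown, a consumer of the
view has to treat 0 as "not available" rather than as a real total. A
minimal sketch of that handling, assuming a hypothetical client-side
helper copy_percent_done (not part of the patch) that returns -1.0 when
no percentage can be computed:

```c
#include <stdint.h>

/*
 * Hypothetical helper: percentage of the input consumed so far.
 * Returns -1.0 when bytes_total is 0 (size not available).
 * The result is clamped to 100.0 in case bytes_processed exceeds
 * bytes_total, which this sketch assumes could happen if the source
 * file grows after its size was recorded.
 */
static double
copy_percent_done(int64_t bytes_processed, int64_t bytes_total)
{
	if (bytes_total <= 0)
		return -1.0;
	if (bytes_processed >= bytes_total)
		return 100.0;
	return 100.0 * (double) bytes_processed / (double) bytes_total;
}
```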
>
> >
> > 4) This comment is a bit confusing, as it mixes "total" and "processed".
> > I'd just say "number of bytes processed so far" instead.
> >
> Fixed in attached patch.
> >
> > Other than that, it seems fine. I'm sure we could add more features, but
> > it seems like a good start - I plan to get this committed once I get a
> > patch fixing the docs issues.
>
> The patch is attached; it should be applied on top of the previous
> patch. The overall patch (both patches merged together) can be
> found at
> https://patch-diff.githubusercontent.com/raw/simi/postgres/pull/6.patch.
>
> > regards
> >
> > --
> > Tomas Vondra
> > EnterpriseDB: http://www.enterprisedb.com
> > The Enterprise PostgreSQL Company
From 847b227dac1ce2e9554a32ff95b8d618f8725843 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Josef=20=C5=A0im=C3=A1nek?= <josef.sima...@gmail.com>
Date: Fri, 1 Jan 2021 01:14:47 +0100
Subject: [PATCH 1/2] Add pg_stat_progress_copy view with COPY progress report.

---
 doc/src/sgml/monitoring.sgml             | 101 +++++++++++++++++++++++
 src/backend/catalog/system_views.sql     |  11 +++
 src/backend/commands/copyfrom.c          |  16 +++-
 src/backend/commands/copyfromparse.c     |   4 +
 src/backend/commands/copyto.c            |  21 ++++-
 src/backend/utils/adt/pgstatfuncs.c      |   2 +
 src/include/commands/copyfrom_internal.h |   1 +
 src/include/commands/progress.h          |   5 ++
 src/include/pgstat.h                     |   3 +-
 src/test/regress/expected/rules.out      |   9 ++
 10 files changed, 168 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3d6c90130677..51d261defd94 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -399,6 +399,12 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       </entry>
      </row>
 
+     <row>
+      <entry><structname>pg_stat_progress_copy</structname><indexterm><primary>pg_stat_progress_copy</primary></indexterm></entry>
+      <entry>One row for each backend running <command>COPY</command>, showing current progress.
+       See <xref linkend='copy-progress-reporting'/>.
+      </entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
@@ -5247,6 +5253,7 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
    which support progress reporting are <command>ANALYZE</command>,
    <command>CLUSTER</command>,
    <command>CREATE INDEX</command>, <command>VACUUM</command>,
+   <command>COPY</command>,
    and <xref linkend="protocol-replication-base-backup"/> (i.e., replication
    command that <xref linkend="app-pgbasebackup"/> issues to take
    a base backup).
@@ -6396,6 +6403,100 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
   </table>
 
  </sect2>
+
+ <sect2 id="copy-progress-reporting">
+  <title>COPY Progress Reporting</title>
+
+  <para>
+   Whenever <command>COPY</command> is running, the
+   <structname>pg_stat_progress_copy</structname> view will contain one row
+   for each backend that is currently running a <command>COPY</command> command.
+   The table below describes the information that will be reported and provides
+   information about how to interpret it.
+  </para>
+
+  <table id="pg-stat-progress-copy-view" xreflabel="pg_stat_progress_copy">
+   <title><structname>pg_stat_progress_copy</structname> View</title>
+   <tgroup cols="1">
+    <thead>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       Column Type
+      </para>
+      <para>
+       Description
+      </para></entry>
+     </row>
+    </thead>
+
+    <tbody>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>pid</structfield> <type>integer</type>
+      </para>
+      <para>
+       Process ID of backend.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>datid</structfield> <type>oid</type>
+      </para>
+      <para>
+       OID of the database to which this backend is connected.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>datname</structfield> <type>name</type>
+      </para>
+      <para>
+       Name of the database to which this backend is connected.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>relid</structfield> <type>oid</type>
+      </para>
+      <para>
+       OID of the table on which the <command>COPY</command> command is executed. It is set to 0 if <structfield>SELECT query</structfield> is provided.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>bytes_processed</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of bytes already processed by <command>COPY</command> command.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>bytes_processed</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Size of source file for <command>COPY FROM</command> command in bytes. It is set to 0 if not available.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>line_processed</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of lines already processed by <command>COPY</command> command.
+      </para></entry>
+     </row>
+    </tbody>
+   </tgroup>
+  </table>
+ </sect2>
+
  </sect1>
 
  <sect1 id="dynamic-trace">
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index b140c210bc79..d2fb40d1d076 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1117,6 +1117,17 @@ CREATE VIEW pg_stat_progress_basebackup AS
         S.param5 AS tablespaces_streamed
     FROM pg_stat_get_progress_info('BASEBACKUP') AS S;
 
+
+CREATE VIEW pg_stat_progress_copy AS
+    SELECT
+        S.pid AS pid, S.datid AS datid, D.datname AS datname,
+        S.relid AS relid,
+        S.param1 AS bytes_processed,
+        S.param2 AS bytes_total,
+        S.param3 AS lines_processed
+    FROM pg_stat_get_progress_info('COPY') AS S
+        LEFT JOIN pg_database D ON S.datid = D.oid;
+
 CREATE VIEW pg_user_mappings AS
     SELECT
         U.oid       AS umid,
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 1b14e9a6eb03..b938a55f66ad 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -25,6 +25,7 @@
 #include "access/xlog.h"
 #include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
+#include "commands/progress.h"
 #include "commands/trigger.h"
 #include "executor/execPartition.h"
 #include "executor/executor.h"
@@ -35,6 +36,7 @@
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
+#include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
 #include "tcop/tcopprot.h"
@@ -1100,9 +1102,10 @@ CopyFrom(CopyFromState cstate)
 			/*
 			 * We count only tuples not suppressed by a BEFORE INSERT trigger
 			 * or FDW; this is the same definition used by nodeModifyTable.c
-			 * for counting tuples inserted by an INSERT command.
+			 * for counting tuples inserted by an INSERT command. Update
+			 * progress as well
 			 */
-			processed++;
+			pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
 		}
 	}
 
@@ -1415,6 +1418,11 @@ BeginCopyFrom(ParseState *pstate,
 		}
 	}
 
+
+	/* initialize progress */
+	pgstat_progress_start_command(PROGRESS_COMMAND_COPY, cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+	cstate->bytes_processed = 0;
+
 	/* We keep those variables in cstate. */
 	cstate->in_functions = in_functions;
 	cstate->typioparams = typioparams;
@@ -1479,6 +1487,8 @@ BeginCopyFrom(ParseState *pstate,
 				ereport(ERROR,
 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 						 errmsg("\"%s\" is a directory", cstate->filename)));
+
+			pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size);
 		}
 	}
 
@@ -1522,6 +1532,8 @@ EndCopyFrom(CopyFromState cstate)
 							cstate->filename)));
 	}
 
+	pgstat_progress_end_command();
+
 	MemoryContextDelete(cstate->copycontext);
 	pfree(cstate);
 }
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 34ed3cfcd5b4..e655c6ed9502 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -20,11 +20,13 @@
 
 #include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
+#include "commands/progress.h"
 #include "executor/executor.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "port/pg_bswap.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -384,6 +386,8 @@ CopyLoadRawBuf(CopyFromState cstate)
 	cstate->raw_buf[nbytes] = '\0';
 	cstate->raw_buf_index = 0;
 	cstate->raw_buf_len = nbytes;
+	cstate->bytes_processed += nbytes;
+	pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
 	return (inbytes > 0);
 }
 
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index c7e5f0444631..a6eba642457c 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -24,6 +24,7 @@
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "commands/copy.h"
+#include "commands/progress.h"
 #include "executor/execdesc.h"
 #include "executor/executor.h"
 #include "executor/tuptable.h"
@@ -32,6 +33,7 @@
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
+#include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
 #include "tcop/tcopprot.h"
@@ -95,6 +97,7 @@ typedef struct CopyToStateData
 
 	FmgrInfo   *out_functions;	/* lookup info for output functions */
 	MemoryContext rowcontext;	/* per-row evaluation context */
+	uint64      bytes_processed;/* total # of bytes processed, used for progress reporting */
 
 } CopyToStateData;
 
@@ -288,6 +291,10 @@ CopySendEndOfRow(CopyToState cstate)
 			break;
 	}
 
+	/* Update the progress */
+	cstate->bytes_processed += fe_msgbuf->len;
+	pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
+
 	resetStringInfo(fe_msgbuf);
 }
 
@@ -363,6 +370,8 @@ EndCopy(CopyToState cstate)
 							cstate->filename)));
 	}
 
+	pgstat_progress_end_command();
+
 	MemoryContextDelete(cstate->copycontext);
 	pfree(cstate);
 }
@@ -760,6 +769,10 @@ BeginCopyTo(ParseState *pstate,
 		}
 	}
 
+	/* initialize progress */
+	pgstat_progress_start_command(PROGRESS_COMMAND_COPY, cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+	cstate->bytes_processed = 0;
+
 	MemoryContextSwitchTo(oldcontext);
 
 	return cstate;
@@ -938,7 +951,9 @@ CopyTo(CopyToState cstate)
 
 			/* Format and send the data */
 			CopyOneRowTo(cstate, slot);
-			processed++;
+
+			/* Increment amount of processed tuples and update the progress */
+			pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
 		}
 
 		ExecDropSingleTupleTableSlot(slot);
@@ -1303,7 +1318,9 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 
 	/* Send the data */
 	CopyOneRowTo(cstate, slot);
-	myState->processed++;
+
+	/* Increment amount of processed tuples and update the progress */
+	pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed);
 
 	return true;
 }
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 6afe1b6f56eb..c83f47390bf4 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -494,6 +494,8 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
 		cmdtype = PROGRESS_COMMAND_CREATE_INDEX;
 	else if (pg_strcasecmp(cmd, "BASEBACKUP") == 0)
 		cmdtype = PROGRESS_COMMAND_BASEBACKUP;
+	else if (pg_strcasecmp(cmd, "COPY") == 0)
+		cmdtype = PROGRESS_COMMAND_COPY;
 	else
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index c15ea803c329..ae76be295a9b 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -154,6 +154,7 @@ typedef struct CopyFromStateData
 	char	   *raw_buf;
 	int			raw_buf_index;	/* next byte to process */
 	int			raw_buf_len;	/* total # of bytes stored */
+	uint64      bytes_processed;/* total # of bytes processed, used for progress reporting */
 	/* Shorthand for number of unconsumed bytes available in raw_buf */
 #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
 } CopyFromStateData;
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index 36b073e67757..fa0f65eb8f58 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -133,4 +133,9 @@
 #define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE		4
 #define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL			5
 
+/* Commands of PROGRESS_COPY */
+#define PROGRESS_COPY_BYTES_PROCESSED 0
+#define PROGRESS_COPY_BYTES_TOTAL 1
+#define PROGRESS_COPY_LINES_PROCESSED 2
+
 #endif
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 5954068dec53..b2a8cafc9088 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -1077,7 +1077,8 @@ typedef enum ProgressCommandType
 	PROGRESS_COMMAND_ANALYZE,
 	PROGRESS_COMMAND_CLUSTER,
 	PROGRESS_COMMAND_CREATE_INDEX,
-	PROGRESS_COMMAND_BASEBACKUP
+	PROGRESS_COMMAND_BASEBACKUP,
+	PROGRESS_COMMAND_COPY
 } ProgressCommandType;
 
 #define PGSTAT_NUM_PROGRESS_PARAM	20
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6293ab57bcf6..a687e99d1e4f 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1937,6 +1937,15 @@ pg_stat_progress_cluster| SELECT s.pid,
     s.param8 AS index_rebuild_count
    FROM (pg_stat_get_progress_info('CLUSTER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
+pg_stat_progress_copy| SELECT s.pid,
+    s.datid,
+    d.datname,
+    s.relid,
+    s.param1 AS bytes_processed,
+    s.param2 AS bytes_total,
+    s.param3 AS lines_processed
+   FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
+     LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_progress_create_index| SELECT s.pid,
     s.datid,
     d.datname,

From 6d2ee68b227c05ce4d1eb95a4c4a9c4f7dd6fbfa Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Josef=20=C5=A0im=C3=A1nek?= <josef.sima...@gmail.com>
Date: Tue, 5 Jan 2021 02:07:03 +0100
Subject: [PATCH 2/2] Fix docs and comment.

---
 doc/src/sgml/monitoring.sgml             | 4 ++--
 src/include/commands/copyfrom_internal.h | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 51d261defd94..875133303e19 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -6477,7 +6477,7 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>bytes_processed</structfield> <type>bigint</type>
+       <structfield>bytes_total</structfield> <type>bigint</type>
       </para>
       <para>
        Size of source file for <command>COPY FROM</command> command in bytes. It is set to 0 if not available.
@@ -6486,7 +6486,7 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>line_processed</structfield> <type>bigint</type>
+       <structfield>lines_processed</structfield> <type>bigint</type>
       </para>
       <para>
        Number of lines already processed by <command>COPY</command> command.
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index ae76be295a9b..80fac1e58a12 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -154,7 +154,7 @@ typedef struct CopyFromStateData
 	char	   *raw_buf;
 	int			raw_buf_index;	/* next byte to process */
 	int			raw_buf_len;	/* total # of bytes stored */
-	uint64      bytes_processed;/* total # of bytes processed, used for progress reporting */
+	uint64      bytes_processed;/* number of bytes processed so far */
 	/* Shorthand for number of unconsumed bytes available in raw_buf */
 #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
 } CopyFromStateData;
