Hello, I finally had some time to revisit the patch and all the comments from https://www.postgresql.org/message-id/CAFp7QwqMGEi4OyyaLEK9DR0%2BE%2BoK3UtA4bEjDVCa4bNkwUY2PQ%40mail.gmail.com, and I have prepared a simple version of COPY command progress reporting.
To keep the patch as small as possible, I have introduced only a minimal set of columns. It could be extended later if needed. The columns are inspired by the CREATE INDEX progress report system view:

pid - integer - PID of the backend
datid - oid - OID of the related database
datname - name - name of the related database (this seems redundant, since the OID should be enough, but the CREATE INDEX view does the same)
relid - oid - OID of the table related to the COPY command; 0 when not known (for example, when copying the result of a query)
bytes_processed - bigint - number of bytes processed
bytes_total - bigint - file size in bytes for COPY FROM file (0 otherwise)
lines_processed - bigint - number of tuples processed

Example output of the progress view for a common use case (import from a CSV file):

first console:
yr=# COPY test FROM '/home/retro/test.csv' (FORMAT CSV);

second console:
yr=# SELECT * FROM pg_stat_progress_copy;
  pid   | datid | datname | relid | bytes_processed | bytes_total | lines_processed
--------+-------+---------+-------+-----------------+-------------+-----------------
 803148 | 16384 | yr      | 16394 |       998965248 |  1777777796 |        56730126
(1 row)

It is simple to get the progress as a percentage, for example:

yr=# SELECT (bytes_processed/bytes_total::decimal)*100 FROM pg_stat_progress_copy WHERE pid = 803148;
        ?column?
-------------------------
 50.04287948706048525800
 ^ ~50% of the file has already been processed

I did some dead simple benchmarking as well. The difference is small: the largest slowdown (COPY table FROM PROGRAM) is about 4 seconds, under 5%, and COPY table TO was even slightly faster with the patch. Each command processes 100 million tuples. Times are in seconds.
test                      with progress   master (32d6287)   difference
------------------------- --------------- ------------------ ------------
COPY table TO             46.102          47.499             -1.397
COPY query TO             52.168          49.822             2.346
COPY table TO PROGRAM     52.345          51.882             0.463
COPY query TO PROGRAM     54.141          52.763             1.378
COPY table FROM           88.970          85.161             3.809
COPY table FROM PROGRAM   94.393          90.346             4.047

A properly formatted table (since I'm not sure everyone here will see this one formatted well) and the benchmark source are available at https://github.com/simi/postgres/pull/6. I have also included an example output there.

I'll add this to the current commitfest as well.
From 847b227dac1ce2e9554a32ff95b8d618f8725843 Mon Sep 17 00:00:00 2001
From: Josef Šimánek <josef.sima...@gmail.com>
Date: Fri, 1 Jan 2021 01:14:47 +0100
Subject: [PATCH] Add pg_stat_progress_copy view with COPY progress report.

---
 doc/src/sgml/monitoring.sgml             | 101 +++++++++++++++++++++++
 src/backend/catalog/system_views.sql     |  11 +++
 src/backend/commands/copyfrom.c          |  16 +++-
 src/backend/commands/copyfromparse.c     |   4 +
 src/backend/commands/copyto.c            |  21 ++++-
 src/backend/utils/adt/pgstatfuncs.c      |   2 +
 src/include/commands/copyfrom_internal.h |   1 +
 src/include/commands/progress.h          |   5 ++
 src/include/pgstat.h                     |   3 +-
 src/test/regress/expected/rules.out      |   9 ++
 10 files changed, 168 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3d6c90130677..51d261defd94 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -399,6 +399,12 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
       </entry>
      </row>
 
+     <row>
+      <entry><structname>pg_stat_progress_copy</structname><indexterm><primary>pg_stat_progress_copy</primary></indexterm></entry>
+      <entry>One row for each backend running <command>COPY</command>, showing current progress.
+       See <xref linkend='copy-progress-reporting'/>.
+      </entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
@@ -5247,6 +5253,7 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
    which support progress reporting are <command>ANALYZE</command>,
    <command>CLUSTER</command>,
    <command>CREATE INDEX</command>, <command>VACUUM</command>,
+   <command>COPY</command>,
    and <xref linkend="protocol-replication-base-backup"/> (i.e., replication
    command that <xref linkend="app-pgbasebackup"/> issues to take
    a base backup).
@@ -6396,6 +6403,100 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
   </table>
  </sect2>
 
+
+ <sect2 id="copy-progress-reporting">
+  <title>COPY Progress Reporting</title>
+
+  <para>
+   Whenever <command>COPY</command> is running, the
+   <structname>pg_stat_progress_copy</structname> view will contain one row
+   for each backend that is currently running a <command>COPY</command> command.
+   The table below describes the information that will be reported and provides
+   information about how to interpret it.
+  </para>
+
+  <table id="pg-stat-progress-copy-view" xreflabel="pg_stat_progress_copy">
+   <title><structname>pg_stat_progress_copy</structname> View</title>
+   <tgroup cols="1">
+    <thead>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       Column Type
+      </para>
+      <para>
+       Description
+      </para></entry>
+     </row>
+    </thead>
+
+    <tbody>
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>pid</structfield> <type>integer</type>
+      </para>
+      <para>
+       Process ID of the backend.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>datid</structfield> <type>oid</type>
+      </para>
+      <para>
+       OID of the database to which this backend is connected.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>datname</structfield> <type>name</type>
+      </para>
+      <para>
+       Name of the database to which this backend is connected.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>relid</structfield> <type>oid</type>
+      </para>
+      <para>
+       OID of the table on which the <command>COPY</command> command is executed. It is set to 0 if copying from a <command>SELECT</command> query.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>bytes_processed</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of bytes already processed by the <command>COPY</command> command.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>bytes_total</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Size of the source file for the <command>COPY FROM</command> command, in bytes. It is set to 0 if not available.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>lines_processed</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of lines already processed by the <command>COPY</command> command.
+      </para></entry>
+     </row>
+    </tbody>
+   </tgroup>
+  </table>
+ </sect2>
+
 </sect1>
 
 <sect1 id="dynamic-trace">
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index b140c210bc79..d2fb40d1d076 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1117,6 +1117,17 @@ CREATE VIEW pg_stat_progress_basebackup AS
         S.param5 AS tablespaces_streamed
     FROM pg_stat_get_progress_info('BASEBACKUP') AS S;
 
+
+CREATE VIEW pg_stat_progress_copy AS
+    SELECT
+        S.pid AS pid, S.datid AS datid, D.datname AS datname,
+        S.relid AS relid,
+        S.param1 AS bytes_processed,
+        S.param2 AS bytes_total,
+        S.param3 AS lines_processed
+    FROM pg_stat_get_progress_info('COPY') AS S
+        LEFT JOIN pg_database D ON S.datid = D.oid;
+
 CREATE VIEW pg_user_mappings AS
     SELECT
         U.oid AS umid,
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 1b14e9a6eb03..b938a55f66ad 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -25,6 +25,7 @@
 #include "access/xlog.h"
 #include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
+#include "commands/progress.h"
 #include "commands/trigger.h"
 #include "executor/execPartition.h"
 #include "executor/executor.h"
@@ -35,6 +36,7 @@
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
+#include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
 #include "tcop/tcopprot.h"
@@ -1100,9 +1102,10 @@ CopyFrom(CopyFromState cstate)
 				/*
 				 * We count only tuples not suppressed by a BEFORE INSERT trigger
 				 * or FDW; this is the same definition used by nodeModifyTable.c
-				 * for counting tuples inserted by an INSERT command.
+				 * for counting tuples inserted by an INSERT command. Update
+				 * progress as well
 				 */
-				processed++;
+				pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
 			}
 		}
 
@@ -1415,6 +1418,11 @@ BeginCopyFrom(ParseState *pstate,
 		}
 	}
 
+
+	/* initialize progress */
+	pgstat_progress_start_command(PROGRESS_COMMAND_COPY, cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+	cstate->bytes_processed = 0;
+
 	/* We keep those variables in cstate. */
 	cstate->in_functions = in_functions;
 	cstate->typioparams = typioparams;
@@ -1479,6 +1487,8 @@ BeginCopyFrom(ParseState *pstate,
 				ereport(ERROR,
 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 						 errmsg("\"%s\" is a directory", cstate->filename)));
+
+			pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size);
 		}
 	}
 
@@ -1522,6 +1532,8 @@ EndCopyFrom(CopyFromState cstate)
 							cstate->filename)));
 	}
 
+	pgstat_progress_end_command();
+
 	MemoryContextDelete(cstate->copycontext);
 	pfree(cstate);
 }
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 34ed3cfcd5b4..e655c6ed9502 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -20,11 +20,13 @@
 
 #include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
+#include "commands/progress.h"
 #include "executor/executor.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "pgstat.h"
 #include "port/pg_bswap.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -384,6 +386,8 @@ CopyLoadRawBuf(CopyFromState cstate)
 	cstate->raw_buf[nbytes] = '\0';
 	cstate->raw_buf_index = 0;
 	cstate->raw_buf_len = nbytes;
+	cstate->bytes_processed += nbytes;
+	pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
 	return (inbytes > 0);
 }
 
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index c7e5f0444631..a6eba642457c 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -24,6 +24,7 @@
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "commands/copy.h"
+#include "commands/progress.h"
 #include "executor/execdesc.h"
 #include "executor/executor.h"
 #include "executor/tuptable.h"
@@ -32,6 +33,7 @@
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
+#include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
 #include "tcop/tcopprot.h"
@@ -95,6 +97,7 @@ typedef struct CopyToStateData
 
 	FmgrInfo   *out_functions;	/* lookup info for output functions */
 	MemoryContext rowcontext;	/* per-row evaluation context */
+	uint64		bytes_processed;/* total # of bytes processed, used for progress reporting */
 
 } CopyToStateData;
 
@@ -288,6 +291,10 @@ CopySendEndOfRow(CopyToState cstate)
 			break;
 	}
 
+	/* Update the progress */
+	cstate->bytes_processed += fe_msgbuf->len;
+	pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
+
 	resetStringInfo(fe_msgbuf);
 }
 
@@ -363,6 +370,8 @@ EndCopy(CopyToState cstate)
 							cstate->filename)));
 	}
 
+	pgstat_progress_end_command();
+
 	MemoryContextDelete(cstate->copycontext);
 	pfree(cstate);
 }
@@ -760,6 +769,10 @@ BeginCopyTo(ParseState *pstate,
 		}
 	}
 
+	/* initialize progress */
+	pgstat_progress_start_command(PROGRESS_COMMAND_COPY, cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+	cstate->bytes_processed = 0;
+
 	MemoryContextSwitchTo(oldcontext);
 
 	return cstate;
@@ -938,7 +951,9 @@ CopyTo(CopyToState cstate)
 
 			/* Format and send the data */
 			CopyOneRowTo(cstate, slot);
-			processed++;
+
+			/* Increment amount of processed tuples and update the progress */
+			pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
 		}
 
 		ExecDropSingleTupleTableSlot(slot);
@@ -1303,7 +1318,9 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 
 	/* Send the data */
 	CopyOneRowTo(cstate, slot);
-	myState->processed++;
+
+	/* Increment amount of processed tuples and update the progress */
+	pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed);
 
 	return true;
 }
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 6afe1b6f56eb..c83f47390bf4 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -494,6 +494,8 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
 		cmdtype = PROGRESS_COMMAND_CREATE_INDEX;
 	else if (pg_strcasecmp(cmd, "BASEBACKUP") == 0)
 		cmdtype = PROGRESS_COMMAND_BASEBACKUP;
+	else if (pg_strcasecmp(cmd, "COPY") == 0)
+		cmdtype = PROGRESS_COMMAND_COPY;
 	else
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index c15ea803c329..ae76be295a9b 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -154,6 +154,7 @@ typedef struct CopyFromStateData
 	char	   *raw_buf;
 	int			raw_buf_index;	/* next byte to process */
 	int			raw_buf_len;	/* total # of bytes stored */
+	uint64		bytes_processed;/* total # of bytes processed, used for progress reporting */
 	/* Shorthand for number of unconsumed bytes available in raw_buf */
 #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
 } CopyFromStateData;
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index 36b073e67757..fa0f65eb8f58 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -133,4 +133,9 @@
 #define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE	4
 #define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL		5
 
+/* Commands of PROGRESS_COPY */
+#define PROGRESS_COPY_BYTES_PROCESSED 0
+#define PROGRESS_COPY_BYTES_TOTAL 1
+#define PROGRESS_COPY_LINES_PROCESSED 2
+
 #endif
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 5954068dec53..b2a8cafc9088 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -1077,7 +1077,8 @@ typedef enum ProgressCommandType
 	PROGRESS_COMMAND_ANALYZE,
 	PROGRESS_COMMAND_CLUSTER,
 	PROGRESS_COMMAND_CREATE_INDEX,
-	PROGRESS_COMMAND_BASEBACKUP
+	PROGRESS_COMMAND_BASEBACKUP,
+	PROGRESS_COMMAND_COPY
 } ProgressCommandType;
 
 #define PGSTAT_NUM_PROGRESS_PARAM	20
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6293ab57bcf6..a687e99d1e4f 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1937,6 +1937,15 @@ pg_stat_progress_cluster| SELECT s.pid,
     s.param8 AS index_rebuild_count
    FROM (pg_stat_get_progress_info('CLUSTER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
+pg_stat_progress_copy| SELECT s.pid,
+    s.datid,
+    d.datname,
+    s.relid,
+    s.param1 AS bytes_processed,
+    s.param2 AS bytes_total,
+    s.param3 AS lines_processed
+   FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
+     LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_progress_create_index| SELECT s.pid,
     s.datid,
     d.datname,