On Mon, Mar 18, 2019 at 7:33 PM Julien Rouhaud <rjuju...@gmail.com> wrote:
>
> On Mon, Mar 18, 2019 at 6:23 PM Yun Li <liyunjuany...@gmail.com> wrote:
> >
> > Let's take one step back. Since queryId is stored in core as Julien pointed 
> > out, can we just add that global to pg_stat_get_activity and ultimately 
> > expose it in the pg_stat_activity view? Then no matter whether PGSS is on or 
> > off, or however the customer extensions are updating that field, we expose 
> > that field in that view and enable users to leverage that id to join with 
> > pgss or their extension. Does this sound like a good idea?
>
> I'd greatly welcome queryid exposure in pg_stat_activity, and
> also in log_line_prefix.  I'm afraid that it's too late for pg12
> inclusion, but I'll be happy to provide a patch for that for pg13.

Here's a prototype patch for queryid exposure in pg_stat_activity and
log_line_prefix.
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index d383de2512..37570825be 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -6260,6 +6260,11 @@ local0.*    /var/log/postgresql
              session processes</entry>
              <entry>no</entry>
             </row>
+            <row>
+             <entry><literal>%Q</literal></entry>
+             <entry>queryid: identifier of session's current query, if any</entry>
+             <entry>yes</entry>
+            </row>
             <row>
              <entry><literal>%%</literal></entry>
              <entry>Literal <literal>%</literal></entry>
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index ac2721c8ad..726c9430d5 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -800,6 +800,19 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
      <entry><type>xid</type></entry>
      <entry>The current backend's <literal>xmin</literal> horizon.</entry>
     </row>
+    <row>
+     <entry><structfield>queryid</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Identifier of this backend's most recent query. If
+      <structfield>state</structfield> is <literal>active</literal> this field
+      shows the identifier of the currently executing query. In all other
+      states, it shows the identifier of the last query that was executed, unless
+      an error occurred, which will reset this field to 0.  By default, query
+      identifiers are not computed, so this field will always display 0, unless
+      an additional module that computes query identifiers, such as <xref
+      linkend="pgstatstatements"/>, is configured.
+     </entry>
+    </row>
     <row>
      <entry><structfield>query</structfield></entry>
      <entry><type>text</type></entry>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index d962648bc5..6b62c7db1c 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -708,6 +708,7 @@ CREATE VIEW pg_stat_activity AS
             S.state,
             S.backend_xid,
             s.backend_xmin,
+            S.queryid,
             S.query,
             S.backend_type
     FROM pg_stat_get_activity(NULL) AS S
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 63a34760ee..955722d3a4 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -140,6 +140,8 @@ static void EvalPlanQualStart(EPQState *epqstate, EState *parentestate,
 void
 ExecutorStart(QueryDesc *queryDesc, int eflags)
 {
+	pg_atomic_write_u64(&MyProc->queryId, queryDesc->plannedstmt->queryId);
+
 	if (ExecutorStart_hook)
 		(*ExecutorStart_hook) (queryDesc, eflags);
 	else
@@ -300,6 +302,8 @@ ExecutorRun(QueryDesc *queryDesc,
 			ScanDirection direction, uint64 count,
 			bool execute_once)
 {
+	pg_atomic_write_u64(&MyProc->queryId, queryDesc->plannedstmt->queryId);
+
 	if (ExecutorRun_hook)
 		(*ExecutorRun_hook) (queryDesc, direction, count, execute_once);
 	else
@@ -399,6 +403,8 @@ standard_ExecutorRun(QueryDesc *queryDesc,
 void
 ExecutorFinish(QueryDesc *queryDesc)
 {
+	pg_atomic_write_u64(&MyProc->queryId, queryDesc->plannedstmt->queryId);
+
 	if (ExecutorFinish_hook)
 		(*ExecutorFinish_hook) (queryDesc);
 	else
@@ -459,6 +465,8 @@ standard_ExecutorFinish(QueryDesc *queryDesc)
 void
 ExecutorEnd(QueryDesc *queryDesc)
 {
+	pg_atomic_write_u64(&MyProc->queryId, queryDesc->plannedstmt->queryId);
+
 	if (ExecutorEnd_hook)
 		(*ExecutorEnd_hook) (queryDesc);
 	else
@@ -538,6 +546,8 @@ ExecutorRewind(QueryDesc *queryDesc)
 	/* It's probably not sensible to rescan updating queries */
 	Assert(queryDesc->operation == CMD_SELECT);
 
+	pg_atomic_write_u64(&MyProc->queryId, queryDesc->plannedstmt->queryId);
+
 	/*
 	 * Switch into per-query memory context
 	 */
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index d898f4ca78..0729c2f1a3 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -24,6 +24,7 @@
 #include "executor/executor.h"
 #include "executor/spi_priv.h"
 #include "miscadmin.h"
+#include "storage/proc.h"
 #include "tcop/pquery.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
@@ -1879,6 +1880,7 @@ _SPI_prepare_plan(const char *src, SPIPlanPtr plan)
 	List	   *plancache_list;
 	ListCell   *list_item;
 	ErrorContextCallback spierrcontext;
+	uint64		old_queryId = pg_atomic_read_u64(&MyProc->queryId);
 
 	/*
 	 * Setup error traceback support for ereport()
@@ -1935,6 +1937,8 @@ _SPI_prepare_plan(const char *src, SPIPlanPtr plan)
 											   _SPI_current->queryEnv);
 		}
 
+		pg_atomic_write_u64(&MyProc->queryId, old_queryId);
+
 		/* Finish filling in the CachedPlanSource */
 		CompleteCachedPlan(plansource,
 						   stmt_list,
@@ -2046,6 +2050,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 	int			res = 0;
 	bool		pushed_active_snap = false;
 	ErrorContextCallback spierrcontext;
+	uint64		old_queryId = pg_atomic_read_u64(&MyProc->queryId);
 	CachedPlan *cplan = NULL;
 	ListCell   *lc1;
 
@@ -2135,6 +2140,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 												   _SPI_current->queryEnv);
 			}
 
+			pg_atomic_write_u64(&MyProc->queryId, old_queryId);
+
 			/* Finish filling in the CachedPlanSource */
 			CompleteCachedPlan(plansource,
 							   stmt_list,
@@ -2305,6 +2312,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				}
 			}
 
+			pg_atomic_write_u64(&MyProc->queryId, old_queryId);
+
 			/*
 			 * The last canSetTag query sets the status values returned to the
 			 * caller.  Be careful to free any tuptables not returned, to
@@ -2408,6 +2417,7 @@ static int
 _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
 {
 	int			operation = queryDesc->operation;
+	uint64		old_queryId = pg_atomic_read_u64(&MyProc->queryId);
 	int			eflags;
 	int			res;
 
@@ -2472,6 +2482,8 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
 	ExecutorEnd(queryDesc);
 	/* FreeQueryDesc is done by the caller */
 
+	pg_atomic_write_u64(&MyProc->queryId, old_queryId);
+
 #ifdef SPI_EXECUTOR_STATS
 	if (ShowExecutorStats)
 		ShowUsage("SPI EXECUTOR STATS");
@@ -2519,6 +2531,7 @@ _SPI_cursor_operation(Portal portal, FetchDirection direction, long count,
 					  DestReceiver *dest)
 {
 	uint64		nfetched;
+	uint64		old_queryId = pg_atomic_read_u64(&MyProc->queryId);
 
 	/* Check that the portal is valid */
 	if (!PortalIsValid(portal))
@@ -2553,6 +2566,8 @@ _SPI_cursor_operation(Portal portal, FetchDirection direction, long count,
 	if (dest->mydest == DestSPI && _SPI_checktuples())
 		elog(ERROR, "consistency check on SPI tuple count failed");
 
+	pg_atomic_write_u64(&MyProc->queryId, old_queryId);
+
 	/* Put the result into place for access by caller */
 	SPI_processed = _SPI_current->processed;
 	SPI_tuptable = _SPI_current->tuptable;
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index d6cdd16607..9ee0d72746 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -44,6 +44,7 @@
 #include "parser/parse_target.h"
 #include "parser/parsetree.h"
 #include "rewrite/rewriteManip.h"
+#include "storage/proc.h"
 #include "utils/rel.h"
 
 
@@ -118,6 +119,8 @@ parse_analyze(RawStmt *parseTree, const char *sourceText,
 	if (post_parse_analyze_hook)
 		(*post_parse_analyze_hook) (pstate, query);
 
+	pg_atomic_write_u64(&MyProc->queryId, query->queryId);
+
 	free_parsestate(pstate);
 
 	return query;
@@ -151,6 +154,8 @@ parse_analyze_varparams(RawStmt *parseTree, const char *sourceText,
 	if (post_parse_analyze_hook)
 		(*post_parse_analyze_hook) (pstate, query);
 
+	pg_atomic_write_u64(&MyProc->queryId, query->queryId);
+
 	free_parsestate(pstate);
 
 	return query;
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 0da5b19719..4693597aa1 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -284,6 +284,7 @@ InitProcGlobal(void)
 		 */
 		pg_atomic_init_u32(&(procs[i].procArrayGroupNext), INVALID_PGPROCNO);
 		pg_atomic_init_u32(&(procs[i].clogGroupNext), INVALID_PGPROCNO);
+		pg_atomic_init_u64(&(procs[i].queryId), 0);
 	}
 
 	/*
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index f9ce3d8f22..73b92d243f 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -744,6 +744,8 @@ pg_analyze_and_rewrite_params(RawStmt *parsetree,
 	if (post_parse_analyze_hook)
 		(*post_parse_analyze_hook) (pstate, query);
 
+	pg_atomic_write_u64(&MyProc->queryId, query->queryId);
+
 	free_parsestate(pstate);
 
 	if (log_parser_stats)
@@ -4027,6 +4029,12 @@ PostgresMain(int argc, char *argv[],
 		 */
 		debug_query_string = NULL;
 
+		/*
+		 * Also reset the queryId, as any new error encountered before a
+		 * specific query is executed isn't linked to the last saved value
+		 */
+		pg_atomic_write_u64(&MyProc->queryId, 0);
+
 		/*
 		 * Abort the current transaction in order to recover.
 		 */
@@ -4106,6 +4114,12 @@ PostgresMain(int argc, char *argv[],
 		 */
 		doing_extended_query_message = false;
 
+		/*
+		 * Also reset the queryId, so any error encountered before a specific
+		 * query is executed won't display the last saved value
+		 */
+		pg_atomic_write_u64(&MyProc->queryId, 0);
+
 		/*
 		 * Release storage left over from prior query cycle, and create a new
 		 * query input buffer in the cleared MessageContext.
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index da1d685c08..b8ba5819d2 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -541,7 +541,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_activity(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_ACTIVITY_COLS	26
+#define PG_STAT_GET_ACTIVITY_COLS	27
 	int			num_backends = pgstat_fetch_stat_numbackends();
 	int			curr_backend;
 	int			pid = PG_ARGISNULL(0) ? -1 : PG_GETARG_INT32(0);
@@ -855,6 +855,7 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 				values[18] = BoolGetDatum(false);	/* ssl */
 				nulls[19] = nulls[20] = nulls[21] = nulls[22] = nulls[23] = nulls[24] = nulls[25] = true;
 			}
+			values[26] = DatumGetUInt64(pg_atomic_read_u64(&proc->queryId));
 		}
 		else
 		{
@@ -879,6 +880,7 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 			nulls[23] = true;
 			nulls[24] = true;
 			nulls[25] = true;
+			nulls[26] = true;
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 8b4720ef3a..8e611bd239 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -2594,6 +2594,20 @@ log_line_prefix(StringInfo buf, ErrorData *edata)
 				else
 					appendStringInfoString(buf, unpack_sql_state(edata->sqlerrcode));
 				break;
+			case 'Q':
+				if (MyProc != NULL)
+				{
+					if (padding != 0)
+						appendStringInfo(buf, "%*ld", padding,
+								pg_atomic_read_u64(&MyProc->queryId));
+					else
+						appendStringInfo(buf, "%ld",
+								pg_atomic_read_u64(&MyProc->queryId));
+				}
+				else if (padding != 0)
+					appendStringInfoSpaces(buf,
+										   padding > 0 ? padding : -padding);
+				break;
 			default:
 				/* format error - ignore it */
 				break;
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index cccb5f145a..1c3efdff9c 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -515,6 +515,7 @@
 					#   %t = timestamp without milliseconds
 					#   %m = timestamp with milliseconds
 					#   %n = timestamp with milliseconds (as a Unix epoch)
+					#   %Q = query ID (0 if none or not computed)
 					#   %i = command tag
 					#   %e = SQL state
 					#   %c = session ID
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 84120de362..d796e4905d 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5089,9 +5089,10 @@
   proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'int4',
-  proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn}',
+  proallargtypes =>
+  '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,int8}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,queryid}',
   prosrc => 'pg_stat_get_activity' },
 { oid => '3318',
   descr => 'statistics: information about progress of backends running maintenance command',
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 1cee7db89d..8e3a6ae9ca 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -173,6 +173,7 @@ struct PGPROC
 	 */
 	TransactionId procArrayGroupMemberXid;
 
+	pg_atomic_uint64	queryId;	/* current queryid if any */
 	uint32		wait_event_info;	/* proc's wait information */
 
 	/* Support for group transaction status update. */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index f104dc4a62..b10c827507 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1737,9 +1737,10 @@ pg_stat_activity| SELECT s.datid,
     s.state,
     s.backend_xid,
     s.backend_xmin,
+    s.queryid,
     s.query,
     s.backend_type
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, queryid)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_all_indexes| SELECT c.oid AS relid,
@@ -1872,7 +1873,7 @@ pg_stat_replication| SELECT s.pid,
     w.sync_priority,
     w.sync_state,
     w.reply_time
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, queryid)
      JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_ssl| SELECT s.pid,
@@ -1884,7 +1885,7 @@ pg_stat_ssl| SELECT s.pid,
     s.ssl_client_dn AS client_dn,
     s.ssl_client_serial AS client_serial,
     s.ssl_issuer_dn AS issuer_dn
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn);
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, queryid);
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
     st.pid,

Reply via email to