Itagaki Takahiro wrote:
> The point is *memory leak* in dblink when a query is canceled or
> become time-out. I think it is a bug, and my patch could fix it.

Please see if this works for you.

Joe

Index: dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.84
diff -c -r1.84 dblink.c
*** dblink.c	12 Sep 2009 23:20:52 -0000	1.84
--- dblink.c	4 Oct 2009 02:19:17 -0000
***************
*** 97,109 ****
  static char *generate_relation_name(Oid relid);
  static void dblink_connstr_check(const char *connstr);
  static void dblink_security_check(PGconn *conn, remoteConn *rconn);
! static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
  static char *get_connect_string(const char *servername);
  static char *escape_param_str(const char *from);
  
  /* Global */
  static remoteConn *pconn = NULL;
  static HTAB *remoteConnHash = NULL;
  
  /*
   *	Following is list that holds multiple remote connections.
--- 97,111 ----
  static char *generate_relation_name(Oid relid);
  static void dblink_connstr_check(const char *connstr);
  static void dblink_security_check(PGconn *conn, remoteConn *rconn);
! static void dblink_res_error(const char *conname, const char *dblink_context_msg, bool fail);
  static char *get_connect_string(const char *servername);
  static char *escape_param_str(const char *from);
+ static void dblink_error_callback(void *arg);
  
  /* Global */
  static remoteConn *pconn = NULL;
  static HTAB *remoteConnHash = NULL;
+ static PGresult *res = NULL;
  
  /*
   *	Following is list that holds multiple remote connections.
***************
*** 143,149 ****
--- 145,154 ----
  	do { \
  			msg = pstrdup(PQerrorMessage(conn)); \
  			if (res) \
+ 			{ \
  				PQclear(res); \
+ 				res = NULL; \
+ 			} \
  			elog(ERROR, "%s: %s", p2, msg); \
  	} while (0)
  
***************
*** 212,217 ****
--- 217,238 ----
  			} \
  	} while (0)
  
+ #define ERRORCONTEXTCALLBACK \
+ 	ErrorContextCallback	dberrcontext
+ 
+ #define PUSH_DBERRCONTEXT(_error_callback_) \
+ 	do { \
+ 		dberrcontext.callback = _error_callback_; \
+ 		dberrcontext.arg = (void *) NULL; \
+ 		dberrcontext.previous = error_context_stack; \
+ 		error_context_stack = &dberrcontext; \
+ 	} while (0)
+ 
+ #define POP_DBERRCONTEXT \
+ 	do { \
+ 		error_context_stack = dberrcontext.previous; \
+ 	} while (0)
+ 
  /*
   * Create a persistent connection to another database
   */
***************
*** 325,331 ****
  dblink_open(PG_FUNCTION_ARGS)
  {
  	char	   *msg;
- 	PGresult   *res = NULL;
  	PGconn	   *conn = NULL;
  	char	   *curname = NULL;
  	char	   *sql = NULL;
--- 346,351 ----
***************
*** 333,339 ****
--- 353,361 ----
  	StringInfoData buf;
  	remoteConn *rconn = NULL;
  	bool		fail = true;	/* default to backward compatible behavior */
+ 	ERRORCONTEXTCALLBACK;
  
+ 	PUSH_DBERRCONTEXT(dblink_error_callback);
  	DBLINK_INIT;
  	initStringInfo(&buf);
  
***************
*** 381,389 ****
  	if (PQtransactionStatus(conn) == PQTRANS_IDLE)
  	{
  		res = PQexec(conn, "BEGIN");
! 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
  			DBLINK_RES_INTERNALERROR("begin error");
  		PQclear(res);
  		rconn->newXactForCursor = TRUE;
  
  		/*
--- 403,412 ----
  	if (PQtransactionStatus(conn) == PQTRANS_IDLE)
  	{
  		res = PQexec(conn, "BEGIN");
! 		if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
  			DBLINK_RES_INTERNALERROR("begin error");
  		PQclear(res);
+ 		res = NULL;
  		rconn->newXactForCursor = TRUE;
  
  		/*
***************
*** 402,412 ****
  	res = PQexec(conn, buf.data);
  	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
  	{
! 		dblink_res_error(conname, res, "could not open cursor", fail);
  		PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
  	}
- 
  	PQclear(res);
  	PG_RETURN_TEXT_P(cstring_to_text("OK"));
  }
  
--- 425,437 ----
  	res = PQexec(conn, buf.data);
  	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
  	{
! 		dblink_res_error(conname, "could not open cursor", fail);
  		PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
  	}
  	PQclear(res);
+ 	res = NULL;
+ 
+ 	POP_DBERRCONTEXT;
  	PG_RETURN_TEXT_P(cstring_to_text("OK"));
  }
  
***************
*** 418,431 ****
  dblink_close(PG_FUNCTION_ARGS)
  {
  	PGconn	   *conn = NULL;
- 	PGresult   *res = NULL;
  	char	   *curname = NULL;
  	char	   *conname = NULL;
  	StringInfoData buf;
  	char	   *msg;
  	remoteConn *rconn = NULL;
  	bool		fail = true;	/* default to backward compatible behavior */
! 
  	DBLINK_INIT;
  	initStringInfo(&buf);
  
--- 443,457 ----
  dblink_close(PG_FUNCTION_ARGS)
  {
  	PGconn	   *conn = NULL;
  	char	   *curname = NULL;
  	char	   *conname = NULL;
  	StringInfoData buf;
  	char	   *msg;
  	remoteConn *rconn = NULL;
  	bool		fail = true;	/* default to backward compatible behavior */
! 	ERRORCONTEXTCALLBACK;
! 	
! 	PUSH_DBERRCONTEXT(dblink_error_callback);
  	DBLINK_INIT;
  	initStringInfo(&buf);
  
***************
*** 471,481 ****
  	res = PQexec(conn, buf.data);
  	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
  	{
! 		dblink_res_error(conname, res, "could not close cursor", fail);
  		PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
  	}
- 
  	PQclear(res);
  
  	/* if we started a transaction, decrement cursor count */
  	if (rconn->newXactForCursor)
--- 497,507 ----
  	res = PQexec(conn, buf.data);
  	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
  	{
! 		dblink_res_error(conname, "could not close cursor", fail);
  		PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
  	}
  	PQclear(res);
+ 	res = NULL;
  
  	/* if we started a transaction, decrement cursor count */
  	if (rconn->newXactForCursor)
***************
*** 488,499 ****
  			rconn->newXactForCursor = FALSE;
  
  			res = PQexec(conn, "COMMIT");
! 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
  				DBLINK_RES_INTERNALERROR("commit error");
  			PQclear(res);
  		}
  	}
  
  	PG_RETURN_TEXT_P(cstring_to_text("OK"));
  }
  
--- 514,527 ----
  			rconn->newXactForCursor = FALSE;
  
  			res = PQexec(conn, "COMMIT");
! 			if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
  				DBLINK_RES_INTERNALERROR("commit error");
  			PQclear(res);
+ 			res = NULL;
  		}
  	}
  
+ 	POP_DBERRCONTEXT;
  	PG_RETURN_TEXT_P(cstring_to_text("OK"));
  }
  
***************
*** 509,519 ****
  	int			call_cntr;
  	int			max_calls;
  	AttInMetadata *attinmeta;
- 	PGresult   *res = NULL;
  	MemoryContext oldcontext;
  	char	   *conname = NULL;
  	remoteConn *rconn = NULL;
! 
  	DBLINK_INIT;
  
  	/* stuff done only on the first call of the function */
--- 537,548 ----
  	int			call_cntr;
  	int			max_calls;
  	AttInMetadata *attinmeta;
  	MemoryContext oldcontext;
  	char	   *conname = NULL;
  	remoteConn *rconn = NULL;
! 	ERRORCONTEXTCALLBACK;
! 	
! 	PUSH_DBERRCONTEXT(dblink_error_callback);
  	DBLINK_INIT;
  
  	/* stuff done only on the first call of the function */
***************
*** 585,597 ****
  			(PQresultStatus(res) != PGRES_COMMAND_OK &&
  			 PQresultStatus(res) != PGRES_TUPLES_OK))
  		{
! 			dblink_res_error(conname, res, "could not fetch from cursor", fail);
  			SRF_RETURN_DONE(funcctx);
  		}
  		else if (PQresultStatus(res) == PGRES_COMMAND_OK)
  		{
  			/* cursor does not exist - closed already or bad name */
  			PQclear(res);
  			ereport(ERROR,
  					(errcode(ERRCODE_INVALID_CURSOR_NAME),
  					 errmsg("cursor \"%s\" does not exist", curname)));
--- 614,627 ----
  			(PQresultStatus(res) != PGRES_COMMAND_OK &&
  			 PQresultStatus(res) != PGRES_TUPLES_OK))
  		{
! 			dblink_res_error(conname, "could not fetch from cursor", fail);
  			SRF_RETURN_DONE(funcctx);
  		}
  		else if (PQresultStatus(res) == PGRES_COMMAND_OK)
  		{
  			/* cursor does not exist - closed already or bad name */
  			PQclear(res);
+ 			res = NULL;
  			ereport(ERROR,
  					(errcode(ERRCODE_INVALID_CURSOR_NAME),
  					 errmsg("cursor \"%s\" does not exist", curname)));
***************
*** 635,640 ****
--- 665,671 ----
  		if (funcctx->max_calls < 1)
  		{
  			PQclear(res);
+ 			res = NULL;
  			SRF_RETURN_DONE(funcctx);
  		}
  
***************
*** 690,701 ****
--- 721,736 ----
  		/* make the tuple into a datum */
  		result = HeapTupleGetDatum(tuple);
  
+ 		POP_DBERRCONTEXT;
  		SRF_RETURN_NEXT(funcctx, result);
  	}
  	else
  	{
  		/* do when there is no more left */
  		PQclear(res);
+ 		res = NULL;
+ 
+ 		POP_DBERRCONTEXT;
  		SRF_RETURN_DONE(funcctx);
  	}
  }
***************
*** 755,766 ****
  	int			max_calls;
  	AttInMetadata *attinmeta;
  	char	   *msg;
- 	PGresult   *res = NULL;
  	bool		is_sql_cmd = false;
  	char	   *sql_cmd_status = NULL;
  	MemoryContext oldcontext;
  	bool		freeconn = false;
! 
  	DBLINK_INIT;
  
  	/* stuff done only on the first call of the function */
--- 790,802 ----
  	int			max_calls;
  	AttInMetadata *attinmeta;
  	char	   *msg;
  	bool		is_sql_cmd = false;
  	char	   *sql_cmd_status = NULL;
  	MemoryContext oldcontext;
  	bool		freeconn = false;
! 	ERRORCONTEXTCALLBACK;
! 	
! 	PUSH_DBERRCONTEXT(dblink_error_callback);
  	DBLINK_INIT;
  
  	/* stuff done only on the first call of the function */
***************
*** 857,863 ****
  		{
  			if (freeconn)
  				PQfinish(conn);
! 			dblink_res_error(conname, res, "could not execute query", fail);
  			MemoryContextSwitchTo(oldcontext);
  			SRF_RETURN_DONE(funcctx);
  		}
--- 893,899 ----
  		{
  			if (freeconn)
  				PQfinish(conn);
! 			dblink_res_error(conname, "could not execute query", fail);
  			MemoryContextSwitchTo(oldcontext);
  			SRF_RETURN_DONE(funcctx);
  		}
***************
*** 926,932 ****
--- 962,971 ----
  		if (funcctx->max_calls < 1)
  		{
  			if (res)
+ 			{
  				PQclear(res);
+ 				res = NULL;
+ 			}
  			MemoryContextSwitchTo(oldcontext);
  			SRF_RETURN_DONE(funcctx);
  		}
***************
*** 984,995 ****
--- 1023,1038 ----
  		/* make the tuple into a datum */
  		result = HeapTupleGetDatum(tuple);
  
+ 		POP_DBERRCONTEXT;
  		SRF_RETURN_NEXT(funcctx, result);
  	}
  	else
  	{
  		/* do when there is no more left */
  		PQclear(res);
+ 		res = NULL;
+ 
+ 		POP_DBERRCONTEXT;
  		SRF_RETURN_DONE(funcctx);
  	}
  }
***************
*** 1119,1125 ****
  dblink_exec(PG_FUNCTION_ARGS)
  {
  	char	   *msg;
- 	PGresult   *res = NULL;
  	text	   *sql_cmd_status = NULL;
  	TupleDesc	tupdesc = NULL;
  	PGconn	   *conn = NULL;
--- 1162,1167 ----
***************
*** 1129,1135 ****
  	remoteConn *rconn = NULL;
  	bool		freeconn = false;
  	bool		fail = true;	/* default to backward compatible behavior */
! 
  	DBLINK_INIT;
  
  	if (PG_NARGS() == 3)
--- 1171,1179 ----
  	remoteConn *rconn = NULL;
  	bool		freeconn = false;
  	bool		fail = true;	/* default to backward compatible behavior */
! 	ERRORCONTEXTCALLBACK;
! 	
! 	PUSH_DBERRCONTEXT(dblink_error_callback);
  	DBLINK_INIT;
  
  	if (PG_NARGS() == 3)
***************
*** 1172,1178 ****
  		(PQresultStatus(res) != PGRES_COMMAND_OK &&
  		 PQresultStatus(res) != PGRES_TUPLES_OK))
  	{
! 		dblink_res_error(conname, res, "could not execute command", fail);
  
  		/* need a tuple descriptor representing one TEXT column */
  		tupdesc = CreateTemplateTupleDesc(1, false);
--- 1216,1222 ----
  		(PQresultStatus(res) != PGRES_COMMAND_OK &&
  		 PQresultStatus(res) != PGRES_TUPLES_OK))
  	{
! 		dblink_res_error(conname, "could not execute command", fail);
  
  		/* need a tuple descriptor representing one TEXT column */
  		tupdesc = CreateTemplateTupleDesc(1, false);
***************
*** 1198,1207 ****
--- 1242,1253 ----
  		 */
  		sql_cmd_status = cstring_to_text(PQcmdStatus(res));
  		PQclear(res);
+ 		res = NULL;
  	}
  	else
  	{
  		PQclear(res);
+ 		res = NULL;
  		ereport(ERROR,
  				(errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
  				 errmsg("statement returning results not allowed")));
***************
*** 1211,1216 ****
--- 1257,1263 ----
  	if (freeconn)
  		PQfinish(conn);
  
+ 	POP_DBERRCONTEXT;
  	PG_RETURN_TEXT_P(sql_cmd_status);
  }
  
***************
*** 2418,2424 ****
  }
  
  static void
! dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail)
  {
  	int			level;
  	char	   *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
--- 2465,2471 ----
  }
  
  static void
! dblink_res_error(const char *conname, const char *dblink_context_msg, bool fail)
  {
  	int			level;
  	char	   *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
***************
*** 2453,2459 ****
--- 2500,2509 ----
  	xpstrdup(message_context, pg_diag_context);
  
  	if (res)
+ 	{
  		PQclear(res);
+ 		res = NULL;
+ 	}
  
  	if (conname)
  		dblink_context_conname = conname;
***************
*** 2550,2552 ****
--- 2600,2615 ----
  
  	return buf->data;
  }
+ 
+ /*
+  * error context callback to let us free resources
+  */
+ static void
+ dblink_error_callback(void *arg)
+ {
+ 	if (res)
+ 	{
+ 		PQclear(res);
+ 		res = NULL;
+ 	}
+ }

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to