*** dblink.c	Wed Sep 16 13:01:21 2009
--- dblink-fix.c	Wed Sep 16 12:59:23 2009
***************
*** 35,45 ****
--- 35,47 ----
  #include <limits.h>
  
  #include "libpq-fe.h"
+ #include "libpq-int.h"
  #include "fmgr.h"
  #include "funcapi.h"
  #include "access/genam.h"
  #include "access/heapam.h"
  #include "access/tupdesc.h"
+ #include "access/xact.h"
  #include "catalog/indexing.h"
  #include "catalog/namespace.h"
  #include "catalog/pg_index.h"
***************
*** 54,59 ****
--- 56,62 ----
  #include "nodes/nodes.h"
  #include "nodes/pg_list.h"
  #include "parser/parse_type.h"
+ #include "storage/ipc.h"
  #include "utils/acl.h"
  #include "utils/array.h"
  #include "utils/builtins.h"
***************
*** 76,81 ****
--- 79,87 ----
  	bool		newXactForCursor;		/* Opened a transaction for a cursor */
  } remoteConn;
  
+ extern void _PG_init(void);
+ extern void _PG_fini(void);
+ 
  /*
   * Internal declarations
   */
***************
*** 100,109 ****
--- 106,133 ----
  static void dblink_res_error(const char *conname, PGresult *res, const char *dblink_context_msg, bool fail);
  static char *get_connect_string(const char *servername);
  static char *escape_param_str(const char *from);
+ static PGresult *execute_query(PGconn *conn, const char *sql, bool freeconn);
+ static PGresult *wait_for_result(PGconn *conn, bool freeconn);
+ static void register_result(PGresult *res);
+ static void unregister_result(PGresult *res);
+ static void AtEOXact_dblink(XactEvent event, void *arg);
  
  /* Global */
  static remoteConn *pconn = NULL;
  static HTAB *remoteConnHash = NULL;
+ static List *managedResults = NIL;
+ 
+ void
+ _PG_init(void)
+ {
+ 	RegisterXactCallback(AtEOXact_dblink, 0);
+ }
+ 
+ void
+ _PG_fini(void)
+ {
+ 	UnregisterXactCallback(AtEOXact_dblink, 0);
+ }
  
  /*
   *	Following is list that holds multiple remote connections.
***************
*** 580,586 ****
  		 * PGresult will be long-lived even though we are still in a
  		 * short-lived memory context.
  		 */
! 		res = PQexec(conn, buf.data);
  		if (!res ||
  			(PQresultStatus(res) != PGRES_COMMAND_OK &&
  			 PQresultStatus(res) != PGRES_TUPLES_OK))
--- 604,610 ----
  		 * PGresult will be long-lived even though we are still in a
  		 * short-lived memory context.
  		 */
! 		res = execute_query(conn, buf.data, false);
  		if (!res ||
  			(PQresultStatus(res) != PGRES_COMMAND_OK &&
  			 PQresultStatus(res) != PGRES_TUPLES_OK))
***************
*** 637,642 ****
--- 661,667 ----
  			PQclear(res);
  			SRF_RETURN_DONE(funcctx);
  		}
+ 		register_result(res);
  
  		/*
  		 * switch to memory context appropriate for multiple function calls,
***************
*** 695,701 ****
  	else
  	{
  		/* do when there is no more left */
! 		PQclear(res);
  		SRF_RETURN_DONE(funcctx);
  	}
  }
--- 720,726 ----
  	else
  	{
  		/* do when there is no more left */
! 		unregister_result(res);
  		SRF_RETURN_DONE(funcctx);
  	}
  }
***************
*** 839,848 ****
  
  		/* synchronous query, or async result retrieval */
  		if (!is_async)
! 			res = PQexec(conn, sql);
  		else
  		{
! 			res = PQgetResult(conn);
  			/* NULL means we're all done with the async results */
  			if (!res)
  			{
--- 864,873 ----
  
  		/* synchronous query, or async result retrieval */
  		if (!is_async)
! 			res = execute_query(conn, sql, freeconn);
  		else
  		{
! 			res = wait_for_result(conn, freeconn);
  			/* NULL means we're all done with the async results */
  			if (!res)
  			{
***************
*** 930,935 ****
--- 955,961 ----
  			MemoryContextSwitchTo(oldcontext);
  			SRF_RETURN_DONE(funcctx);
  		}
+ 		register_result(res);
  
  		/* store needed metadata for subsequent calls */
  		attinmeta = TupleDescGetAttInMetadata(tupdesc);
***************
*** 989,995 ****
  	else
  	{
  		/* do when there is no more left */
! 		PQclear(res);
  		SRF_RETURN_DONE(funcctx);
  	}
  }
--- 1015,1021 ----
  	else
  	{
  		/* do when there is no more left */
! 		unregister_result(res);
  		SRF_RETURN_DONE(funcctx);
  	}
  }
***************
*** 1167,1173 ****
  	if (!conn)
  		DBLINK_CONN_NOT_AVAIL;
  
! 	res = PQexec(conn, sql);
  	if (!res ||
  		(PQresultStatus(res) != PGRES_COMMAND_OK &&
  		 PQresultStatus(res) != PGRES_TUPLES_OK))
--- 1193,1199 ----
  	if (!conn)
  		DBLINK_CONN_NOT_AVAIL;
  
! 	res = execute_query(conn, sql, freeconn);
  	if (!res ||
  		(PQresultStatus(res) != PGRES_COMMAND_OK &&
  		 PQresultStatus(res) != PGRES_TUPLES_OK))
***************
*** 2550,2552 ****
--- 2576,2748 ----
  
  	return buf->data;
  }
+ 
+ static PGresult *
+ execute_query(PGconn *conn, const char *sql, bool freeconn)
+ {
+ 	/* async query send */
+ 	if (PQsendQuery(conn, sql) != 1)
+ 		elog(NOTICE, "%s", PQerrorMessage(conn));
+ 
+ 	return wait_for_result(conn, freeconn);
+ }
+ 
+ typedef struct
+ {
+ 	PGconn	   *conn;
+ 	bool		freeconn;
+ } cancel_query_args;
+ 
+ static void
+ cancel_query(int code, Datum arg)
+ {
+ 	cancel_query_args *args = (cancel_query_args *) DatumGetPointer(arg);
+ 	PGconn	   *conn = args->conn;
+ 	PGresult   *res;
+ 
+ 	/* cancel current query */
+ 	if (PQisBusy(conn))
+ 	{
+ 		PGcancel   *cancel;
+ 		char		errbuf[256];
+ 
+ 		cancel = PQgetCancel(conn);
+ 		if (cancel != NULL)
+ 		{
+ 			PQcancel(cancel, errbuf, 256);
+ 			PQfreeCancel(cancel);
+ 		}
+ 	}
+ 
+ 	/* discard all results */
+ 	while ((res = PQgetResult(conn)) != NULL)
+ 		PQclear(res);
+ 
+ 	/* disconnect if a temporary connection */
+ 	if (args->freeconn)
+ 		PQfinish(conn);
+ }
+ 
+ /*
+  * copied from PQexecFinish in libpq.
+  */
+ static PGresult *
+ getLastResult(PGconn *conn)
+ {
+ 	PGresult   *result;
+ 	PGresult   *lastResult;
+ 
+ 	lastResult = NULL;
+ 	while ((result = PQgetResult(conn)) != NULL)
+ 	{
+ 		if (lastResult)
+ 		{
+ 			if (lastResult->resultStatus == PGRES_FATAL_ERROR &&
+ 				result->resultStatus == PGRES_FATAL_ERROR)
+ 			{
+ 				PQExpBufferData errorBuf;
+ 
+ 				initPQExpBuffer(&errorBuf);
+ 				if (lastResult->errMsg)
+ 				{
+ 					appendPQExpBufferStr(&errorBuf, lastResult->errMsg);
+ 					free(lastResult->errMsg);
+ 				}
+ 				appendPQExpBufferStr(&errorBuf, result->errMsg);
+ 				lastResult->errMsg = errorBuf.data;
+ 
+ 				PQclear(result);
+ 				result = lastResult;
+ 
+ 				/* Make sure PQerrorMessage agrees with concatenated result */
+ 				resetPQExpBuffer(&conn->errorMessage);
+ 				appendPQExpBufferStr(&conn->errorMessage, result->errMsg);
+ 			}
+ 			else
+ 				PQclear(lastResult);
+ 		}
+ 		lastResult = result;
+ 		if (result->resultStatus == PGRES_COPY_IN ||
+ 			result->resultStatus == PGRES_COPY_OUT ||
+ 			conn->status == CONNECTION_BAD)
+ 			break;
+ 	}
+ 
+ 	return lastResult;
+ }
+ 
+ static PGresult *
+ wait_for_result(PGconn *conn, bool freeconn)
+ {
+ 	PGresult		   *res = NULL;
+ 	cancel_query_args	args;
+ 
+ 	args.conn = conn;
+ 	args.freeconn = freeconn;
+ 
+ 	PG_ENSURE_ERROR_CLEANUP(cancel_query, PointerGetDatum(&args));
+ 	{
+ 		for (;;)
+ 		{
+ 			fd_set			rset;
+ 			int				sock;
+ 			struct timeval	tv;
+ 
+ 			CHECK_FOR_INTERRUPTS();
+ 
+ 			PQconsumeInput(conn);
+ 			if (!PQisBusy(conn))
+ 			{
+ 				res = getLastResult(conn);
+ 				break;
+ 			}
+ 
+ 			sock = PQsocket(conn);
+ 			if (sock < 0)
+ 				break;
+ 			FD_ZERO(&rset);
+ 			FD_SET(sock, &rset);
+ 			tv.tv_sec = 1;
+ 			tv.tv_usec = 0;
+ 			if (select(sock + 1, &rset, NULL, NULL, &tv) < 0 && errno != EINTR)
+ 				break;
+ 		}
+ 	}
+ 	PG_END_ENSURE_ERROR_CLEANUP(cancel_query, PointerGetDatum(&args));
+ 
+ 	return res;
+ }
+ 
+ static void
+ register_result(PGresult *res)
+ {
+ 	MemoryContext oldcontext;
+ 	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ 	managedResults = lappend(managedResults, res);
+ 	MemoryContextSwitchTo(oldcontext);
+ }
+ 
+ /* unregister and free result */
+ static void
+ unregister_result(PGresult *res)
+ {
+ 	managedResults = list_delete_ptr(managedResults, res);
+ 	PQclear(res);
+ }
+ 
+ static void
+ AtEOXact_dblink(XactEvent event, void *arg)
+ {
+ 	if (managedResults != NIL)
+ 	{
+ 		ListCell *cell;
+ 
+ 		foreach(cell, managedResults)
+ 		{
+ 			PQclear((PGresult *) lfirst(cell));
+ 		}
+ 
+ 		list_free(managedResults);
+ 		managedResults = NIL;
+ 	}
+ }
