Finally, this patch adds the ability to cache out-of-order messages for workers within the coordinator process. Unlike cached jobs (which are essentially messages as well), these messages don't start a job; instead, they may provide additional information or a data payload to a worker that is already processing a job.

In the case of Postgres-R, the very first change set received starts an applicator job, while every subsequent change set, as well as the final commit decision, is such an out-of-order (ooo) message and is forwarded to the same worker.

A parallel query feature might use similar ooo messages to pass data payloads back and forth. Simpler jobs such as vacuum, however, don't necessarily need this.
*** src/backend/postmaster/coordinator.c	c12c70fed2f6025831ef4ba9555af0debe063003
--- src/backend/postmaster/coordinator.c	7f25538532e2633e8fed8232b33588eb35f4f122
*************** typedef struct cached_job {
*** 121,126 ****
--- 121,132 ----
  	IMessage *cj_msg;
  } cached_job;
  
+ typedef struct cached_msg {
+ 	Dlelem cm_links;		/* element of codb->codb_ooo_msgs */
+ 	IMessage *cm_msg;		/* the cached message itself */
+ 	BackendId cm_backend_id;	/* target worker; InvalidBackendId matches any */
+ } cached_msg;
+ 
  CoordinatorShmemStruct *CoordinatorShmem;
  
  /*
*************** static void process_cached_jobs(co_datab
*** 156,161 ****
--- 162,172 ----
  						 BackendId backend_id);
  static void process_cached_jobs(co_database *codb);
  
+ static void process_ooo_msgs_for(co_database *codb, BackendId backend_id);
+ static void add_ooo_msg(IMessage *msg, co_database *codb,
+ 						BackendId backend_id);
+ 
+ 
  static void manage_workers(bool can_launch);
  
  static void do_start_worker(Oid dboid);
*************** init_co_database(co_database *codb)
*** 271,276 ****
--- 282,290 ----
  	codb->codb_num_cached_jobs = 0;
  	DLInitList(&codb->codb_cached_jobs);
  
+ 	codb->codb_num_ooo_msgs = 0;
+ 	DLInitList(&codb->codb_ooo_msgs);
+ 
  	codb->codb_num_connected_workers = 0;
  }
  
*************** cache_job(IMessage *msg, co_database *co
*** 291,296 ****
--- 305,328 ----
  	codb->codb_num_cached_jobs++;
  }
  
+ static void
+ add_ooo_msg(IMessage *msg, co_database *codb, BackendId backend_id)
+ {
+ 	cached_msg *cm;		/* list entry; freed when delivered or dropped */
+ 
+ #ifdef COORDINATOR_DEBUG
+ 	elog(DEBUG5, "Coordinator: storing out-of-order message of type %s for database %u",
+ 		 decode_imessage_type(msg->type), codb->codb_dboid);
+ #endif
+ 	/* append at the tail, so cached messages are scanned in arrival order */
+ 	cm = palloc(sizeof(cached_msg));
+ 	DLInitElem(&cm->cm_links, cm);
+ 	cm->cm_msg = msg;
+ 	cm->cm_backend_id = backend_id;
+ 	DLAddTail(&codb->codb_ooo_msgs, &cm->cm_links);
+ 	codb->codb_num_ooo_msgs++;
+ }
+ 
  /*
   * get_idle_worker
   *
*************** dispatch_job(IMessage *msg, co_database 
*** 361,375 ****
  void
  dispatch_job(IMessage *msg, co_database *codb)
  {
!     WorkerInfo worker;
  
! 	if (codb->codb_num_idle_workers > 0)
      {
!         worker = get_idle_worker(codb); 
! 		forward_job(msg, codb, worker->wi_backend_id);
      }
  	else
! 		cache_job(msg, codb);
  }
  
  /*
--- 393,436 ----
  void
  dispatch_job(IMessage *msg, co_database *codb)
  {
! 	bool can_deliver;
! 	BackendId target = InvalidBackendId;
  
! 	can_deliver = can_deliver_cached_job(codb, msg, &target);
! 	/* no specific target: the job can only go to an idle worker */
! 	if (can_deliver && target == InvalidBackendId)
! 		can_deliver = (codb->codb_num_idle_workers > 0);
! 
! 	if (can_deliver)
! 	{
! 		if (target == InvalidBackendId)
! 			target = get_idle_worker(codb)->wi_backend_id;
! 		forward_job(msg, codb, target);
! 	}
! 	else
! 		cache_job(msg, codb);	/* queued FIFO in codb_cached_jobs */
! }
! 
! void
! dispatch_ooo_msg(IMessage *msg, co_database *codb)
! {
! 	bool can_deliver;
! 	BackendId target = InvalidBackendId;
! 	/* Forward msg now if possible, otherwise cache it as an ooo message. */
! 	can_deliver = can_deliver_cached_job(codb, msg, &target);
! 	/* no specific target: fall back to an idle worker, if any */
! 	if (can_deliver && target == InvalidBackendId)
! 		can_deliver = (codb->codb_num_idle_workers > 0);
! 
! 	if (can_deliver)
      {
! 		if (target == InvalidBackendId)
! 			target = get_idle_worker(codb)->wi_backend_id;
! 		forward_job(msg, codb, target);
! 		process_ooo_msgs_for(codb, target);	/* cached msgs may now be deliverable */
      }
  	else
! 		add_ooo_msg(msg, codb, InvalidBackendId);	/* NOTE(review): unbound, so drop_ooo_msgs_for() never drops it; intended? */
  }
  
  /*
*************** process_cached_jobs(co_database *codb)
*** 409,414 ****
--- 470,482 ----
  
  			forward_job(job->cj_msg, codb, target);
  			pfree(job);
+ 
+ 			/*
+ 			 * Trigger subsequent ooo messages, as the delivery of a job
+ 			 * might change the delivery status of further out-of-order
+ 			 * messages 
+ 			 */
+ 			process_ooo_msgs_for(codb, target);
  			
  			job = (cached_job*) DLGetHead(&codb->codb_cached_jobs);
  		}
*************** process_cached_jobs(co_database *codb)
*** 417,422 ****
--- 485,569 ----
  	}
  }
  
+ static void
+ process_ooo_msgs_for(co_database *codb, BackendId backend_id)
+ {
+ 	BackendId target;
+ 	cached_msg *cm;
+ 	bool can_deliver;
+ 
+ #ifdef COORDINATOR_DEBUG
+ 	elog(DEBUG5, "Coordinator: out-of-order messages: %d",
+ 		 codb->codb_num_ooo_msgs);
+ #endif
+ 	/* Forward every cached message for backend_id that is now deliverable. */
+ 	cm = (cached_msg*) DLGetHead(&codb->codb_ooo_msgs);
+ 	while (cm != NULL)
+ 	{
+ 		target = InvalidBackendId;
+ 		can_deliver = false;
+ 		/* consider only unbound messages and those bound to backend_id */
+ 		if (cm->cm_backend_id == InvalidBackendId ||
+ 			cm->cm_backend_id == backend_id)
+ 		{
+ 			can_deliver = can_deliver_cached_job(codb, cm->cm_msg, &target);
+ 		}
+ 
+ 		if (can_deliver)
+ 		{
+ 			if (target == InvalidBackendId)
+ 			{
+ 				elog(FATAL, "process_ooo_msgs_for: no PGPROC, database %u, backend %d, msg type: %s",
+ 					 codb->codb_dboid, backend_id, decode_imessage_type(cm->cm_msg->type));
+ 			}
+ 
+ 			Assert(target != InvalidBackendId);
+ 
+ 			/* remove the message from the cache */
+ 			DLRemove(&cm->cm_links);
+ 			codb->codb_num_ooo_msgs--;
+ 
+ 			forward_job(cm->cm_msg, codb, target);
+ 			pfree(cm);
+ 
+ 			/* this delivery may unblock earlier entries; re-scan from the head */
+ 			cm = (cached_msg*) DLGetHead(&codb->codb_ooo_msgs);
+ 		}
+ 		else
+ 			cm = (cached_msg*) DLGetSucc(&cm->cm_links);
+ 	}
+ }
+ 
+ void
+ drop_ooo_msgs_for(co_database *codb, BackendId backend_id)
+ {
+ 	cached_msg *cm, *next;
+ 	/* Discard every cached ooo message bound to backend_id. */
+ 	Assert(backend_id != InvalidBackendId);
+ 
+ #ifdef COORDINATOR_DEBUG
+ 	elog(DEBUG5, "Coordinator: dropping ooo msgs for backend %d",
+ 		 backend_id);
+ #endif
+ 
+ 	cm = (cached_msg*) DLGetHead(&codb->codb_ooo_msgs);
+ 	while (cm != NULL)
+ 	{
+ 		next = (cached_msg*) DLGetSucc(&cm->cm_links);	/* before cm is freed */
+ 
+ 		if (cm->cm_backend_id == backend_id)
+ 		{
+ 			/* unlink and free the entry; NOTE(review): cm->cm_msg is not released here, confirm ownership */
+ 			DLRemove(&cm->cm_links);
+ 			codb->codb_num_ooo_msgs--;
+ 			pfree(cm);
+ 		}
+ 
+ 		/* continue scanning the list of ooo messages */
+ 		cm = next;
+ 	}
+ }
+ 
  /*
   * populate_co_databases
   *
============================================================
*** src/include/postmaster/coordinator.h	77a64f4b18c483a00779f8d5922066d8aa6fc156
--- src/include/postmaster/coordinator.h	c1dec58e4a6adf1a6870de32b573cc969b9af0dd
*************** typedef struct co_database
*** 117,125 ****
  {
  	Oid					codb_dboid;
  
! 	/* for internal use by the coordinator */
  	int                 codb_num_cached_jobs;
  	Dllist              codb_cached_jobs;
  
  	/* tracking of idle workers, shared */
  	int				    codb_num_idle_workers;
--- 117,140 ----
  {
  	Oid					codb_dboid;
  
! 	/*
! 	 * The following fields are for internal use by the coordinator only. Used
! 	 * to keep track of messages that cannot currently be delivered. We
! 	 * differentiate two types of caches:
! 	 *
! 	 * Cached jobs, which can be delivered to any background worker (but
! 	 * might get cached because all workers are busy). This is a FIFO queue,
! 	 * so an idle worker always gets the first job from the queue.
! 	 *
! 	 * Out of order messages, which are part of another job. The coordinator
! 	 * already knows the background worker these messages need to be sent to.
! 	 * However, to maintain the required ordering of messages, it may decide
! 	 * to cache a message until it is deliverable.
! 	 */
  	int                 codb_num_cached_jobs;
  	Dllist              codb_cached_jobs;
+ 	int                 codb_num_ooo_msgs;
+ 	Dllist              codb_ooo_msgs;
  
  	/* tracking of idle workers, shared */
  	int				    codb_num_idle_workers;
*************** extern void dispatch_job(IMessage *msg, 
*** 159,164 ****
--- 174,181 ----
  extern bool CoordinatorCanLaunchWorker(TimestampTz current_time);
  
  extern void dispatch_job(IMessage *msg, co_database *codb);
+ extern void dispatch_ooo_msg(IMessage *msg, co_database *codb);
+ extern void drop_ooo_msgs_for(co_database *codb, BackendId backend_id);
  
  /* Process startup functions */
  extern int	StartCoordinator(void);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to