Finally, this patch adds the capability to cache out-of-order messages
for workers within the coordinator process. Unlike the cache of jobs,
which basically are messages as well, these messages don't trigger a
job, but might provide additional information or data payload to a
worker that's already processing a job.
In case of Postgres-R, the very first change set received starts an
applicator job, while every subsequent change set as well as the final
commit decision are such ooo-messages, which are forwarded to the same
worker.
A parallel querying feature might want to use similar ooo messages for
data payload, forth and back. However, simpler jobs like vacuum don't
necessarily need this.
*** src/backend/postmaster/coordinator.c c12c70fed2f6025831ef4ba9555af0debe063003
--- src/backend/postmaster/coordinator.c 7f25538532e2633e8fed8232b33588eb35f4f122
*************** typedef struct cached_job {
*** 121,126 ****
--- 121,132 ----
IMessage *cj_msg;
} cached_job;
+ typedef struct cached_msg {
+ Dlelem cm_links;
+ IMessage *cm_msg;
+ BackendId cm_backend_id;
+ } cached_msg;
+
CoordinatorShmemStruct *CoordinatorShmem;
/*
*************** static void process_cached_jobs(co_datab
*** 156,161 ****
--- 162,172 ----
BackendId backend_id);
static void process_cached_jobs(co_database *codb);
+ static void process_ooo_msgs_for(co_database *codb, BackendId backend_id);
+ static void add_ooo_msg(IMessage *msg, co_database *codb,
+ BackendId backend_id);
+
+
static void manage_workers(bool can_launch);
static void do_start_worker(Oid dboid);
*************** init_co_database(co_database *codb)
*** 271,276 ****
--- 282,290 ----
codb->codb_num_cached_jobs = 0;
DLInitList(&codb->codb_cached_jobs);
+ codb->codb_num_ooo_msgs = 0;
+ DLInitList(&codb->codb_ooo_msgs);
+
codb->codb_num_connected_workers = 0;
}
*************** cache_job(IMessage *msg, co_database *co
*** 291,296 ****
--- 305,328 ----
codb->codb_num_cached_jobs++;
}
+ void
+ add_ooo_msg(IMessage *msg, co_database *codb, BackendId backend_id)
+ {
+ cached_msg *cm;
+
+ #ifdef COORDINATOR_DEBUG
+ elog(DEBUG5, "Coordinator: storing out-of-order message of type %s for database %d",
+ decode_imessage_type(msg->type), codb->codb_dboid);
+ #endif
+
+ cm = palloc(sizeof(cached_msg));
+ DLInitElem(&cm->cm_links, cm);
+ cm->cm_msg = msg;
+ cm->cm_backend_id = backend_id;
+ DLAddTail(&codb->codb_ooo_msgs, &cm->cm_links);
+ codb->codb_num_ooo_msgs++;
+ }
+
/*
* get_idle_worker
*
*************** dispatch_job(IMessage *msg, co_database
*** 361,375 ****
void
dispatch_job(IMessage *msg, co_database *codb)
{
! WorkerInfo worker;
! if (codb->codb_num_idle_workers > 0)
{
! worker = get_idle_worker(codb);
! forward_job(msg, codb, worker->wi_backend_id);
}
else
! cache_job(msg, codb);
}
/*
--- 393,436 ----
void
dispatch_job(IMessage *msg, co_database *codb)
{
! bool can_deliver;
! BackendId target = InvalidBackendId;
! can_deliver = can_deliver_cached_job(codb, msg, &target);
!
! if (can_deliver && target == InvalidBackendId)
! can_deliver = (codb->codb_num_idle_workers > 0);
!
! if (can_deliver)
! {
! if (target == InvalidBackendId)
! target = get_idle_worker(codb)->wi_backend_id;
! forward_job(msg, codb, target);
! }
! else
! cache_job(msg, codb);
! }
!
! void
! dispatch_ooo_msg(IMessage *msg, co_database *codb)
! {
! bool can_deliver;
! BackendId target = InvalidBackendId;
!
! can_deliver = can_deliver_cached_job(codb, msg, &target);
!
! if (can_deliver && target == InvalidBackendId)
! can_deliver = (codb->codb_num_idle_workers > 0);
!
! if (can_deliver)
{
! if (target == InvalidBackendId)
! target = get_idle_worker(codb)->wi_backend_id;
! forward_job(msg, codb, target);
! process_ooo_msgs_for(codb, target);
}
else
! add_ooo_msg(msg, codb, InvalidBackendId);
}
/*
*************** process_cached_jobs(co_database *codb)
*** 409,414 ****
--- 470,482 ----
forward_job(job->cj_msg, codb, target);
pfree(job);
+
+ /*
+ * Trigger subsequent ooo messages, as the delivery of a job
+ * might change the delivery status of further out-of-order
+ * messages
+ */
+ process_ooo_msgs_for(codb, target);
job = (cached_job*) DLGetHead(&codb->codb_cached_jobs);
}
*************** process_cached_jobs(co_database *codb)
*** 417,422 ****
--- 485,569 ----
}
}
+ void
+ process_ooo_msgs_for(co_database *codb, BackendId backend_id)
+ {
+ BackendId target;
+ cached_msg *cm;
+ bool can_deliver;
+
+ #ifdef COORDINATOR_DEBUG
+ elog(DEBUG5, "Coordinator: out-of-order messages: %d",
+ codb->codb_num_ooo_msgs);
+ #endif
+
+ cm = (cached_msg*) DLGetHead(&codb->codb_ooo_msgs);
+ while (cm != NULL)
+ {
+ target = InvalidBackendId;
+ can_deliver = false;
+
+ if (cm->cm_backend_id == InvalidBackendId ||
+ cm->cm_backend_id == backend_id)
+ {
+ can_deliver = can_deliver_cached_job(codb, cm->cm_msg, &target);
+ }
+
+ if (can_deliver)
+ {
+ if (target == InvalidBackendId)
+ {
+ elog(FATAL, "process_ooo_msg_for: no PGPROC, database %d, backend %d, msg type: %s",
+ codb->codb_dboid, backend_id, decode_imessage_type(cm->cm_msg->type));
+ }
+
+ Assert(target != InvalidBackendId);
+
+ /* remove the message from the cache */
+ DLRemove(&cm->cm_links);
+ codb->codb_num_ooo_msgs--;
+
+ forward_job(cm->cm_msg, codb, target);
+ pfree(cm);
+
+ /* re-scan the list of ooo messages */
+ cm = (cached_msg*) DLGetHead(&codb->codb_ooo_msgs);
+ }
+ else
+ cm = (cached_msg*) DLGetSucc(&cm->cm_links);
+ }
+ }
+
+ void
+ drop_ooo_msgs_for(co_database *codb, BackendId backend_id)
+ {
+ cached_msg *cm, *next;
+
+ Assert(backend_id != InvalidBackendId);
+
+ #ifdef COORDINATOR_DEBUG
+ elog(DEBUG5, "Coordinator: dropping ooo msgs for backend %d",
+ backend_id);
+ #endif
+
+ cm = (cached_msg*) DLGetHead(&codb->codb_ooo_msgs);
+ while (cm != NULL)
+ {
+ next = (cached_msg*) DLGetSucc(&cm->cm_links);
+
+ if (cm->cm_backend_id == backend_id)
+ {
+ /* drop the message */
+ DLRemove(&cm->cm_links);
+ codb->codb_num_ooo_msgs--;
+ pfree(cm);
+ }
+
+ /* continue scanning the list of ooo messages */
+ cm = next;
+ }
+ }
+
/*
* populate_co_databases
*
============================================================
*** src/include/postmaster/coordinator.h 77a64f4b18c483a00779f8d5922066d8aa6fc156
--- src/include/postmaster/coordinator.h c1dec58e4a6adf1a6870de32b573cc969b9af0dd
*************** typedef struct co_database
*** 117,125 ****
{
Oid codb_dboid;
! /* for internal use by the coordinator */
int codb_num_cached_jobs;
Dllist codb_cached_jobs;
/* tracking of idle workers, shared */
int codb_num_idle_workers;
--- 117,140 ----
{
Oid codb_dboid;
! /*
! * The following fields are for internal use by the coordinator only. Used
! * to keep track of messages that cannot currently be delivered. We
! * differentiate two types of caches:
! *
! * Cached jobs, which can be delivered to any background worker (but
! * might get cached because all workers are busy). This is a FIFO queue,
! * so an idle worker always gets the first job from the queue.
! *
! * Out of order messages, which are part of another job. The coordinator
! * already knows the background worker these messages need to be sent to.
! * However, to maintain the required ordering of messages, it may decide
! * to cache a message until it is deliverable.
! */
int codb_num_cached_jobs;
Dllist codb_cached_jobs;
+ int codb_num_ooo_msgs;
+ Dllist codb_ooo_msgs;
/* tracking of idle workers, shared */
int codb_num_idle_workers;
*************** extern void dispatch_job(IMessage *msg,
*** 159,164 ****
--- 174,181 ----
extern bool CoordinatorCanLaunchWorker(TimestampTz current_time);
extern void dispatch_job(IMessage *msg, co_database *codb);
+ extern void dispatch_ooo_msg(IMessage *msg, co_database *codb);
+ extern void drop_ooo_msgs_for(co_database *codb, BackendId backend_id);
/* Process startup functions */
extern int StartCoordinator(void);
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers