mturk 2005/02/16 00:27:53 Modified: jk/native/common jk_lb_worker.c jk_lb_worker.h Log: Rewrite load balancer. Added byrequest and bytraffic methods. Also keep session and domain models. Added sticky_session_force, for non replicated Tomcats. Revision Changes Path 1.51 +264 -277 jakarta-tomcat-connectors/jk/native/common/jk_lb_worker.c Index: jk_lb_worker.c =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native/common/jk_lb_worker.c,v retrieving revision 1.50 retrieving revision 1.51 diff -u -r1.50 -r1.51 --- jk_lb_worker.c 15 Feb 2005 08:52:53 -0000 1.50 +++ jk_lb_worker.c 16 Feb 2005 08:27:52 -0000 1.51 @@ -42,16 +42,6 @@ #define WAIT_BEFORE_RECOVER (60*1) #define WORKER_RECOVER_TIME ("recover_time") -static const char *search_types[] = { - "any", - "sticky", - "redirect", - "sticky domain", - "local", - "local domain", - NULL -}; - struct lb_endpoint { jk_endpoint_t *e; @@ -173,160 +163,223 @@ } } +static void retry_worker(worker_record_t *w, + int recover_wait_time, + jk_logger_t *l) +{ + int elapsed = (int)(time(0) - w->s->error_time); + JK_TRACE_ENTER(l); -static int is_worker_candidate(worker_record_t *wr, - int search_id, - const char *search_string, - jk_logger_t *l) -{ - switch (search_id) { - case 0: - return JK_TRUE; - case 1: - if (strcmp(search_string, wr->s->name) == 0) { - return JK_TRUE; - } - break; - case 2: - if (strcmp(search_string, wr->s->name) == 0) { - return JK_TRUE; - } - break; - case 3: - if (strcmp(search_string, wr->s->domain) == 0) { - return JK_TRUE; - } - break; - case 4: - if (wr->s->is_local_worker) { - return JK_TRUE; - } - break; - case 5: - if (wr->s->is_local_domain) { - return JK_TRUE; - } - break; + if (elapsed <= recover_wait_time) { + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "worker %s is in error state - will not yet recover (%d < %d)", + w->s->name, elapsed, recover_wait_time); } - return JK_FALSE; + else { + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "worker %s is in error state - will recover", + w->s->name); + w->s->in_recovering = JK_TRUE; + w->s->in_error_state = JK_FALSE; + } + + JK_TRACE_EXIT(l); } -static worker_record_t *get_suitable_worker(lb_worker_t *p, - int search_id, - const char *search_string, - int start, - int stop, - int use_lb_factor, - int *domain_id, - jk_logger_t *l) +static worker_record_t *find_by_session(lb_worker_t *p, + const char *name, + jk_logger_t *l) { worker_record_t *rc = NULL; - int lb_max = 0; - int total_factor = 0; - const char *search_type = search_types[search_id]; - int i; + unsigned int i; - *domain_id = -1; - - JK_ENTER_CS(&(p->cs), i); - if (!i) { - jk_log(l, JK_LOG_ERROR, - "could not lock load balancer = %s", - p->s->name); - return NULL; + for (i = 0; i < p->num_of_workers; i++) { + if (strcmp(p->lb_workers[i].s->name, name) == 0) { + rc = &p->lb_workers[i]; + break; + } } - if (JK_IS_DEBUG_LEVEL(l)) - jk_log(l, JK_LOG_DEBUG, - "searching for %s worker (%s)", - search_type, search_string); + return rc; +} - for (i = start; i < stop; i++) { - if (search_id < 3 && p->lb_workers[i].s->is_disabled) { +static worker_record_t *find_best_bydomain(lb_worker_t *p, + const char *domain, + jk_logger_t *l)
+{ + unsigned int i; + int total_factor = 0; + worker_record_t *candidate = NULL; + + /* First try to see if we have available candidate */ + for (i = 0; i < p->num_of_workers; i++) { + /* Skip all workers that are not member of domain */ + if (strlen(p->lb_workers[i].s->domain) == 0 || + strcmp(p->lb_workers[i].s->domain, domain)) + continue; + /* Take into calculation only the workers that are + * not in error state or not disabled. + */ + if (!p->lb_workers[i].s->in_error_state && + !p->lb_workers[i].s->is_disabled) { + p->lb_workers[i].s->lb_value += p->lb_workers[i].s->lb_factor; + total_factor += p->lb_workers[i].s->lb_factor; + if (!candidate || p->lb_workers[i].s->lb_value > candidate->s->lb_value) + candidate = &p->lb_workers[i]; + } + } + + if (candidate) { + candidate->s->lb_value -= total_factor; + } + + return candidate; +} + + +static worker_record_t *find_best_byrequests(lb_worker_t *p, + jk_logger_t *l) +{ + unsigned int i; + int total_factor = 0; + worker_record_t *candidate = NULL; + + /* First try to see if we have available candidate */ + for (i = 0; i < p->num_of_workers; i++) { + /* If the worker is in error state run + * retry on that worker. It will be marked as + * operational if the retry timeout is elapsed. + * The worker might still be unusable, but we try + * anyway. + */ + if (p->lb_workers[i].s->in_error_state && + !p->lb_workers[i].s->is_disabled) { + retry_worker(&p->lb_workers[i], p->s->recover_wait_time, l); + } + /* Take into calculation only the workers that are + * not in error state or not disabled. + */ + if (!p->lb_workers[i].s->in_error_state && + !p->lb_workers[i].s->is_disabled) { + p->lb_workers[i].s->lb_value += p->lb_workers[i].s->lb_factor; + total_factor += p->lb_workers[i].s->lb_factor; + if (!candidate || p->lb_workers[i].s->lb_value > candidate->s->lb_value) + candidate = &p->lb_workers[i]; + } + } + + if (candidate) { + candidate->s->lb_value -= total_factor; + } + + return candidate; +} + +static worker_record_t *find_best_bytraffic(lb_worker_t *p, + jk_logger_t *l) +{ + unsigned int i; + size_t mytraffic = 0; + size_t curmin = 0; + worker_record_t *candidate = NULL; + + /* First try to see if we have available candidate */ + for (i = 0; i < p->num_of_workers; i++) { + /* If the worker is in error state run + * retry on that worker. It will be marked as + * operational if the retry timeout is elapsed. + * The worker might still be unusable, but we try + * anyway. + */ + if (p->lb_workers[i].s->in_error_state && + !p->lb_workers[i].s->is_disabled) { + retry_worker(&p->lb_workers[i], p->s->recover_wait_time, l); + } + /* Take into calculation only the workers that are + * not in error state or not disabled. + */ + if (!p->lb_workers[i].s->in_error_state && + !p->lb_workers[i].s->is_disabled) { + mytraffic = (p->lb_workers[i].s->transferred/p->lb_workers[i].s->lb_factor) + + (p->lb_workers[i].s->readed/p->lb_workers[i].s->lb_factor); + if (!candidate || mytraffic < curmin) { + candidate = &p->lb_workers[i]; + curmin = mytraffic; + } + } + } + + return candidate; +} - continue; - } - if (is_worker_candidate(&(p->lb_workers[i]), search_id, search_string, l)) { - if (JK_IS_DEBUG_LEVEL(l)) - jk_log(l, JK_LOG_DEBUG, - "found candidate worker %s (%d) for match with %s (%s)", - p->lb_workers[i].s->name, i, search_type, search_string); - if (search_id == 1 && strlen(p->lb_workers[i].s->redirect)) { - *domain_id = i; - } - else if (search_id == 2) { - *domain_id = i; - } - if (!p->lb_workers[i].s->in_error_state || !p->lb_workers[i].s->in_recovering) { - if (JK_IS_DEBUG_LEVEL(l)) - jk_log(l, JK_LOG_DEBUG, - "found candidate worker %s (%d) with previous load %d in search with %s (%s)", - p->lb_workers[i].s->name, i, p->lb_workers[i].s->lb_value, - search_type, search_string); - - if (p->lb_workers[i].s->in_error_state) { - - time_t now = time(0); - int elapsed = now - p->lb_workers[i].s->error_time; - if (elapsed <= p->s->recover_wait_time) { - if (JK_IS_DEBUG_LEVEL(l)) - jk_log(l, JK_LOG_DEBUG, - "worker candidate %s (%d) is in error state - will not yet recover (%d < %d)", - p->lb_workers[i].s->name, i, elapsed, p->s->recover_wait_time); - continue; - } - } +static worker_record_t *find_best_worker(lb_worker_t * p, + jk_logger_t *l) +{ + worker_record_t *rc = NULL; - if (use_lb_factor) { - p->lb_workers[i].s->lb_value += p->lb_workers[i].s->lb_factor; - total_factor += p->lb_workers[i].s->lb_factor; - if (p->lb_workers[i].s->lb_value > lb_max || !rc) { - lb_max = p->lb_workers[i].s->lb_value; - rc = &(p->lb_workers[i]); - if (JK_IS_DEBUG_LEVEL(l)) - jk_log(l, JK_LOG_DEBUG, - "new maximal worker %s (%d) with previous load %d in search with %s (%s)", - rc->s->name, i, rc->s->lb_value, search_type, search_string); - } - } else { - rc = &(p->lb_workers[i]); - break; - } - } - else if (JK_IS_DEBUG_LEVEL(l)) { - jk_log(l, JK_LOG_TRACE, - "worker candidate %s (%d) is in error state - already recovers", - p->lb_workers[i].s->name, i); - } - } - } + if (p->lbmethod == JK_LB_BYREQUESTS) + rc = find_best_byrequests(p, l); + else if (p->lbmethod == JK_LB_BYTRAFFIC) + rc = find_best_bytraffic(p, l); - if (rc) { - if (rc->s->in_error_state) { - time_t now = time(0); - rc->s->in_recovering = JK_TRUE; - rc->s->error_time = now; - if (JK_IS_DEBUG_LEVEL(l)) - jk_log(l, JK_LOG_DEBUG, - "found worker %s is in error state - will recover", - rc->s->name); - } - rc->s->lb_value -= total_factor; - if (JK_IS_DEBUG_LEVEL(l)) - jk_log(l, JK_LOG_DEBUG, - "found worker %s with new load %d in search with %s (%s)", - rc->s->name, rc->s->lb_value, search_type, search_string); - JK_LEAVE_CS(&(p->cs), i); - return rc; - } - if (JK_IS_DEBUG_LEVEL(l)) - jk_log(l, JK_LOG_DEBUG, - "found no %s (%s) worker", - search_type, search_string); - JK_LEAVE_CS(&(p->cs), i); return rc; } +static worker_record_t *find_session_route(lb_worker_t *p, + const char *name, + jk_logger_t *l) +{ + unsigned int i; + int total_factor = 0; + int uses_domain = 0; + worker_record_t *candidate = NULL; + + candidate = find_by_session(p, name, l); + if (!candidate) { + uses_domain = 1; + candidate = find_best_bydomain(p, name, l); + } + if (candidate) { + if (candidate->s->in_error_state && !candidate->s->is_disabled) { + retry_worker(candidate, p->s->recover_wait_time, l); + } + if (candidate->s->in_error_state) { + /* We have a worker that is unusable. + * It can be in error or disabled, but in case + * it has a redirection set use that redirection worker. + * This enables to safely remove the member from the + * balancer. Of course you will need a some kind of + * session replication between those two remote. + */ + if (*candidate->s->redirect) + candidate = find_by_session(p, candidate->s->redirect, l); + else if (*candidate->s->domain && !uses_domain) { + uses_domain = 1; + candidate = find_best_bydomain(p, candidate->s->domain, l); + } + if (candidate && candidate->s->in_error_state) + candidate = NULL; + } + } + if (candidate && !uses_domain) { + for (i = 0; i < p->num_of_workers; i++) { + if (!p->lb_workers[i].s->in_error_state && + !p->lb_workers[i].s->is_disabled) { + /* Skip all workers that are not member of candidate domain */ + if (*candidate->s->domain && + strcmp(p->lb_workers[i].s->domain, candidate->s->domain)) + continue; + p->lb_workers[i].s->lb_value += p->lb_workers[i].s->lb_factor; + total_factor += p->lb_workers[i].s->lb_factor; + } + } + candidate->s->lb_value -= total_factor; + } + return candidate; +} + static worker_record_t *get_most_suitable_worker(lb_worker_t * p, jk_ws_service_t *s, int attempt, @@ -335,96 +388,75 @@ worker_record_t *rc = NULL; char *sessionid = NULL; int domain_id = -1; + int r; JK_TRACE_ENTER(l); if (p->s->sticky_session) { sessionid = get_sessionid(s); } - - if (JK_IS_DEBUG_LEVEL(l)) - jk_log(l, JK_LOG_DEBUG, - "total sessionid is %s.", - sessionid ? sessionid : "empty"); - while (sessionid) { - char *next = strchr(sessionid, ';'); - char *session_route; - const char *session_domain; - if (next) { - *next++ = '\0'; - } - if (JK_IS_DEBUG_LEVEL(l)) + JK_ENTER_CS(&(p->cs), r); + if (!r) { + jk_log(l, JK_LOG_ERROR, + "getting thread lock errno=%d", + errno); + JK_TRACE_EXIT(l); + return NULL; + } + if (sessionid) { + char *session = sessionid; + if (JK_IS_DEBUG_LEVEL(l)) { jk_log(l, JK_LOG_DEBUG, - "searching worker for partial sessionid %s.", - sessionid); - session_route = strchr(sessionid, '.'); - if (session_route) { - ++session_route; + "total sessionid is %s", + sessionid ? sessionid : "empty"); + } + while (sessionid) { + char *next = strchr(sessionid, ';'); + char *session_route = NULL; + if (next) + *next++ = '\0'; + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "searching worker for partial sessionid %s", + sessionid); + session_route = strchr(sessionid, '.'); + if (session_route) { + ++session_route; - rc = get_suitable_worker(p, 1, session_route, 0, p->num_of_workers, 0, &domain_id, l); - if (rc) { - JK_TRACE_EXIT(l); - return rc; - } - if (domain_id >= 0 && domain_id < (int)p->num_of_workers) { - session_domain = p->lb_workers[domain_id].s->domain; if (JK_IS_DEBUG_LEVEL(l)) jk_log(l, JK_LOG_DEBUG, - "found redirect %s in route %s", - session_domain, session_route); + "searching worker for session route %s", + session_route); - rc = get_suitable_worker(p, 2, session_domain, 0, p->num_of_workers, - 0, &domain_id, l); + /* We have a session route. Whow! */ + rc = find_session_route(p, session_route, l); if (rc) { - JK_TRACE_EXIT(l); + JK_LEAVE_CS(&(p->cs), r); + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "found worker %s for partial sessionid %s", + rc->s->name, sessionid); + JK_TRACE_EXIT(l); return rc; } } - - if (domain_id >= 0 && domain_id < (int)p->num_of_workers) { - session_domain = p->lb_workers[domain_id].s->domain; - } - else { - session_domain = JK_LB_DEF_DOMAIN_NAME; - } - if (JK_IS_DEBUG_LEVEL(l)) - jk_log(l, JK_LOG_DEBUG, - "found domain %s in route %s", - session_domain, session_route); - - rc = get_suitable_worker(p, 3, session_domain, 0, p->num_of_workers, - 1, &domain_id, l); - if (rc) { - JK_TRACE_EXIT(l); - return rc; - } - - } - sessionid = next; - } - - - if (p->num_of_local_workers) { - rc = get_suitable_worker(p, 4, "any", 0, p->num_of_local_workers, - 1, &domain_id, l); - if (rc) { - JK_TRACE_EXIT(l); - return rc; + sessionid = next; } - - if (p->s->local_worker_only) { + if (!rc && p->s->sticky_session_force) { + JK_LEAVE_CS(&(p->cs), r); + jk_log(l, JK_LOG_INFO, + "all workers are in error state for session %s", + session); JK_TRACE_EXIT(l); return NULL; } - - rc = get_suitable_worker(p, 5, "any", p->num_of_local_workers, - p->num_of_workers, 1, &domain_id, l); - if (rc) { - JK_TRACE_EXIT(l); - return rc; - } } - rc = get_suitable_worker(p, 0, "any", p->num_of_local_workers, p->num_of_workers, - 1, &domain_id, l); + rc = find_best_worker(p, l); + JK_LEAVE_CS(&(p->cs), r); + if (rc && JK_IS_DEBUG_LEVEL(l)) { + jk_log(l, JK_LOG_DEBUG, + "found best worker (%s) using %s method", rc->s->name, + p->lbmethod == JK_LB_BYREQUESTS ? "by request" : "by traffic"); + } JK_TRACE_EXIT(l); return rc; } @@ -501,7 +533,9 @@ * Error is not recoverable - break with an error. */ jk_log(l, JK_LOG_ERROR, - "lb: unrecoverable error, request failed. Tomcat failed in the middle of request, we can't recover to another instance."); + "unrecoverable error, request failed." + " Tomcat failed in the middle of request," + " we can't recover to another instance."); JK_TRACE_EXIT(l); return JK_FALSE; } @@ -523,7 +557,7 @@ } } - jk_log(l, JK_LOG_ERROR, "lb: end of service with error"); + JK_LOG_NULL_PARAMS(l); JK_TRACE_EXIT(l); return JK_FALSE; } @@ -560,12 +594,9 @@ lb_worker_t *p = pThis->worker_private; char **worker_names; unsigned int num_of_workers; - unsigned int num_of_local_workers; - p->s->in_local_worker_mode = JK_FALSE; - p->s->local_worker_only = jk_get_local_worker_only_flag(props, p->s->name); p->s->sticky_session = jk_get_is_sticky_session(props, p->s->name); - p->num_of_local_workers = 0; + p->s->sticky_session_force = jk_get_is_sticky_session_force(props, p->s->name); if (jk_get_lb_worker_list(props, p->s->name, @@ -592,6 +623,7 @@ } } for (i = 0; i < num_of_workers; i++) { + const char *s; strncpy(p->lb_workers[i].s->name, worker_names[i], JK_SHM_STR_SIZ); p->lb_workers[i].s->lb_factor = @@ -599,14 +631,10 @@ if (p->lb_workers[i].s->lb_factor < 1) { p->lb_workers[i].s->lb_factor = 1; } - strncpy(p->lb_workers[i].s->domain, - jk_get_worker_domain(props, worker_names[i], - JK_LB_DEF_DOMAIN_NAME), - JK_SHM_STR_SIZ); - p->lb_workers[i].s->is_local_worker = - jk_get_is_local_worker(props, worker_names[i]); - if (p->lb_workers[i].s->is_local_worker) - p->s->in_local_worker_mode = JK_TRUE; + if ((s = jk_get_worker_domain(props, worker_names[i], NULL))) + strncpy(p->lb_workers[i].s->domain, s, JK_SHM_STR_SIZ); + if ((s = jk_get_worker_redirect(props, worker_names[i], NULL))) + strncpy(p->lb_workers[i].s->redirect, s, JK_SHM_STR_SIZ); /* * Allow using lb in fault-tolerant mode. * A value of 0 means the worker will be used for all requests without @@ -621,23 +649,6 @@ we, l) || !p->lb_workers[i].w) { break; } - else if (p->lb_workers[i].s->is_local_worker) { - /* - * If lb_value is 0 than move it at the beginning of the list - */ - if (i != j) { - worker_record_t tmp_worker; - tmp_worker = p->lb_workers[j]; - p->lb_workers[j] = p->lb_workers[i]; - p->lb_workers[i] = tmp_worker; - } - j++; - } - } - num_of_local_workers = j; - - if (!p->s->in_local_worker_mode) { - p->s->local_worker_only = JK_FALSE; } if (i != num_of_workers) { @@ -649,37 +660,14 @@ } else { - for (i = 0; i < num_of_local_workers; i++) { - p->lb_workers[i].s->is_local_domain=1; - } - for (i = num_of_local_workers; i < num_of_workers; i++) { - p->lb_workers[i].s->is_local_domain=0; - for (j = 0; j < num_of_local_workers; j++) { - if(!strcmp(p->lb_workers[i].s->domain, p->lb_workers[j].s->domain)) { - p->lb_workers[i].s->is_local_domain = 1; - break; - } - } - } - if (JK_IS_DEBUG_LEVEL(l)) { for (i = 0; i < num_of_workers; i++) { jk_log(l, JK_LOG_DEBUG, - "Balanced worker %i has name %s in domain %s and has local=%d and local_domain=%d", - i, p->lb_workers[i].s->name, p->lb_workers[i].s->domain, - p->lb_workers[i].s->is_local_worker, p->lb_workers[i].s->is_local_domain); + "Balanced worker %i has name %s in domain %s", + i, p->lb_workers[i].s->name, p->lb_workers[i].s->domain); } } - if (JK_IS_DEBUG_LEVEL(l)) { - jk_log(l, JK_LOG_DEBUG, - "in_local_worker_mode: %s", - (p->s->in_local_worker_mode ? "true" : "false")); - jk_log(l, JK_LOG_DEBUG, - "local_worker_only: %s", - (p->s->local_worker_only ? "true" : "false")); - } p->num_of_workers = num_of_workers; - p->num_of_local_workers = num_of_local_workers; JK_TRACE_EXIT(l); return JK_TRUE; } @@ -790,7 +778,6 @@ strncpy(private_data->s->name, name, JK_SHM_STR_SIZ); private_data->lb_workers = NULL; private_data->num_of_workers = 0; - private_data->num_of_local_workers = 0; private_data->worker.worker_private = private_data; private_data->worker.validate = validate; private_data->worker.init = init; 1.12 +8 -3 jakarta-tomcat-connectors/jk/native/common/jk_lb_worker.h Index: jk_lb_worker.h =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native/common/jk_lb_worker.h,v retrieving revision 1.11 retrieving revision 1.12 diff -u -r1.11 -r1.12 --- jk_lb_worker.h 15 Feb 2005 08:52:53 -0000 1.11 +++ jk_lb_worker.h 16 Feb 2005 08:27:52 -0000 1.12 @@ -37,6 +37,11 @@ #define JK_LB_WORKER_TYPE (5) #define JK_LB_DEF_DOMAIN_NAME ("unknown") +#define JK_LB_BYREQUESTS (0) +#define JK_LB_BYTRAFFIC (1) +#define JK_LB_METHOD_REQUESTS ("request") +#define JK_LB_METHODTRAFFIC ("traffic") + struct worker_record { jk_worker_t *w; @@ -48,8 +53,8 @@ struct lb_worker { worker_record_t *lb_workers; - unsigned num_of_workers; - unsigned num_of_local_workers; + unsigned int num_of_workers; + int lbmethod; jk_pool_t p; jk_pool_atom_t buf[TINY_POOL_SIZE]; --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]