costin 01/12/05 12:46:47 Modified: jk/native2/common jk_lb_worker.c Log: A bit of cleanup after reorganising the data structures. There is still one configuration issue, I want to eliminate the duplicated creation of workers ( lb can create workers as well ). Revision Changes Path 1.5 +220 -299 jakarta-tomcat-connectors/jk/native2/common/jk_lb_worker.c Index: jk_lb_worker.c =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native2/common/jk_lb_worker.c,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- jk_lb_worker.c 2001/12/04 23:56:07 1.4 +++ jk_lb_worker.c 2001/12/05 20:46:47 1.5 @@ -60,7 +60,7 @@ * several workers. * * Author: Gal Shachor <[EMAIL PROTECTED]> * * Based on: * - * Version: $Revision: 1.4 $ * + * Version: $Revision: 1.5 $ * ***************************************************************************/ #include "jk_pool.h" @@ -72,157 +72,83 @@ #define DEFAULT_LB_FACTOR (1.0) -int JK_METHOD lb_worker_factory(jk_worker_t **w, - const char *name, - jk_logger_t *l); - - -/* - * The load balancing code in this - */ - - -/* - * Time to wait before retry... - */ +/* Time to wait before retry... */ #define WAIT_BEFORE_RECOVER (60*1) -#define ADDITINAL_WAIT_LOAD (20) - -struct worker_record { - char *name; - double lb_factor; - double lb_value; - int in_error_state; - int in_recovering; - time_t error_time; - jk_worker_t *w; -}; -typedef struct worker_record worker_record_t; -struct lb_worker { - worker_record_t *lb_workers; - unsigned num_of_workers; - - jk_pool_t p; - jk_pool_atom_t buf[TINY_POOL_SIZE]; +#define ADDITINAL_WAIT_LOAD (20) - char *name; - jk_worker_t worker; -}; -typedef struct lb_worker lb_worker_t; +int JK_METHOD jk_worker_lb_factory(jk_env_t *env, + void **result, + char *type, + char *name); -struct lb_endpoint { - jk_endpoint_t *e; - lb_worker_t *worker; - - jk_endpoint_t endpoint; -}; -typedef struct lb_endpoint lb_endpoint_t; -static void close_workers(lb_worker_t *p, - int num_of_workers, - jk_logger_t *l) -{ - int i = 0; - for(i = 0 ; i < num_of_workers ; i++) { - p->lb_workers[i].w->destroy(&(p->lb_workers[i].w), - l); - } -} -static double get_max_lb(lb_worker_t *p) +/* Find the biggest lb_value for all my workers. + * This + ADDITIONAL_WAIT_LOAD will be set on all the workers + * that recover after an error. + */ +static double get_max_lb(jk_worker_t *p) { - unsigned i; + int i; double rc = 0.0; for(i = 0 ; i < p->num_of_workers ; i++) { - if(!p->lb_workers[i].in_error_state) { - if(p->lb_workers[i].lb_value > rc) { - rc = p->lb_workers[i].lb_value; + if(!p->lb_workers[i]->in_error_state) { + if(p->lb_workers[i]->lb_value > rc) { + rc = p->lb_workers[i]->lb_value; } } } - return rc; - } - -double jk_get_lb_factor(jk_map_t *m, - const char *wname) -{ - char buf[1024]; - - if(!m || !wname) { - return DEFAULT_LB_FACTOR; - } - sprintf(buf, "%s.%s.%s", "worker", wname, "lbfactor"); +/** Find the best worker. In process, check if timeout expired + for workers that failed in the past and give them another chance. - return map_get_double(m, buf, DEFAULT_LB_FACTOR); -} + This will check the JSESSIONID and forward to the right worker + if in a session. -int jk_get_lb_worker_list(jk_map_t *m, - const char *lb_wname, - char ***list, - unsigned *num_of_wokers) + It'll also adjust the load balancing factors. +*/ +static jk_worker_t *get_most_suitable_worker(jk_worker_t *p, + jk_ws_service_t *s) { - char buf[1024]; - - if(m && list && num_of_wokers && lb_wname) { - char **ar = NULL; - - sprintf(buf, "%s.%s.%s", "worker", lb_wname, "balanced_workers"); - ar = map_get_string_list(m, buf, num_of_wokers, NULL); - if(ar) { - *list = ar; - return JK_TRUE; - } - *list = NULL; - *num_of_wokers = 0; - } - - return JK_FALSE; -} - - -static worker_record_t *get_most_suitable_worker(lb_worker_t *p, - jk_ws_service_t *s) -{ - worker_record_t *rc = NULL; + jk_worker_t *rc = NULL; double lb_min = 0.0; - unsigned i; + int i; char *session_route = jk_requtil_getSessionRoute(s); if(session_route) { for(i = 0 ; i < p->num_of_workers ; i++) { - if(0 == strcmp(session_route, p->lb_workers[i].name)) { - if(p->lb_workers[i].in_error_state) { + if(0 == strcmp(session_route, p->lb_workers[i]->name)) { + if(p->lb_workers[i]->in_error_state) { break; } else { - return &(p->lb_workers[i]); + return p->lb_workers[i]; } } } } for(i = 0 ; i < p->num_of_workers ; i++) { - if(p->lb_workers[i].in_error_state) { - if(!p->lb_workers[i].in_recovering) { + if(p->lb_workers[i]->in_error_state) { + if(!p->lb_workers[i]->in_recovering) { time_t now = time(0); - if((now - p->lb_workers[i].error_time) > WAIT_BEFORE_RECOVER) { + if((now - p->lb_workers[i]->error_time) > WAIT_BEFORE_RECOVER) { - p->lb_workers[i].in_recovering = JK_TRUE; - p->lb_workers[i].error_time = now; - rc = &(p->lb_workers[i]); + p->lb_workers[i]->in_recovering = JK_TRUE; + p->lb_workers[i]->error_time = now; + rc = p->lb_workers[i]; break; } } } else { - if(p->lb_workers[i].lb_value < lb_min || !rc) { - lb_min = p->lb_workers[i].lb_value; - rc = &(p->lb_workers[i]); + if(p->lb_workers[i]->lb_value < lb_min || !rc) { + lb_min = p->lb_workers[i]->lb_value; + rc = p->lb_workers[i]; } } } @@ -233,102 +159,107 @@ return rc; } - + + static int JK_METHOD service(jk_endpoint_t *e, jk_ws_service_t *s, jk_logger_t *l, int *is_recoverable_error) { + /* The 'real' endpoint */ + jk_endpoint_t *end = NULL; + l->jkLog(l, JK_LOG_DEBUG, "Into jk_endpoint_t::service\n"); - if(e && e->endpoint_private && s && is_recoverable_error) { - lb_endpoint_t *p = e->endpoint_private; - jk_endpoint_t *end = NULL; - - /* you can not recover on another load balancer */ - *is_recoverable_error = JK_FALSE; - - - while(1) { - worker_record_t *rec = get_most_suitable_worker(p->worker, s); - int rc; + if(e==NULL || s==NULL || is_recoverable_error==NULL) { + l->jkLog(l, JK_LOG_ERROR, + "In jk_endpoint_t::service: NULL Parameters\n"); + return JK_FALSE; + } - if(rec) { - int is_recoverable = JK_FALSE; + /* you can not recover on another load balancer */ + *is_recoverable_error = JK_FALSE; + e->realEndpoint=NULL; + + while(1) { + jk_worker_t *rec = get_most_suitable_worker(e->worker, s); + int rc; + int is_recoverable = JK_FALSE; + + if(rec == NULL) { + /* NULL record, no more workers left ... */ + l->jkLog(l, JK_LOG_ERROR, + "lb_worker.service() No suitable workers left \n"); + return JK_FALSE; + } - s->jvm_route = s->pool->pstrdup(s->pool, rec->name); + s->jvm_route = s->pool->pstrdup(s->pool, rec->name); - rc = rec->w->get_endpoint(rec->w, &end, l); - if(rc && end) { - int src = end->service(end, s, l, &is_recoverable); - end->done(&end, l); - if(src) { - if(rec->in_recovering) { - rec->lb_value = get_max_lb(p->worker) + ADDITINAL_WAIT_LOAD; - } - rec->in_error_state = JK_FALSE; - rec->in_recovering = JK_FALSE; - rec->error_time = 0; - return JK_TRUE; - } - } - - /* - * Service failed !!! - * - * Time for fault tolerance (if possible)... - */ + rc = rec->get_endpoint(rec, &end, l); + if( rc!= JK_TRUE || + end==NULL ) { + + } else { - rec->in_error_state = JK_TRUE; - rec->in_recovering = JK_FALSE; - rec->error_time = time(0); + rc = end->service(end, s, l, &is_recoverable); + end->done(&end, l); - if(!is_recoverable) { - /* - * Error is not recoverable - break with an error. - */ - l->jkLog(l, JK_LOG_ERROR, - "In jk_endpoint_t::service, none recoverable error...\n"); - break; + if(rc==JK_TRUE) { + if(rec->in_recovering) { + rec->lb_value = get_max_lb(e->worker) + + ADDITINAL_WAIT_LOAD; } - - /* - * Error is recoverable by submitting the request to - * another worker... Lets try to do that. - */ - l->jkLog(l, JK_LOG_DEBUG, - "In jk_endpoint_t::service, recoverable error... will try to recover on other host\n"); - } else { - /* NULL record, no more workers left ... */ - l->jkLog(l, JK_LOG_ERROR, - "In jk_endpoint_t::service, No more workers left, can not submit the request\n"); - break; + rec->in_error_state = JK_FALSE; + rec->in_recovering = JK_FALSE; + rec->error_time = 0; + /* the endpoint that succeeded is saved for done() */ + e->realEndpoint = end; + return JK_TRUE; } } + + /* + * Service failed !!! + * + * Time for fault tolerance (if possible)... + */ + rec->in_error_state = JK_TRUE; + rec->in_recovering = JK_FALSE; + rec->error_time = time(0); + + if(!is_recoverable) { + /* Error is not recoverable - break with an error. */ + l->jkLog(l, JK_LOG_ERROR, + "In jk_endpoint_t::service, none recoverable error...\n"); + break; + } + + /* + * Error is recoverable by submitting the request to + * another worker... Lets try to do that. + */ + l->jkLog(l, JK_LOG_DEBUG, + "lb_worker.service() recoverable error. Try other host\n"); } - - l->jkLog(l, JK_LOG_ERROR, - "In jk_endpoint_t::service: NULL Parameters\n"); - return JK_FALSE; } -static int JK_METHOD done(jk_endpoint_t **e, +static int JK_METHOD done(jk_endpoint_t **eP, jk_logger_t *l) { l->jkLog(l, JK_LOG_DEBUG, "Into jk_endpoint_t::done\n"); - if(e && *e && (*e)->endpoint_private) { - lb_endpoint_t *p = (*e)->endpoint_private; + if(eP && *eP ) { + jk_endpoint_t *e = *eP; - if(p->e) { - p->e->done(&p->e, l); - } + if(e->realEndpoint!=NULL) { + e->realEndpoint->done(&e->realEndpoint, l); + } - free(p); - *e = NULL; + free(e); + *eP = NULL; return JK_TRUE; } @@ -338,75 +269,74 @@ return JK_FALSE; } -static int JK_METHOD validate(jk_worker_t *pThis, +static int JK_METHOD validate(jk_worker_t *_this, jk_map_t *props, jk_workerEnv_t *we, jk_logger_t *l) { int err; - l->jkLog(l, JK_LOG_DEBUG, - "Into jk_worker_t::validate\n"); - - if(pThis && pThis->worker_private) { - lb_worker_t *p = pThis->worker_private; - char **worker_names; - unsigned num_of_workers; - - if(jk_get_lb_worker_list(props, - p->name, - &worker_names, - &num_of_workers) && num_of_workers) { - unsigned i = 0; - - p->lb_workers = p->p.alloc(&p->p, - num_of_workers * sizeof(worker_record_t)); + char **worker_names; + unsigned num_of_workers; + unsigned i = 0; + + l->jkLog(l, JK_LOG_DEBUG, "lb_workers.validate()\n"); - if(!p->lb_workers) { - return JK_FALSE; - } + if(! _this ) { + l->jkLog(l, JK_LOG_ERROR,"lb_worker.validate(): NPE\n"); + return JK_FALSE; + } - for(i = 0 ; i < num_of_workers ; i++) { - p->lb_workers[i].name = p->p.pstrdup(&p->p, worker_names[i]); - p->lb_workers[i].lb_factor = jk_get_lb_factor(props, - worker_names[i]); - p->lb_workers[i].lb_factor = 1/p->lb_workers[i].lb_factor; - /* - * Allow using lb in fault-tolerant mode. - * Just set lbfactor in worker.properties to 0 to have - * a worker used only when principal is down or session route - * point to it. Provided by Paul Frieden <[EMAIL PROTECTED]> - */ - p->lb_workers[i].lb_value = p->lb_workers[i].lb_factor; - p->lb_workers[i].in_error_state = JK_FALSE; - p->lb_workers[i].in_recovering = JK_FALSE; - - p->lb_workers[i].w= - pThis->workerEnv->createWorker( pThis->workerEnv, - p->lb_workers[i].name, - props ); + worker_names=map_getListProp( props, "worker", _this->name, + "balanced_workers", &num_of_workers ); + if( worker_names==NULL || num_of_workers==0 ) { + l->jkLog(l, JK_LOG_ERROR,"lb_worker.validate(): no defined workers\n"); + return JK_FALSE; + } - if( p->lb_workers[i].w == NULL ) { - break; - } - } + _this->lb_workers = + _this->pool->alloc(_this->pool, + num_of_workers * sizeof(jk_worker_t *)); + + if(!_this->lb_workers) { + l->jkLog(l, JK_LOG_ERROR,"lb_worker.validate(): OutOfMemoryException\n"); + return JK_FALSE; + } - if(i != num_of_workers) { - close_workers(p, i, l); - l->jkLog(l, JK_LOG_ERROR, - "In jk_worker_t::validate: Failed to create worker %s\n", - p->lb_workers[i].name); + _this->num_of_workers=0; + for(i = 0 ; i < num_of_workers ; i++) { + char *name = _this->pool->pstrdup(_this->pool, worker_names[i]); + + _this->lb_workers[i]= + _this->workerEnv->createWorker( _this->workerEnv, + name, props ); + + if( _this->lb_workers[i] == NULL ) { + l->jkLog(l, JK_LOG_ERROR, + "lb_worker.validate(): Error creating %s %s\n", + _this->name, name); + /* Ignore, we may have other workers */ + continue; + } + + _this->lb_workers[i]->lb_factor = + map_getDoubleProp( props, "worker", name, "lbfactor", + DEFAULT_LB_FACTOR); + + _this->lb_workers[i]->lb_factor = 1/ _this->lb_workers[i]->lb_factor; - } else { - p->num_of_workers = num_of_workers; - return JK_TRUE; - } - } + /* + * Allow using lb in fault-tolerant mode. + * Just set lbfactor in worker.properties to 0 to have + * a worker used only when principal is down or session route + * point to it. Provided by Paul Frieden <[EMAIL PROTECTED]> + */ + _this->lb_workers[i]->lb_value = _this->lb_workers[i]->lb_factor; + _this->lb_workers[i]->in_error_state = JK_FALSE; + _this->lb_workers[i]->in_recovering = JK_FALSE; + _this->num_of_workers++; } - - l->jkLog(l, JK_LOG_ERROR, - "In jk_worker_t::validate: NULL Parameters\n"); - return JK_FALSE; + return JK_TRUE; } static int JK_METHOD init(jk_worker_t *pThis, @@ -422,99 +352,90 @@ jk_endpoint_t **pend, jk_logger_t *l) { - l->jkLog(l, JK_LOG_DEBUG, - "Into jk_worker_t::get_endpoint\n"); - - if(pThis && pThis->worker_private && pend) { - lb_endpoint_t *p = (lb_endpoint_t *)malloc(sizeof(lb_endpoint_t)); - if(p) { - p->e = NULL; - p->worker = pThis->worker_private; - p->endpoint.endpoint_private = p; - p->endpoint.service = service; - p->endpoint.done = done; - p->endpoint.channelData = NULL; - *pend = &p->endpoint; + jk_endpoint_t *e; + + l->jkLog(l, JK_LOG_DEBUG, "lb_worker.getEndpoint()\n"); - return JK_TRUE; - } + if(pThis==NULL || pend==NULL ) { l->jkLog(l, JK_LOG_ERROR, - "In jk_worker_t::get_endpoint, malloc failed\n"); - } else { + "lb_worker.getEndpoint() NPE\n"); + } + + e = (jk_endpoint_t *)malloc(sizeof(jk_endpoint_t)); + if(e==NULL) { l->jkLog(l, JK_LOG_ERROR, - "In jk_worker_t::get_endpoint, NULL parameters\n"); + "lb_worker.getEndpoint() OutOfMemoryException\n"); + return JK_FALSE; } + + e->worker = pThis; + e->service = service; + e->done = done; + e->channelData = NULL; + *pend = e; - return JK_FALSE; + return JK_TRUE; } + static int JK_METHOD destroy(jk_worker_t **pThis, jk_logger_t *l) { - l->jkLog(l, JK_LOG_DEBUG, - "Into jk_worker_t::destroy\n"); - - if(pThis && *pThis && (*pThis)->worker_private) { - lb_worker_t *private_data = (*pThis)->worker_private; + int i = 0; - close_workers(private_data, - private_data->num_of_workers, - l); + l->jkLog(l, JK_LOG_DEBUG, "lb_worker.destroy()\n"); - private_data->p.close(&private_data->p); - free(private_data); + if(pThis && *pThis ) { + l->jkLog(l, JK_LOG_ERROR, "lb_worker.destroy() NullPointerException\n"); + return JK_FALSE; + } - return JK_TRUE; + for(i = 0 ; i < (*pThis)->num_of_workers ; i++) { + (*pThis)->lb_workers[i]->destroy( & (*pThis)->lb_workers[i], + l); } - l->jkLog(l, JK_LOG_ERROR, - "In jk_worker_t::destroy, NULL parameters\n"); - return JK_FALSE; + (*pThis)->pool->close((*pThis)->pool); + free(pThis); + + return JK_TRUE; } + int JK_METHOD jk_worker_lb_factory(jk_env_t *env, void **result, char *type, char *name) { jk_logger_t *l=env->logger; - jk_worker_t **w=result; - lb_worker_t *private_data; + jk_worker_t *_this; - l->jkLog(l, JK_LOG_DEBUG, "Into lb_worker_factory\n"); - - if(NULL != name && NULL != w) { - l->jkLog(l, JK_LOG_ERROR,"In lb_worker_factory, NULL parameters\n"); - return JK_FALSE; - } + l->jkLog(l, JK_LOG_DEBUG, "lb_worker.factory()\n"); - private_data = (lb_worker_t *)malloc(sizeof(lb_worker_t)); - if(!private_data) { - l->jkLog(l, JK_LOG_ERROR,"In lb_worker_factory, malloc failed\n"); + if(NULL != name ) { + l->jkLog(l, JK_LOG_ERROR,"lb_worker.factory() NullPointerException\n"); return JK_FALSE; } + + _this = (jk_worker_t *)malloc(sizeof(jk_worker_t)); - jk_open_pool(&private_data->p, - private_data->buf, - sizeof(jk_pool_atom_t) * TINY_POOL_SIZE); - - private_data->name = private_data->p.pstrdup(&private_data->p, name); - if(! private_data->name) { - l->jkLog(l, JK_LOG_ERROR,"In lb_worker_factory, malloc failed\n"); - private_data->p.close(&private_data->p); - free(private_data); + if(_this==NULL) { + l->jkLog(l, JK_LOG_ERROR,"lb_worker.factory() OutOfMemoryException\n"); return JK_FALSE; } - private_data->lb_workers = NULL; - private_data->num_of_workers = 0; - private_data->worker.worker_private = private_data; - private_data->worker.validate = validate; - private_data->worker.init = init; - private_data->worker.get_endpoint = get_endpoint; - private_data->worker.destroy = destroy; + _this->lb_workers = NULL; + _this->num_of_workers = 0; + _this->worker_private = NULL; + _this->validate = validate; + _this->init = init; + _this->get_endpoint = get_endpoint; + _this->destroy = destroy; + + *result=_this; + + /* name, pool will be set by workerEnv ( our factory ) */ - *w = &private_data->worker; return JK_TRUE; }
-- To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]> For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>