costin 02/05/09 14:06:48
Modified: jk/native2/common jk_worker_lb.c
Log:
That's the big one.
Please review !
It changes the handling of lb_value to int. I also cleaned up the logic so
it's easier (I hope) to understand what's happening. "Levels" replace
the 'local worker'; I think I got the logic straight for those.
I started to add 'introspection' data, to validate and better report
the config.
We use one table per level. At the moment the maximum number of workers
is hardcoded (to 255). We could make it dynamic, but that would make things
pretty complex when we add workers dynamically (it won't work without
a critical section (CS) or atomic operations).
Revision Changes Path
1.12 +195 -181 jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c
Index: jk_worker_lb.c
===================================================================
RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -r1.11 -r1.12
--- jk_worker_lb.c 9 May 2002 00:01:43 -0000 1.11
+++ jk_worker_lb.c 9 May 2002 21:06:48 -0000 1.12
@@ -77,6 +77,7 @@
/* XXX make it longer - debugging only */
#define WAIT_BEFORE_RECOVER (5)
+#define MAX_ATTEMPTS 3
/** Find the best worker. In process, check if timeout expired
for workers that failed in the past and give them another chance.
@@ -90,115 +91,157 @@
jk_ws_service_t *s, int attempt)
{
jk_worker_t *rc = NULL;
- double lb_min = 0.0;
+ int lb_min = 0;
+ int lb_max = 0;
int i;
+ int j;
+ int level;
+ int currentLevel=JK_LB_LEVELS - 1;
char *session_route;
time_t now = 0;
session_route = jk2_requtil_getSessionRoute(env, s);
if(session_route) {
- for(i = 0 ; i < lb->num_of_workers ; i++) {
- jk_worker_t *w=lb->lb_workers[i];
-
- if(w->route != NULL &&
- 0 == strcmp(session_route, w->route)) {
- if(attempt > 0 && w->in_error_state) {
- break;
- } else {
- return w;
- }
+ for( level=0; level<JK_LB_LEVELS; level++ ) {
+ for(i = 0 ; i < lb->workerCnt[level]; i++) {
+ jk_worker_t *w=lb->workerTables[level][i];
+
+ if(w->route != NULL &&
+ 0 == strcmp(session_route, w->route)) {
+ if(attempt > 0 && w->in_error_state) {
+ /* We already tried to revive this worker. */
+ break;
+ } else {
+ return w;
+ }
+ }
}
}
}
- /** Get one worker that is ready */
- for(i = 0 ; i < lb->num_of_workers ; i++) {
- jk_worker_t *w=lb->lb_workers[i];
-
- if(w->in_error_state) {
+ /** Get one worker that is ready
+ */
+ for( level=0; level<JK_LB_LEVELS; level++ ) {
+ for(i = 0 ; i < lb->workerCnt[level] ; i++) {
+ jk_worker_t *w=lb->workerTables[level][i];
+
if( w->mbean->disabled ) continue;
-
- /* Check if it's ready for recovery */
- /* if(!lb->lb_workers[i]->in_recovering) { */
- if( now==0 )
- now = time(NULL);
-
- if((now - w->error_time) > WAIT_BEFORE_RECOVER) {
- env->l->jkLog(env, env->l, JK_LOG_ERROR,
- "lb.getWorker() timeout expired, reenable again %s\n",
- w->mbean->name);
-
- w->in_recovering = JK_TRUE;
- w->in_error_state = JK_FALSE;
+ if( w->in_error_state ) continue;
- /* No need to do that - if it'll be used again, then error time
- will be set automatically on error */
- /* w->error_time = now; */
- /* Not sure we need that either */
- /* w->retry_count++; */
-
- /* The worker's error state is reset, but that doesn't
- mean it'll be used - normal priority selection happens
- Don't give bigger priority to recovered workers
- */
- /* rc = lb->lb_workers[i];
- break;
- */
+ if( rc==NULL ) {
+ rc=w;
+ currentLevel=level;
+ lb_min=w->lb_value;
+ continue;
}
- }
- if( ! lb->lb_workers[i]->in_error_state ) {
- if(lb->lb_workers[i]->lb_value == 0 ) {
- /* That's the 'default' worker, it'll take all requests.
- * All other workers are not used unless this is in error state.
- *
- * The 'break' will disable checking for recovery on other
- * workers - but that doesn't matter as long as the default is
alive.
- */
- rc=lb->lb_workers[i];
- break;
- }
- if(lb->lb_workers[i]->lb_value < lb_min ||
- ( rc==NULL ) ) {
- lb_min = lb->lb_workers[i]->lb_value;
- rc = lb->lb_workers[i];
+
+ if( w->lb_value < lb_min ) {
+ lb_min = w->lb_value;
+ rc = w;
+ currentLevel=level;
}
}
+
+ if( rc!=NULL ) {
+ /* We found a valid worker on the current level, don't worry about the
+ higher levels.
+ */
+ break;
+ }
}
+
+ /** Reenable workers in error state if the timeout has passed.
+ * Don't bother with 'higher' levels, since we'll never try them.
+ */
+ for( level=0; level<=currentLevel; level++ ) {
+
+ for(i = 0 ; i < lb->workerCnt[level] ; i++) {
+ jk_worker_t *w=lb->workerTables[level][i];
+
+ if( w->mbean->disabled ) continue;
+ if(w->in_error_state) {
+ /* Check if it's ready for recovery */
+ if( now==0 ) now = time(NULL);
+
+ if((now - w->error_time) > WAIT_BEFORE_RECOVER) {
+ env->l->jkLog(env, env->l, JK_LOG_ERROR,
+ "lb.getWorker() reenable %s\n", w->mbean->name);
+ w->in_error_state = JK_FALSE;
+
+ /* Find max lb */
+ if( lb_max ==0 ) {
+ for( j=0; j<lb->workerCnt[level]; j++ ) {
+ if( lb->workerTables[level][j]->lb_value > lb_max ) {
+ lb_max=lb->workerTables[level][j]->lb_value;
+ }
+ }
+ }
+ w->lb_value = lb_max;
+ }
+ }
+ }
+ }
+
+ /* If no active worker is found, we'll try all workers in error_state,
+ */
if ( rc==NULL ) {
/* no workers found (rc is null), now try as hard as possible to get a
worker anyway, pick one with largest error time.. */
- env->l->jkLog(env, env->l, JK_LOG_ERROR,
+
+ env->l->jkLog(env, env->l, JK_LOG_INFO,
"lb.getWorker() All workers in error state, use the one with
oldest error\n");
- for(i = 0 ; i < lb->num_of_workers ; i++) {
- jk_worker_t *w=lb->lb_workers[i];
-
- if( w->mbean->disabled == JK_TRUE ) continue;
-
- if ( rc != NULL ) {
+ for( level=0; level<JK_LB_LEVELS; level++ ) {
+ for(i = 0 ; i < lb->workerCnt[level] ; i++) {
+ jk_worker_t *w=lb->workerTables[level][i];
+
+ if( w->mbean->disabled == JK_TRUE ) continue;
+
+ if( rc==NULL ) {
+ rc= w;
+ currentLevel=level;
+ continue;
+ }
/* pick the oldest failed worker */
if ( w->error_time < rc->error_time ) {
+ currentLevel=level;
rc = w;
}
- } else {
- rc = w;
}
}
-
- if ( rc && rc->in_error_state ) {
- rc->in_recovering = JK_TRUE;
- rc->in_error_state = JK_FALSE;
- }
}
- if(rc) {
+ if(rc!=NULL) {
+ /* It it's the default, it'll remain the default - we don't
+ increase the factor
+ */
+ rc->in_error_state = JK_FALSE;
if( rc->lb_value != 0 ) {
- /* It it's the default, it'll remain the default - we don't
- increase the factor
- */
- rc->lb_value += rc->lb_factor;
+ int newValue=rc->lb_value + rc->lb_factor;
+
+ if( newValue > 255 ) {
+ rc->lb_value=rc->lb_factor;
+ /* Roll over. This has 2 goals:
+ - avoid the lb factor becoming too big, and give a chance to run
to
+ workers that were in error state ( I think it's cleaner than
looking for "max" )
+ - the actual lb_value will be 1 byte. Even on the craziest
platform, that
+ will be an atomic write. We do a lot of operations on lb_value
in a MT environment,
+ and the chance of reading something inconsistent is
considerable. Since APR
+ will not support atomic - and adding a CS would cost too much,
this is actually
+ a good solution.
+
+ Note that lb_value is not used for anything critical - just to
balance the load,
+ the worst that may happen is having a worker stay idle for 255
requests.
+ */
+ for(i = 0 ; i < lb->workerCnt[currentLevel] ; i++) {
+ jk_worker_t *w=lb->workerTables[currentLevel][i];
+ w->lb_value=w->lb_factor;
+ }
+ } else {
+ rc->lb_value=newValue;
+ }
}
}
@@ -210,17 +253,21 @@
jk_worker_t *lb,
char *instanceId )
{
- int j;
+ int i;
+ int level;
- for( j=0; j< lb->num_of_workers; j++ ) {
- jk_worker_t *w=lb->lb_workers[j];
- if( w->route != NULL &&
- strcmp( w->route, instanceId ) == 0 ) {
- env->l->jkLog(env, env->l, JK_LOG_INFO,
- "lb.updateWorkers() Gracefull shutdown %s %s\n",
- w->channel->mbean->name, instanceId );
- w->in_error_state= JK_TRUE;
- w->mbean->disabled = JK_TRUE;
+ for( level=0; level<JK_LB_LEVELS; level++ ) {
+ for(i = 0 ; i < lb->workerCnt[level] ; i++) {
+ jk_worker_t *w=lb->workerTables[level][i];
+
+ if( w->route != NULL &&
+ strcmp( w->route, instanceId ) == 0 ) {
+ env->l->jkLog(env, env->l, JK_LOG_INFO,
+ "lb.updateWorkers() Gracefull shutdown %s %s\n",
+ w->channel->mbean->name, instanceId );
+ w->in_error_state= JK_TRUE;
+ w->mbean->disabled = JK_TRUE;
+ }
}
}
return JK_OK;
@@ -239,6 +286,7 @@
char *tmpBuf;
jk_bean_t *chBean;
int rc=JK_OK;
+ int level;
jk2_map_default_create(env, &chProp, env->tmpPool);
@@ -256,26 +304,29 @@
return JK_ERR;
}
- for( i=0; i< lb->num_of_workers; i++ ) {
- jk_worker_t *w=lb->lb_workers[i];
- if( w->route &&
- strcmp( w->route, instanceId ) == 0 &&
- strcmp( w->channel->mbean->name, chName ) == 0 ) {
- /* XXX Create a new channel with the update properties,
- Then replace it.
-
- At this moment we just re-enable the worker.
- */
- if( w->mbean->disabled || w->in_error_state ) {
- env->l->jkLog(env, env->l, JK_LOG_INFO,
- "lb.updateWorkers() re-enabling %s %s\n",
- w->channel->mbean->name, instanceId );
- w->mbean->disabled=JK_FALSE;
- w->in_error_state=JK_FALSE;
+ for( level=0; level<JK_LB_LEVELS; level++ ) {
+ for(i = 0 ; i < lb->workerCnt[level] ; i++) {
+ jk_worker_t *w=lb->workerTables[level][i];
+
+ if( w->route &&
+ strcmp( w->route, instanceId ) == 0 &&
+ strcmp( w->channel->mbean->name, chName ) == 0 ) {
+ /* XXX Create a new channel with the update properties,
+ Then replace it.
+
+ At this moment we just re-enable the worker.
+ */
+ if( w->mbean->disabled || w->in_error_state ) {
+ env->l->jkLog(env, env->l, JK_LOG_INFO,
+ "lb.updateWorkers() re-enabling %s %s\n",
+ w->channel->mbean->name, instanceId );
+ w->mbean->disabled=JK_FALSE;
+ w->in_error_state=JK_FALSE;
+ }
+
+ found=JK_TRUE;
+ break;
}
-
- found=JK_TRUE;
- break;
}
}
@@ -434,18 +485,14 @@
int rc;
/* Prevent loops */
- if( attempt > lb->num_of_workers + 1 ) {
+ if( attempt > MAX_ATTEMPTS ) {
env->l->jkLog(env, env->l, JK_LOG_ERROR,
"lb.service() max attempts exceeded %d\n", attempt);
return JK_ERR;
}
- if( lb->num_of_workers==1 ) {
- /* A single worker - no need to search */
- rec=lb->lb_workers[0];
- } else {
- rec=jk2_get_most_suitable_worker(env, lb, s, attempt++);
- }
+ rec=jk2_get_most_suitable_worker(env, lb, s, attempt);
+ attempt++;
s->is_recoverable_error = JK_FALSE;
@@ -459,21 +506,22 @@
if( lb->mbean->debug > 0 )
env->l->jkLog(env, env->l, JK_LOG_INFO,
"lb.service() try %s\n", rec->mbean->name );
+
if( rec->route==NULL ) {
rec->route=rec->mbean->localName;
}
+
s->jvm_route = rec->route;
/* It may be better to do this on the endpoint */
rec->reqCnt++;
s->realWorker = rec;
+
rc = rec->service(env, rec, s);
if(rc==JK_OK) {
rec->in_error_state = JK_FALSE;
- rec->in_recovering = JK_FALSE;
- rec->retry_count = 0;
rec->error_time = 0;
/* the endpoint that succeeded is saved for done() */
return JK_OK;
@@ -488,7 +536,6 @@
* Time for fault tolerance (if possible)...
*/
rec->in_error_state = JK_TRUE;
- rec->in_recovering = JK_FALSE;
rec->error_time = time(0);
rec->errCnt++;
@@ -505,7 +552,7 @@
*/
if( lb->mbean->debug > 0 ) {
env->l->jkLog(env, env->l, JK_LOG_INFO,
- "lb_worker.service() try other host\n");
+ "lb_worker.service() try other hosts\n");
}
}
return JK_ERR;
@@ -518,65 +565,36 @@
{
int currentWorker=0;
int i;
+ int level;
int num_of_workers=lb->lbWorkerMap->size( env, lb->lbWorkerMap);
- if( lb->lb_workers_size < num_of_workers ) {
- if( lb->lb_workers_size==0 ) {
- lb->lb_workers_size=10;
- } else {
- lb->lb_workers_size = 2 * lb->lb_workers_size;
- }
- lb->lb_workers =
- lb->mbean->pool->alloc(env, lb->mbean->pool,
- lb->lb_workers_size * sizeof(jk_worker_t *));
- if(!lb->lb_workers) {
- env->l->jkLog(env, env->l, JK_LOG_ERROR,
- "lb_worker.validate(): OutOfMemoryException\n");
- return JK_ERR;
- }
- }
-
for(i = 0 ; i < num_of_workers ; i++) {
char *name = lb->lbWorkerMap->nameAt( env, lb->lbWorkerMap, i);
jk_worker_t *w= env->getByName( env, name );
+ int level=0;
+ int pos=0;
+
if( w== NULL ) {
env->l->jkLog(env, env->l, JK_LOG_ERROR,
"lb_worker.init(): no worker found %s\n", name);
- num_of_workers--;
continue;
}
+
+ if( w->mbean->disabled ) continue;
- if( w->lb_factor != 0 ) {
- w->lb_factor = 1/ w->lb_factor;
- lb->lb_workers[currentWorker]=w;
- } else {
- /* If == 0, then this is the default worker. Switch it with the first
- worker to avoid looking too much for it.
- */
- jk_worker_t *first=lb->lb_workers[0];
- lb->lb_workers[0]=w;
- /* Only do the exchange if the worker is not the first */
- if( currentWorker > 0 ) {
- lb->lb_workers[currentWorker]=first;
- }
- }
+ level=w->level;
+ /* It's like disabled */
+ if( level >= JK_LB_LEVELS ) continue;
+
+ pos=lb->workerCnt[level]++;
+
+ lb->workerTables[level][pos]=w;
- /*
- * Allow using lb in fault-tolerant mode.
- * Just set lbfactor in worker.properties to 0 to have
- * a worker used only when principal is down or session route
- * point to it. Provided by Paul Frieden <[EMAIL PROTECTED]>
- */
w->lb_value = w->lb_factor;
w->in_error_state = JK_FALSE;
- w->in_recovering = JK_FALSE;
- w->retry_count = 0;
-
- currentWorker++;
}
- lb->num_of_workers=num_of_workers;
return JK_OK;
}
@@ -587,12 +605,15 @@
lb->lbWorkerMap->add(env, lb->lbWorkerMap, name, "");
env->l->jkLog(env, env->l, JK_LOG_INFO,
- "lb_worker.setAttribute(): Adding %s %s\n", lb->mbean->name,
name);
+ "lb_worker.setAttribute(): Adding to %s: %s\n",
lb->mbean->localName, name);
}
-static int JK_METHOD jk2_lb_setProperty(jk_env_t *env, jk_bean_t *mbean,
- char *name, void *valueP)
+static char *jk2_worker_lb_multiValueInfo[]={"worker", NULL };
+static char *jk2_worker_lb_setAttributeInfo[]={"debug", NULL };
+
+static int JK_METHOD jk2_lb_setAttribute(jk_env_t *env, jk_bean_t *mbean,
+ char *name, void *valueP)
{
jk_worker_t *lb=mbean->object;
char *value=valueP;
@@ -602,20 +623,7 @@
unsigned i = 0;
char *tmp;
- if( strcmp( name, "balanced_workers") == 0 ) {
- worker_names=jk2_config_split( env, lb->mbean->pool,
- value, NULL, &num_of_workers );
- if( worker_names==NULL || num_of_workers==0 ) {
- env->l->jkLog(env, env->l, JK_LOG_ERROR,
- "lb_worker.validate(): no defined workers\n");
- return JK_ERR;
- }
- for(i = 0 ; i < num_of_workers ; i++) {
- jk2_lb_addWorker( env, lb, worker_names[i]);
- }
- jk2_lb_refresh( env, lb );
- return JK_OK;
- } else if( strcmp( name, "worker") == 0 ) {
+ if( strcmp( name, "worker") == 0 ) {
jk2_lb_addWorker( env, lb, value);
jk2_lb_refresh( env, lb );
return JK_OK;
@@ -630,6 +638,7 @@
char **worker_names;
int i = 0;
char *tmp;
+ int num_of_workers=lb->lbWorkerMap->size( env, lb->lbWorkerMap);
err=jk2_lb_refresh(env, lb );
if( err != JK_OK )
@@ -641,7 +650,7 @@
env->l->jkLog(env, env->l, JK_LOG_INFO,
"lb.init() %s %d workers\n",
- lb->mbean->name, lb->num_of_workers );
+ lb->mbean->name, num_of_workers );
return JK_OK;
}
@@ -660,6 +669,7 @@
jk_bean_t *result, char *type, char *name)
{
jk_worker_t *w;
+ int i;
if(NULL == name ) {
env->l->jkLog(env, env->l, JK_LOG_ERROR,
@@ -675,16 +685,20 @@
return JK_ERR;
}
- w->lb_workers = NULL;
- w->num_of_workers = 0;
w->worker_private = NULL;
w->init = jk2_lb_init;
w->destroy = jk2_lb_destroy;
w->service = jk2_lb_service;
-
+
+ for( i=0; i<JK_LB_LEVELS; i++ ) {
+ w->workerCnt[i]=0;
+ }
+
jk2_map_default_create(env,&w->lbWorkerMap, pool);
- result->setAttribute=jk2_lb_setProperty;
+ result->setAttribute=jk2_lb_setAttribute;
+ result->multiValueInfo=jk2_worker_lb_multiValueInfo;
+ result->setAttributeInfo=jk2_worker_lb_setAttributeInfo;
result->object=w;
w->mbean=result;
--
To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>