costin 02/04/25 12:21:58
Modified: jk/native2/common jk_worker_lb.c
Log:
Added some code to check the shm 'version' and update the config.
Not completed.
Revision Changes Path
1.4 +173 -97 jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c
Index: jk_worker_lb.c
===================================================================
RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- jk_worker_lb.c 15 Apr 2002 23:47:57 -0000 1.3
+++ jk_worker_lb.c 25 Apr 2002 19:21:58 -0000 1.4
@@ -60,7 +60,7 @@
* several workers. *
* Author: Gal Shachor <[EMAIL PROTECTED]> *
* Based on: *
- * Version: $Revision: 1.3 $ *
+ * Version: $Revision: 1.4 $ *
***************************************************************************/
#include "jk_pool.h"
@@ -70,6 +70,7 @@
#include "jk_config.h"
#include "jk_env.h"
#include "jk_requtil.h"
+#include "jk_mt.h"
#define DEFAULT_LB_FACTOR (1.0)
@@ -83,15 +84,15 @@
* This + ADDITIONAL_WAIT_LOAD will be set on all the workers
* that recover after an error.
*/
-static double jk2_get_max_lb(jk_worker_t *p)
+static double jk2_get_max_lb(jk_worker_t *lb)
{
int i;
double rc = 0.0;
- for(i = 0 ; i < p->num_of_workers ; i++) {
- if(!p->lb_workers[i]->in_error_state) {
- if(p->lb_workers[i]->lb_value > rc) {
- rc = p->lb_workers[i]->lb_value;
+ for(i = 0 ; i < lb->num_of_workers ; i++) {
+ if(!lb->lb_workers[i]->in_error_state) {
+ if(lb->lb_workers[i]->lb_value > rc) {
+ rc = lb->lb_workers[i]->lb_value;
}
}
}
@@ -106,7 +107,7 @@
It'll also adjust the load balancing factors.
*/
-static jk_worker_t *jk2_get_most_suitable_worker(jk_env_t *env, jk_worker_t *p,
+static jk_worker_t *jk2_get_most_suitable_worker(jk_env_t *env, jk_worker_t *lb,
jk_ws_service_t *s, int attempt)
{
jk_worker_t *rc = NULL;
@@ -117,37 +118,37 @@
session_route = jk2_requtil_getSessionRoute(env, s);
if(session_route) {
- for(i = 0 ; i < p->num_of_workers ; i++) {
- if(0 == strcmp(session_route, p->lb_workers[i]->mbean->name)) {
- if(attempt > 0 && p->lb_workers[i]->in_error_state) {
+ for(i = 0 ; i < lb->num_of_workers ; i++) {
+ if(0 == strcmp(session_route, lb->lb_workers[i]->mbean->name)) {
+ if(attempt > 0 && lb->lb_workers[i]->in_error_state) {
break;
} else {
- return p->lb_workers[i];
+ return lb->lb_workers[i];
}
}
}
}
/** Get one worker that is ready */
- for(i = 0 ; i < p->num_of_workers ; i++) {
- if(p->lb_workers[i]->in_error_state) {
- if(!p->lb_workers[i]->in_recovering) {
+ for(i = 0 ; i < lb->num_of_workers ; i++) {
+ if(lb->lb_workers[i]->in_error_state) {
+ if(!lb->lb_workers[i]->in_recovering) {
time_t now = time(0);
- if((now - p->lb_workers[i]->error_time) > WAIT_BEFORE_RECOVER) {
+ if((now - lb->lb_workers[i]->error_time) > WAIT_BEFORE_RECOVER) {
- p->lb_workers[i]->in_recovering = JK_TRUE;
- p->lb_workers[i]->error_time = now;
- p->lb_workers[i]->retry_count++;
- rc = p->lb_workers[i];
+ lb->lb_workers[i]->in_recovering = JK_TRUE;
+ lb->lb_workers[i]->error_time = now;
+ lb->lb_workers[i]->retry_count++;
+ rc = lb->lb_workers[i];
break;
}
}
} else {
- if(p->lb_workers[i]->lb_value < lb_min || !rc) {
- lb_min = p->lb_workers[i]->lb_value;
- rc = p->lb_workers[i];
+ if(lb->lb_workers[i]->lb_value < lb_min || !rc) {
+ lb_min = lb->lb_workers[i]->lb_value;
+ rc = lb->lb_workers[i];
}
}
}
@@ -155,29 +156,29 @@
if ( !rc ) {
/* no workers found (rc is null), now try as hard as possible to get a
worker anyway, pick one with largest error time.. */
- for(i = 0 ; i < p->num_of_workers ; i++) {
- if(p->lb_workers[i]->in_error_state) {
- if(!p->lb_workers[i]->in_recovering) {
+ for(i = 0 ; i < lb->num_of_workers ; i++) {
+ if(lb->lb_workers[i]->in_error_state) {
+ if(!lb->lb_workers[i]->in_recovering) {
/* if the retry count is zero, that means the worker only
failed once, this is to e that the failed worker will
not continue to be retried over and over again.
*/
- if ( p->lb_workers[i]->retry_count == 0 ) {
+ if ( lb->lb_workers[i]->retry_count == 0 ) {
if ( rc ) {
/* pick the oldest failed worker */
- if ( p->lb_workers[i]->error_time < rc->error_time ) {
- rc = p->lb_workers[i];
+ if ( lb->lb_workers[i]->error_time < rc->error_time ) {
+ rc = lb->lb_workers[i];
}
} else {
- rc = p->lb_workers[i];
+ rc = lb->lb_workers[i];
}
}
}
} else {
/* This is a good worker - it may have come to life */
- if(p->lb_workers[i]->lb_value < lb_min || rc != NULL) {
- lb_min = p->lb_workers[i]->lb_value;
- rc = p->lb_workers[i];
+ if(lb->lb_workers[i]->lb_value < lb_min || rc != NULL) {
+ lb_min = lb->lb_workers[i]->lb_value;
+ rc = lb->lb_workers[i];
break;
}
}
@@ -198,43 +199,104 @@
return rc;
}
+/** Check the scoreboard, make updates in the 'live'
+ config
+*/
+static int JK_METHOD jk2_lb_updateWorkers(jk_env_t *env,
+ jk_worker_t *lb,
+ jk_shm_t *shm)
+{
+ int rc;
+ int i;
+
+ if( shm== NULL || shm->head==NULL) return JK_ERR;
+
+ JK_ENTER_CS(&lb->cs, rc);
+ if(rc !=JK_TRUE) {
+ env->l->jkLog(env, env->l, JK_LOG_ERROR,
+ "lb.updateWorkers() Can't enter critical section\n");
+ return JK_ERR;
+ }
+ if( lb->ver == shm->head->lbVer ) {
+ /* Was updated by some other thread */
+ return JK_OK;
+ }
+
+ /* Walk the shm and update any changed worker */
+ env->l->jkLog(env, env->l, JK_LOG_INFO,
+ "lb.updateWorkers() Updating workers %d %d\n",
+ lb->ver, shm->head->lbVer);
+ for( i=1; i<shm->head->lastSlot; i++ ) {
+ jk_shm_slot_t *slot= shm->getSlot( env, shm, i );
+ if( strncmp( slot->name, "TOMCAT:", 7 ) == 0 ) {
+ /* */
+ char *instanceId=slot->name+7;
+ char *data=slot->data;
+
+
+ }
+ }
+
+ lb->ver = shm->head->lbVer;
+
+ JK_LEAVE_CS(&lb->cs, rc);
+ return JK_OK;
+}
+
/** Get the best worker and forward to it.
Since we don't directly connect to anything, there's no
need for an endpoint.
*/
static int JK_METHOD jk2_lb_service(jk_env_t *env,
- jk_worker_t *w,
+ jk_worker_t *lb,
jk_ws_service_t *s)
{
int attempt=0;
int i;
-
+ jk_workerEnv_t *wEnv=lb->workerEnv;
+
if( s==NULL ) {
env->l->jkLog(env, env->l, JK_LOG_ERROR,
"lb.service() NullPointerException\n");
- return JK_FALSE;
+ return JK_ERR;
}
/* you can not recover on another load balancer */
s->realWorker=NULL;
- /* reset all the retry counts to 0 */
- for(i = 0 ; i < w->num_of_workers ; i++) {
- w->lb_workers[i]->retry_count = 0;
- }
-
+ /* reset all the retry counts to 0. XXX may be a problem if we have many
workers ? */
+ for(i = 0 ; i < lb->num_of_workers ; i++) {
+ lb->lb_workers[i]->retry_count = 0;
+ }
+ if( wEnv->shm != NULL && wEnv->shm->head != NULL ) {
+ /* We have shm, let's check for updates. This is just checking one
+ memory location, no lock involved. It is possible we'll read it
+ while somebody else is changing - but that's ok, we just check for
+ equality.
+ */
+ if( lb->ver != wEnv->shm->head->lbVer ) {
+ jk2_lb_updateWorkers(env, lb, wEnv->shm);
+ }
+ }
while(1) {
jk_worker_t *rec;
int rc;
- if( w->num_of_workers==1 ) {
+ /* Prevent loops */
+ if( attempt > lb->num_of_workers + 1 ) {
+ env->l->jkLog(env, env->l, JK_LOG_ERROR,
+ "lb.service() max attempts exceeded %d\n", attempt);
+ return JK_ERR;
+ }
+
+ if( lb->num_of_workers==1 ) {
/* A single worker - no need to search */
- rec=w->lb_workers[0];
+ rec=lb->lb_workers[0];
} else {
- rec=jk2_get_most_suitable_worker(env, w, s, attempt++);
+ rec=jk2_get_most_suitable_worker(env, lb, s, attempt++);
}
s->is_recoverable_error = JK_FALSE;
@@ -243,7 +305,7 @@
/* NULL record, no more workers left ... */
env->l->jkLog(env, env->l, JK_LOG_ERROR,
"lb_worker.service() No suitable workers left \n");
- return JK_FALSE;
+ return JK_ERR;
}
env->l->jkLog(env, env->l, JK_LOG_INFO,
@@ -253,7 +315,7 @@
rc = rec->service(env, rec, s);
- if(rc==JK_TRUE) {
+ if(rc==JK_OK) {
if(rec->in_recovering) {
rec->lb_value = jk2_get_max_lb(rec) + ADDITINAL_WAIT_LOAD;
}
@@ -263,7 +325,7 @@
rec->error_time = 0;
/* the endpoint that succeeded is saved for done() */
s->realWorker = rec;
- return JK_TRUE;
+ return JK_OK;
}
/*
@@ -289,51 +351,51 @@
env->l->jkLog(env, env->l, JK_LOG_INFO,
"lb_worker.service() try other host\n");
}
- return JK_FALSE;
+ return JK_ERR;
}
/** Init internal structures.
Called any time the config changes
*/
-static int JK_METHOD jk2_lb_initLbArray(jk_env_t *env, jk_worker_t *_this)
+static int JK_METHOD jk2_lb_refresh(jk_env_t *env, jk_worker_t *lb)
{
int currentWorker=0;
int i;
- _this->num_of_workers=_this->lbWorkerMap->size( env, _this->lbWorkerMap);
+ lb->num_of_workers=lb->lbWorkerMap->size( env, lb->lbWorkerMap);
- if( _this->lb_workers_size < _this->num_of_workers ) {
- if( _this->lb_workers_size==0 ) {
- _this->lb_workers_size=10;
+ if( lb->lb_workers_size < lb->num_of_workers ) {
+ if( lb->lb_workers_size==0 ) {
+ lb->lb_workers_size=10;
} else {
- _this->lb_workers_size = 2 * _this->lb_workers_size;
+ lb->lb_workers_size = 2 * lb->lb_workers_size;
}
- _this->lb_workers =
- _this->pool->alloc(env, _this->pool,
- _this->lb_workers_size * sizeof(jk_worker_t *));
- if(!_this->lb_workers) {
+ lb->lb_workers =
+ lb->pool->alloc(env, lb->pool,
+ lb->lb_workers_size * sizeof(jk_worker_t *));
+ if(!lb->lb_workers) {
env->l->jkLog(env, env->l, JK_LOG_ERROR,
"lb_worker.validate(): OutOfMemoryException\n");
- return JK_FALSE;
+ return JK_ERR;
}
}
- for(i = 0 ; i < _this->num_of_workers ; i++) {
- char *name = _this->lbWorkerMap->nameAt( env, _this->lbWorkerMap, i);
+ for(i = 0 ; i < lb->num_of_workers ; i++) {
+ char *name = lb->lbWorkerMap->nameAt( env, lb->lbWorkerMap, i);
jk_worker_t *w= env->getByName( env, name );
if( w== NULL ) {
env->l->jkLog(env, env->l, JK_LOG_ERROR,
"lb_worker.init(): no worker found %s\n", name);
- _this->num_of_workers--;
+ lb->num_of_workers--;
continue;
}
- _this->lb_workers[currentWorker]=w;
+ lb->lb_workers[currentWorker]=w;
- if( _this->lb_workers[currentWorker]->lb_factor == 0 )
- _this->lb_workers[currentWorker]->lb_factor = DEFAULT_LB_FACTOR;
+ if( w->lb_factor == 0 )
+ w->lb_factor = DEFAULT_LB_FACTOR;
- _this->lb_workers[currentWorker]->lb_factor =
- 1/ _this->lb_workers[currentWorker]->lb_factor;
+ w->lb_factor =
+ 1/ w->lb_factor;
/*
* Allow using lb in fault-tolerant mode.
@@ -341,69 +403,83 @@
* a worker used only when principal is down or session route
* point to it. Provided by Paul Frieden <[EMAIL PROTECTED]>
*/
- _this->lb_workers[currentWorker]->lb_value =
- _this->lb_workers[currentWorker]->lb_factor;
- _this->lb_workers[currentWorker]->in_error_state = JK_FALSE;
- _this->lb_workers[currentWorker]->in_recovering = JK_FALSE;
- _this->lb_workers[currentWorker]->retry_count = 0;
+ w->lb_value =
+ w->lb_factor;
+ w->in_error_state = JK_FALSE;
+ w->in_recovering = JK_FALSE;
+ w->retry_count = 0;
currentWorker++;
}
- return JK_TRUE;
+ return JK_OK;
+}
+
+static int JK_METHOD jk2_lb_addWorker(jk_env_t *env, jk_worker_t *lb,
+ char *name)
+{
+ name = lb->pool->pstrdup(env, lb->pool, name);
+ lb->lbWorkerMap->add(env, lb->lbWorkerMap, name, "");
+
+ env->l->jkLog(env, env->l, JK_LOG_INFO,
+ "lb_worker.setAttribute(): Adding %s %s\n", lb->mbean->name,
name);
+
}
static int JK_METHOD jk2_lb_setProperty(jk_env_t *env, jk_bean_t *mbean,
char *name, void *valueP)
{
- jk_worker_t *_this=mbean->object;
+ jk_worker_t *lb=mbean->object;
char *value=valueP;
int err;
char **worker_names;
unsigned num_of_workers;
unsigned i = 0;
char *tmp;
-
- /* XXX Add one-by-one */
if( strcmp( name, "balanced_workers") == 0 ) {
- worker_names=jk2_config_split( env, _this->pool,
+ worker_names=jk2_config_split( env, lb->pool,
value, NULL, &num_of_workers );
if( worker_names==NULL || num_of_workers==0 ) {
env->l->jkLog(env, env->l, JK_LOG_ERROR,
"lb_worker.validate(): no defined workers\n");
- return JK_FALSE;
+ return JK_ERR;
}
for(i = 0 ; i < num_of_workers ; i++) {
- char *name = _this->pool->pstrdup(env, _this->pool, worker_names[i]);
- _this->lbWorkerMap->add(env, _this->lbWorkerMap, name, "");
- env->l->jkLog(env, env->l, JK_LOG_INFO,
- "lb_worker.setAttribute(): Adding %s %s\n",
_this->mbean->name, name);
+ jk2_lb_addWorker( env, lb, worker_names[i]);
}
- jk2_lb_initLbArray( env, _this );
- return JK_TRUE;
+ jk2_lb_refresh( env, lb );
+ return JK_OK;
+ } else if( strcmp( name, "worker") == 0 ) {
+ jk2_lb_addWorker( env, lb, value);
+ jk2_lb_refresh( env, lb );
+ return JK_OK;
}
- return JK_FALSE;
+ return JK_ERR;
}
-static int JK_METHOD jk2_lb_init(jk_env_t *env, jk_worker_t *_this)
+static int JK_METHOD jk2_lb_init(jk_env_t *env, jk_worker_t *lb)
{
int err;
char **worker_names;
int i = 0;
char *tmp;
- int num_of_workers=_this->lbWorkerMap->size( env, _this->lbWorkerMap);
+ int num_of_workers=lb->lbWorkerMap->size( env, lb->lbWorkerMap);
- err=jk2_lb_initLbArray(env, _this );
- if( err != JK_TRUE )
+ err=jk2_lb_refresh(env, lb );
+ if( err != JK_OK )
return err;
-
+
+ lb->ver=0;
+ if( lb->workerEnv->shm != NULL && lb->workerEnv->shm->head != NULL)
+ jk2_lb_updateWorkers(env, lb, lb->workerEnv->shm);
+
env->l->jkLog(env, env->l, JK_LOG_INFO,
"lb.init() %s %d workers\n",
- _this->mbean->name, _this->num_of_workers );
-
- return JK_TRUE;
+ lb->mbean->name, lb->num_of_workers );
+
+ return JK_OK;
}
static int JK_METHOD jk2_lb_destroy(jk_env_t *env, jk_worker_t *w)
@@ -413,7 +489,7 @@
if(w==NULL ) {
env->l->jkLog(env, env->l, JK_LOG_ERROR,
"lb_worker.destroy() NullPointerException\n");
- return JK_FALSE;
+ return JK_ERR;
}
/* Workers are destroyed by the workerEnv. It is possible
@@ -427,7 +503,7 @@
w->pool->close(env, w->pool);
- return JK_TRUE;
+ return JK_OK;
}
@@ -439,7 +515,7 @@
if(NULL == name ) {
env->l->jkLog(env, env->l, JK_LOG_ERROR,
"lb_worker.factory() NullPointerException\n");
- return JK_FALSE;
+ return JK_ERR;
}
w = (jk_worker_t *)pool->calloc(env, pool, sizeof(jk_worker_t));
@@ -447,7 +523,7 @@
if(w==NULL) {
env->l->jkLog(env, env->l, JK_LOG_ERROR,
"lb_worker.factory() OutOfMemoryException\n");
- return JK_FALSE;
+ return JK_ERR;
}
w->pool=pool;
@@ -468,6 +544,6 @@
w->workerEnv=env->getByName( env, "workerEnv" );
w->workerEnv->addWorker( env, w->workerEnv, w );
- return JK_TRUE;
+ return JK_OK;
}
--
To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>