costin 02/05/14 10:23:31 Modified: jk/native2/common jk_worker_lb.c Log: - Started to add the 'hwBalanceErr' support. - removed the config stuff - it's now in jk_config - we still check the shm head and call the config->update method. This is needed since /jkstatus will be called in one process, and we need all processes to update. In addition this is another mechanism to triger config reloading. - various cosmetic changes. Revision Changes Path 1.14 +43 -221 jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c Index: jk_worker_lb.c =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c,v retrieving revision 1.13 retrieving revision 1.14 diff -u -r1.13 -r1.14 --- jk_worker_lb.c 9 May 2002 23:47:31 -0000 1.13 +++ jk_worker_lb.c 14 May 2002 17:23:31 -0000 1.14 @@ -148,6 +148,13 @@ */ break; } + + if( lb->hwBalanceErr > 0 ) { + /* don't go to higher levels - we'll return an error + */ + currentLevel=0; + break; + } } /** Reenable workers in error state if the timeout has passed. @@ -212,6 +219,11 @@ rc = w; } } + + if( lb->hwBalanceErr > 0 ) { + /* Don't try higher levels, only level=0 */ + break; + } } if( attempt >= error_workers ) { @@ -257,202 +269,6 @@ return rc; } -/* Remove all channels used by this tomcat instance */ -static int jk2_worker_lb_disableInstance( jk_env_t *env, - jk_worker_t *lb, - char *instanceId ) -{ - int i; - int level; - - for( level=0; level<JK_LB_LEVELS; level++ ) { - for(i = 0 ; i < lb->workerCnt[level] ; i++) { - jk_worker_t *w=lb->workerTables[level][i]; - - if( w->route != NULL && - strcmp( w->route, instanceId ) == 0 ) { - env->l->jkLog(env, env->l, JK_LOG_INFO, - "lb.updateWorkers() Gracefull shutdown %s %s\n", - w->channel->mbean->name, instanceId ); - w->in_error_state= JK_TRUE; - w->mbean->disabled = JK_TRUE; - } - } - } - return JK_OK; -} - -static int jk2_worker_lb_registerChannel( jk_env_t *env, - jk_worker_t *lb, - char *instanceId, - jk_msg_t *msg, jk_map_t *groups) -{ - char *chName; - jk_map_t *chProp; - int i; - int found=JK_FALSE; - jk_config_t *config; - char *tmpBuf; - jk_bean_t *chBean; - int rc=JK_OK; - int level; - - jk2_map_default_create(env, &chProp, env->tmpPool); - - chName=msg->getString( env, msg ); - if( chName==NULL ) - rc=JK_ERR; - - if( rc==JK_OK ) - rc=msg->getMap( env, msg, chProp ); - - if( rc!=JK_OK ) { - env->l->jkLog(env, env->l, JK_LOG_ERROR, - "lb.updateWorkers() can't read channel data %s %s\n", - chName, instanceId); - return JK_ERR; - } - - for( level=0; level<JK_LB_LEVELS; level++ ) { - for(i = 0 ; i < lb->workerCnt[level] ; i++) { - jk_worker_t *w=lb->workerTables[level][i]; - - if( w->route && - strcmp( w->route, instanceId ) == 0 && - strcmp( w->channel->mbean->name, chName ) == 0 ) { - /* XXX Create a new channel with the update properties, - Then replace it. - - At this moment we just re-enable the worker. - */ - if( w->mbean->disabled || w->in_error_state ) { - env->l->jkLog(env, env->l, JK_LOG_INFO, - "lb.updateWorkers() re-enabling %s %s\n", - w->channel->mbean->name, instanceId ); - w->mbean->disabled=JK_FALSE; - w->in_error_state=JK_FALSE; - } - - found=JK_TRUE; - break; - } - } - } - - if( found==JK_TRUE ) { - env->l->jkLog(env, env->l, JK_LOG_INFO, - "lb.updateWorkers() already found %s %s\n", - chName, instanceId); - return JK_OK; - } - - config=lb->workerEnv->config; - - tmpBuf=(char *)env->tmpPool->calloc( env, env->tmpPool, strlen( chName ) + 10 ); - strcpy( tmpBuf, chName ); - strcat( tmpBuf, ".name" ); - - config->setPropertyString( env, config, tmpBuf, chName ); - chBean=env->getBean( env, chName ); - if( chBean==NULL ) { - env->l->jkLog(env, env->l, JK_LOG_ERROR, - "lb.updateWorkers() can't create %s\n", - chName ); - return JK_ERR; - } - - for( i=0; i< chProp->size(env, chProp ); i++ ) { - char *name=chProp->nameAt( env, chProp, i ); - char *value=chProp->valueAt( env, chProp, i ); - - config->setProperty( env, config, chBean, name, value ); - } - - config->save( env, config, NULL ); - - env->l->jkLog(env, env->l, JK_LOG_ERROR, - "lb.updateWorkers() create %s %s\n", - chName, instanceId ); - - /* XXX Add it to the right groups */ - - return JK_OK; -} - -/** Check the scoreboard, make updates in the 'live' - config -*/ -static int JK_METHOD jk2_lb_updateWorkers(jk_env_t *env, - jk_worker_t *lb, - jk_shm_t *shm) -{ - int rc; - int i; - int j; - jk_map_t *groups; - - if( shm== NULL || shm->head==NULL) return JK_ERR; - - JK_ENTER_CS(&lb->cs, rc); - if(rc !=JK_TRUE) { - env->l->jkLog(env, env->l, JK_LOG_ERROR, - "lb.updateWorkers() Can't enter critical section\n"); - return JK_ERR; - } - if( lb->ver == shm->head->lbVer ) { - /* Was updated by some other thread */ - return JK_OK; - } - - /* Walk the shm and update any changed worker */ - env->l->jkLog(env, env->l, JK_LOG_INFO, - "lb.updateWorkers() Updating workers %d %d\n", - lb->ver, shm->head->lbVer); - for( i=1; i<shm->head->lastSlot; i++ ) { - jk_shm_slot_t *slot= shm->getSlot( env, shm, i ); - if( strncmp( slot->name, "TOMCAT:", 7 ) == 0 ) { - /* */ - char *instanceId=slot->name+7; - char *data=slot->data; - jk_msg_t *msg; - int chCnt; - - jk2_map_default_create(env, &groups, env->tmpPool); - - msg=jk2_msg_ajp_create2( env, env->tmpPool, slot->data, slot->size); - msg->checkHeader( env, msg , NULL); - - msg->getByte(env, msg ); - msg->getString(env, msg ); - - msg->getMap( env, msg, groups ); - - /* The actual data */ - chCnt=msg->getInt(env, msg ); - - env->l->jkLog(env, env->l, JK_LOG_INFO, - "lb.updateWorkers() Reading %s %d channels %d groups %p %p %p\n", - slot->name, chCnt, groups->size( env, groups ), slot->data, slot, shm->head); - - if( chCnt == 0 ) { - jk2_worker_lb_disableInstance( env, lb, instanceId ); - } else { - /* Create all channels we don't have */ - /* XXX Not sure what's the best solution, we can do it in many ways */ - for( j=0; j< chCnt; j++ ) { - jk2_worker_lb_registerChannel( env, lb, instanceId, msg, groups ); - } - } - - } - } - - lb->ver = shm->head->lbVer; - - JK_LEAVE_CS(&lb->cs, rc); - return JK_OK; -} - /** Get the best worker and forward to it. Since we don't directly connect to anything, there's no @@ -484,8 +300,11 @@ while somebody else is changing - but that's ok, we just check for equality. */ - if( lb->ver != wEnv->shm->head->lbVer ) { - jk2_lb_updateWorkers(env, lb, wEnv->shm); + /* We should check this periodically + */ + if( wEnv->config->ver != wEnv->shm->head->lbVer ) { + wEnv->config->update(env, wEnv->config, NULL ); + wEnv->config->ver = wEnv->shm->head->lbVer; } } @@ -502,6 +321,8 @@ /* NULL record, no more workers left ... */ env->l->jkLog(env, env->l, JK_LOG_ERROR, "lb_worker.service() all workers in error or disabled state\n"); + /* XXX set hwBalanceErr status */ + /* XXX nice message or redirect */ return JK_ERR; } @@ -515,9 +336,6 @@ s->jvm_route = rec->route; - /* It may be better to do this on the endpoint */ - rec->reqCnt++; - s->realWorker = rec; rc = rec->service(env, rec, s); @@ -539,7 +357,6 @@ */ rec->in_error_state = JK_TRUE; rec->error_time = time(0); - rec->errCnt++; if(!s->is_recoverable_error) { /* Error is not recoverable - break with an error. */ @@ -600,19 +417,9 @@ return JK_OK; } -static int JK_METHOD jk2_lb_addWorker(jk_env_t *env, jk_worker_t *lb, - char *name) -{ - name = lb->mbean->pool->pstrdup(env, lb->mbean->pool, name); - lb->lbWorkerMap->add(env, lb->lbWorkerMap, name, ""); - - env->l->jkLog(env, env->l, JK_LOG_INFO, - "lb_worker.setAttribute(): Adding to %s: %s\n", lb->mbean->localName, name); - -} static char *jk2_worker_lb_multiValueInfo[]={"worker", NULL }; -static char *jk2_worker_lb_setAttributeInfo[]={"debug", NULL }; +static char *jk2_worker_lb_setAttributeInfo[]={"hwBalanceErr", NULL }; static int JK_METHOD jk2_lb_setAttribute(jk_env_t *env, jk_bean_t *mbean, char *name, void *valueP) @@ -626,16 +433,29 @@ char *tmp; if( strcmp( name, "worker") == 0 ) { - jk2_lb_addWorker( env, lb, value); + if( lb->lbWorkerMap->get( env, lb->lbWorkerMap, name) != NULL ) { + /* Already added */ + return JK_OK; + } + name = lb->mbean->pool->pstrdup(env, lb->mbean->pool, name); + lb->lbWorkerMap->add(env, lb->lbWorkerMap, name, ""); + + env->l->jkLog(env, env->l, JK_LOG_INFO, + "lb_worker.setAttribute(): Adding to %s: %s\n", lb->mbean->localName, name); + jk2_lb_refresh( env, lb ); return JK_OK; + } else if( strcmp( name, "hwBalanceErr") == 0 ) { + lb->hwBalanceErr=atoi( value ); } + return JK_ERR; } -static int JK_METHOD jk2_lb_init(jk_env_t *env, jk_worker_t *lb) +static int JK_METHOD jk2_lb_init(jk_env_t *env, jk_bean_t *bean) { + jk_worker_t *lb=bean->object; int err; char **worker_names; int i = 0; @@ -646,9 +466,8 @@ if( err != JK_OK ) return err; - lb->ver=0; - if( lb->workerEnv->shm != NULL && lb->workerEnv->shm->head != NULL) - jk2_lb_updateWorkers(env, lb, lb->workerEnv->shm); + /* if( lb->workerEnv->shm != NULL && lb->workerEnv->shm->head != NULL) */ + /* jk2_lb_updateWorkers(env, lb, lb->workerEnv->shm); */ env->l->jkLog(env, env->l, JK_LOG_INFO, "lb.init() %s %d workers\n", @@ -657,8 +476,9 @@ return JK_OK; } -static int JK_METHOD jk2_lb_destroy(jk_env_t *env, jk_worker_t *w) +static int JK_METHOD jk2_lb_destroy(jk_env_t *env, jk_bean_t *bean) { + jk_worker_t *w=bean->object; /* Workers are destroyed by the workerEnv. It is possible that a worker is part of more than a lb. Nothing to clean up so far. @@ -688,8 +508,6 @@ } w->worker_private = NULL; - w->init = jk2_lb_init; - w->destroy = jk2_lb_destroy; w->service = jk2_lb_service; for( i=0; i<JK_LB_LEVELS; i++ ) { @@ -698,11 +516,15 @@ jk2_map_default_create(env,&w->lbWorkerMap, pool); + result->init = jk2_lb_init; + result->destroy = jk2_lb_destroy; result->setAttribute=jk2_lb_setAttribute; result->multiValueInfo=jk2_worker_lb_multiValueInfo; result->setAttributeInfo=jk2_worker_lb_setAttributeInfo; result->object=w; w->mbean=result; + + w->hwBalanceErr=0; w->workerEnv=env->getByName( env, "workerEnv" ); w->workerEnv->addWorker( env, w->workerEnv, w );
-- To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]> For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>