mturk       2002/06/29 00:48:20

  Modified:    jk/native2/common jk_worker_lb.c
  Log:
  Serialize the calls to the workers until the first call finishes or times out.
  Added new config param 'timeout' setting the initialization worker
  timeout.
  
  Revision  Changes    Path
  1.21      +88 -7     jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c
  
  Index: jk_worker_lb.c
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c,v
  retrieving revision 1.20
  retrieving revision 1.21
  diff -u -r1.20 -r1.21
  --- jk_worker_lb.c    22 Jun 2002 16:57:54 -0000      1.20
  +++ jk_worker_lb.c    29 Jun 2002 07:48:20 -0000      1.21
  @@ -70,6 +70,10 @@
   #include "jk_env.h"
   #include "jk_requtil.h"
   
  +#ifdef HAS_APR
  +#include "apr_thread_proc.h"
  +#endif
  +
   #define DEFAULT_LB_FACTOR           (1.0)
   
   /* Time to wait before retry... XXX make it configurable*/
  @@ -79,6 +83,12 @@
   
   #define NO_WORKER_MSG "The servlet container is temporary unavailable or being 
upgraded\n";
   
  +typedef struct {
  +    struct jk_mutex *cs;
  +    int    init_timeout;
  +    int    initializing;
  +} jk_worker_lb_private_t;
  +
   /** Find the best worker. In process, check if timeout expired
       for workers that failed in the past and give them another chance.
   
  @@ -281,6 +291,8 @@
   {
       int attempt=0;
       jk_workerEnv_t *wEnv=lb->workerEnv;
  +    jk_worker_lb_private_t *lb_priv = lb->worker_private;
  +    jk_worker_t *rec = NULL;
       
       if( s==NULL ) {
           env->l->jkLog(env, env->l, JK_LOG_ERROR,
  @@ -309,10 +321,29 @@
       }
       
       while(1) {
  -        jk_worker_t *rec;
           int rc;
   
  -        rec=jk2_get_most_suitable_worker(env, lb, s, attempt);
  +        /* Since there is potential worker state change
  +         * make the find best worker call thread safe 
  +         */
  +        if (!lb_priv->initializing) {
  +            if (lb_priv->cs != NULL)
  +                lb_priv->cs->lock(env, lb_priv->cs);
  +            rec=jk2_get_most_suitable_worker(env, lb, s, attempt);
  +
  +            if (lb_priv->cs != NULL)
  +                lb_priv->cs->unLock(env, lb_priv->cs);
  +        }
  +        else if (!rec){
  +            /* If we are initializing the service wait until
  +             * the initialization finishes or times out 
  +             */
  +            if (lb->cs != NULL) {
  +                lb->cs->lock(env, lb->cs);
  +                lb->cs->unLock(env, lb->cs);
  +            }
  +            continue;
  +        }
           attempt++;
           
           s->is_recoverable_error = JK_FALSE;
  @@ -354,16 +385,45 @@
           s->jvm_route = rec->route;
   
           s->realWorker = rec;
  -
  +        if (!rec->initialized && !lb_priv->initializing && lb->cs != NULL) {
  +            /* If the worker has not been called yet serialize the call */
  +            lb->cs->lock(env, lb->cs);
  +            lb_priv->initializing = JK_TRUE;
  +            rec->error_time = time(NULL);
  +        }
           rc = rec->service(env, rec, s);
   
           if(rc==JK_OK) {                        
               rec->in_error_state = JK_FALSE;
               rec->error_time     = 0;
  -            /* the endpoint that succeeded is saved for done() */
  +            /* Set the initialized flag to TRUE if that was the first call */
  +            if (!rec->initialized && lb->cs != NULL) {
  +                rec->initialized = JK_TRUE;
  +                lb_priv->initializing = JK_FALSE;
  +                lb->cs->unLock(env, lb->cs);
  +            }
               return JK_OK;
           }
           
  +        if (!rec->initialized && lb->cs != NULL) {
  +            time_t now = time(NULL);
  +            /* In the case of initialization timeout disable the worker */
  +            if ((int)(now - rec->error_time) > lb_priv->init_timeout) {
  +                rec->mbean->disabled = JK_TRUE;
  +                lb_priv->initializing = JK_FALSE;
  +                s->is_recoverable_error = JK_FALSE;
  +                env->l->jkLog(env, env->l, JK_LOG_ERROR, 
  +                          "lb_worker.service() worker init timeout for %s\n",
  +                          rec->channelName);
  +                lb->cs->unLock(env, lb->cs);                
  +            }
  +            else {
  +#ifdef HAS_APR
  +                apr_thread_yield();
  +#endif
  +                continue;
  +            }
  +        }
           env->l->jkLog(env, env->l, JK_LOG_ERROR, 
                         "lb.service() worker failed %s\n", rec->mbean->name );
           
  @@ -373,7 +433,7 @@
            * Time for fault tolerance (if possible)...
            */
           rec->in_error_state = JK_TRUE;
  -        rec->error_time     = time(0);
  +        rec->error_time     = time(NULL);
           
           if(!s->is_recoverable_error) {
               /* Error is not recoverable - break with an error. */
  @@ -464,6 +524,9 @@
           lb->noWorkerCode=atoi( value );
       } else if( strcmp( name, "hwBalanceErr") == 0 ) {
           lb->hwBalanceErr=atoi( value );
  +    } else if( strcmp( name, "initTimeout") == 0 ) {
  +        jk_worker_lb_private_t *priv = lb->worker_private;
  +        priv->init_timeout=atoi( value );
       }
   
       return JK_ERR;
  @@ -506,7 +569,9 @@
   {
       jk_worker_t *w;
       int i;
  -    
  +    jk_bean_t *jkb;
  +    jk_worker_lb_private_t *worker_private;
  +
       if(NULL == name ) {
           env->l->jkLog(env, env->l, JK_LOG_ERROR,
                         "lb_worker.factory() NullPointerException\n");
  @@ -520,8 +585,24 @@
                         "lb_worker.factory() OutOfMemoryException\n");
           return JK_ERR;
       }
  +    
  +    jkb=env->createBean2(env, pool,"threadMutex", NULL);
  +    if( jkb != NULL ) {
  +        w->cs=jkb->object;
  +        jkb->init(env, jkb );
  +    }
   
  -    w->worker_private = NULL;
  +    worker_private = (jk_worker_lb_private_t *)pool->calloc(env,
  +                          pool, sizeof(jk_worker_lb_private_t));
  +    
  +    /* one minute service startup */
  +    worker_private->init_timeout = 60;
  +    jkb=env->createBean2(env, pool,"threadMutex", NULL);
  +    if( jkb != NULL ) {
  +        worker_private->cs=jkb->object;
  +        jkb->init(env, jkb );
  +    }
  +    w->worker_private = worker_private;
       w->service        = jk2_lb_service;
       
       for( i=0; i<JK_LB_LEVELS; i++ ) {
  
  
  

--
To unsubscribe, e-mail:   <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>

Reply via email to