mturk       2005/02/16 00:19:46

  Modified:    jk/native/common jk_ajp_common.c
  Log:
  Rewrite endpoint cache. The endpoints are now created on init,
  and are present for the worker's lifetime. This also truly limits the number
  of connections to Tomcat.
  
  Revision  Changes    Path
  1.79      +155 -154  
jakarta-tomcat-connectors/jk/native/common/jk_ajp_common.c
  
  Index: jk_ajp_common.c
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-connectors/jk/native/common/jk_ajp_common.c,v
  retrieving revision 1.78
  retrieving revision 1.79
  diff -u -r1.78 -r1.79
  --- jk_ajp_common.c   15 Feb 2005 12:11:20 -0000      1.78
  +++ jk_ajp_common.c   16 Feb 2005 08:19:46 -0000      1.79
  @@ -682,8 +682,15 @@
    * Reset the endpoint (clean buf)
    */
   
  -static void ajp_reset_endpoint(ajp_endpoint_t * ae)
  +static void ajp_reset_endpoint(ajp_endpoint_t * ae, jk_logger_t *l)
   {
  +    if (ae->sd > 0 && !ae->reuse) {
  +        jk_close_socket(ae->sd);
  +        if (JK_IS_DEBUG_LEVEL(l))
  +            jk_log(l, JK_LOG_DEBUG,
  +                   "closed socket with sd = %d", ae->sd);
  +        ae->sd = -1;
  +    }
       jk_reset_pool(&(ae->pool));
   }
   
  @@ -699,7 +706,8 @@
           jk_close_socket(ae->sd);
           if (JK_IS_DEBUG_LEVEL(l))
               jk_log(l, JK_LOG_DEBUG,
  -                   "closed sd = %d", ae->sd);
  +                   "closed socket with sd = %d", ae->sd);
  +        ae->sd = -1;
       }
   
       jk_close_pool(&(ae->pool));
  @@ -709,35 +717,31 @@
   
   
   /*
  - * Try to reuse a previous connection
  + * Try another connection from cache
    */
   
  -static void ajp_reuse_connection(ajp_endpoint_t * ae, jk_logger_t *l)
  +static void ajp_next_connection(ajp_endpoint_t **ae, jk_logger_t *l)
   {
  -    ajp_worker_t *aw = ae->worker;
  +    int rc;
  +    ajp_worker_t *aw = (*ae)->worker;
   
       /* Close existing endpoint socket */
  -    jk_close_socket(ae->sd);
  -    ae->sd = -1;
  -
  -    if (aw->ep_cache_sz) {
  -        int rc;
  -        JK_ENTER_CS(&aw->cs, rc);
  -        if (rc) {
  -            unsigned i;
  +    jk_close_socket((*ae)->sd);
  +    (*ae)->sd = -1;
   
  -            for (i = 0; i < aw->ep_cache_sz; i++) {
  -                /* Find cache slot with usable socket */
  -                if (aw->ep_cache[i] && aw->ep_cache[i]->sd > 0) {
  -                    ae->sd = aw->ep_cache[i]->sd;
  -                    aw->ep_cache[i]->sd = -1;
  -                    ajp_close_endpoint(aw->ep_cache[i], l);
  -                    aw->ep_cache[i] = NULL;
  -                    break;
  -                }
  +    JK_ENTER_CS(&aw->cs, rc);
  +    if (rc) {
  +        unsigned int i;
  +        for (i = 0; i < aw->ep_cache_sz; i++) {
  +            /* Find cache slot with usable socket */
  +            if (aw->ep_cache[i] && aw->ep_cache[i]->sd > 0) {
  +                ajp_endpoint_t *e = aw->ep_cache[i];
  +                aw->ep_cache[i] = *ae;
  +                *ae = e;
  +                break;
               }
  -            JK_LEAVE_CS(&aw->cs, rc);
           }
  +        JK_LEAVE_CS(&aw->cs, rc);
       }
   }
   
  @@ -1157,9 +1161,9 @@
            * loop. */
           if (err ||
               (ajp_connection_tcp_send_message(ae, op->request, l) == 
JK_FALSE)) {
  -            jk_log(l, JK_LOG_ERROR,
  -                   "Error sending request try another pooled connection");
  -            ajp_reuse_connection(ae, l);
  +            jk_log(l, JK_LOG_INFO,
  +                   "Error sending request. Will try another pooled 
connection");
  +            ajp_next_connection(&ae, l);
           }
           else
               break;
  @@ -1366,9 +1370,7 @@
                   /*
                    * Strange protocol error.
                    */
  -                if (JK_IS_DEBUG_LEVEL(l))
  -                    jk_log(l, JK_LOG_DEBUG, "Reuse: %d", ae->reuse);
  -                ae->reuse = JK_FALSE;
  +                jk_log(l, JK_LOG_INFO, " Protocol error: Reuse is set to 
false");
               }
               /* Reuse in all cases */
               ae->reuse = JK_TRUE;
  @@ -1630,7 +1632,6 @@
   
                   /* Up to there we can recover */
                   *is_recoverable_error = JK_TRUE;
  -                op->recoverable = JK_TRUE;
   
                   err = ajp_get_reply(e, s, l, p, op);
                   if (err > 0) {
  @@ -1662,7 +1663,7 @@
                   }
               }
               /* Get another connection from the pool */
  -            ajp_reuse_connection(p, l);
  +            ajp_next_connection(&p, l);
   
               if (err == JK_CLIENT_ERROR) {
                   *is_recoverable_error = JK_FALSE;
  @@ -1752,12 +1753,28 @@
       return JK_FALSE;
   }
   
  +static ajp_endpoint_t *ajp_create_endpoint(jk_worker_t *w, int proto, time_t 
now)
  +{
  +    ajp_endpoint_t *ae = (ajp_endpoint_t *)calloc(1, sizeof(ajp_endpoint_t));
  +    if (ae) {
  +        ae->sd = -1;
  +        ae->reuse = JK_FALSE;
  +        ae->last_access = now;
  +        jk_open_pool(&ae->pool, ae->buf, sizeof(ae->buf));
  +        ae->worker = w->worker_private;
  +        ae->endpoint.endpoint_private = ae;
  +        ae->proto = proto;
  +        ae->endpoint.service = ajp_service;
  +        ae->endpoint.done = ajp_done;
  +    }
  +    return ae;
  +}
   
   int ajp_init(jk_worker_t *pThis,
                jk_map_t *props, jk_worker_env_t *we, jk_logger_t *l, int proto)
   {
  -    int cache;
       int rc = JK_FALSE;
  +    int cache;
       /*
        * start the connection cache
        */
  @@ -1767,7 +1784,7 @@
   
       if (pThis && pThis->worker_private) {
           ajp_worker_t *p = pThis->worker_private;
  -        int cache_sz = jk_get_worker_cache_size(props, p->name, cache);
  +        p->ep_cache_sz = jk_get_worker_cache_size(props, p->name, cache);
           p->socket_timeout =
               jk_get_worker_socket_timeout(props, p->name, 
AJP_DEF_SOCKET_TIMEOUT);
   
  @@ -1851,29 +1868,40 @@
            */
   
           p->secret = jk_get_worker_secret(props, p->name);
  -        p->ep_cache_sz = 0;
  -        p->ep_mincache_sz = 0;
  -        if (cache_sz > 0) {
  -            p->ep_cache =
  -                (ajp_endpoint_t **) malloc(sizeof(ajp_endpoint_t *) *
  -                                           cache_sz);
  -            if (p->ep_cache) {
  -                int i;
  -                p->ep_cache_sz = cache_sz;
  -                if (JK_IS_DEBUG_LEVEL(l))
  -                    jk_log(l, JK_LOG_DEBUG,
  -                           "setting connection cache size to %d",
  -                           p->ep_cache_sz);
  -                /* Initialize cache slots */
  -                for (i = 0; i < cache_sz; i++) {
  -                    p->ep_cache[i] = NULL;
  -                }
  -                JK_INIT_CS(&(p->cs), i);
  -                if (i == JK_FALSE) {
  +        p->ep_mincache_sz = 1;
  +        p->ep_cache = (ajp_endpoint_t **) malloc(sizeof(ajp_endpoint_t *) *
  +                                                 p->ep_cache_sz);
  +        if (p->ep_cache) {
  +            unsigned int i;
  +            time_t now = time(NULL);
  +
  +            if (JK_IS_DEBUG_LEVEL(l))
  +                jk_log(l, JK_LOG_DEBUG,
  +                        "setting connection cache size to %d",
  +                        p->ep_cache_sz);
  +            /* Initialize cache slots */
  +            for (i = 0; i < p->ep_cache_sz; i++) {
  +                p->ep_cache[i] = ajp_create_endpoint(pThis, proto, now);
  +                if (p->ep_cache[i]) {
  +                    jk_log(l, JK_LOG_ERROR,
  +                            "Failed creating enpont cache slot %d errno=%d",
  +                            i, errno);
                       JK_TRACE_EXIT(l);
                       return JK_FALSE;
                   }
               }
  +            JK_INIT_CS(&(p->cs), i);
  +            if (i == JK_FALSE) {
  +                JK_TRACE_EXIT(l);
  +                return JK_FALSE;
  +            }
  +        }
  +        else {
  +            jk_log(l, JK_LOG_ERROR,
  +                   "Could not malloc ep_cache of size %d",
  +                   p->ep_cache_sz);
  +            JK_TRACE_EXIT(l);
  +            return JK_FALSE;
           }
           rc = JK_TRUE;
       }
  @@ -1932,46 +1960,40 @@
       JK_TRACE_ENTER(l);
       if (e && *e && (*e)->endpoint_private) {
           ajp_endpoint_t *p = (*e)->endpoint_private;
  +        int rc;
  +        ajp_worker_t *w = p->worker;
   
  -        if (p->reuse) {
  -            ajp_worker_t *w = p->worker;
  -            if (w->ep_cache_sz) {
  -                int rc;
  -                JK_ENTER_CS(&w->cs, rc);
  -                if (rc) {
  -                    unsigned int i;
  -
  -                    for (i = 0; i < w->ep_cache_sz; i++) {
  -                        if (!w->ep_cache[i]) {
  -                            w->ep_cache[i] = p;
  -                            ajp_reset_endpoint(p);
  -                            break;
  -                        }
  -                    }
  -                    JK_LEAVE_CS(&w->cs, rc);
  -                    if (i < w->ep_cache_sz) {
  -                        if (JK_IS_DEBUG_LEVEL(l))
  -                            jk_log(l, JK_LOG_DEBUG,
  -                                   "recycling connection cache slot=%d", i);
  -                        JK_TRACE_EXIT(l);
  -                        return JK_TRUE;
  -                    }
  -                    jk_log(l, JK_LOG_INFO,
  -                           "could not find empty cache slot from %d for 
worker %s"
  -                           ". Rise worker cachesize",
  -                           w->ep_cache_sz, w->name);
  +        JK_ENTER_CS(&w->cs, rc);
  +        if (rc) {
  +            int i;
  +
  +            for(i = w->ep_cache_sz - 1; i >= 0; i--) {
  +                if (w->ep_cache[i] == NULL) {
  +                    w->ep_cache[i] = p;
  +                    ajp_reset_endpoint(p, l);
  +                    break;
                   }
               }
  +            JK_LEAVE_CS(&w->cs, rc);
  +            if (i >= 0) {
  +                if (JK_IS_DEBUG_LEVEL(l))
  +                    jk_log(l, JK_LOG_DEBUG,
  +                            "recycling connection cache slot=%d for worker 
%s",
  +                            i, p->worker->name);
  +                JK_TRACE_EXIT(l);
  +                *e = NULL;
  +                return JK_TRUE;
  +            }
  +            jk_log(l, JK_LOG_INFO,
  +                    "could not find empty cache slot from %d for worker %s"
  +                    ". Rise worker cachesize",
  +                    w->ep_cache_sz, w->name);
           }
  -        if (JK_IS_DEBUG_LEVEL(l))
  -            jk_log(l, JK_LOG_DEBUG,
  -                   "done with connection %d for worker %s",
  -                   p->sd, p->worker->name);
  -        ajp_close_endpoint(p, l);
  -        *e = NULL;
   
  +        jk_log(l, JK_LOG_ERROR,
  +               "Could not lock mutex errno=%d", errno);
           JK_TRACE_EXIT(l);
  -        return JK_TRUE;
  +        return JK_FALSE;
       }
   
       JK_LOG_NULL_PARAMS(l);
  @@ -1988,84 +2010,63 @@
           ajp_worker_t *aw = pThis->worker_private;
           ajp_endpoint_t *ae = NULL;
           time_t now = time(NULL);
  +        int rc;
   
  -        if (aw->ep_cache_sz) {
  -            int rc;
  -            JK_ENTER_CS(&aw->cs, rc);
  -            if (rc) {
  -                unsigned i;
  -                for (i = 0; i < aw->ep_cache_sz; i++) {
  -                    if (aw->ep_cache[i]) {
  -                        ae = aw->ep_cache[i];
  -                        aw->ep_cache[i] = NULL;
  -                        break;
  -                    }
  +        JK_ENTER_CS(&aw->cs, rc);
  +        if (rc) {
  +            unsigned int i;
  +            for (i = 0; i < aw->ep_cache_sz; i++) {
  +                if (aw->ep_cache[i]) {
  +                    ae = aw->ep_cache[i];
  +                    aw->ep_cache[i] = NULL;
  +                    break;
                   }
  -                /* Handle enpoint cache timeouts */
  -                if (ae && aw->cache_timeout) {
  -                    for (i = 0; i < aw->ep_cache_sz; i++) {
  -                        /* Skip the cached enty */
  -                        if (aw->ep_cache[i] && (ae != aw->ep_cache[i])) {
  -                            int elapsed =
  -                                (int)(now - ae->last_access);
  -                            if (elapsed > aw->cache_timeout) {
  -                                if (JK_IS_DEBUG_LEVEL(l))
  -                                    jk_log(l, JK_LOG_DEBUG,
  -                                           "cleaning cache slot = %d elapsed 
%u",
  -                                           i, elapsed);
  -                                ajp_close_endpoint(aw->ep_cache[i], l);
  -                                aw->ep_cache[i] = NULL;
  -                            }
  +            }
  +            /* Handle enpoint cache timeouts */
  +            if (ae && aw->cache_timeout) {
  +                for (i = 0; i < aw->ep_cache_sz; i++) {
  +                    /* Skip the cached enty */
  +                    if (aw->ep_cache[i] && (ae != aw->ep_cache[i])) {
  +                        int elapsed =
  +                            (int)(now - ae->last_access);
  +                        if (elapsed > aw->cache_timeout) {
  +                            aw->ep_cache[i]->reuse = JK_FALSE;
  +                            ajp_reset_endpoint(aw->ep_cache[i], l);
  +                            if (JK_IS_DEBUG_LEVEL(l))
  +                                jk_log(l, JK_LOG_DEBUG,
  +                                        "cleaning cache slot = %d elapsed 
%u",
  +                                        i, elapsed);
                           }
                       }
                   }
  -                JK_LEAVE_CS(&aw->cs, rc);
  -                if (ae) {
  -                    if (ae->sd > 0) {
  -                        /* Handle timeouts for open sockets */
  -                        int elapsed = (int)(now - ae->last_access);
  +            }
  +            JK_LEAVE_CS(&aw->cs, rc);
  +            if (ae) {
  +                if (ae->sd > 0) {
  +                    /* Handle timeouts for open sockets */
  +                    int elapsed = (int)(now - ae->last_access);
  +                    if (JK_IS_DEBUG_LEVEL(l))
  +                        jk_log(l, JK_LOG_DEBUG,
  +                                "time elapsed since last request = %u 
seconds",
  +                                elapsed);
  +                    if (aw->recycle_timeout > 0 &&
  +                        elapsed > aw->recycle_timeout) {
  +                        ae->reuse = JK_FALSE;
  +                        ajp_reset_endpoint(ae, l);
                           if (JK_IS_DEBUG_LEVEL(l))
                               jk_log(l, JK_LOG_DEBUG,
  -                                   "time elapsed since last request = %u 
seconds",
  -                                   elapsed);
  -                        if (aw->recycle_timeout > 0 &&
  -                            elapsed > aw->recycle_timeout) {
  -                            jk_close_socket(ae->sd);
  -                            if (JK_IS_DEBUG_LEVEL(l))
  -                                jk_log(l, JK_LOG_DEBUG,
  -                                       "reached connection recycle timeout, 
closed sd = %d",
  -                                       ae->sd);
  -                            ae->sd = -1;        /* just to avoid twice close 
*/
  -                        }
  +                                    "reached connection recycle timeout, 
closed sd = %d",
  +                                    ae->sd);
                       }
  -                    ae->last_access = now;
  -                    *je = &ae->endpoint;
  -                    JK_TRACE_EXIT(l);
  -                    return JK_TRUE;
                   }
  +                ae->last_access = now;
  +                *je = &ae->endpoint;
  +                JK_TRACE_EXIT(l);
  +                return JK_TRUE;
               }
           }
  -        /* Create new endpoint */
  -        ae = (ajp_endpoint_t *) calloc(1, sizeof(ajp_endpoint_t));
  -        if (ae) {
  -            ae->sd = -1;
  -            ae->reuse = JK_FALSE;
  -            ae->last_access = now;
  -            jk_open_pool(&ae->pool, ae->buf, sizeof(ae->buf));
  -            ae->worker = pThis->worker_private;
  -            ae->endpoint.endpoint_private = ae;
  -            ae->proto = proto;
  -            ae->endpoint.service = ajp_service;
  -            ae->endpoint.done = ajp_done;
  -            *je = &ae->endpoint;
  -            if (JK_IS_DEBUG_LEVEL(l))
  -                jk_log(l, JK_LOG_DEBUG,
  -                       "created new endpoint for worker %s", aw->name);
  -            JK_TRACE_EXIT(l);
  -            return JK_TRUE;
  -        }
  -        jk_log(l, JK_LOG_ERROR,
  -               "can't malloc endpoint");
  +        jk_log(l, JK_LOG_INFO,
  +               "can't find free endpoint");
       }
       else {
           JK_LOG_NULL_PARAMS(l);
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to