mturk 2005/02/16 00:19:46 Modified: jk/native/common jk_ajp_common.c Log: Rewrite endpoint cache. The endpoints are now created on init, and are present for worker lifetime. This also truly limits the number of connections to Tomcat. Revision Changes Path 1.79 +155 -154 jakarta-tomcat-connectors/jk/native/common/jk_ajp_common.c Index: jk_ajp_common.c =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native/common/jk_ajp_common.c,v retrieving revision 1.78 retrieving revision 1.79 diff -u -r1.78 -r1.79 --- jk_ajp_common.c 15 Feb 2005 12:11:20 -0000 1.78 +++ jk_ajp_common.c 16 Feb 2005 08:19:46 -0000 1.79 @@ -682,8 +682,15 @@ * Reset the endpoint (clean buf) */ -static void ajp_reset_endpoint(ajp_endpoint_t * ae) +static void ajp_reset_endpoint(ajp_endpoint_t * ae, jk_logger_t *l) { + if (ae->sd > 0 && !ae->reuse) { + jk_close_socket(ae->sd); + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "closed socket with sd = %d", ae->sd); + ae->sd = -1; + } jk_reset_pool(&(ae->pool)); } @@ -699,7 +706,8 @@ jk_close_socket(ae->sd); if (JK_IS_DEBUG_LEVEL(l)) jk_log(l, JK_LOG_DEBUG, - "closed sd = %d", ae->sd); + "closed socket with sd = %d", ae->sd); + ae->sd = -1; } jk_close_pool(&(ae->pool)); @@ -709,35 +717,31 @@ /* - * Try to reuse a previous connection + * Try another connection from cache */ -static void ajp_reuse_connection(ajp_endpoint_t * ae, jk_logger_t *l) +static void ajp_next_connection(ajp_endpoint_t **ae, jk_logger_t *l) { - ajp_worker_t *aw = ae->worker; + int rc; + ajp_worker_t *aw = (*ae)->worker; /* Close existing endpoint socket */ - jk_close_socket(ae->sd); - ae->sd = -1; - - if (aw->ep_cache_sz) { - int rc; - JK_ENTER_CS(&aw->cs, rc); - if (rc) { - unsigned i; + jk_close_socket((*ae)->sd); + (*ae)->sd = -1; - for (i = 0; i < aw->ep_cache_sz; i++) { - /* Find cache slot with usable socket */ - if (aw->ep_cache[i] && aw->ep_cache[i]->sd > 0) { - ae->sd = aw->ep_cache[i]->sd; - 
aw->ep_cache[i]->sd = -1; - ajp_close_endpoint(aw->ep_cache[i], l); - aw->ep_cache[i] = NULL; - break; - } + JK_ENTER_CS(&aw->cs, rc); + if (rc) { + unsigned int i; + for (i = 0; i < aw->ep_cache_sz; i++) { + /* Find cache slot with usable socket */ + if (aw->ep_cache[i] && aw->ep_cache[i]->sd > 0) { + ajp_endpoint_t *e = aw->ep_cache[i]; + aw->ep_cache[i] = *ae; + *ae = e; + break; } - JK_LEAVE_CS(&aw->cs, rc); } + JK_LEAVE_CS(&aw->cs, rc); } } @@ -1157,9 +1161,9 @@ * loop. */ if (err || (ajp_connection_tcp_send_message(ae, op->request, l) == JK_FALSE)) { - jk_log(l, JK_LOG_ERROR, - "Error sending request try another pooled connection"); - ajp_reuse_connection(ae, l); + jk_log(l, JK_LOG_INFO, + "Error sending request. Will try another pooled connection"); + ajp_next_connection(&ae, l); } else break; @@ -1366,9 +1370,7 @@ /* * Strange protocol error. */ - if (JK_IS_DEBUG_LEVEL(l)) - jk_log(l, JK_LOG_DEBUG, "Reuse: %d", ae->reuse); - ae->reuse = JK_FALSE; + jk_log(l, JK_LOG_INFO, " Protocol error: Reuse is set to false"); } /* Reuse in all cases */ ae->reuse = JK_TRUE; @@ -1630,7 +1632,6 @@ /* Up to there we can recover */ *is_recoverable_error = JK_TRUE; - op->recoverable = JK_TRUE; err = ajp_get_reply(e, s, l, p, op); if (err > 0) { @@ -1662,7 +1663,7 @@ } } /* Get another connection from the pool */ - ajp_reuse_connection(p, l); + ajp_next_connection(&p, l); if (err == JK_CLIENT_ERROR) { *is_recoverable_error = JK_FALSE; @@ -1752,12 +1753,28 @@ return JK_FALSE; } +static ajp_endpoint_t *ajp_create_endpoint(jk_worker_t *w, int proto, time_t now) +{ + ajp_endpoint_t *ae = (ajp_endpoint_t *)calloc(1, sizeof(ajp_endpoint_t)); + if (ae) { + ae->sd = -1; + ae->reuse = JK_FALSE; + ae->last_access = now; + jk_open_pool(&ae->pool, ae->buf, sizeof(ae->buf)); + ae->worker = w->worker_private; + ae->endpoint.endpoint_private = ae; + ae->proto = proto; + ae->endpoint.service = ajp_service; + ae->endpoint.done = ajp_done; + } + return ae; +} int ajp_init(jk_worker_t *pThis, 
jk_map_t *props, jk_worker_env_t *we, jk_logger_t *l, int proto) { - int cache; int rc = JK_FALSE; + int cache; /* * start the connection cache */ @@ -1767,7 +1784,7 @@ if (pThis && pThis->worker_private) { ajp_worker_t *p = pThis->worker_private; - int cache_sz = jk_get_worker_cache_size(props, p->name, cache); + p->ep_cache_sz = jk_get_worker_cache_size(props, p->name, cache); p->socket_timeout = jk_get_worker_socket_timeout(props, p->name, AJP_DEF_SOCKET_TIMEOUT); @@ -1851,29 +1868,40 @@ */ p->secret = jk_get_worker_secret(props, p->name); - p->ep_cache_sz = 0; - p->ep_mincache_sz = 0; - if (cache_sz > 0) { - p->ep_cache = - (ajp_endpoint_t **) malloc(sizeof(ajp_endpoint_t *) * - cache_sz); - if (p->ep_cache) { - int i; - p->ep_cache_sz = cache_sz; - if (JK_IS_DEBUG_LEVEL(l)) - jk_log(l, JK_LOG_DEBUG, - "setting connection cache size to %d", - p->ep_cache_sz); - /* Initialize cache slots */ - for (i = 0; i < cache_sz; i++) { - p->ep_cache[i] = NULL; - } - JK_INIT_CS(&(p->cs), i); - if (i == JK_FALSE) { + p->ep_mincache_sz = 1; + p->ep_cache = (ajp_endpoint_t **) malloc(sizeof(ajp_endpoint_t *) * + p->ep_cache_sz); + if (p->ep_cache) { + unsigned int i; + time_t now = time(NULL); + + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "setting connection cache size to %d", + p->ep_cache_sz); + /* Initialize cache slots */ + for (i = 0; i < p->ep_cache_sz; i++) { + p->ep_cache[i] = ajp_create_endpoint(pThis, proto, now); + if (p->ep_cache[i]) { + jk_log(l, JK_LOG_ERROR, + "Failed creating enpont cache slot %d errno=%d", + i, errno); JK_TRACE_EXIT(l); return JK_FALSE; } } + JK_INIT_CS(&(p->cs), i); + if (i == JK_FALSE) { + JK_TRACE_EXIT(l); + return JK_FALSE; + } + } + else { + jk_log(l, JK_LOG_ERROR, + "Could not malloc ep_cache of size %d", + p->ep_cache_sz); + JK_TRACE_EXIT(l); + return JK_FALSE; } rc = JK_TRUE; } @@ -1932,46 +1960,40 @@ JK_TRACE_ENTER(l); if (e && *e && (*e)->endpoint_private) { ajp_endpoint_t *p = (*e)->endpoint_private; + int rc; + 
ajp_worker_t *w = p->worker; - if (p->reuse) { - ajp_worker_t *w = p->worker; - if (w->ep_cache_sz) { - int rc; - JK_ENTER_CS(&w->cs, rc); - if (rc) { - unsigned int i; - - for (i = 0; i < w->ep_cache_sz; i++) { - if (!w->ep_cache[i]) { - w->ep_cache[i] = p; - ajp_reset_endpoint(p); - break; - } - } - JK_LEAVE_CS(&w->cs, rc); - if (i < w->ep_cache_sz) { - if (JK_IS_DEBUG_LEVEL(l)) - jk_log(l, JK_LOG_DEBUG, - "recycling connection cache slot=%d", i); - JK_TRACE_EXIT(l); - return JK_TRUE; - } - jk_log(l, JK_LOG_INFO, - "could not find empty cache slot from %d for worker %s" - ". Rise worker cachesize", - w->ep_cache_sz, w->name); + JK_ENTER_CS(&w->cs, rc); + if (rc) { + int i; + + for(i = w->ep_cache_sz - 1; i >= 0; i--) { + if (w->ep_cache[i] == NULL) { + w->ep_cache[i] = p; + ajp_reset_endpoint(p, l); + break; } } + JK_LEAVE_CS(&w->cs, rc); + if (i >= 0) { + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "recycling connection cache slot=%d for worker %s", + i, p->worker->name); + JK_TRACE_EXIT(l); + *e = NULL; + return JK_TRUE; + } + jk_log(l, JK_LOG_INFO, + "could not find empty cache slot from %d for worker %s" + ". 
Rise worker cachesize", + w->ep_cache_sz, w->name); } - if (JK_IS_DEBUG_LEVEL(l)) - jk_log(l, JK_LOG_DEBUG, - "done with connection %d for worker %s", - p->sd, p->worker->name); - ajp_close_endpoint(p, l); - *e = NULL; + jk_log(l, JK_LOG_ERROR, + "Could not lock mutex errno=%d", errno); JK_TRACE_EXIT(l); - return JK_TRUE; + return JK_FALSE; } JK_LOG_NULL_PARAMS(l); @@ -1988,84 +2010,63 @@ ajp_worker_t *aw = pThis->worker_private; ajp_endpoint_t *ae = NULL; time_t now = time(NULL); + int rc; - if (aw->ep_cache_sz) { - int rc; - JK_ENTER_CS(&aw->cs, rc); - if (rc) { - unsigned i; - for (i = 0; i < aw->ep_cache_sz; i++) { - if (aw->ep_cache[i]) { - ae = aw->ep_cache[i]; - aw->ep_cache[i] = NULL; - break; - } + JK_ENTER_CS(&aw->cs, rc); + if (rc) { + unsigned int i; + for (i = 0; i < aw->ep_cache_sz; i++) { + if (aw->ep_cache[i]) { + ae = aw->ep_cache[i]; + aw->ep_cache[i] = NULL; + break; } - /* Handle enpoint cache timeouts */ - if (ae && aw->cache_timeout) { - for (i = 0; i < aw->ep_cache_sz; i++) { - /* Skip the cached enty */ - if (aw->ep_cache[i] && (ae != aw->ep_cache[i])) { - int elapsed = - (int)(now - ae->last_access); - if (elapsed > aw->cache_timeout) { - if (JK_IS_DEBUG_LEVEL(l)) - jk_log(l, JK_LOG_DEBUG, - "cleaning cache slot = %d elapsed %u", - i, elapsed); - ajp_close_endpoint(aw->ep_cache[i], l); - aw->ep_cache[i] = NULL; - } + } + /* Handle enpoint cache timeouts */ + if (ae && aw->cache_timeout) { + for (i = 0; i < aw->ep_cache_sz; i++) { + /* Skip the cached enty */ + if (aw->ep_cache[i] && (ae != aw->ep_cache[i])) { + int elapsed = + (int)(now - ae->last_access); + if (elapsed > aw->cache_timeout) { + aw->ep_cache[i]->reuse = JK_FALSE; + ajp_reset_endpoint(aw->ep_cache[i], l); + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "cleaning cache slot = %d elapsed %u", + i, elapsed); } } } - JK_LEAVE_CS(&aw->cs, rc); - if (ae) { - if (ae->sd > 0) { - /* Handle timeouts for open sockets */ - int elapsed = (int)(now - ae->last_access); + } + 
JK_LEAVE_CS(&aw->cs, rc); + if (ae) { + if (ae->sd > 0) { + /* Handle timeouts for open sockets */ + int elapsed = (int)(now - ae->last_access); + if (JK_IS_DEBUG_LEVEL(l)) + jk_log(l, JK_LOG_DEBUG, + "time elapsed since last request = %u seconds", + elapsed); + if (aw->recycle_timeout > 0 && + elapsed > aw->recycle_timeout) { + ae->reuse = JK_FALSE; + ajp_reset_endpoint(ae, l); if (JK_IS_DEBUG_LEVEL(l)) jk_log(l, JK_LOG_DEBUG, - "time elapsed since last request = %u seconds", - elapsed); - if (aw->recycle_timeout > 0 && - elapsed > aw->recycle_timeout) { - jk_close_socket(ae->sd); - if (JK_IS_DEBUG_LEVEL(l)) - jk_log(l, JK_LOG_DEBUG, - "reached connection recycle timeout, closed sd = %d", - ae->sd); - ae->sd = -1; /* just to avoid twice close */ - } + "reached connection recycle timeout, closed sd = %d", + ae->sd); } - ae->last_access = now; - *je = &ae->endpoint; - JK_TRACE_EXIT(l); - return JK_TRUE; } + ae->last_access = now; + *je = &ae->endpoint; + JK_TRACE_EXIT(l); + return JK_TRUE; } } - /* Create new endpoint */ - ae = (ajp_endpoint_t *) calloc(1, sizeof(ajp_endpoint_t)); - if (ae) { - ae->sd = -1; - ae->reuse = JK_FALSE; - ae->last_access = now; - jk_open_pool(&ae->pool, ae->buf, sizeof(ae->buf)); - ae->worker = pThis->worker_private; - ae->endpoint.endpoint_private = ae; - ae->proto = proto; - ae->endpoint.service = ajp_service; - ae->endpoint.done = ajp_done; - *je = &ae->endpoint; - if (JK_IS_DEBUG_LEVEL(l)) - jk_log(l, JK_LOG_DEBUG, - "created new endpoint for worker %s", aw->name); - JK_TRACE_EXIT(l); - return JK_TRUE; - } - jk_log(l, JK_LOG_ERROR, - "can't malloc endpoint"); + jk_log(l, JK_LOG_INFO, + "can't find free endpoint"); } else { JK_LOG_NULL_PARAMS(l);
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]