mturk 2002/06/29 00:48:20
Modified: jk/native2/common jk_worker_lb.c
Log:
Serialize the calls to a worker until its first call finishes or times out.
Added new config param 'initTimeout' that sets the worker initialization
timeout (in seconds, default 60).
Revision Changes Path
1.21 +88 -7 jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c
Index: jk_worker_lb.c
===================================================================
RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native2/common/jk_worker_lb.c,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -r1.20 -r1.21
--- jk_worker_lb.c 22 Jun 2002 16:57:54 -0000 1.20
+++ jk_worker_lb.c 29 Jun 2002 07:48:20 -0000 1.21
@@ -70,6 +70,10 @@
#include "jk_env.h"
#include "jk_requtil.h"
+#ifdef HAS_APR
+#include "apr_thread_proc.h"
+#endif
+
#define DEFAULT_LB_FACTOR (1.0)
/* Time to wait before retry... XXX make it configurable*/
@@ -79,6 +83,12 @@
#define NO_WORKER_MSG "The servlet container is temporary unavailable or being
upgraded\n";
+typedef struct { /* Private state of the lb worker, stored in its worker_private pointer. */
+ struct jk_mutex *cs; /* locked around jk2_get_most_suitable_worker() in the service loop */
+ int init_timeout; /* seconds allowed for a worker's first call; factory default is 60 */
+ int initializing; /* JK_TRUE while a worker's first service() call is in progress */
+} jk_worker_lb_private_t;
+
/** Find the best worker. In process, check if timeout expired
for workers that failed in the past and give them another chance.
@@ -281,6 +291,8 @@
{
int attempt=0;
jk_workerEnv_t *wEnv=lb->workerEnv;
+ jk_worker_lb_private_t *lb_priv = lb->worker_private;
+ jk_worker_t *rec = NULL;
if( s==NULL ) {
env->l->jkLog(env, env->l, JK_LOG_ERROR,
@@ -309,10 +321,29 @@
}
while(1) {
- jk_worker_t *rec;
int rc;
- rec=jk2_get_most_suitable_worker(env, lb, s, attempt);
+ /* Since there is potential worker state change
+ * make the find best worker call thread safe
+ */
+ if (!lb_priv->initializing) {
+ if (lb_priv->cs != NULL)
+ lb_priv->cs->lock(env, lb_priv->cs);
+ rec=jk2_get_most_suitable_worker(env, lb, s, attempt);
+
+ if (lb_priv->cs != NULL)
+ lb_priv->cs->unLock(env, lb_priv->cs);
+ }
+ else if (!rec){
+ /* If we are initializing the service wait until
+ * the initialization finishes or times out
+ */
+ if (lb->cs != NULL) {
+ lb->cs->lock(env, lb->cs);
+ lb->cs->unLock(env, lb->cs);
+ }
+ continue;
+ }
attempt++;
s->is_recoverable_error = JK_FALSE;
@@ -354,16 +385,45 @@
s->jvm_route = rec->route;
s->realWorker = rec;
-
+ if (!rec->initialized && !lb_priv->initializing && lb->cs != NULL) {
+ /* If the worker has not been called yet serialize the call */
+ lb->cs->lock(env, lb->cs);
+ lb_priv->initializing = JK_TRUE;
+ rec->error_time = time(NULL);
+ }
rc = rec->service(env, rec, s);
if(rc==JK_OK) {
rec->in_error_state = JK_FALSE;
rec->error_time = 0;
- /* the endpoint that succeeded is saved for done() */
+ /* Set the initialized flag to TRUE if that was the first call */
+ if (!rec->initialized && lb->cs != NULL) {
+ rec->initialized = JK_TRUE;
+ lb_priv->initializing = JK_FALSE;
+ lb->cs->unLock(env, lb->cs);
+ }
return JK_OK;
}
+ if (!rec->initialized && lb->cs != NULL) {
+ time_t now = time(NULL);
+ /* In the case of initialization timeout disable the worker */
+ if ((int)(now - rec->error_time) > lb_priv->init_timeout) {
+ rec->mbean->disabled = JK_TRUE;
+ lb_priv->initializing = JK_FALSE;
+ s->is_recoverable_error = JK_FALSE;
+ env->l->jkLog(env, env->l, JK_LOG_ERROR,
+ "lb_worker.service() worker init timeout for %s\n",
+ rec->channelName);
+ lb->cs->unLock(env, lb->cs);
+ }
+ else {
+#ifdef HAS_APR
+ apr_thread_yield();
+#endif
+ continue;
+ }
+ }
env->l->jkLog(env, env->l, JK_LOG_ERROR,
"lb.service() worker failed %s\n", rec->mbean->name );
@@ -373,7 +433,7 @@
* Time for fault tolerance (if possible)...
*/
rec->in_error_state = JK_TRUE;
- rec->error_time = time(0);
+ rec->error_time = time(NULL);
if(!s->is_recoverable_error) {
/* Error is not recoverable - break with an error. */
@@ -464,6 +524,9 @@
lb->noWorkerCode=atoi( value );
} else if( strcmp( name, "hwBalanceErr") == 0 ) {
lb->hwBalanceErr=atoi( value );
+ } else if( strcmp( name, "initTimeout") == 0 ) {
+ jk_worker_lb_private_t *priv = lb->worker_private;
+ priv->init_timeout=atoi( value );
}
return JK_ERR;
@@ -506,7 +569,9 @@
{
jk_worker_t *w;
int i;
-
+ jk_bean_t *jkb;
+ jk_worker_lb_private_t *worker_private;
+
if(NULL == name ) {
env->l->jkLog(env, env->l, JK_LOG_ERROR,
"lb_worker.factory() NullPointerException\n");
@@ -520,8 +585,24 @@
"lb_worker.factory() OutOfMemoryException\n");
return JK_ERR;
}
+
+ jkb=env->createBean2(env, pool,"threadMutex", NULL);
+ if( jkb != NULL ) {
+ w->cs=jkb->object;
+ jkb->init(env, jkb );
+ }
- w->worker_private = NULL;
+ worker_private = (jk_worker_lb_private_t *)pool->calloc(env,
+ pool, sizeof(jk_worker_lb_private_t));
+
+ /* one minute service startup */
+ worker_private->init_timeout = 60;
+ jkb=env->createBean2(env, pool,"threadMutex", NULL);
+ if( jkb != NULL ) {
+ worker_private->cs=jkb->object;
+ jkb->init(env, jkb );
+ }
+ w->worker_private = worker_private;
w->service = jk2_lb_service;
for( i=0; i<JK_LB_LEVELS; i++ ) {
--
To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>