costin 02/01/25 22:55:17 Modified: jk/native2/common jk_channel_jni.c Log: A number of bug fixes - make sure we save the global ref, not the ref. We now get an Endpoint from the java side - and cache/reuse it with the jni endpoint. The jni channel is 'interesting' - it's the first non-stream channel. It could be treated as a stream, by using 2 threads ( so send/receive model will work ), but I wanted to preserve 'single thread, no sync' model from the previous jni worker. It may seem a bit complicated - and it adds some limitations on the model, but I think it's worth it. The idea is that, as before, the first message ( containing the request ) gives control to tomcat who may send back messages ( and get back responses to its messages ). The send() method in channel is doing exactly this first step. Tomcat will use a native method to send messages ( that replaces receive(), which is not used ), which are dispatched. The response is actually put in the same buffer - a single jarray pin is needed. We must make sure we don't run into buffer problems - but that can be resolved. Revision Changes Path 1.2 +359 -52 jakarta-tomcat-connectors/jk/native2/common/jk_channel_jni.c Index: jk_channel_jni.c =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native2/common/jk_channel_jni.c,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- jk_channel_jni.c 12 Jan 2002 04:59:19 -0000 1.1 +++ jk_channel_jni.c 26 Jan 2002 06:55:17 -0000 1.2 @@ -79,7 +79,7 @@ */ typedef struct { jk_vm_t *vm; - + char *className; jclass jniBridge; @@ -87,8 +87,15 @@ } jk_channel_jni_private_t; typedef struct { - JNIEnv *env; + JNIEnv *jniEnv; + + int len; + jbyteArray jarray; + char *carray; + int arrayLen; + jobject epJ; + jobject msgJ; } jk_ch_jni_ep_private_t; @@ -105,6 +112,12 @@ { int err; char *tmp; + + /* the channel is init-ed during a worker validation. If a jni worker + is not already defined... 
well, not good. But on open we should + have it. + */ + _this->worker=worker; _this->properties=props; @@ -113,7 +126,7 @@ "channel_jni.init(): %s\n", worker->name ); - return err; + return JK_TRUE; } /** Assume the jni-worker or someone else started @@ -125,12 +138,18 @@ { jk_workerEnv_t *we=endpoint->worker->workerEnv; JNIEnv *jniEnv; + jk_ch_jni_ep_private_t *epData; + jmethodID jmethod; + jobject jobj; jk_channel_jni_private_t *jniCh=_this->_privatePtr; - - /** XXX make it customizable */ - jniCh->className=JAVA_BRIDGE_CLASS_NAME; + if( endpoint->channelData != NULL ) { + env->l->jkLog(env, env->l, JK_LOG_INFO, + "channel_jni.open() already open, nothing else to do\n"); + return JK_TRUE; + } + jniCh->vm=(jk_vm_t *)we->vm; jniEnv = (JNIEnv *)jniCh->vm->attach( env, jniCh->vm ); @@ -139,19 +158,84 @@ "channel_jni.open() can't attach\n" ); return JK_FALSE; } + /* Create the buffers used by the write method. We allocate a + byte[] and jbyte[] - I have no idea what's more expensive, + to copy a buffer or to 'pin' the jbyte[] for copying. 
+ + This will be tuned if needed, for now it seems the easiest + solution + */ + epData=(jk_ch_jni_ep_private_t *) + endpoint->pool->calloc( env,endpoint->pool, + sizeof( jk_ch_jni_ep_private_t )); + endpoint->channelData=epData; + /** XXX make it customizable */ + jniCh->className=JAVA_BRIDGE_CLASS_NAME; + jniCh->jniBridge = (*jniEnv)->FindClass(jniEnv, jniCh->className ); + jniCh->jniBridge=(*jniEnv)->NewGlobalRef( jniEnv, jniCh->jniBridge); + if( jniCh->jniBridge == NULL ) { env->l->jkLog(env, env->l, JK_LOG_INFO, "channel_jni.open() can't find %s\n",jniCh->className ); return JK_FALSE; } - - jniCh->writeMethod = + + jmethod=(*jniEnv)->GetStaticMethodID(jniEnv, jniCh->jniBridge, + "createEndpointStatic", "(JJ)Lorg/apache/jk/core/Endpoint;"); + if( jmethod == NULL ) { + env->l->jkLog(env, env->l, JK_LOG_INFO, + "channel_jni.open() can't find createEndpointStatic\n"); + return JK_FALSE; + } + jobj=(*jniEnv)->CallStaticObjectMethod( jniEnv, jniCh->jniBridge, + jmethod, + (jlong)(long)(void *)env, + (jlong)(long)(void *)endpoint ); + epData->epJ=(*jniEnv)->NewGlobalRef( jniEnv, jobj ); + + jmethod=(*jniEnv)->GetStaticMethodID(jniEnv, jniCh->jniBridge, + "createMessage", + "()Lorg/apache/jk/common/MsgAjp;"); + if( jmethod == NULL ) { + env->l->jkLog(env, env->l, JK_LOG_INFO, + "channel_jni.open() can't find createMessage\n"); + return JK_FALSE; + } + jobj=(*jniEnv)->CallStaticObjectMethod( jniEnv, jniCh->jniBridge, + jmethod ); + epData->msgJ=(*jniEnv)->NewGlobalRef( jniEnv, jobj ); + + /* XXX Destroy them in close */ + + jmethod=(*jniEnv)->GetStaticMethodID(jniEnv, jniCh->jniBridge, + "getBuffer", + "(Lorg/apache/jk/common/MsgAjp;)[B"); + if( jmethod == NULL ) { + env->l->jkLog(env, env->l, JK_LOG_INFO, + "channel_jni.open() can't find getBuffer\n"); + return JK_FALSE; + } + epData->jarray=(*jniEnv)->CallStaticObjectMethod( jniEnv, jniCh->jniBridge, + jmethod, epData->msgJ ); + /*epData->jarray=(*jniEnv)->NewByteArray(jniEnv, 10000 ); */ + 
epData->jarray=(*jniEnv)->NewGlobalRef( jniEnv, epData->jarray ); + + epData->arrayLen = (*jniEnv)->GetArrayLength( jniEnv, epData->jarray ); + + /* XXX > ajp buffer size. Don't know how to fragment or reallocate + yet */ + epData->carray=(char *)endpoint->pool->calloc( env, endpoint->pool, + epData->arrayLen); + + jniCh->writeMethod = (*jniEnv)->GetStaticMethodID(jniEnv, jniCh->jniBridge, - "write", "(JJ)I"); + "receiveRequest", + "(JJLorg/apache/jk/core/Endpoint;" + "Lorg/apache/jk/common/MsgAjp;)I"); if( jniCh->writeMethod == NULL ) { env->l->jkLog(env, env->l, JK_LOG_EMERG, @@ -159,10 +243,14 @@ return JK_FALSE; } + env->l->jkLog(env, env->l, JK_LOG_INFO, + "channel_jni.open() found write method, open ok\n" ); + + /* Don't detach ( XXX Need to find out when the thread is * closing in order for this to work ) */ - jniCh->vm->detach( env, jniCh->vm ); + /* jniCh->vm->detach( env, jniCh->vm ); */ return JK_TRUE; } @@ -172,7 +260,15 @@ static int JK_METHOD jk_channel_jni_close(jk_env_t *env,jk_channel_t *_this, jk_endpoint_t *endpoint) { + jk_ch_jni_ep_private_t *epData; + + epData=(jk_ch_jni_ep_private_t *)endpoint->channelData; + + /* (*jniEnv)->DeleteGlobalRef( jniEnv, epData->msgJ ); */ + /* (*jniEnv)->DeleteGlobalRef( jniEnv, epData->epJ ); */ + return JK_TRUE; + } /** send a long message @@ -187,19 +283,41 @@ * @was: jk_tcp_socket_sendfull */ static int JK_METHOD jk_channel_jni_send(jk_env_t *env, jk_channel_t *_this, - jk_endpoint_t *endpoint, - char *b, int len) + jk_endpoint_t *endpoint, + jk_msg_t *msg) { int sd; int sent=0; + char *b; + int len; jbyte *nbuf; jbyteArray jbuf; int jlen; - jboolean iscommit; - + jboolean iscommit=0; + JNIEnv *jniEnv; jk_channel_jni_private_t *jniCh=_this->_privatePtr; + jk_ch_jni_ep_private_t *epData= + (jk_ch_jni_ep_private_t *)endpoint->channelData;; + + env->l->jkLog(env, env->l, JK_LOG_INFO,"channel_jni.send()\n" ); + + if( epData == NULL ) { + jk_channel_jni_open( env, _this, endpoint ); + 
epData=(jk_ch_jni_ep_private_t *)endpoint->channelData; + } + + msg->end( env, msg ); + len=msg->len; + b=msg->buf; - JNIEnv *jniEnv=(JNIEnv *)endpoint->channelData;; + jniEnv=NULL; /* epData->jniEnv; */ + jbuf=epData->jarray; + + if( jniCh->writeMethod == NULL ) { + env->l->jkLog(env, env->l, JK_LOG_EMERG, + "channel_jni.send() no write method\n" ); + return JK_FALSE; + } if( jniEnv==NULL ) { /* Try first getEnv, then attach */ jniEnv = (JNIEnv *)jniCh->vm->attach( env, jniCh->vm ); @@ -210,6 +328,9 @@ } } + env->l->jkLog(env, env->l, JK_LOG_INFO, + "channel_jni.send() getting byte array \n" ); + /* Copy the data in the ( recycled ) jbuf, then call the * write method. XXX We could try 'pining' if the vm supports * it, this is a looong lived object. @@ -219,7 +340,7 @@ if(nbuf==NULL ) { env->l->jkLog(env, env->l, JK_LOG_INFO, "channelJni.send() Can't get java bytes"); - return -1; + return JK_FALSE; } if( len > jlen ) { @@ -231,26 +352,32 @@ (*jniEnv)->ReleaseByteArrayElements(jniEnv, jbuf, nbuf, 0); + env->l->jkLog(env, env->l, JK_LOG_INFO, + "channel_jni.send() before send %p %p\n", + (void *)(long)epData->epJ, + (void *)(long)epData->msgJ ); + sent=(*jniEnv)->CallStaticIntMethod( jniEnv, jniCh->jniBridge, jniCh->writeMethod, - jbuf, - len); - return sent; + (jlong)(long)(void *)env, + (jlong)(long)(void *)endpoint->currentRequest, + epData->epJ, + epData->msgJ); + env->l->jkLog(env, env->l, JK_LOG_INFO,"channel_jni.send() result %d\n", + sent); + return JK_TRUE; } -/** receive len bytes. - * @param sd opened socket. - * @param b buffer to store the data. - * @param len length to receive. - * @return -1: receive failed or connection closed. - * >0: length of the received data. - * Was: tcp_socket_recvfull +/** + * Not used - we have a single thread, there is no 'blocking' - the + * java side will send messages by calling a native method, which will + * receive and dispatch. 
*/ static int JK_METHOD jk_channel_jni_recv( jk_env_t *env, jk_channel_t *_this, jk_endpoint_t *endpoint, - char *b, int len ) + jk_msg_t *msg ) { jbyte *nbuf; jbyteArray jbuf; @@ -258,46 +385,222 @@ jboolean iscommit; jk_channel_jni_private_t *jniCh=_this->_privatePtr; - JNIEnv *jniEnv=(JNIEnv *)endpoint->channelData;; - if( jniEnv==NULL ) { - /* Try first getEnv, then attach */ - jniEnv = (JNIEnv *)jniCh->vm->attach( env, jniCh->vm ); - if( jniEnv == NULL ) { - env->l->jkLog(env, env->l, JK_LOG_INFO, - "channel_jni.send() can't attach\n" ); + env->l->jkLog(env, env->l, JK_LOG_ERROR, + "channelJni.recv() method not supported for JNI channel\n"); + return -1; + + /* Old workaround: + + nbuf=(jbyte *)endpoint->currentData; + + if(nbuf==NULL ) { + env->l->jkLog(env, env->l, JK_LOG_INFO, + "channelJni.recv() no jbyte[] was received\n"); + return -1; + } + + env->l->jkLog(env, env->l, JK_LOG_INFO, + "channelJni.recv() receiving %d\n", len); + + memcpy( b, nbuf + endpoint->currentOffset, len ); + endpoint->currentOffset += len; + + return len; + */ +} + +/* Process a message from java. We return in all cases, + with the response message if any. 
+*/ +static int jk_channel_jni_processMsg( jk_env_t *env, jk_endpoint_t *e, + jk_ws_service_t *r) +{ + int code; + jk_handler_t *handler; + int rc; + jk_handler_t **handlerTable=e->worker->workerEnv->handlerTable; + int maxHandler=e->worker->workerEnv->lastMessageId; + + rc=-1; + handler=NULL; + + env->l->jkLog(env, env->l, JK_LOG_INFO, + "channelJniNative.processMsg()\n"); + + /* e->reply->dump(env, e->reply, "Received "); */ + + rc = e->worker->workerEnv->dispatch( env, e->worker->workerEnv, e, r ); + + /* Process the status code returned by handler */ + switch( rc ) { + case JK_HANDLER_RESPONSE: + e->recoverable = JK_FALSE; + /* XXX response must be put back into the buffer + rc = e->post->send(env, e->post, e ); */ + if (rc < 0) { + env->l->jkLog(env, env->l, JK_LOG_ERROR, + "jni.processCallbacks() error sending response data\n"); return JK_FALSE; } + return JK_TRUE; + case JK_HANDLER_ERROR: + /* + * we won't be able to gracefully recover from this so + * set recoverable to false and get out. + */ + e->recoverable = JK_FALSE; + return JK_FALSE; + case JK_HANDLER_FATAL: + /* + * Client has stop talking to us, so get out. + * We assume this isn't our fault, so just a normal exit. + * In most (all?) cases, the ajp13_endpoint::reuse will still be + * false here, so this will be functionally the same as an + * un-recoverable error. We just won't log it as such. + */ + return JK_FALSE; + default: + /* All other cases */ + return JK_TRUE; + } + + /* not reached */ + return JK_FALSE; +} + + +/* + + */ +int jk_channel_jni_javaSendPacket(JNIEnv *jniEnv, jobject o, + jlong envJ, jlong eP, jlong s, + jbyteArray data, jint dataLen) +{ + /* [V] Convert indirectly from jlong -> int -> pointer to shut up gcc */ + /* I hope it's okay on other compilers and/or machines... 
*/ + jk_ws_service_t *ps = (jk_ws_service_t *)(int)s; + jk_env_t *env = (jk_env_t *)(long)envJ; + jk_endpoint_t *e = (jk_endpoint_t *)(long)eP; + int cnt=0; + jint rc = -1; + jboolean iscommit; + jbyte *nbuf; + unsigned acc = 0; + int msgLen=(int)dataLen; + + env->l->jkLog(env, env->l, JK_LOG_INFO, + "channelJniNative.sendPacket()\n"); + + if(!ps) { + env->l->jkLog(env, env->l, JK_LOG_ERROR, + "channelJniNative.sendPacket() NullPointerException\n"); + return -1; } - /* Copy the data in the ( recycled ) jbuf, then call the - * write method. XXX We could try 'pining' if the vm supports - * it, this is a looong lived object. - */ - nbuf = (*jniEnv)->GetByteArrayElements(jniEnv, jbuf, &iscommit); + nbuf = (*jniEnv)->GetByteArrayElements(jniEnv, data, &iscommit); - if(nbuf==NULL ) { - env->l->jkLog(env, env->l, JK_LOG_INFO, - "channelJni.send() Can't get java bytes"); - return -1; + if(nbuf==NULL) { + env->l->jkLog(env, env->l, JK_LOG_ERROR, + "channelJniNative.sendPacket() NullPointerException 2\n"); + return -1; } - if( len > jlen ) { - /* XXX Reallocate the buffer */ - len=jlen; + /* Simulate a receive on the incoming packet. e->reply is what's + used when receiving data from java. This method is JAVA.sendPacket() + and corresponds to CHANNELJNI.receive */ + e->currentData = nbuf; + e->currentOffset=0; + /* This was an workaround, no longer used ! 
*/ + + e->reply->reset(env, e->reply); + + memcpy( e->reply->buf, nbuf , msgLen ); + + rc=e->reply->checkHeader( env, e->reply, e ); + if( rc < 0 ) { + env->l->jkLog(env, env->l, JK_LOG_ERROR, + "ajp14.service() Error reading reply\n"); + /* we just can't recover, unset recover flag */ + return JK_FALSE; } - memcpy( b, nbuf, len ); + /* XXX check if the len in header matches our len */ - (*jniEnv)->ReleaseByteArrayElements(jniEnv, jbuf, nbuf, 0); + /* Now execute it */ + jk_channel_jni_processMsg( env, e, ps ); - return len; + (*jniEnv)->ReleaseByteArrayElements(jniEnv, data, nbuf, 0); + + env->l->jkLog(env, env->l, JK_LOG_INFO, + "channelJniNative.sendPacket() done\n"); + + return (jint)cnt; } + + +/** Called before request processing, to initialize resources. + All following calls will be in the same thread. +*/ +int JK_METHOD jk_channel_jni_beforeRequest(struct jk_env *env, + jk_channel_t *_this, + struct jk_worker *worker, + struct jk_endpoint *endpoint, + struct jk_ws_service *r ) +{ + JNIEnv *jniEnv; + jint rc; + jk_workerEnv_t *we=worker->workerEnv; + + env->l->jkLog(env, env->l, JK_LOG_INFO, "service() attaching to vm\n"); + + + jniEnv=(JNIEnv *)endpoint->endpoint_private; + if(jniEnv==NULL) { /*! attached */ + /* Try to attach */ + if( we->vm == NULL ) { + env->l->jkLog(env, env->l, JK_LOG_ERROR, "No VM to use\n"); + return JK_FALSE; + } + jniEnv = we->vm->attach(env, we->vm); + + if(jniEnv == NULL ) { + env->l->jkLog(env, env->l, JK_LOG_ERROR, "Attach failed\n"); + /* Is it recoverable ?? - yes, don't change the previous value*/ + /* r->is_recoverable_error = JK_TRUE; */ + return JK_FALSE; + } + endpoint->endpoint_private = jniEnv; + } + return JK_TRUE; +} + +/** Called after request processing. 
Used to be worker.done() + */ +int JK_METHOD jk_channel_jni_afterRequest(struct jk_env *env, + jk_channel_t *_this, + struct jk_worker *worker, + struct jk_endpoint *endpoint, + struct jk_ws_service *r ) +{ + jk_workerEnv_t *we=worker->workerEnv; + + /* XXX Don't detach if worker is reused per thread */ + endpoint->endpoint_private=NULL; + we->vm->detach( env, we->vm ); + + env->l->jkLog(env, env->l, JK_LOG_INFO, + "channelJni.afterRequest() ok\n"); + return JK_TRUE; +} + + + int JK_METHOD jk_channel_jni_factory(jk_env_t *env, - jk_pool_t *pool, - void **result, - const char *type, const char *name) + jk_pool_t *pool, + void **result, + const char *type, const char *name) { jk_channel_t *_this; @@ -314,10 +617,14 @@ _this->open= jk_channel_jni_open; _this->close= jk_channel_jni_close; + _this->beforeRequest= jk_channel_jni_beforeRequest; + _this->afterRequest= jk_channel_jni_afterRequest; + _this->name="jni"; _this->_privatePtr=(jk_channel_jni_private_t *)pool->calloc(env, pool, sizeof(jk_channel_jni_private_t)); + _this->is_stream=JK_FALSE; *result= _this;
-- To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]> For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>