costin 02/01/25 22:55:17
Modified: jk/native2/common jk_channel_jni.c
Log:
A number of bug fixes - make sure we save the global ref, not the local ref.
We now get an Endpoint from the java side - and cache/reuse it with the
jni endpoint.
The jni channel is 'interesting' - it's the first non-stream channel. It could
be treated as a stream by using 2 threads ( so the send/receive model would
work ), but I wanted to preserve the 'single thread, no sync' model from the
previous jni worker.
It may seem a bit complicated - and it adds some limitations on the model, but
I think it's worth it.
The idea is that, as before, the first message ( containing the request ) gives
control to tomcat, which may send back messages ( and get back responses to its
messages ). The send() method in channel is doing exactly this first step.
Tomcat will use a native method to send messages ( this replaces receive(),
which is not used ), and those messages are then dispatched. The response is
actually put in the same buffer - a single jarray pin is needed.
We must make sure we don't run into buffer problems - but that can be resolved.
Revision Changes Path
1.2 +359 -52 jakarta-tomcat-connectors/jk/native2/common/jk_channel_jni.c
Index: jk_channel_jni.c
===================================================================
RCS file: /home/cvs/jakarta-tomcat-connectors/jk/native2/common/jk_channel_jni.c,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- jk_channel_jni.c 12 Jan 2002 04:59:19 -0000 1.1
+++ jk_channel_jni.c 26 Jan 2002 06:55:17 -0000 1.2
@@ -79,7 +79,7 @@
*/
typedef struct {
jk_vm_t *vm;
-
+
char *className;
jclass jniBridge;
@@ -87,8 +87,15 @@
} jk_channel_jni_private_t;
typedef struct {
- JNIEnv *env;
+ JNIEnv *jniEnv;
+
+ int len;
+ jbyteArray jarray;
+ char *carray;
+ int arrayLen;
+ jobject epJ;
+ jobject msgJ;
} jk_ch_jni_ep_private_t;
@@ -105,6 +112,12 @@
{
int err;
char *tmp;
+
+ /* the channel is init-ed during a worker validation. If a jni worker
+ is not already defined... well, not good. But on open we should
+ have it.
+ */
+
_this->worker=worker;
_this->properties=props;
@@ -113,7 +126,7 @@
"channel_jni.init(): %s\n",
worker->name );
- return err;
+ return JK_TRUE;
}
/** Assume the jni-worker or someone else started
@@ -125,12 +138,18 @@
{
jk_workerEnv_t *we=endpoint->worker->workerEnv;
JNIEnv *jniEnv;
+ jk_ch_jni_ep_private_t *epData;
+ jmethodID jmethod;
+ jobject jobj;
jk_channel_jni_private_t *jniCh=_this->_privatePtr;
-
- /** XXX make it customizable */
- jniCh->className=JAVA_BRIDGE_CLASS_NAME;
+ if( endpoint->channelData != NULL ) {
+ env->l->jkLog(env, env->l, JK_LOG_INFO,
+ "channel_jni.open() already open, nothing else to do\n");
+ return JK_TRUE;
+ }
+
jniCh->vm=(jk_vm_t *)we->vm;
jniEnv = (JNIEnv *)jniCh->vm->attach( env, jniCh->vm );
@@ -139,19 +158,84 @@
"channel_jni.open() can't attach\n" );
return JK_FALSE;
}
+ /* Create the buffers used by the write method. We allocate a
+ byte[] and jbyte[] - I have no idea what's more expensive,
+ to copy a buffer or to 'pin' the jbyte[] for copying.
+
+ This will be tuned if needed, for now it seems the easiest
+ solution
+ */
+ epData=(jk_ch_jni_ep_private_t *)
+ endpoint->pool->calloc( env,endpoint->pool,
+ sizeof( jk_ch_jni_ep_private_t ));
+ endpoint->channelData=epData;
+ /** XXX make it customizable */
+ jniCh->className=JAVA_BRIDGE_CLASS_NAME;
+
jniCh->jniBridge =
(*jniEnv)->FindClass(jniEnv, jniCh->className );
+ jniCh->jniBridge=(*jniEnv)->NewGlobalRef( jniEnv, jniCh->jniBridge);
+
if( jniCh->jniBridge == NULL ) {
env->l->jkLog(env, env->l, JK_LOG_INFO,
"channel_jni.open() can't find %s\n",jniCh->className );
return JK_FALSE;
}
-
- jniCh->writeMethod =
+
+ jmethod=(*jniEnv)->GetStaticMethodID(jniEnv, jniCh->jniBridge,
+ "createEndpointStatic", "(JJ)Lorg/apache/jk/core/Endpoint;");
+ if( jmethod == NULL ) {
+ env->l->jkLog(env, env->l, JK_LOG_INFO,
+ "channel_jni.open() can't find createEndpointStatic\n");
+ return JK_FALSE;
+ }
+ jobj=(*jniEnv)->CallStaticObjectMethod( jniEnv, jniCh->jniBridge,
+ jmethod,
+ (jlong)(long)(void *)env,
+ (jlong)(long)(void *)endpoint );
+ epData->epJ=(*jniEnv)->NewGlobalRef( jniEnv, jobj );
+
+ jmethod=(*jniEnv)->GetStaticMethodID(jniEnv, jniCh->jniBridge,
+ "createMessage",
+ "()Lorg/apache/jk/common/MsgAjp;");
+ if( jmethod == NULL ) {
+ env->l->jkLog(env, env->l, JK_LOG_INFO,
+ "channel_jni.open() can't find createMessage\n");
+ return JK_FALSE;
+ }
+ jobj=(*jniEnv)->CallStaticObjectMethod( jniEnv, jniCh->jniBridge,
+ jmethod );
+ epData->msgJ=(*jniEnv)->NewGlobalRef( jniEnv, jobj );
+
+ /* XXX Destroy them in close */
+
+ jmethod=(*jniEnv)->GetStaticMethodID(jniEnv, jniCh->jniBridge,
+ "getBuffer",
+ "(Lorg/apache/jk/common/MsgAjp;)[B");
+ if( jmethod == NULL ) {
+ env->l->jkLog(env, env->l, JK_LOG_INFO,
+ "channel_jni.open() can't find getBuffer\n");
+ return JK_FALSE;
+ }
+ epData->jarray=(*jniEnv)->CallStaticObjectMethod( jniEnv, jniCh->jniBridge,
+ jmethod, epData->msgJ );
+ /*epData->jarray=(*jniEnv)->NewByteArray(jniEnv, 10000 ); */
+ epData->jarray=(*jniEnv)->NewGlobalRef( jniEnv, epData->jarray );
+
+ epData->arrayLen = (*jniEnv)->GetArrayLength( jniEnv, epData->jarray );
+
+ /* XXX > ajp buffer size. Don't know how to fragment or reallocate
+ yet */
+ epData->carray=(char *)endpoint->pool->calloc( env, endpoint->pool,
+ epData->arrayLen);
+
+ jniCh->writeMethod =
(*jniEnv)->GetStaticMethodID(jniEnv, jniCh->jniBridge,
- "write", "(JJ)I");
+ "receiveRequest",
+ "(JJLorg/apache/jk/core/Endpoint;"
+ "Lorg/apache/jk/common/MsgAjp;)I");
if( jniCh->writeMethod == NULL ) {
env->l->jkLog(env, env->l, JK_LOG_EMERG,
@@ -159,10 +243,14 @@
return JK_FALSE;
}
+ env->l->jkLog(env, env->l, JK_LOG_INFO,
+ "channel_jni.open() found write method, open ok\n" );
+
+
/* Don't detach ( XXX Need to find out when the thread is
* closing in order for this to work )
*/
- jniCh->vm->detach( env, jniCh->vm );
+ /* jniCh->vm->detach( env, jniCh->vm ); */
return JK_TRUE;
}
@@ -172,7 +260,15 @@
static int JK_METHOD jk_channel_jni_close(jk_env_t *env,jk_channel_t *_this,
jk_endpoint_t *endpoint)
{
+ jk_ch_jni_ep_private_t *epData;
+
+ epData=(jk_ch_jni_ep_private_t *)endpoint->channelData;
+
+ /* (*jniEnv)->DeleteGlobalRef( jniEnv, epData->msgJ ); */
+ /* (*jniEnv)->DeleteGlobalRef( jniEnv, epData->epJ ); */
+
return JK_TRUE;
+
}
/** send a long message
@@ -187,19 +283,41 @@
* @was: jk_tcp_socket_sendfull
*/
static int JK_METHOD jk_channel_jni_send(jk_env_t *env, jk_channel_t *_this,
- jk_endpoint_t *endpoint,
- char *b, int len)
+ jk_endpoint_t *endpoint,
+ jk_msg_t *msg)
{
int sd;
int sent=0;
+ char *b;
+ int len;
jbyte *nbuf;
jbyteArray jbuf;
int jlen;
- jboolean iscommit;
-
+ jboolean iscommit=0;
+ JNIEnv *jniEnv;
jk_channel_jni_private_t *jniCh=_this->_privatePtr;
+ jk_ch_jni_ep_private_t *epData=
+ (jk_ch_jni_ep_private_t *)endpoint->channelData;;
+
+ env->l->jkLog(env, env->l, JK_LOG_INFO,"channel_jni.send()\n" );
+
+ if( epData == NULL ) {
+ jk_channel_jni_open( env, _this, endpoint );
+ epData=(jk_ch_jni_ep_private_t *)endpoint->channelData;
+ }
+
+ msg->end( env, msg );
+ len=msg->len;
+ b=msg->buf;
- JNIEnv *jniEnv=(JNIEnv *)endpoint->channelData;;
+ jniEnv=NULL; /* epData->jniEnv; */
+ jbuf=epData->jarray;
+
+ if( jniCh->writeMethod == NULL ) {
+ env->l->jkLog(env, env->l, JK_LOG_EMERG,
+ "channel_jni.send() no write method\n" );
+ return JK_FALSE;
+ }
if( jniEnv==NULL ) {
/* Try first getEnv, then attach */
jniEnv = (JNIEnv *)jniCh->vm->attach( env, jniCh->vm );
@@ -210,6 +328,9 @@
}
}
+ env->l->jkLog(env, env->l, JK_LOG_INFO,
+ "channel_jni.send() getting byte array \n" );
+
/* Copy the data in the ( recycled ) jbuf, then call the
* write method. XXX We could try 'pining' if the vm supports
* it, this is a looong lived object.
@@ -219,7 +340,7 @@
if(nbuf==NULL ) {
env->l->jkLog(env, env->l, JK_LOG_INFO,
"channelJni.send() Can't get java bytes");
- return -1;
+ return JK_FALSE;
}
if( len > jlen ) {
@@ -231,26 +352,32 @@
(*jniEnv)->ReleaseByteArrayElements(jniEnv, jbuf, nbuf, 0);
+ env->l->jkLog(env, env->l, JK_LOG_INFO,
+ "channel_jni.send() before send %p %p\n",
+ (void *)(long)epData->epJ,
+ (void *)(long)epData->msgJ );
+
sent=(*jniEnv)->CallStaticIntMethod( jniEnv,
jniCh->jniBridge,
jniCh->writeMethod,
- jbuf,
- len);
- return sent;
+ (jlong)(long)(void *)env,
+ (jlong)(long)(void *)endpoint->currentRequest,
+ epData->epJ,
+ epData->msgJ);
+ env->l->jkLog(env, env->l, JK_LOG_INFO,"channel_jni.send() result %d\n",
+ sent);
+ return JK_TRUE;
}
-/** receive len bytes.
- * @param sd opened socket.
- * @param b buffer to store the data.
- * @param len length to receive.
- * @return -1: receive failed or connection closed.
- * >0: length of the received data.
- * Was: tcp_socket_recvfull
+/**
+ * Not used - we have a single thread, there is no 'blocking' - the
+ * java side will send messages by calling a native method, which will
+ * receive and dispatch.
*/
static int JK_METHOD jk_channel_jni_recv( jk_env_t *env, jk_channel_t *_this,
jk_endpoint_t *endpoint,
- char *b, int len )
+ jk_msg_t *msg )
{
jbyte *nbuf;
jbyteArray jbuf;
@@ -258,46 +385,222 @@
jboolean iscommit;
jk_channel_jni_private_t *jniCh=_this->_privatePtr;
- JNIEnv *jniEnv=(JNIEnv *)endpoint->channelData;;
- if( jniEnv==NULL ) {
- /* Try first getEnv, then attach */
- jniEnv = (JNIEnv *)jniCh->vm->attach( env, jniCh->vm );
- if( jniEnv == NULL ) {
- env->l->jkLog(env, env->l, JK_LOG_INFO,
- "channel_jni.send() can't attach\n" );
+ env->l->jkLog(env, env->l, JK_LOG_ERROR,
+ "channelJni.recv() method not supported for JNI channel\n");
+ return -1;
+
+ /* Old workaround:
+
+ nbuf=(jbyte *)endpoint->currentData;
+
+ if(nbuf==NULL ) {
+ env->l->jkLog(env, env->l, JK_LOG_INFO,
+ "channelJni.recv() no jbyte[] was received\n");
+ return -1;
+ }
+
+ env->l->jkLog(env, env->l, JK_LOG_INFO,
+ "channelJni.recv() receiving %d\n", len);
+
+ memcpy( b, nbuf + endpoint->currentOffset, len );
+ endpoint->currentOffset += len;
+
+ return len;
+ */
+}
+
+/* Process a message from java. We return in all cases,
+ with the response message if any.
+*/
+static int jk_channel_jni_processMsg( jk_env_t *env, jk_endpoint_t *e,
+ jk_ws_service_t *r)
+{
+ int code;
+ jk_handler_t *handler;
+ int rc;
+ jk_handler_t **handlerTable=e->worker->workerEnv->handlerTable;
+ int maxHandler=e->worker->workerEnv->lastMessageId;
+
+ rc=-1;
+ handler=NULL;
+
+ env->l->jkLog(env, env->l, JK_LOG_INFO,
+ "channelJniNative.processMsg()\n");
+
+ /* e->reply->dump(env, e->reply, "Received "); */
+
+ rc = e->worker->workerEnv->dispatch( env, e->worker->workerEnv, e, r );
+
+ /* Process the status code returned by handler */
+ switch( rc ) {
+ case JK_HANDLER_RESPONSE:
+ e->recoverable = JK_FALSE;
+ /* XXX response must be put back into the buffer
+ rc = e->post->send(env, e->post, e ); */
+ if (rc < 0) {
+ env->l->jkLog(env, env->l, JK_LOG_ERROR,
+ "jni.processCallbacks() error sending response data\n");
return JK_FALSE;
}
+ return JK_TRUE;
+ case JK_HANDLER_ERROR:
+ /*
+ * we won't be able to gracefully recover from this so
+ * set recoverable to false and get out.
+ */
+ e->recoverable = JK_FALSE;
+ return JK_FALSE;
+ case JK_HANDLER_FATAL:
+ /*
+ * Client has stop talking to us, so get out.
+ * We assume this isn't our fault, so just a normal exit.
+ * In most (all?) cases, the ajp13_endpoint::reuse will still be
+ * false here, so this will be functionally the same as an
+ * un-recoverable error. We just won't log it as such.
+ */
+ return JK_FALSE;
+ default:
+ /* All other cases */
+ return JK_TRUE;
+ }
+
+ /* not reached */
+ return JK_FALSE;
+}
+
+
+/*
+
+ */
+int jk_channel_jni_javaSendPacket(JNIEnv *jniEnv, jobject o,
+ jlong envJ, jlong eP, jlong s,
+ jbyteArray data, jint dataLen)
+{
+ /* [V] Convert indirectly from jlong -> int -> pointer to shut up gcc */
+ /* I hope it's okay on other compilers and/or machines... */
+ jk_ws_service_t *ps = (jk_ws_service_t *)(int)s;
+ jk_env_t *env = (jk_env_t *)(long)envJ;
+ jk_endpoint_t *e = (jk_endpoint_t *)(long)eP;
+ int cnt=0;
+ jint rc = -1;
+ jboolean iscommit;
+ jbyte *nbuf;
+ unsigned acc = 0;
+ int msgLen=(int)dataLen;
+
+ env->l->jkLog(env, env->l, JK_LOG_INFO,
+ "channelJniNative.sendPacket()\n");
+
+ if(!ps) {
+ env->l->jkLog(env, env->l, JK_LOG_ERROR,
+ "channelJniNative.sendPacket() NullPointerException\n");
+ return -1;
}
- /* Copy the data in the ( recycled ) jbuf, then call the
- * write method. XXX We could try 'pining' if the vm supports
- * it, this is a looong lived object.
- */
- nbuf = (*jniEnv)->GetByteArrayElements(jniEnv, jbuf, &iscommit);
+ nbuf = (*jniEnv)->GetByteArrayElements(jniEnv, data, &iscommit);
- if(nbuf==NULL ) {
- env->l->jkLog(env, env->l, JK_LOG_INFO,
- "channelJni.send() Can't get java bytes");
- return -1;
+ if(nbuf==NULL) {
+ env->l->jkLog(env, env->l, JK_LOG_ERROR,
+ "channelJniNative.sendPacket() NullPointerException 2\n");
+ return -1;
}
- if( len > jlen ) {
- /* XXX Reallocate the buffer */
- len=jlen;
+ /* Simulate a receive on the incoming packet. e->reply is what's
+ used when receiving data from java. This method is JAVA.sendPacket()
+ and corresponds to CHANNELJNI.receive */
+ e->currentData = nbuf;
+ e->currentOffset=0;
+ /* This was an workaround, no longer used ! */
+
+ e->reply->reset(env, e->reply);
+
+ memcpy( e->reply->buf, nbuf , msgLen );
+
+ rc=e->reply->checkHeader( env, e->reply, e );
+ if( rc < 0 ) {
+ env->l->jkLog(env, env->l, JK_LOG_ERROR,
+ "ajp14.service() Error reading reply\n");
+ /* we just can't recover, unset recover flag */
+ return JK_FALSE;
}
- memcpy( b, nbuf, len );
+ /* XXX check if the len in header matches our len */
- (*jniEnv)->ReleaseByteArrayElements(jniEnv, jbuf, nbuf, 0);
+ /* Now execute it */
+ jk_channel_jni_processMsg( env, e, ps );
- return len;
+ (*jniEnv)->ReleaseByteArrayElements(jniEnv, data, nbuf, 0);
+
+ env->l->jkLog(env, env->l, JK_LOG_INFO,
+ "channelJniNative.sendPacket() done\n");
+
+ return (jint)cnt;
}
+
+
+/** Called before request processing, to initialize resources.
+ All following calls will be in the same thread.
+*/
+int JK_METHOD jk_channel_jni_beforeRequest(struct jk_env *env,
+ jk_channel_t *_this,
+ struct jk_worker *worker,
+ struct jk_endpoint *endpoint,
+ struct jk_ws_service *r )
+{
+ JNIEnv *jniEnv;
+ jint rc;
+ jk_workerEnv_t *we=worker->workerEnv;
+
+ env->l->jkLog(env, env->l, JK_LOG_INFO, "service() attaching to vm\n");
+
+
+ jniEnv=(JNIEnv *)endpoint->endpoint_private;
+ if(jniEnv==NULL) { /*! attached */
+ /* Try to attach */
+ if( we->vm == NULL ) {
+ env->l->jkLog(env, env->l, JK_LOG_ERROR, "No VM to use\n");
+ return JK_FALSE;
+ }
+ jniEnv = we->vm->attach(env, we->vm);
+
+ if(jniEnv == NULL ) {
+ env->l->jkLog(env, env->l, JK_LOG_ERROR, "Attach failed\n");
+ /* Is it recoverable ?? - yes, don't change the previous value*/
+ /* r->is_recoverable_error = JK_TRUE; */
+ return JK_FALSE;
+ }
+ endpoint->endpoint_private = jniEnv;
+ }
+ return JK_TRUE;
+}
+
+/** Called after request processing. Used to be worker.done()
+ */
+int JK_METHOD jk_channel_jni_afterRequest(struct jk_env *env,
+ jk_channel_t *_this,
+ struct jk_worker *worker,
+ struct jk_endpoint *endpoint,
+ struct jk_ws_service *r )
+{
+ jk_workerEnv_t *we=worker->workerEnv;
+
+ /* XXX Don't detach if worker is reused per thread */
+ endpoint->endpoint_private=NULL;
+ we->vm->detach( env, we->vm );
+
+ env->l->jkLog(env, env->l, JK_LOG_INFO,
+ "channelJni.afterRequest() ok\n");
+ return JK_TRUE;
+}
+
+
+
int JK_METHOD jk_channel_jni_factory(jk_env_t *env,
- jk_pool_t *pool,
- void **result,
- const char *type, const char *name)
+ jk_pool_t *pool,
+ void **result,
+ const char *type, const char *name)
{
jk_channel_t *_this;
@@ -314,10 +617,14 @@
_this->open= jk_channel_jni_open;
_this->close= jk_channel_jni_close;
+ _this->beforeRequest= jk_channel_jni_beforeRequest;
+ _this->afterRequest= jk_channel_jni_afterRequest;
+
_this->name="jni";
_this->_privatePtr=(jk_channel_jni_private_t *)pool->calloc(env, pool,
sizeof(jk_channel_jni_private_t));
+ _this->is_stream=JK_FALSE;
*result= _this;
--
To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]>
For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>