This is an automated email from the ASF dual-hosted git repository.
johnnyv pushed a commit to branch bugfix/DIRMINA-1173
in repository https://gitbox.apache.org/repos/asf/mina.git
The following commit(s) were added to refs/heads/bugfix/DIRMINA-1173 by this
push:
new ec93bb746 nonblock SSL now passes all tests
ec93bb746 is described below
commit ec93bb746b1a3216167341bf2470ae939fba55c8
Author: Jonathan Valliere <[email protected]>
AuthorDate: Mon Feb 19 22:25:50 2024 -0500
nonblock SSL now passes all tests
The following public endpoints for SSLHandlerG1 now correctly handle
non-blocking operations:
- open
- write
- receive
- ack
- flush
- close
I added try...finally blocks to ensure that already-processed messages are
still fired even if a subsequent message causes the SSL engine to fail.
---
.../org/apache/mina/filter/ssl/SSLHandlerG1.java | 220 ++++++++++-----------
.../java/org/apache/mina/filter/ssl/SslFilter.java | 15 +-
2 files changed, 115 insertions(+), 120 deletions(-)
diff --git
a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java
b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java
index 5cb6858ea..80f52752b 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SSLHandlerG1.java
@@ -68,7 +68,7 @@ import java.util.concurrent.Executor;
/**
* Enable asynchronous tasks
*/
- static protected final boolean ENABLE_ASYNC_TASKS = false;
+ static protected final boolean ENABLE_ASYNC_TASKS = true;
/**
* Indicates whether the first handshake was completed
@@ -142,25 +142,21 @@ import java.util.concurrent.Executor;
*/
@Override
public void open(NextFilter next) throws SSLException {
- synchronized (this) {
- if (mHandshakeStarted == false) {
- mHandshakeStarted = true;
- if (mEngine.getUseClientMode()) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} open() - begin handshaking", this);
+ try {
+ synchronized (this) {
+ if (mHandshakeStarted == false) {
+ mHandshakeStarted = true;
+ if (mEngine.getUseClientMode()) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} open() - begin handshaking",
this);
+ }
+ mEngine.beginHandshake();
+ write_handshake(next);
}
- mEngine.beginHandshake();
- write_handshake(next);
}
}
- }
- synchronized (mWriteQueue) {
- EncryptedWriteRequest x;
- while((x = mWriteQueue.poll()) != null) {
- next.filterWrite(mSession, x);
- }
- }
- synchronized (this) {
+ } finally {
+ forward_writes(next);
throw_pending_error(next);
}
}
@@ -170,20 +166,11 @@ import java.util.concurrent.Executor;
*/
@Override
public void receive(NextFilter next, IoBuffer message) throws SSLException
{
- receive_start(next, message);
- synchronized (mReceiveQueue) {
- IoBuffer x;
- while((x = mReceiveQueue.poll()) != null) {
- next.messageReceived(mSession, x);
- }
- }
- synchronized (mWriteQueue) {
- EncryptedWriteRequest x;
- while((x = mWriteQueue.poll()) != null) {
- next.filterWrite(mSession, x);
- }
- }
- synchronized (this) {
+ try {
+ receive_start(next, message);
+ } finally {
+ forward_received(next);
+ forward_writes(next);
throw_pending_error(next);
}
}
@@ -267,7 +254,7 @@ import java.util.concurrent.Executor;
LOGGER.debug("{} receive_loop() - result {}", toString(),
dest);
}
- mReceiveQueue.push(dest);
+ mReceiveQueue.add(dest);
}
switch (result.getHandshakeStatus()) {
@@ -323,25 +310,25 @@ import java.util.concurrent.Executor;
*/
@Override
public void ack(NextFilter next, WriteRequest request) throws SSLException
{
- synchronized (this) {
- if (mAckQueue.remove(request)) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} ack() - {}", toString(), request);
- }
+ try {
+ synchronized (this) {
+ if (mAckQueue.remove(request)) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} ack() - accepted {}", toString(),
request);
+ }
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} ack() - checking to see if any messages
can be flushed", toString(), request);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} ack() - checking to see if any
messages can be flushed", toString(), request);
+ }
+ flush_start(next);
+ } else {
+ if(LOGGER.isWarnEnabled()) {
+ LOGGER.warn("{} ack() - unknown message {}",
toString(), request);
+ }
}
- flush_start(next);
}
- }
- synchronized (mWriteQueue) {
- EncryptedWriteRequest x;
- while((x = mWriteQueue.poll()) != null) {
- next.filterWrite(mSession, x);
- }
- }
- synchronized (this) {
+ } finally {
+ forward_writes(next);
throw_pending_error(next);
}
}
@@ -351,41 +338,37 @@ import java.util.concurrent.Executor;
*/
@Override
public void write(NextFilter next, WriteRequest request) throws
SSLException, WriteRejectedException {
- synchronized (this) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} write() - source {}", toString(), request);
- }
- if (mOutboundClosing) {
- throw new WriteRejectedException(request, "closing");
- }
- if (mEncodeQueue.isEmpty()) {
- if (write_loop(next, request) == false) {
+ try {
+ synchronized (this) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} write() - source {}", toString(),
request);
+ }
+ if (mOutboundClosing) {
+ throw new WriteRejectedException(request, "closing");
+ }
+ if (mEncodeQueue.isEmpty()) {
+ if (write_loop(next, request) == false) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} write() - unable to write right
now, saving request for later", toString(),
+ request);
+ }
+ if (mEncodeQueue.size() == MAX_QUEUED_MESSAGES) {
+ throw new BufferOverflowException();
+ }
+ mEncodeQueue.add(request);
+ }
+ } else {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} write() - unable to write right now,
saving request for later", toString(),
- request);
+ LOGGER.debug("{} write() - unable to write right now,
saving request for later", toString(), request);
}
if (mEncodeQueue.size() == MAX_QUEUED_MESSAGES) {
throw new BufferOverflowException();
}
mEncodeQueue.add(request);
}
- } else {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} write() - unable to write right now,
saving request for later", toString(), request);
- }
- if (mEncodeQueue.size() == MAX_QUEUED_MESSAGES) {
- throw new BufferOverflowException();
- }
- mEncodeQueue.add(request);
}
- }
- synchronized (mWriteQueue) {
- EncryptedWriteRequest x;
- while((x = mWriteQueue.poll()) != null) {
- next.filterWrite(mSession, x);
- }
- }
- synchronized (this) {
+ } finally {
+ forward_writes(next);
throw_pending_error(next);
}
}
@@ -404,7 +387,7 @@ import java.util.concurrent.Executor;
@SuppressWarnings("incomplete-switch")
synchronized protected boolean write_loop(NextFilter next, WriteRequest
request) throws SSLException {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} write_user_loop() - source {}", toString(),
request);
+ LOGGER.debug("{} write_loop() - source {}", toString(), request);
}
IoBuffer source = IoBuffer.class.cast(request.getMessage());
@@ -413,7 +396,7 @@ import java.util.concurrent.Executor;
SSLEngineResult result = mEngine.wrap(source.buf(), dest.buf());
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} write_user_loop() - bytes-consumed {},
bytes-produced {}, status {}, handshake {}",
+ LOGGER.debug("{} write_loop() - bytes-consumed {}, bytes-produced
{}, status {}, handshake {}",
toString(), result.bytesConsumed(),
result.bytesProduced(), result.getStatus(),
result.getHandshakeStatus());
}
@@ -426,10 +409,10 @@ import java.util.concurrent.Executor;
EncryptedWriteRequest encrypted = new
EncryptedWriteRequest(dest, null);
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} write_user_loop() - result {}",
toString(), encrypted);
+ LOGGER.debug("{} write_loop() - result {}", toString(),
encrypted);
}
- mWriteQueue.push(encrypted);
+ mWriteQueue.add(encrypted);
// do not return because we want to enter the handshake switch
} else {
// then we probably consumed some data
@@ -437,28 +420,25 @@ import java.util.concurrent.Executor;
if (source.hasRemaining()) {
EncryptedWriteRequest encrypted = new
EncryptedWriteRequest(dest, null);
- mAckQueue.add(encrypted);
-
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} write_user_loop() - result {}",
toString(), encrypted);
+ LOGGER.debug("{} write_loop() - result {}",
toString(), encrypted);
}
- mWriteQueue.push(encrypted);
+ mWriteQueue.add(encrypted);
- if (mAckQueue.size() < MAX_UNACK_MESSAGES) {
+ if (mWriteQueue.size() + mAckQueue.size() <
MAX_UNACK_MESSAGES) {
return write_loop(next, request); // write additional
chunks
}
return false;
} else {
EncryptedWriteRequest encrypted = new
EncryptedWriteRequest(dest, request);
- mAckQueue.add(encrypted);
-
+
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} write_user_loop() - result {}",
toString(), encrypted);
+ LOGGER.debug("{} write_loop() - result {}",
toString(), encrypted);
}
- mWriteQueue.push(encrypted);
+ mWriteQueue.add(encrypted);
return true;
}
@@ -469,7 +449,7 @@ import java.util.concurrent.Executor;
switch (result.getHandshakeStatus()) {
case NEED_TASK:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} write_user_loop() - handshake needs task,
scheduling", toString());
+ LOGGER.debug("{} write_loop() - handshake needs task,
scheduling", toString());
}
schedule_task(next);
@@ -477,14 +457,14 @@ import java.util.concurrent.Executor;
case NEED_WRAP:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} write_user_loop() - handshake needs wrap,
looping", toString());
+ LOGGER.debug("{} write_loop() - handshake needs wrap,
looping", toString());
}
return write_loop(next, request);
case FINISHED:
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("{} write_user_loop() - handshake finished,
flushing queue", toString());
+ LOGGER.debug("{} write_loop() - handshake finished,
flushing queue", toString());
}
finish_handshake(next);
@@ -581,7 +561,7 @@ import java.util.concurrent.Executor;
}
EncryptedWriteRequest encrypted = new EncryptedWriteRequest(dest,
null);
- mWriteQueue.push(encrypted);
+ mWriteQueue.add(encrypted);
}
switch (result.getHandshakeStatus()) {
@@ -641,14 +621,10 @@ import java.util.concurrent.Executor;
* {@inheritDoc}
*/
public void flush(NextFilter next) throws SSLException {
- flush_start(next);
- synchronized (mWriteQueue) {
- EncryptedWriteRequest x;
- while((x = mWriteQueue.poll()) != null) {
- next.filterWrite(mSession, x);
- }
- }
- synchronized (this) {
+ try {
+ flush_start(next);
+ } finally {
+ forward_writes(next);
throw_pending_error(next);
}
}
@@ -701,14 +677,10 @@ import java.util.concurrent.Executor;
*/
@Override
public void close(NextFilter next, boolean linger) throws SSLException {
- close_start(next, linger);
- synchronized (mWriteQueue) {
- EncryptedWriteRequest x;
- while((x = mWriteQueue.poll()) != null) {
- next.filterWrite(mSession, x);
- }
- }
- synchronized (this) {
+ try {
+ close_start(next, linger);
+ } finally {
+ forward_writes(next);
throw_pending_error(next);
}
}
@@ -747,13 +719,13 @@ import java.util.concurrent.Executor;
*/
synchronized protected void throw_pending_error(NextFilter next) throws
SSLException {
SSLException sslException = mPendingError;
-
if (sslException != null) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} throw_pending_error() - throwing pending
error");
+ }
// Loop to send back the alert messages
receive_loop(next, null);
-
mPendingError = null;
-
// And finally rethrow the exception
throw sslException;
}
@@ -770,6 +742,31 @@ import java.util.concurrent.Executor;
}
}
+ protected void forward_received(NextFilter next) {
+ synchronized (mReceiveQueue) {
+ IoBuffer x;
+ while ((x = mReceiveQueue.poll()) != null) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} forward_received() - received {}",
toString(), x);
+ }
+ next.messageReceived(mSession, x);
+ }
+ }
+ }
+
+ protected void forward_writes(NextFilter next) {
+ synchronized (mWriteQueue) {
+ EncryptedWriteRequest x;
+ while ((x = mWriteQueue.poll()) != null) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("{} forward_writes() - writing {}",
toString(), x);
+ }
+ mAckQueue.add(x);
+ next.filterWrite(mSession, x);
+ }
+ }
+ }
+
/**
* Schedule a SSLEngine task for execution, either using an Executor, or
immediately.
*
@@ -792,7 +789,6 @@ import java.util.concurrent.Executor;
*/
synchronized protected void execute_task(NextFilter next) {
Runnable task;
-
while ((task = mEngine.getDelegatedTask()) != null) {
try {
if (LOGGER.isDebugEnabled()) {
diff --git a/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java
b/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java
index 90329f219..c6340f2a0 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/ssl/SslFilter.java
@@ -447,15 +447,14 @@ public class SslFilter extends IoFilterAdapter {
*/
@Override
public void messageSent(NextFilter next, IoSession session, WriteRequest
request) throws Exception {
- if (LOGGER.isDebugEnabled()) {
- if (session.isServer()) {
- LOGGER.debug("SERVER: Session {} ack {}", session, request);
- } else {
- LOGGER.debug("CLIENT: Session {} ack {}", session, request);
- }
- }
-
if (request instanceof EncryptedWriteRequest) {
+ if (LOGGER.isDebugEnabled()) {
+ if (session.isServer()) {
+ LOGGER.debug("SERVER: Session {} ack {}", session,
request);
+ } else {
+ LOGGER.debug("CLIENT: Session {} ack {}", session,
request);
+ }
+ }
EncryptedWriteRequest encryptedWriteRequest =
EncryptedWriteRequest.class.cast(request);
SslHandler sslHandler = getSslHandler(session);
sslHandler.ack(next, request);