This is an automated email from the ASF dual-hosted git repository.
elecharny pushed a commit to branch 2.1.X
in repository https://gitbox.apache.org/repos/asf/mina.git
The following commit(s) were added to refs/heads/2.1.X by this push:
new e495790 o Fixed the worker run() method to properly manage the idle
count o Added Guss' tests for DIRMINA-1156
e495790 is described below
commit e49579069e49d733d8f75bc67b2e16311b619ffe
Author: emmanuel lecharny <[email protected]>
AuthorDate: Wed Jan 12 22:03:26 2022 +0100
o Fixed the worker run() method to properly manage the idle count
o Added Guss' tests for DIRMINA-1156
---
.../filter/executor/OrderedThreadPoolExecutor.java | 2 -
.../executor/PriorityThreadPoolExecutor.java | 44 +-
.../executor/UnorderedThreadPoolExecutor.java | 21 +-
.../executor/OrderedThreadPoolExecutorTest.java | 166 ++++++++
.../executor/PriorityThreadPoolExecutorTest.java | 143 ++++++-
.../executor/UnorderedThreadPoolExecutorTest.java | 166 ++++++++
.../org/apache/mina/handler/DIRMINA1156Test.java | 465 +++++++++++++++++++++
7 files changed, 971 insertions(+), 36 deletions(-)
diff --git
a/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
b/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
index c800864..4b42986 100644
---
a/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
+++
b/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
@@ -682,8 +682,6 @@ public class OrderedThreadPoolExecutor extends
ThreadPoolExecutor {
if (session == null) {
synchronized (workers) {
if (workers.size() > getCorePoolSize()) {
- // Remove now to prevent duplicate exit.
- // workers.remove(this);
break;
}
}
diff --git
a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
index 721005c..43ace10 100644
---
a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
+++
b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
@@ -19,15 +19,32 @@
*/
package org.apache.mina.filter.executor;
-import org.apache.mina.core.session.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.mina.core.session.AttributeKey;
+import org.apache.mina.core.session.DummySession;
+import org.apache.mina.core.session.IoEvent;
+import org.apache.mina.core.session.IoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s
* within a session (similar to {@link OrderedThreadPoolExecutor}) and allows
@@ -304,12 +321,13 @@ public class PriorityThreadPoolExecutor extends
ThreadPoolExecutor {
Worker worker = new Worker();
Thread thread = getThreadFactory().newThread(worker);
+ workers.add(worker);
+
// As we have added a new thread, it's considered as idle.
idleWorkers.incrementAndGet();
// Now, we can start it.
thread.start();
- workers.add(worker);
if (workers.size() > largestPoolSize) {
largestPoolSize = workers.size();
@@ -730,8 +748,6 @@ public class PriorityThreadPoolExecutor extends
ThreadPoolExecutor {
if (session == null) {
synchronized (workers) {
if (workers.size() > getCorePoolSize()) {
- // Remove now to prevent duplicate exit.
- workers.remove(this);
break;
}
}
@@ -741,13 +757,11 @@ public class PriorityThreadPoolExecutor extends
ThreadPoolExecutor {
break;
}
- try {
- if (session != null) {
- runTasks(getSessionTasksQueue(session));
- }
- } finally {
- idleWorkers.incrementAndGet();
+ if (session != null) {
+ runTasks(getSessionTasksQueue(session));
}
+
+ idleWorkers.incrementAndGet();
}
} finally {
synchronized (workers) {
diff --git
a/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
b/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
index 3136492..570963d 100644
---
a/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
+++
b/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
@@ -198,9 +198,14 @@ public class UnorderedThreadPoolExecutor extends
ThreadPoolExecutor {
Worker worker = new Worker();
Thread thread = getThreadFactory().newThread(worker);
+
+ workers.add(worker);
+
+ // As we have added a new thread, it's considered as idle.
idleWorkers.incrementAndGet();
+
+ // Now, we can start it.
thread.start();
- workers.add(worker);
if (workers.size() > largestPoolSize) {
largestPoolSize = workers.size();
@@ -476,8 +481,6 @@ public class UnorderedThreadPoolExecutor extends
ThreadPoolExecutor {
if (task == null) {
synchronized (workers) {
if (workers.size() > corePoolSize) {
- // Remove now to prevent duplicate exit.
- workers.remove(this);
break;
}
}
@@ -487,14 +490,12 @@ public class UnorderedThreadPoolExecutor extends
ThreadPoolExecutor {
break;
}
- try {
- if (task != null) {
-
queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task);
- runTask(task);
- }
- } finally {
- idleWorkers.incrementAndGet();
+ if (task != null) {
+ queueHandler.polled(UnorderedThreadPoolExecutor.this,
(IoEvent) task);
+ runTask(task);
}
+
+ idleWorkers.incrementAndGet();
}
} finally {
synchronized (workers) {
diff --git
a/mina-core/src/test/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutorTest.java
b/mina-core/src/test/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutorTest.java
new file mode 100644
index 0000000..729e0ee
--- /dev/null
+++
b/mina-core/src/test/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutorTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.core.filterchain.IoFilter;
+import org.apache.mina.core.session.DummySession;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.core.write.WriteRequest;
+import org.apache.mina.filter.FilterEvent;
+import org.junit.Test;
+
+/**
+ * Tests that verify the functionality provided by the implementation of
+ * {@link OrderedThreadPoolExecutor}.
+ *
+ * @author Guus der Kinderen, [email protected]
+ */
+public class OrderedThreadPoolExecutorTest
+{
+ /**
+ * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and
{@link OrderedThreadPoolExecutor#workers}
+ * after a RuntimeException is thrown when the {@link
OrderedThreadPoolExecutor.Worker} is running.
+ *
+ * Note that the implementation of this test is <em>not
representative</em> of how tasks are normally executed, as
+ * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain
implementations would catch the
+ * RuntimeException that is being used in the implementation of this test.
The purpose of this test is to verify
+ * Worker's behavior when a RuntimeException is thrown during execution
occurs (even if that RuntimeException cannot
+ * occur in the way that this test simulates it). A test that implements
the execution in a more realistic manner is
+ * provided in {@link
org.apache.mina.transport.socket.nio.DIRMINA1156Test}.
+ *
+ * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test
+ * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue
DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
+ */
+ @Test
+ public void testRuntimeExceptionInWorkerRun() throws Throwable
+ {
+ // Set up test fixture.
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ OrderedThreadPoolExecutor executor = new
OrderedThreadPoolExecutor(corePoolSize,1);
+ IoFilter.NextFilter nextFilter = new NextFilterAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message) {
+ throw new RuntimeException("A RuntimeException thrown during
unit testing.");
+ }
+ };
+ DummySession session = new DummySession();
+ ExecutorFilter filter = new ExecutorFilter(executor);
+
+ try {
+ // Execute system under test.
+ filter.messageReceived(nextFilter, session, null);
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Worker execution has happened.
+ executor.shutdown();
+ if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("Bug in test implementation.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and
{@link OrderedThreadPoolExecutor#workers}
+ * after an Error is thrown when the {@link
OrderedThreadPoolExecutor.Worker} is running.
+ *
+ * Note that the implementation of this test is <em>not
representative</em> of how tasks are normally executed, as
+ * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain
implementations would catch the Error that
+ * is being used in the implementation of this test. The purpose of this
test is to verify Worker's behavior when an
+ * Error is thrown during execution occurs (even if that Error cannot
occur in the way that this test simulates it).
+ * A test that implements the execution in a more realistic manner is
provided in
+ * {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}.
+ *
+ * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test
+ * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue
DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
+ */
+ @Test
+ public void testErrorInWorkerRun() throws Throwable
+ {
+ // Set up test fixture.
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ OrderedThreadPoolExecutor executor = new
OrderedThreadPoolExecutor(corePoolSize,1);
+ IoFilter.NextFilter nextFilter = new NextFilterAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message) {
+ throw new Error("An Error thrown during unit testing.");
+ }
+ };
+ DummySession session = new DummySession();
+ ExecutorFilter filter = new ExecutorFilter(executor);
+
+ try {
+ // Execute system under test.
+ filter.messageReceived(nextFilter, session, null);
+
+ // Ensure that the task has been executed in the executor.
+ executor.shutdown(); // Shutting down and awaiting termination
ensures that test execution blocks until Worker execution has happened.
+ if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("Bug in test implementation.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Empty implementation of IoFilter.NextFilterAdapter, intended to
facilitate easy subclassing.
+ */
+ private abstract static class NextFilterAdapter implements
IoFilter.NextFilter {
+ public void sessionOpened(IoSession session) {}
+ public void sessionClosed(IoSession session) {}
+ public void sessionIdle(IoSession session, IdleStatus status) {}
+ public void exceptionCaught(IoSession session, Throwable cause) {}
+ public void inputClosed(IoSession session) {}
+ public void messageReceived(IoSession session, Object message) {}
+ public void messageSent(IoSession session, WriteRequest writeRequest)
{}
+ public void filterWrite(IoSession session, WriteRequest writeRequest)
{}
+ public void filterClose(IoSession session) {}
+ public void sessionCreated(IoSession session) {}
+
+ @Override
+ public void event(IoSession session, FilterEvent event) {}
+ }
+}
diff --git
a/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java
b/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java
index 87b48ea..8af367f 100644
---
a/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java
+++
b/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java
@@ -19,6 +19,17 @@
*/
package org.apache.mina.filter.executor;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.mina.core.filterchain.IoFilter;
import org.apache.mina.core.session.DummySession;
import org.apache.mina.core.session.IdleStatus;
@@ -28,15 +39,6 @@ import org.apache.mina.filter.FilterEvent;
import org.junit.Ignore;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
/**
* Tests that verify the functionality provided by the implementation of
* {@link PriorityThreadPoolExecutor}.
@@ -213,6 +215,110 @@ public class PriorityThreadPoolExecutorTest {
}
/**
+ * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and
{@link PriorityThreadPoolExecutor#workers}
+ * after a RuntimeException is thrown when the {@link
PriorityThreadPoolExecutor.Worker} is running.
+ *
+ * Note that the implementation of this test is <em>not
representative</em> of how tasks are normally executed, as
+ * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain
implementations would catch the
+ * RuntimeException that is being used in the implementation of this test.
The purpose of this test is to verify
+ * Worker's behavior when a RuntimeException is thrown during execution
occurs (even if that RuntimeException cannot
+ * occur in the way that this test simulates it). A test that implements
the execution in a more realistic manner is
+ * provided in {@link
org.apache.mina.transport.socket.nio.DIRMINA1156Test}.
+ *
+ * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test
+ * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue
DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
+ */
+ @Test
+ public void testRuntimeExceptionInWorkerRun() throws Throwable
+ {
+ // Set up test fixture.
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ PriorityThreadPoolExecutor executor = new
PriorityThreadPoolExecutor(corePoolSize,1);
+ IoFilter.NextFilter nextFilter = new
PriorityThreadPoolExecutorTest.NextFilterAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message) {
+ throw new RuntimeException("A RuntimeException thrown during
unit testing.");
+ }
+ };
+ DummySession session = new DummySession();
+ ExecutorFilter filter = new ExecutorFilter(executor);
+
+ try {
+ // Execute system under test.
+ filter.messageReceived(nextFilter, session, null);
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Worker execution has happened.
+ executor.shutdown();
+ if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("Bug in test implementation.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and
{@link PriorityThreadPoolExecutor#workers}
+ * after an Error is thrown when the {@link
PriorityThreadPoolExecutor.Worker} is running.
+ *
+ * Note that the implementation of this test is <em>not
representative</em> of how tasks are normally executed, as
+ * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain
implementations would catch the Error that
+ * is being used in the implementation of this test. The purpose of this
test is to verify Worker's behavior when an
+ * Error is thrown during execution occurs (even if that Error cannot
occur in the way that this test simulates it).
+ * A test that implements the execution in a more realistic manner is
provided in
+ * {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}.
+ *
+ * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test
+ * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue
DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
+ */
+ @Test
+ public void testErrorInWorkerRun() throws Throwable
+ {
+ // Set up test fixture.
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ PriorityThreadPoolExecutor executor = new
PriorityThreadPoolExecutor(corePoolSize,1);
+ IoFilter.NextFilter nextFilter = new
PriorityThreadPoolExecutorTest.NextFilterAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message) {
+ throw new Error("An Error thrown during unit testing.");
+ }
+ };
+ DummySession session = new DummySession();
+ ExecutorFilter filter = new ExecutorFilter(executor);
+
+ try {
+ // Execute system under test.
+ filter.messageReceived(nextFilter, session, null);
+
+ // Ensure that the task has been executed in the executor.
+ executor.shutdown(); // Shutting down and awaiting termination
ensures that test execution blocks until Worker execution has happened.
+ if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("Bug in test implementation.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
* A comparator that prefers a particular session.
*/
private static class UnfairComparator implements Comparator<IoSession> {
@@ -305,4 +411,23 @@ public class PriorityThreadPoolExecutorTest {
// TODO Auto-generated method stub
}
}
+
+ /**
+ * Empty implementation of IoFilter.NextFilterAdapter, intended to
facilitate easy subclassing.
+ */
+ private abstract static class NextFilterAdapter implements
IoFilter.NextFilter {
+ public void sessionOpened(IoSession session) {}
+ public void sessionClosed(IoSession session) {}
+ public void sessionIdle(IoSession session, IdleStatus status) {}
+ public void exceptionCaught(IoSession session, Throwable cause) {}
+ public void inputClosed(IoSession session) {}
+ public void messageReceived(IoSession session, Object message) {}
+ public void messageSent(IoSession session, WriteRequest writeRequest)
{}
+ public void filterWrite(IoSession session, WriteRequest writeRequest)
{}
+ public void filterClose(IoSession session) {}
+ public void sessionCreated(IoSession session) {}
+
+ @Override
+ public void event(IoSession session, FilterEvent event) {}
+ }
}
diff --git
a/mina-core/src/test/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutorTest.java
b/mina-core/src/test/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutorTest.java
new file mode 100644
index 0000000..28d06c5
--- /dev/null
+++
b/mina-core/src/test/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutorTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.core.filterchain.IoFilter;
+import org.apache.mina.core.session.DummySession;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.core.write.WriteRequest;
+import org.apache.mina.filter.FilterEvent;
+import org.junit.Test;
+
+/**
+ * Tests that verify the functionality provided by the implementation of
+ * {@link UnorderedThreadPoolExecutor}.
+ *
+ * @author Guus der Kinderen, [email protected]
+ */
+public class UnorderedThreadPoolExecutorTest
+{
+ /**
+ * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and
{@link UnorderedThreadPoolExecutor#workers}
+ * after a RuntimeException is thrown when the {@link
UnorderedThreadPoolExecutor.Worker} is running.
+ *
+ * Note that the implementation of this test is <em>not
representative</em> of how tasks are normally executed, as
+ * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain
implementations would catch the
+ * RuntimeException that is being used in the implementation of this test.
The purpose of this test is to verify
+ * Worker's behavior when a RuntimeException is thrown during execution
occurs (even if that RuntimeException cannot
+ * occur in the way that this test simulates it). A test that implements
the execution in a more realistic manner is
+ * provided in {@link
org.apache.mina.transport.socket.nio.DIRMINA1156Test}.
+ *
+ * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test
+ * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue
DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
+ */
+ @Test
+ public void testRuntimeExceptionInWorkerRun() throws Throwable
+ {
+ // Set up test fixture.
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ UnorderedThreadPoolExecutor executor = new
UnorderedThreadPoolExecutor(corePoolSize,1);
+ IoFilter.NextFilter nextFilter = new NextFilterAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message) {
+ throw new RuntimeException("A RuntimeException thrown during
unit testing.");
+ }
+ };
+ DummySession session = new DummySession();
+ ExecutorFilter filter = new ExecutorFilter(executor);
+
+ try {
+ // Execute system under test.
+ filter.messageReceived(nextFilter, session, null);
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Worker execution has happened.
+ executor.shutdown();
+ if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("Bug in test implementation.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and
{@link UnorderedThreadPoolExecutor#workers}
+ * after an Error is thrown when the {@link
UnorderedThreadPoolExecutor.Worker} is running.
+ *
+ * Note that the implementation of this test is <em>not
representative</em> of how tasks are normally executed, as
+ * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain
implementations would catch the Error that
+ * is being used in the implementation of this test. The purpose of this
test is to verify Worker's behavior when an
+ * Error is thrown during execution occurs (even if that Error cannot
occur in the way that this test simulates it).
+ * A test that implements the execution in a more realistic manner is
provided in
+ * {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}.
+ *
+ * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test
+ * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue
DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
+ */
+ @Test
+ public void testErrorInWorkerRun() throws Throwable
+ {
+ // Set up test fixture.
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ UnorderedThreadPoolExecutor executor = new
UnorderedThreadPoolExecutor(corePoolSize,1);
+ IoFilter.NextFilter nextFilter = new NextFilterAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message) {
+ throw new Error("An Error thrown during unit testing.");
+ }
+ };
+ DummySession session = new DummySession();
+ ExecutorFilter filter = new ExecutorFilter(executor);
+
+ try {
+ // Execute system under test.
+ filter.messageReceived(nextFilter, session, null);
+
+ // Ensure that the task has been executed in the executor.
+ executor.shutdown(); // Shutting down and awaiting termination
ensures that test execution blocks until Worker execution has happened.
+ if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("Bug in test implementation.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Empty implementation of IoFilter.NextFilterAdapter, intended to
facilitate easy subclassing.
+ */
+ private abstract static class NextFilterAdapter implements
IoFilter.NextFilter {
+ public void sessionOpened(IoSession session) {}
+ public void sessionClosed(IoSession session) {}
+ public void sessionIdle(IoSession session, IdleStatus status) {}
+ public void exceptionCaught(IoSession session, Throwable cause) {}
+ public void inputClosed(IoSession session) {}
+ public void messageReceived(IoSession session, Object message) {}
+ public void messageSent(IoSession session, WriteRequest writeRequest)
{}
+ public void filterWrite(IoSession session, WriteRequest writeRequest)
{}
+ public void filterClose(IoSession session) {}
+ public void sessionCreated(IoSession session) {}
+
+ @Override
+ public void event(IoSession session, FilterEvent event) {}
+ }
+}
diff --git
a/mina-core/src/test/java/org/apache/mina/handler/DIRMINA1156Test.java
b/mina-core/src/test/java/org/apache/mina/handler/DIRMINA1156Test.java
new file mode 100644
index 0000000..40b7073
--- /dev/null
+++ b/mina-core/src/test/java/org/apache/mina/handler/DIRMINA1156Test.java
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.handler;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.core.filterchain.IoFilterChain;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.DummySession;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.filter.executor.OrderedThreadPoolExecutor;
+import org.apache.mina.filter.executor.PriorityThreadPoolExecutor;
+import org.apache.mina.filter.executor.UnorderedThreadPoolExecutor;
+import org.junit.Test;
+
+/**
+ * Tests that reproduces a bug as described in issue DIRMINA-1156
+ *
+ * @author Guus der Kinderen, [email protected]
+ * @see <a
href="https://issues.apache.org/jira/browse/DIRMINA-1156">DIRMINA-1156</a>
+ */
+public class DIRMINA1156Test
+{
+ /**
+ * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and
{@link OrderedThreadPoolExecutor#workers}
+ * after an {@link Error} is thrown by a session's handler that was
invoked through an OrderedThreadPoolExecutor.
+ */
+ @Test
+ public void testOrderedThreadPoolExecutorSessionHandlerThrowingError()
throws Exception
+ {
+ // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the
implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ DummySession session = new DummySession();
+ IoFilterChain chain = session.getFilterChain();
+ OrderedThreadPoolExecutor executor = new
OrderedThreadPoolExecutor(corePoolSize,1);
+ chain.addLast("executor", new ExecutorFilter(executor));
+ session.setHandler( new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message)
throws Exception {
+ filterTriggered[0] = true;
+ throw new Error("An Error thrown during unit testing.");
+ }
+ });
+
+ // Execute system under test.
+ try {
+ chain.fireMessageReceived("foo");
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Handler invocation has happened.
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!filterTriggered[0]) {
+ throw new IllegalStateException("Bug in test implementation:
the session handler was never invoked.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and
{@link OrderedThreadPoolExecutor#workers}
+ * after a {@link RuntimeException} is thrown by a session's handler that
was invoked through an
+ * OrderedThreadPoolExecutor.
+ */
+ @Test
+ public void
testOrderedThreadPoolExecutorSessionHandlerThrowingRuntimeException() throws
Exception
+ {
+ // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the
implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ DummySession session = new DummySession();
+ IoFilterChain chain = session.getFilterChain();
+ OrderedThreadPoolExecutor executor = new
OrderedThreadPoolExecutor(corePoolSize,1);
+ chain.addLast("executor", new ExecutorFilter(executor));
+ session.setHandler( new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message)
throws Exception {
+ filterTriggered[0] = true;
+ throw new RuntimeException("A RuntimeException thrown during
unit testing.");
+ }
+ });
+
+ // Execute system under test.
+ try {
+ chain.fireMessageReceived("foo");
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Handler invocation has happened.
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!filterTriggered[0]) {
+ throw new IllegalStateException("Bug in test implementation:
the session handler was never invoked.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and
{@link OrderedThreadPoolExecutor#workers}
+ * after a (checked) {@link Exception} is thrown by a session's handler
that was invoked through an
+ * OrderedThreadPoolExecutor.
+ */
+ @Test
+ public void
testOrderedThreadPoolExecutorSessionHandlerThrowingCheckedException() throws
Exception
+ {
+ // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the
implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ DummySession session = new DummySession();
+ IoFilterChain chain = session.getFilterChain();
+ OrderedThreadPoolExecutor executor = new
OrderedThreadPoolExecutor(corePoolSize,1);
+ chain.addLast("executor", new ExecutorFilter(executor));
+ session.setHandler( new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message)
throws Exception {
+ filterTriggered[0] = true;
+ throw new Exception("A (checked) Exception thrown during unit
testing.");
+ }
+ });
+
+ // Execute system under test.
+ try {
+ chain.fireMessageReceived("foo");
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Handler invocation has happened.
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!filterTriggered[0]) {
+ throw new IllegalStateException("Bug in test implementation:
the session handler was never invoked.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and
{@link UnorderedThreadPoolExecutor#workers}
+ * after an {@link Error} is thrown by a session's handler that was
invoked through an UnorderedThreadPoolExecutor.
+ */
+ @Test
+ public void testUnorderedThreadPoolExecutorSessionHandlerThrowingError()
throws Exception
+ {
+ // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the
implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ DummySession session = new DummySession();
+ IoFilterChain chain = session.getFilterChain();
+ UnorderedThreadPoolExecutor executor = new
UnorderedThreadPoolExecutor(corePoolSize,1);
+ chain.addLast("executor", new ExecutorFilter(executor));
+ session.setHandler( new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message)
throws Exception {
+ filterTriggered[0] = true;
+ throw new Error("An Error thrown during unit testing.");
+ }
+ });
+
+ // Execute system under test.
+ try {
+ chain.fireMessageReceived("foo");
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Handler invocation has happened.
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!filterTriggered[0]) {
+ throw new IllegalStateException("Bug in test implementation:
the session handler was never invoked.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and
{@link UnorderedThreadPoolExecutor#workers}
+ * after a {@link RuntimeException} is thrown by a session's handler that
was invoked through an
+ * UnorderedThreadPoolExecutor.
+ */
+ @Test
+ public void
testUnorderedThreadPoolExecutorSessionHandlerThrowingRuntimeException() throws
Exception
+ {
+ // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the
implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ DummySession session = new DummySession();
+ IoFilterChain chain = session.getFilterChain();
+ UnorderedThreadPoolExecutor executor = new
UnorderedThreadPoolExecutor(corePoolSize,1);
+ chain.addLast("executor", new ExecutorFilter(executor));
+ session.setHandler( new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message)
throws Exception {
+ filterTriggered[0] = true;
+ throw new RuntimeException("A RuntimeException thrown during
unit testing.");
+ }
+ });
+
+ // Execute system under test.
+ try {
+ chain.fireMessageReceived("foo");
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Handler invocation has happened.
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!filterTriggered[0]) {
+ throw new IllegalStateException("Bug in test implementation:
the session handler was never invoked.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and
{@link UnorderedThreadPoolExecutor#workers}
+ * after a (checked) {@link Exception} is thrown by a session's handler
that was invoked through an
+ * UnorderedThreadPoolExecutor.
+ */
+ @Test
+ public void
testUnorderedThreadPoolExecutorSessionHandlerThrowingCheckedException() throws
Exception
+ {
+ // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the
implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ DummySession session = new DummySession();
+ IoFilterChain chain = session.getFilterChain();
+ UnorderedThreadPoolExecutor executor = new
UnorderedThreadPoolExecutor(corePoolSize,1);
+ chain.addLast("executor", new ExecutorFilter(executor));
+ session.setHandler( new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message)
throws Exception {
+ filterTriggered[0] = true;
+ throw new Exception("A (checked) Exception thrown during unit
testing.");
+ }
+ });
+
+ // Execute system under test.
+ try {
+ chain.fireMessageReceived("foo");
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Handler invocation has happened.
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!filterTriggered[0]) {
+ throw new IllegalStateException("Bug in test implementation:
the session handler was never invoked.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and
{@link PriorityThreadPoolExecutor#workers}
+ * after an {@link Error} is thrown by a session's handler that was
invoked through an PriorityThreadPoolExecutor.
+ */
+ @Test
+ public void testPriorityThreadPoolExecutorSessionHandlerThrowingError()
throws Exception
+ {
+ // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the
implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ DummySession session = new DummySession();
+ IoFilterChain chain = session.getFilterChain();
+ PriorityThreadPoolExecutor executor = new
PriorityThreadPoolExecutor(corePoolSize,1);
+ chain.addLast("executor", new ExecutorFilter(executor));
+ session.setHandler( new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message)
throws Exception {
+ filterTriggered[0] = true;
+ throw new Error("An Error thrown during unit testing.");
+ }
+ });
+
+ // Execute system under test.
+ try {
+ chain.fireMessageReceived("foo");
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Handler invocation has happened.
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!filterTriggered[0]) {
+ throw new IllegalStateException("Bug in test implementation:
the session handler was never invoked.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and
{@link PriorityThreadPoolExecutor#workers}
+ * after a {@link RuntimeException} is thrown by a session's handler that
was invoked through an
+ * PriorityThreadPoolExecutor.
+ */
+ @Test
+ public void
testPriorityThreadPoolExecutorSessionHandlerThrowingRuntimeException() throws
Exception
+ {
+ // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the
implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ DummySession session = new DummySession();
+ IoFilterChain chain = session.getFilterChain();
+ PriorityThreadPoolExecutor executor = new
PriorityThreadPoolExecutor(corePoolSize,1);
+ chain.addLast("executor", new ExecutorFilter(executor));
+ session.setHandler( new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message)
throws Exception {
+ filterTriggered[0] = true;
+ throw new RuntimeException("A RuntimeException thrown during
unit testing.");
+ }
+ });
+
+ // Execute system under test.
+ try {
+ chain.fireMessageReceived("foo");
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Handler invocation has happened.
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!filterTriggered[0]) {
+ throw new IllegalStateException("Bug in test implementation:
the session handler was never invoked.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and
{@link PriorityThreadPoolExecutor#workers}
+ * after a (checked) {@link Exception} is thrown by a session's handler
that was invoked through an
+ * PriorityThreadPoolExecutor.
+ */
+ @Test
+ public void
testPriorityThreadPoolExecutorSessionHandlerThrowingCheckedException() throws
Exception
+ {
+ // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the
implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ DummySession session = new DummySession();
+ IoFilterChain chain = session.getFilterChain();
+ PriorityThreadPoolExecutor executor = new
PriorityThreadPoolExecutor(corePoolSize,1);
+ chain.addLast("executor", new ExecutorFilter(executor));
+ session.setHandler( new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message)
throws Exception {
+ filterTriggered[0] = true;
+ throw new Exception("A (checked) Exception thrown during unit
testing.");
+ }
+ });
+
+ // Execute system under test.
+ try {
+ chain.fireMessageReceived("foo");
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Handler invocation has happened.
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!filterTriggered[0]) {
+ throw new IllegalStateException("Bug in test implementation:
the session handler was never invoked.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+}