This is an automated email from the ASF dual-hosted git repository.
elecharny pushed a commit to branch 2.0.X
in repository https://gitbox.apache.org/repos/asf/mina.git
The following commit(s) were added to refs/heads/2.0.X by this push:
new 4de9c94 Backported the fix for DIRMINA-1156
4de9c94 is described below
commit 4de9c942c8e34ef864d02ce6568dc23978548862
Author: emmanuel lecharny <[email protected]>
AuthorDate: Wed Jan 12 23:47:01 2022 +0100
Backported the fix for DIRMINA-1156
---
.../filter/executor/OrderedThreadPoolExecutor.java | 15 +-
.../executor/PriorityThreadPoolExecutor.java | 44 +-
.../executor/UnorderedThreadPoolExecutor.java | 20 +-
.../executor/OrderedThreadPoolExecutorTest.java | 162 +++++++
.../executor/PriorityThreadPoolExecutorTest.java | 424 +++++++++++++++++++
.../executor/UnorderedThreadPoolExecutorTest.java | 162 +++++++
.../org/apache/mina/handler/DIRMINA1156Test.java | 465 +++++++++++++++++++++
7 files changed, 1258 insertions(+), 34 deletions(-)
diff --git
a/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
b/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
index 7378956..841a584 100644
---
a/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
+++
b/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
@@ -255,12 +255,13 @@ public class OrderedThreadPoolExecutor extends
ThreadPoolExecutor {
Worker worker = new Worker();
Thread thread = getThreadFactory().newThread(worker);
+ workers.add(worker);
+
// As we have added a new thread, it's considered as idle.
idleWorkers.incrementAndGet();
// Now, we can start it.
thread.start();
- workers.add(worker);
if (workers.size() > largestPoolSize) {
largestPoolSize = workers.size();
@@ -681,8 +682,6 @@ public class OrderedThreadPoolExecutor extends
ThreadPoolExecutor {
if (session == null) {
synchronized (workers) {
if (workers.size() > getCorePoolSize()) {
- // Remove now to prevent duplicate exit.
- workers.remove(this);
break;
}
}
@@ -692,13 +691,11 @@ public class OrderedThreadPoolExecutor extends
ThreadPoolExecutor {
break;
}
- try {
- if (session != null) {
- runTasks(getSessionTasksQueue(session));
- }
- } finally {
- idleWorkers.incrementAndGet();
+ if (session != null) {
+ runTasks(getSessionTasksQueue(session));
}
+
+ idleWorkers.incrementAndGet();
}
} finally {
synchronized (workers) {
diff --git
a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
index 721005c..43ace10 100644
---
a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
+++
b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
@@ -19,15 +19,32 @@
*/
package org.apache.mina.filter.executor;
-import org.apache.mina.core.session.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.mina.core.session.AttributeKey;
+import org.apache.mina.core.session.DummySession;
+import org.apache.mina.core.session.IoEvent;
+import org.apache.mina.core.session.IoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s
* within a session (similar to {@link OrderedThreadPoolExecutor}) and allows
@@ -304,12 +321,13 @@ public class PriorityThreadPoolExecutor extends
ThreadPoolExecutor {
Worker worker = new Worker();
Thread thread = getThreadFactory().newThread(worker);
+ workers.add(worker);
+
// As we have added a new thread, it's considered as idle.
idleWorkers.incrementAndGet();
// Now, we can start it.
thread.start();
- workers.add(worker);
if (workers.size() > largestPoolSize) {
largestPoolSize = workers.size();
@@ -730,8 +748,6 @@ public class PriorityThreadPoolExecutor extends
ThreadPoolExecutor {
if (session == null) {
synchronized (workers) {
if (workers.size() > getCorePoolSize()) {
- // Remove now to prevent duplicate exit.
- workers.remove(this);
break;
}
}
@@ -741,13 +757,11 @@ public class PriorityThreadPoolExecutor extends
ThreadPoolExecutor {
break;
}
- try {
- if (session != null) {
- runTasks(getSessionTasksQueue(session));
- }
- } finally {
- idleWorkers.incrementAndGet();
+ if (session != null) {
+ runTasks(getSessionTasksQueue(session));
}
+
+ idleWorkers.incrementAndGet();
}
} finally {
synchronized (workers) {
diff --git
a/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
b/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
index 3136492..813f857 100644
---
a/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
+++
b/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
@@ -198,9 +198,13 @@ public class UnorderedThreadPoolExecutor extends
ThreadPoolExecutor {
Worker worker = new Worker();
Thread thread = getThreadFactory().newThread(worker);
+ workers.add(worker);
+
+ // As we have added a new thread, it's considered as idle.
idleWorkers.incrementAndGet();
+
+ // Now, we can start it.
thread.start();
- workers.add(worker);
if (workers.size() > largestPoolSize) {
largestPoolSize = workers.size();
@@ -476,8 +480,6 @@ public class UnorderedThreadPoolExecutor extends
ThreadPoolExecutor {
if (task == null) {
synchronized (workers) {
if (workers.size() > corePoolSize) {
- // Remove now to prevent duplicate exit.
- workers.remove(this);
break;
}
}
@@ -487,14 +489,12 @@ public class UnorderedThreadPoolExecutor extends
ThreadPoolExecutor {
break;
}
- try {
- if (task != null) {
-
queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task);
- runTask(task);
- }
- } finally {
- idleWorkers.incrementAndGet();
+ if (task != null) {
+ queueHandler.polled(UnorderedThreadPoolExecutor.this,
(IoEvent) task);
+ runTask(task);
}
+
+ idleWorkers.incrementAndGet();
}
} finally {
synchronized (workers) {
diff --git
a/mina-core/src/test/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutorTest.java
b/mina-core/src/test/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutorTest.java
new file mode 100644
index 0000000..de6c9d9
--- /dev/null
+++
b/mina-core/src/test/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutorTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.core.filterchain.IoFilter;
+import org.apache.mina.core.session.DummySession;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.core.write.WriteRequest;
+import org.junit.Test;
+
+/**
+ * Tests that verify the functionality provided by the implementation of
+ * {@link OrderedThreadPoolExecutor}.
+ *
+ * @author Guus der Kinderen, [email protected]
+ */
+public class OrderedThreadPoolExecutorTest
+{
+ /**
+ * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and
{@link OrderedThreadPoolExecutor#workers}
+ * after a RuntimeException is thrown when the {@link
OrderedThreadPoolExecutor.Worker} is running.
+ *
+ * Note that the implementation of this test is <em>not
representative</em> of how tasks are normally executed, as
+ * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain
implementations would catch the
+ * RuntimeException that is being used in the implementation of this test.
The purpose of this test is to verify
+ * Worker's behavior when a RuntimeException is thrown during execution
occurs (even if that RuntimeException cannot
+ * occur in the way that this test simulates it). A test that implements
the execution in a more realistic manner is
+ * provided in {@link
org.apache.mina.transport.socket.nio.DIRMINA1156Test}.
+ *
+ * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test
+ * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue
DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
+ */
+ @Test
+ public void testRuntimeExceptionInWorkerRun() throws Throwable
+ {
+ // Set up test fixture.
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ OrderedThreadPoolExecutor executor = new
OrderedThreadPoolExecutor(corePoolSize,1);
+ IoFilter.NextFilter nextFilter = new NextFilterAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message) {
+ throw new RuntimeException("A RuntimeException thrown during
unit testing.");
+ }
+ };
+ DummySession session = new DummySession();
+ ExecutorFilter filter = new ExecutorFilter(executor);
+
+ try {
+ // Execute system under test.
+ filter.messageReceived(nextFilter, session, null);
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Worker execution has happened.
+ executor.shutdown();
+ if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("Bug in test implementation.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and
{@link OrderedThreadPoolExecutor#workers}
+ * after an Error is thrown when the {@link
OrderedThreadPoolExecutor.Worker} is running.
+ *
+ * Note that the implementation of this test is <em>not
representative</em> of how tasks are normally executed, as
+ * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain
implementations would catch the Error that
+ * is being used in the implementation of this test. The purpose of this
test is to verify Worker's behavior when an
+ * Error is thrown during execution occurs (even if that Error cannot
occur in the way that this test simulates it).
+ * A test that implements the execution in a more realistic manner is
provided in
+ * {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}.
+ *
+ * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test
+ * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue
DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
+ */
+ @Test
+ public void testErrorInWorkerRun() throws Throwable
+ {
+ // Set up test fixture.
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ OrderedThreadPoolExecutor executor = new
OrderedThreadPoolExecutor(corePoolSize,1);
+ IoFilter.NextFilter nextFilter = new NextFilterAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message) {
+ throw new Error("An Error thrown during unit testing.");
+ }
+ };
+ DummySession session = new DummySession();
+ ExecutorFilter filter = new ExecutorFilter(executor);
+
+ try {
+ // Execute system under test.
+ filter.messageReceived(nextFilter, session, null);
+
+ // Ensure that the task has been executed in the executor.
+ executor.shutdown(); // Shutting down and awaiting termination
ensures that test execution blocks until Worker execution has happened.
+ if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("Bug in test implementation.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Empty implementation of IoFilter.NextFilterAdapter, intended to
facilitate easy subclassing.
+ */
+ private abstract static class NextFilterAdapter implements
IoFilter.NextFilter {
+ public void sessionOpened(IoSession session) {}
+ public void sessionClosed(IoSession session) {}
+ public void sessionIdle(IoSession session, IdleStatus status) {}
+ public void exceptionCaught(IoSession session, Throwable cause) {}
+ public void inputClosed(IoSession session) {}
+ public void messageReceived(IoSession session, Object message) {}
+ public void messageSent(IoSession session, WriteRequest writeRequest)
{}
+ public void filterWrite(IoSession session, WriteRequest writeRequest)
{}
+ public void filterClose(IoSession session) {}
+ public void sessionCreated(IoSession session) {}
+ }
+}
diff --git
a/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java
b/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java
new file mode 100644
index 0000000..97c97cd
--- /dev/null
+++
b/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.core.filterchain.IoFilter;
+import org.apache.mina.core.session.DummySession;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.core.write.WriteRequest;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Tests that verify the functionality provided by the implementation of
+ * {@link PriorityThreadPoolExecutor}.
+ *
+ * @author Guus der Kinderen, [email protected]
+ */
+public class PriorityThreadPoolExecutorTest {
+ /**
+ * Tests that verify the functionality provided by the implementation of
+ * {@link
org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry}
+ * .
+ *
+ * This test asserts that, without a provided comparator, entries are
+ * considered equal, when they reference the same session.
+ */
+ @Test
+ public void fifoEntryTestNoComparatorSameSession() throws Exception {
+ // Set up fixture.
+ IoSession session = new DummySession();
+ PriorityThreadPoolExecutor.SessionEntry first = new
PriorityThreadPoolExecutor.SessionEntry(session, null);
+ PriorityThreadPoolExecutor.SessionEntry last = new
PriorityThreadPoolExecutor.SessionEntry(session, null);
+
+ // Execute system under test.
+ int result = first.compareTo(last);
+
+ // Verify results.
+ assertEquals("Without a comparator, entries of the same session are
expected to be equal.", 0, result);
+ }
+
+ /**
+ * Tests that verify the functionality provided by the implementation of
+ * {@link
org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry}
+ * .
+ *
+ * This test asserts that, without a provided comparator, the first entry
+ * created is 'less than' an entry that is created later.
+ */
+ @Test
+ public void fifoEntryTestNoComparatorDifferentSession() throws Exception {
+ // Set up fixture (the order in which the entries are created is
+ // relevant here!)
+ PriorityThreadPoolExecutor.SessionEntry first = new
PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null);
+ PriorityThreadPoolExecutor.SessionEntry last = new
PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null);
+
+ // Execute system under test.
+ int result = first.compareTo(last);
+
+ // Verify results.
+ assertTrue("Without a comparator, the first entry created should be
the first entry out. Expected a negative result, instead, got: " + result,
result < 0);
+ }
+
+ /**
+ * Tests that verify the functionality provided by the implementation of
+ * {@link
org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry}
+ * .
+ *
+ * This test asserts that, with a provided comparator, entries are
+ * considered equal, when they reference the same session (the provided
+ * comparator is ignored).
+ */
+ @Test
+ public void fifoEntryTestWithComparatorSameSession() throws Exception {
+ // Set up fixture.
+ IoSession session = new DummySession();
+ final int predeterminedResult = 3853;
+
+ Comparator<IoSession> comparator = new Comparator<IoSession>() {
+ @Override
+ public int compare(IoSession o1, IoSession o2) {
+ return predeterminedResult;
+ }
+ };
+
+ PriorityThreadPoolExecutor.SessionEntry first = new
PriorityThreadPoolExecutor.SessionEntry(session, comparator);
+ PriorityThreadPoolExecutor.SessionEntry last = new
PriorityThreadPoolExecutor.SessionEntry(session, comparator);
+
+ // Execute system under test.
+ int result = first.compareTo(last);
+
+ // Verify results.
+ assertEquals("With a comparator, entries of the same session are
expected to be equal.", 0, result);
+ }
+
+ /**
+ * Tests that verify the functionality provided by the implementation of
+ * {@link
org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry}
+ * .
+ *
+ * This test asserts that a provided comparator is used instead of the
+ * (fallback) default behavior (when entries are referring different
+ * sessions).
+ */
+ @Test
+ public void fifoEntryTestComparatorDifferentSession() throws Exception {
+ // Set up fixture (the order in which the entries are created is
+ // relevant here!)
+ final int predeterminedResult = 3853;
+
+ Comparator<IoSession> comparator = new Comparator<IoSession>() {
+ @Override
+ public int compare(IoSession o1, IoSession o2) {
+ return predeterminedResult;
+ }
+ };
+
+ PriorityThreadPoolExecutor.SessionEntry first = new
PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator);
+ PriorityThreadPoolExecutor.SessionEntry last = new
PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator);
+
+ // Execute system under test.
+ int result = first.compareTo(last);
+
+ // Verify results.
+ assertEquals("With a comparator, comparing entries of different
sessions is expected to yield the comparator result.", predeterminedResult,
result);
+ }
+
+ /**
+ * Asserts that, when enough work is being submitted to the executor for it
+ * to start queuing work, prioritisation of work starts to occur.
+ *
+ * This implementation starts a number of sessions, and evenly distributes
a
+ * number of messages to them. Processing each message is artificially made
+ * 'expensive', while the executor pool is kept small. This causes work to
+ * be queued in the executor.
+ *
+ * The executor that is used is configured to prefer one specific session.
+ * Each session records the timestamp of its last activity. After all work
+ * has been processed, the test asserts that the last activity of all
+ * sessions was later than the last activity of the preferred session.
+ */
+ @Test
+ @Ignore("This test faiuls randomly")
+ public void testPrioritisation() throws Throwable {
+ // Set up fixture.
+ MockWorkFilter nextFilter = new MockWorkFilter();
+ List<LastActivityTracker> sessions = new ArrayList<>();
+
+ for (int i = 0; i < 10; i++) {
+ sessions.add(new LastActivityTracker());
+ }
+
+ LastActivityTracker preferredSession = sessions.get(4); // prefer an
arbitrary session
+ // (but not
the first or last
+ // session,
for good measure).
+ Comparator<IoSession> comparator = new
UnfairComparator(preferredSession);
+ int maximumPoolSize = 1; // keep this low, to force resource
contention.
+ int amountOfTasks = 400;
+
+ ExecutorService executor = new
PriorityThreadPoolExecutor(maximumPoolSize, comparator);
+ ExecutorFilter filter = new ExecutorFilter(executor);
+
+ // Execute system under test.
+ for (int i = 0; i < amountOfTasks; i++) {
+ int sessionIndex = i % sessions.size();
+
+ LastActivityTracker currentSession = sessions.get(sessionIndex);
+ filter.messageReceived(nextFilter, currentSession, null);
+
+ if (nextFilter.throwable != null) {
+ throw nextFilter.throwable;
+ }
+ }
+
+ executor.shutdown();
+
+ // Verify results.
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+
+ for (LastActivityTracker session : sessions) {
+ if (session != preferredSession) {
+ assertTrue("All other sessions should have finished later than
the preferred session (but at least one did not).",
+ session.lastActivity > preferredSession.lastActivity);
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and
{@link PriorityThreadPoolExecutor#workers}
+ * after a RuntimeException is thrown when the {@link
PriorityThreadPoolExecutor.Worker} is running.
+ *
+ * Note that the implementation of this test is <em>not
representative</em> of how tasks are normally executed, as
+ * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain
implementations would catch the
+ * RuntimeException that is being used in the implementation of this test.
The purpose of this test is to verify
+ * Worker's behavior when a RuntimeException is thrown during execution
occurs (even if that RuntimeException cannot
+ * occur in the way that this test simulates it). A test that implements
the execution in a more realistic manner is
+ * provided in {@link
org.apache.mina.transport.socket.nio.DIRMINA1156Test}.
+ *
+ * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test
+ * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue
DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
+ */
+ @Test
+ public void testRuntimeExceptionInWorkerRun() throws Throwable
+ {
+ // Set up test fixture.
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ PriorityThreadPoolExecutor executor = new
PriorityThreadPoolExecutor(corePoolSize,1);
+ IoFilter.NextFilter nextFilter = new
PriorityThreadPoolExecutorTest.NextFilterAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message) {
+ throw new RuntimeException("A RuntimeException thrown during
unit testing.");
+ }
+ };
+ DummySession session = new DummySession();
+ ExecutorFilter filter = new ExecutorFilter(executor);
+
+ try {
+ // Execute system under test.
+ filter.messageReceived(nextFilter, session, null);
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Worker execution has happened.
+ executor.shutdown();
+ if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("Bug in test implementation.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and
{@link PriorityThreadPoolExecutor#workers}
+ * after an Error is thrown when the {@link
PriorityThreadPoolExecutor.Worker} is running.
+ *
+ * Note that the implementation of this test is <em>not
representative</em> of how tasks are normally executed, as
+ * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain
implementations would catch the Error that
+ * is being used in the implementation of this test. The purpose of this
test is to verify Worker's behavior when an
+ * Error is thrown during execution occurs (even if that Error cannot
occur in the way that this test simulates it).
+ * A test that implements the execution in a more realistic manner is
provided in
+ * {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}.
+ *
+ * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test
+ * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue
DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
+ */
+ @Test
+ public void testErrorInWorkerRun() throws Throwable
+ {
+ // Set up test fixture.
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ PriorityThreadPoolExecutor executor = new
PriorityThreadPoolExecutor(corePoolSize,1);
+ IoFilter.NextFilter nextFilter = new
PriorityThreadPoolExecutorTest.NextFilterAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message) {
+ throw new Error("An Error thrown during unit testing.");
+ }
+ };
+ DummySession session = new DummySession();
+ ExecutorFilter filter = new ExecutorFilter(executor);
+
+ try {
+ // Execute system under test.
+ filter.messageReceived(nextFilter, session, null);
+
+ // Ensure that the task has been executed in the executor.
+ executor.shutdown(); // Shutting down and awaiting termination
ensures that test execution blocks until Worker execution has happened.
+ if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("Bug in test implementation.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * A comparator that prefers a particular session.
+ */
+ private static class UnfairComparator implements Comparator<IoSession> {
+ private IoSession preferred;
+
+ public UnfairComparator(IoSession preferred) {
+ this.preferred = preferred;
+ }
+
+ @Override
+ public int compare(IoSession o1, IoSession o2) {
+ if (o1 == preferred) {
+ return -1;
+ }
+
+ if (o2 == preferred) {
+ return 1;
+ }
+
+ return 0;
+ }
+ }
+
+ /**
+ * A session that tracks the timestamp of last activity.
+ */
+ private static class LastActivityTracker extends DummySession {
+ long lastActivity = System.currentTimeMillis();
+
+ public synchronized void setLastActivity() {
+ lastActivity = System.currentTimeMillis();
+ }
+ }
+
+ /**
+ * A filter that simulates a non-negligible amount of work.
+ */
+ private static class MockWorkFilter implements IoFilter.NextFilter {
+ Throwable throwable;
+
+ public void sessionOpened(IoSession session) {
+ // Do nothing
+ }
+
+ public void sessionClosed(IoSession session) {
+ // Do nothing
+ }
+
+ public void sessionIdle(IoSession session, IdleStatus status) {
+ // Do nothing
+ }
+
+ public void exceptionCaught(IoSession session, Throwable cause) {
+ // Do nothing
+ }
+
+ public void inputClosed(IoSession session) {
+ // Do nothing
+ }
+
+ public void messageReceived(IoSession session, Object message) {
+ try {
+ Thread.sleep(20); // mimic work.
+ ((LastActivityTracker) session).setLastActivity();
+ } catch (Exception e) {
+ if (this.throwable == null) {
+ this.throwable = e;
+ }
+ }
+ }
+
+ public void messageSent(IoSession session, WriteRequest writeRequest) {
+ // Do nothing
+ }
+
+ public void filterWrite(IoSession session, WriteRequest writeRequest) {
+ // Do nothing
+ }
+
+ public void filterClose(IoSession session) {
+ // Do nothing
+ }
+
+ public void sessionCreated(IoSession session) {
+ // Do nothing
+ }
+ }
+
+ /**
+ * Empty implementation of IoFilter.NextFilterAdapter, intended to
facilitate easy subclassing.
+ */
+ private abstract static class NextFilterAdapter implements
IoFilter.NextFilter {
+ public void sessionOpened(IoSession session) {}
+ public void sessionClosed(IoSession session) {}
+ public void sessionIdle(IoSession session, IdleStatus status) {}
+ public void exceptionCaught(IoSession session, Throwable cause) {}
+ public void inputClosed(IoSession session) {}
+ public void messageReceived(IoSession session, Object message) {}
+ public void messageSent(IoSession session, WriteRequest writeRequest)
{}
+ public void filterWrite(IoSession session, WriteRequest writeRequest)
{}
+ public void filterClose(IoSession session) {}
+ public void sessionCreated(IoSession session) {}
+ }
+}
diff --git
a/mina-core/src/test/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutorTest.java
b/mina-core/src/test/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutorTest.java
new file mode 100644
index 0000000..22adcea
--- /dev/null
+++
b/mina-core/src/test/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutorTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.core.filterchain.IoFilter;
+import org.apache.mina.core.session.DummySession;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.core.write.WriteRequest;
+import org.junit.Test;
+
+/**
+ * Tests that verify the functionality provided by the implementation of
+ * {@link UnorderedThreadPoolExecutor}.
+ *
+ * @author Guus der Kinderen, [email protected]
+ */
+public class UnorderedThreadPoolExecutorTest
+{
+ /**
+ * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and
{@link UnorderedThreadPoolExecutor#workers}
+ * after a RuntimeException is thrown when the {@link
UnorderedThreadPoolExecutor.Worker} is running.
+ *
+ * Note that the implementation of this test is <em>not
representative</em> of how tasks are normally executed, as
+ * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain
implementations would catch the
+ * RuntimeException that is being used in the implementation of this test.
The purpose of this test is to verify
+ * Worker's behavior when a RuntimeException is thrown during execution
occurs (even if that RuntimeException cannot
+ * occur in the way that this test simulates it). A test that implements
the execution in a more realistic manner is
+ * provided in {@link
org.apache.mina.transport.socket.nio.DIRMINA1156Test}.
+ *
+ * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test
+ * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue
DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
+ */
+ @Test
+ public void testRuntimeExceptionInWorkerRun() throws Throwable
+ {
+ // Set up test fixture.
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ UnorderedThreadPoolExecutor executor = new
UnorderedThreadPoolExecutor(corePoolSize,1);
+ IoFilter.NextFilter nextFilter = new NextFilterAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message) {
+ throw new RuntimeException("A RuntimeException thrown during
unit testing.");
+ }
+ };
+ DummySession session = new DummySession();
+ ExecutorFilter filter = new ExecutorFilter(executor);
+
+ try {
+ // Execute system under test.
+ filter.messageReceived(nextFilter, session, null);
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Worker execution has happened.
+ executor.shutdown();
+ if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("Bug in test implementation.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and
{@link UnorderedThreadPoolExecutor#workers}
+ * after an Error is thrown when the {@link
UnorderedThreadPoolExecutor.Worker} is running.
+ *
+ * Note that the implementation of this test is <em>not
representative</em> of how tasks are normally executed, as
+ * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain
implementations would catch the Error that
+ * is being used in the implementation of this test. The purpose of this
test is to verify Worker's behavior when an
+ * Error is thrown during execution occurs (even if that Error cannot
occur in the way that this test simulates it).
+ * A test that implements the execution in a more realistic manner is
provided in
+ * {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}.
+ *
+ * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test
+ * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue
DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
+ */
+ @Test
+ public void testErrorInWorkerRun() throws Throwable
+ {
+ // Set up test fixture.
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ UnorderedThreadPoolExecutor executor = new
UnorderedThreadPoolExecutor(corePoolSize,1);
+ IoFilter.NextFilter nextFilter = new NextFilterAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message) {
+ throw new Error("An Error thrown during unit testing.");
+ }
+ };
+ DummySession session = new DummySession();
+ ExecutorFilter filter = new ExecutorFilter(executor);
+
+ try {
+ // Execute system under test.
+ filter.messageReceived(nextFilter, session, null);
+
+ // Ensure that the task has been executed in the executor.
+ executor.shutdown(); // Shutting down and awaiting termination
ensures that test execution blocks until Worker execution has happened.
+ if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("Bug in test implementation.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Empty implementation of IoFilter.NextFilterAdapter, intended to
facilitate easy subclassing.
+ */
+ private abstract static class NextFilterAdapter implements
IoFilter.NextFilter {
+ public void sessionOpened(IoSession session) {}
+ public void sessionClosed(IoSession session) {}
+ public void sessionIdle(IoSession session, IdleStatus status) {}
+ public void exceptionCaught(IoSession session, Throwable cause) {}
+ public void inputClosed(IoSession session) {}
+ public void messageReceived(IoSession session, Object message) {}
+ public void messageSent(IoSession session, WriteRequest writeRequest)
{}
+ public void filterWrite(IoSession session, WriteRequest writeRequest)
{}
+ public void filterClose(IoSession session) {}
+ public void sessionCreated(IoSession session) {}
+ }
+}
diff --git
a/mina-core/src/test/java/org/apache/mina/handler/DIRMINA1156Test.java
b/mina-core/src/test/java/org/apache/mina/handler/DIRMINA1156Test.java
new file mode 100644
index 0000000..40b7073
--- /dev/null
+++ b/mina-core/src/test/java/org/apache/mina/handler/DIRMINA1156Test.java
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.handler;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.core.filterchain.IoFilterChain;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.DummySession;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.filter.executor.OrderedThreadPoolExecutor;
+import org.apache.mina.filter.executor.PriorityThreadPoolExecutor;
+import org.apache.mina.filter.executor.UnorderedThreadPoolExecutor;
+import org.junit.Test;
+
+/**
+ * Tests that reproduces a bug as described in issue DIRMINA-1156
+ *
+ * @author Guus der Kinderen, [email protected]
+ * @see <a
href="https://issues.apache.org/jira/browse/DIRMINA-1156">DIRMINA-1156</a>
+ */
+public class DIRMINA1156Test
+{
+ /**
+ * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and
{@link OrderedThreadPoolExecutor#workers}
+ * after an {@link Error} is thrown by a session's handler that was
invoked through an OrderedThreadPoolExecutor.
+ */
+ @Test
+ public void testOrderedThreadPoolExecutorSessionHandlerThrowingError()
throws Exception
+ {
+ // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the
implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ DummySession session = new DummySession();
+ IoFilterChain chain = session.getFilterChain();
+ OrderedThreadPoolExecutor executor = new
OrderedThreadPoolExecutor(corePoolSize,1);
+ chain.addLast("executor", new ExecutorFilter(executor));
+ session.setHandler( new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message)
throws Exception {
+ filterTriggered[0] = true;
+ throw new Error("An Error thrown during unit testing.");
+ }
+ });
+
+ // Execute system under test.
+ try {
+ chain.fireMessageReceived("foo");
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Handler invocation has happened.
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!filterTriggered[0]) {
+ throw new IllegalStateException("Bug in test implementation:
the session handler was never invoked.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and
{@link OrderedThreadPoolExecutor#workers}
+ * after a {@link RuntimeException} is thrown by a session's handler that
was invoked through an
+ * OrderedThreadPoolExecutor.
+ */
+ @Test
+ public void
testOrderedThreadPoolExecutorSessionHandlerThrowingRuntimeException() throws
Exception
+ {
+ // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the
implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ DummySession session = new DummySession();
+ IoFilterChain chain = session.getFilterChain();
+ OrderedThreadPoolExecutor executor = new
OrderedThreadPoolExecutor(corePoolSize,1);
+ chain.addLast("executor", new ExecutorFilter(executor));
+ session.setHandler( new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message)
throws Exception {
+ filterTriggered[0] = true;
+ throw new RuntimeException("A RuntimeException thrown during
unit testing.");
+ }
+ });
+
+ // Execute system under test.
+ try {
+ chain.fireMessageReceived("foo");
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Handler invocation has happened.
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!filterTriggered[0]) {
+ throw new IllegalStateException("Bug in test implementation:
the session handler was never invoked.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and
{@link OrderedThreadPoolExecutor#workers}
+ * after a (checked) {@link Exception} is thrown by a session's handler
that was invoked through an
+ * OrderedThreadPoolExecutor.
+ */
+ @Test
+ public void
testOrderedThreadPoolExecutorSessionHandlerThrowingCheckedException() throws
Exception
+ {
+ // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the
implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ DummySession session = new DummySession();
+ IoFilterChain chain = session.getFilterChain();
+ OrderedThreadPoolExecutor executor = new
OrderedThreadPoolExecutor(corePoolSize,1);
+ chain.addLast("executor", new ExecutorFilter(executor));
+ session.setHandler( new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message)
throws Exception {
+ filterTriggered[0] = true;
+ throw new Exception("A (checked) Exception thrown during unit
testing.");
+ }
+ });
+
+ // Execute system under test.
+ try {
+ chain.fireMessageReceived("foo");
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Handler invocation has happened.
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!filterTriggered[0]) {
+ throw new IllegalStateException("Bug in test implementation:
the session handler was never invoked.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and
{@link UnorderedThreadPoolExecutor#workers}
+ * after an {@link Error} is thrown by a session's handler that was
invoked through an UnorderedThreadPoolExecutor.
+ */
+ @Test
+ public void testUnorderedThreadPoolExecutorSessionHandlerThrowingError()
throws Exception
+ {
+ // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the
implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ DummySession session = new DummySession();
+ IoFilterChain chain = session.getFilterChain();
+ UnorderedThreadPoolExecutor executor = new
UnorderedThreadPoolExecutor(corePoolSize,1);
+ chain.addLast("executor", new ExecutorFilter(executor));
+ session.setHandler( new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message)
throws Exception {
+ filterTriggered[0] = true;
+ throw new Error("An Error thrown during unit testing.");
+ }
+ });
+
+ // Execute system under test.
+ try {
+ chain.fireMessageReceived("foo");
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Handler invocation has happened.
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!filterTriggered[0]) {
+ throw new IllegalStateException("Bug in test implementation:
the session handler was never invoked.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and
{@link UnorderedThreadPoolExecutor#workers}
+ * after a {@link RuntimeException} is thrown by a session's handler that
was invoked through an
+ * UnorderedThreadPoolExecutor.
+ */
+ @Test
+ public void
testUnorderedThreadPoolExecutorSessionHandlerThrowingRuntimeException() throws
Exception
+ {
+ // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the
implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ DummySession session = new DummySession();
+ IoFilterChain chain = session.getFilterChain();
+ UnorderedThreadPoolExecutor executor = new
UnorderedThreadPoolExecutor(corePoolSize,1);
+ chain.addLast("executor", new ExecutorFilter(executor));
+ session.setHandler( new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message)
throws Exception {
+ filterTriggered[0] = true;
+ throw new RuntimeException("A RuntimeException thrown during
unit testing.");
+ }
+ });
+
+ // Execute system under test.
+ try {
+ chain.fireMessageReceived("foo");
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Handler invocation has happened.
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!filterTriggered[0]) {
+ throw new IllegalStateException("Bug in test implementation:
the session handler was never invoked.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and
{@link UnorderedThreadPoolExecutor#workers}
+ * after a (checked) {@link Exception} is thrown by a session's handler
that was invoked through an
+ * UnorderedThreadPoolExecutor.
+ */
+ @Test
+ public void
testUnorderedThreadPoolExecutorSessionHandlerThrowingCheckedException() throws
Exception
+ {
+ // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the
implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ DummySession session = new DummySession();
+ IoFilterChain chain = session.getFilterChain();
+ UnorderedThreadPoolExecutor executor = new
UnorderedThreadPoolExecutor(corePoolSize,1);
+ chain.addLast("executor", new ExecutorFilter(executor));
+ session.setHandler( new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message)
throws Exception {
+ filterTriggered[0] = true;
+ throw new Exception("A (checked) Exception thrown during unit
testing.");
+ }
+ });
+
+ // Execute system under test.
+ try {
+ chain.fireMessageReceived("foo");
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Handler invocation has happened.
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!filterTriggered[0]) {
+ throw new IllegalStateException("Bug in test implementation:
the session handler was never invoked.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and
{@link PriorityThreadPoolExecutor#workers}
+ * after an {@link Error} is thrown by a session's handler that was
invoked through an PriorityThreadPoolExecutor.
+ */
+ @Test
+ public void testPriorityThreadPoolExecutorSessionHandlerThrowingError()
throws Exception
+ {
+ // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the
implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ DummySession session = new DummySession();
+ IoFilterChain chain = session.getFilterChain();
+ PriorityThreadPoolExecutor executor = new
PriorityThreadPoolExecutor(corePoolSize,1);
+ chain.addLast("executor", new ExecutorFilter(executor));
+ session.setHandler( new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message)
throws Exception {
+ filterTriggered[0] = true;
+ throw new Error("An Error thrown during unit testing.");
+ }
+ });
+
+ // Execute system under test.
+ try {
+ chain.fireMessageReceived("foo");
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Handler invocation has happened.
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!filterTriggered[0]) {
+ throw new IllegalStateException("Bug in test implementation:
the session handler was never invoked.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and
{@link PriorityThreadPoolExecutor#workers}
+ * after a {@link RuntimeException} is thrown by a session's handler that
was invoked through an
+ * PriorityThreadPoolExecutor.
+ */
+ @Test
+ public void
testPriorityThreadPoolExecutorSessionHandlerThrowingRuntimeException() throws
Exception
+ {
+ // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the
implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ DummySession session = new DummySession();
+ IoFilterChain chain = session.getFilterChain();
+ PriorityThreadPoolExecutor executor = new
PriorityThreadPoolExecutor(corePoolSize,1);
+ chain.addLast("executor", new ExecutorFilter(executor));
+ session.setHandler( new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message)
throws Exception {
+ filterTriggered[0] = true;
+ throw new RuntimeException("A RuntimeException thrown during
unit testing.");
+ }
+ });
+
+ // Execute system under test.
+ try {
+ chain.fireMessageReceived("foo");
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Handler invocation has happened.
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!filterTriggered[0]) {
+ throw new IllegalStateException("Bug in test implementation:
the session handler was never invoked.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and
{@link PriorityThreadPoolExecutor#workers}
+ * after a (checked) {@link Exception} is thrown by a session's handler
that was invoked through an
+ * PriorityThreadPoolExecutor.
+ */
+ @Test
+ public void
testPriorityThreadPoolExecutorSessionHandlerThrowingCheckedException() throws
Exception
+ {
+ // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the
implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up,
which would skew the results of this test.
+ DummySession session = new DummySession();
+ IoFilterChain chain = session.getFilterChain();
+ PriorityThreadPoolExecutor executor = new
PriorityThreadPoolExecutor(corePoolSize,1);
+ chain.addLast("executor", new ExecutorFilter(executor));
+ session.setHandler( new IoHandlerAdapter() {
+ @Override
+ public void messageReceived(IoSession session, Object message)
throws Exception {
+ filterTriggered[0] = true;
+ throw new Exception("A (checked) Exception thrown during unit
testing.");
+ }
+ });
+
+ // Execute system under test.
+ try {
+ chain.fireMessageReceived("foo");
+
+ // Shutting down and awaiting termination ensures that test
execution blocks until Handler invocation has happened.
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!filterTriggered[0]) {
+ throw new IllegalStateException("Bug in test implementation:
the session handler was never invoked.");
+ }
+
+ // Verify results.
+ final Field idleWorkersField =
PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using
reflection as the field is not accessible. It might be nicer to make the field
package-protected for testing.
+ idleWorkersField.setAccessible(true);
+ final AtomicInteger idleWorkers = (AtomicInteger)
idleWorkersField.get(executor);
+ assertEquals("After all tasks have finished, the amount of workers
that are idle should equal the amount of workers, but did not.",
executor.getPoolSize(), idleWorkers.get());
+ } finally {
+ // Clean up test fixture.
+ if (!executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+ }
+ }
+}