This is an automated email from the ASF dual-hosted git repository.
elecharny pushed a commit to branch 2.1.X
in repository https://gitbox.apache.org/repos/asf/mina.git
The following commit(s) were added to refs/heads/2.1.X by this push:
new 6ab1017cd Backported patch for DIRMINA-1169
6ab1017cd is described below
commit 6ab1017cd0e50d125d4e6450cf42d48349126c84
Author: emmanuel lecharny <[email protected]>
AuthorDate: Mon May 22 09:24:55 2023 +0200
Backported patch for DIRMINA-1169
---
.../core/polling/AbstractPollingIoAcceptor.java | 98 ++++++++++++++--------
.../transport/socket/nio/NioSocketAcceptor.java | 42 +++++++++-
.../transport/socket/nio/SocketAcceptorTest.java | 71 ++++++++++++++++
3 files changed, 175 insertions(+), 36 deletions(-)
diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
index 8ca46a9e9..3fc26993f 100644
--- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
+++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
@@ -22,6 +22,8 @@ package org.apache.mina.core.polling;
import java.net.SocketAddress;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -335,6 +337,13 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
wakeup();
}
+ /**
+ * Invoked when a bind request has been registered for processing. The default implementation does nothing.
+ */
+ protected void bindRequestAdded() {
+ // Nothing
+ }
+
/**
* {@inheritDoc}
*/
@@ -347,6 +356,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
// adds the Registration request to the queue for the Workers
// to handle
registerQueue.add(request);
+ bindRequestAdded();
// creates the Acceptor instance and has the local
// executor kick it off.
@@ -429,6 +439,54 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
throw future.getException();
}
}
+
+ /**
+ * Handles new incoming connections by accepting the connections and creating new sessions for them.
+ *
+ * @param handles the connection handles to accept and create new sessions for
+ * @throws Exception on errors
+ */
+ @SuppressWarnings("unchecked")
+ protected void processHandles(Iterator<H> handles) throws Exception {
+ while (handles.hasNext()) {
+ H handle = handles.next();
+ handles.remove();
+
+ // Associates a new created connection to a processor,
+ // and get back a session
+ S session = accept(processor, handle);
+
+ if (session == null) {
+ continue;
+ }
+
+ initSession(session, null, null);
+
+ // add the session to the SocketIoProcessor
+ session.getProcessor().add(session);
+ }
+ }
+
+ /**
+ * Tells whether there are pending unbindings.
+ *
+ * @return {@code true} if there are any unbindings pending; {@code false} otherwise
+ */
+ protected boolean hasUnbindings() {
+ return !cancelQueue.isEmpty();
+ }
+
+ /**
+ * Processes the futures for executed unbindings, marking all futures as done.
+ *
+ * @param unboundFutures describing the unbindings
+ * @throws Exception on errors
+ */
+ protected void handleUnbound(Collection<AcceptorOperationFuture>
unboundFutures) throws Exception {
+ for (AcceptorOperationFuture unboundFuture:unboundFutures) {
+ unboundFuture.setDone();
+ }
+ }
/**
* This class is called by the startupAcceptor() method and is
@@ -489,9 +547,11 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
// them here.
processHandles(selectedHandles());
}
-
+
// check to see if any cancellation request has been made.
- nHandles -= unregisterHandles();
+ Collection<AcceptorOperationFuture> cancellations = new
ArrayList<>();
+ nHandles -= unregisterHandles(cancellations);
+ handleUnbound(cancellations);
} catch (ClosedSelectorException cse) {
// If the selector has been closed, we can exit the loop
ExceptionMonitor.getInstance().exceptionCaught(cse);
@@ -530,36 +590,6 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
}
}
- /**
- * This method will process new sessions for the Worker class. All
- * keys that have had their status updates as per the Selector.selectedKeys()
- * method will be processed here. Only keys that are ready to accept
- * connections are handled here.
- * <p/>
- * Session objects are created by making new instances of SocketSessionImpl
- * and passing the session object to the SocketIoProcessor class.
- */
- @SuppressWarnings("unchecked")
- private void processHandles(Iterator<H> handles) throws Exception {
- while (handles.hasNext()) {
- H handle = handles.next();
- handles.remove();
-
- // Associates a new created connection to a processor,
- // and get back a session
- S session = accept(processor, handle);
-
- if (session == null) {
- continue;
- }
-
- initSession(session, null, null);
-
- // add the session to the SocketIoProcessor
- session.getProcessor().add(session);
- }
- }
-
/**
* Sets up the socket communications. Sets items such as:
* <p/>
@@ -628,7 +658,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
* is CancellationRequest objects and the only place this happens is in
* the doUnbind() method.
*/
- private int unregisterHandles() {
+ private int unregisterHandles(Collection<AcceptorOperationFuture>
cancelled) {
int cancelledHandles = 0;
for (;;) {
AcceptorOperationFuture future = cancelQueue.poll();
@@ -654,7 +684,7 @@ public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H>
}
}
- future.setDone();
+ cancelled.add(future);
}
return cancelledHandles;
diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
index fc672b09d..836326609 100644
--- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
+++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
@@ -135,6 +135,35 @@ implements SocketAcceptor {
}
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void handleUnbound(Collection<AcceptorOperationFuture>
unboundFutures) throws Exception {
+ // If we're on Java >= 11, unbindings may take effect only on the next select()
+ // TODO: add a check (java.specification.version?) to do this only on a JVM >= 11?
+ if (!unboundFutures.isEmpty()) {
+ int selected = 0;
+ try {
+ // Simply select() would also work since wakeup() *was* called, but let's be explicit.
+ selected = selector.selectNow();
+ } finally {
+ super.handleUnbound(unboundFutures); // Marks the futures as
done
+ if (hasUnbindings()) {
+ // Depending on when these new unbindings were added, their wakeup() call may just have been
+ // cancelled by the above select. Re-instate it, so that the next select will not block, as
+ // expected.
+ wakeup();
+ }
+ }
+ if (selected > 0) {
+ processHandles(selectedHandles());
+ }
+ } else {
+ super.handleUnbound(unboundFutures);
+ }
+ }
+
/**
* {@inheritDoc}
*/
@@ -148,6 +177,7 @@ implements SocketAcceptor {
/**
* {@inheritDoc}
*/
+ @Override
public TransportMetadata getTransportMetadata() {
return NioSocketSession.METADATA;
}
@@ -171,6 +201,7 @@ implements SocketAcceptor {
/**
* {@inheritDoc}
*/
+ @Override
public void setDefaultLocalAddress(InetSocketAddress localAddress) {
setDefaultLocalAddress((SocketAddress) localAddress);
}
@@ -269,8 +300,12 @@ implements SocketAcceptor {
String newMessage = "Error while binding on " + localAddress;
Exception e = new IOException(newMessage, ioe);
- // And close the channel
- channel.close();
+ try {
+ // And close the channel
+ channel.close();
+ } catch (IOException nested) {
+ e.addSuppressed(nested);
+ }
throw e;
}
@@ -364,6 +399,7 @@ implements SocketAcceptor {
* @return <tt>true</tt> if there is at least one more
* SockectChannel object to read
*/
+ @Override
public boolean hasNext() {
return iterator.hasNext();
}
@@ -374,6 +410,7 @@ implements SocketAcceptor {
*
* @return The next SocketChannel in the iterator
*/
+ @Override
public ServerSocketChannel next() {
SelectionKey key = iterator.next();
@@ -387,6 +424,7 @@ implements SocketAcceptor {
/**
* Remove the current SocketChannel from the iterator
*/
+ @Override
public void remove() {
iterator.remove();
}
diff --git a/mina-core/src/test/java/org/apache/mina/transport/socket/nio/SocketAcceptorTest.java b/mina-core/src/test/java/org/apache/mina/transport/socket/nio/SocketAcceptorTest.java
new file mode 100644
index 000000000..3c09ccc87
--- /dev/null
+++ b/mina-core/src/test/java/org/apache/mina/transport/socket/nio/SocketAcceptorTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.util.AvailablePortFinder;
+import org.junit.Test;
+
+public class SocketAcceptorTest {
+
+ @Test
+ public void testBindTwice() throws Exception {
+ NioSocketAcceptor acceptor = new NioSocketAcceptor() {
+
+ private int nRequests;
+
+ private CountDownLatch secondRequestAdded = new CountDownLatch(1);
+
+ @Override
+ protected void bindRequestAdded() {
+ super.bindRequestAdded();
+ nRequests++;
+ if (nRequests == 2) {
+ secondRequestAdded.countDown();
+ }
+ }
+
+ @Override
+ protected void handleUnbound(Collection<AcceptorOperationFuture>
unboundFutures) throws Exception {
+ super.handleUnbound(unboundFutures);
+ if (!unboundFutures.isEmpty() && nRequests == 1) {
+ secondRequestAdded.await();
+ }
+ }
+ };
+ acceptor.setCloseOnDeactivation(false);
+ acceptor.setReuseAddress(true);
+ acceptor.setHandler(new IoHandlerAdapter());
+ try {
+ int port = AvailablePortFinder.getNextAvailable(1025);
+ InetSocketAddress address = new InetSocketAddress("127.0.0.1",
port);
+ acceptor.bind(address);
+ acceptor.unbind(address);
+ acceptor.bind(address);
+ acceptor.unbind(address);
+ } finally {
+ acceptor.dispose();
+ }
+ }
+}
\ No newline at end of file