This is an automated email from the ASF dual-hosted git repository.
elecharny pushed a commit to branch 2.0.X
in repository https://gitbox.apache.org/repos/asf/mina.git
The following commit(s) were added to refs/heads/2.0.X by this push:
new 9ce82a16a Applied Tomas' patch for DIRMINA-1169, with a slight
modification (replacement of the :: operator)'
9ce82a16a is described below
commit 9ce82a16a773585f14e927993e147870384b809f
Author: emmanuel lecharny <[email protected]>
AuthorDate: Tue Oct 4 22:48:01 2022 +0200
Applied Tomas' patch for DIRMINA-1169, with a slight modification
(replacement of the :: operator)'
---
.../core/polling/AbstractPollingIoAcceptor.java | 97 ++++++++++++++--------
.../transport/socket/nio/NioSocketAcceptor.java | 60 ++++++++++---
.../transport/socket/nio/SocketAcceptorTest.java | 71 ++++++++++++++++
3 files changed, 182 insertions(+), 46 deletions(-)
diff --git
a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
index 8ca46a9e9..fc6f9b8ba 100644
---
a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
+++
b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoAcceptor.java
@@ -22,6 +22,8 @@ package org.apache.mina.core.polling;
import java.net.SocketAddress;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -335,6 +337,13 @@ public abstract class AbstractPollingIoAcceptor<S extends
AbstractIoSession, H>
wakeup();
}
+ /**
+ * Invoked when a bind request has been registered for processing. The
default implementation does nothing.
+ */
+ protected void bindRequestAdded() {
+ // Nothing
+ }
+
/**
* {@inheritDoc}
*/
@@ -347,6 +356,7 @@ public abstract class AbstractPollingIoAcceptor<S extends
AbstractIoSession, H>
// adds the Registration request to the queue for the Workers
// to handle
registerQueue.add(request);
+ bindRequestAdded();
// creates the Acceptor instance and has the local
// executor kick it off.
@@ -430,6 +440,54 @@ public abstract class AbstractPollingIoAcceptor<S extends
AbstractIoSession, H>
}
}
+ /**
+ * Handles new incoming connections by accepting the connections and
creating new sessions for them.
+ *
+ * @param handles the connection handles to accept and create new
sessions for
+ * @throws Exception on errors
+ */
+ @SuppressWarnings("unchecked")
+ protected void processHandles(Iterator<H> handles) throws Exception {
+ while (handles.hasNext()) {
+ H handle = handles.next();
+ handles.remove();
+
+ // Associates a new created connection to a processor,
+ // and get back a session
+ S session = accept(processor, handle);
+
+ if (session == null) {
+ continue;
+ }
+
+ initSession(session, null, null);
+
+ // add the session to the SocketIoProcessor
+ session.getProcessor().add(session);
+ }
+ }
+
+ /**
+ * Tells whether there are pending unbindings.
+ *
+ * @return {@code true} if there are any unbindings pending; {@code false}
otherwise
+ */
+ protected boolean hasUnbindings() {
+ return !cancelQueue.isEmpty();
+ }
+
+ /**
+ * Processes the futures for executed unbindings, marking all futures as
done.
+ *
+ * @param unboundFutures describing the unbindings
+ * @throws Exception on errors
+ */
+ protected void handleUnbound(Collection<AcceptorOperationFuture>
unboundFutures) throws Exception {
+ for (AcceptorOperationFuture unboundFuture:unboundFutures) {
+ unboundFuture.setDone();
+ }
+ }
+
/**
* This class is called by the startupAcceptor() method and is
* placed into a NamePreservingRunnable class.
@@ -491,8 +549,9 @@ public abstract class AbstractPollingIoAcceptor<S extends
AbstractIoSession, H>
}
// check to see if any cancellation request has been made.
- nHandles -= unregisterHandles();
- } catch (ClosedSelectorException cse) {
+ Collection<AcceptorOperationFuture> cancellations = new
ArrayList<>();
+ nHandles -= unregisterHandles(cancellations);
+ handleUnbound(cancellations); } catch
(ClosedSelectorException cse) {
// If the selector has been closed, we can exit the loop
ExceptionMonitor.getInstance().exceptionCaught(cse);
break;
@@ -530,36 +589,6 @@ public abstract class AbstractPollingIoAcceptor<S extends
AbstractIoSession, H>
}
}
- /**
- * This method will process new sessions for the Worker class. All
- * keys that have had their status updates as per the
Selector.selectedKeys()
- * method will be processed here. Only keys that are ready to accept
- * connections are handled here.
- * <p/>
- * Session objects are created by making new instances of
SocketSessionImpl
- * and passing the session object to the SocketIoProcessor class.
- */
- @SuppressWarnings("unchecked")
- private void processHandles(Iterator<H> handles) throws Exception {
- while (handles.hasNext()) {
- H handle = handles.next();
- handles.remove();
-
- // Associates a new created connection to a processor,
- // and get back a session
- S session = accept(processor, handle);
-
- if (session == null) {
- continue;
- }
-
- initSession(session, null, null);
-
- // add the session to the SocketIoProcessor
- session.getProcessor().add(session);
- }
- }
-
/**
* Sets up the socket communications. Sets items such as:
* <p/>
@@ -628,7 +657,7 @@ public abstract class AbstractPollingIoAcceptor<S extends
AbstractIoSession, H>
* is CancellationRequest objects and the only place this happens is in
* the doUnbind() method.
*/
- private int unregisterHandles() {
+ private int unregisterHandles(Collection<AcceptorOperationFuture>
cancelled) {
int cancelledHandles = 0;
for (;;) {
AcceptorOperationFuture future = cancelQueue.poll();
@@ -654,7 +683,7 @@ public abstract class AbstractPollingIoAcceptor<S extends
AbstractIoSession, H>
}
}
- future.setDone();
+ cancelled.add(future);
}
return cancelledHandles;
diff --git
a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
index b0bebaff8..0d937fcd6 100644
---
a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
+++
b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
@@ -66,7 +66,7 @@ public class NioSocketAcceptor extends
AbstractPollingIoAcceptor<NioSession, Se
/**
* Constructor for {@link NioSocketAcceptor} using default parameters, and
* given number of {@link NioProcessor} for multithreading I/O operations.
- *
+ *
* @param processorCount the number of processor to create and place in a
* {@link SimpleIoProcessorPool}
*/
@@ -135,6 +135,36 @@ public class NioSocketAcceptor extends
AbstractPollingIoAcceptor<NioSession, Se
}
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void handleUnbound(Collection<AcceptorOperationFuture>
unboundFutures) throws Exception {
+ // If we're on Java >= 11, unbindings may take effect only on the next
select()
+ // TODO: add a check (java.specification.version?) to do this only on
a JVM >= 11?
+ if (!unboundFutures.isEmpty()) {
+ int selected = 0;
+ try {
+ // Simply select() would also work since wakeup() *was*
called, but let's be explicit.
+ selected = selector.selectNow();
+ } finally {
+ super.handleUnbound(unboundFutures); // Marks the futures as
done
+
+ if (hasUnbindings()) {
+ // Depending on when these new unbindings were added,
their wakeup() call may just have been
+ // cancelled by the above select. Re-instate it, so that
the next select will not block, as
+ // expected.
+ wakeup();
+ }
+ }
+ if (selected > 0) {
+ processHandles(selectedHandles());
+ }
+ } else {
+ super.handleUnbound(unboundFutures);
+ }
+ }
+
/**
* {@inheritDoc}
*/
@@ -148,6 +178,7 @@ public class NioSocketAcceptor extends
AbstractPollingIoAcceptor<NioSession, Se
/**
* {@inheritDoc}
*/
+ @Override
public TransportMetadata getTransportMetadata() {
return NioSocketSession.METADATA;
}
@@ -171,6 +202,7 @@ public class NioSocketAcceptor extends
AbstractPollingIoAcceptor<NioSession, Se
/**
* {@inheritDoc}
*/
+ @Override
public void setDefaultLocalAddress(InetSocketAddress localAddress) {
setDefaultLocalAddress((SocketAddress) localAddress);
}
@@ -194,7 +226,7 @@ public class NioSocketAcceptor extends
AbstractPollingIoAcceptor<NioSession, Se
// accept the connection from the client
try {
SocketChannel ch = handle.accept();
-
+
if (ch == null) {
return null;
}
@@ -260,20 +292,21 @@ public class NioSocketAcceptor extends
AbstractPollingIoAcceptor<NioSession, Se
channel.setOption(StandardSocketOptions.SO_RCVBUF,
config.getReceiveBufferSize());
}
-
// and bind.
try {
socket.bind(localAddress, getBacklog());
} catch (IOException ioe) {
// Add some info regarding the address we try to bind to the
// message
- String newMessage = "Error while binding on " + localAddress +
"\n" + "original message : "
- + ioe.getMessage();
+ String newMessage = "Error while binding on " + localAddress;
Exception e = new IOException(newMessage, ioe);
- e.initCause(ioe.getCause());
-
- // And close the channel
- channel.close();
+
+ try {
+ // And close the channel
+ channel.close();
+ } catch (IOException nested) {
+ e.addSuppressed(nested);
+ }
throw e;
}
@@ -305,7 +338,7 @@ public class NioSocketAcceptor extends
AbstractPollingIoAcceptor<NioSession, Se
* It returns only after at least one channel is selected,
* this selector's wakeup method is invoked, or the current thread
* is interrupted, whichever comes first.
- *
+ *
* @return The number of keys having their ready-operation set updated
* @throws IOException If an I/O error occurs
*/
@@ -355,7 +388,7 @@ public class NioSocketAcceptor extends
AbstractPollingIoAcceptor<NioSession, Se
/**
* Build a SocketChannel iterator which will return a SocketChannel
instead of
* a SelectionKey.
- *
+ *
* @param selectedKeys The selector selected-key set
*/
private ServerSocketChannelIterator(Collection<SelectionKey>
selectedKeys) {
@@ -367,6 +400,7 @@ public class NioSocketAcceptor extends
AbstractPollingIoAcceptor<NioSession, Se
* @return <tt>true</tt> if there is at least one more
* SockectChannel object to read
*/
+ @Override
public boolean hasNext() {
return iterator.hasNext();
}
@@ -374,9 +408,10 @@ public class NioSocketAcceptor extends
AbstractPollingIoAcceptor<NioSession, Se
/**
* Get the next SocketChannel in the operator we have built from
* the selected-key et for this selector.
- *
+ *
* @return The next SocketChannel in the iterator
*/
+ @Override
public ServerSocketChannel next() {
SelectionKey key = iterator.next();
@@ -390,6 +425,7 @@ public class NioSocketAcceptor extends
AbstractPollingIoAcceptor<NioSession, Se
/**
* Remove the current SocketChannel from the iterator
*/
+ @Override
public void remove() {
iterator.remove();
}
diff --git
a/mina-core/src/test/java/org/apache/mina/transport/socket/nio/SocketAcceptorTest.java
b/mina-core/src/test/java/org/apache/mina/transport/socket/nio/SocketAcceptorTest.java
new file mode 100644
index 000000000..b7f1d7c9c
--- /dev/null
+++
b/mina-core/src/test/java/org/apache/mina/transport/socket/nio/SocketAcceptorTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.util.AvailablePortFinder;
+import org.junit.Test;
+
+public class SocketAcceptorTest {
+
+ @Test
+ public void testBindTwice() throws Exception {
+ NioSocketAcceptor acceptor = new NioSocketAcceptor() {
+
+ private int nRequests;
+
+ private CountDownLatch secondRequestAdded = new CountDownLatch(1);
+
+ @Override
+ protected void bindRequestAdded() {
+ super.bindRequestAdded();
+ nRequests++;
+ if (nRequests == 2) {
+ secondRequestAdded.countDown();
+ }
+ }
+
+ @Override
+ protected void handleUnbound(Collection<AcceptorOperationFuture>
unboundFutures) throws Exception {
+ super.handleUnbound(unboundFutures);
+ if (!unboundFutures.isEmpty() && nRequests == 1) {
+ secondRequestAdded.await();
+ }
+ }
+ };
+ acceptor.setCloseOnDeactivation(false);
+ acceptor.setReuseAddress(true);
+ acceptor.setHandler(new IoHandlerAdapter());
+ try {
+ int port = AvailablePortFinder.getNextAvailable(1025);
+ InetSocketAddress address = new InetSocketAddress("127.0.0.1",
port);
+ acceptor.bind(address);
+ acceptor.unbind(address);
+ acceptor.bind(address);
+ acceptor.unbind(address);
+ } finally {
+ acceptor.dispose();
+ }
+ }
+}