Author: mdonoughe Date: 2006-06-27 21:45:03 -0700 (Tue, 27 Jun 2006) New Revision: 3072
Removed: freeway/src/org/gnu/freeway/util/net/TCPServer.java freeway/src/org/gnu/freeway/util/net/TCPSession.java Modified: freeway/src/org/gnu/freeway/AbstractClient.java freeway/src/org/gnu/freeway/server/ClientServer.java freeway/src/org/gnu/freeway/test/TCPTest.java freeway/src/org/gnu/freeway/transport/tcp/TCPSession.java freeway/src/org/gnu/freeway/transport/tcp/TCPTransport.java freeway/src/org/gnu/freeway/util/net/CSServer.java freeway/src/org/gnu/freeway/util/net/CSSession.java Log: Move TCPServer and TCPSession into CSServer and CSSession because they are the only implementations Modified: freeway/src/org/gnu/freeway/AbstractClient.java =================================================================== --- freeway/src/org/gnu/freeway/AbstractClient.java 2006-06-28 04:24:55 UTC (rev 3071) +++ freeway/src/org/gnu/freeway/AbstractClient.java 2006-06-28 04:45:03 UTC (rev 3072) @@ -60,7 +60,7 @@ return null; } - session=new TCPSession(); + session=new CSSession(); if (!session.connect(ip,port,true)) { log(Level.SEVERE,"Could not connect to gnunetd !"); Modified: freeway/src/org/gnu/freeway/server/ClientServer.java =================================================================== --- freeway/src/org/gnu/freeway/server/ClientServer.java 2006-06-28 04:24:55 UTC (rev 3071) +++ freeway/src/org/gnu/freeway/server/ClientServer.java 2006-06-28 04:45:03 UTC (rev 3072) @@ -34,7 +34,7 @@ private Stat bytesOut; - private TCPServer server; + private CSServer server; private PersistentDecoder decoder; @@ -44,7 +44,7 @@ super(true); handlers=new ArrayList(); exitHandlers=new LinkedList(); - server=new TCPServer("C/S",this); + server=new CSServer("C/S",this); decoder=null; } @@ -150,7 +150,7 @@ // debug("Accepted connection from "+ip.getHostAddress()+":"+socket.socket().getPort()+"."); - hd=new TCPSession(server); + hd=new CSSession(server); return (hd.connect(socket,true) ? hd : null); } Modified: freeway/src/org/gnu/freeway/test/TCPTest.java =================================================================== --- freeway/src/org/gnu/freeway/test/TCPTest.java 2006-06-28 04:24:55 UTC (rev 3071) +++ freeway/src/org/gnu/freeway/test/TCPTest.java 2006-06-28 04:45:03 UTC (rev 3072) @@ -46,7 +46,7 @@ imax=4; for (i=0; i<imax; i++) { - acceptSocket=new TCPSession(); + acceptSocket=new CSSession(); if (acceptSocket.connect(doAccept(serverSocket),true)) { acceptSocket.setBlocking(true); Modified: freeway/src/org/gnu/freeway/transport/tcp/TCPSession.java =================================================================== --- freeway/src/org/gnu/freeway/transport/tcp/TCPSession.java 2006-06-28 04:24:55 UTC (rev 3071) +++ freeway/src/org/gnu/freeway/transport/tcp/TCPSession.java 2006-06-28 04:45:03 UTC (rev 3072) @@ -18,7 +18,7 @@ * Transport Session handle. */ -public class TCPSession extends org.gnu.freeway.util.net.TCPSession implements Session +public class TCPSession extends org.gnu.freeway.util.net.CSSession implements Session { /** after how much time of the core not being associated with a tcp connection anymore do we close it ? */ public static final long TIME_OUT = Scheduler.SECS_30; @@ -45,7 +45,7 @@ private long lastWrite; - public TCPSession( TCPServer s, TCPTransport t ) + public TCPSession( CSServer s, TCPTransport t ) { super(s); transport=t; Modified: freeway/src/org/gnu/freeway/transport/tcp/TCPTransport.java =================================================================== --- freeway/src/org/gnu/freeway/transport/tcp/TCPTransport.java 2006-06-28 04:24:55 UTC (rev 3071) +++ freeway/src/org/gnu/freeway/transport/tcp/TCPTransport.java 2006-06-28 04:45:03 UTC (rev 3072) @@ -26,7 +26,7 @@ private StatusCallsService status; /** */ - private TCPServer server; + private CSServer server; /** */ private int mtu; @@ -44,7 +44,7 @@ public TCPTransport() { super(TCP_PROTOCOL_NUMBER,"TCP"); - server=new TCPServer("TCP PEER SERVER",this); + server=new CSServer("TCP PEER SERVER",this); } public String toString() Modified: freeway/src/org/gnu/freeway/util/net/CSServer.java =================================================================== --- freeway/src/org/gnu/freeway/util/net/CSServer.java 2006-06-28 04:24:55 UTC (rev 3071) +++ freeway/src/org/gnu/freeway/util/net/CSServer.java 2006-06-28 04:45:03 UTC (rev 3072) @@ -4,24 +4,645 @@ package org.gnu.freeway.util.net; +import org.gnu.freeway.util.*; + +import java.io.*; +import java.net.*; +import java.nio.channels.*; +import java.util.*; +import java.util.logging.*; + /** * */ -public interface CSServer +public class CSServer extends LoggedObject { - public String getLabel(); + /** */ + private static final long SELECT_TIMEOUT = Scheduler.SECS_3; - public boolean isLaunched(); - public boolean launch( int port ); - public boolean shutdown(); + /** Maximum of pending unhandled connections. */ + private static final int MAX_QUEUED_REQUESTS = 5; + /** Maximum number of concurrent allowed sessions. */ + private static final int MAX_SESSIONS = 64; + + /** Name of this server (for debugging purpose only). */ + private String label; + + /** Selector of the server thread */ + private Selector selector; + + /** The TCP socket that we listen on for new inbound connections. */ + private ServerSocketChannel server; + + /** Thread for listening for new connections. */ + private MasterTask listenTask; + + /** Thread for accepting new connections. */ + private SlaveTask acceptTask; + + /** Thread for reading on all open sockets. */ + private SlaveTask readTask; + + /** Thread for writing on all open sockets. */ + private SlaveTask writeTask; + + /** Should the listen thread exit ? */ + private boolean running; + + /** Array of currently active TCP sessions. */ + private CSSession[] sessions; + + /** */ + private int sessionCount; + + /** Sessions' current operations. */ + private int[] sessionsOps; + + /** */ + private boolean acceptingOp; + + /** Sessions lock. */ + private Object internal; + + /** */ + private CSSessionHandler handler; + + + public CSServer( String str, CSSessionHandler h ) + { + super(true); + label=str; + selector=null; + server=null; + listenTask=new MasterTask("TCP-LISTEN("+str+")",new EvalAction(this,"performListen")); + acceptTask=listenTask.create("TCP-ACCEPT("+str+")",new EvalAction(this,"performAccept")); + readTask=listenTask.create("TCP-READ("+str+")",new EvalAction(this,"performRead")); + writeTask=listenTask.create("TCP-WRITE("+str+")",new EvalAction(this,"performWrite")); + running=false; + sessions=new CSSession[0]; + sessionCount=0; + sessionsOps=new int[0]; + acceptingOp=false; + internal=new Object(); + handler=h; + } + + public String toString() + { + return "Abstract TCP server"; + } + + + //////////////////////////////////////////////////////////////////////////////////////////////// + + public String getLabel() + { + return label; + } + + public boolean isLaunched() + { + return running; + } + + public boolean launch( int port ) + { + int secs; + + log(Level.INFO,label+" Launching TCP server..."); + + // open selector + try { + selector=Selector.open(); + } + catch( IOException x ) { + err("Could not create selector !",x); + return false; + } + + // create server socket + secs=5; + while (server==null && secs<60) { + try { + server=ServerSocketChannel.open(); + server.configureBlocking(false); + server.socket().setReuseAddress(true); + server.socket().bind(new InetSocketAddress(port),MAX_QUEUED_REQUESTS); + log(Level.INFO,label+" TCP server bound to port "+port+"."); + } + catch( IOException x ) { + err("Failed to open socket at port "+port+". Trying again in "+secs+" seconds...",x); + + Scheduler.sleep(Scheduler.seconds(secs)); + secs+=5; // slow progression... + + if (server!=null) { + try { + server.close(); + } + catch( IOException xx ) { + } + server=null; + } + } + } + + if (server==null) { + log(Level.SEVERE,label+" Could not create socket, abort."); + try { + selector.close(); + } + catch( IOException x ) { + } + selector=null; + return false; + } + + // start listening thread + running=true; + listenTask.launch(); + return true; + } + + public boolean shutdown() + { + int i; + + // signal listening thread + running=false; + selector.wakeup(); + + // stop listening thread + listenTask.shutdown(); + + try { + server.close(); + } + catch( IOException x ) { + err("Failed to close socket !",x); + return false; + } + finally { + server=null; + try { + selector.close(); + } + catch( IOException x ) { + err("Failed to close selector !",x); + return false; + } + finally { + selector=null; + } + } + log(Level.INFO,label+" TCP server stopped."); + + synchronized(internal) { + for (i=0; i<sessions.length; i++) { + if (sessions[i]!=null) { + log(Level.WARNING,label+" Session still alive : "+sessions[i].getLabel()); + destroySession(i); + } + } + } + return true; + } + + public void wakeUp() + { + selector.wakeup(); + } + /** - * Add manually a session to the pool of listened sessions. + * Add session to the pool of listened sessions. If it can't be added, session will be disconnected and false returned. * - * @param s The session to add. - * @return True if okay (enough ressources), false otherwise. + * @param s + * @return + * @see CSSession#disconnect() */ - public boolean register( CSSession s ); + public boolean register( CSSession s ) + { + if (addSession(s)>=0) { + // signal the thread that is blocked in a select call that the set of sockets to listen to has changed + selector.wakeup(); + return true; + } + return false; + } + + /** + * Listen for incoming connections. + * Main method for the thread listening on the tcp socket and all tcp connections. + * Whenever a message is received, it is processed by the handler. + * This thread waits for activity on any of the TCP connections and processes deferred (async) writes and buffers reads + * until an entire message has been received. + * + * @throws IOException + */ + + public void performListen() throws IOException + { + SelectionKey key; + Iterator iter; + int mergedOps,ops,ret,i; + + acceptingOp=true; + + while (running) { + synchronized(internal) { + server.register(selector,(acceptingOp ? SelectionKey.OP_ACCEPT : 0)); + + for (i=0; i<sessions.length; i++) { + if (sessions[i]!=null) { + if (sessions[i].isConnected()) { // always check because impl. may disconnect after timeout... + key=sessions[i].registerOps(selector,(sessions[i].getOps() & ~sessionsOps[i])); + if (key!=null) { + key.attach(new Integer(i)); + } + else { + destroySession(i); + } + } + else { + // clean up (depends on session implementation : timeout detected, other side closed connection...) + destroySession(i); + } + } + } + } + + // should wake up regularly (to clean up sessions...) + ret=selector.select(Scheduler.toMillis(SELECT_TIMEOUT)); + if (ret==0) { + continue; + } + + synchronized(internal) { + mergedOps=0; + + iter=selector.selectedKeys().iterator(); + while (iter.hasNext()) { + key=(SelectionKey) iter.next(); + iter.remove(); + + if (key.isValid()) { + ops=key.readyOps(); + mergedOps|=ops; + if ((ops & SelectionKey.OP_ACCEPT)==0) { // read or write op + i=((Number) key.attachment()).intValue(); + sessionsOps[i]|=ops; + } + } + } + + debug(label+" Selected #"+ret+" sockets with merged ops { "+NetUtils.labelForOps(mergedOps)+" }."); + + // signal appropriate tasks + if ((mergedOps & SelectionKey.OP_ACCEPT)!=0) { + acceptingOp=false; + acceptTask.signal(); + } + if ((mergedOps & SelectionKey.OP_READ)!=0) { + readTask.signal(); + } + if ((mergedOps & SelectionKey.OP_WRITE)!=0) { + writeTask.signal(); + } + } + } + + // shutdown... close all sessions + synchronized(internal) { + for (i=0; i<sessions.length; i++) { + if (sessions[i]!=null) { + destroySession(i); + } + } + } + } + + public void performAccept() + { + CSSession s; + SocketChannel c; + + try { + for (c=server.accept(); c!=null; c=server.accept()) { + s=handler.handleAccept(c); + if (s!=null) { + if (addSession(s)<0) { + s.disconnect(); + } + } + else { + try { + c.close(); + } + catch( IOException xx ) { + err("Failed to close channel !",xx); + } + } + } + } + catch( IOException x ) { + err("Failed to accept new connection !",x); + } + + synchronized(internal) { + acceptingOp=true; + } + + selector.wakeup(); + } + + public void performRead() + { + CSSession s; + int len; + + do { + s=firstSessionWithOp(SelectionKey.OP_READ); + if (s!=null) { + len=s.doReceive(); + if (len>0 && handler.handleRead(s,len)) { + clearSessionOp(s,SelectionKey.OP_READ); + } + else { + debug(s.getLabel()+" End of stream."); + destroySession(s); + } + } + } + while (s!=null); + + // signal the thread that is blocked in a select call that the set of sockets to listen to has changed + selector.wakeup(); + } + + public void performWrite() + { + CSSession s; + int len; + + do { + s=firstSessionWithOp(SelectionKey.OP_WRITE); + if (s!=null) { + len=s.doSend(); + if (len>0 && handler.handleWrite(s,len)) { + clearSessionOp(s,SelectionKey.OP_WRITE); + } + else { + debug(s.getLabel()+" End of stream."); + destroySession(s); + } + } + } + while (s!=null); + + // signal the thread that is blocked in a select call that the set of sockets to listen to has changed + selector.wakeup(); + } + + /** + * Add a new session to the array watched by the select thread. Grows the array if needed. + * + * @param s Session to add. + * @return Index of added session, or -1 on error. + */ + + protected int addSession( CSSession s ) + { + CSSession[] tmp; + int[] tmp2; + int i; + + synchronized(internal) { + if (sessionCount==MAX_SESSIONS) { + log(Level.WARNING,"Too many sessions ("+MAX_SESSIONS+"), ignore connection."); + return -1; + } + + for (i=0; i<sessions.length && sessions[i]!=null; i++) {} + if (i==sessions.length) { + tmp=new CSSession[sessions.length+16]; + Arrays.fill(tmp,null); + System.arraycopy(sessions,0,tmp,0,sessions.length); + sessions=tmp; + + tmp2=new int[sessionsOps.length+16]; + Arrays.fill(tmp2,0); + System.arraycopy(sessionsOps,0,tmp2,0,sessionsOps.length); + sessionsOps=tmp2; + } + + sessions[i]=s; + sessionsOps[i]=0; + sessionCount++; + debug("Add session at slot #"+i+" "+Utils.gauge(sessionCount,sessions.length)+"."); + return i; + } + } + + protected CSSession firstSessionWithOp( int op ) + { + int i; + + synchronized(internal) { + for (i=0; i<sessionsOps.length && (sessionsOps[i] & op)==0; i++) {} + return (i<sessionsOps.length ? sessions[i] : null); + } + } + + protected boolean clearSessionOp( CSSession s, int op ) + { + int i; + + assert(s!=null); + + synchronized(internal) { + for (i=0; i<sessions.length && sessions[i]!=s; i++) {} + if (i==sessions.length) { + log(Level.WARNING,label+" Session not found : "+s.getLabel()+"."); + return false; + } + + sessionsOps[i]&=~op; + return true; + } + } + + protected boolean destroySession( CSSession s ) + { + int i; + + assert(s!=null); + + synchronized(internal) { + for (i=0; i<sessions.length && sessions[i]!=s; i++) {} + if (i==sessions.length) { + log(Level.WARNING,label+" Session not found : "+s.getLabel()+"."); + return false; + } + + return destroySession(i); + } + } + + /** + * The client has disconnected. Close the socket, free the buffers, unlink session from the linked list. + * Remove a session, either the other side closed the connection or we have otherwise reason to believe + * that it should better be killed. + * + * @param index index to the session handle + * @return + */ + + protected boolean destroySession( int index ) + { + assert(index>=0); + + synchronized(internal) { + if (index>=sessions.length || sessions[index]==null) { + log(Level.WARNING,label+" No session at slot "+index+"."); + return false; + } + + if (sessions[index].isConnected()) { + sessions[index].disconnect(); + } + + handler.handleDestroy(sessions[index]); + + sessions[index]=null; + sessionsOps[index]=0; + sessionCount--; + debug("Destroyed session at slot #"+index+" "+Utils.gauge(sessionCount,sessions.length)+"."); + } + return true; + } } + + +/* + public void add( int id, Class c, CommandHandler h ) + { + decoder.add(id,c); + setHandler(c,h); + } + + public void setDefault( int id, Class c, CommandHandler h ) + { + decoder.add(id,c); + setHandler(c,h); + decoder.setDefault(id); + } + + public void setCorrupted( int id, Class c, CommandHandler h ) + { + decoder.add(id,c); + setHandler(c,h); + decoder.setCorrupted(id); + } + + public boolean hasHandler( Class c ) + { + synchronized(handlers) { + return handlers.get(c)!=null; + } + } + + public CommandHandler getHandler( Class c ) + { + synchronized(handlers) { + return (CommandHandler) handlers.get(c); + } + } + + public boolean setHandler( Class c, CommandHandler h ) + { + synchronized(handlers) { + if (handlers.get(c)!=null) { + log(Level.WARNING,"Could not assign handler, class "+c.getName()+" is already registered."); + return false; + } + handlers.put(c,h); + return true; + } + } + + public boolean removeHandler( Class c, CommandHandler h ) + { + synchronized(handlers) { + if (handlers.get(c)==null) { + log(Level.WARNING,"Could not remove handler, class "+c.getName()+" is not registered."); + return false; + } + handlers.remove(c); + return true; + } + } + + +deriver de TCP Server: + public void start() + { + add(ProxyCommand.HELLO_ID,ProxyHello.class,this); + add(ProxyCommand.CONNECT_ID,ProxyConnect.class,this); + add(ProxyCommand.SETBLOCKING_ID,ProxySetBlocking.class,this); + + setDefault(ProxyCommand.UNKNOWN_ID,ProxyUnknown.class,this); + setCorrupted(ProxyCommand.CORRUPTED_ID,ProxyCorrupted.class,this); + + super.start(); + } + + public void stop() + { + super.stop(); + } + + public TCP Session createSession( PersistentDecoder decoder ) + { + return new ProxySession(decoder); + } + + public boolean handle( TCP Session session, Persistent p ) + { + ProxySession s; + ProxyToken token; + int id; + + id=((ProxyCommand) p).getID(); + + s=(ProxySession) session; + if (!s.isWelcomed()) { + if (id!=ProxyCommand.HELLO_ID) { + log("No hello received, close session."); + return false; + } + s.welcome(); + + token=new ProxyToken(s.getToken()); + token.setAddress("",0); + sendToClient(s,token); + return true; + } + + switch (id) { + case ProxyCommand.CONNECT_ID: + break; + + case ProxyCommand.SETBLOCKING_ID: + break; + + default: + log("Unknown message : "+p); + return false; + } + return true; + } + +*/ Modified: freeway/src/org/gnu/freeway/util/net/CSSession.java =================================================================== --- freeway/src/org/gnu/freeway/util/net/CSSession.java 2006-06-28 04:24:55 UTC (rev 3071) +++ freeway/src/org/gnu/freeway/util/net/CSSession.java 2006-06-28 04:45:03 UTC (rev 3072) @@ -4,36 +4,407 @@ package org.gnu.freeway.util.net; +import org.gnu.freeway.util.*; + import java.net.*; import java.nio.channels.*; +import java.util.logging.*; /** + * Per-client data structure (kept in linked list). Also: the opaque + * handle for client connections passed by the core to the CSHandlers. + * Opaque handle for client connections passed by + * the core to the CSHandlers. * + * A connection to a freeway client application. To be used in non-blocking mode. + * + * Struct to refer to a GNUnet TCP connection. + * This is more than just a socket because if the server + * drops the connection, the client automatically tries + * to reconnect (and for that needs connection information). + * + * Code for synchronized access to TCP streams + * + * Generic TCP code for reliable, mostly blocking, record-oriented TCP + * connections. GNUnet uses the "tcpio" code for trusted client-server + * (e.g. gnunet-gtk to gnunetd via loopback) communications. Note + * that an unblocking write is also provided since if both client and + * server use blocking IO, both may block on a write and cause a + * mutual inter-process deadlock. + * + * Since we do not want other peers (!) to be able to block a peer by + * not reading from the TCP stream, the peer-to-peer TCP transport + * uses unreliable, buffered, non-blocking, record-oriented TCP code + * with a select call to reduce the number of threads which is + * provided in transports/tcp.c. + * Generic TCP code. This module is used to receive or send records + * (!) from a TCP stream. The code automatically attempts to + * re-connect if the other side closes the connection.<br> + * + * The code can be used on the server- or the client side, just in + * case of the server the reconnect can of course not be used. The TCP + * stream is broken into records of maximum length MAX_BUFFER_SIZE, + * each preceeded by a 16 bits integer (not signed) giving the length of the + * following record.<p> */ -public interface CSSession +public class CSSession extends LoggedObject { - public String getLabel(); + /** */ + private CSServer server; - public int getOps(); - public SelectionKey registerOps( Selector sel, int ops ); + /** Socket to communicate with the other side. */ + private PersistentSocket socket; - public boolean isConnected(); - public boolean connect( InetAddress ip, int port, boolean careful ); - public boolean connect( SocketChannel channel, boolean careful ); - public boolean disconnect(); + /** */ + private String label; - public boolean isBlocking(); - public void setBlocking( boolean flag ); + /** Lock used to synchronized read operations. */ + protected Object readLock; - public int doReceive(); - public boolean hasReceived(); - public Persistent receive( Class c ); - public Persistent receive( PersistentDecoder decoder ); + /** Lock used to synchronized write operations. */ + protected Object writeLock; - public boolean send( Persistent p ); - public boolean sendAndCheck( Persistent p ); - public boolean flushAndSend( Persistent p ); - public boolean hasToSend(); - public int doSend(); + + public CSSession() + { + this(null); + } + + public CSSession( CSServer s ) + { + super(true); + server=s; + socket=new PersistentSocket(); + socket.setDebug(false); + label=socket.getLabel(); + readLock=new Object(); + writeLock=new Object(); + } + + public String toString() + { + return "Client/server session"; + } + + + //////////////////////////////////////////////////////////////////////////////////////////////// + + public String getLabel() + { + return label; + } + + public int getOps() + { + synchronized(readLock) { + synchronized(writeLock) { + return (socket.shouldWrite() ? (SelectionKey.OP_READ | SelectionKey.OP_WRITE) : SelectionKey.OP_READ); + } + } + } + + public SelectionKey registerOps( Selector sel, int ops ) + { + SelectionKey key; + + synchronized(readLock) { + synchronized(writeLock) { + key=socket.getChannel().keyFor(sel); + if (key==null) { + try { + key=socket.getChannel().register(sel,0); + } + catch( ClosedChannelException x ) { + err(label+" Failed to register on selector !",x); + return null; + } + } + key.interestOps(ops); + return key; + } + } + } + + public boolean isConnected() + { + synchronized(readLock) { + synchronized(writeLock) { + return !socket.isClosed(); + } + } + } + + /** + * Connect this session to the specified ip and port in *blocking mode*. + * Used when connecting to a server at {ip,port}. + * + * @param ip IP of the host to connect to. + * @param port The port number. + * @param careful Should we treat socket with respect (SO_LINGER not set) ? + * @return True if successful, false on failure. + */ + + public boolean connect( InetAddress ip, int port, boolean careful ) + { + boolean res; + + synchronized(readLock) { + synchronized(writeLock) { + res=socket.open(ip,port,careful); + if (res) { + label=socket.getLabel(); + debug(label+" Connected."); + } + return res; + } + } + } + + /** + * Connect this session to the specified channel in *non blocking* mode. + * Used when connecting to a client from a server. + * + * @param channel The open client socket. + * @param careful Should we treat socket with respect (SO_LINGER not set) ? + * @return True if successful, false on failure. + */ + + public boolean connect( SocketChannel channel, boolean careful ) + { + boolean res; + + synchronized(readLock) { + synchronized(writeLock) { + res=socket.open(channel,careful); + if (res) { + label=socket.getLabel(); + debug(label+" Connected."); + } + return res; + } + } + } + + /** + * Close the session. + * + * @return True if succeedeed, false otherwise. + */ + + public boolean disconnect() + { + boolean res; + + synchronized(readLock) { + synchronized(writeLock) { + if (socket.isClosed()) { + log(Level.WARNING,"Session is already closed."); + return false; + } + + res=socket.close(); + if (res) { + debug(label+" Disconnected."); + } + return res; + } + } + } + + public boolean isBlocking() + { + synchronized(readLock) { + synchronized(writeLock) { + return socket.isBlocking(); + } + } + } + + public void setBlocking( boolean flag ) + { + synchronized(readLock) { + synchronized(writeLock) { + socket.setBlocking(flag); + } + } + } + + /** + * Buffer data received from the other side. + * + * @return True if at least one byte has been received, false if the socket was closed by the other side or if an error occured. + */ + + public int doReceive() + { + int len; + + synchronized(readLock) { + len=socket.doRead(); + if (len>0) { + debug(label+" Have read "+len+" bytes."); + } + return len; + } + } + + public boolean hasReceived() + { + synchronized(readLock) { + return socket.shouldDequeue(); + } + } + + /** + * @param c + * @return + */ + + public Persistent receive( Class c ) + { + Persistent p; + + synchronized(readLock) { + if (!socket.shouldDequeue() && socket.isBlocking()) { + doReceive(); + } + + p=socket.dequeue(c); + if (p!=null) { + debug(label+" Received message : "+p+"."); + } + return p; + } + } + + /** + * Decode buffered data. If in blocking mode and no messages are buffered, an attempt is made to read fresh data. + * + * @param decoder Decoder used to transform transmitted data into messages. + * @return Any decoded data if available, null otherwise. + */ + + public Persistent receive( PersistentDecoder decoder ) + { + Persistent p; + + synchronized(readLock) { + if (!socket.shouldDequeue() && socket.isBlocking()) { + doReceive(); + } + + p=socket.dequeue(decoder); + if (p!=null) { + debug(label+" Received message : "+p+"."); + } + return p; + } + } + + /** + * Add data to the buffer, and if blocking, start transferring buffered data. + * + * <div>When in blocking mode, try to also send buffered data to the other side. Returns true if, at least, one byte + * has been transmitted. Please note that it does *not* imply that any part of the data <code>p</code> has been transmitted, + * since other data may had been buffered previously (transfer is initiated but may be incomplete).</div> + * + * <div>In non-blocking mode, returns true. The actual transfer happens asynchronously.</div> + * + * @param p The data to write (duplicated, because may be buffered and stored a certain amount of time...). + * @return True if in non-blocking mode, or if at least one byte of buffered data has been transmitted, false otherwise. + */ + + public boolean send( Persistent p ) + { + p=PersistentHelper.copy(p); + + synchronized(writeLock) { + socket.enqueue(p); + debug(label+" Sent message : "+p+"."); + + if (server!=null) { + server.wakeUp(); + } + + return (socket.isBlocking() ? doSend()>0 : true); + } + } + + public boolean sendAndCheck( Persistent p ) + { + CSResult res; + + synchronized(writeLock) { + if (!send(p)) { + return false; + } + + res=(CSResult) receive(CSResult.class); + return (res!=null && res.isOkay()); + } + } + + /** + * Flush buffered data, buffer given data <code>p</code> and try to initiate transfer of this data. + * Note that it is possible that only a part of the message is sent. + * + * Returning true here means that at least a small part of the message has been transmitted, + * though it may be transmitted entirely a bit later. + * + * @param p The data to write (duplicated, because may be buffered and stored a certain amount of time...). + * @return False if an I/O error occurred, or if it did not transmit any byte of the message. Return true otherwise. + */ + + public boolean flushAndSend( Persistent p ) + { + boolean empty; + + p=PersistentHelper.copy(p); + + synchronized(writeLock) { + doSend(); + + empty=!socket.shouldWrite(); + + socket.enqueue(p); + debug(label+" Sent message : "+p+"."); + + if (server!=null) { + server.wakeUp(); + } + + return (empty ? doSend()>0 : false); + } + } + + public boolean hasToSend() + { + synchronized(writeLock) { + return socket.shouldWrite(); + } + } + + /** + * Send buffered data, if any. + * + * @return True if at least one byte has been transmitted, false otherwise + * (an error occured, the other side is not ready, or there is no data in buffer). + */ + + public int doSend() + { + int len; + + synchronized(writeLock) { + len=0; + if (socket.shouldWrite()) { + len=socket.doWrite(); + if (len>0) { + debug(label+" Have written "+len+" bytes."); + } + } + return len; + } + } } Deleted: freeway/src/org/gnu/freeway/util/net/TCPServer.java =================================================================== --- freeway/src/org/gnu/freeway/util/net/TCPServer.java 2006-06-28 04:24:55 UTC (rev 3071) +++ freeway/src/org/gnu/freeway/util/net/TCPServer.java 2006-06-28 04:45:03 UTC (rev 3072) @@ -1,648 +0,0 @@ -/** - * @PROJECT_INFO@ - */ - -package org.gnu.freeway.util.net; - -import org.gnu.freeway.util.*; - -import java.io.*; -import java.net.*; -import java.nio.channels.*; -import java.util.*; -import java.util.logging.*; - -/** - * - */ - -public class TCPServer extends LoggedObject implements CSServer -{ - /** */ - private static final long SELECT_TIMEOUT = Scheduler.SECS_3; - - /** Maximum of pending unhandled connections. */ - private static final int MAX_QUEUED_REQUESTS = 5; - - /** Maximum number of concurrent allowed sessions. */ - private static final int MAX_SESSIONS = 64; - - /** Name of this server (for debugging purpose only). */ - private String label; - - /** Selector of the server thread */ - private Selector selector; - - /** The TCP socket that we listen on for new inbound connections. */ - private ServerSocketChannel server; - - /** Thread for listening for new connections. */ - private MasterTask listenTask; - - /** Thread for accepting new connections. */ - private SlaveTask acceptTask; - - /** Thread for reading on all open sockets. */ - private SlaveTask readTask; - - /** Thread for writing on all open sockets. */ - private SlaveTask writeTask; - - /** Should the listen thread exit ? */ - private boolean running; - - /** Array of currently active TCP sessions. */ - private CSSession[] sessions; - - /** */ - private int sessionCount; - - /** Sessions' current operations. */ - private int[] sessionsOps; - - /** */ - private boolean acceptingOp; - - /** Sessions lock. */ - private Object internal; - - /** */ - private CSSessionHandler handler; - - - public TCPServer( String str, CSSessionHandler h ) - { - super(true); - label=str; - selector=null; - server=null; - listenTask=new MasterTask("TCP-LISTEN("+str+")",new EvalAction(this,"performListen")); - acceptTask=listenTask.create("TCP-ACCEPT("+str+")",new EvalAction(this,"performAccept")); - readTask=listenTask.create("TCP-READ("+str+")",new EvalAction(this,"performRead")); - writeTask=listenTask.create("TCP-WRITE("+str+")",new EvalAction(this,"performWrite")); - running=false; - sessions=new CSSession[0]; - sessionCount=0; - sessionsOps=new int[0]; - acceptingOp=false; - internal=new Object(); - handler=h; - } - - public String toString() - { - return "Abstract TCP server"; - } - - - //////////////////////////////////////////////////////////////////////////////////////////////// - - public String getLabel() - { - return label; - } - - public boolean isLaunched() - { - return running; - } - - public boolean launch( int port ) - { - int secs; - - log(Level.INFO,label+" Launching TCP server..."); - - // open selector - try { - selector=Selector.open(); - } - catch( IOException x ) { - err("Could not create selector !",x); - return false; - } - - // create server socket - secs=5; - while (server==null && secs<60) { - try { - server=ServerSocketChannel.open(); - server.configureBlocking(false); - server.socket().setReuseAddress(true); - server.socket().bind(new InetSocketAddress(port),MAX_QUEUED_REQUESTS); - log(Level.INFO,label+" TCP server bound to port "+port+"."); - } - catch( IOException x ) { - err("Failed to open socket at port "+port+". Trying again in "+secs+" seconds...",x); - - Scheduler.sleep(Scheduler.seconds(secs)); - secs+=5; // slow progression... - - if (server!=null) { - try { - server.close(); - } - catch( IOException xx ) { - } - server=null; - } - } - } - - if (server==null) { - log(Level.SEVERE,label+" Could not create socket, abort."); - try { - selector.close(); - } - catch( IOException x ) { - } - selector=null; - return false; - } - - // start listening thread - running=true; - listenTask.launch(); - return true; - } - - public boolean shutdown() - { - int i; - - // signal listening thread - running=false; - selector.wakeup(); - - // stop listening thread - listenTask.shutdown(); - - try { - server.close(); - } - catch( IOException x ) { - err("Failed to close socket !",x); - return false; - } - finally { - server=null; - try { - selector.close(); - } - catch( IOException x ) { - err("Failed to close selector !",x); - return false; - } - finally { - selector=null; - } - } - log(Level.INFO,label+" TCP server stopped."); - - synchronized(internal) { - for (i=0; i<sessions.length; i++) { - if (sessions[i]!=null) { - log(Level.WARNING,label+" Session still alive : "+sessions[i].getLabel()); - destroySession(i); - } - } - } - return true; - } - - public void wakeUp() - { - selector.wakeup(); - } - - /** - * Add session to the pool of listened sessions. If it can't be added, session will be disconnected and false returned. - * - * @param s - * @return - * @see CSSession#disconnect() - */ - - public boolean register( CSSession s ) - { - if (addSession(s)>=0) { - // signal the thread that is blocked in a select call that the set of sockets to listen to has changed - selector.wakeup(); - return true; - } - return false; - } - - /** - * Listen for incoming connections. - * Main method for the thread listening on the tcp socket and all tcp connections. - * Whenever a message is received, it is processed by the handler. - * This thread waits for activity on any of the TCP connections and processes deferred (async) writes and buffers reads - * until an entire message has been received. - * - * @throws IOException - */ - - public void performListen() throws IOException - { - SelectionKey key; - Iterator iter; - int mergedOps,ops,ret,i; - - acceptingOp=true; - - while (running) { - synchronized(internal) { - server.register(selector,(acceptingOp ? SelectionKey.OP_ACCEPT : 0)); - - for (i=0; i<sessions.length; i++) { - if (sessions[i]!=null) { - if (sessions[i].isConnected()) { // always check because impl. may disconnect after timeout... - key=sessions[i].registerOps(selector,(sessions[i].getOps() & ~sessionsOps[i])); - if (key!=null) { - key.attach(new Integer(i)); - } - else { - destroySession(i); - } - } - else { - // clean up (depends on session implementation : timeout detected, other side closed connection...) - destroySession(i); - } - } - } - } - - // should wake up regularly (to clean up sessions...) - ret=selector.select(Scheduler.toMillis(SELECT_TIMEOUT)); - if (ret==0) { - continue; - } - - synchronized(internal) { - mergedOps=0; - - iter=selector.selectedKeys().iterator(); - while (iter.hasNext()) { - key=(SelectionKey) iter.next(); - iter.remove(); - - if (key.isValid()) { - ops=key.readyOps(); - mergedOps|=ops; - if ((ops & SelectionKey.OP_ACCEPT)==0) { // read or write op - i=((Number) key.attachment()).intValue(); - sessionsOps[i]|=ops; - } - } - } - - debug(label+" Selected #"+ret+" sockets with merged ops { "+NetUtils.labelForOps(mergedOps)+" }."); - - // signal appropriate tasks - if ((mergedOps & SelectionKey.OP_ACCEPT)!=0) { - acceptingOp=false; - acceptTask.signal(); - } - if ((mergedOps & SelectionKey.OP_READ)!=0) { - readTask.signal(); - } - if ((mergedOps & SelectionKey.OP_WRITE)!=0) { - writeTask.signal(); - } - } - } - - // shutdown... close all sessions - synchronized(internal) { - for (i=0; i<sessions.length; i++) { - if (sessions[i]!=null) { - destroySession(i); - } - } - } - } - - public void performAccept() - { - CSSession s; - SocketChannel c; - - try { - for (c=server.accept(); c!=null; c=server.accept()) { - s=handler.handleAccept(c); - if (s!=null) { - if (addSession(s)<0) { - s.disconnect(); - } - } - else { - try { - c.close(); - } - catch( IOException xx ) { - err("Failed to close channel !",xx); - } - } - } - } - catch( IOException x ) { - err("Failed to accept new connection !",x); - } - - synchronized(internal) { - acceptingOp=true; - } - - selector.wakeup(); - } - - public void performRead() - { - CSSession s; - int len; - - do { - s=firstSessionWithOp(SelectionKey.OP_READ); - if (s!=null) { - len=s.doReceive(); - if (len>0 && handler.handleRead(s,len)) { - clearSessionOp(s,SelectionKey.OP_READ); - } - else { - debug(s.getLabel()+" End of stream."); - destroySession(s); - } - } - } - while (s!=null); - - // signal the thread that is blocked in a select call that the set of sockets to listen to has changed - selector.wakeup(); - } - - public void performWrite() - { - CSSession s; - int len; - - do { - s=firstSessionWithOp(SelectionKey.OP_WRITE); - if (s!=null) { - len=s.doSend(); - if (len>0 && handler.handleWrite(s,len)) { - clearSessionOp(s,SelectionKey.OP_WRITE); - } - else { - debug(s.getLabel()+" End of stream."); - destroySession(s); - } - } - } - while (s!=null); - - // signal the thread that is blocked in a select call that the set of sockets to listen to has changed - selector.wakeup(); - } - - /** - * Add a new session to the array watched by the select thread. Grows the array if needed. - * - * @param s Session to add. - * @return Index of added session, or -1 on error. - */ - - protected int addSession( CSSession s ) - { - CSSession[] tmp; - int[] tmp2; - int i; - - synchronized(internal) { - if (sessionCount==MAX_SESSIONS) { - log(Level.WARNING,"Too many sessions ("+MAX_SESSIONS+"), ignore connection."); - return -1; - } - - for (i=0; i<sessions.length && sessions[i]!=null; i++) {} - if (i==sessions.length) { - tmp=new CSSession[sessions.length+16]; - Arrays.fill(tmp,null); - System.arraycopy(sessions,0,tmp,0,sessions.length); - sessions=tmp; - - tmp2=new int[sessionsOps.length+16]; - Arrays.fill(tmp2,0); - System.arraycopy(sessionsOps,0,tmp2,0,sessionsOps.length); - sessionsOps=tmp2; - } - - sessions[i]=s; - sessionsOps[i]=0; - sessionCount++; - debug("Add session at slot #"+i+" "+Utils.gauge(sessionCount,sessions.length)+"."); - return i; - } - } - - protected CSSession firstSessionWithOp( int op ) - { - int i; - - synchronized(internal) { - for (i=0; i<sessionsOps.length && (sessionsOps[i] & op)==0; i++) {} - return (i<sessionsOps.length ? sessions[i] : null); - } - } - - protected boolean clearSessionOp( CSSession s, int op ) - { - int i; - - assert(s!=null); - - synchronized(internal) { - for (i=0; i<sessions.length && sessions[i]!=s; i++) {} - if (i==sessions.length) { - log(Level.WARNING,label+" Session not found : "+s.getLabel()+"."); - return false; - } - - sessionsOps[i]&=~op; - return true; - } - } - - protected boolean destroySession( CSSession s ) - { - int i; - - assert(s!=null); - - synchronized(internal) { - for (i=0; i<sessions.length && sessions[i]!=s; i++) {} - if (i==sessions.length) { - log(Level.WARNING,label+" Session not found : "+s.getLabel()+"."); - return false; - } - - return destroySession(i); - } - } - - /** - * The client has disconnected. Close the socket, free the buffers, unlink session from the linked list. - * Remove a session, either the other side closed the connection or we have otherwise reason to believe - * that it should better be killed. - * - * @param index index to the session handle - * @return - */ - - protected boolean destroySession( int index ) - { - assert(index>=0); - - synchronized(internal) { - if (index>=sessions.length || sessions[index]==null) { - log(Level.WARNING,label+" No session at slot "+index+"."); - return false; - } - - if (sessions[index].isConnected()) { - sessions[index].disconnect(); - } - - handler.handleDestroy(sessions[index]); - - sessions[index]=null; - sessionsOps[index]=0; - sessionCount--; - debug("Destroyed session at slot #"+index+" "+Utils.gauge(sessionCount,sessions.length)+"."); - } - return true; - } -} - - -/* - public void add( int id, Class c, CommandHandler h ) - { - decoder.add(id,c); - setHandler(c,h); - } - - public void setDefault( int id, Class c, CommandHandler h ) - { - decoder.add(id,c); - setHandler(c,h); - decoder.setDefault(id); - } - - public void setCorrupted( int id, Class c, CommandHandler h ) - { - decoder.add(id,c); - setHandler(c,h); - decoder.setCorrupted(id); - } - - public boolean hasHandler( Class c ) - { - synchronized(handlers) { - return handlers.get(c)!=null; - } - } - - public CommandHandler getHandler( Class c ) - { - synchronized(handlers) { - return (CommandHandler) handlers.get(c); - } - } - - public boolean setHandler( Class c, CommandHandler h ) - { - synchronized(handlers) { - if (handlers.get(c)!=null) { - log(Level.WARNING,"Could not assign handler, class "+c.getName()+" is already registered."); - return false; - } - handlers.put(c,h); - return true; - } - } - - public boolean removeHandler( Class c, CommandHandler h ) - { - synchronized(handlers) { - if (handlers.get(c)==null) { - log(Level.WARNING,"Could not remove handler, class "+c.getName()+" is not registered."); - return false; - } - handlers.remove(c); - return true; - } - } - - -deriver de TCP Server: - public void start() - { - add(ProxyCommand.HELLO_ID,ProxyHello.class,this); - add(ProxyCommand.CONNECT_ID,ProxyConnect.class,this); - add(ProxyCommand.SETBLOCKING_ID,ProxySetBlocking.class,this); - - setDefault(ProxyCommand.UNKNOWN_ID,ProxyUnknown.class,this); - setCorrupted(ProxyCommand.CORRUPTED_ID,ProxyCorrupted.class,this); - - super.start(); - } - - public void stop() - { - super.stop(); - } - - public TCP Session createSession( PersistentDecoder decoder ) - { - return new ProxySession(decoder); - } - - public boolean handle( TCP Session session, Persistent p ) - { - ProxySession s; - ProxyToken token; - int id; - - id=((ProxyCommand) p).getID(); - - s=(ProxySession) session; - if (!s.isWelcomed()) { - if (id!=ProxyCommand.HELLO_ID) { - log("No hello received, close session."); - return false; - } - s.welcome(); - - token=new ProxyToken(s.getToken()); - token.setAddress("",0); - sendToClient(s,token); - return true; - } - - switch (id) { - case ProxyCommand.CONNECT_ID: - break; - - case ProxyCommand.SETBLOCKING_ID: - break; - - default: - log("Unknown message : "+p); - return false; - } - return true; - } - -*/ Deleted: freeway/src/org/gnu/freeway/util/net/TCPSession.java =================================================================== --- freeway/src/org/gnu/freeway/util/net/TCPSession.java 2006-06-28 04:24:55 UTC (rev 3071) +++ freeway/src/org/gnu/freeway/util/net/TCPSession.java 2006-06-28 04:45:03 UTC (rev 3072) @@ -1,410 +0,0 @@ -/** - * @PROJECT_INFO@ - */ - -package org.gnu.freeway.util.net; - -import org.gnu.freeway.util.*; - -import java.net.*; -import java.nio.channels.*; -import java.util.logging.*; - -/** - * Per-client data structure (kept in linked list). Also: the opaque - * handle for client connections passed by the core to the CSHandlers. - * Opaque handle for client connections passed by - * the core to the CSHandlers. - * - * A connection to a freeway client application. To be used in non-blocking mode. - * - * Struct to refer to a GNUnet TCP connection. - * This is more than just a socket because if the server - * drops the connection, the client automatically tries - * to reconnect (and for that needs connection information). - * - * Code for synchronized access to TCP streams - * - * Generic TCP code for reliable, mostly blocking, record-oriented TCP - * connections. GNUnet uses the "tcpio" code for trusted client-server - * (e.g. gnunet-gtk to gnunetd via loopback) communications. Note - * that an unblocking write is also provided since if both client and - * server use blocking IO, both may block on a write and cause a - * mutual inter-process deadlock. - * - * Since we do not want other peers (!) to be able to block a peer by - * not reading from the TCP stream, the peer-to-peer TCP transport - * uses unreliable, buffered, non-blocking, record-oriented TCP code - * with a select call to reduce the number of threads which is - * provided in transports/tcp.c. - * Generic TCP code. This module is used to receive or send records - * (!) from a TCP stream. The code automatically attempts to - * re-connect if the other side closes the connection.<br> - * - * The code can be used on the server- or the client side, just in - * case of the server the reconnect can of course not be used. The TCP - * stream is broken into records of maximum length MAX_BUFFER_SIZE, - * each preceeded by a 16 bits integer (not signed) giving the length of the - * following record.<p> - */ - -public class TCPSession extends LoggedObject implements CSSession -{ - /** */ - private TCPServer server; - - /** Socket to communicate with the other side. */ - private PersistentSocket socket; - - /** */ - private String label; - - /** Lock used to synchronized read operations. */ - protected Object readLock; - - /** Lock used to synchronized write operations. */ - protected Object writeLock; - - - public TCPSession() - { - this(null); - } - - public TCPSession( TCPServer s ) - { - super(true); - server=s; - socket=new PersistentSocket(); - socket.setDebug(false); - label=socket.getLabel(); - readLock=new Object(); - writeLock=new Object(); - } - - public String toString() - { - return "Client/server session"; - } - - - //////////////////////////////////////////////////////////////////////////////////////////////// - - public String getLabel() - { - return label; - } - - public int getOps() - { - synchronized(readLock) { - synchronized(writeLock) { - return (socket.shouldWrite() ? (SelectionKey.OP_READ | SelectionKey.OP_WRITE) : SelectionKey.OP_READ); - } - } - } - - public SelectionKey registerOps( Selector sel, int ops ) - { - SelectionKey key; - - synchronized(readLock) { - synchronized(writeLock) { - key=socket.getChannel().keyFor(sel); - if (key==null) { - try { - key=socket.getChannel().register(sel,0); - } - catch( ClosedChannelException x ) { - err(label+" Failed to register on selector !",x); - return null; - } - } - key.interestOps(ops); - return key; - } - } - } - - public boolean isConnected() - { - synchronized(readLock) { - synchronized(writeLock) { - return !socket.isClosed(); - } - } - } - - /** - * Connect this session to the specified ip and port in *blocking mode*. - * Used when connecting to a server at {ip,port}. - * - * @param ip IP of the host to connect to. - * @param port The port number. - * @param careful Should we treat socket with respect (SO_LINGER not set) ? - * @return True if successful, false on failure. - */ - - public boolean connect( InetAddress ip, int port, boolean careful ) - { - boolean res; - - synchronized(readLock) { - synchronized(writeLock) { - res=socket.open(ip,port,careful); - if (res) { - label=socket.getLabel(); - debug(label+" Connected."); - } - return res; - } - } - } - - /** - * Connect this session to the specified channel in *non blocking* mode. - * Used when connecting to a client from a server. - * - * @param channel The open client socket. - * @param careful Should we treat socket with respect (SO_LINGER not set) ? - * @return True if successful, false on failure. - */ - - public boolean connect( SocketChannel channel, boolean careful ) - { - boolean res; - - synchronized(readLock) { - synchronized(writeLock) { - res=socket.open(channel,careful); - if (res) { - label=socket.getLabel(); - debug(label+" Connected."); - } - return res; - } - } - } - - /** - * Close the session. - * - * @return True if succeedeed, false otherwise. - */ - - public boolean disconnect() - { - boolean res; - - synchronized(readLock) { - synchronized(writeLock) { - if (socket.isClosed()) { - log(Level.WARNING,"Session is already closed."); - return false; - } - - res=socket.close(); - if (res) { - debug(label+" Disconnected."); - } - return res; - } - } - } - - public boolean isBlocking() - { - synchronized(readLock) { - synchronized(writeLock) { - return socket.isBlocking(); - } - } - } - - public void setBlocking( boolean flag ) - { - synchronized(readLock) { - synchronized(writeLock) { - socket.setBlocking(flag); - } - } - } - - /** - * Buffer data received from the other side. - * - * @return True if at least one byte has been received, false if the socket was closed by the other side or if an error occured. - */ - - public int doReceive() - { - int len; - - synchronized(readLock) { - len=socket.doRead(); - if (len>0) { - debug(label+" Have read "+len+" bytes."); - } - return len; - } - } - - public boolean hasReceived() - { - synchronized(readLock) { - return socket.shouldDequeue(); - } - } - - /** - * @param c - * @return - */ - - public Persistent receive( Class c ) - { - Persistent p; - - synchronized(readLock) { - if (!socket.shouldDequeue() && socket.isBlocking()) { - doReceive(); - } - - p=socket.dequeue(c); - if (p!=null) { - debug(label+" Received message : "+p+"."); - } - return p; - } - } - - /** - * Decode buffered data. If in blocking mode and no messages are buffered, an attempt is made to read fresh data. - * - * @param decoder Decoder used to transform transmitted data into messages. - * @return Any decoded data if available, null otherwise. - */ - - public Persistent receive( PersistentDecoder decoder ) - { - Persistent p; - - synchronized(readLock) { - if (!socket.shouldDequeue() && socket.isBlocking()) { - doReceive(); - } - - p=socket.dequeue(decoder); - if (p!=null) { - debug(label+" Received message : "+p+"."); - } - return p; - } - } - - /** - * Add data to the buffer, and if blocking, start transferring buffered data. - * - * <div>When in blocking mode, try to also send buffered data to the other side. Returns true if, at least, one byte - * has been transmitted. Please note that it does *not* imply that any part of the data <code>p</code> has been transmitted, - * since other data may had been buffered previously (transfer is initiated but may be incomplete).</div> - * - * <div>In non-blocking mode, returns true. The actual transfer happens asynchronously.</div> - * - * @param p The data to write (duplicated, because may be buffered and stored a certain amount of time...). - * @return True if in non-blocking mode, or if at least one byte of buffered data has been transmitted, false otherwise. - */ - - public boolean send( Persistent p ) - { - p=PersistentHelper.copy(p); - - synchronized(writeLock) { - socket.enqueue(p); - debug(label+" Sent message : "+p+"."); - - if (server!=null) { - server.wakeUp(); - } - - return (socket.isBlocking() ? doSend()>0 : true); - } - } - - public boolean sendAndCheck( Persistent p ) - { - CSResult res; - - synchronized(writeLock) { - if (!send(p)) { - return false; - } - - res=(CSResult) receive(CSResult.class); - return (res!=null && res.isOkay()); - } - } - - /** - * Flush buffered data, buffer given data <code>p</code> and try to initiate transfer of this data. - * Note that it is possible that only a part of the message is sent. - * - * Returning true here means that at least a small part of the message has been transmitted, - * though it may be transmitted entirely a bit later. - * - * @param p The data to write (duplicated, because may be buffered and stored a certain amount of time...). - * @return False if an I/O error occurred, or if it did not transmit any byte of the message. Return true otherwise. - */ - - public boolean flushAndSend( Persistent p ) - { - boolean empty; - - p=PersistentHelper.copy(p); - - synchronized(writeLock) { - doSend(); - - empty=!socket.shouldWrite(); - - socket.enqueue(p); - debug(label+" Sent message : "+p+"."); - - if (server!=null) { - server.wakeUp(); - } - - return (empty ? doSend()>0 : false); - } - } - - public boolean hasToSend() - { - synchronized(writeLock) { - return socket.shouldWrite(); - } - } - - /** - * Send buffered data, if any. - * - * @return True if at least one byte has been transmitted, false otherwise - * (an error occured, the other side is not ready, or there is no data in buffer). - */ - - public int doSend() - { - int len; - - synchronized(writeLock) { - len=0; - if (socket.shouldWrite()) { - len=socket.doWrite(); - if (len>0) { - debug(label+" Have written "+len+" bytes."); - } - } - return len; - } - } -} _______________________________________________ GNUnet-SVN mailing list GNUnet-SVN@gnu.org http://lists.gnu.org/mailman/listinfo/gnunet-svn