Author: grothoff Date: 2006-07-28 17:51:47 -0700 (Fri, 28 Jul 2006) New Revision: 3150
Added: GNUnet/src/transports/udp_helper.c Modified: GNUnet/src/transports/Makefile.am GNUnet/src/transports/smtp.c GNUnet/src/transports/udp.c GNUnet/src/transports/udp6.c Log: udp Modified: GNUnet/src/transports/Makefile.am =================================================================== --- GNUnet/src/transports/Makefile.am 2006-07-29 00:22:42 UTC (rev 3149) +++ GNUnet/src/transports/Makefile.am 2006-07-29 00:51:47 UTC (rev 3150) @@ -15,7 +15,7 @@ libip.la if !MINGW - smtptransport = libgnunettransport_smtp.la +# smtptransport = libgnunettransport_smtp.la endif libip_la_SOURCES = \ @@ -28,23 +28,21 @@ libgnunettransport_tcp.la \ libgnunettransport_udp.la \ libgnunettransport_nat.la \ - $(v6transports) \ - libgnunettransport_http.la + $(v6transports) # libgnunettransport_http.la -libgnunettransport_smtp_la_SOURCES = smtp.c -libgnunettransport_smtp_la_LIBADD = \ - $(top_builddir)/src/util/libgnunetutil.la -libgnunettransport_smtp_la_LDFLAGS = \ - -export-dynamic -avoid-version -module +#libgnunettransport_smtp_la_SOURCES = smtp.c +#libgnunettransport_smtp_la_LIBADD = \ +# $(top_builddir)/src/util/libgnunetutil.la +#libgnunettransport_smtp_la_LDFLAGS = \ +# -export-dynamic -avoid-version -module +#libgnunettransport_http_la_SOURCES = http.c +#libgnunettransport_http_la_LIBADD = \ +# $(top_builddir)/src/util/libgnunetutil.la \ +# libip.la +#libgnunettransport_http_la_LDFLAGS = \ +# -export-dynamic -avoid-version -module -libgnunettransport_http_la_SOURCES = http.c -libgnunettransport_http_la_LIBADD = \ - $(top_builddir)/src/util/libgnunetutil.la \ - libip.la -libgnunettransport_http_la_LDFLAGS = \ - -export-dynamic -avoid-version -module - libgnunettransport_tcp_la_SOURCES = tcp.c libgnunettransport_tcp_la_LIBADD = \ $(top_builddir)/src/util/libgnunetutil.la \ Modified: GNUnet/src/transports/smtp.c =================================================================== --- GNUnet/src/transports/smtp.c 2006-07-29 00:22:42 UTC (rev 3149) +++ GNUnet/src/transports/smtp.c 2006-07-29 00:51:47 UTC (rev 3150) @@ -293,16 +293,20 @@ if (OK != GN_getHostByName(hostname, &ip)) { - LOG(LOG_ERROR, - _("Could not resolve name of SMTP server `%s': %s"), - hostname, hstrerror(h_errno)); + GE_LOG(ectx, + GE_ERROR, + _("Could not resolve name of SMTP server `%s': %s"), + hostname, + hstrerror(h_errno)); FREE(hostname); return -1; } FREE(hostname); res = SOCKET(PF_INET, SOCK_STREAM, 6);/* 6: TCP */ if (res == -1) { - LOG_STRERROR(LOG_FAILURE, "socket"); + GE_LOG_STRERROR(ectx, + GE_ERROR, + "socket"); return SYSERR; } SETSOCKOPT(res, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)); @@ -663,7 +667,8 @@ */ static int smtpSend(TSession * tsession, const void * message, - const unsigned int size) { + const unsigned int size, + int important) { char * msg; SMTPMessage * mp; P2P_hello_MESSAGE * helo; @@ -941,12 +946,10 @@ smtpAPI.createhello = &createhello; smtpAPI.connect = &smtpConnect; smtpAPI.send = &smtpSend; - smtpAPI.sendReliable = &smtpSend; /* is always blocking, so we can't really do better */ smtpAPI.associate = &smtpAssociate; smtpAPI.disconnect = &smtpDisconnect; smtpAPI.startTransportServer = &startTransportServer; smtpAPI.stopTransportServer = &stopTransportServer; - smtpAPI.reloadConfiguration = &reloadConfiguration; smtpAPI.addressToString = &addressToString; return &smtpAPI; Modified: GNUnet/src/transports/udp.c =================================================================== --- GNUnet/src/transports/udp.c 2006-07-29 00:22:42 UTC (rev 3149) +++ GNUnet/src/transports/udp.c 2006-07-29 00:51:47 UTC (rev 3150) @@ -33,6 +33,8 @@ #define DEBUG_UDP NO +#include "udp_helper.c" + /** * Host-Address in a UDP network. */ @@ -54,74 +56,15 @@ } HostAddress; -/** - * Message-Packet header. - */ -typedef struct { - /** - * this struct is *preceded* by MESSAGE_PARTs - until - * size-sizeof(UDPMessage)! - */ - - /** - * size of the message, in bytes, including this header. - */ - MESSAGE_HEADER header; - - /** - * What is the identity of the sender (hash of public key) - */ - PeerIdentity sender; - -} UDPMessage; - -/* *********** globals ************* */ - -/* apis (our advertised API and the core api ) */ -static CoreAPIForTransport * coreAPI; - -static TransportAPI udpAPI; - -static Stats_ServiceAPI * stats; - -static int stat_bytesReceived; - -static int stat_bytesSent; - -static int stat_bytesDropped; - -static struct GE_Context * ectx; - static struct GC_Configuration * cfg; static struct LoadMonitor * load_monitor; -/** - * thread that listens for inbound messages - */ -static struct SelectHandle * selector; - -/** - * the socket that we receive all data from - */ -static struct SocketHandle * udp_sock; - -/** - * configuration - */ static struct CIDRNetwork * filteredNetworks_; static struct MUTEX * configLock; /** - * Keep used port locally, the one in the configuration - * may change and then we would not be able to send - * the shutdown signal! - */ -static unsigned short port; - - -/** * Get the GNUnet UDP port from the configuration, or from * /etc/services if it is not specified in the config file. * @@ -196,9 +139,16 @@ /** * Check if we are explicitly forbidden to communicate with this IP. */ -static int isBlacklisted(IPaddr ip) { +static int isBlacklisted(const void * addr, + unsigned int len) { + IPaddr ip; int ret; + if (len != sizeof(IPaddr)) + return SYSERR; + memcpy(&ip, + addr, + sizeof(IPaddr)); MUTEX_LOCK(configLock); ret = check_ipv4_listed(filteredNetworks_, ip); @@ -207,68 +157,6 @@ } /** - * The socket of session has data waiting, process! - * - * This function may only be called if the tcplock is - * already held by the caller. - */ -static int select_message_handler(void * mh_cls, - struct SelectHandle * sh, - struct SocketHandle * sock, - void * sock_ctx, - const MESSAGE_HEADER * msg) { - unsigned int len; - P2P_PACKET * mp; - const UDPMessage * um; - - len = ntohs(msg->size); - if (len <= sizeof(UDPMessage)) { - GE_LOG(ectx, - GE_WARNING | GE_USER | GE_BULK, - _("Received malformed message from udp-peer connection. Closing.\n")); - return SYSERR; - } - um = (const UDPMessage*) msg; - mp = MALLOC(sizeof(P2P_PACKET)); - mp->msg = MALLOC(len - sizeof(UDPMessage)); - memcpy(mp->msg, - &um[1], - len - sizeof(UDPMessage)); - mp->sender = um->sender; - mp->size = len - sizeof(UDPMessage); - mp->tsession = NULL; - coreAPI->receive(mp); - if (stats != NULL) - stats->change(stat_bytesReceived, - len); - return OK; -} - - -static void * select_accept_handler(void * ah_cls, - struct SelectHandle * sh, - struct SocketHandle * sock, - const void * addr, - unsigned int addr_len) { - static int nonnullpointer; - return &nonnullpointer; -} - -/** - * Select has been forced to close a connection. - * Free the associated context. - */ -static void select_close_handler(void * ch_cls, - struct SelectHandle * sh, - struct SocketHandle * sock, - void * sock_ctx) { - /* do nothing */ -} - - -/* *************** API implementation *************** */ - -/** * Verify that a hello-Message is correct (a node is reachable at that * address). Since the reply will be asynchronous, a method must be * called on success. @@ -284,7 +172,8 @@ if ( (ntohs(helo->senderAddressSize) != sizeof(HostAddress)) || (ntohs(helo->header.size) != P2P_hello_MESSAGE_size(helo)) || (ntohs(helo->header.type) != p2p_PROTO_hello) || - (YES == isBlacklisted(haddr->senderIP)) ) + (YES == isBlacklisted(&haddr->senderIP, + sizeof(IPaddr))) ) return SYSERR; /* obviously invalid */ else { #if DEBUG_UDP @@ -308,8 +197,7 @@ P2P_hello_MESSAGE * msg; HostAddress * haddr; - if ( ( (selector == NULL) && (getGNUnetUDPPort() == 0) ) || - ( (selector != NULL) && (port == 0) ) ) + if (getGNUnetUDPPort() == 0) return NULL; /* UDP transport configured send-only */ msg = MALLOC(sizeof(P2P_hello_MESSAGE) + sizeof(HostAddress)); @@ -330,10 +218,7 @@ "UDP uses IP address %u.%u.%u.%u.\n", PRIP(ntohl(*(int*)&haddr->senderIP))); #endif - if (selector == NULL) - haddr->senderPort = htons(getGNUnetUDPPort()); - else - haddr->senderPort = htons(port); + haddr->senderPort = htons(getGNUnetUDPPort()); haddr->reserved = htons(0); msg->senderAddressSize = htons(sizeof(HostAddress)); msg->protocol = htons(UDP_PROTOCOL_NUMBER); @@ -342,49 +227,6 @@ } /** - * Establish a connection to a remote node. - * @param helo the hello-Message for the target node - * @param tsessionPtr the session handle that is to be set - * @return OK on success, SYSERR if the operation failed - */ -static int udpConnect(const P2P_hello_MESSAGE * helo, - TSession ** tsessionPtr) { - TSession * tsession; - HostAddress * haddr; - - tsession = MALLOC(sizeof(TSession)); - tsession->internal = MALLOC(P2P_hello_MESSAGE_size(helo)); - memcpy(tsession->internal, - helo, - P2P_hello_MESSAGE_size(helo)); - tsession->ttype = udpAPI.protocolNumber; - haddr = (HostAddress*) &helo[1]; -#if DEBUG_UDP - GE_LOG(ectx, GE_DEBUG | GE_USER | GE_BULK, - "Connecting via UDP to %u.%u.%u.%u:%u.\n", - PRIP(ntohl(*(int*)&haddr->senderIP.addr)), - ntohs(haddr->senderPort)); -#endif - (*tsessionPtr) = tsession; - return OK; -} - -/** - * A (core) Session is to be associated with a transport session. The - * transport service may want to know in order to call back on the - * core if the connection is being closed. - * - * @param tsession the session handle passed along - * from the call to receive that was made by the transport - * layer - * @return OK if the session could be associated, - * SYSERR if not. - */ -int udpAssociate(TSession * tsession) { - return SYSERR; /* UDP connections can never be associated */ -} - -/** * Send a message to the specified remote node. * * @param tsession the P2P_hello_MESSAGE identifying the remote node @@ -475,27 +317,13 @@ } /** - * Disconnect from a remote node. - * - * @param tsession the session that is closed - * @return OK on success, SYSERR if the operation failed - */ -static int udpDisconnect(TSession * tsession) { - if (tsession != NULL) { - if (tsession->internal != NULL) - FREE(tsession->internal); - FREE(tsession); - } - return OK; -} - -/** * Start the server process to receive inbound traffic. * * @return OK on success, SYSERR if the operation failed */ static int startTransportServer(void) { int sock; + unsigned short port; GE_ASSERT(ectx, selector == NULL); /* initialize UDP network */ @@ -535,21 +363,6 @@ } /** - * Shutdown the server process (stop receiving inbound traffic). Maybe - * restarted later! - */ -static int stopTransportServer() { - GE_ASSERT(ectx, udp_sock != NULL); - if (selector != NULL) { - select_destroy(selector); - selector = NULL; - } - socket_destroy(udp_sock); - udp_sock = NULL; - return OK; -} - -/** * Reload the configuration. Should never fail. */ static void reloadConfiguration() { @@ -613,7 +426,7 @@ if (-1 == GC_get_configuration_value_number(cfg, "UDP", "MTU", - sizeof(UDPMessage) + P2P_MESSAGE_OVERHEAD + sizeof(MESSAGE_HEADER) + 4, + sizeof(UDPMessage) + P2P_MESSAGE_OVERHEAD + sizeof(MESSAGE_HEADER) + 32, 65500, MESSAGE_SIZE, &mtu)) { Modified: GNUnet/src/transports/udp6.c =================================================================== --- GNUnet/src/transports/udp6.c 2006-07-29 00:22:42 UTC (rev 3149) +++ GNUnet/src/transports/udp6.c 2006-07-29 00:51:47 UTC (rev 3150) @@ -27,11 +27,14 @@ #include "gnunet_util.h" #include "gnunet_protocols.h" #include "gnunet_transport.h" +#include "gnunet_stats_service.h" #include "platform.h" #include "ip6.h" #define DEBUG_UDP6 NO +#include "udp_helper.c" + /** * Host-Address in a UDP6 network. */ @@ -53,63 +56,17 @@ } Host6Address; -/** - * Message-Packet header. - */ -typedef struct { - /** - * this struct is *preceded* by MESSAGE_PARTs - until - * size-sizeof(UDP6Message)! - */ - - /** - * size of the message, in bytes, including this header; max - * 65536-header (network byte order) - */ - unsigned short size; - - /** - * Reserved for alignment, always 0. - */ - unsigned short reserved; - - /** - * What is the identity of the sender (hash of public key) - */ - PeerIdentity sender; - -} UDP6Message; - /* *********** globals ************* */ -/* apis (our advertised API and the core api ) */ -static CoreAPIForTransport * coreAPI; -static TransportAPI udp6API; +static struct GC_Configuration * cfg; -/** - * thread that listens for inbound messages - */ -static PTHREAD_T dispatchThread; +static struct LoadMonitor * load_monitor; -/** - * the socket that we receive all data from - */ -static int udp6_sock; +static struct CIDR6Network * filteredNetworks_ = NULL; -/** - * Semaphore for communication with the - * udp6 server thread. - */ -static Semaphore * serverSignal; -static int udp6_shutdown = YES; +static struct MUTEX * configLock; /** - * configuration - */ -static struct CIDR6Network * filteredNetworks_ = NULL; -static Mutex configLock; - -/** * Get the GNUnet UDP6 port from the configuration, or from * /etc/services if it is not specified in the config file. * @@ -117,24 +74,21 @@ */ static unsigned short getGNUnetUDP6Port() { struct servent * pse; /* pointer to service information entry */ - unsigned short port; + unsigned long long port; - port = (unsigned short) getConfigurationInt("UDP6", - "PORT"); - if (port == 0) { /* try lookup in services */ - if ((pse = getservbyname("gnunet", "udp6"))) - port = ntohs(pse->s_port); + if (-1 == GC_get_configuration_value_number(cfg, + "UDP", + "PORT", + 1, + 65535, + 2086, + &port)) { + if ((pse = getservbyname("gnunet", "udp"))) + port = htons(pse->s_port); else - errexit(_("Cannot determine port to bind to. " - " Define in configuration file in section `%s' under `%s' " - "or in `%s' under %s/%s.\n"), - "UDP6", - "PORT", - "/etc/services", - "udp6", - "gnunet"); + port = 0; } - return port; + return (unsigned short) port; } /** @@ -149,9 +103,13 @@ SOCK_DGRAM, UDP_PROTOCOL_NUMBER); if (sock < 0) - DIE_STRERROR("socket"); + GE_DIE_STRERROR(ectx, + GE_FATAL | GE_ADMIN | GE_IMMEDIATE, + "socket"); if ( SETSOCKOPT(sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0 ) - DIE_STRERROR("setsockopt"); + GE_DIE_STRERROR(ectx, + GE_FATAL | GE_ADMIN | GE_IMMEDIATE, + "setsockopt"); if (port != 0) { memset(&sin, 0, sizeof(sin)); sin.sin6_family = AF_INET6; @@ -160,201 +118,41 @@ &in6addr_any, sizeof(IP6addr)); if (BIND(sock, (struct sockaddr *)&sin, sizeof(sin)) < 0) { - LOG_STRERROR(LOG_FATAL, "bind"); - errexit(_("Failed to bind to UDP6 port %d.\n"), - port); + GE_LOG_STRERROR(ectx, + GE_FATAL | GE_ADMIN | GE_IMMEDIATE, + "bind"); + GE_LOG(ectx, + GE_FATAL | GE_ADMIN | GE_IMMEDIATE, + _("Failed to bind to UDP port %d.\n"), + port); + GE_DIE_STRERROR(ectx, + GE_FATAL | GE_USER | GE_IMMEDIATE, + "bind"); } - } /* do not bind if port == 0, then we use - send-only! */ + } /* do not bind if port == 0, then we use send-only! */ return sock; } /** * Check if we are explicitly forbidden to communicate with this IP. */ -static int isBlacklisted(IP6addr * ip) { +static int isBlacklisted(const void * addr, + unsigned int len) { + IP6addr ip; int ret; - MUTEX_LOCK(&configLock); - ret = checkIP6Listed(filteredNetworks_, - ip); - MUTEX_UNLOCK(&configLock); + if (len != sizeof(IP6addr)) + return SYSERR; + memcpy(&ip, + addr, + sizeof(IP6addr)); + MUTEX_LOCK(configLock); + ret = check_ipv6_listed(filteredNetworks_, + ip); + MUTEX_UNLOCK(configLock); return ret; } -/** - * Listen on the given socket and distribute the packets to the UDP6 - * handler. - */ -static void * listenAndDistribute() { - struct sockaddr_in6 incoming; - socklen_t addrlen = sizeof(incoming); - int size; - EncName enc; - P2P_PACKET * mp; - UDP6Message udp6m; - char inet6[INET6_ADDRSTRLEN]; - int error; - int pending; - int ret; - fd_set readSet; - fd_set errorSet; - fd_set writeSet; - - SEMAPHORE_UP(serverSignal); - while (udp6_shutdown == NO) { - FD_ZERO(&readSet); - FD_ZERO(&writeSet); - FD_ZERO(&errorSet); - FD_SET(udp6_sock, &readSet); - ret = SELECT(udp6_sock + 1, &readSet, &writeSet, &errorSet, NULL); - if (ret == -1) { - if (udp6_shutdown == YES) - break; - if (errno == EINTR) - continue; - DIE_STRERROR("select"); - } - if (! FD_ISSET(udp6_sock, &readSet)) - continue; - pending = 0; - /* @todo FIXME in PlibC */ -#ifdef MINGW - error = ioctlsocket(udp_sock, -#else - error = ioctl(udp6_sock, -#endif - FIONREAD, - &pending); - if (error != 0) { - LOG_STRERROR(LOG_ERROR, "ioctl"); - continue; - } - if (pending <= 0) { - LOG(LOG_WARNING, - _("UDP6: select returned, but ioctl reports %d bytes available!\n"), - pending); - if (pending == 0) { - /* maybe empty UDP packet was sent (see report on bug-gnunet, - 5/11/6; read 0 bytes from UDP just to kill potential empty packet! */ - memset(&incoming, - 0, - sizeof(struct sockaddr_in6)); - RECVFROM(udp6_sock, - NULL, - 0, - 0, - (struct sockaddr * )&incoming, - &addrlen); - } - continue; - } - if (pending >= 65536) { - BREAK(); - continue; - } - mp = MALLOC(sizeof(P2P_PACKET)); - mp->msg = MALLOC(pending); - memset(&incoming, - 0, - sizeof(struct sockaddr_in6)); - if (udp6_shutdown == YES) { - FREE(mp->msg); - FREE(mp); - break; - } - size = RECVFROM(udp6_sock, - mp->msg, - pending, - 0, - (struct sockaddr * )&incoming, - &addrlen); - if ( (size < 0) || - (udp6_shutdown == YES) ) { - FREE(mp->msg); - FREE(mp); - if (udp6_shutdown == NO) { - if ( (errno == EINTR) || - (errno == EAGAIN) || - (errno == ECONNREFUSED) ) { - continue; - } - } - if (udp6_shutdown == NO) - LOG_STRERROR(LOG_ERROR, "recvfrom"); - break; /* die/shutdown */ - } - incrementBytesReceived(size); - if ((unsigned int)size <= sizeof(UDP6Message)) { - LOG(LOG_INFO, - _("Received invalid UDP6 message from %s:%d, dropping.\n"), - inet_ntop(AF_INET6, - &incoming, - inet6, - INET6_ADDRSTRLEN), - ntohs(incoming.sin6_port)); - FREE(mp->msg); - FREE(mp); - continue; - } - memcpy(&udp6m, - &((char*)mp->msg)[size - sizeof(UDP6Message)], - sizeof(UDP6Message)); - - IFLOG(LOG_DEBUG, - hash2enc(&udp6m.sender.hashPubKey, - &enc)); -#if DEBUG_UDP6 - LOG(LOG_DEBUG, - "Received %d bytes via UDP6 from %s:%d (%s).\n", - size, - inet_ntop(AF_INET6, - &incoming, - inet6, - INET6_ADDRSTRLEN), - ntohs(incoming.sin6_port), - &enc); -#endif - /* quick test of the packet, if failed, repeat! */ - if (size != ntohs(udp6m.size)) { - LOG(LOG_WARNING, - _("Packet received from %s:%d (UDP6) failed format check."), - inet_ntop(AF_INET6, - &incoming, - inet6, - INET6_ADDRSTRLEN), - ntohs(incoming.sin6_port)); - FREE(mp->msg); - FREE(mp); - continue; - } - GNUNET_ASSERT(sizeof(struct in6_addr) == sizeof(IP6addr)); - if (YES == isBlacklisted((IP6addr*)&incoming.sin6_addr)) { - LOG(LOG_WARNING, - _("%s: Rejected connection from blacklisted address %s.\n"), - "UDP6", - inet_ntop(AF_INET6, - &incoming, - inet6, - INET6_ADDRSTRLEN)); - FREE(mp->msg); - FREE(mp); - continue; - } - /* message ok, fill in mp and pass to core */ - mp->tsession = NULL; - mp->size = ntohs(udp6m.size) - sizeof(UDP6Message); - memcpy(&mp->sender, - &udp6m.sender, - sizeof(PeerIdentity)); - coreAPI->receive(mp); - } - /* shutdown */ - SEMAPHORE_UP(serverSignal); - return NULL; -} - - /* *************** API implementation *************** */ /** @@ -373,7 +171,8 @@ if ( (ntohs(helo->senderAddressSize) != sizeof(Host6Address)) || (ntohs(helo->header.size) != P2P_hello_MESSAGE_size(helo)) || (ntohs(helo->header.type) != p2p_PROTO_hello) || - (YES == isBlacklisted(&haddr->senderIP)) ) + (YES == isBlacklisted(&haddr->senderIP, + sizeof(IP6addr))) ) return SYSERR; /* obviously invalid */ else { #if DEBUG_UDP6 @@ -409,72 +208,24 @@ msg = MALLOC(sizeof(P2P_hello_MESSAGE) + sizeof(Host6Address)); haddr = (Host6Address*) &msg[1]; - if (SYSERR == getPublicIP6Address(&haddr->senderIP)) { + if (SYSERR == getPublicIP6Address(cfg, + ectx, + &haddr->senderIP)) { FREE(msg); - LOG(LOG_WARNING, - _("UDP6: Could not determine my public IPv6 address.\n")); + GE_LOG(ectx, + GE_WARNING, + _("UDP6: Could not determine my public IPv6 address.\n")); return NULL; } haddr->senderPort = htons(port); haddr->reserved = htons(0); msg->senderAddressSize = htons(sizeof(Host6Address)); msg->protocol = htons(UDP6_PROTOCOL_NUMBER); - msg->MTU = htonl(udp6API.mtu); + msg->MTU = htonl(udpAPI.mtu); return msg; } /** - * Establish a connection to a remote node. - * @param helo the hello-Message for the target node - * @param tsessionPtr the session handle that is to be set - * @return OK on success, SYSERR if the operation failed - */ -static int udp6Connect(const P2P_hello_MESSAGE * helo, - TSession ** tsessionPtr) { - TSession * tsession; - Host6Address * haddr; -#if DEBUG_UDP6 - char * tmp; -#endif - - tsession = MALLOC(sizeof(TSession)); - tsession->internal = MALLOC(P2P_hello_MESSAGE_size(helo)); - memcpy(tsession->internal, - helo, - P2P_hello_MESSAGE_size(helo)); - tsession->ttype = udp6API.protocolNumber; - haddr = (Host6Address*) &helo[1]; -#if DEBUG_UDP6 - tmp = MALLOC(INET6_ADDRSTRLEN); - LOG(LOG_DEBUG, - "Connecting via UDP6 to %s:%d.\n", - inet_ntop(AF_INET6, - &haddr->senderIP, - tmp, - INET6_ADDRSTRLEN), - ntohs(haddr->senderPort)); - FREE(tmp); -#endif - (*tsessionPtr) = tsession; - return OK; -} - -/** - * A (core) Session is to be associated with a transport session. The - * transport service may want to know in order to call back on the - * core if the connection is being closed. - * - * @param tsession the session handle passed along - * from the call to receive that was made by the transport - * layer - * @return OK if the session could be associated, - * SYSERR if not. - */ -int udp6Associate(TSession * tsession) { - return SYSERR; /* UDP6 connections can never be associated */ -} - -/** * Send a message to the specified remote node. * * @param tsession the P2P_hello_MESSAGE identifying the remote node @@ -484,26 +235,27 @@ */ static int udp6Send(TSession * tsession, const void * message, - const unsigned int size) { + const unsigned int size, + int importance) { char * msg; - UDP6Message mp; + UDPMessage mp; P2P_hello_MESSAGE * helo; Host6Address * haddr; struct sockaddr_in6 sin; /* an Internet endpoint address */ int ok; - int ssize; + size_t ssize; #if DEBUG_UDP6 char inet6[INET6_ADDRSTRLEN]; #endif - if (udp6_shutdown == YES) + if (udp_sock == NULL) return SYSERR; if (size == 0) { - BREAK(); + GE_BREAK(ectx, 0); return SYSERR; } - if (size > udp6API.mtu) { - BREAK(); + if (size > udpAPI.mtu) { + GE_BREAK(ectx, 0); return SYSERR; } helo = (P2P_hello_MESSAGE*)tsession->internal; @@ -511,14 +263,14 @@ return SYSERR; haddr = (Host6Address*) &helo[1]; - ssize = size + sizeof(UDP6Message); + ssize = size + sizeof(UDPMessage); msg = MALLOC(ssize); - mp.size = htons(ssize); - mp.reserved = 0; + mp.header.size = htons(ssize); + mp.header.type = 0; mp.sender = *coreAPI->myIdentity; memcpy(&msg[size], &mp, - sizeof(UDP6Message)); + sizeof(UDPMessage)); memcpy(msg, message, size); @@ -530,106 +282,81 @@ &haddr->senderIP.addr, sizeof(IP6addr)); #if DEBUG_UDP6 - LOG(LOG_DEBUG, - "Sending message of %d bytes via UDP6 to %s:%d..\n", - ssize, - inet_ntop(AF_INET6, - &sin, - inet6, - INET6_ADDRSTRLEN), - ntohs(sin.sin_port)); + GE_LOG(ectx, + GE_DEBUG, + "Sending message of %u bytes via UDP6 to %s:%d..\n", + ssize, + inet_ntop(AF_INET6, + &sin, + inet6, + INET6_ADDRSTRLEN), + ntohs(sin.sin_port)); #endif - if (ssize == SENDTO(udp6_sock, - msg, - ssize, - 0, /* no flags */ - (struct sockaddr*) &sin, - sizeof(sin))) { + if (YES == socket_send_to(udp_sock, + NC_Nonblocking, + msg, + ssize, + &ssize, + (const char*) &sin, + sizeof(sin))) { ok = OK; + if (stats != NULL) + stats->change(stat_bytesSent, + ssize); } else { - LOG_STRERROR(LOG_WARNING, "sendto"); + GE_LOG_STRERROR(ectx, + GE_WARNING, + "sendto"); + if (stats != NULL) + stats->change(stat_bytesDropped, + ssize); } - incrementBytesSent(ssize); FREE(msg); return ok; } /** - * Disconnect from a remote node. - * - * @param tsession the session that is closed - * @return OK on success, SYSERR if the operation failed - */ -static int udp6Disconnect(TSession * tsession) { - if (tsession != NULL) { - if (tsession->internal != NULL) - FREE(tsession->internal); - FREE(tsession); - } - return OK; -} - -/** * Start the server process to receive inbound traffic. * * @return OK on success, SYSERR if the operation failed */ static int startTransportServer(void) { + int sock; unsigned short port; /* initialize UDP6 network */ port = getGNUnetUDP6Port(); - udp6_sock = passivesock(port); if (port != 0) { - udp6_shutdown = NO; - serverSignal = SEMAPHORE_NEW(0); - if (0 != PTHREAD_CREATE(&dispatchThread, - (PThreadMain) &listenAndDistribute, - NULL, - 4*1024)) + sock = passivesock(port); + if (sock == -1) return SYSERR; - SEMAPHORE_DOWN(serverSignal); - } else - memset(&dispatchThread, - 0, - sizeof(PTHREAD_T)); /* zero-out */ - return OK; -} - -/** - * Shutdown the server process (stop receiving inbound traffic). Maybe - * restarted later! - */ -static int stopTransportServer() { - if (udp6_shutdown == NO) { - /* stop the thread, first set shutdown - to YES, then ensure that the thread - actually sees the flag by sending - a dummy message of 1 char */ - udp6_shutdown = YES; - if (serverSignal != NULL) { - char msg = '\0'; - struct sockaddr_in sin; - void * unused; - - /* send to loopback */ - sin.sin_family = AF_INET; - sin.sin_port = htons(getGNUnetUDP6Port()); - *(int*)&sin.sin_addr = htonl(0x7F000001); /* 127.0.0.1 = localhost */ - SENDTO(udp6_sock, - &msg, - sizeof(msg), - 0, /* no flags */ - (struct sockaddr*) &sin, - sizeof(sin)); - PTHREAD_KILL(&dispatchThread, SIGALRM); /* sometimes LO is firewalled, try alternative */ - SEMAPHORE_DOWN(serverSignal); - SEMAPHORE_FREE(serverSignal); - PTHREAD_JOIN(&dispatchThread, &unused); - } + selector = select_create(ectx, + load_monitor, + sock, + sizeof(IPaddr), + 0, /* timeout */ + &select_message_handler, + NULL, + &select_accept_handler, + NULL, + &select_close_handler, + NULL, + 0 /* memory quota */ ); + if (selector == NULL) + return SYSERR; } - closefile(udp6_sock); - udp6_sock = -1; + sock = SOCKET(PF_INET, SOCK_DGRAM, UDP_PROTOCOL_NUMBER); + if (sock == -1) { + GE_LOG_STRERROR(ectx, + GE_ERROR | GE_ADMIN | GE_BULK, + "socket"); + select_destroy(selector); + selector = NULL; + return SYSERR; + } + udp_sock = socket_create(ectx, + load_monitor, + sock); return OK; } @@ -639,17 +366,21 @@ static void reloadConfiguration(void) { char * ch; - MUTEX_LOCK(&configLock); + MUTEX_LOCK(configLock); FREENONNULL(filteredNetworks_); - ch = getConfigurationString("UDP6", - "BLACKLIST"); - if (ch == NULL) - filteredNetworks_ = parseRoutes6(""); + if (0 != GC_get_configuration_value_string(cfg, + "UDP", + "BLACKLIST", + NULL, + &ch)) + filteredNetworks_ = parse_ipv6_network_specification(ectx, + ""); else { - filteredNetworks_ = parseRoutes6(ch); + filteredNetworks_ = parse_ipv6_network_specification(ectx, + ch); FREE(ch); } - MUTEX_UNLOCK(&configLock); + MUTEX_UNLOCK(configLock); } /** @@ -684,41 +415,46 @@ * returns the udp6 transport API. */ TransportAPI * inittransport_udp6(CoreAPIForTransport * core) { - int mtu; + unsigned long long mtu; - GNUNET_ASSERT(sizeof(UDP6Message) == 68); + GE_ASSERT(ectx, sizeof(UDPMessage) == 68); coreAPI = core; - MUTEX_CREATE(&configLock); + configLock = MUTEX_CREATE(NO); reloadConfiguration(); - mtu = getConfigurationInt("UDP6", - "MTU"); - if (mtu == 0) - mtu = MESSAGE_SIZE; + if (-1 == GC_get_configuration_value_number(cfg, + "UDP", + "MTU", + sizeof(UDPMessage) + P2P_MESSAGE_OVERHEAD + sizeof(MESSAGE_HEADER) + 32, + 65500, + MESSAGE_SIZE, + &mtu)) { + return NULL; + } if (mtu < 1200) - LOG(LOG_ERROR, - _("MTU for `%s' is probably too low (fragmentation not implemented!)\n"), - "UDP6"); + GE_LOG(ectx, + GE_ERROR | GE_USER | GE_IMMEDIATE, + _("MTU %llu for `%s' is probably too low!\n"), + mtu, + "UDP6"); - udp6API.protocolNumber = UDP6_PROTOCOL_NUMBER; - udp6API.mtu = mtu - sizeof(UDP6Message); - udp6API.cost = 19950; - udp6API.verifyHelo = &verifyHelo; - udp6API.createhello = &createhello; - udp6API.connect = &udp6Connect; - udp6API.send = &udp6Send; - udp6API.sendReliable = &udp6Send; /* can't increase reliability */ - udp6API.associate = &udp6Associate; - udp6API.disconnect = &udp6Disconnect; - udp6API.startTransportServer = &startTransportServer; - udp6API.stopTransportServer = &stopTransportServer; - udp6API.reloadConfiguration = &reloadConfiguration; - udp6API.addressToString = &addressToString; + udpAPI.protocolNumber = UDP6_PROTOCOL_NUMBER; + udpAPI.mtu = mtu - sizeof(UDPMessage); + udpAPI.cost = 19950; + udpAPI.verifyHelo = &verifyHelo; + udpAPI.createhello = &createhello; + udpAPI.connect = &udpConnect; + udpAPI.send = &udp6Send; + udpAPI.associate = &udpAssociate; + udpAPI.disconnect = &udpDisconnect; + udpAPI.startTransportServer = &startTransportServer; + udpAPI.stopTransportServer = &stopTransportServer; + udpAPI.addressToString = &addressToString; - return &udp6API; + return &udpAPI; } void donetransport_udp6() { - MUTEX_DESTROY(&configLock); + MUTEX_DESTROY(configLock); FREENONNULL(filteredNetworks_); } Added: GNUnet/src/transports/udp_helper.c =================================================================== --- GNUnet/src/transports/udp_helper.c 2006-07-29 00:22:42 UTC (rev 3149) +++ GNUnet/src/transports/udp_helper.c 2006-07-29 00:51:47 UTC (rev 3150) @@ -0,0 +1,197 @@ +/* + This file is part of GNUnet + (C) 2001, 2002, 2003, 2004, 2005 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 2, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file transports/udp_helper.c + * @brief common code for UDP transports + * @author Christian Grothoff + */ + +/** + * Message-Packet header. + */ +typedef struct { + /** + * this struct is *preceded* by MESSAGE_PARTs - until + * size-sizeof(UDPMessage)! + */ + + /** + * size of the message, in bytes, including this header. + */ + MESSAGE_HEADER header; + + /** + * What is the identity of the sender (hash of public key) + */ + PeerIdentity sender; + +} UDPMessage; + +/* *********** globals ************* */ + +static CoreAPIForTransport * coreAPI; + +static TransportAPI udpAPI; + +static Stats_ServiceAPI * stats; + +static int stat_bytesReceived; + +static int stat_bytesSent; + +static int stat_bytesDropped; + +static struct GE_Context * ectx; + +/** + * thread that listens for inbound messages + */ +static struct SelectHandle * selector; + +/** + * the socket that we transmit all data with + */ +static struct SocketHandle * udp_sock; + +/** + * The socket of session has data waiting, process! + * + * This function may only be called if the tcplock is + * already held by the caller. + */ +static int select_message_handler(void * mh_cls, + struct SelectHandle * sh, + struct SocketHandle * sock, + void * sock_ctx, + const MESSAGE_HEADER * msg) { + unsigned int len; + P2P_PACKET * mp; + const UDPMessage * um; + + len = ntohs(msg->size); + if (len <= sizeof(UDPMessage)) { + GE_LOG(ectx, + GE_WARNING | GE_USER | GE_BULK, + _("Received malformed message from udp-peer connection. Closing.\n")); + return SYSERR; + } + um = (const UDPMessage*) msg; + mp = MALLOC(sizeof(P2P_PACKET)); + mp->msg = MALLOC(len - sizeof(UDPMessage)); + memcpy(mp->msg, + &um[1], + len - sizeof(UDPMessage)); + mp->sender = um->sender; + mp->size = len - sizeof(UDPMessage); + mp->tsession = NULL; + coreAPI->receive(mp); + if (stats != NULL) + stats->change(stat_bytesReceived, + len); + return OK; +} + +static void * select_accept_handler(void * ah_cls, + struct SelectHandle * sh, + struct SocketHandle * sock, + const void * addr, + unsigned int addr_len) { + static int nonnullpointer; + return &nonnullpointer; +} + +/** + * Select has been forced to close a connection. + * Free the associated context. + */ +static void select_close_handler(void * ch_cls, + struct SelectHandle * sh, + struct SocketHandle * sock, + void * sock_ctx) { + /* do nothing */ +} + +/** + * Establish a connection to a remote node. + * @param helo the hello-Message for the target node + * @param tsessionPtr the session handle that is to be set + * @return OK on success, SYSERR if the operation failed + */ +static int udpConnect(const P2P_hello_MESSAGE * helo, + TSession ** tsessionPtr) { + TSession * tsession; + + tsession = MALLOC(sizeof(TSession)); + tsession->internal = MALLOC(P2P_hello_MESSAGE_size(helo)); + memcpy(tsession->internal, + helo, + P2P_hello_MESSAGE_size(helo)); + tsession->ttype = udpAPI.protocolNumber; + (*tsessionPtr) = tsession; + return OK; +} + +/** + * A (core) Session is to be associated with a transport session. The + * transport service may want to know in order to call back on the + * core if the connection is being closed. + * + * @param tsession the session handle passed along + * from the call to receive that was made by the transport + * layer + * @return OK if the session could be associated, + * SYSERR if not. + */ +int udpAssociate(TSession * tsession) { + return SYSERR; /* UDP connections can never be associated */ +} + +/** + * Disconnect from a remote node. + * + * @param tsession the session that is closed + * @return OK on success, SYSERR if the operation failed + */ +static int udpDisconnect(TSession * tsession) { + if (tsession != NULL) { + if (tsession->internal != NULL) + FREE(tsession->internal); + FREE(tsession); + } + return OK; +} + +/** + * Shutdown the server process (stop receiving inbound traffic). Maybe + * restarted later! + */ +static int stopTransportServer() { + GE_ASSERT(ectx, udp_sock != NULL); + if (selector != NULL) { + select_destroy(selector); + selector = NULL; + } + socket_destroy(udp_sock); + udp_sock = NULL; + return OK; +} + +/* end of udp_helper.c */ Property changes on: GNUnet/src/transports/udp_helper.c ___________________________________________________________________ Name: svn:eol-style + native _______________________________________________ GNUnet-SVN mailing list GNUnet-SVN@gnu.org http://lists.gnu.org/mailman/listinfo/gnunet-svn