Use send/recv for socket stream instead of read/write. Use event handle for polling on socket stream. Check windows specific return code.
Signed-off-by: Linda Sun <l...@vmware.com> --- lib/automake.mk | 7 +- lib/latch-unix.c | 87 +++++++++++++++ lib/latch.c | 87 --------------- lib/stream-fd-unix.c | 268 +++++++++++++++++++++++++++++++++++++++++++++ lib/stream-fd-windows.c | 278 +++++++++++++++++++++++++++++++++++++++++++++++ lib/stream-fd.c | 268 --------------------------------------------- lib/stream-fd.h | 3 + lib/stream.c | 4 + 8 files changed, 644 insertions(+), 358 deletions(-) create mode 100644 lib/latch-unix.c delete mode 100644 lib/latch.c create mode 100644 lib/stream-fd-unix.c create mode 100644 lib/stream-fd-windows.c delete mode 100644 lib/stream-fd.c diff --git a/lib/automake.mk b/lib/automake.mk index 4ecd61d..8ccac04 100644 --- a/lib/automake.mk +++ b/lib/automake.mk @@ -188,7 +188,6 @@ lib_libopenvswitch_la_SOURCES = \ lib/sset.h \ lib/stp.c \ lib/stp.h \ - lib/stream-fd.c \ lib/stream-fd.h \ lib/stream-provider.h \ lib/stream-ssl.h \ @@ -240,11 +239,13 @@ if WIN32 lib_libopenvswitch_la_SOURCES += \ lib/daemon-windows.c \ lib/getopt_long.c \ - lib/latch-windows.c + lib/latch-windows.c \ + lib/stream-fd-windows.c else lib_libopenvswitch_la_SOURCES += \ lib/daemon.c \ - lib/latch.c + lib/latch-unix.c \ + lib/stream-fd-unix.c endif EXTRA_DIST += \ diff --git a/lib/latch-unix.c b/lib/latch-unix.c new file mode 100644 index 0000000..20a6575 --- /dev/null +++ b/lib/latch-unix.c @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2013 Nicira, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> + +#include "latch.h" +#include <errno.h> +#include <poll.h> +#include <unistd.h> +#include "poll-loop.h" +#include "socket-util.h" + +/* Initializes 'latch' as initially unset. */ +void +latch_init(struct latch *latch) +{ + xpipe_nonblocking(latch->fds); +} + +/* Destroys 'latch'. */ +void +latch_destroy(struct latch *latch) +{ + close(latch->fds[0]); + close(latch->fds[1]); +} + +/* Resets 'latch' to the unset state. Returns true if 'latch' was previously + * set, false otherwise. */ +bool +latch_poll(struct latch *latch) +{ + char buffer[_POSIX_PIPE_BUF]; + + return read(latch->fds[0], buffer, sizeof buffer) > 0; +} + +/* Sets 'latch'. + * + * Calls are not additive: a single latch_poll() clears out any number of + * latch_set(). */ +void +latch_set(struct latch *latch) +{ + ignore(write(latch->fds[1], "", 1)); +} + +/* Returns true if 'latch' is set, false otherwise. Does not reset 'latch' + * to the unset state. */ +bool +latch_is_set(const struct latch *latch) +{ + struct pollfd pfd; + int retval; + + pfd.fd = latch->fds[0]; + pfd.events = POLLIN; + do { + retval = poll(&pfd, 1, 0); + } while (retval < 0 && errno == EINTR); + + return pfd.revents & POLLIN; +} + +/* Causes the next poll_block() to wake up when 'latch' is set. + * + * ('where' is used in debug logging. Commonly one would use latch_wait() to + * automatically provide the caller's source file and line number for + * 'where'.) */ +void +latch_wait_at(const struct latch *latch, const char *where) +{ + poll_fd_wait_at(latch->fds[0], 0, POLLIN, where); +} diff --git a/lib/latch.c b/lib/latch.c deleted file mode 100644 index 20a6575..0000000 --- a/lib/latch.c +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (c) 2013 Nicira, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <config.h> - -#include "latch.h" -#include <errno.h> -#include <poll.h> -#include <unistd.h> -#include "poll-loop.h" -#include "socket-util.h" - -/* Initializes 'latch' as initially unset. */ -void -latch_init(struct latch *latch) -{ - xpipe_nonblocking(latch->fds); -} - -/* Destroys 'latch'. */ -void -latch_destroy(struct latch *latch) -{ - close(latch->fds[0]); - close(latch->fds[1]); -} - -/* Resets 'latch' to the unset state. Returns true if 'latch' was previously - * set, false otherwise. */ -bool -latch_poll(struct latch *latch) -{ - char buffer[_POSIX_PIPE_BUF]; - - return read(latch->fds[0], buffer, sizeof buffer) > 0; -} - -/* Sets 'latch'. - * - * Calls are not additive: a single latch_poll() clears out any number of - * latch_set(). */ -void -latch_set(struct latch *latch) -{ - ignore(write(latch->fds[1], "", 1)); -} - -/* Returns true if 'latch' is set, false otherwise. Does not reset 'latch' - * to the unset state. */ -bool -latch_is_set(const struct latch *latch) -{ - struct pollfd pfd; - int retval; - - pfd.fd = latch->fds[0]; - pfd.events = POLLIN; - do { - retval = poll(&pfd, 1, 0); - } while (retval < 0 && errno == EINTR); - - return pfd.revents & POLLIN; -} - -/* Causes the next poll_block() to wake up when 'latch' is set. - * - * ('where' is used in debug logging. Commonly one would use latch_wait() to - * automatically provide the caller's source file and line number for - * 'where'.) */ -void -latch_wait_at(const struct latch *latch, const char *where) -{ - poll_fd_wait_at(latch->fds[0], 0, POLLIN, where); -} diff --git a/lib/stream-fd-unix.c b/lib/stream-fd-unix.c new file mode 100644 index 0000000..8062d2d --- /dev/null +++ b/lib/stream-fd-unix.c @@ -0,0 +1,268 @@ +/* + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2014 Nicira, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> +#include "stream-fd.h" +#include <errno.h> +#include <poll.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> +#include "fatal-signal.h" +#include "poll-loop.h" +#include "socket-util.h" +#include "util.h" +#include "stream-provider.h" +#include "stream.h" +#include "vlog.h" + +VLOG_DEFINE_THIS_MODULE(stream_fd); + +/* Active file descriptor stream. */ + +struct stream_fd +{ + struct stream stream; + int fd; +}; + +static const struct stream_class stream_fd_class; + +static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25); + +static void maybe_unlink_and_free(char *path); + +/* Creates a new stream named 'name' that will send and receive data on 'fd' + * and stores a pointer to the stream in '*streamp'. Initial connection status + * 'connect_status' is interpreted as described for stream_init(). + * + * Returns 0 if successful, otherwise a positive errno value. (The current + * implementation never fails.) */ +int +new_fd_stream(const char *name, int fd, int connect_status, + struct stream **streamp) +{ + struct stream_fd *s; + + s = xmalloc(sizeof *s); + stream_init(&s->stream, &stream_fd_class, connect_status, name); + s->fd = fd; + *streamp = &s->stream; + return 0; +} + +static struct stream_fd * +stream_fd_cast(struct stream *stream) +{ + stream_assert_class(stream, &stream_fd_class); + return CONTAINER_OF(stream, struct stream_fd, stream); +} + +static void +fd_close(struct stream *stream) +{ + struct stream_fd *s = stream_fd_cast(stream); + close(s->fd); + free(s); +} + +static int +fd_connect(struct stream *stream) +{ + struct stream_fd *s = stream_fd_cast(stream); + return check_connection_completion(s->fd); +} + +static ssize_t +fd_recv(struct stream *stream, void *buffer, size_t n) +{ + struct stream_fd *s = stream_fd_cast(stream); + ssize_t retval; + + retval = read(s->fd, buffer, n); + return retval >= 0 ? retval : -errno; +} + +static ssize_t +fd_send(struct stream *stream, const void *buffer, size_t n) +{ + struct stream_fd *s = stream_fd_cast(stream); + ssize_t retval; + + retval = write(s->fd, buffer, n); + return (retval > 0 ? retval + : retval == 0 ? -EAGAIN + : -errno); +} + +static void +fd_wait(struct stream *stream, enum stream_wait_type wait) +{ + struct stream_fd *s = stream_fd_cast(stream); + switch (wait) { + case STREAM_CONNECT: + case STREAM_SEND: + poll_fd_wait(s->fd, POLLOUT); + break; + + case STREAM_RECV: + poll_fd_wait(s->fd, POLLIN); + break; + + default: + OVS_NOT_REACHED(); + } +} + +static const struct stream_class stream_fd_class = { + "fd", /* name */ + false, /* needs_probes */ + NULL, /* open */ + fd_close, /* close */ + fd_connect, /* connect */ + fd_recv, /* recv */ + fd_send, /* send */ + NULL, /* run */ + NULL, /* run_wait */ + fd_wait, /* wait */ +}; + +/* Passive file descriptor stream. */ + +struct fd_pstream +{ + struct pstream pstream; + int fd; + int (*accept_cb)(int fd, const struct sockaddr_storage *, size_t ss_len, + struct stream **); + int (*set_dscp_cb)(int fd, uint8_t dscp); + char *unlink_path; +}; + +static const struct pstream_class fd_pstream_class; + +static struct fd_pstream * +fd_pstream_cast(struct pstream *pstream) +{ + pstream_assert_class(pstream, &fd_pstream_class); + return CONTAINER_OF(pstream, struct fd_pstream, pstream); +} + +/* Creates a new pstream named 'name' that will accept new socket connections + * on 'fd' and stores a pointer to the stream in '*pstreamp'. + * + * When a connection has been accepted, 'accept_cb' will be called with the new + * socket fd 'fd' and the remote address of the connection 'sa' and 'sa_len'. + * accept_cb must return 0 if the connection is successful, in which case it + * must initialize '*streamp' to the new stream, or a positive errno value on + * error. In either case accept_cb takes ownership of the 'fd' passed in. + * + * When '*pstreamp' is closed, then 'unlink_path' (if nonnull) will be passed + * to fatal_signal_unlink_file_now() and freed with free(). + * + * Returns 0 if successful, otherwise a positive errno value. (The current + * implementation never fails.) */ +int +new_fd_pstream(const char *name, int fd, + int (*accept_cb)(int fd, const struct sockaddr_storage *ss, + size_t ss_len, struct stream **streamp), + int (*set_dscp_cb)(int fd, uint8_t dscp), + char *unlink_path, struct pstream **pstreamp) +{ + struct fd_pstream *ps = xmalloc(sizeof *ps); + pstream_init(&ps->pstream, &fd_pstream_class, name); + ps->fd = fd; + ps->accept_cb = accept_cb; + ps->set_dscp_cb = set_dscp_cb; + ps->unlink_path = unlink_path; + *pstreamp = &ps->pstream; + return 0; +} + +static void +pfd_close(struct pstream *pstream) +{ + struct fd_pstream *ps = fd_pstream_cast(pstream); + close(ps->fd); + maybe_unlink_and_free(ps->unlink_path); + free(ps); +} + +static int +pfd_accept(struct pstream *pstream, struct stream **new_streamp) +{ + struct fd_pstream *ps = fd_pstream_cast(pstream); + struct sockaddr_storage ss; + socklen_t ss_len = sizeof ss; + int new_fd; + int retval; + + new_fd = accept(ps->fd, (struct sockaddr *) &ss, &ss_len); + if (new_fd < 0) { + retval = errno; + if (retval != EAGAIN) { + VLOG_DBG_RL(&rl, "accept: %s", ovs_strerror(retval)); + } + return retval; + } + + retval = set_nonblocking(new_fd); + if (retval) { + close(new_fd); + return retval; + } + + return ps->accept_cb(new_fd, &ss, ss_len, new_streamp); +} + +static void +pfd_wait(struct pstream *pstream) +{ + struct fd_pstream *ps = fd_pstream_cast(pstream); + poll_fd_wait(ps->fd, POLLIN); +} + +static int +pfd_set_dscp(struct pstream *pstream, uint8_t dscp) +{ + struct fd_pstream *ps = fd_pstream_cast(pstream); + if (ps->set_dscp_cb) { + return ps->set_dscp_cb(ps->fd, dscp); + } + return 0; +} + +static const struct pstream_class fd_pstream_class = { + "pstream", + false, + NULL, + pfd_close, + pfd_accept, + pfd_wait, + pfd_set_dscp, +}; + +/* Helper functions. */ +static void +maybe_unlink_and_free(char *path) +{ + if (path) { + fatal_signal_unlink_file_now(path); + free(path); + } +} diff --git a/lib/stream-fd-windows.c b/lib/stream-fd-windows.c new file mode 100644 index 0000000..b2aa67d --- /dev/null +++ b/lib/stream-fd-windows.c @@ -0,0 +1,278 @@ +/* + * Copyright (c) 2014 Nicira, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <config.h> +#include "stream-fd.h" +#include <errno.h> +#include <poll.h> +#include <stdlib.h> +#include <string.h> +#include <winsock2.h> + +#include <sys/types.h> +#include <unistd.h> +#include "fatal-signal.h" +#include "poll-loop.h" +#include "socket-util.h" +#include "util.h" +#include "stream-provider.h" +#include "stream.h" +#include "vlog.h" + +VLOG_DEFINE_THIS_MODULE(stream_fd_windows); + +/* Active file descriptor stream. */ + +struct stream_fd +{ + struct stream stream; + int fd; + HANDLE wevent; +}; + +static const struct stream_class stream_fd_class; + +static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25); + +/* Creates a new stream named 'name' that will send and receive data on 'fd' + * and stores a pointer to the stream in '*streamp'. Initial connection status + * 'connect_status' is interpreted as described for stream_init(). + * + * Returns 0 if successful, otherwise a positive errno value. (The current + * implementation never fails.) */ +int +new_fd_stream(const char *name, int fd, int connect_status, + struct stream **streamp) +{ + struct stream_fd *s; + + s = xmalloc(sizeof *s); + stream_init(&s->stream, &stream_fd_class, connect_status, name); + s->fd = fd; + s->wevent = CreateEvent(NULL, FALSE, FALSE, NULL); + WSAEventSelect(s->fd, s->wevent, FD_ALL_EVENTS); + *streamp = &s->stream; + return 0; +} + +static struct stream_fd * +stream_fd_cast(struct stream *stream) +{ + stream_assert_class(stream, &stream_fd_class); + return CONTAINER_OF(stream, struct stream_fd, stream); +} + +static void +fd_close(struct stream *stream) +{ + struct stream_fd *s = stream_fd_cast(stream); + WSAEventSelect(s->fd, NULL, 0); + CloseHandle(s->wevent); + closesocket(s->fd); + free(s); +} + +static int +fd_connect(struct stream *stream) +{ + struct stream_fd *s = stream_fd_cast(stream); + return check_connection_completion(s->fd); +} + +static ssize_t +fd_recv(struct stream *stream, void *buffer, size_t n) +{ + struct stream_fd *s = stream_fd_cast(stream); + ssize_t retval; + + retval = recv(s->fd, buffer, n, 0); + if (retval < 0) { + retval = -WSAGetLastError(); + } + if (retval == - WSAEWOULDBLOCK) { + return -EAGAIN; + } + return retval; +} + +static ssize_t +fd_send(struct stream *stream, const void *buffer, size_t n) +{ + struct stream_fd *s = stream_fd_cast(stream); + ssize_t retval; + + retval = send(s->fd, buffer, n, 0); + if (retval < 0) { + retval = -WSAGetLastError(); + } + if (retval == -WSAEWOULDBLOCK) { + return -EAGAIN; + } + + return retval; +} + +static void +fd_wait(struct stream *stream, enum stream_wait_type wait) +{ + struct stream_fd *s = stream_fd_cast(stream); + switch (wait) { + case STREAM_CONNECT: + case STREAM_SEND: + poll_fd_wait_event(s->fd, s->wevent, POLLOUT); + break; + + case STREAM_RECV: + poll_fd_wait_event(s->fd, s->wevent, POLLIN); + break; + + default: + NOT_REACHED(); + } +} + +static const struct stream_class stream_fd_class = { + "fd", /* name */ + false, /* needs_probes */ + NULL, /* open */ + fd_close, /* close */ + fd_connect, /* connect */ + fd_recv, /* recv */ + fd_send, /* send */ + NULL, /* run */ + NULL, /* run_wait */ + fd_wait, /* wait */ +}; + +/* Passive file descriptor stream. */ + +struct fd_pstream +{ + struct pstream pstream; + int fd; + HANDLE wevent; + int (*accept_cb)(int fd, const struct sockaddr *, size_t sa_len, + struct stream **); + int (*set_dscp_cb)(int fd, uint8_t dscp); + char *unlink_path; +}; + +static const struct pstream_class fd_pstream_class; + +static struct fd_pstream * +fd_pstream_cast(struct pstream *pstream) +{ + pstream_assert_class(pstream, &fd_pstream_class); + return CONTAINER_OF(pstream, struct fd_pstream, pstream); +} + +/* Creates a new pstream named 'name' that will accept new socket connections + * on 'fd' and stores a pointer to the stream in '*pstreamp'. + * + * When a connection has been accepted, 'accept_cb' will be called with the new + * socket fd 'fd' and the remote address of the connection 'sa' and 'sa_len'. + * accept_cb must return 0 if the connection is successful, in which case it + * must initialize '*streamp' to the new stream, or a positive errno value on + * error. In either case accept_cb takes ownership of the 'fd' passed in. + * + * When '*pstreamp' is closed, then 'unlink_path' (if nonnull) will be passed + * to fatal_signal_unlink_file_now() and freed with free(). + * + * Returns 0 if successful, otherwise a positive errno value. (The current + * implementation never fails.) */ +int +new_fd_pstream(const char *name, int fd, + int (*accept_cb)(int fd, const struct sockaddr *sa, + size_t sa_len, struct stream **streamp), + int (*set_dscp_cb)(int fd, uint8_t dscp), + char *unlink_path, struct pstream **pstreamp) +{ + struct fd_pstream *ps = xmalloc(sizeof *ps); + pstream_init(&ps->pstream, &fd_pstream_class, name); + ps->fd = fd; + ps->wevent = CreateEvent(NULL, FALSE, FALSE, NULL); + WSAEventSelect(ps->fd, ps->wevent, FD_ALL_EVENTS); + ps->accept_cb = accept_cb; + ps->set_dscp_cb = set_dscp_cb; + ps->unlink_path = unlink_path; + *pstreamp = &ps->pstream; + return 0; +} + +static void +pfd_close(struct pstream *pstream) +{ + struct fd_pstream *ps = fd_pstream_cast(pstream); + closesocket(ps->fd); + free(ps); +} + +static int +pfd_accept(struct pstream *pstream, struct stream **new_streamp) +{ + struct fd_pstream *ps = fd_pstream_cast(pstream); + struct sockaddr_storage ss; + socklen_t ss_len = sizeof ss; + int new_fd; + int retval; + + + new_fd = accept(ps->fd, (struct sockaddr *) &ss, &ss_len); + if (new_fd < 0) { + retval = WSAGetLastError(); + if (retval == WSAEWOULDBLOCK) { + return EAGAIN; + } + return retval; + } + + retval = set_nonblocking(new_fd); + if (retval) { + closesocket(new_fd); + return retval; + } + + return ps->accept_cb(new_fd, (const struct sockaddr *) &ss, ss_len, + new_streamp); +} + +static void +pfd_wait(struct pstream *pstream) +{ + struct fd_pstream *ps = fd_pstream_cast(pstream); + poll_fd_wait_event(ps->fd, ps->wevent, POLLIN); +} + +static int +pfd_set_dscp(struct pstream *pstream, uint8_t dscp) +{ + struct fd_pstream *ps = fd_pstream_cast(pstream); + if (ps->set_dscp_cb) { + return ps->set_dscp_cb(ps->fd, dscp); + } + return 0; +} + +static const struct pstream_class fd_pstream_class = { + "pstream", + false, + NULL, + pfd_close, + pfd_accept, + pfd_wait, + pfd_set_dscp, +}; + diff --git a/lib/stream-fd.c b/lib/stream-fd.c deleted file mode 100644 index 8062d2d..0000000 --- a/lib/stream-fd.c +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2014 Nicira, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <config.h> -#include "stream-fd.h" -#include <errno.h> -#include <poll.h> -#include <stdlib.h> -#include <string.h> -#include <sys/socket.h> -#include <sys/types.h> -#include <unistd.h> -#include "fatal-signal.h" -#include "poll-loop.h" -#include "socket-util.h" -#include "util.h" -#include "stream-provider.h" -#include "stream.h" -#include "vlog.h" - -VLOG_DEFINE_THIS_MODULE(stream_fd); - -/* Active file descriptor stream. */ - -struct stream_fd -{ - struct stream stream; - int fd; -}; - -static const struct stream_class stream_fd_class; - -static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25); - -static void maybe_unlink_and_free(char *path); - -/* Creates a new stream named 'name' that will send and receive data on 'fd' - * and stores a pointer to the stream in '*streamp'. Initial connection status - * 'connect_status' is interpreted as described for stream_init(). - * - * Returns 0 if successful, otherwise a positive errno value. (The current - * implementation never fails.) */ -int -new_fd_stream(const char *name, int fd, int connect_status, - struct stream **streamp) -{ - struct stream_fd *s; - - s = xmalloc(sizeof *s); - stream_init(&s->stream, &stream_fd_class, connect_status, name); - s->fd = fd; - *streamp = &s->stream; - return 0; -} - -static struct stream_fd * -stream_fd_cast(struct stream *stream) -{ - stream_assert_class(stream, &stream_fd_class); - return CONTAINER_OF(stream, struct stream_fd, stream); -} - -static void -fd_close(struct stream *stream) -{ - struct stream_fd *s = stream_fd_cast(stream); - close(s->fd); - free(s); -} - -static int -fd_connect(struct stream *stream) -{ - struct stream_fd *s = stream_fd_cast(stream); - return check_connection_completion(s->fd); -} - -static ssize_t -fd_recv(struct stream *stream, void *buffer, size_t n) -{ - struct stream_fd *s = stream_fd_cast(stream); - ssize_t retval; - - retval = read(s->fd, buffer, n); - return retval >= 0 ? retval : -errno; -} - -static ssize_t -fd_send(struct stream *stream, const void *buffer, size_t n) -{ - struct stream_fd *s = stream_fd_cast(stream); - ssize_t retval; - - retval = write(s->fd, buffer, n); - return (retval > 0 ? retval - : retval == 0 ? -EAGAIN - : -errno); -} - -static void -fd_wait(struct stream *stream, enum stream_wait_type wait) -{ - struct stream_fd *s = stream_fd_cast(stream); - switch (wait) { - case STREAM_CONNECT: - case STREAM_SEND: - poll_fd_wait(s->fd, POLLOUT); - break; - - case STREAM_RECV: - poll_fd_wait(s->fd, POLLIN); - break; - - default: - OVS_NOT_REACHED(); - } -} - -static const struct stream_class stream_fd_class = { - "fd", /* name */ - false, /* needs_probes */ - NULL, /* open */ - fd_close, /* close */ - fd_connect, /* connect */ - fd_recv, /* recv */ - fd_send, /* send */ - NULL, /* run */ - NULL, /* run_wait */ - fd_wait, /* wait */ -}; - -/* Passive file descriptor stream. */ - -struct fd_pstream -{ - struct pstream pstream; - int fd; - int (*accept_cb)(int fd, const struct sockaddr_storage *, size_t ss_len, - struct stream **); - int (*set_dscp_cb)(int fd, uint8_t dscp); - char *unlink_path; -}; - -static const struct pstream_class fd_pstream_class; - -static struct fd_pstream * -fd_pstream_cast(struct pstream *pstream) -{ - pstream_assert_class(pstream, &fd_pstream_class); - return CONTAINER_OF(pstream, struct fd_pstream, pstream); -} - -/* Creates a new pstream named 'name' that will accept new socket connections - * on 'fd' and stores a pointer to the stream in '*pstreamp'. - * - * When a connection has been accepted, 'accept_cb' will be called with the new - * socket fd 'fd' and the remote address of the connection 'sa' and 'sa_len'. - * accept_cb must return 0 if the connection is successful, in which case it - * must initialize '*streamp' to the new stream, or a positive errno value on - * error. In either case accept_cb takes ownership of the 'fd' passed in. - * - * When '*pstreamp' is closed, then 'unlink_path' (if nonnull) will be passed - * to fatal_signal_unlink_file_now() and freed with free(). - * - * Returns 0 if successful, otherwise a positive errno value. (The current - * implementation never fails.) */ -int -new_fd_pstream(const char *name, int fd, - int (*accept_cb)(int fd, const struct sockaddr_storage *ss, - size_t ss_len, struct stream **streamp), - int (*set_dscp_cb)(int fd, uint8_t dscp), - char *unlink_path, struct pstream **pstreamp) -{ - struct fd_pstream *ps = xmalloc(sizeof *ps); - pstream_init(&ps->pstream, &fd_pstream_class, name); - ps->fd = fd; - ps->accept_cb = accept_cb; - ps->set_dscp_cb = set_dscp_cb; - ps->unlink_path = unlink_path; - *pstreamp = &ps->pstream; - return 0; -} - -static void -pfd_close(struct pstream *pstream) -{ - struct fd_pstream *ps = fd_pstream_cast(pstream); - close(ps->fd); - maybe_unlink_and_free(ps->unlink_path); - free(ps); -} - -static int -pfd_accept(struct pstream *pstream, struct stream **new_streamp) -{ - struct fd_pstream *ps = fd_pstream_cast(pstream); - struct sockaddr_storage ss; - socklen_t ss_len = sizeof ss; - int new_fd; - int retval; - - new_fd = accept(ps->fd, (struct sockaddr *) &ss, &ss_len); - if (new_fd < 0) { - retval = errno; - if (retval != EAGAIN) { - VLOG_DBG_RL(&rl, "accept: %s", ovs_strerror(retval)); - } - return retval; - } - - retval = set_nonblocking(new_fd); - if (retval) { - close(new_fd); - return retval; - } - - return ps->accept_cb(new_fd, &ss, ss_len, new_streamp); -} - -static void -pfd_wait(struct pstream *pstream) -{ - struct fd_pstream *ps = fd_pstream_cast(pstream); - poll_fd_wait(ps->fd, POLLIN); -} - -static int -pfd_set_dscp(struct pstream *pstream, uint8_t dscp) -{ - struct fd_pstream *ps = fd_pstream_cast(pstream); - if (ps->set_dscp_cb) { - return ps->set_dscp_cb(ps->fd, dscp); - } - return 0; -} - -static const struct pstream_class fd_pstream_class = { - "pstream", - false, - NULL, - pfd_close, - pfd_accept, - pfd_wait, - pfd_set_dscp, -}; - -/* Helper functions. */ -static void -maybe_unlink_and_free(char *path) -{ - if (path) { - fatal_signal_unlink_file_now(path); - free(path); - } -} diff --git a/lib/stream-fd.h b/lib/stream-fd.h index 8f595a9..9ae061a 100644 --- a/lib/stream-fd.h +++ b/lib/stream-fd.h @@ -12,6 +12,9 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * Note on windows platform, stream fd can only handle sockets, on unix any + * fd is acceptable. */ #ifndef STREAM_FD_H diff --git a/lib/stream.c b/lib/stream.c index b69f03c..1dfecf0 100644 --- a/lib/stream.c +++ b/lib/stream.c @@ -51,7 +51,9 @@ enum stream_state { static const struct stream_class *stream_classes[] = { &tcp_stream_class, +#ifndef _WIN32 &unix_stream_class, +#endif #ifdef HAVE_OPENSSL &ssl_stream_class, #endif @@ -59,7 +61,9 @@ static const struct stream_class *stream_classes[] = { static const struct pstream_class *pstream_classes[] = { &ptcp_pstream_class, +#ifndef _WIN32 &punix_pstream_class, +#endif #ifdef HAVE_OPENSSL &pssl_pstream_class, #endif -- 1.7.9.5 _______________________________________________ dev mailing list dev@openvswitch.org http://openvswitch.org/mailman/listinfo/dev