Eric Blake wrote:
> > Such piping, where you write from the current process and read into
> > the current process, may hang on BSD systems, because some data is
> > present in system buffers but the system wants the buffers to be full
> > before it continues. A fix for this hang is to enable non-blocking I/O
> > and use a loop with select() that alternately reads and writes. See
> > <http://git.savannah.gnu.org/gitweb/?p=gettext.git;a=blob;f=gettext-tools/src/msgfilter.c;hb=HEAD#l580>
>
> Any chance of porting that over to gnulib?
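The technique described in the quoted paragraph (non-blocking descriptors plus a select() loop that writes when the child can accept input and reads when it has produced output) can be sketched in plain POSIX C as follows. This is an illustration under stated assumptions, not gnulib code: the names `filter_through` and `set_nonblocking` are hypothetical, the program is looked up via PATH with execvp(), SIGPIPE is not handled (a child that exits without reading its input would kill the caller), and any abnormal child termination is reported as -1.

```c
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Put FD into non-blocking mode.  Return 0 on success, -1 on error.  */
static int
set_nonblocking (int fd)
{
  int flags = fcntl (fd, F_GETFL, 0);
  return (flags < 0 || fcntl (fd, F_SETFL, flags | O_NONBLOCK) < 0) ? -1 : 0;
}

/* Hypothetical helper: run ARGV[0] (found via PATH) with the INLEN bytes at
   INPUT on its stdin; store its stdout in a malloc'd *OUT (NULL if empty) of
   length *OUTLEN.  Return the exit code, or -1 on error/abnormal exit.  */
static int
filter_through (char *const argv[], const char *input, size_t inlen,
                char **out, size_t *outlen)
{
  int to_child[2], from_child[2];
  if (pipe (to_child) < 0)
    return -1;
  if (pipe (from_child) < 0)
    { close (to_child[0]); close (to_child[1]); return -1; }
  pid_t pid = fork ();
  if (pid == 0)
    {
      /* Child: stdin comes from to_child, stdout goes into from_child.  */
      dup2 (to_child[0], STDIN_FILENO); dup2 (from_child[1], STDOUT_FILENO);
      close (to_child[0]); close (to_child[1]);
      close (from_child[0]); close (from_child[1]);
      execvp (argv[0], argv);
      _exit (127);
    }
  close (to_child[0]); close (from_child[1]);
  int wfd = to_child[1], rfd = from_child[0];
  char *buf = NULL;
  size_t len = 0, cap = 0, written = 0;
  int ok = pid > 0 && set_nonblocking (wfd) == 0 && set_nonblocking (rfd) == 0;
  if (inlen == 0)
    { close (wfd); wfd = -1; }          /* nothing to send: EOF right away */
  while (ok)
    {
      fd_set rset, wset;
      FD_ZERO (&rset); FD_ZERO (&wset);
      FD_SET (rfd, &rset);
      if (wfd >= 0)
        FD_SET (wfd, &wset);
      if (select ((wfd > rfd ? wfd : rfd) + 1, &rset, &wset, NULL, NULL) < 0)
        { ok = (errno == EINTR); continue; }    /* retry only after EINTR */
      /* The child can take input: write as much as the pipe accepts now.  */
      if (wfd >= 0 && FD_ISSET (wfd, &wset))
        {
          ssize_t n = write (wfd, input + written, inlen - written);
          if (n > 0)
            written += (size_t) n;
          else if (n < 0 && errno != EAGAIN && errno != EINTR)
            ok = 0;
          if (written == inlen)
            { close (wfd); wfd = -1; }  /* child sees EOF on its stdin */
        }
      /* The child produced output (or closed stdout): read what is there.  */
      if (ok && FD_ISSET (rfd, &rset))
        {
          if (len == cap)
            {
              char *nbuf = realloc (buf, cap ? 2 * cap : 4096);
              if (nbuf == NULL)
                { ok = 0; break; }
              buf = nbuf;
              cap = cap ? 2 * cap : 4096;
            }
          ssize_t n = read (rfd, buf + len, cap - len);
          if (n == 0)
            break;                      /* EOF: the child closed its stdout */
          if (n > 0)
            len += (size_t) n;
          else if (errno != EAGAIN && errno != EINTR)
            ok = 0;
        }
    }
  if (wfd >= 0)
    close (wfd);
  close (rfd);
  int status = 0;
  pid_t w = -1;
  if (pid > 0)                          /* reap the child */
    while ((w = waitpid (pid, &status, 0)) < 0 && errno == EINTR)
      continue;
  if (!ok || w < 0 || !WIFEXITED (status))
    { free (buf); return -1; }
  *out = buf;
  *outlen = len;
  return WEXITSTATUS (status);
}
```

Feeding, say, 1 MiB through `cat` with this loop completes, whereas writing all input before reading anything would block once both pipe buffers are full. Preferring neither direction and letting select() decide is what avoids the first deadlock type; the O_NONBLOCK flag is what lets read() and write() return partial results instead of waiting for a full buffer.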

I'm adding a new module 'pipe-filter' that does this.

> At any rate, my understanding
> is that bison's use of a subpipe is to call m4, which, thanks to the way
> diversions are used, produces no output until after all the input has been
> consumed.  Thus, bison's usage pattern is immune to this particular
> deadlock.

Still, this does not sound very future-proof.

> But you are correct that to avoid deadlock in a generic filter
> child application, portable applications cannot write more than PIPE_MAX
> bytes without checking whether the read end needs draining, and ...
> avoid problems with partial buffers.

Yes, these are the two possible deadlock types: either both processes are
writing to each other and the OS pipe buffers are full, or both processes
are reading from each other and some data is stuck in buffers and is not
being returned, because either an fflush is missing or a partial read has
been disallowed. There is no generic remedy for a missing fflush, but this
module handles the other two cases (full pipe buffers and disallowed
partial reads).


2009-07-19  Bruno Haible  <br...@clisp.org>

	New module 'pipe-filter'.
	* lib/pipe-filter.h: New file.
	* lib/pipe-filter.c: New file.
	* modules/pipe-filter: New file.

============================== lib/pipe-filter.h ==============================
/* Filtering of data through a subprocess.
   Copyright (C) 2009 Free Software Foundation, Inc.
   Written by Bruno Haible <hai...@clisp.cons.org>, 2009.

   This program is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.  */

#ifndef _PIPE_FILTER_H
#define _PIPE_FILTER_H

#include <stdbool.h>
#include <stddef.h>


#ifdef __cplusplus
extern "C" {
#endif


/* Piping data through a subprocess in the naïve way - write data to the
   subprocess and read from the subprocess when you expect it to have
   produced results - is subject to two kinds of deadlocks:
   1) If you write more than PIPE_MAX bytes or, more generally, if you write
      more bytes than the subprocess can handle at once, the subprocess
      may write its data and wait on you to read it, but you are currently
      busy writing.
   2) When you don't know ahead of time how many bytes the subprocess
      will produce, the usual technique of calling read (fd, buf, BUFSIZ)
      with a fixed BUFSIZ will, on Linux 2.2.17 and on BSD systems, cause
      the read() call to block until *all* of the buffer has been filled.
      But the subprocess cannot produce more data until you give it more
      input.  But you are currently busy reading from it.
   This module provides a function that pipes data through the subprocess,
   without risking these deadlocks.  */

typedef const void * (*prepare_write_fn) (size_t *num_bytes_p,
                                          void *private_data);
typedef void (*done_write_fn) (void *data_written, size_t num_bytes_written,
                               void *private_data);
typedef void * (*prepare_read_fn) (size_t *num_bytes_p,
                                   void *private_data);
typedef void (*done_read_fn) (void *data_read, size_t num_bytes_read,
                              void *private_data);

/* Create a subprocess and pipe some data through it.
   Arguments:
   - progname is the program name used in error messages.
   - prog_path is the file name of the program to invoke.
   - prog_argv is a NULL terminated argument list, starting with prog_path as
     first element.
   - If null_stderr is true, the subprocess' stderr will be redirected to
     /dev/null, and the usual error message to stderr will be omitted.
     This is suitable when the subprocess does not fulfill an important task.
   - If exit_on_error is true, any error will cause the main process to exit
     with an error status.
   If the subprocess does not terminate correctly, exit if exit_on_error is
   true, otherwise return 127.
   Data is alternately written to the subprocess, through the functions
   prepare_write and done_write, and read from the subprocess, through the
   functions prepare_read and done_read.
   Callback arguments:
   - prepare_write (&num_bytes, p) must either return a pointer to data that
     is ready to be written and set num_bytes to the number of bytes ready to
     be written, or return NULL when no more bytes are to be written.
   - done_write (data_written, num_bytes_written, p) is called after
     num_bytes_written bytes were written.  It is guaranteed that
     num_bytes_written > 0.
   - prepare_read (&num_bytes, p) must return a pointer to a buffer for data
     that can be read and set num_bytes to the size of that buffer
     (must be > 0).
   - done_read (data_read, num_bytes_read, p) is called after num_bytes_read
     bytes were read into the buffer.
   Here p is always the private_data argument.
   Note that the prepare_write/done_write functions and the
   prepare_read/done_read functions may be called in different threads than
   the current thread (depending on platform).  But they will not be called
   after the pipe_through_subprocess function has returned.
   Return 0 upon success, or (only if exit_on_error is false):
   - -1 with errno set upon failure,
   - the positive exit code of the subprocess if that failed.  */
extern int pipe_through_subprocess (const char *progname,
                                    const char *prog_path, char **prog_argv,
                                    bool null_stderr, bool exit_on_error,
                                    prepare_write_fn prepare_write,
                                    done_write_fn done_write,
                                    prepare_read_fn prepare_read,
                                    done_read_fn done_read,
                                    void *private_data);


#ifdef __cplusplus
}
#endif


#endif /* _PIPE_FILTER_H */
============================== lib/pipe-filter.c ==============================
/* Filtering of data through a subprocess.
   Copyright (C) 2001-2003, 2008-2009 Free Software Foundation, Inc.
   Written by Bruno Haible <hai...@clisp.cons.org>, 2009.

   This program is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.  */

#include <config.h>

#include "pipe-filter.h"

#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/select.h>
#include <unistd.h>
#if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
/* HANDLE, WINAPI, WaitForMultipleObjects, TerminateThread, CloseHandle.  */
# include <windows.h>
/* _beginthreadex, _endthreadex.  */
# include <process.h>
#endif

#include "error.h"
#include "gettext.h"
#include "pipe.h"
#include "wait-process.h"

#define _(str) gettext (str)

#ifndef SSIZE_MAX
# define SSIZE_MAX ((ssize_t) (SIZE_MAX / 2))
#endif

/* We use a child process, and communicate through a bidirectional pipe.
   To avoid deadlocks, let the child process decide when it wants to read
   or to write, and let the parent behave accordingly.  The parent uses
   select() to know whether it must write or read.  On platforms without
   select(), we use non-blocking I/O.  (This means the parent is busy
   looping while waiting for the child.  Not good.  But hardly any platform
   lacks select() nowadays.)  */

/* On BeOS select() works only on sockets, not on normal file descriptors.  */
#ifdef __BEOS__
# undef HAVE_SELECT
#endif

#ifdef EINTR

/* EINTR handling for close(), read(), write(), select().
   These functions can return -1/EINTR even though we don't have any
   signal handlers set up, namely when we get interrupted via SIGSTOP.  */

static inline int
nonintr_close (int fd)
{
  int retval;

  do
    retval = close (fd);
  while (retval < 0 && errno == EINTR);

  return retval;
}
#undef close /* avoid warning related to gnulib module unistd */
#define close nonintr_close

static inline ssize_t
nonintr_read (int fd, void *buf, size_t count)
{
  ssize_t retval;

  do
    retval = read (fd, buf, count);
  while (retval < 0 && errno == EINTR);

  return retval;
}
#define read nonintr_read

static inline ssize_t
nonintr_write (int fd, const void *buf, size_t count)
{
  ssize_t retval;

  do
    retval = write (fd, buf, count);
  while (retval < 0 && errno == EINTR);

  return retval;
}
#undef write /* avoid warning on VMS */
#define write nonintr_write

# if HAVE_SELECT

static inline int
nonintr_select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,
                struct timeval *timeout)
{
  int retval;

  do
    retval = select (n, readfds, writefds, exceptfds, timeout);
  while (retval < 0 && errno == EINTR);

  return retval;
}
#  undef select /* avoid warning on VMS */
#  define select nonintr_select

# endif

#endif

/* Non-blocking I/O.  */
#ifndef O_NONBLOCK
# define O_NONBLOCK O_NDELAY
#endif
#if HAVE_SELECT
# define IS_EAGAIN(errcode) 0
#else
# ifdef EWOULDBLOCK
#  define IS_EAGAIN(errcode) ((errcode) == EAGAIN || (errcode) == EWOULDBLOCK)
# else
#  define IS_EAGAIN(errcode) ((errcode) == EAGAIN)
# endif
#endif

#if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__

struct locals
{
  /* Arguments passed to pipe_through_subprocess.  */
  prepare_write_fn prepare_write;
  done_write_fn done_write;
  prepare_read_fn prepare_read;
  done_read_fn done_read;
  void *private_data;
  int fd[2];
  /* Results reported by the two threads.  */
  volatile bool writer_terminated;
  volatile int writer_final_errno;
  volatile bool reader_terminated;
  volatile int reader_final_errno;
};

static unsigned int WINAPI
writer_thread_func (void *thread_arg)
{
  struct locals *l = (struct locals *) thread_arg;

  for (;;)
    {
      size_t bufsize;
      const void *buf = l->prepare_write (&bufsize, l->private_data);
      if (buf != NULL)
        {
          ssize_t nwritten;

          if (bufsize > SSIZE_MAX)
            bufsize = SSIZE_MAX;
          nwritten = write (l->fd[1], buf, bufsize);
          if (nwritten < 0)
            {
              l->writer_final_errno = errno;
              break;
            }
          if (nwritten > 0)
            l->done_write ((void *) buf, nwritten, l->private_data);
        }
      else
        break;
    }

  l->writer_terminated = true;
  _endthreadex (0); /* calls ExitThread (0) */
  abort ();
}

static unsigned int WINAPI
reader_thread_func (void *thread_arg)
{
  struct locals *l = (struct locals *) thread_arg;

  for (;;)
    {
      size_t bufsize;
      void *buf = l->prepare_read (&bufsize, l->private_data);
      if (!(buf != NULL && bufsize > 0))
        /* prepare_read returned wrong values.  */
        abort ();
      if (bufsize > SSIZE_MAX)
        bufsize = SSIZE_MAX;
      {
        ssize_t nread = read (l->fd[0], buf, bufsize);
        if (nread < 0)
          {
            l->reader_final_errno = errno;
            break;
          }
        if (nread > 0)
          l->done_read (buf, nread, l->private_data);
        else
          break;
      }
    }

  l->reader_terminated = true;
  _endthreadex (0); /* calls ExitThread (0) */
  abort ();
}

#endif

int
pipe_through_subprocess (const char *progname,
                         const char *prog_path, char **prog_argv,
                         bool null_stderr, bool exit_on_error,
                         prepare_write_fn prepare_write,
                         done_write_fn done_write,
                         prepare_read_fn prepare_read,
                         done_read_fn done_read,
                         void *private_data)
{
  pid_t child;
  int fd[2];

  /* Open a bidirectional pipe to a subprocess.  */
  child = create_pipe_bidi (progname, prog_path, prog_argv,
                            null_stderr, true, exit_on_error,
                            fd);
  if (child == -1)
    /* The subprocess could not be created; fd[] is not valid.  */
    return -1;

#if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
  /* Native Woe32 API.  */
  /* Pipes have a non-blocking mode, see function SetNamedPipeHandleState and
     the article "Named Pipe Type, Read, and Wait Modes", but Microsoft's
     documentation discourages its use.  So don't use it.
     Asynchronous I/O is also not suitable because it notifies the caller
     only about completion of the I/O request, not about intermediate
     progress.
     So do the writing and the reading in separate threads.  */
  {
    struct locals l;
    HANDLE handles[2];
#define writer_thread_handle handles[0]
#define reader_thread_handle handles[1]
    bool writer_cleaned_up;
    bool reader_cleaned_up;

    l.prepare_write = prepare_write;
    l.done_write = done_write;
    l.prepare_read = prepare_read;
    l.done_read = done_read;
    l.private_data = private_data;
    l.fd[0] = fd[0];
    l.fd[1] = fd[1];
    l.writer_terminated = false;
    l.writer_final_errno = 0;
    l.reader_terminated = false;
    l.reader_final_errno = 0;

    writer_thread_handle =
      (HANDLE) _beginthreadex (NULL, 100000, writer_thread_func, &l, 0, NULL);
    reader_thread_handle =
      (HANDLE) _beginthreadex (NULL, 100000, reader_thread_func, &l, 0, NULL);
    if (writer_thread_handle == NULL || reader_thread_handle == NULL)
      {
        if (exit_on_error)
          error (EXIT_FAILURE, 0, _("creation of threads failed"));
        if (reader_thread_handle != NULL)
          CloseHandle (reader_thread_handle);
        if (writer_thread_handle != NULL)
          CloseHandle (writer_thread_handle);
        goto fail;
      }
    writer_cleaned_up = false;
    reader_cleaned_up = false;
    for (;;)
      {
        DWORD ret;

        /* Here !(writer_cleaned_up && reader_cleaned_up).  */
        if (writer_cleaned_up)
          ret = WaitForSingleObject (reader_thread_handle, INFINITE);
        else if (reader_cleaned_up)
          ret = WaitForSingleObject (writer_thread_handle, INFINITE);
        else
          ret = WaitForMultipleObjects (2, handles, FALSE, INFINITE);
        if (!(ret == WAIT_OBJECT_0 + 0 || ret == WAIT_OBJECT_0 + 1))
          abort ();

        if (l.writer_terminated)
          {
            /* The writer thread has just terminated.  */
            l.writer_terminated = false;
            CloseHandle (writer_thread_handle);
            if (l.writer_final_errno)
              {
                if (exit_on_error)
                  error (EXIT_FAILURE, l.writer_final_errno,
                         _("write to %s subprocess failed"), progname);
                if (!reader_cleaned_up)
                  {
                    TerminateThread (reader_thread_handle, 1);
                    CloseHandle (reader_thread_handle);
                  }
                goto fail;
              }
            /* Tell the child there is nothing more the parent will send.  */
            close (fd[1]);
            writer_cleaned_up = true;
          }
        if (l.reader_terminated)
          {
            /* The reader thread has just terminated.  */
            l.reader_terminated = false;
            CloseHandle (reader_thread_handle);
            if (l.reader_final_errno)
              {
                if (exit_on_error)
                  error (EXIT_FAILURE, l.reader_final_errno,
                         _("read from %s subprocess failed"), progname);
                if (!writer_cleaned_up)
                  {
                    TerminateThread (writer_thread_handle, 1);
                    CloseHandle (writer_thread_handle);
                  }
                goto fail;
              }
            reader_cleaned_up = true;
          }
        if (writer_cleaned_up && reader_cleaned_up)
          break;
      }
  }
#else
  {
    bool done_writing;

    /* Enable non-blocking I/O.
       This permits the read() and write() calls to return -1/EAGAIN without
       blocking; this is important for polling if HAVE_SELECT is not defined.
       It also permits the read() and write() calls to return after partial
       reads/writes; this is important if HAVE_SELECT is defined, because
       select() only says that some data can be read or written, not how
       many.  Without non-blocking I/O, Linux 2.2.17 and BSD systems prefer
       to block instead of returning with partial results.  */
    {
      int fcntl_flags;

      if ((fcntl_flags = fcntl (fd[1], F_GETFL, 0)) < 0
          || fcntl (fd[1], F_SETFL, fcntl_flags | O_NONBLOCK) < 0
          || (fcntl_flags = fcntl (fd[0], F_GETFL, 0)) < 0
          || fcntl (fd[0], F_SETFL, fcntl_flags | O_NONBLOCK) < 0)
        {
          if (exit_on_error)
            error (EXIT_FAILURE, errno,
                   _("cannot set up nonblocking I/O to %s subprocess"),
                   progname);
          goto fail;
        }
    }

    done_writing = false;
    for (;;)
      {
# if HAVE_SELECT
        int n;
        fd_set readfds;
        fd_set writefds;

        FD_ZERO (&readfds);
        FD_SET (fd[0], &readfds);
        n = fd[0] + 1;
        if (!done_writing)
          {
            FD_ZERO (&writefds);
            FD_SET (fd[1], &writefds);
            if (n <= fd[1])
              n = fd[1] + 1;
          }

        n = select (n, &readfds, (!done_writing ? &writefds : NULL), NULL,
                    NULL);
        if (n < 0)
          {
            if (exit_on_error)
              error (EXIT_FAILURE, errno,
                     _("communication with %s subprocess failed"), progname);
            goto fail;
          }
        if (!done_writing && FD_ISSET (fd[1], &writefds))
          goto try_write;
        if (FD_ISSET (fd[0], &readfds))
          goto try_read;
        /* How could select() return if neither of the two descriptors
           is ready?  */
        abort ();
# endif

        /* Attempt to write.  */
# if HAVE_SELECT
      try_write:
# endif
        if (!done_writing)
          {
            size_t bufsize;
            const void *buf = prepare_write (&bufsize, private_data);
            if (buf != NULL)
              {
                ssize_t nwritten;

                if (bufsize > SSIZE_MAX)
                  bufsize = SSIZE_MAX;
                nwritten = write (fd[1], buf, bufsize);
                if (nwritten < 0 && !IS_EAGAIN (errno))
                  {
                    if (exit_on_error)
                      error (EXIT_FAILURE, errno,
                             _("write to %s subprocess failed"), progname);
                    goto fail;
                  }
                if (nwritten > 0)
                  done_write ((void *) buf, nwritten, private_data);
              }
            else
              {
                /* Tell the child there is nothing more the parent will
                   send.  */
                close (fd[1]);
                done_writing = true;
              }
          }
# if HAVE_SELECT
        continue;
# endif

        /* Attempt to read.  */
# if HAVE_SELECT
      try_read:
# endif
        {
          size_t bufsize;
          void *buf = prepare_read (&bufsize, private_data);
          if (!(buf != NULL && bufsize > 0))
            /* prepare_read returned wrong values.  */
            abort ();
          if (bufsize > SSIZE_MAX)
            bufsize = SSIZE_MAX;
          {
            ssize_t nread = read (fd[0], buf, bufsize);
            if (nread < 0 && !IS_EAGAIN (errno))
              {
                if (exit_on_error)
                  error (EXIT_FAILURE, errno,
                         _("read from %s subprocess failed"), progname);
                goto fail;
              }
            if (nread > 0)
              done_read (buf, nread, private_data);
            if (nread == 0 && done_writing)
              break;
          }
        }
# if HAVE_SELECT
        continue;
# endif
      }
  }
#endif

  close (fd[0]);

  /* Remove zombie process from process list.  */
  {
    int exitstatus =
      wait_subprocess (child, progname, false, null_stderr,
                       true, exit_on_error, NULL);
    if (exitstatus != 0 && exit_on_error)
      error (EXIT_FAILURE, 0, _("%s subprocess terminated with exit code %d"),
             progname, exitstatus);
    return exitstatus;
  }

 fail:
  {
    int saved_errno = errno;
    close (fd[1]);
    close (fd[0]);
    wait_subprocess (child, progname, true, true, true, false, NULL);
    errno = saved_errno;
    return -1;
  }
}
============================= modules/pipe-filter =============================
Description:
Filtering of data through a subprocess.

Files:
lib/pipe-filter.h
lib/pipe-filter.c

Depends-on:
pipe
wait-process
error
exit
gettext-h
stdbool
stdint
sys_select
unistd

configure.ac:
AC_REQUIRE([AC_C_INLINE])
AC_CHECK_FUNCS([select])

Makefile.am:
lib_SOURCES += pipe-filter.c

Include:
"pipe-filter.h"

License:
GPL

Maintainer:
Bruno Haible

===============================================================================
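The callback contract documented in lib/pipe-filter.h can be met by four small functions that move data between in-memory buffers. The following is only a sketch under stated assumptions: `struct mem_filter` and the `mem_*` function names are hypothetical and not part of gnulib, and an out-of-memory condition simply calls abort() to keep it short. The signatures are chosen to match the `prepare_write_fn`, `done_write_fn`, `prepare_read_fn`, and `done_read_fn` typedefs.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical state passed to all four callbacks as private_data.  */
struct mem_filter
{
  const char *input;     /* bytes to feed to the subprocess */
  size_t input_len;
  size_t input_pos;      /* how many of them have been written so far */
  char *output;          /* malloc'd buffer collecting the subprocess' output */
  size_t output_len;
  size_t output_cap;
};

/* prepare_write: hand out the unwritten rest of the input, or NULL once all
   of it has been written (the child's stdin is then closed).  */
static const void *
mem_prepare_write (size_t *num_bytes_p, void *private_data)
{
  struct mem_filter *f = private_data;
  if (f->input_pos == f->input_len)
    return NULL;
  *num_bytes_p = f->input_len - f->input_pos;
  return f->input + f->input_pos;
}

/* done_write: a possibly partial write succeeded; advance past it.  */
static void
mem_done_write (void *data_written, size_t num_bytes_written,
                void *private_data)
{
  struct mem_filter *f = private_data;
  (void) data_written;
  f->input_pos += num_bytes_written;
}

/* prepare_read: return the free tail of the output buffer, growing it when
   full, because the contract requires a non-NULL buffer of size > 0.  */
static void *
mem_prepare_read (size_t *num_bytes_p, void *private_data)
{
  struct mem_filter *f = private_data;
  if (f->output_len == f->output_cap)
    {
      size_t new_cap = f->output_cap ? 2 * f->output_cap : 4096;
      char *p = realloc (f->output, new_cap);
      if (p == NULL)
        abort ();        /* sketch only: no graceful out-of-memory path */
      f->output = p;
      f->output_cap = new_cap;
    }
  *num_bytes_p = f->output_cap - f->output_len;
  return f->output + f->output_len;
}

/* done_read: the bytes already landed at the end of the output buffer.  */
static void
mem_done_read (void *data_read, size_t num_bytes_read, void *private_data)
{
  struct mem_filter *f = private_data;
  (void) data_read;
  f->output_len += num_bytes_read;
}
```

With lib/pipe-filter.h included, these would be passed as `pipe_through_subprocess (progname, prog_path, prog_argv, false, false, mem_prepare_write, mem_done_write, mem_prepare_read, mem_done_read, &f)`, after which the program's output is in `f.output[0 .. f.output_len)`. Since the header says the callbacks may run in other threads on some platforms, the caller should not touch `f` until the call has returned.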