On Fri, 2 Dec 2022, Carl Edquist wrote:
Although tee has multiple outputs, you only need to monitor a single
output fd at a time. Because the only case you actually need to catch
is when the final valid output becomes a broken pipe. (So I don't
think it's necessary to poll(2) all the output fds together.)
That is technically true, but I think coupling this to two FDs might
prove a bit inelegant in implementation (since you'd have to decide
which entry from an unorganized list with holes do you pick? any of
them could spontaneously go away), so I'm not sure the implementation
would be cleaner that way.
It'd be best to explain with a patch - I'll plan to send one later for a
concrete example.
Ok it's patch time. Attached! Feedback welcome.
A few notes:
Originally I had in mind to put the read() call inside the poll() loop.
But if we keep this feature as an option, it felt a bit easier just to
add the "if (pipe_check) {...}" block before the read().
The new iopoll() function is the core concept here for waiting on an
[input, output] pair of fds - waiting for the input to become ready to
read, or the output to have an error or become a broken pipe.
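As a standalone illustration of that idea, here is a minimal sketch that is separate from the attached patch. The names iopoll_sketch() and demo() are hypothetical, and the IOPOLL_INPUT_READY constant is added only for readability (the patch returns a plain 0). It assumes a platform such as Linux, where poll(2) reports POLLERR or POLLHUP on the write end of a pipe whose reader has gone away, even when no events are requested for that fd.

```c
#include <errno.h>
#include <poll.h>
#include <unistd.h>

enum { IOPOLL_INPUT_READY = 0, IOPOLL_BROKEN_OUTPUT = -2, IOPOLL_ERROR = -3 };

/* Block until fdin is readable (or at EOF), or fdout reports an
   error or hangup.  Input is checked first, as in the patch.  */
static int
iopoll_sketch (int fdin, int fdout)
{
  struct pollfd pfds[2] = {
    { .fd = fdin,  .events = POLLIN },
    { .fd = fdout, .events = 0 },   /* only POLLERR/POLLHUP/POLLNVAL */
  };

  for (;;)
    {
      if (poll (pfds, 2, -1) < 0)
        {
          if (errno == EINTR)
            continue;               /* interrupted by a signal: retry */
          return IOPOLL_ERROR;
        }
      if (pfds[0].revents)
        return IOPOLL_INPUT_READY;
      if (pfds[1].revents)
        return IOPOLL_BROKEN_OUTPUT;
    }
}

/* Hypothetical demo: build an input pipe and an output pipe, optionally
   queue one byte of input and/or close the output's reader, then poll.
   Note that demo (0, 0) would block forever, by design.  */
static int
demo (int feed_input, int break_output)
{
  int in[2], out[2], r = IOPOLL_ERROR;

  if (pipe (in) != 0)
    return r;
  if (pipe (out) != 0)
    {
      close (in[0]);
      close (in[1]);
      return r;
    }
  if (break_output)
    close (out[0]);                 /* reader goes away: broken pipe */
  if (!feed_input || write (in[1], "x", 1) == 1)
    r = iopoll_sketch (in[0], out[1]);

  close (in[0]);
  close (in[1]);
  close (out[1]);
  if (!break_output)
    close (out[0]);
  return r;
}
```

With these helpers, demo (0, 1) should report IOPOLL_BROKEN_OUTPUT without any write being attempted, while demo (1, 0) should report IOPOLL_INPUT_READY.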
For Pádraig, I think the same function & approach here could be used in
other filters (cat for example). The stubborn part of me might say, for
platforms that do not natively support poll(2), we could simply leave out
support for this feature. If that's not acceptable, we could add a
select(2)-based fallback for platforms that do not have a native poll(2).
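For reference, one way such a select(2)-based fallback might look is sketched below. This is an assumption-laden illustration, not part of the attached patch: the names iopoll_select_sketch() and demo_select() are hypothetical, and the approach relies on the platform marking an fd with a pending error condition as "ready for reading", which holds on Linux for the write end of a broken pipe but is not guaranteed everywhere. It also assumes fdout is a write-only pipe or socket; select(2) reports regular files as always ready, so they would be misdetected.

```c
#include <errno.h>
#include <sys/select.h>
#include <unistd.h>

enum { IOPOLL_INPUT_READY = 0, IOPOLL_BROKEN_OUTPUT = -2, IOPOLL_ERROR = -3 };

/* select(2) variant: put both fds in the read set.  fdout is assumed
   to be write-only, so it can only show up as "readable" when it has
   an error condition such as a broken pipe (platform assumption).  */
static int
iopoll_select_sketch (int fdin, int fdout)
{
  if (fdin < 0 || fdout < 0 || fdin >= FD_SETSIZE || fdout >= FD_SETSIZE)
    {
      errno = EINVAL;
      return IOPOLL_ERROR;
    }

  int nfds = (fdin > fdout ? fdin : fdout) + 1;

  for (;;)
    {
      fd_set rfds;                  /* select() modifies the set: rebuild */
      FD_ZERO (&rfds);
      FD_SET (fdin, &rfds);
      FD_SET (fdout, &rfds);

      if (select (nfds, &rfds, NULL, NULL, NULL) < 0)
        {
          if (errno == EINTR)
            continue;
          return IOPOLL_ERROR;
        }
      if (FD_ISSET (fdin, &rfds))
        return IOPOLL_INPUT_READY;
      if (FD_ISSET (fdout, &rfds))
        return IOPOLL_BROKEN_OUTPUT;
    }
}

/* Hypothetical demo, mirroring a tee-like setup with two pipes.
   demo_select (0, 0) would block forever, by design.  */
static int
demo_select (int feed_input, int break_output)
{
  int in[2], out[2], r = IOPOLL_ERROR;

  if (pipe (in) != 0)
    return r;
  if (pipe (out) != 0)
    {
      close (in[0]);
      close (in[1]);
      return r;
    }
  if (break_output)
    close (out[0]);
  if (!feed_input || write (in[1], "x", 1) == 1)
    r = iopoll_select_sketch (in[0], out[1]);

  close (in[0]);
  close (in[1]);
  close (out[1]);
  if (!break_output)
    close (out[0]);
  return r;
}
```

The FD_SETSIZE check is there because select(2) cannot handle descriptors at or above that limit, which is one practical reason to prefer poll(2) where it exists.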
Unique to tee is its multiple outputs. The new get_next_out() helper
simply advances to select the next remaining (i.e., not-yet-removed) output.
As described last time, it's sufficient to track a single output at a
time, and perhaps it even simplifies the implementation. It also avoids
the need for a malloc() for the pollfd array before every read().
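A forward-only scan is enough here because the tracked index is always the lowest-numbered output still open, so every slot before it is already NULL. The sketch below illustrates that scan in isolation; next_output() and count_removals() are hypothetical names, the streams are placeholders, and the layout is assumed to match tee's (nfiles + 1 slots, with stdout at index 0).

```c
#include <stdio.h>

/* Return the index of the first non-NULL slot after cur, or -1 if
   none remain.  outs has nfiles + 1 slots.  */
static int
next_output (FILE *const *outs, int nfiles, int cur)
{
  for (cur++; cur <= nfiles; cur++)
    if (outs[cur])
      return cur;
  return -1;
}

/* Hypothetical walk-through: repeatedly remove the tracked output, as
   if each one became a broken pipe in turn, and count the removals.  */
static int
count_removals (FILE **outs, int nfiles)
{
  int removed = 0;

  for (int cur = next_output (outs, nfiles, -1);
       cur >= 0;
       cur = next_output (outs, nfiles, cur))
    {
      outs[cur] = NULL;             /* slot becomes a hole */
      removed++;
    }
  return removed;
}
```

Because only one output is watched at a time, the pollfd array can stay a fixed two-element array on the stack, whatever the number of outputs.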
I moved the somewhat complicated write-failure logic from tee_files() out
to a new helper function, fail_output(), which now also gets called for
broken pipes that we want to remove.
Note also that I make -P imply -p. I think this is necessary, otherwise
an output pipe becoming broken always produces an error. But normally, an
output pipe breaking does not necessarily produce an error, since EOF can
arrive before any further input, and in that case no write is then
attempted into the broken pipe.
Happy hacking / feedback welcome.
Thanks,
Carl
commit cabf34aa748afa676f10a7943de0e25a962d23a4
Author: Carl Edquist <edqu...@cs.wisc.edu>
Date:   Mon Dec 5 15:43:24 2022 -0600

    tee: add -P/--pipe-check option to remove broken pipe outputs

    In case input is intermittent (a tty, pipe, or socket), and all
    remaining outputs are pipes (eg, >(cmd) process substitutions), provide
    a way to exit early when they have all become broken pipes (and thus
    future writes will fail), without having to wait for more input to tee,
    to actually encounter the signal or write failure (SIGPIPE/EPIPE).

    Broken pipe detection is done by calling poll(2) with an unlimited
    timeout, waiting for input to be available, or errors on input or the
    first remaining output.  The iopoll() function that implements this
    could be of general use for other filter utils, even cat for instance.
    This would allow broken pipes to "propagate" backwards in a shell
    pipeline.

    Note also that -P implies -p.  This is necessary, otherwise an output
    pipe becoming broken always produces an error.  But normally, an output
    pipe breaking does not produce an error if EOF is hit on input before
    any further data, as no write is then attempted into a broken pipe.

    * src/tee.c (iopoll): New function implementing broken pipe detection.
    (pipe_check, long_options, main): Add -P/--pipe-check option.
    (get_next_out): Helper function for finding next valid output.
    (tee_files): Add broken pipe detection before calling read().
    (tee_files, fail_output): Break out write failure/output removal logic
    to helper function.

diff --git a/src/tee.c b/src/tee.c
index 971b768..c17c5c7 100644
--- a/src/tee.c
+++ b/src/tee.c
@@ -20,6 +20,7 @@
 #include <sys/types.h>
 #include <signal.h>
 #include <getopt.h>
+#include <poll.h>
 
 #include "system.h"
 #include "argmatch.h"
@@ -37,6 +38,9 @@
   proper_name ("Richard M. Stallman"), \
   proper_name ("David MacKenzie")
 
+#define IOPOLL_BROKEN_OUTPUT -2
+#define IOPOLL_ERROR -3
+
 static bool tee_files (int nfiles, char **files);
 
 /* If true, append to output files rather than truncating them. */
@@ -45,6 +49,9 @@ static bool append;
 /* If true, ignore interrupts. */
 static bool ignore_interrupts;
 
+/* If true, detect if next output becomes broken while waiting for input. */
+static bool pipe_check;
+
 enum output_error
   {
     output_error_sigpipe, /* traditional behavior, sigpipe enabled. */
@@ -61,6 +68,7 @@ static struct option const long_options[] =
   {"append", no_argument, NULL, 'a'},
   {"ignore-interrupts", no_argument, NULL, 'i'},
   {"output-error", optional_argument, NULL, 'p'},
+  {"pipe-check", no_argument, NULL, 'P'},
   {GETOPT_HELP_OPTION_DECL},
   {GETOPT_VERSION_OPTION_DECL},
   {NULL, 0, NULL, 0}
@@ -94,6 +102,8 @@ Copy standard input to each FILE, and also to standard output.\n\
       fputs (_("\
   -p                        diagnose errors writing to non pipes\n\
       --output-error[=MODE]   set behavior on write error. See MODE below\n\
+  -P, --pipe-check          detect broken output pipe while waiting for input;\n\
+                              implies -p\n\
 "), stdout);
       fputs (HELP_OPTION_DESCRIPTION, stdout);
       fputs (VERSION_OPTION_DESCRIPTION, stdout);
@@ -131,7 +141,7 @@ main (int argc, char **argv)
   append = false;
   ignore_interrupts = false;
 
-  while ((optc = getopt_long (argc, argv, "aip", long_options, NULL)) != -1)
+  while ((optc = getopt_long (argc, argv, "aiPp", long_options, NULL)) != -1)
     {
       switch (optc)
         {
@@ -143,6 +153,9 @@ main (int argc, char **argv)
           ignore_interrupts = true;
           break;
 
+        case 'P':
+          pipe_check = true; /* falls through */
+
         case 'p':
           if (optarg)
             output_error = XARGMATCH ("--output-error", optarg,
@@ -176,6 +189,60 @@ main (int argc, char **argv)
   return ok ? EXIT_SUCCESS : EXIT_FAILURE;
 }
 
+/* Wait for fdin to become ready for reading or fdout to become a broken pipe.
+   Return 0 if fdin can be read() without blocking, or IOPOLL_BROKEN_OUTPUT if
+   fdout becomes a broken pipe, otherwise IOPOLL_ERROR if there is a poll()
+   error. */
+
+static int
+iopoll(int fdin, int fdout)
+{
+  struct pollfd pfds[2] = {{fdin, POLLIN, 0}, {fdout, 0, 0}};
+
+  while (poll (pfds, 2, -1) > 0 || errno == EINTR)
+    {
+      if (pfds[0].revents) /* input available or pipe closed indicating EOF; */
+        return 0; /* should now be able to read() without blocking */
+      if (pfds[1].revents) /* POLLERR, POLLHUP, or POLLNVAL */
+        return IOPOLL_BROKEN_OUTPUT; /* output error or broken pipe */
+    }
+
+  return IOPOLL_ERROR; /* poll error */
+}
+
+/* Return the index of the first non-NULL descriptor after first_out,
+   or -1 if all are NULL. */
+
+static int
+get_next_out(FILE **descriptors, int nfiles, int first_out)
+{
+  for (first_out++; first_out <= nfiles; first_out++)
+    if (descriptors[first_out])
+      return first_out;
+  return -1; /* no outputs remaining */
+}
+
+/* Remove descriptors[i] due to write failure or broken pipe.
+   Return true if this indicates a reportable error. */
+
+static bool
+fail_output(FILE **descriptors, char **files, int i)
+{
+  int w_errno = errno;
+  bool fail = errno != EPIPE || (output_error == output_error_exit
+                                || output_error == output_error_warn);
+  if (descriptors[i] == stdout)
+    clearerr (stdout); /* Avoid redundant close_stdout diagnostic. */
+  if (fail)
+    {
+      error (output_error == output_error_exit
+             || output_error == output_error_exit_nopipe,
+             w_errno, "%s", quotef (files[i]));
+    }
+  descriptors[i] = NULL;
+  return fail;
+}
+
 /* Copy the standard input into each of the NFILES files in FILES
    and into the standard output. As a side effect, modify FILES[-1].
    Return true if successful. */
@@ -188,6 +255,7 @@ tee_files (int nfiles, char **files)
   char buffer[BUFSIZ];
   ssize_t bytes_read = 0;
   int i;
+  int first_out = 0;
   bool ok = true;
   char const *mode_string =
     (O_BINARY
@@ -228,6 +296,26 @@ tee_files (int nfiles, char **files)
 
   while (n_outputs)
     {
+      if (pipe_check)
+        {
+          int err = iopoll (STDIN_FILENO, fileno (descriptors[first_out]));
+
+          if (err == IOPOLL_BROKEN_OUTPUT)
+            {
+              errno = EPIPE; /* behave like write produced EPIPE */
+              if (fail_output (descriptors, files, first_out))
+                ok = false;
+              n_outputs--;
+              first_out = get_next_out (descriptors, nfiles, first_out);
+              continue;
+            }
+          else if (err == IOPOLL_ERROR)
+            {
+              error (0, errno, _("poll error"));
+              ok = false;
+            }
+        }
+
       bytes_read = read (STDIN_FILENO, buffer, sizeof buffer);
       if (bytes_read < 0 && errno == EINTR)
         continue;
@@ -240,21 +328,11 @@ tee_files (int nfiles, char **files)
         if (descriptors[i]
             && fwrite (buffer, bytes_read, 1, descriptors[i]) != 1)
           {
-            int w_errno = errno;
-            bool fail = errno != EPIPE || (output_error == output_error_exit
-                                          || output_error == output_error_warn);
-            if (descriptors[i] == stdout)
-              clearerr (stdout); /* Avoid redundant close_stdout diagnostic. */
-            if (fail)
-              {
-                error (output_error == output_error_exit
-                       || output_error == output_error_exit_nopipe,
-                       w_errno, "%s", quotef (files[i]));
-              }
-            descriptors[i] = NULL;
-            if (fail)
+            if (fail_output (descriptors, files, i))
               ok = false;
             n_outputs--;
+            if (first_out == i)
+              first_out = get_next_out (descriptors, nfiles, first_out);
           }
     }