On 01/15/2014 10:52 PM, Alvaro Herrera wrote:
> I gave this patch a look. There was a bug that the final bounds check
> for int32 range was not done when there was no suffix, so in effect you
> could pass numbers larger than UINT_MAX and pg_basebackup would not
> complain until the number reached the server via BASE_BACKUP. Maybe
> that's fine, given that numbers above 1G will cause a failure on the
> server side anyway, but it looked like a bug to me. I tweaked the parse
> routine slightly; other than fix the bug, I made it accept fractional
> numbers, so you can say 0.5M for instance.
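For illustration, here is a minimal standalone sketch of the parsing behaviour described above. It accepts fractional input such as 0.5M, and it runs the final uint32 bounds check whether or not a suffix was given. The function name parse_rate() and its bool/out-parameter interface are hypothetical. This is not the parse_max_rate() from the attached patch, which leaves the exact range check to the server.

```c
#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

/*
 * Hypothetical helper: parse "RATE[ ][k|M]" into bytes per second.
 * Returns false for malformed input, unknown units, or out-of-range values.
 */
static bool
parse_rate(const char *src, uint32_t *out)
{
	char	   *end;
	double		value;

	assert(src != NULL && out != NULL);

	errno = 0;
	value = strtod(src, &end);
	/* No digits, over/underflow, or not positive (the negation also rejects NaN). */
	if (end == src || errno != 0 || !(value > 0))
		return false;

	/* Optional suffix, possibly preceded by white space. */
	while (isspace((unsigned char) *end))
		end++;
	if (*end == 'k')
	{
		value *= 1024.0;
		end++;
	}
	else if (*end == 'M')
	{
		value *= 1048576.0;
		end++;
	}

	/* Only trailing white space may follow. */
	while (isspace((unsigned char) *end))
		end++;
	if (*end != '\0')
		return false;

	/*
	 * Final bounds check, done whether or not a suffix was present. Values
	 * below 1 byte/s are rejected because they would truncate to zero.
	 */
	if (value < 1.0 || value > (double) UINT32_MAX)
		return false;

	*out = (uint32_t) value;
	return true;
}
```

Because the check runs after the suffix is applied, an unsuffixed "5000000000" is rejected on the client instead of being passed on to BASE_BACKUP.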
Thanks.

> Perhaps we should make sure pg_basebackup rejects numbers larger than 1G
> as well.

Is there a good place to define the constant so that both the backend and the client can use it? I'd say 'include/common', but no existing file there seems appropriate, and I'm not sure it's worth adding a new file.

> Another thing I found a bit strange was the use of the latch. What this
> patch does is create a separate latch which is used for the throttling.
> This means that if the walsender process receives a signal, it will not
> wake up if it's sleeping in throttling. Perhaps this is okay: as Andres
> was quoted upthread as saying, maybe this is not a problem because the
> sleep times are typically short anyway. But we're pretty much used to
> the idea that whenever a signal is sent, processes act on it
> *immediately*. Maybe some admin will not feel comfortable about waiting
> some extra 20ms when they cancel their base backups. In any case,
> having a secondary latch to sleep on in a process seems weird. Maybe
> this should be using MyWalSnd->latch somehow.

OK, MyWalSnd->latch is used now.

> You have this interesting THROTTLING_MAX_FREQUENCY constant defined to
> 128, with the comment "check this many times per second".
> Let's see: if the user requests 1MB/s, this value results in
> throttling_sample = 1MB / 128 = 8192. So for every 8kB transferred, we
> would stop, check the wall clock time, and if less time has lapsed than
> we were supposed to spend transferring those 8kB then we sleep. Isn't a
> check every 8kB a bit too frequent? This doesn't seem sensible to me.
> I think we should be checking perhaps every tenth of the requested
> maximum rate, or something like that, not every 1/128th.
>
> Now, what the code actually does is not necessarily that, because the
> sampling value is clamped to a minimum of 32 kB. But then if we're
> going to do that, why use such a large divisor value in the first place?
> I propose we set that constant to a smaller value such as 8.

I tried to use THROTTLING_SAMPLE_MIN and THROTTLING_MAX_FREQUENCY to control both the minimum and the maximum chunk size. That was probably too generic, so THROTTLING_SAMPLE_MIN is gone now, and the divisor, renamed THROTTLING_FREQUENCY, is set to 8.

New patch version is attached.

// Antonin Houska (Tony)
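The divisor arithmetic above can be checked with a small standalone sketch. It mirrors the setup in perform_base_backup() from the attached patch. The struct and function names are hypothetical, and USECS_PER_SEC is redefined locally with PostgreSQL's value so that the example compiles on its own.

```c
#include <assert.h>
#include <stdint.h>

/* Same value as PostgreSQL's USECS_PER_SEC; redefined so this compiles alone. */
#define USECS_PER_SEC 1000000LL

/* Hypothetical container for the two values the patch keeps in statics. */
typedef struct
{
	uint32_t	sample;			/* bytes between checks (throttling_sample) */
	int64_t		unit_usec;		/* minimum time per sample (elapsed_min_unit) */
} throttle_params;

static throttle_params
throttle_setup(uint32_t maxrate, uint32_t frequency)
{
	throttle_params p;

	assert(frequency > 0 && maxrate >= frequency);

	p.sample = maxrate / frequency;
	/* USECS_PER_SEC / maxrate is microseconds per byte; truncate like the patch. */
	p.unit_usec = (int64_t) (p.sample * ((double) USECS_PER_SEC / maxrate));
	return p;
}
```

At 1 MB/s, a divisor of 128 gives an 8192-byte sample (one check roughly every 7.8 ms). A divisor of 8 gives a 131072-byte sample and a check every 125 ms, and the 32 kB minimum rate still yields a 125 ms period.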
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 7d99976..799d214 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1720,7 +1720,7 @@ The commands accepted in walsender mode are:
   </varlistentry>

   <varlistentry>
-    <term>BASE_BACKUP [<literal>LABEL</literal> <replaceable>'label'</replaceable>] [<literal>PROGRESS</literal>] [<literal>FAST</literal>] [<literal>WAL</literal>] [<literal>NOWAIT</literal>]</term>
+    <term>BASE_BACKUP [<literal>LABEL</literal> <replaceable>'label'</replaceable>] [<literal>PROGRESS</literal>] [<literal>FAST</literal>] [<literal>WAL</literal>] [<literal>NOWAIT</literal>] [<literal>MAX_RATE</literal>]</term>
     <listitem>
      <para>
       Instructs the server to start streaming a base backup.
@@ -1788,7 +1788,23 @@ The commands accepted in walsender mode are:
          the waiting and the warning, leaving the client responsible for
          ensuring the required log is available.
         </para>
-         </listitem>
+       </listitem>
+      </varlistentry>
+      <varlistentry>
+       <term><literal>MAX_RATE</literal> <replaceable>rate</></term>
+       <listitem>
+        <para>
+         Limit (throttle) the maximum amount of data transferred from server
+         to client per unit of time. The expected unit is bytes per second.
+         If this option is specified, the value must either be equal to zero
+         or it must fall within the range from 32 kB through 1 GB (inclusive).
+         If zero is passed or the option is not specified, no restriction is
+         imposed on the transfer.
+        </para>
+        <para>
+         <literal>MAX_RATE</literal> does not affect WAL streaming.
+        </para>
+       </listitem>
       </varlistentry>
      </variablelist>
     </para>
diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index c379df5..2ec81b7 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -189,6 +189,27 @@ PostgreSQL documentation
      </varlistentry>

      <varlistentry>
+      <term><option>-r <replaceable class="parameter">rate</replaceable></option></term>
+      <term><option>--max-rate=<replaceable class="parameter">rate</replaceable></option></term>
+      <listitem>
+       <para>
+        The maximum amount of data transferred from the server per second.
+        The purpose is to limit the impact of <application>pg_basebackup</application>
+        on the running server.
+       </para>
+       <para>
+        This option always affects transfer of the data directory. Transfer of
+        WAL files is only affected if the collection method is <literal>fetch</literal>.
+       </para>
+       <para>
+        Valid values are between <literal>32 kB</literal> and <literal>1024 MB</literal>
+        (inclusive). Suffixes <literal>k</literal> (kilobytes) and
+        <literal>M</literal> (megabytes) are accepted.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>-R</option></term>
       <term><option>--write-recovery-conf</option></term>
       <listitem>
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index fc35f5b..a0216c1 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -33,6 +33,7 @@
 #include "utils/builtins.h"
 #include "utils/elog.h"
 #include "utils/ps_status.h"
+#include "utils/timestamp.h"
 #include "pgtar.h"

 typedef struct
@@ -42,6 +43,7 @@ typedef struct
 	bool		fastcheckpoint;
 	bool		nowait;
 	bool		includewal;
+	uint32		maxrate;
 } basebackup_options;


@@ -59,6 +61,7 @@ static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
 static void parse_basebackup_options(List *options, basebackup_options *opt);
 static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
 static int	compareWalFileNames(const void *a, const void *b);
+static void throttle(size_t increment);

 /* Was the backup currently in-progress initiated in recovery mode? */
 static bool backup_started_in_recovery = false;
@@ -68,6 +71,35 @@ static bool backup_started_in_recovery = false;
  */
 #define TAR_SEND_SIZE 32768

+
+/*
+ * The maximum amount of data per second - bounds of the user input.
+ *
+ * If the maximum should be increased to more than 4 GB, uint64 must
+ * be introduced for the related variables. However, such high values have
+ * little to do with throttling.
+ */
+#define MAX_RATE_LOWER	32768
+#define MAX_RATE_UPPER	(1024 << 20)
+
+
+/*
+ * How frequently to throttle, as a fraction of the specified rate-second.
+ */
+#define THROTTLING_FREQUENCY	8
+
+/* The actual value, transfer of which may cause sleep. */
+static uint32 throttling_sample;
+
+/* Amount of data already transferred but not yet throttled. */
+static int32 throttling_counter;
+
+/* The minimum time required to transfer throttling_sample bytes. */
+static int64 elapsed_min_unit;
+
+/* The last check of the transfer rate. */
+static int64 throttled_last;
+
 typedef struct
 {
 	char	   *oid;
@@ -187,6 +219,30 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
 	/* Send tablespace header */
 	SendBackupHeader(tablespaces);

+	/* Set up and activate network throttling, if client requested it */
+	if (opt->maxrate > 0)
+	{
+		throttling_sample = opt->maxrate / THROTTLING_FREQUENCY;
+
+		/*
+		 * opt->maxrate is bytes per second. Thus the expression in
+		 * parentheses is microseconds per byte.
+		 */
+		elapsed_min_unit = throttling_sample *
+			((double) USECS_PER_SEC / opt->maxrate);
+
+		/* Enable throttling. */
+		throttling_counter = 0;
+
+		/* The 'real data' starts now (header was ignored). */
+		throttled_last = GetCurrentIntegerTimestamp();
+	}
+	else
+	{
+		/* Disable throttling. */
+		throttling_counter = -1;
+	}
+
 	/* Send off our tablespaces one by one */
 	foreach(lc, tablespaces)
 	{
@@ -414,6 +470,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
 						(errmsg("base backup could not send data, aborting backup")));

 				len += cnt;
+				throttle(cnt);
+
 				if (len == XLogSegSize)
 					break;
 			}
@@ -484,6 +542,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 	bool		o_fast = false;
 	bool		o_nowait = false;
 	bool		o_wal = false;
+	bool		o_maxrate = false;

 	MemSet(opt, 0, sizeof(*opt));
 	foreach(lopt, options)
@@ -535,6 +594,29 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 			opt->includewal = true;
 			o_wal = true;
 		}
+		else if (strcmp(defel->defname, "maxrate") == 0)
+		{
+			long		maxrate;
+
+			if (o_maxrate)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+			maxrate = intVal(defel->arg);
+
+			opt->maxrate = (uint32) maxrate;
+			if (opt->maxrate > 0 &&
+				(opt->maxrate < MAX_RATE_LOWER || opt->maxrate > MAX_RATE_UPPER))
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+						 errmsg("transfer rate %u bytes per second is out of range",
+								opt->maxrate),
+						 errhint("The accepted range is %u through %u kB per second.",
+								 MAX_RATE_LOWER >> 10, MAX_RATE_UPPER >> 10)));
+			}
+			o_maxrate = true;
+		}
 		else
 			elog(ERROR, "option \"%s\" not recognized",
 				 defel->defname);
@@ -1071,6 +1153,7 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
 					(errmsg("base backup could not send data, aborting backup")));

 		len += cnt;
+		throttle(cnt);

 		if (len >= statbuf->st_size)
 		{
@@ -1092,10 +1175,14 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
 			cnt = Min(sizeof(buf), statbuf->st_size - len);
 			pq_putmessage('d', buf, cnt);
 			len += cnt;
+			throttle(cnt);
 		}
 	}

-	/* Pad to 512 byte boundary, per tar format requirements */
+	/*
+	 * Pad to 512 byte boundary, per tar format requirements. (This small
+	 * piece of data is probably not worth throttling.)
+	 */
 	pad = ((len + 511) & ~511) - len;
 	if (pad > 0)
 	{
@@ -1121,3 +1208,64 @@ _tarWriteHeader(const char *filename, const char *linktarget,

 	pq_putmessage('d', h, 512);
 }
+
+/*
+ * Increment the network transfer counter by the given number of bytes,
+ * and sleep if necessary to comply with the requested network transfer
+ * rate.
+ */
+static void
+throttle(size_t increment)
+{
+	int64		elapsed,
+				elapsed_min,
+				sleep;
+	int			wait_result = 0;
+
+	if (throttling_counter < 0)
+		return;
+
+	throttling_counter += increment;
+	if (throttling_counter < throttling_sample)
+		return;
+
+	/* Time elapsed since the last measurement (and possible wake up). */
+	elapsed = GetCurrentIntegerTimestamp() - throttled_last;
+	/* How much should have elapsed at minimum? */
+	elapsed_min = elapsed_min_unit * (throttling_counter / throttling_sample);
+	sleep = elapsed_min - elapsed;
+	/* Only sleep if the transfer is faster than it should be. */
+	if (sleep > 0)
+	{
+		Assert(MyWalSnd);
+		ResetLatch(&MyWalSnd->latch);
+
+		/*
+		 * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
+		 * the maximum time to sleep. Thus the cast to long is safe.
+		 */
+		wait_result = WaitLatch(&MyWalSnd->latch, WL_LATCH_SET | WL_TIMEOUT
+								| WL_POSTMASTER_DEATH, (long) (sleep / 1000));
+	}
+	else
+	{
+		/*
+		 * The actual transfer rate is below the limit. A negative value would
+		 * distort the adjustment of throttled_last.
+		 */
+		sleep = 0;
+	}
+
+	/*
+	 * Only a whole multiple of throttling_sample was processed. The rest will
+	 * be done during the next call of this function.
+	 */
+	throttling_counter %= throttling_sample;
+
+	/* Once the (possible) sleep has ended, new period starts. */
+	if (sleep == 0 || (wait_result & WL_TIMEOUT))
+		throttled_last += elapsed + sleep;
+	else
+		/* Sleep was necessary but might have been interrupted. */
+		throttled_last = GetCurrentIntegerTimestamp();
+}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 015aa44..5f1f091 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -78,6 +78,7 @@ Node *replication_parse_result;
 %token K_PROGRESS
 %token K_FAST
 %token K_NOWAIT
+%token K_MAX_RATE
 %token K_WAL
 %token K_TIMELINE

@@ -116,7 +117,7 @@ identify_system:
 			;

 /*
- * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
+ * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT] [MAX_RATE %d]
  */
 base_backup:
 			K_BASE_BACKUP base_backup_opt_list
@@ -156,6 +157,11 @@ base_backup_opt:
 					$$ = makeDefElem("nowait",
 						   (Node *)makeInteger(TRUE));
 				}
+			| K_MAX_RATE UCONST
+				{
+					$$ = makeDefElem("maxrate",
+						   (Node *)makeInteger($2));
+				}
 			;

 /*
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 01e5ac6..74f7a34 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -71,6 +71,7 @@ IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
 LABEL			{ return K_LABEL; }
 NOWAIT			{ return K_NOWAIT; }
 PROGRESS			{ return K_PROGRESS; }
+MAX_RATE		{ return K_MAX_RATE; }
 WAL			{ return K_WAL; }
 TIMELINE			{ return K_TIMELINE; }
 START_REPLICATION	{ return K_START_REPLICATION; }
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 9d13d57..1d198ad 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -46,6 +46,7 @@ static bool streamwal = false;
 static bool fastcheckpoint = false;
 static bool writerecoveryconf = false;
 static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
+static uint32 maxrate = 0;		/* no limit by default */

 /* Progress counters */
 static uint64 totalsize;
@@ -76,6 +77,7 @@ static PQExpBuffer recoveryconfcontents = NULL;
 static void usage(void);
 static void verify_dir_is_empty_or_create(char *dirname);
 static void progress_report(int tablespacenum, const char *filename);
+static uint32 parse_max_rate(char *src);

 static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
 static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
@@ -111,6 +113,7 @@ usage(void)
 	printf(_("\nOptions controlling the output:\n"));
 	printf(_("  -D, --pgdata=DIRECTORY receive base backup into directory\n"));
 	printf(_("  -F, --format=p|t       output format (plain (default), tar)\n"));
+	printf(_("  -r, --max-rate=RATE    maximum rate at which to transfer the data directory\n"));
 	printf(_("  -R, --write-recovery-conf\n"
 			 "                         write recovery.conf after backup\n"));
 	printf(_("  -x, --xlog             include required WAL files in backup (fetch mode)\n"));
@@ -475,6 +478,86 @@ progress_report(int tablespacenum, const char *filename)
 	fprintf(stderr, "\r");
 }

+static uint32
+parse_max_rate(char *src)
+{
+	double		factor;
+	double		result;
+	char	   *after_num;
+
+	errno = 0;
+	result = strtod(src, &after_num);
+	if (src == after_num)
+	{
+		fprintf(stderr,
+				_("%s: transfer rate \"%s\" is not a valid value\n"),
+				progname, src);
+		exit(1);
+	}
+	if (errno != 0)
+	{
+		fprintf(stderr,
+				_("%s: invalid transfer rate \"%s\": %s\n"),
+				progname, src, strerror(errno));
+		exit(1);
+	}
+
+	if (result <= 0)
+	{
+		/*
+		 * Reject obviously wrong values here. The exact range check is
+		 * done on the server.
+		 */
+		fprintf(stderr, _("%s: transfer rate must be greater than zero\n"),
+				progname);
+		exit(1);
+	}
+
+	/*
+	 * Evaluate (optional) suffix after skipping over possible whitespace.
+	 */
+	factor = 1.0;
+	while (*after_num != '\0' && isspace((unsigned char) *after_num))
+		after_num++;
+	switch (*after_num)
+	{
+		case 'M':
+			factor = 1048576.0;
+			after_num++;
+			break;
+		case 'k':
+			factor = 1024.0;
+			after_num++;
+			break;
+	}
+
+	/* The rest can only consist of white space. */
+	while (*after_num != '\0')
+	{
+		if (!isspace((unsigned char) *after_num))
+		{
+			fprintf(stderr,
+					_("%s: invalid --max-rate units: \"%s\"\n"),
+					progname, after_num);
+			exit(1);
+		}
+		after_num++;
+	}
+
+	if (factor > 1)
+		result *= factor;
+
+	/* Check the integer range */
+	if ((uint64) result != (uint64) ((uint32) result))
+	{
+		fprintf(stderr,
+				_("%s: transfer rate \"%s\" exceeds integer range\n"),
+				progname, src);
+		exit(1);
+	}
+
+	return (uint32) result;
+}

 /*
  * Write a piece of tar data
@@ -1308,8 +1391,9 @@ BaseBackup(void)
 	char	   *sysidentifier;
 	uint32		latesttli;
 	uint32		starttli;
-	char		current_path[MAXPGPATH];
+	char	   *basebkp;
 	char		escaped_label[MAXPGPATH];
+	char	   *maxrate_clause = NULL;
 	int			i;
 	char		xlogstart[64];
 	char		xlogend[64];
@@ -1382,15 +1466,20 @@ BaseBackup(void)
 	 * Start the actual backup
 	 */
 	PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
-	snprintf(current_path, sizeof(current_path),
-			 "BASE_BACKUP LABEL '%s' %s %s %s %s",
-			 escaped_label,
-			 showprogress ? "PROGRESS" : "",
-			 includewal && !streamwal ? "WAL" : "",
-			 fastcheckpoint ? "FAST" : "",
-			 includewal ? "NOWAIT" : "");
-
-	if (PQsendQuery(conn, current_path) == 0)
+
+	if (maxrate > 0)
+		maxrate_clause = psprintf("MAX_RATE %u", maxrate);
+
+	basebkp =
+		psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s",
+				 escaped_label,
+				 showprogress ? "PROGRESS" : "",
+				 includewal && !streamwal ? "WAL" : "",
+				 fastcheckpoint ? "FAST" : "",
+				 includewal ? "NOWAIT" : "",
+				 maxrate_clause ? maxrate_clause : "");
+
+	if (PQsendQuery(conn, basebkp) == 0)
 	{
 		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
 				progname, "BASE_BACKUP", PQerrorMessage(conn));
@@ -1657,6 +1746,7 @@ main(int argc, char **argv)
 		{"pgdata", required_argument, NULL, 'D'},
 		{"format", required_argument, NULL, 'F'},
 		{"checkpoint", required_argument, NULL, 'c'},
+		{"max-rate", required_argument, NULL, 'r'},
 		{"write-recovery-conf", no_argument, NULL, 'R'},
 		{"xlog", no_argument, NULL, 'x'},
 		{"xlog-method", required_argument, NULL, 'X'},
@@ -1697,7 +1787,7 @@ main(int argc, char **argv)
 		}
 	}

-	while ((c = getopt_long(argc, argv, "D:F:RxX:l:zZ:d:c:h:p:U:s:wWvP",
+	while ((c = getopt_long(argc, argv, "D:F:r:RxX:l:zZ:d:c:h:p:U:s:wWvP",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -1718,6 +1808,9 @@ main(int argc, char **argv)
 					exit(1);
 				}
 				break;
+			case 'r':
+				maxrate = parse_max_rate(optarg);
+				break;
 			case 'R':
 				writerecoveryconf = true;
 				break;
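The pacing logic of throttle() can be exercised without a server by passing the clock value in explicitly and leaving out the latch. The following is a sketch under those assumptions, not the patch's code. The names are hypothetical, and every sleep is assumed to run to its full timeout, i.e. the WL_TIMEOUT case.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical simulation state; the patch keeps these in static variables. */
typedef struct
{
	uint32_t	sample;			/* throttling_sample */
	int64_t		unit_usec;		/* elapsed_min_unit */
	int32_t		counter;		/* throttling_counter */
	int64_t		last;			/* throttled_last */
} throttle_sim;

/*
 * Returns how long throttle() would sleep (microseconds) after 'increment'
 * bytes were sent at simulated time 'now', and updates the state the same
 * way as the uninterrupted-sleep path of the patch.
 */
static int64_t
throttle_step(throttle_sim *t, uint32_t increment, int64_t now)
{
	int64_t		elapsed,
				elapsed_min,
				sleep;

	assert(t->sample > 0 && t->counter >= 0);

	t->counter += increment;
	if ((uint32_t) t->counter < t->sample)
		return 0;				/* not a full sample yet, no check */

	elapsed = now - t->last;
	elapsed_min = t->unit_usec * (t->counter / t->sample);
	sleep = elapsed_min - elapsed;
	if (sleep < 0)
		sleep = 0;				/* slower than the limit: no sleep */

	t->counter %= t->sample;	/* carry the remainder to the next call */
	t->last += elapsed + sleep; /* new period starts after the sleep */
	return sleep;
}
```

With a 1 MB/s limit (128 kB sample, 125 ms per sample), four 32 kB chunks sent within 40 ms produce an 85 ms sleep, and the next period starts at 125 ms. A sample that arrives slower than the limit causes no sleep.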
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)