On 01/31/2014 06:26 AM, Fujii Masao wrote:
>> Is there a good place to define the constant, so that both backend and
>> client can use it? I'd say 'include/common' but no existing file seems
>> to be appropriate. I'm not sure if it's worth adding a new file there.
>
> If there is no good place to define them, it's okay to define them
> also on the client side
> for now.
>
> +      <term>BASE_BACKUP [<literal>LABEL</literal>
> <replaceable>'label'</replaceable>] [<literal>PROGRESS</literal>]
> [<literal>FAST</literal>] [<literal>WAL</literal>]
> [<literal>NOWAIT</literal>] [<literal>MAX_RATE</literal>]</term>
>
> It's better to add something like <replaceable>'rate'</replaceable> just after
> <literal>MAX_RATE</literal>.
>
> + <para>
> + <literal>MAX_RATE</literal> does not affect WAL streaming.
> + </para>
>
> I don't think that this paragraph is required here because BASE_BACKUP is
> basically independent from WAL streaming.
>
> Why did you choose "bytes per second" as a valid rate which we can specify?
> Since the minimum rate is 32kB, isn't it better to use "KB per second" for
> that?
> If we do that, we can easily increase the maximum rate from 1GB to a very large
> number in the future if required.
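Here is a small sketch of the arithmetic behind the unit suggestion. It assumes, as the attached patch does, that the rate is carried as a uint32. The helper names and SKETCH_* macros are hypothetical.

Counted in bytes per second, a uint32 cannot express more than about 4 GB/s. Counted in kilobytes per second, the same type reaches about 4 TB/s, and 1 GB/s is simply 1048576.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical bounds in kB per second: 32 kB/s through 1 GB/s. */
#define SKETCH_RATE_LOWER_KB 32u
#define SKETCH_RATE_UPPER_KB 1048576u	/* 1024 * 1024 kB/s = 1 GB/s */

/* True if rate_kb lies inside the accepted range. */
static bool
rate_kb_is_valid(uint32_t rate_kb)
{
	return rate_kb >= SKETCH_RATE_LOWER_KB && rate_kb <= SKETCH_RATE_UPPER_KB;
}

/*
 * Convert kB/s to bytes/s.  The multiplication is done in 64 bits, so even
 * UINT32_MAX kB/s (about 4 TB/s) converts without overflow, whereas a rate
 * stored directly in bytes/s would top out at UINT32_MAX (about 4 GB/s).
 */
static uint64_t
rate_kb_to_bytes(uint32_t rate_kb)
{
	return (uint64_t) rate_kb * 1024;
}
```

Raising the upper bound later would then only mean changing the constant, not the wire type.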
The attached version addresses all the comments above.

> +		wait_result = WaitLatch(&MyWalSnd->latch, WL_LATCH_SET | WL_TIMEOUT
> +								| WL_POSTMASTER_DEATH, (long) (sleep / 1000));
>
> If WL_POSTMASTER_DEATH is triggered, we should exit immediately like
> other processes do? This is not a problem of this patch. This problem also
> exists in current master. But ISTM it's better to solve that together. Thoughts?

Once we're careful about not missing signals, I think PM death should be
noticed too. The backup itself would probably manage to finish without the
postmaster; however, it is executed by the walsender process.

The question is where the !PostmasterIsAlive() check should be added. I think
it belongs in the main loop of perform_base_backup(), but that's probably
out of scope for this patch. Do you think my patch should only add a comment
like "Don't forget to set the WL_POSTMASTER_DEATH flag when making
basebackup.c sensitive to PM death"?

// Antonin Houska (Tony)
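Here is a sketch of how the wake-up reason could be classified if throttle() were made sensitive to postmaster death. The SKETCH_WL_* bits and the throttle_wake enum are hypothetical stand-ins, not the real WL_* values from storage/latch.h. In the backend, the THROTTLE_EXIT case would terminate the process rather than return a value.

```c
#include <assert.h>

/* Hypothetical stand-ins for the WL_* bits of storage/latch.h. */
#define SKETCH_WL_LATCH_SET			(1 << 0)
#define SKETCH_WL_TIMEOUT			(1 << 1)
#define SKETCH_WL_POSTMASTER_DEATH	(1 << 2)

typedef enum
{
	THROTTLE_EXIT,				/* postmaster is gone: abandon the backup */
	THROTTLE_SLEPT_FULL,		/* timeout expired: the whole sleep elapsed */
	THROTTLE_INTERRUPTED		/* latch was set: the sleep was cut short */
} throttle_wake;

/*
 * Decide what to do after a latch wait.  The result is a bitmask, so
 * postmaster death is tested first: it must win even when another bit,
 * such as the timeout, is reported by the same wakeup.
 */
static throttle_wake
classify_wait_result(int wait_result)
{
	if (wait_result & SKETCH_WL_POSTMASTER_DEATH)
		return THROTTLE_EXIT;
	if (wait_result & SKETCH_WL_TIMEOUT)
		return THROTTLE_SLEPT_FULL;
	return THROTTLE_INTERRUPTED;
}
```

In the patch's terms, THROTTLE_SLEPT_FULL corresponds to advancing throttled_last by elapsed + sleep. THROTTLE_INTERRUPTED corresponds to re-reading the clock with GetCurrentIntegerTimestamp().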
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 832524e..704b653 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1772,7 +1772,7 @@ The commands accepted in walsender mode are:
   </varlistentry>
 
   <varlistentry>
-    <term>BASE_BACKUP [<literal>LABEL</literal> <replaceable>'label'</replaceable>] [<literal>PROGRESS</literal>] [<literal>FAST</literal>] [<literal>WAL</literal>] [<literal>NOWAIT</literal>]</term>
+    <term>BASE_BACKUP [<literal>LABEL</literal> <replaceable>'label'</replaceable>] [<literal>PROGRESS</literal>] [<literal>FAST</literal>] [<literal>WAL</literal>] [<literal>NOWAIT</literal>] [<literal>MAX_RATE</literal> <replaceable>'rate'</replaceable>]</term>
     <listitem>
      <para>
       Instructs the server to start streaming a base backup.
@@ -1840,7 +1840,20 @@ The commands accepted in walsender mode are:
          the waiting and the warning, leaving the client responsible for
          ensuring the required log is available.
         </para>
-         </listitem>
+        </listitem>
+       </varlistentry>
+       <varlistentry>
+        <term><literal>MAX_RATE</literal> <replaceable>rate</></term>
+        <listitem>
+         <para>
+          Limit (throttle) the maximum amount of data transferred from server
+          to client per unit of time. The expected unit is kilobytes per second.
+          If this option is specified, the value must either be equal to zero
+          or it must fall within the range from 32 kB through 1 GB (inclusive).
+          If zero is passed or the option is not specified, no restriction is
+          imposed on the transfer.
+         </para>
+        </listitem>
        </varlistentry>
       </variablelist>
      </para>
diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index c379df5..f8866db 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -189,6 +189,27 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+      <term><option>-r <replaceable class="parameter">rate</replaceable></option></term>
+      <term><option>--max-rate=<replaceable class="parameter">rate</replaceable></option></term>
+      <listitem>
+       <para>
+        The maximum amount of data transferred from the server per second.
+        The purpose is to limit the impact of <application>pg_basebackup</application>
+        on the running server.
+       </para>
+       <para>
+        This option always affects transfer of the data directory. Transfer of
+        WAL files is only affected if the collection method is <literal>fetch</literal>.
+       </para>
+       <para>
+        Valid values are between <literal>32 kB</literal> and <literal>1024 MB</literal>.
+        A unit suffix is required: <literal>k</literal> (kilobytes) or <literal>M</literal>
+        (megabytes).
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>-R</option></term>
       <term><option>--write-recovery-conf</option></term>
       <listitem>
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 06e54bc..1395542 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -34,6 +34,7 @@
 #include "utils/builtins.h"
 #include "utils/elog.h"
 #include "utils/ps_status.h"
+#include "utils/timestamp.h"
 #include "pgtar.h"
 
 typedef struct
@@ -43,6 +44,7 @@ typedef struct
 	bool		fastcheckpoint;
 	bool		nowait;
 	bool		includewal;
+	uint32		maxrate;
 } basebackup_options;
 
 
@@ -60,6 +62,7 @@ static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
 static void parse_basebackup_options(List *options, basebackup_options *opt);
 static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
 static int	compareWalFileNames(const void *a, const void *b);
+static void throttle(size_t increment);
 
 /* Was the backup currently in-progress initiated in recovery mode? */
 static bool backup_started_in_recovery = false;
@@ -72,6 +75,31 @@ static char *statrelpath = NULL;
  */
 #define TAR_SEND_SIZE 32768
 
+
+/*
+ * The maximum amount of data (kB per second) - bounds of the user input.
+ */
+#define MAX_RATE_LOWER	32
+#define MAX_RATE_UPPER	1048576
+
+
+/*
+ * How frequently to throttle, as a fraction of the specified rate-second.
+ */
+#define THROTTLING_FREQUENCY	8
+
+/* The number of bytes whose transfer may trigger a sleep. */
+static uint64 throttling_sample;
+
+/* Amount of data already transferred but not yet throttled. */
+static int64 throttling_counter;
+
+/* The minimum time required to transfer throttling_sample bytes. */
+static int64 elapsed_min_unit;
+
+/* The last check of the transfer rate. */
+static int64 throttled_last;
+
 typedef struct
 {
 	char	   *oid;
@@ -203,6 +231,29 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
 	/* Send tablespace header */
 	SendBackupHeader(tablespaces);
 
+	/* Set up and activate network throttling, if the client requested it */
+	if (opt->maxrate > 0)
+	{
+		throttling_sample = opt->maxrate * 1024 / THROTTLING_FREQUENCY;
+
+		/*
+		 * The minimum amount of time for throttling_sample
+		 * bytes to be transferred.
+		 */
+		elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
+
+		/* Enable throttling. */
+		throttling_counter = 0;
+
+		/* The 'real data' starts now (header was ignored). */
+		throttled_last = GetCurrentIntegerTimestamp();
+	}
+	else
+	{
+		/* Disable throttling. */
+		throttling_counter = -1;
+	}
+
 	/* Send off our tablespaces one by one */
 	foreach(lc, tablespaces)
 	{
@@ -430,6 +481,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
 					 (errmsg("base backup could not send data, aborting backup")));
 
 			len += cnt;
+			throttle(cnt);
+
 			if (len == XLogSegSize)
 				break;
 		}
@@ -500,6 +553,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 	bool		o_fast = false;
 	bool		o_nowait = false;
 	bool		o_wal = false;
+	bool		o_maxrate = false;
 
 	MemSet(opt, 0, sizeof(*opt));
 	foreach(lopt, options)
@@ -551,6 +605,28 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 			opt->includewal = true;
 			o_wal = true;
 		}
+		else if (strcmp(defel->defname, "maxrate") == 0)
+		{
+			long		maxrate;
+
+			if (o_maxrate)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+			maxrate = intVal(defel->arg);
+
+			opt->maxrate = (uint32) maxrate;
+			if (opt->maxrate < MAX_RATE_LOWER || opt->maxrate > MAX_RATE_UPPER)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+						 errmsg("transfer rate %u kilobytes per second is out of range",
+								opt->maxrate),
+						 errhint("The accepted range is %u through %u kB per second.",
+								 MAX_RATE_LOWER, MAX_RATE_UPPER)));
+			}
+			o_maxrate = true;
+		}
 		else
 			elog(ERROR, "option \"%s\" not recognized",
 				 defel->defname);
@@ -1104,6 +1180,7 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
 					 (errmsg("base backup could not send data, aborting backup")));
 
 		len += cnt;
+		throttle(cnt);
 
 		if (len >= statbuf->st_size)
 		{
@@ -1125,10 +1202,14 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
 			cnt = Min(sizeof(buf), statbuf->st_size - len);
 			pq_putmessage('d', buf, cnt);
 			len += cnt;
+			throttle(cnt);
 		}
 	}
 
-	/* Pad to 512 byte boundary, per tar format requirements */
+	/*
+	 * Pad to 512 byte boundary, per tar format requirements. (This small
+	 * piece of data is probably not worth throttling.)
+	 */
 	pad = ((len + 511) & ~511) - len;
 	if (pad > 0)
 	{
@@ -1154,3 +1235,64 @@ _tarWriteHeader(const char *filename, const char *linktarget,
 
 	pq_putmessage('d', h, 512);
 }
+
+/*
+ * Increment the network transfer counter by the given number of bytes,
+ * and sleep if necessary to comply with the requested network transfer
+ * rate.
+ */
+static void
+throttle(size_t increment)
+{
+	int64		elapsed,
+				elapsed_min,
+				sleep;
+	int			wait_result = 0;
+
+	if (throttling_counter < 0)
+		return;
+
+	throttling_counter += increment;
+	if (throttling_counter < throttling_sample)
+		return;
+
+	/* Time elapsed since the last measurement (and possible wakeup). */
+	elapsed = GetCurrentIntegerTimestamp() - throttled_last;
+	/* How much should have elapsed at minimum? */
+	elapsed_min = elapsed_min_unit * (throttling_counter / throttling_sample);
+	sleep = elapsed_min - elapsed;
+	/* Only sleep if the transfer is faster than it should be. */
+	if (sleep > 0)
+	{
+		Assert(MyWalSnd);
+		ResetLatch(&MyWalSnd->latch);
+
+		/*
+		 * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
+		 * the maximum time to sleep. Thus the cast to long is safe.
+		 */
+		wait_result = WaitLatch(&MyWalSnd->latch, WL_LATCH_SET | WL_TIMEOUT
+								| WL_POSTMASTER_DEATH, (long) (sleep / 1000));
+	}
+	else
+	{
+		/*
+		 * The actual transfer rate does not exceed the limit. A negative
+		 * value would distort the adjustment of throttled_last.
+		 */
+		sleep = 0;
+	}
+
+	/*
+	 * Only a whole multiple of throttling_sample was processed. The rest will
+	 * be done during the next call of this function.
+	 */
+	throttling_counter %= throttling_sample;
+
+	/* Once the (possible) sleep has ended, a new period starts. */
+	if (sleep == 0 || (wait_result & WL_TIMEOUT))
+		throttled_last += elapsed + sleep;
+	else if (sleep > 0)
+		/* Sleep was necessary but might have been interrupted. */
+		throttled_last = GetCurrentIntegerTimestamp();
+}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index c3f4a24..ba8f885 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -69,6 +69,7 @@ Node *replication_parse_result;
 %token K_PROGRESS
 %token K_FAST
 %token K_NOWAIT
+%token K_MAX_RATE
 %token K_WAL
 %token K_TIMELINE
 %token K_PHYSICAL
@@ -113,7 +114,7 @@ identify_system:
 			;
 
 /*
- * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
+ * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT] [MAX_RATE %d]
  */
 base_backup:
 			K_BASE_BACKUP base_backup_opt_list
@@ -157,6 +158,11 @@ base_backup_opt:
 					$$ = makeDefElem("nowait",
 						   (Node *)makeInteger(TRUE));
 				}
+			| K_MAX_RATE UCONST
+				{
+				  $$ = makeDefElem("maxrate",
+					   (Node *)makeInteger($2));
+				}
 			;
 
 /* CREATE_REPLICATION_SLOT SLOT slot PHYSICAL */
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 24195a5..ca32aa6 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -86,6 +86,7 @@ IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
 LABEL			{ return K_LABEL; }
 NOWAIT			{ return K_NOWAIT; }
 PROGRESS			{ return K_PROGRESS; }
+MAX_RATE		{ return K_MAX_RATE; }
 WAL			{ return K_WAL; }
 TIMELINE			{ return K_TIMELINE; }
 START_REPLICATION	{ return K_START_REPLICATION; }
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 9d13d57..0dfb60c 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -46,6 +46,14 @@ static bool streamwal = false;
 static bool fastcheckpoint = false;
 static bool writerecoveryconf = false;
 static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
+static int32 maxrate = 0;		/* no limit by default */
+
+/*
+ * Minimum and maximum values of maxrate (kB per second), to check user input
+ * on the client side.
+ */
+#define MAX_RATE_LOWER	32
+#define MAX_RATE_UPPER	1048576
 
 /* Progress counters */
 static uint64 totalsize;
@@ -76,6 +84,7 @@ static PQExpBuffer recoveryconfcontents = NULL;
 static void usage(void);
 static void verify_dir_is_empty_or_create(char *dirname);
 static void progress_report(int tablespacenum, const char *filename);
+static int32 parse_max_rate(char *src);
 
 static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
 static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
@@ -111,6 +120,7 @@ usage(void)
 	printf(_("\nOptions controlling the output:\n"));
 	printf(_("  -D, --pgdata=DIRECTORY receive base backup into directory\n"));
 	printf(_("  -F, --format=p|t       output format (plain (default), tar)\n"));
+	printf(_("  -r, --max-rate=RATE    maximum rate at which to transfer the data directory\n"));
 	printf(_("  -R, --write-recovery-conf\n"
 			 "                         write recovery.conf after backup\n"));
 	printf(_("  -x, --xlog             include required WAL files in backup (fetch mode)\n"));
@@ -475,6 +485,101 @@ progress_report(int tablespacenum, const char *filename)
 	fprintf(stderr, "\r");
 }
 
+static int32
+parse_max_rate(char *src)
+{
+	double		result;
+	char	   *after_num;
+	char	   *suffix = NULL;
+
+	errno = 0;
+	result = strtod(src, &after_num);
+	if (src == after_num)
+	{
+		fprintf(stderr,
+				_("%s: transfer rate \"%s\" is not a valid value\n"),
+				progname, src);
+		exit(1);
+	}
+	if (errno != 0)
+	{
+		fprintf(stderr,
+				_("%s: invalid transfer rate \"%s\": %s\n"),
+				progname, src, strerror(errno));
+		exit(1);
+	}
+
+	if (result <= 0)
+	{
+		/*
+		 * Reject obviously wrong values here.
+		 */
+		fprintf(stderr, _("%s: transfer rate must be greater than zero\n"),
+				progname);
+		exit(1);
+	}
+
+	/*
+	 * Evaluate suffix, after skipping over possible whitespace.
+	 */
+	while (*after_num != '\0' && isspace((unsigned char) *after_num))
+		after_num++;
+
+	if (*after_num == '\0')
+	{
+		fprintf(stderr,
+				_("%s: missing --max-rate units\n"), progname);
+		exit(1);
+	}
+
+	suffix = after_num;
+	if (*after_num == 'k')
+	{
+		/* kilobyte is the expected unit. */
+		after_num++;
+	}
+	else if (*after_num == 'M')
+	{
+		after_num++;
+		result *= 1024.0;
+	}
+
+	/* The rest can only consist of white space. */
+	while (*after_num != '\0' && isspace((unsigned char) *after_num))
+		after_num++;
+
+	if (*after_num != '\0')
+	{
+		fprintf(stderr,
+				_("%s: invalid --max-rate units: \"%s\"\n"),
+				progname, suffix);
+		exit(1);
+	}
+
+
+	/* Valid integer? */
+	if ((uint64) result != (uint64) ((uint32) result))
+	{
+		fprintf(stderr,
+				_("%s: transfer rate \"%s\" exceeds integer range\n"),
+				progname, src);
+		exit(1);
+	}
+
+	/*
+	 * The range is checked on the server side too, but let's avoid
+	 * connecting to the server if a nonsensical value has been passed.
+	 */
+	if (result < MAX_RATE_LOWER || result > MAX_RATE_UPPER)
+	{
+		fprintf(stderr,
+				_("%s: transfer rate \"%s\" is out of range\n"),
+				progname, src);
+		exit(1);
+	}
+
+	return (int32) result;
+}
 
 /*
  * Write a piece of tar data
@@ -1308,8 +1413,9 @@ BaseBackup(void)
 	char	   *sysidentifier;
 	uint32		latesttli;
 	uint32		starttli;
-	char		current_path[MAXPGPATH];
+	char	   *basebkp;
 	char		escaped_label[MAXPGPATH];
+	char	   *maxrate_clause = NULL;
 	int			i;
 	char		xlogstart[64];
 	char		xlogend[64];
@@ -1382,15 +1488,20 @@ BaseBackup(void)
 	 * Start the actual backup
 	 */
 	PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
-	snprintf(current_path, sizeof(current_path),
-			 "BASE_BACKUP LABEL '%s' %s %s %s %s",
-			 escaped_label,
-			 showprogress ? "PROGRESS" : "",
-			 includewal && !streamwal ? "WAL" : "",
-			 fastcheckpoint ? "FAST" : "",
-			 includewal ? "NOWAIT" : "");
 
-	if (PQsendQuery(conn, current_path) == 0)
+	if (maxrate > 0)
+		maxrate_clause = psprintf("MAX_RATE %d", maxrate);
+
+	basebkp =
+		psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s",
+				 escaped_label,
+				 showprogress ? "PROGRESS" : "",
+				 includewal && !streamwal ? "WAL" : "",
+				 fastcheckpoint ? "FAST" : "",
+				 includewal ? "NOWAIT" : "",
+				 maxrate_clause ? maxrate_clause : "");
+
+	if (PQsendQuery(conn, basebkp) == 0)
 	{
 		fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
 				progname, "BASE_BACKUP", PQerrorMessage(conn));
@@ -1657,6 +1768,7 @@ main(int argc, char **argv)
 		{"pgdata", required_argument, NULL, 'D'},
 		{"format", required_argument, NULL, 'F'},
 		{"checkpoint", required_argument, NULL, 'c'},
+		{"max-rate", required_argument, NULL, 'r'},
 		{"write-recovery-conf", no_argument, NULL, 'R'},
 		{"xlog", no_argument, NULL, 'x'},
 		{"xlog-method", required_argument, NULL, 'X'},
@@ -1697,7 +1809,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "D:F:RxX:l:zZ:d:c:h:p:U:s:wWvP",
+	while ((c = getopt_long(argc, argv, "D:F:r:RxX:l:zZ:d:c:h:p:U:s:wWvP",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -1718,6 +1830,9 @@ main(int argc, char **argv)
 					exit(1);
 				}
 				break;
+			case 'r':
+				maxrate = parse_max_rate(optarg);
+				break;
 			case 'R':
 				writerecoveryconf = true;
 				break;
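Here is a standalone model of throttle() that may make its arithmetic easier to review. It is a sketch, not code from the patch, and differs from it in three ways:

- The current time is passed in explicitly instead of calling GetCurrentIntegerTimestamp().
- The function returns the sleep length instead of calling WaitLatch().
- It assumes every sleep runs to completion (the WL_TIMEOUT path).

The throttle_state type, the function names and the SKETCH_* constants (mirroring USECS_PER_SEC and THROTTLING_FREQUENCY) are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_USECS_PER_SEC		1000000LL
#define SKETCH_THROTTLING_FREQUENCY	8

/* Hypothetical model of the patch's static throttling variables. */
typedef struct
{
	int64_t		sample;				/* bytes per throttling period */
	int64_t		elapsed_min_unit;	/* minimum length of one period, usec */
	int64_t		counter;			/* bytes not yet accounted; -1 = off */
	int64_t		last;				/* start of the current period, usec */
} throttle_state;

static void
throttle_init(throttle_state *st, uint32_t maxrate_kb, int64_t now)
{
	st->sample = 0;
	st->elapsed_min_unit = 0;
	st->last = now;
	if (maxrate_kb == 0)
	{
		st->counter = -1;			/* throttling disabled */
		return;
	}
	st->sample = (int64_t) maxrate_kb * 1024 / SKETCH_THROTTLING_FREQUENCY;
	st->elapsed_min_unit = SKETCH_USECS_PER_SEC / SKETCH_THROTTLING_FREQUENCY;
	st->counter = 0;
}

/*
 * Account for 'increment' bytes sent at time 'now' (usec) and return how
 * long the caller should sleep, in usec (0 if no sleep is needed).
 */
static int64_t
throttle_step(throttle_state *st, size_t increment, int64_t now)
{
	int64_t		elapsed,
				elapsed_min,
				sleep_us;

	if (st->counter < 0)
		return 0;
	assert(st->sample > 0);

	st->counter += (int64_t) increment;
	if (st->counter < st->sample)
		return 0;

	elapsed = now - st->last;
	/* Minimum time the whole samples sent so far should have taken. */
	elapsed_min = st->elapsed_min_unit * (st->counter / st->sample);
	sleep_us = elapsed_min - elapsed;
	if (sleep_us < 0)
		sleep_us = 0;				/* already within the limit */

	/* Carry the partial sample over to the next period. */
	st->counter %= st->sample;

	/* The next period starts once the (assumed complete) sleep ends. */
	st->last += elapsed + sleep_us;
	return sleep_us;
}
```

With MAX_RATE 1024 (kB/s), the sample is 1024 * 1024 / 8 = 131072 bytes and the minimum period is 125000 us. If the first 131072 bytes go out within 50000 us, the model asks for a 75000 us sleep, which keeps the average at 1024 kB/s. The model always advances the period start, including when no sleep is needed.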
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers