Thanks for checking. The new version addresses your findings.

// Antonin Houska (Tony)
On 12/09/2013 03:49 PM, Fujii Masao wrote:
> On Fri, Dec 6, 2013 at 6:43 PM, Boszormenyi Zoltan <z...@cybertec.at> wrote:
>> Hi,
>>
>> On 2013-12-05 15:36, Antonin Houska wrote:
>>
>>> On 12/02/2013 02:23 PM, Boszormenyi Zoltan wrote:
>>>>
>>>> Hi,
>>>>
>>>> I am reviewing your patch.
>>>
>>> Thanks. New version attached.
>>
>>
>> I have reviewed and tested it and marked it as ready for committer.
>
> Here are the review comments:
>
> +      <term><option>-r</option></term>
> +      <term><option>--max-rate</option></term>
>
> You need to add something like <replaceable
> class="parameter">rate</replaceable>.
>
> +        The purpose is to limit impact of <application>pg_basebackup</application>
> +        on a running master server.
>
> s/"master server"/"server" because we can take a backup from also the standby.
>
> I think that it's better to document the default value and the accepted range
> of the rate that we can specify.
>
> You need to change the protocol.sgml because you changed BASE_BACKUP
> replication command.
>
> +	printf(_("  -r, --max-rate         maximum transfer rate to transfer data directory\n"));
>
> You need to add something like =RATE just after --max-rate.
>
> +	result = strtol(src, &after_num, 0);
>
> errno should be set to 0 just before calling strtol().
>
> +	if (errno_copy == ERANGE || result != (uint64) ((uint32) result))
> +	{
> +		fprintf(stderr, _("%s: transfer rate \"%s\" exceeds integer range\n"), progname, src);
> +		exit(1);
> +	}
>
> We can move this check after the check of "src == after_num" like
> parse_int() in guc.c does.
> If we do this, the local variable 'errno_copy' is no longer necessary.
>
> I think that it's better to output the hint message like "Valid units for
> the transfer rate are \"k\" and \"M\"." when a user specified wrong unit.
>
> +		/*
> +		 * THROTTLING_SAMPLE_MIN / MAX_RATE_LOWER (in seconds) should be the
> +		 * longest possible time to sleep. Thus the cast to long is safe.
> +		 */
> +		pg_usleep((long) sleep);
>
> It's better to use the latch here so that we can interrupt immediately.
>
> Regards,
>
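The check order requested in the review (reset errno, test for "no digits" before ERANGE as parse_int() in guc.c does, then the unit suffix) can be exercised outside pg_basebackup. The following standalone C sketch is an illustration, not the patch code: `parse_rate_sketch()` and `RateParseStatus` are hypothetical names, and unlike parse_max_rate() it returns a status code instead of printing a message and calling exit(1). It also assumes the uint32 range should be checked before the suffix multiplication, so that the multiplication itself cannot overflow.

```c
#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical result codes; parse_max_rate() in the patch exits instead. */
typedef enum
{
	RATE_OK,
	RATE_NOT_A_NUMBER,			/* no digits at all */
	RATE_OUT_OF_RANGE,			/* strtol() overflow, or too big for uint32 */
	RATE_BAD_UNIT,				/* anything but "k", "M" or trailing spaces */
	RATE_NOT_POSITIVE
} RateParseStatus;

static RateParseStatus
parse_rate_sketch(const char *src, uint32_t *out)
{
	char	   *after_num;
	long		value;
	uint64_t	factor = 1;

	assert(src != NULL && out != NULL);

	errno = 0;					/* strtol() only sets errno on failure */
	value = strtol(src, &after_num, 0);

	/* "No digits" first, as parse_int() in guc.c does ... */
	if (after_num == src)
		return RATE_NOT_A_NUMBER;
	/* ... so no extra copy of errno is needed for the overflow test. */
	if (errno == ERANGE)
		return RATE_OUT_OF_RANGE;

	/* Optional unit suffix: 'M' scales by 2^20, 'k' by 2^10. */
	if (*after_num == 'M')
	{
		factor = 1024 * 1024;
		after_num++;
	}
	else if (*after_num == 'k')
	{
		factor = 1024;
		after_num++;
	}

	/* Only white space may follow; anything else is an unknown unit. */
	for (; *after_num != '\0'; after_num++)
	{
		if (!isspace((unsigned char) *after_num))
			return RATE_BAD_UNIT;
	}

	if (value <= 0)
		return RATE_NOT_POSITIVE;

	/* Range check before scaling, so the multiplication cannot overflow. */
	if ((uint64_t) value > UINT32_MAX / factor)
		return RATE_OUT_OF_RANGE;

	*out = (uint32_t) ((uint64_t) value * factor);
	return RATE_OK;
}
```

With this shape, "10M" yields 10485760, "10G" yields RATE_BAD_UNIT (the point at which a caller would print the "valid units" hint), and "4096M" is rejected because 4096 * 2^20 no longer fits into uint32.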
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 0b2e60e..2f24fff 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1719,7 +1719,7 @@ The commands accepted in walsender mode are:
   </varlistentry>
 
   <varlistentry>
-    <term>BASE_BACKUP [<literal>LABEL</literal> <replaceable>'label'</replaceable>] [<literal>PROGRESS</literal>] [<literal>FAST</literal>] [<literal>WAL</literal>] [<literal>NOWAIT</literal>]</term>
+    <term>BASE_BACKUP [<literal>LABEL</literal> <replaceable>'label'</replaceable>] [<literal>PROGRESS</literal>] [<literal>FAST</literal>] [<literal>WAL</literal>] [<literal>NOWAIT</literal>] [<literal>MAX_RATE</literal> <replaceable>rate</replaceable>]</term>
     <listitem>
      <para>
       Instructs the server to start streaming a base backup.
@@ -1787,7 +1787,23 @@ The commands accepted in walsender mode are:
          the waiting and the warning, leaving the client responsible for
          ensuring the required log is available.
         </para>
-         </listitem>
+        </listitem>
+       </varlistentry>
+       <varlistentry>
+        <term><literal>MAX_RATE</literal> <replaceable>rate</replaceable></term>
+        <listitem>
+         <para>
+          Limit the maximum amount of data transferred to the client per unit
+          of time. The expected unit is bytes per second. If
+          <literal>MAX_RATE</literal> is passed, it must either be equal to
+          zero or fall within the range 32 kB through 1 GB (inclusive). If
+          zero is passed or the option is not passed at all, no restriction
+          is imposed on the transfer.
+         </para>
+         <para>
+          <literal>MAX_RATE</literal> does not affect WAL streaming.
+         </para>
+        </listitem>
        </varlistentry>
       </variablelist>
      </para>
diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index c379df5..caede77 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -189,6 +189,28 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+      <term><option>-r <replaceable class="parameter">rate</replaceable></option></term>
+      <term><option>--max-rate=<replaceable class="parameter">rate</replaceable></option></term>
+      <listitem>
+       <para>
+        The maximum amount of data transferred from the server per second.
+        The purpose is to limit the impact of <application>pg_basebackup</application>
+        on the running server.
+       </para>
+       <para>
+        This option always affects transfer of the data directory. Transfer of
+        WAL files is only affected if the collection method is <literal>fetch</literal>.
+       </para>
+       <para>
+        A value between <literal>32 kB</literal> and <literal>1024 MB</literal>
+        is expected. Suffixes <literal>k</literal> (kilobytes) and
+        <literal>M</literal> (megabytes) are accepted. For example:
+        <literal>10M</literal>
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>-R</option></term>
       <term><option>--write-recovery-conf</option></term>
       <listitem>
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index ba8d173..e3ff2cf 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -30,9 +30,11 @@
 #include "replication/walsender_private.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
+#include "storage/latch.h"
 #include "utils/builtins.h"
 #include "utils/elog.h"
 #include "utils/ps_status.h"
+#include "utils/timestamp.h"
 #include "pgtar.h"
 
 typedef struct
@@ -42,6 +44,7 @@ typedef struct
 	bool		fastcheckpoint;
 	bool		nowait;
 	bool		includewal;
+	uint32		maxrate;
 } basebackup_options;
 
 
@@ -59,6 +62,7 @@ static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
 static void parse_basebackup_options(List *options, basebackup_options *opt);
 static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
 static int	compareWalFileNames(const void *a, const void *b);
+static void throttle(size_t increment);
 
 /* Was the backup currently in-progress initiated in recovery mode? */
 static bool backup_started_in_recovery = false;
@@ -68,6 +72,42 @@ static bool backup_started_in_recovery = false;
  */
 #define TAR_SEND_SIZE 32768
 
+
+/*
+ * The maximum amount of data per second - bounds of the user input.
+ *
+ * If the maximum should be increased to more than 4 GB, uint64 must
+ * be introduced for the related variables. However such high values have
+ * little to do with throttling.
+ */
+#define MAX_RATE_LOWER	32768
+#define MAX_RATE_UPPER	(1024 << 20)
+
+/*
+ * Transfer rate is only measured when this number of bytes has been sent.
+ * (Too frequent checks would impose too high CPU overhead.)
+ *
+ * The default value is used unless it'd result in too frequent checks.
+ */
+#define THROTTLING_SAMPLE_MIN	32768
+
+/* The maximum number of checks per second. */
+#define THROTTLING_MAX_FREQUENCY	128
+
+/* The actual value, transfer of which may cause sleep. */
+static uint32 throttling_sample;
+
+/* Amount of data already transferred but not yet throttled. */
+static int32 throttling_counter;
+
+/* The minimum time required to transfer throttling_sample bytes. */
+static int64 elapsed_min_unit;
+
+/* The last check of the transfer rate. */
+static int64 throttled_last;
+
+static Latch throttling_latch;
+
 typedef struct
 {
 	char	   *oid;
@@ -171,6 +211,35 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
 	/* Send tablespace header */
 	SendBackupHeader(tablespaces);
 
+	if (opt->maxrate > 0)
+	{
+		throttling_sample = opt->maxrate / THROTTLING_MAX_FREQUENCY;
+
+		/* Don't measure too small pieces of data. */
+		if (throttling_sample < THROTTLING_SAMPLE_MIN)
+			throttling_sample = THROTTLING_SAMPLE_MIN;
+
+		/*
+		 * opt->maxrate is bytes per second. Thus the expression in
+		 * brackets is microseconds per byte.
+		 */
+		elapsed_min_unit = throttling_sample *
+			((double) USECS_PER_SEC / opt->maxrate);
+
+		/* Enable throttling. */
+		throttling_counter = 0;
+
+		/* The 'real data' starts now (header was ignored). */
+		throttled_last = GetCurrentIntegerTimestamp();
+
+		InitLatch(&throttling_latch);
+	}
+	else
+	{
+		/* Disable throttling. */
+		throttling_counter = -1;
+	}
+
 	/* Send off our tablespaces one by one */
 	foreach(lc, tablespaces)
 	{
@@ -398,6 +467,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
 							(errmsg("base backup could not send data, aborting backup")));
 
 				len += cnt;
+				throttle(cnt);
+
 				if (len == XLogSegSize)
 					break;
 			}
@@ -468,6 +539,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 	bool		o_fast = false;
 	bool		o_nowait = false;
 	bool		o_wal = false;
+	bool		o_maxrate = false;
 
 	MemSet(opt, 0, sizeof(*opt));
 	foreach(lopt, options)
@@ -519,6 +591,29 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 			opt->includewal = true;
 			o_wal = true;
 		}
+		else if (strcmp(defel->defname, "maxrate") == 0)
+		{
+			long		maxrate;
+
+			if (o_maxrate)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+			maxrate = intVal(defel->arg);
+
+			opt->maxrate = (uint32) maxrate;
+			if (opt->maxrate > 0 &&
+				(opt->maxrate < MAX_RATE_LOWER || opt->maxrate > MAX_RATE_UPPER))
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+						 errmsg("transfer rate %u bytes per second is out of range",
+								opt->maxrate),
+						 errhint("The accepted range is %u through %u kB per second.",
+								 MAX_RATE_LOWER >> 10, MAX_RATE_UPPER >> 10)));
+			}
+			o_maxrate = true;
+		}
 		else
 			elog(ERROR, "option \"%s\" not recognized",
 				 defel->defname);
@@ -1019,6 +1114,7 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
 					(errmsg("base backup could not send data, aborting backup")));
 
 		len += cnt;
+		throttle(cnt);
 
 		if (len >= statbuf->st_size)
 		{
@@ -1040,10 +1136,14 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
 			cnt = Min(sizeof(buf), statbuf->st_size - len);
 			pq_putmessage('d', buf, cnt);
 			len += cnt;
+			throttle(cnt);
 		}
 	}
 
-	/* Pad to 512 byte boundary, per tar format requirements */
+	/*
+	 * Pad to 512 byte boundary, per tar format requirements. (This small
+	 * piece of data is probably not worth throttling.)
+	 */
 	pad = ((len + 511) & ~511) - len;
 	if (pad > 0)
 	{
@@ -1069,3 +1169,52 @@ _tarWriteHeader(const char *filename, const char *linktarget,
 
 	pq_putmessage('d', h, 512);
 }
+
+static void
+throttle(size_t increment)
+{
+	int64		elapsed,
+				elapsed_min,
+				sleep;
+
+	if (throttling_counter < 0)
+		return;
+
+	throttling_counter += increment;
+	if (throttling_counter < throttling_sample)
+		return;
+
+	/* Time elapsed since the last measurement (and possible wake-up). */
+	elapsed = GetCurrentIntegerTimestamp() - throttled_last;
+	/* How much should have elapsed at minimum? */
+	elapsed_min = elapsed_min_unit * (throttling_counter / throttling_sample);
+	sleep = elapsed_min - elapsed;
+	/* Only sleep if the transfer is faster than it should be. */
+	if (sleep > 0)
+	{
+		ResetLatch(&throttling_latch);
+		/*
+		 * THROTTLING_SAMPLE_MIN / MAX_RATE_LOWER (in seconds) should be the
+		 * longest possible time to sleep. Thus the cast to long is safe.
+		 */
+		WaitLatch(&throttling_latch, WL_TIMEOUT | WL_POSTMASTER_DEATH,
+				  (long) (sleep / 1000));
+	}
+	else
+	{
+
+		/*
+		 * The actual transfer rate is below the limit. A negative value would
+		 * distort the adjustment of throttled_last.
+		 */
+		sleep = 0;
+	}
+
+	/*
+	 * Only whole multiples of throttling_sample have been processed. The rest
+	 * will be done during the next call of this function.
+	 */
+	throttling_counter %= throttling_sample;
+	/* Once the (possible) sleep ends, a new period starts. */
+	throttled_last += elapsed + sleep;
+}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 8c83780..1c2c31c 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -78,6 +78,7 @@ Node *replication_parse_result;
 %token K_PROGRESS
 %token K_FAST
 %token K_NOWAIT
+%token K_MAX_RATE
 %token K_WAL
 %token K_TIMELINE
 
@@ -116,7 +117,7 @@ identify_system:
 			;
 
 /*
- * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
+ * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT] [MAX_RATE %d]
  */
 base_backup:
 			K_BASE_BACKUP base_backup_opt_list
@@ -156,6 +157,11 @@ base_backup_opt:
 				  $$ = makeDefElem("nowait",
 						   (Node *)makeInteger(TRUE));
 				}
+			| K_MAX_RATE UCONST
+				{
+				  $$ = makeDefElem("maxrate",
+						   (Node *)makeInteger($2));
+				}
 			;
 
 /*
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 3d930f1..b2d5e3b 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -71,6 +71,7 @@ IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
 LABEL			{ return K_LABEL; }
 NOWAIT			{ return K_NOWAIT; }
 PROGRESS			{ return K_PROGRESS; }
+MAX_RATE		{ return K_MAX_RATE; }
 WAL			{ return K_WAL; }
 TIMELINE			{ return K_TIMELINE; }
 START_REPLICATION	{ return K_START_REPLICATION; }
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 6706c0c..997b48ee 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -46,6 +46,7 @@ bool		streamwal = false;
 bool		fastcheckpoint = false;
 bool		writerecoveryconf = false;
 int			standby_message_timeout = 10 * 1000;		/* 10 sec = default */
+uint32		maxrate = 0;		/* No limit by default. */
 
 /* Progress counters */
 static uint64 totalsize;
@@ -76,6 +77,7 @@ static PQExpBuffer recoveryconfcontents = NULL;
 static void usage(void);
 static void verify_dir_is_empty_or_create(char *dirname);
 static void progress_report(int tablespacenum, const char *filename);
+static uint32 parse_max_rate(char *src);
 
 static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
 static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
@@ -111,6 +113,7 @@ usage(void)
 	printf(_("\nOptions controlling the output:\n"));
 	printf(_("  -D, --pgdata=DIRECTORY receive base backup into directory\n"));
 	printf(_("  -F, --format=p|t       output format (plain (default), tar)\n"));
+	printf(_("  -r, --max-rate=RATE    maximum transfer rate to transfer data directory\n"));
 	printf(_("  -R, --write-recovery-conf\n"
 			 "                         write recovery.conf after backup\n"));
 	printf(_("  -x, --xlog             include required WAL files in backup (fetch mode)\n"));
@@ -476,6 +479,91 @@ progress_report(int tablespacenum, const char *filename)
 }
 
 
+static uint32
+parse_max_rate(char *src)
+{
+	int			factor;
+	char	   *after_num;
+	int64		result;
+
+	errno = 0;
+	result = strtol(src, &after_num, 0);
+	if (src == after_num)
+	{
+		fprintf(stderr, _("%s: transfer rate \"%s\" is not a valid integer value\n"), progname, src);
+		exit(1);
+	}
+	if (errno == ERANGE)
+	{
+		fprintf(stderr, _("%s: transfer rate \"%s\" exceeds integer range\n"), progname, src);
+		exit(1);
+	}
+
+
+	/*
+	 * Evaluate (optional) suffix.
+	 *
+	 * after_num should now be right behind the numeric value.
+	 */
+	factor = 1;
+	switch (*after_num)
+	{
+			/*
+			 * Only the following suffixes are allowed. It's not too useful to
+			 * restrict the rate to gigabytes: such a rate will probably bring
+			 * significant impact on the server anyway, so the throttling
+			 * won't help much.
+			 */
+		case 'M':
+			factor <<= 10;		/* fall through */
+		case 'k':
+			factor <<= 10;
+			after_num++;
+			break;
+
+		default:
+
+			/*
+			 * If there's no suffix, the unit is bytes. An invalid letter, if
+			 * any, will be detected below.
+			 */
+			break;
+	}
+
+	/* The rest can only consist of white space. */
+	while (*after_num != '\0')
+	{
+		if (!isspace((unsigned char) *after_num))
+		{
+			fprintf(stderr, _("%s: valid units for the transfer rate are \"k\" and \"M\"\n"), progname);
+			exit(1);
+		}
+		after_num++;
+	}
+
+	if (result <= 0)
+	{
+		/*
+		 * Reject obviously wrong values here. The exact range check is done
+		 * on the server.
+		 */
+		fprintf(stderr, _("%s: transfer rate must be greater than zero\n"), progname);
+		exit(1);
+	}
+	/*
+	 * Check the integer range before scaling: this avoids signed overflow in
+	 * the multiplication and rejects values that do not fit into uint32.
+	 */
+	if (result > (int64) (((uint64) 0xFFFFFFFF) / factor))
+	{
+		fprintf(stderr, _("%s: transfer rate \"%s\" exceeds integer range\n"), progname, src);
+		exit(1);
+	}
+	result *= factor;
+	return (uint32) result;
+}
+
+
 /*
  * Write a piece of tar data
  */
@@ -1310,6 +1398,7 @@ BaseBackup(void)
 	uint32		starttli;
 	char		current_path[MAXPGPATH];
 	char		escaped_label[MAXPGPATH];
+	char		maxrate_clause[MAXPGPATH];
 	int			i;
 	char		xlogstart[64];
 	char		xlogend[64];
@@ -1382,13 +1471,18 @@ BaseBackup(void)
 	 * Start the actual backup
 	 */
 	PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
+	if (maxrate > 0)
+		snprintf(maxrate_clause, sizeof(maxrate_clause), "MAX_RATE %u", maxrate);
+	else
+		maxrate_clause[0] = '\0';
 	snprintf(current_path, sizeof(current_path),
-			 "BASE_BACKUP LABEL '%s' %s %s %s %s",
+			 "BASE_BACKUP LABEL '%s' %s %s %s %s %s",
 			 escaped_label,
 			 showprogress ? "PROGRESS" : "",
 			 includewal && !streamwal ? "WAL" : "",
 			 fastcheckpoint ? "FAST" : "",
-			 includewal ? "NOWAIT" : "");
+			 includewal ? "NOWAIT" : "",
+			 maxrate_clause);
 
 	if (PQsendQuery(conn, current_path) == 0)
 	{
@@ -1657,6 +1751,7 @@ main(int argc, char **argv)
 		{"pgdata", required_argument, NULL, 'D'},
 		{"format", required_argument, NULL, 'F'},
 		{"checkpoint", required_argument, NULL, 'c'},
+		{"max-rate", required_argument, NULL, 'r'},
 		{"write-recovery-conf", no_argument, NULL, 'R'},
 		{"xlog", no_argument, NULL, 'x'},
 		{"xlog-method", required_argument, NULL, 'X'},
@@ -1697,7 +1792,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "D:F:RxX:l:zZ:d:c:h:p:U:s:wWvP",
+	while ((c = getopt_long(argc, argv, "D:F:r:RxX:l:zZ:d:c:h:p:U:s:wWvP",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -1718,6 +1813,9 @@ main(int argc, char **argv)
 					exit(1);
 				}
 				break;
+			case 'r':
+				maxrate = parse_max_rate(optarg);
+				break;
 			case 'R':
 				writerecoveryconf = true;
 				break;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers