On 05.01.2011 15:54, Magnus Hagander wrote:
* Suggestion from Heikki: perhaps at some point we're going to need a full bison grammar for walsender commands.
Here's a patch for this (also available at git@github.com:hlinnaka/postgres.git, branch "streaming_base"). I thought I knew our bison/flex magic pretty well by now, but it turned out to take much longer than I expected. But here it is.
I'm not 100% sure this is worth the trouble quite yet. It adds quite a lot of boilerplate code. OTOH, having a bison grammar file makes it easier to see exactly what the grammar is, and I like that. With only three commands it's not too bad yet, but if the command set expands much further, a bison grammar is a must.
At first I tried using the backend lexer for this, but it couldn't parse the xlog-start location in the "START_REPLICATION 0/47000000" command. In hindsight that may have been a badly chosen syntax. But as you pointed out on IM, the lexer needed to handle this limited set of commands is very small, so I wrote a dedicated flex lexer instead that can handle it.
-- Heikki Linnakangas EnterpriseDB http://www.enterprisedb.com
*** a/src/backend/replication/Makefile
--- b/src/backend/replication/Makefile
***************
*** 12,17 **** subdir = src/backend/replication
  top_builddir = ../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o
  
  include $(top_srcdir)/src/backend/common.mk
--- 12,40 ----
  top_builddir = ../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
! 	repl_gram.o
  
  include $(top_srcdir)/src/backend/common.mk
+ 
+ # repl_scanner is compiled as part of repl_gram
+ repl_gram.o: repl_scanner.c
+ 
+ # See notes in src/backend/parser/Makefile about the following two rules
+ 
+ repl_gram.c: repl_gram.y
+ ifdef BISON
+ 	$(BISON) -d $(BISONFLAGS) -o $@ $<
+ else
+ 	@$(missing) bison $< $@
+ endif
+ 
+ repl_scanner.c: repl_scanner.l
+ ifdef FLEX
+ 	$(FLEX) $(FLEXFLAGS) -o'$@' $<
+ else
+ 	@$(missing) flex $< $@
+ endif
+ 
+ # repl_gram.c and repl_scanner.c are in the distribution tarball, so
+ # they are not cleaned here.
*** a/src/backend/replication/basebackup.c
--- b/src/backend/replication/basebackup.c
***************
*** 56,81 **** base_backup_cleanup(int code, Datum arg)
   * CopyOut format.
   */
  void
! SendBaseBackup(const char *options)
  {
  	DIR		   *dir;
  	struct dirent *de;
- 	char	   *backup_label = strchr(options, ';');
- 	bool		progress = false;
- 
- 	if (backup_label == NULL)
- 		ereport(FATAL,
- 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
- 				 errmsg("invalid base backup options: %s", options)));
- 	backup_label++;				/* Walk past the semicolon */
- 
- 	/* Currently the only option string supported is PROGRESS */
- 	if (strncmp(options, "PROGRESS", 8) == 0)
- 		progress = true;
- 	else if (options[0] != ';')
- 		ereport(FATAL,
- 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
- 				 errmsg("invalid base backup options: %s", options)));
  
  	/* Make sure we can open the directory with tablespaces in it */
  	dir = AllocateDir("pg_tblspc");
--- 56,65 ----
   * CopyOut format.
   */
  void
! SendBaseBackup(const char *backup_label, bool progress)
  {
  	DIR		   *dir;
  	struct dirent *de;
  
  	/* Make sure we can open the directory with tablespaces in it */
  	dir = AllocateDir("pg_tblspc");
*** /dev/null
--- b/src/backend/replication/repl_gram.y
***************
*** 0 ****
--- 1,135 ----
+ %{
+ /*-------------------------------------------------------------------------
+  *
+  * repl_gram.y				- Parser for the replication commands
+  *
+  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *
+  * IDENTIFICATION
+  *	  src/backend/replication/repl_gram.y
+  *
+  *-------------------------------------------------------------------------
+  */
+ 
+ #include "postgres.h"
+ 
+ #include "nodes/nodes.h"
+ #include "nodes/replnodes.h"
+ #include "replication/walsender.h"
+ 
+ /* Result of the parsing is returned here */
+ Node *replication_parse_result;
+ 
+ /* Location tracking support --- simpler than bison's default */
+ #define YYLLOC_DEFAULT(Current, Rhs, N) \
+ 	do { \
+ 		if (N) \
+ 			(Current) = (Rhs)[1]; \
+ 		else \
+ 			(Current) = (Rhs)[0]; \
+ 	} while (0)
+ 
+ /*
+  * Bison doesn't allocate anything that needs to live across parser calls,
+  * so we can easily have it use palloc instead of malloc.  This prevents
+  * memory leaks if we error out during parsing.  Note this only works with
+  * bison >= 2.0.  However, in bison 1.875 the default is to use alloca()
+  * if possible, so there's not really much problem anyhow, at least if
+  * you're building with gcc.
+  */
+ #define YYMALLOC palloc
+ #define YYFREE   pfree
+ 
+ %}
+ 
+ %expect 0
+ %name-prefix="replication_yy"
+ 
+ %union {
+ 		char					*str;
+ 		bool					boolval;
+ 
+ 		XLogRecPtr				recptr;
+ 		Node					*node;
+ }
+ 
+ /* Non-keyword tokens */
+ %token <str> SCONST
+ %token <recptr> RECPTR
+ 
+ /* Keyword tokens. */
+ %token K_BASE_BACKUP
+ %token K_IDENTIFY_SYSTEM
+ %token K_PROGRESS
+ %token K_START_REPLICATION
+ 
+ %type <node>	command
+ %type <node>	base_backup start_replication identify_system
+ %type <boolval>	opt_progress
+ 
+ %%
+ 
+ firstcmd: command opt_semicolon
+ 				{
+ 					replication_parse_result = $1;
+ 				}
+ 			;
+ 
+ opt_semicolon:	';'
+ 				| /* EMPTY */
+ 				;
+ 
+ command:
+ 			identify_system
+ 			| base_backup
+ 			| start_replication
+ 			;
+ 
+ /*
+  * IDENTIFY_SYSTEM
+  */
+ identify_system:
+ 			K_IDENTIFY_SYSTEM
+ 				{
+ 					$$ = (Node *) makeNode(IdentifySystemCmd);
+ 				}
+ 			;
+ 
+ /*
+  * BASE_BACKUP '<label>' [PROGRESS]
+  */
+ base_backup:
+ 			K_BASE_BACKUP SCONST opt_progress
+ 				{
+ 					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+ 
+ 					cmd->label = $2;
+ 					cmd->progress = $3;
+ 
+ 					$$ = (Node *) cmd;
+ 				}
+ 			;
+ 
+ opt_progress:	K_PROGRESS		{ $$ = true; }
+ 			| /* EMPTY */		{ $$ = false; }
+ 			;
+ 
+ /*
+  * START_REPLICATION %X/%X
+  */
+ start_replication:
+ 			K_START_REPLICATION RECPTR
+ 				{
+ 					StartReplicationCmd *cmd;
+ 
+ 					cmd = makeNode(StartReplicationCmd);
+ 					cmd->startpoint = $2;
+ 
+ 					$$ = (Node *) cmd;
+ 				}
+ 			;
+ %%
+ 
+ #include "repl_scanner.c"
*** /dev/null
--- b/src/backend/replication/repl_scanner.l
***************
*** 0 ****
--- 1,174 ----
+ %{
+ /*-------------------------------------------------------------------------
+  *
+  * repl_scanner.l
+  *	  a lexical scanner for the replication commands
+  *
+  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *
+  * IDENTIFICATION
+  *	  src/backend/replication/repl_scanner.l
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ /* Avoid exit() on fatal scanner errors (a bit ugly -- see yy_fatal_error) */
+ #undef fprintf
+ #define fprintf(file, fmt, msg)  ereport(ERROR, (errmsg_internal("%s", msg)))
+ 
+ /* Handle to the buffer that the lexer uses internally */
+ static YY_BUFFER_STATE scanbufhandle;
+ 
+ static StringInfoData litbuf;
+ 
+ static void startlit(void);
+ static char *litbufdup(void);
+ static void addlit(char *ytext, int yleng);
+ static void addlitchar(unsigned char ychar);
+ 
+ %}
+ 
+ %option 8bit
+ %option never-interactive
+ %option nodefault
+ %option noinput
+ %option nounput
+ %option noyywrap
+ %option warn
+ %option prefix="replication_yy"
+ 
+ %x xq
+ 
+ /* Extended quote
+  * xqdouble implements embedded quote, ''''
+  */
+ xqstart			{quote}
+ xqdouble		{quote}{quote}
+ xqinside		[^']+
+ 
+ hexdigit		[0-9A-Fa-f]
+ 
+ quote			'
+ quotestop		{quote}
+ 
+ %%
+ 
+ BASE_BACKUP			{ return K_BASE_BACKUP; }
+ IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
+ PROGRESS			{ return K_PROGRESS; }
+ START_REPLICATION	{ return K_START_REPLICATION; }
+ ","				{ return ','; }
+ ";"				{ return ';'; }
+ 
+ [ \t\r\n]+		{
+ 					/* ignore whitespace, including CR */
+ 				}
+ 
+ {hexdigit}+\/{hexdigit}+		{
+ 					if (sscanf(yytext, "%X/%X", &yylval.recptr.xlogid, &yylval.recptr.xrecoff) != 2)
+ 						yyerror("invalid streaming start location");
+ 					return RECPTR;
+ 				}
+ 
+ {xqstart}		{
+ 					BEGIN(xq);
+ 					startlit();
+ 				}
+ <xq>{quotestop}	{
+ 					yyless(1);
+ 					BEGIN(INITIAL);
+ 					yylval.str = litbufdup();
+ 					return SCONST;
+ 				}
+ <xq>{xqdouble} {
+ 					addlitchar('\'');
+ 				}
+ <xq>{xqinside}  {
+ 					addlit(yytext, yyleng);
+ 				}
+ 
+ <xq><<EOF>>		{ yyerror("unterminated quoted string"); }
+ 
+ 
+ <<EOF>>			{
+ 					yyterminate();
+ 				}
+ 
+ .				{
+ 					ereport(ERROR,
+ 							(errcode(ERRCODE_SYNTAX_ERROR),
+ 							 errmsg("syntax error: unexpected character \"%s\"", yytext)));
+ 				}
+ %%
+ 
+ 
+ static void
+ startlit(void)
+ {
+ 	initStringInfo(&litbuf);
+ }
+ 
+ /*
+  * Return the literal collected so far.  No copy is needed, because
+  * startlit() allocates a fresh buffer for every literal.
+  */
+ static char *
+ litbufdup(void)
+ {
+ 	return litbuf.data;
+ }
+ 
+ static void
+ addlit(char *ytext, int yleng)
+ {
+ 	appendBinaryStringInfo(&litbuf, ytext, yleng);
+ }
+ 
+ static void
+ addlitchar(unsigned char ychar)
+ {
+ 	appendStringInfoChar(&litbuf, ychar);
+ }
+ 
+ void
+ yyerror(const char *message)
+ {
+ 	ereport(ERROR,
+ 			(errcode(ERRCODE_SYNTAX_ERROR),
+ 			 errmsg_internal("%s", message)));
+ }
+ 
+ 
+ void
+ replication_scanner_init(const char *str)
+ {
+ 	Size		slen = strlen(str);
+ 	char	   *scanbuf;
+ 
+ 	/*
+ 	 * Might be left over after ereport()
+ 	 */
+ 	if (YY_CURRENT_BUFFER)
+ 		yy_delete_buffer(YY_CURRENT_BUFFER);
+ 
+ 	/* Likewise, an error inside a quoted literal can leave us in <xq> */
+ 	BEGIN(INITIAL);
+ 
+ 	/*
+ 	 * Make a scan buffer with special termination needed by flex.
+ 	 */
+ 	scanbuf = (char *) palloc(slen + 2);
+ 	memcpy(scanbuf, str, slen);
+ 	scanbuf[slen] = scanbuf[slen + 1] = YY_END_OF_BUFFER_CHAR;
+ 	scanbufhandle = yy_scan_buffer(scanbuf, slen + 2);
+ }
+ 
+ void
+ replication_scanner_finish(void)
+ {
+ 	yy_delete_buffer(scanbufhandle);
+ 	scanbufhandle = NULL;
+ }
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 43,48 ****
--- 43,49 ----
  #include "libpq/pqformat.h"
  #include "libpq/pqsignal.h"
  #include "miscadmin.h"
+ #include "nodes/replnodes.h"
  #include "replication/basebackup.h"
  #include "replication/walprotocol.h"
  #include "replication/walsender.h"
***************
*** 97,102 **** static void WalSndXLogSendHandler(SIGNAL_ARGS);
--- 98,104 ----
  static void WalSndLastCycleHandler(SIGNAL_ARGS);
  
  /* Prototypes for private functions */
+ static bool HandleReplicationCommand(const char *cmd_string);
  static int	WalSndLoop(void);
  static void InitWalSnd(void);
  static void WalSndHandshake(void);
***************
*** 213,330 **** WalSndHandshake(void)
  			case 'Q':			/* Query message */
  				{
  					const char *query_string;
- 					XLogRecPtr	recptr;
  
  					query_string = pq_getmsgstring(&input_message);
  					pq_getmsgend(&input_message);
  
! 					if (strcmp(query_string, "IDENTIFY_SYSTEM") == 0)
! 					{
! 						StringInfoData buf;
! 						char		sysid[32];
! 						char		tli[11];
! 
! 						/*
! 						 * Reply with a result set with one row, two columns.
! 						 * First col is system ID, and second is timeline ID
! 						 */
! 
! 						snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
! 								 GetSystemIdentifier());
! 						snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
! 
! 						/* Send a RowDescription message */
! 						pq_beginmessage(&buf, 'T');
! 						pq_sendint(&buf, 2, 2); /* 2 fields */
! 
! 						/* first field */
! 						pq_sendstring(&buf, "systemid");	/* col name */
! 						pq_sendint(&buf, 0, 4);		/* table oid */
! 						pq_sendint(&buf, 0, 2);		/* attnum */
! 						pq_sendint(&buf, TEXTOID, 4);		/* type oid */
! 						pq_sendint(&buf, -1, 2);	/* typlen */
! 						pq_sendint(&buf, 0, 4);		/* typmod */
! 						pq_sendint(&buf, 0, 2);		/* format code */
! 
! 						/* second field */
! 						pq_sendstring(&buf, "timeline");	/* col name */
! 						pq_sendint(&buf, 0, 4);		/* table oid */
! 						pq_sendint(&buf, 0, 2);		/* attnum */
! 						pq_sendint(&buf, INT4OID, 4);		/* type oid */
! 						pq_sendint(&buf, 4, 2);		/* typlen */
! 						pq_sendint(&buf, 0, 4);		/* typmod */
! 						pq_sendint(&buf, 0, 2);		/* format code */
! 						pq_endmessage(&buf);
! 
! 						/* Send a DataRow message */
! 						pq_beginmessage(&buf, 'D');
! 						pq_sendint(&buf, 2, 2);		/* # of columns */
! 						pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
! 						pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
! 						pq_sendint(&buf, strlen(tli), 4);	/* col2 len */
! 						pq_sendbytes(&buf, (char *) tli, strlen(tli));
! 						pq_endmessage(&buf);
! 
! 						/* Send CommandComplete and ReadyForQuery messages */
! 						EndCommand("SELECT", DestRemote);
! 						ReadyForQuery(DestRemote);
! 						/* ReadyForQuery did pq_flush for us */
! 					}
! 					else if (sscanf(query_string, "START_REPLICATION %X/%X",
! 									&recptr.xlogid, &recptr.xrecoff) == 2)
! 					{
! 						StringInfoData buf;
! 
! 						/*
! 						 * Check that we're logging enough information in the
! 						 * WAL for log-shipping.
! 						 *
! 						 * NOTE: This only checks the current value of
! 						 * wal_level. Even if the current setting is not
! 						 * 'minimal', there can be old WAL in the pg_xlog
! 						 * directory that was created with 'minimal'. So this
! 						 * is not bulletproof, the purpose is just to give a
! 						 * user-friendly error message that hints how to
! 						 * configure the system correctly.
! 						 */
! 						if (wal_level == WAL_LEVEL_MINIMAL)
! 							ereport(FATAL,
! 									(errcode(ERRCODE_CANNOT_CONNECT_NOW),
! 									 errmsg("standby connections not allowed because wal_level=minimal")));
! 
! 						/* Send a CopyBothResponse message, and start streaming */
! 						pq_beginmessage(&buf, 'W');
! 						pq_sendbyte(&buf, 0);
! 						pq_sendint(&buf, 0, 2);
! 						pq_endmessage(&buf);
! 						pq_flush();
! 
! 						/*
! 						 * Initialize position to the received one, then the
! 						 * xlog records begin to be shipped from that position
! 						 */
! 						sentPtr = recptr;
! 
! 						/* break out of the loop */
  						replication_started = true;
- 					}
- 					else if (strncmp(query_string, "BASE_BACKUP ", 12) == 0)
- 					{
- 						/* Command is BASE_BACKUP <options>;<label> */
- 						SendBaseBackup(query_string + strlen("BASE_BACKUP "));
- 						/* Send CommandComplete and ReadyForQuery messages */
- 						EndCommand("SELECT", DestRemote);
- 						ReadyForQuery(DestRemote);
- 						/* ReadyForQuery did pq_flush for us */
- 					}
- 					else
- 					{
- 						ereport(FATAL,
- 								(errcode(ERRCODE_PROTOCOL_VIOLATION),
- 								 errmsg("invalid standby query string: %s", query_string)));
- 					}
- 					break;
  				}
  
  			case 'X':			/* standby is closing the connection */
  
--- 215,228 ----
  			case 'Q':			/* Query message */
  				{
  					const char *query_string;
  
  					query_string = pq_getmsgstring(&input_message);
  					pq_getmsgend(&input_message);
  
! 					if (HandleReplicationCommand(query_string))
  						replication_started = true;
  				}
+ 				break;
  
  			case 'X':			/* standby is closing the connection */
  
***************
*** 346,351 **** WalSndHandshake(void)
--- 244,407 ----
  }
  
  /*
+  * Execute an incoming replication command.
+  *
+  * Returns true if the command was START_REPLICATION, i.e. the caller should
+  * leave the handshake loop and start streaming.  The other commands send
+  * their complete response, including ReadyForQuery, and return false.
+  */
+ static bool
+ HandleReplicationCommand(const char *cmd_string)
+ {
+ 	bool		replication_started = false;
+ 	int			parse_rc;
+ 	Node	   *cmd_node;
+ 	MemoryContext cmd_context;
+ 	MemoryContext old_context;
+ 
+ 	elog(DEBUG1, "received replication command: %s", cmd_string);
+ 
+ 	cmd_context = AllocSetContextCreate(CurrentMemoryContext,
+ 										"Replication command context",
+ 										ALLOCSET_DEFAULT_MINSIZE,
+ 										ALLOCSET_DEFAULT_INITSIZE,
+ 										ALLOCSET_DEFAULT_MAXSIZE);
+ 	old_context = MemoryContextSwitchTo(cmd_context);
+ 
+ 	replication_scanner_init(cmd_string);
+ 	parse_rc = replication_yyparse();
+ 	if (parse_rc != 0)
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_SYNTAX_ERROR),
+ 				 errmsg_internal("replication command parser returned %d",
+ 								 parse_rc)));
+ 	replication_scanner_finish();
+ 
+ 	cmd_node = replication_parse_result;
+ 
+ 	switch (nodeTag(cmd_node))
+ 	{
+ 		case T_IdentifySystemCmd:
+ 		{
+ 			StringInfoData buf;
+ 			char		sysid[32];
+ 			char		tli[11];
+ 
+ 			/*
+ 			 * Reply with a result set with one row, two columns.
+ 			 * First col is system ID, and second is timeline ID
+ 			 */
+ 
+ 			snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
+ 					 GetSystemIdentifier());
+ 			snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
+ 
+ 			/* Send a RowDescription message */
+ 			pq_beginmessage(&buf, 'T');
+ 			pq_sendint(&buf, 2, 2); /* 2 fields */
+ 
+ 			/* first field */
+ 			pq_sendstring(&buf, "systemid");	/* col name */
+ 			pq_sendint(&buf, 0, 4);			/* table oid */
+ 			pq_sendint(&buf, 0, 2);			/* attnum */
+ 			pq_sendint(&buf, TEXTOID, 4);	/* type oid */
+ 			pq_sendint(&buf, -1, 2);		/* typlen */
+ 			pq_sendint(&buf, 0, 4);			/* typmod */
+ 			pq_sendint(&buf, 0, 2);			/* format code */
+ 
+ 			/* second field */
+ 			pq_sendstring(&buf, "timeline");	/* col name */
+ 			pq_sendint(&buf, 0, 4);			/* table oid */
+ 			pq_sendint(&buf, 0, 2);			/* attnum */
+ 			pq_sendint(&buf, INT4OID, 4);	/* type oid */
+ 			pq_sendint(&buf, 4, 2);			/* typlen */
+ 			pq_sendint(&buf, 0, 4);			/* typmod */
+ 			pq_sendint(&buf, 0, 2);			/* format code */
+ 			pq_endmessage(&buf);
+ 
+ 			/* Send a DataRow message */
+ 			pq_beginmessage(&buf, 'D');
+ 			pq_sendint(&buf, 2, 2);			/* # of columns */
+ 			pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
+ 			pq_sendbytes(&buf, sysid, strlen(sysid));
+ 			pq_sendint(&buf, strlen(tli), 4);	/* col2 len */
+ 			pq_sendbytes(&buf, tli, strlen(tli));
+ 			pq_endmessage(&buf);
+ 
+ 			/* Send CommandComplete and ReadyForQuery messages */
+ 			EndCommand("SELECT", DestRemote);
+ 			ReadyForQuery(DestRemote);
+ 			/* ReadyForQuery did pq_flush for us */
+ 
+ 			break;
+ 		}
+ 
+ 		case T_StartReplicationCmd:
+ 		{
+ 			StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
+ 			StringInfoData buf;
+ 
+ 			/*
+ 			 * Check that we're logging enough information in the
+ 			 * WAL for log-shipping.
+ 			 *
+ 			 * NOTE: This only checks the current value of
+ 			 * wal_level. Even if the current setting is not
+ 			 * 'minimal', there can be old WAL in the pg_xlog
+ 			 * directory that was created with 'minimal'. So this
+ 			 * is not bulletproof, the purpose is just to give a
+ 			 * user-friendly error message that hints how to
+ 			 * configure the system correctly.
+ 			 */
+ 			if (wal_level == WAL_LEVEL_MINIMAL)
+ 				ereport(FATAL,
+ 						(errcode(ERRCODE_CANNOT_CONNECT_NOW),
+ 						 errmsg("standby connections not allowed because wal_level=minimal")));
+ 
+ 			/* Send a CopyBothResponse message, and start streaming */
+ 			pq_beginmessage(&buf, 'W');
+ 			pq_sendbyte(&buf, 0);
+ 			pq_sendint(&buf, 0, 2);
+ 			pq_endmessage(&buf);
+ 			pq_flush();
+ 
+ 			/*
+ 			 * Initialize position to the received one, then the
+ 			 * xlog records begin to be shipped from that position
+ 			 */
+ 			sentPtr = cmd->startpoint;
+ 
+ 			/* break out of the loop */
+ 			replication_started = true;
+ 			break;
+ 		}
+ 
+ 		case T_BaseBackupCmd:
+ 		{
+ 			BaseBackupCmd *cmd = (BaseBackupCmd *) cmd_node;
+ 			/* Command is BASE_BACKUP '<label>' [PROGRESS] */
+ 			SendBaseBackup(cmd->label, cmd->progress);
+ 			/* Send CommandComplete and ReadyForQuery messages */
+ 			EndCommand("SELECT", DestRemote);
+ 			ReadyForQuery(DestRemote);
+ 			/* ReadyForQuery did pq_flush for us */
+ 			break;
+ 		}
+ 
+ 		default:
+ 			ereport(FATAL,
+ 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+ 					 errmsg("invalid standby query string: %s", cmd_string)));
+ 	}
+ 
+ 	/* done */
+ 	MemoryContextSwitchTo(old_context);
+ 	MemoryContextDelete(cmd_context);
+ 
+ 	return replication_started;
+ }
+ 
+ /*
   * Check if the remote end has closed the connection.
   */
  static void
*** a/src/include/nodes/nodes.h
--- b/src/include/nodes/nodes.h
***************
*** 355,360 **** typedef enum NodeTag
--- 355,367 ----
  	T_SecLabelStmt,
  	T_CreateForeignTableStmt,
  
+ 	/*
+ 	 * TAGS FOR REPLICATION COMMAND NODES (replnodes.h)
+ 	 */
+ 	T_IdentifySystemCmd = 850,
+ 	T_BaseBackupCmd,
+ 	T_StartReplicationCmd,
+ 
  	/*
  	 * TAGS FOR PARSE TREE NODES (parsenodes.h)
  	 */
*** /dev/null
--- b/src/include/nodes/replnodes.h
***************
*** 0 ****
--- 1,50 ----
+ /*-------------------------------------------------------------------------
+  *
+  * replnodes.h
+  *	  definitions for replication grammar parse nodes
+  *
+  *
+  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  * src/include/nodes/replnodes.h
+  *
+  *-------------------------------------------------------------------------
+  */
+ #ifndef REPLNODES_H
+ #define REPLNODES_H
+ 
+ #include "access/xlogdefs.h"
+ #include "nodes/nodes.h"
+ 
+ /* ----------------------
+  *		IDENTIFY_SYSTEM command
+  * ----------------------
+  */
+ typedef struct IdentifySystemCmd
+ {
+ 	NodeTag		type;
+ } IdentifySystemCmd;
+ 
+ /* ----------------------
+  *		BASE_BACKUP command
+  * ----------------------
+  */
+ typedef struct BaseBackupCmd
+ {
+ 	NodeTag		type;
+ 	char	   *label;			/* quoted label given to BASE_BACKUP */
+ 	bool		progress;		/* was the PROGRESS option given? */
+ } BaseBackupCmd;
+ 
+ /* ----------------------
+  *		START_REPLICATION command
+  * ----------------------
+  */
+ typedef struct StartReplicationCmd
+ {
+ 	NodeTag		type;
+ 	XLogRecPtr	startpoint;		/* %X/%X location to start streaming from */
+ } StartReplicationCmd;
+ 
+ #endif   /* REPLNODES_H */
*** a/src/include/replication/basebackup.h
--- b/src/include/replication/basebackup.h
***************
*** 12,17 ****
  #ifndef _BASEBACKUP_H
  #define _BASEBACKUP_H
  
! extern void SendBaseBackup(const char *options);
  
  #endif   /* _BASEBACKUP_H */
--- 12,17 ----
  #ifndef _BASEBACKUP_H
  #define _BASEBACKUP_H
  
! extern void SendBaseBackup(const char *backup_label, bool progress);
  
  #endif   /* _BASEBACKUP_H */
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 13,18 ****
--- 13,19 ----
  #define _WALSENDER_H
  
  #include "access/xlog.h"
+ #include "nodes/nodes.h"
  #include "storage/latch.h"
  #include "storage/spin.h"
  
***************
*** 54,57 **** extern Size WalSndShmemSize(void);
--- 55,70 ----
  extern void WalSndShmemInit(void);
  extern void WalSndWakeup(void);
  
+ /*
+  * Internal functions for parsing the replication grammar, in repl_gram.y and
+  * repl_scanner.l
+  */
+ extern int	replication_yyparse(void);
+ extern int	replication_yylex(void);
+ extern void replication_yyerror(const char *str);
+ extern void replication_scanner_init(const char *query_string);
+ extern void replication_scanner_finish(void);
+ 
+ extern Node *replication_parse_result;
+ 
  #endif   /* _WALSENDER_H */
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers