Hello, as the discuttion on async fetching on postgres_fdw, FETCH with data-size limitation would be useful to get memory usage stability of postgres_fdw.
Is such a feature and syntax could be allowed to be added? == Postgres_fdw fetches tuples from remote servers using cursor. The transfer gets faster as the number of fetch decreases. On the other hand buffer size for the fetched tuples widely varies according to their average length. 100 tuples per fetch is quite small for short tuples but larger fetch size will easily cause memory exhaustion. However, there's no way to know it in advance. One means to settle the contradiction would be a FETCH which sends result limiting by size, not the number of tuples. So I'd like to propose this. This patch is a POC for the feature. For exapmle, FETCH 10000 LIMIT 1000000 FROM c1; This FETCH retrieves up to 10000 tuples but cut out just after the total tuple length exceeds 1MB. (It does not literally "LIMIT" in that sense) The syntax added by this patch is described as following. FETCH [FORWARD|BACKWARD] <ALL|SignedIconst> LIMIT Iconst [FROM|IN] curname The "data size" to be compared with the LIMIT size is the summation of the result of the following expression. The appropriateness of it should be arguable. [if tupleslot has tts_tuple] HEAPTUPLESIZE + slot->tts_tuple->t_len [else] HEAPTUPLESIZE + heap_compute_data_size(slot->tts_tupleDescriptor, slot->tts_values, slot->tts_isnull); ======================== This patch does following changes, - This patch adds the parameter "size" to following functions (standard_)ExecutorRun / ExecutePlan / RunFromStore PortalRun / PortalRunSelect / PortalRunFetch / DoPortalRunFetch - The core is in StandardExecutorRun and RunFromStore. Simplly sum up the sent tuple length and compare against the given limit. - struct FetchStmt and EState has new member. - The modifications in gram.y affects on ecpg parser. I think I could fix them but with no confidence :( - Modified the corespondence parts of the changes above in auto_explain and pg_stat_statments only in parameter list. regards, -- Kyotaro Horiguchi NTT Open Source Software Center
>From 6f1dd6998ba312c3552f137365e3a3118b7935be Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horiguchi.kyot...@lab.ntt.co.jp> Date: Wed, 21 Jan 2015 17:18:09 +0900 Subject: [PATCH] Size limitation feature of FETCH v0 --- contrib/auto_explain/auto_explain.c | 8 +-- contrib/pg_stat_statements/pg_stat_statements.c | 8 +-- src/backend/commands/copy.c | 2 +- src/backend/commands/createas.c | 2 +- src/backend/commands/explain.c | 2 +- src/backend/commands/extension.c | 2 +- src/backend/commands/matview.c | 2 +- src/backend/commands/portalcmds.c | 3 +- src/backend/commands/prepare.c | 2 +- src/backend/executor/execMain.c | 35 +++++++-- src/backend/executor/execUtils.c | 1 + src/backend/executor/functions.c | 2 +- src/backend/executor/spi.c | 4 +- src/backend/parser/gram.y | 59 +++++++++++++++ src/backend/tcop/postgres.c | 2 + src/backend/tcop/pquery.c | 95 +++++++++++++++++-------- src/include/executor/executor.h | 8 +-- src/include/nodes/execnodes.h | 1 + src/include/nodes/parsenodes.h | 1 + src/include/tcop/pquery.h | 3 +- src/interfaces/ecpg/preproc/Makefile | 2 +- src/interfaces/ecpg/preproc/ecpg.addons | 63 ++++++++++++++++ 22 files changed, 248 insertions(+), 59 deletions(-) diff --git a/contrib/auto_explain/auto_explain.c b/contrib/auto_explain/auto_explain.c index 2a184ed..f121a33 100644 --- a/contrib/auto_explain/auto_explain.c +++ b/contrib/auto_explain/auto_explain.c @@ -57,7 +57,7 @@ void _PG_fini(void); static void explain_ExecutorStart(QueryDesc *queryDesc, int eflags); static void explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, - long count); + long count, long size); static void explain_ExecutorFinish(QueryDesc *queryDesc); static void explain_ExecutorEnd(QueryDesc *queryDesc); @@ -232,15 +232,15 @@ explain_ExecutorStart(QueryDesc *queryDesc, int eflags) * ExecutorRun hook: all we need do is track nesting depth */ static void -explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) +explain_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, long size) { nesting_level++; PG_TRY(); { if (prev_ExecutorRun) - prev_ExecutorRun(queryDesc, direction, count); + prev_ExecutorRun(queryDesc, direction, count, size); else - standard_ExecutorRun(queryDesc, direction, count); + standard_ExecutorRun(queryDesc, direction, count, size); nesting_level--; } PG_CATCH(); diff --git a/contrib/pg_stat_statements/pg_stat_statements.c b/contrib/pg_stat_statements/pg_stat_statements.c index 2629bfc..a68c11d 100644 --- a/contrib/pg_stat_statements/pg_stat_statements.c +++ b/contrib/pg_stat_statements/pg_stat_statements.c @@ -282,7 +282,7 @@ static void pgss_post_parse_analyze(ParseState *pstate, Query *query); static void pgss_ExecutorStart(QueryDesc *queryDesc, int eflags); static void pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, - long count); + long count, long size); static void pgss_ExecutorFinish(QueryDesc *queryDesc); static void pgss_ExecutorEnd(QueryDesc *queryDesc); static void pgss_ProcessUtility(Node *parsetree, const char *queryString, @@ -863,15 +863,15 @@ pgss_ExecutorStart(QueryDesc *queryDesc, int eflags) * ExecutorRun hook: all we need do is track nesting depth */ static void -pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count) +pgss_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count, long size) { nested_level++; PG_TRY(); { if (prev_ExecutorRun) - prev_ExecutorRun(queryDesc, direction, count); + prev_ExecutorRun(queryDesc, direction, count, size); else - standard_ExecutorRun(queryDesc, direction, count); + standard_ExecutorRun(queryDesc, direction, count, size); nested_level--; } PG_CATCH(); diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 0e604b7..b6e6523 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -1915,7 +1915,7 @@ CopyTo(CopyState cstate) else { /* run the plan --- the dest receiver will send tuples */ - ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L); + ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, 0L); processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index abc0fe8..c5c4478 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -192,7 +192,7 @@ ExecCreateTableAs(CreateTableAsStmt *stmt, const char *queryString, dir = ForwardScanDirection; /* run the plan */ - ExecutorRun(queryDesc, dir, 0L); + ExecutorRun(queryDesc, dir, 0L, 0L); /* save the rowcount if we're given a completionTag to fill */ if (completionTag) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 7cfc9bb..2c23e9b 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -489,7 +489,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, dir = ForwardScanDirection; /* run the plan */ - ExecutorRun(queryDesc, dir, 0L); + ExecutorRun(queryDesc, dir, 0L, 0L); /* run cleanup too */ ExecutorFinish(queryDesc); diff --git a/src/backend/commands/extension.c b/src/backend/commands/extension.c index 3b95552..f624567 100644 --- a/src/backend/commands/extension.c +++ b/src/backend/commands/extension.c @@ -736,7 +736,7 @@ execute_sql_string(const char *sql, const char *filename) dest, NULL, 0); ExecutorStart(qdesc, 0); - ExecutorRun(qdesc, ForwardScanDirection, 0); + ExecutorRun(qdesc, ForwardScanDirection, 0, 0); ExecutorFinish(qdesc); ExecutorEnd(qdesc); diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 74415b8..6530ecb 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -360,7 +360,7 @@ refresh_matview_datafill(DestReceiver *dest, Query *query, ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS); /* run the plan */ - ExecutorRun(queryDesc, ForwardScanDirection, 0L); + ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L); /* and clean up */ ExecutorFinish(queryDesc); diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c index 2794537..255c86e 100644 --- a/src/backend/commands/portalcmds.c +++ b/src/backend/commands/portalcmds.c @@ -177,6 +177,7 @@ PerformPortalFetch(FetchStmt *stmt, nprocessed = PortalRunFetch(portal, stmt->direction, stmt->howMany, + stmt->howLarge, dest); /* Return command status if wanted */ @@ -375,7 +376,7 @@ PersistHoldablePortal(Portal portal) true); /* Fetch the result set into the tuplestore */ - ExecutorRun(queryDesc, ForwardScanDirection, 0L); + ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L); (*queryDesc->dest->rDestroy) (queryDesc->dest); queryDesc->dest = NULL; diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index 71b08f0..31799f5 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -291,7 +291,7 @@ ExecuteQuery(ExecuteStmt *stmt, IntoClause *intoClause, */ PortalStart(portal, paramLI, eflags, GetActiveSnapshot()); - (void) PortalRun(portal, count, false, dest, dest, completionTag); + (void) PortalRun(portal, count, 0L, false, dest, dest, completionTag); PortalDrop(portal, false); diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index b9f21c5..9cecc1d 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -78,6 +78,7 @@ static void ExecutePlan(EState *estate, PlanState *planstate, CmdType operation, bool sendTuples, long numberTuples, + long sizeTuples, ScanDirection direction, DestReceiver *dest); static bool ExecCheckRTEPerms(RangeTblEntry *rte); @@ -248,17 +249,17 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) */ void ExecutorRun(QueryDesc *queryDesc, - ScanDirection direction, long count) + ScanDirection direction, long count, long size) { if (ExecutorRun_hook) - (*ExecutorRun_hook) (queryDesc, direction, count); + (*ExecutorRun_hook) (queryDesc, direction, count, size); else - standard_ExecutorRun(queryDesc, direction, count); + standard_ExecutorRun(queryDesc, direction, count, size); } void standard_ExecutorRun(QueryDesc *queryDesc, - ScanDirection direction, long count) + ScanDirection direction, long count, long size) { EState *estate; CmdType operation; @@ -310,6 +311,7 @@ standard_ExecutorRun(QueryDesc *queryDesc, operation, sendTuples, count, + size, direction, dest); @@ -1450,22 +1452,26 @@ ExecutePlan(EState *estate, CmdType operation, bool sendTuples, long numberTuples, + long sizeTuples, ScanDirection direction, DestReceiver *dest) { TupleTableSlot *slot; long current_tuple_count; + long sent_size; /* * initialize local variables */ current_tuple_count = 0; - + sent_size = 0; /* * Set the direction. */ estate->es_direction = direction; + estate->es_stoppedbysize = false; + /* * Loop until we've processed the proper number of tuples from the plan. */ @@ -1520,6 +1526,25 @@ ExecutePlan(EState *estate, current_tuple_count++; if (numberTuples && numberTuples == current_tuple_count) break; + + /* Count the size of tuples we've sent */ + if (slot->tts_tuple) + sent_size += HEAPTUPLESIZE + slot->tts_tuple->t_len; + else + { + sent_size += HEAPTUPLESIZE + + heap_compute_data_size(slot->tts_tupleDescriptor, + slot->tts_values, + slot->tts_isnull); + } + + /* Quit when the size limit will be exceeded by this tuple */ + if (sizeTuples > 0 && sizeTuples < sent_size) + { + estate->es_stoppedbysize = true; + break; + } + } } diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c index 32697dd..ff2c395 100644 --- a/src/backend/executor/execUtils.c +++ b/src/backend/executor/execUtils.c @@ -133,6 +133,7 @@ CreateExecutorState(void) estate->es_rowMarks = NIL; estate->es_processed = 0; + estate->es_stoppedbysize = false; estate->es_lastoid = InvalidOid; estate->es_top_eflags = 0; diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c index 84be37c..d64e908 100644 --- a/src/backend/executor/functions.c +++ b/src/backend/executor/functions.c @@ -850,7 +850,7 @@ postquel_getnext(execution_state *es, SQLFunctionCachePtr fcache) /* Run regular commands to completion unless lazyEval */ long count = (es->lazyEval) ? 1L : 0L; - ExecutorRun(es->qd, ForwardScanDirection, count); + ExecutorRun(es->qd, ForwardScanDirection, count, 0L); /* * If we requested run to completion OR there was no tuple returned, diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index 4b86e91..cb30cfb 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -2369,7 +2369,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, long tcount) ExecutorStart(queryDesc, eflags); - ExecutorRun(queryDesc, ForwardScanDirection, tcount); + ExecutorRun(queryDesc, ForwardScanDirection, tcount, 0L); _SPI_current->processed = queryDesc->estate->es_processed; _SPI_current->lastoid = queryDesc->estate->es_lastoid; @@ -2447,7 +2447,7 @@ _SPI_cursor_operation(Portal portal, FetchDirection direction, long count, /* Run the cursor */ nfetched = PortalRunFetch(portal, direction, - count, + count, 0L, dest); /* diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 36dac29..3a139ed 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -520,6 +520,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <str> opt_existing_window_name %type <boolean> opt_if_not_exists +%type <ival> opt_fetch_limit + /* * Non-keyword token types. These are hard-wired into the "flex" lexer. * They must be listed first so that their numeric codes do not depend on @@ -6021,6 +6023,15 @@ fetch_args: cursor_name n->howMany = $1; $$ = (Node *)n; } + | SignedIconst LIMIT Iconst opt_from_in cursor_name + { + FetchStmt *n = makeNode(FetchStmt); + n->portalname = $5; + n->direction = FETCH_FORWARD; + n->howMany = $1; + n->howLarge = $3; + $$ = (Node *)n; + } | ALL opt_from_in cursor_name { FetchStmt *n = makeNode(FetchStmt); @@ -6029,6 +6040,15 @@ fetch_args: cursor_name n->howMany = FETCH_ALL; $$ = (Node *)n; } + | ALL LIMIT Iconst opt_from_in cursor_name + { + FetchStmt *n = makeNode(FetchStmt); + n->portalname = $5; + n->direction = FETCH_FORWARD; + n->howMany = FETCH_ALL; + n->howLarge = $3; + $$ = (Node *)n; + } | FORWARD opt_from_in cursor_name { FetchStmt *n = makeNode(FetchStmt); @@ -6045,6 +6065,15 @@ fetch_args: cursor_name n->howMany = $2; $$ = (Node *)n; } + | FORWARD SignedIconst LIMIT Iconst opt_from_in cursor_name + { + FetchStmt *n = makeNode(FetchStmt); + n->portalname = $6; + n->direction = FETCH_FORWARD; + n->howMany = $2; + n->howLarge = $4; + $$ = (Node *)n; + } | FORWARD ALL opt_from_in cursor_name { FetchStmt *n = makeNode(FetchStmt); @@ -6053,6 +6082,15 @@ fetch_args: cursor_name n->howMany = FETCH_ALL; $$ = (Node *)n; } + | FORWARD ALL LIMIT Iconst opt_from_in cursor_name + { + FetchStmt *n = makeNode(FetchStmt); + n->portalname = $6; + n->direction = FETCH_FORWARD; + n->howMany = FETCH_ALL; + n->howLarge = $4; + $$ = (Node *)n; + } | BACKWARD opt_from_in cursor_name { FetchStmt *n = makeNode(FetchStmt); @@ -6069,6 +6107,15 @@ fetch_args: cursor_name n->howMany = $2; $$ = (Node *)n; } + | BACKWARD SignedIconst LIMIT Iconst opt_from_in cursor_name + { + FetchStmt *n = makeNode(FetchStmt); + n->portalname = $6; + n->direction = FETCH_BACKWARD; + n->howMany = $2; + n->howLarge = $4; + $$ = (Node *)n; + } | BACKWARD ALL opt_from_in cursor_name { FetchStmt *n = makeNode(FetchStmt); @@ -6077,6 +6124,15 @@ fetch_args: cursor_name n->howMany = FETCH_ALL; $$ = (Node *)n; } + | BACKWARD ALL LIMIT Iconst opt_from_in cursor_name + { + FetchStmt *n = makeNode(FetchStmt); + n->portalname = $6; + n->direction = FETCH_BACKWARD; + n->howMany = FETCH_ALL; + n->howLarge = $4; + $$ = (Node *)n; + } ; from_in: FROM {} @@ -6087,6 +6143,9 @@ opt_from_in: from_in {} | /* EMPTY */ {} ; +opt_fetch_limit: LIMIT Iconst { $$ = $2;} + | /* EMPTY */ { $$ = 0; } + ; /***************************************************************************** * diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 8f74353..55f062b 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -1043,6 +1043,7 @@ exec_simple_query(const char *query_string) */ (void) PortalRun(portal, FETCH_ALL, + 0, isTopLevel, receiver, receiver, @@ -1928,6 +1929,7 @@ exec_execute_message(const char *portal_name, long max_rows) completed = PortalRun(portal, max_rows, + 0, true, /* always top level */ receiver, receiver, diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 9c14e8a..5f68cc7 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -16,6 +16,7 @@ #include "postgres.h" #include "access/xact.h" +#include "access/htup_details.h" #include "commands/prepare.h" #include "executor/tstoreReceiver.h" #include "miscadmin.h" @@ -39,9 +40,10 @@ static void ProcessQuery(PlannedStmt *plan, DestReceiver *dest, char *completionTag); static void FillPortalStore(Portal portal, bool isTopLevel); -static uint32 RunFromStore(Portal portal, ScanDirection direction, long count, +static uint32 RunFromStore(Portal portal, ScanDirection direction, + long count, long size, bool *stoppedbysize, DestReceiver *dest); -static long PortalRunSelect(Portal portal, bool forward, long count, +static long PortalRunSelect(Portal portal, bool forward, long count, long size, DestReceiver *dest); static void PortalRunUtility(Portal portal, Node *utilityStmt, bool isTopLevel, DestReceiver *dest, char *completionTag); @@ -51,6 +53,7 @@ static void PortalRunMulti(Portal portal, bool isTopLevel, static long DoPortalRunFetch(Portal portal, FetchDirection fdirection, long count, + long size, DestReceiver *dest); static void DoPortalRewind(Portal portal); @@ -182,7 +185,7 @@ ProcessQuery(PlannedStmt *plan, /* * Run the plan to completion. */ - ExecutorRun(queryDesc, ForwardScanDirection, 0L); + ExecutorRun(queryDesc, ForwardScanDirection, 0L, 0L); /* * Build command completion status string, if caller wants one. @@ -703,7 +706,7 @@ PortalSetResultFormat(Portal portal, int nFormats, int16 *formats) * suspended due to exhaustion of the count parameter. */ bool -PortalRun(Portal portal, long count, bool isTopLevel, +PortalRun(Portal portal, long count, long size, bool isTopLevel, DestReceiver *dest, DestReceiver *altdest, char *completionTag) { @@ -787,7 +790,7 @@ PortalRun(Portal portal, long count, bool isTopLevel, /* * Now fetch desired portion of results. */ - nprocessed = PortalRunSelect(portal, true, count, dest); + nprocessed = PortalRunSelect(portal, true, count, size, dest); /* * If the portal result contains a command tag and the caller @@ -892,11 +895,13 @@ static long PortalRunSelect(Portal portal, bool forward, long count, + long size, DestReceiver *dest) { QueryDesc *queryDesc; ScanDirection direction; uint32 nprocessed; + bool stoppedbysize; /* * NB: queryDesc will be NULL if we are fetching from a held cursor or a @@ -939,12 +944,14 @@ PortalRunSelect(Portal portal, count = 0; if (portal->holdStore) - nprocessed = RunFromStore(portal, direction, count, dest); + nprocessed = RunFromStore(portal, direction, count, + size, &stoppedbysize, dest); else { PushActiveSnapshot(queryDesc->snapshot); - ExecutorRun(queryDesc, direction, count); + ExecutorRun(queryDesc, direction, count, size); nprocessed = queryDesc->estate->es_processed; + stoppedbysize = queryDesc->estate->es_stoppedbysize; PopActiveSnapshot(); } @@ -954,8 +961,9 @@ PortalRunSelect(Portal portal, if (nprocessed > 0) portal->atStart = false; /* OK to go backward now */ - if (count == 0 || - (unsigned long) nprocessed < (unsigned long) count) + if ((count == 0 || + (unsigned long) nprocessed < (unsigned long) count) && + !stoppedbysize) portal->atEnd = true; /* we retrieved 'em all */ oldPos = portal->portalPos; portal->portalPos += nprocessed; @@ -982,12 +990,14 @@ PortalRunSelect(Portal portal, count = 0; if (portal->holdStore) - nprocessed = RunFromStore(portal, direction, count, dest); + nprocessed = RunFromStore(portal, direction, count, + size, &stoppedbysize, dest); else { PushActiveSnapshot(queryDesc->snapshot); - ExecutorRun(queryDesc, direction, count); + ExecutorRun(queryDesc, direction, count, size); nprocessed = queryDesc->estate->es_processed; + stoppedbysize = queryDesc->estate->es_stoppedbysize; PopActiveSnapshot(); } @@ -998,8 +1008,9 @@ PortalRunSelect(Portal portal, portal->atEnd = false; /* OK to go forward now */ portal->portalPos++; /* adjust for endpoint case */ } - if (count == 0 || - (unsigned long) nprocessed < (unsigned long) count) + if ((count == 0 || + (unsigned long) nprocessed < (unsigned long) count) && + !stoppedbysize) { portal->atStart = true; /* we retrieved 'em all */ portal->portalPos = 0; @@ -1089,10 +1100,13 @@ FillPortalStore(Portal portal, bool isTopLevel) */ static uint32 RunFromStore(Portal portal, ScanDirection direction, long count, - DestReceiver *dest) + long size_limit, bool *stoppedbysize, DestReceiver *dest) { long current_tuple_count = 0; TupleTableSlot *slot; + long sent_size = 0; + + *stoppedbysize = false; slot = MakeSingleTupleTableSlot(portal->tupDesc); @@ -1133,6 +1147,25 @@ RunFromStore(Portal portal, ScanDirection direction, long count, current_tuple_count++; if (count && count == current_tuple_count) break; + + /* Count the size of tuples we've sent */ + if (slot->tts_tuple) + sent_size += HEAPTUPLESIZE + slot->tts_tuple->t_len; + else + { + sent_size += HEAPTUPLESIZE + + heap_compute_data_size(slot->tts_tupleDescriptor, + slot->tts_values, + slot->tts_isnull); + } + + /* Quit when the size limit will be exceeded by this tuple */ + if (current_tuple_count > 0 && + size_limit > 0 && size_limit < sent_size) + { + *stoppedbysize = true; + break; + } } } @@ -1385,6 +1418,7 @@ long PortalRunFetch(Portal portal, FetchDirection fdirection, long count, + long size, DestReceiver *dest) { long result; @@ -1422,7 +1456,7 @@ PortalRunFetch(Portal portal, switch (portal->strategy) { case PORTAL_ONE_SELECT: - result = DoPortalRunFetch(portal, fdirection, count, dest); + result = DoPortalRunFetch(portal, fdirection, count, size, dest); break; case PORTAL_ONE_RETURNING: @@ -1439,7 +1473,7 @@ PortalRunFetch(Portal portal, /* * Now fetch desired portion of results. */ - result = DoPortalRunFetch(portal, fdirection, count, dest); + result = DoPortalRunFetch(portal, fdirection, count, size, dest); break; default: @@ -1484,6 +1518,7 @@ static long DoPortalRunFetch(Portal portal, FetchDirection fdirection, long count, + long size, DestReceiver *dest) { bool forward; @@ -1526,7 +1561,7 @@ DoPortalRunFetch(Portal portal, { DoPortalRewind(portal); if (count > 1) - PortalRunSelect(portal, true, count - 1, + PortalRunSelect(portal, true, count - 1, 0L, None_Receiver); } else @@ -1536,13 +1571,13 @@ DoPortalRunFetch(Portal portal, if (portal->atEnd) pos++; /* need one extra fetch if off end */ if (count <= pos) - PortalRunSelect(portal, false, pos - count + 1, + PortalRunSelect(portal, false, pos - count + 1, 0L, None_Receiver); else if (count > pos + 1) - PortalRunSelect(portal, true, count - pos - 1, + PortalRunSelect(portal, true, count - pos - 1, 0L, None_Receiver); } - return PortalRunSelect(portal, true, 1L, dest); + return PortalRunSelect(portal, true, 1L, 0L, dest); } else if (count < 0) { @@ -1553,17 +1588,17 @@ DoPortalRunFetch(Portal portal, * (Is it worth considering case where count > half of size of * query? We could rewind once we know the size ...) */ - PortalRunSelect(portal, true, FETCH_ALL, None_Receiver); + PortalRunSelect(portal, true, FETCH_ALL, 0L, None_Receiver); if (count < -1) - PortalRunSelect(portal, false, -count - 1, None_Receiver); - return PortalRunSelect(portal, false, 1L, dest); + PortalRunSelect(portal, false, -count - 1, 0, None_Receiver); + return PortalRunSelect(portal, false, 1L, 0L, dest); } else { /* count == 0 */ /* Rewind to start, return zero rows */ DoPortalRewind(portal); - return PortalRunSelect(portal, true, 0L, dest); + return PortalRunSelect(portal, true, 0L, 0L, dest); } break; case FETCH_RELATIVE: @@ -1573,8 +1608,8 @@ DoPortalRunFetch(Portal portal, * Definition: advance count-1 rows, return next row (if any). */ if (count > 1) - PortalRunSelect(portal, true, count - 1, None_Receiver); - return PortalRunSelect(portal, true, 1L, dest); + PortalRunSelect(portal, true, count - 1, 0L, None_Receiver); + return PortalRunSelect(portal, true, 1L, 0L, dest); } else if (count < 0) { @@ -1583,8 +1618,8 @@ DoPortalRunFetch(Portal portal, * any). */ if (count < -1) - PortalRunSelect(portal, false, -count - 1, None_Receiver); - return PortalRunSelect(portal, false, 1L, dest); + PortalRunSelect(portal, false, -count - 1, 0L, None_Receiver); + return PortalRunSelect(portal, false, 1L, 0L, dest); } else { @@ -1630,7 +1665,7 @@ DoPortalRunFetch(Portal portal, */ if (on_row) { - PortalRunSelect(portal, false, 1L, None_Receiver); + PortalRunSelect(portal, false, 1L, 0L, None_Receiver); /* Set up to fetch one row forward */ count = 1; forward = true; @@ -1652,7 +1687,7 @@ DoPortalRunFetch(Portal portal, return result; } - return PortalRunSelect(portal, forward, count, dest); + return PortalRunSelect(portal, forward, count, size, dest); } /* diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 40fde83..64a02c3 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -80,8 +80,8 @@ extern PGDLLIMPORT ExecutorStart_hook_type ExecutorStart_hook; /* Hook for plugins to get control in ExecutorRun() */ typedef void (*ExecutorRun_hook_type) (QueryDesc *queryDesc, - ScanDirection direction, - long count); + ScanDirection direction, + long count, long size); extern PGDLLIMPORT ExecutorRun_hook_type ExecutorRun_hook; /* Hook for plugins to get control in ExecutorFinish() */ @@ -176,9 +176,9 @@ extern TupleTableSlot *ExecFilterJunk(JunkFilter *junkfilter, extern void ExecutorStart(QueryDesc *queryDesc, int eflags); extern void standard_ExecutorStart(QueryDesc *queryDesc, int eflags); extern void ExecutorRun(QueryDesc *queryDesc, - ScanDirection direction, long count); + ScanDirection direction, long count, long size); extern void standard_ExecutorRun(QueryDesc *queryDesc, - ScanDirection direction, long count); + ScanDirection direction, long count, long size); extern void ExecutorFinish(QueryDesc *queryDesc); extern void standard_ExecutorFinish(QueryDesc *queryDesc); extern void ExecutorEnd(QueryDesc *queryDesc); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 41288ed..d963286 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -376,6 +376,7 @@ typedef struct EState List *es_rowMarks; /* List of ExecRowMarks */ uint32 es_processed; /* # of tuples processed */ + bool es_stoppedbysize; /* true if processing stopped by size */ Oid es_lastoid; /* last oid processed (by INSERT) */ int es_top_eflags; /* eflags passed to ExecutorStart */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index b1dfa85..9e18331 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -2223,6 +2223,7 @@ typedef struct FetchStmt NodeTag type; FetchDirection direction; /* see above */ long howMany; /* number of rows, or position argument */ + long howLarge; /* total bytes of rows */ char *portalname; /* name of portal (cursor) */ bool ismove; /* TRUE if MOVE */ } FetchStmt; diff --git a/src/include/tcop/pquery.h b/src/include/tcop/pquery.h index 8073a6e..afffe86 100644 --- a/src/include/tcop/pquery.h +++ b/src/include/tcop/pquery.h @@ -33,13 +33,14 @@ extern void PortalStart(Portal portal, ParamListInfo params, extern void PortalSetResultFormat(Portal portal, int nFormats, int16 *formats); -extern bool PortalRun(Portal portal, long count, bool isTopLevel, +extern bool PortalRun(Portal portal, long count, long size, bool isTopLevel, DestReceiver *dest, DestReceiver *altdest, char *completionTag); extern long PortalRunFetch(Portal portal, FetchDirection fdirection, long count, + long size, DestReceiver *dest); #endif /* PQUERY_H */ diff --git a/src/interfaces/ecpg/preproc/Makefile b/src/interfaces/ecpg/preproc/Makefile index 1ecc405..b492fa7 100644 --- a/src/interfaces/ecpg/preproc/Makefile +++ b/src/interfaces/ecpg/preproc/Makefile @@ -48,7 +48,7 @@ ecpg: $(OBJS) | submake-libpgport preproc.o: pgc.c preproc.h: preproc.c ; -preproc.c: BISONFLAGS += -d +preproc.c: BISONFLAGS += -r all -d preproc.y: ../../../backend/parser/gram.y parse.pl ecpg.addons ecpg.header ecpg.tokens ecpg.trailer ecpg.type $(PERL) $(srcdir)/parse.pl $(srcdir) < $< > $@ diff --git a/src/interfaces/ecpg/preproc/ecpg.addons b/src/interfaces/ecpg/preproc/ecpg.addons index b3b36cf..bdccb68 100644 --- a/src/interfaces/ecpg/preproc/ecpg.addons +++ b/src/interfaces/ecpg/preproc/ecpg.addons @@ -220,13 +220,46 @@ ECPG: fetch_argsNEXTopt_from_incursor_name addon ECPG: fetch_argsPRIORopt_from_incursor_name addon ECPG: fetch_argsFIRST_Popt_from_incursor_name addon ECPG: fetch_argsLAST_Popt_from_incursor_name addon + add_additional_variables($3, false); + if ($3[0] == ':') + { + free($3); + $3 = mm_strdup("$0"); + } ECPG: fetch_argsALLopt_from_incursor_name addon +ECPG: fetch_argsFORWARDopt_from_incursor_name addon +ECPG: fetch_argsBACKWARDopt_from_incursor_name addon add_additional_variables($3, false); if ($3[0] == ':') { free($3); $3 = mm_strdup("$0"); } +ECPG: fetch_argsALLLIMITIconstopt_from_incursor_name addon + add_additional_variables($5, false); + if ($5[0] == ':') + { + free($5); + $5 = mm_strdup("$0"); + } + if ($3[0] == '$') + { + free($3); + $3 = mm_strdup("$0"); + } +ECPG: fetch_argsFORWARDALLLIMITIconstopt_from_incursor_name addon +ECPG: fetch_argsBACKWARDALLLIMITIconstopt_from_incursor_name addon + add_additional_variables($6, false); + if ($6[0] == ':') + { + free($6); + $6 = mm_strdup("$0"); + } + if ($4[0] == '$') + { + free($4); + $4 = mm_strdup("$0"); + } ECPG: fetch_argsSignedIconstopt_from_incursor_name addon add_additional_variables($3, false); if ($3[0] == ':') @@ -234,11 +267,41 @@ ECPG: fetch_argsSignedIconstopt_from_incursor_name addon free($3); $3 = mm_strdup("$0"); } +ECPG: fetch_argsSignedIconstLIMITIconstopt_from_incursor_name addon + add_additional_variables($5, false); + if ($5[0] == ':') + { + free($5); + $5 = mm_strdup("$0"); + } if ($1[0] == '$') { free($1); $1 = mm_strdup("$0"); } + if ($3[0] == '$') + { + free($3); + $3 = mm_strdup("$0"); + } +ECPG: fetch_argsFORWARDSignedIconstLIMITIconstopt_from_incursor_name addon +ECPG: fetch_argsBACKWARDSignedIconstLIMITIconstopt_from_incursor_name addon + add_additional_variables($6, false); + if ($6[0] == ':') + { + free($6); + $6 = mm_strdup("$0"); + } + if ($2[0] == '$') + { + free($2); + $2 = mm_strdup("$0"); + } + if ($4[0] == '$') + { + free($4); + $4 = mm_strdup("$0"); + } ECPG: fetch_argsFORWARDALLopt_from_incursor_name addon ECPG: fetch_argsBACKWARDALLopt_from_incursor_name addon add_additional_variables($4, false); -- 2.1.0.GIT
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers