On Thu, Feb 02, 2012 at 04:51:37PM +0900, Kyotaro HORIGUCHI wrote: > Hello, This is new version of dblink.c > > - Memory is properly freed when realloc returns NULL in storeHandler(). > > - The bug that free() in finishStoreInfo() will be fed with > garbage pointer when malloc for sinfo->valbuflen fails is > fixed.
Thanks, merged. I also did some minor coding style cleanups. Tagging it Ready For Committer. I don't see any notable issues with the patch anymore. There is some potential for experimenting with more aggressive optimizations on dblink side, but I'd like to get a nod from a committer for libpq changes first. -- marko
*** a/src/interfaces/libpq/exports.txt --- b/src/interfaces/libpq/exports.txt *************** *** 160,162 **** PQconnectStartParams 157 --- 160,164 ---- PQping 158 PQpingParams 159 PQlibVersion 160 + PQsetRowProcessor 161 + PQsetRowProcessorErrMsg 162 *** a/src/interfaces/libpq/fe-connect.c --- b/src/interfaces/libpq/fe-connect.c *************** *** 2693,2698 **** makeEmptyPGconn(void) --- 2693,2701 ---- conn->wait_ssl_try = false; #endif + /* set default row processor */ + PQsetRowProcessor(conn, NULL, NULL); + /* * We try to send at least 8K at a time, which is the usual size of pipe * buffers on Unix systems. That way, when we are sending a large amount *************** *** 2711,2718 **** makeEmptyPGconn(void) --- 2714,2726 ---- initPQExpBuffer(&conn->errorMessage); initPQExpBuffer(&conn->workBuffer); + /* set up initial row buffer */ + conn->rowBufLen = 32; + conn->rowBuf = (PGrowValue *)malloc(conn->rowBufLen * sizeof(PGrowValue)); + if (conn->inBuffer == NULL || conn->outBuffer == NULL || + conn->rowBuf == NULL || PQExpBufferBroken(&conn->errorMessage) || PQExpBufferBroken(&conn->workBuffer)) { *************** *** 2812,2817 **** freePGconn(PGconn *conn) --- 2820,2827 ---- free(conn->inBuffer); if (conn->outBuffer) free(conn->outBuffer); + if (conn->rowBuf) + free(conn->rowBuf); termPQExpBuffer(&conn->errorMessage); termPQExpBuffer(&conn->workBuffer); *************** *** 5076,5078 **** PQregisterThreadLock(pgthreadlock_t newhandler) --- 5086,5089 ---- return prev; } + *** a/src/interfaces/libpq/fe-exec.c --- b/src/interfaces/libpq/fe-exec.c *************** *** 66,71 **** static PGresult *PQexecFinish(PGconn *conn); --- 66,72 ---- static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target); static int check_field_number(const PGresult *res, int field_num); + static int pqAddRow(PGresult *res, void *param, PGrowValue *columns); /* ---------------- *************** *** 160,165 **** PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) --- 161,167 ---- result->curBlock = NULL; result->curOffset = 0; result->spaceLeft = 0; + result->rowProcessorErrMsg = NULL; if (conn) { *************** *** 701,707 **** pqClearAsyncResult(PGconn *conn) if (conn->result) PQclear(conn->result); conn->result = NULL; - conn->curTuple = NULL; } /* --- 703,708 ---- *************** *** 756,762 **** pqPrepareAsyncResult(PGconn *conn) */ res = conn->result; conn->result = NULL; /* handing over ownership to caller */ - conn->curTuple = NULL; /* just in case */ if (!res) res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR); else --- 757,762 ---- *************** *** 828,833 **** pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...) --- 828,900 ---- } /* + * PQsetRowProcessor + * Set function that copies column data out from network buffer. + */ + void + PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param) + { + conn->rowProcessor = (func ? func : pqAddRow); + conn->rowProcessorParam = param; + } + + /* + * PQsetRowProcessorErrMsg + * Set the error message pass back to the caller of RowProcessor. + * + * You can replace the previous message by alternative mes, or clear + * it with NULL. + */ + void + PQsetRowProcessorErrMsg(PGresult *res, char *msg) + { + if (msg) + res->rowProcessorErrMsg = pqResultStrdup(res, msg); + else + res->rowProcessorErrMsg = NULL; + } + + /* + * pqAddRow + * add a row to the PGresult structure, growing it if necessary + * Returns TRUE if OK, FALSE if not enough memory to add the row. + */ + static int + pqAddRow(PGresult *res, void *param, PGrowValue *columns) + { + PGresAttValue *tup; + int nfields = res->numAttributes; + int i; + + tup = (PGresAttValue *) + pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE); + if (tup == NULL) + return FALSE; + + for (i = 0 ; i < nfields ; i++) + { + tup[i].len = columns[i].len; + if (tup[i].len == NULL_LEN) + { + tup[i].value = res->null_field; + } + else + { + bool isbinary = (res->attDescs[i].format != 0); + tup[i].value = (char *)pqResultAlloc(res, tup[i].len + 1, isbinary); + if (tup[i].value == NULL) + return FALSE; + + memcpy(tup[i].value, columns[i].value, tup[i].len); + /* We have to terminate this ourselves */ + tup[i].value[tup[i].len] = '\0'; + } + } + + return pqAddTuple(res, tup); + } + + /* * pqAddTuple * add a row pointer to the PGresult structure, growing it if necessary * Returns TRUE if OK, FALSE if not enough memory to add the row *************** *** 1223,1229 **** PQsendQueryStart(PGconn *conn) /* initialize async result-accumulation state */ conn->result = NULL; - conn->curTuple = NULL; /* ready to send command message */ return true; --- 1290,1295 ---- *** a/src/interfaces/libpq/fe-misc.c --- b/src/interfaces/libpq/fe-misc.c *************** *** 219,224 **** pqGetnchar(char *s, size_t len, PGconn *conn) --- 219,243 ---- } /* + * pqGetnchar: + * skip len bytes in input buffer. + */ + int + pqSkipnchar(size_t len, PGconn *conn) + { + if (len > (size_t) (conn->inEnd - conn->inCursor)) + return EOF; + + conn->inCursor += len; + + if (conn->Pfdebug) + fprintf(conn->Pfdebug, "From backend (%lu skipped)\n", + (unsigned long) len); + + return 0; + } + + /* * pqPutnchar: * write exactly len bytes to the current message */ *** a/src/interfaces/libpq/fe-protocol2.c --- b/src/interfaces/libpq/fe-protocol2.c *************** *** 703,721 **** failure: /* * parseInput subroutine to read a 'B' or 'D' (row data) message. ! * We add another tuple to the existing PGresult structure. * Returns: 0 if completed message, EOF if error or not enough data yet. * * Note that if we run out of data, we have to suspend and reprocess ! * the message after more data is received. We keep a partially constructed ! * tuple in conn->curTuple, and avoid reallocating already-allocated storage. */ static int getAnotherTuple(PGconn *conn, bool binary) { PGresult *result = conn->result; int nfields = result->numAttributes; ! PGresAttValue *tup; /* the backend sends us a bitmap of which attributes are null */ char std_bitmap[64]; /* used unless it doesn't fit */ --- 703,720 ---- /* * parseInput subroutine to read a 'B' or 'D' (row data) message. ! * It fills rowbuf with column pointers and then calls row processor. * Returns: 0 if completed message, EOF if error or not enough data yet. * * Note that if we run out of data, we have to suspend and reprocess ! * the message after more data is received. */ static int getAnotherTuple(PGconn *conn, bool binary) { PGresult *result = conn->result; int nfields = result->numAttributes; ! PGrowValue *rowbuf; /* the backend sends us a bitmap of which attributes are null */ char std_bitmap[64]; /* used unless it doesn't fit */ *************** *** 727,754 **** getAnotherTuple(PGconn *conn, bool binary) int bitcnt; /* number of bits examined in current byte */ int vlen; /* length of the current field value */ result->binary = binary; ! /* Allocate tuple space if first time for this data message */ ! if (conn->curTuple == NULL) { ! conn->curTuple = (PGresAttValue *) ! pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); ! if (conn->curTuple == NULL) ! goto outOfMemory; ! MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); ! ! /* ! * If it's binary, fix the column format indicators. We assume the ! * backend will consistently send either B or D, not a mix. ! */ ! if (binary) ! { ! for (i = 0; i < nfields; i++) ! result->attDescs[i].format = 1; ! } } - tup = conn->curTuple; /* Get the null-value bitmap */ nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE; --- 726,752 ---- int bitcnt; /* number of bits examined in current byte */ int vlen; /* length of the current field value */ + /* resize row buffer if needed */ + if (nfields > conn->rowBufLen) + { + rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue)); + if (!rowbuf) + goto rowProcessError; + conn->rowBuf = rowbuf; + conn->rowBufLen = nfields; + } + else + { + rowbuf = conn->rowBuf; + } + result->binary = binary; ! if (binary) { ! for (i = 0; i < nfields; i++) ! result->attDescs[i].format = 1; } /* Get the null-value bitmap */ nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE; *************** *** 757,763 **** getAnotherTuple(PGconn *conn, bool binary) { bitmap = (char *) malloc(nbytes); if (!bitmap) ! goto outOfMemory; } if (pqGetnchar(bitmap, nbytes, conn)) --- 755,761 ---- { bitmap = (char *) malloc(nbytes); if (!bitmap) ! goto rowProcessError; } if (pqGetnchar(bitmap, nbytes, conn)) *************** *** 771,804 **** getAnotherTuple(PGconn *conn, bool binary) for (i = 0; i < nfields; i++) { if (!(bmap & 0200)) ! { ! /* if the field value is absent, make it a null string */ ! tup[i].value = result->null_field; ! tup[i].len = NULL_LEN; ! } else { - /* get the value length (the first four bytes are for length) */ - if (pqGetInt(&vlen, 4, conn)) - goto EOFexit; if (!binary) vlen = vlen - 4; if (vlen < 0) vlen = 0; - if (tup[i].value == NULL) - { - tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary); - if (tup[i].value == NULL) - goto outOfMemory; - } - tup[i].len = vlen; - /* read in the value */ - if (vlen > 0) - if (pqGetnchar((char *) (tup[i].value), vlen, conn)) - goto EOFexit; - /* we have to terminate this ourselves */ - tup[i].value[vlen] = '\0'; } /* advance the bitmap stuff */ bitcnt++; if (bitcnt == BITS_PER_BYTE) --- 769,797 ---- for (i = 0; i < nfields; i++) { if (!(bmap & 0200)) ! vlen = NULL_LEN; ! else if (pqGetInt(&vlen, 4, conn)) ! goto EOFexit; else { if (!binary) vlen = vlen - 4; if (vlen < 0) vlen = 0; } + + /* + * rowbuf[i].value always points to the next address of the + * length field even if the value is NULL, to allow safe + * size estimates and data copy. + */ + rowbuf[i].value = conn->inBuffer + conn->inCursor; + rowbuf[i].len = vlen; + + /* Skip the value */ + if (vlen > 0 && pqSkipnchar(vlen, conn)) + goto EOFexit; + /* advance the bitmap stuff */ bitcnt++; if (bitcnt == BITS_PER_BYTE) *************** *** 811,827 **** getAnotherTuple(PGconn *conn, bool binary) bmap <<= 1; } ! /* Success! Store the completed tuple in the result */ ! if (!pqAddTuple(result, tup)) ! goto outOfMemory; ! /* and reset for a new message */ ! conn->curTuple = NULL; if (bitmap != std_bitmap) free(bitmap); return 0; ! outOfMemory: /* Replace partially constructed result with an error result */ /* --- 804,820 ---- bmap <<= 1; } ! /* Success! Pass the completed row values to rowProcessor */ ! if (!conn->rowProcessor(result, conn->rowProcessorParam, rowbuf)) ! goto rowProcessError; if (bitmap != std_bitmap) free(bitmap); + return 0; ! rowProcessError: ! /* Replace partially constructed result with an error result */ /* *************** *** 829,838 **** outOfMemory: * there's not enough memory to concatenate messages... */ pqClearAsyncResult(conn); ! printfPQExpBuffer(&conn->errorMessage, ! libpq_gettext("out of memory for query result\n")); /* * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can * do to recover... */ --- 822,838 ---- * there's not enough memory to concatenate messages... */ pqClearAsyncResult(conn); ! resetPQExpBuffer(&conn->errorMessage); /* + * If error message is passed from RowProcessor, set it into + * PGconn, assume out of memory if not. + */ + appendPQExpBufferStr(&conn->errorMessage, + result->rowProcessorErrMsg ? + result->rowProcessorErrMsg : + libpq_gettext("out of memory for query result\n")); + /* * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can * do to recover... */ *** a/src/interfaces/libpq/fe-protocol3.c --- b/src/interfaces/libpq/fe-protocol3.c *************** *** 613,646 **** failure: /* * parseInput subroutine to read a 'D' (row data) message. ! * We add another tuple to the existing PGresult structure. * Returns: 0 if completed message, EOF if error or not enough data yet. * * Note that if we run out of data, we have to suspend and reprocess ! * the message after more data is received. We keep a partially constructed ! * tuple in conn->curTuple, and avoid reallocating already-allocated storage. */ static int getAnotherTuple(PGconn *conn, int msgLength) { PGresult *result = conn->result; int nfields = result->numAttributes; ! PGresAttValue *tup; int tupnfields; /* # fields from tuple */ int vlen; /* length of the current field value */ int i; - /* Allocate tuple space if first time for this data message */ - if (conn->curTuple == NULL) - { - conn->curTuple = (PGresAttValue *) - pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); - if (conn->curTuple == NULL) - goto outOfMemory; - MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); - } - tup = conn->curTuple; - /* Get the field count and make sure it's what we expect */ if (pqGetInt(&tupnfields, 2, conn)) return EOF; --- 613,634 ---- /* * parseInput subroutine to read a 'D' (row data) message. ! * It fills rowbuf with column pointers and then calls row processor. * Returns: 0 if completed message, EOF if error or not enough data yet. * * Note that if we run out of data, we have to suspend and reprocess ! * the message after more data is received. */ static int getAnotherTuple(PGconn *conn, int msgLength) { PGresult *result = conn->result; int nfields = result->numAttributes; ! PGrowValue *rowbuf; int tupnfields; /* # fields from tuple */ int vlen; /* length of the current field value */ int i; /* Get the field count and make sure it's what we expect */ if (pqGetInt(&tupnfields, 2, conn)) return EOF; *************** *** 656,661 **** getAnotherTuple(PGconn *conn, int msgLength) --- 644,663 ---- return 0; } + /* resize row buffer if needed */ + if (nfields > conn->rowBufLen) + { + rowbuf = realloc(conn->rowBuf, nfields * sizeof(PGrowValue)); + if (!rowbuf) + goto rowProcessError; + conn->rowBuf = rowbuf; + conn->rowBufLen = nfields; + } + else + { + rowbuf = conn->rowBuf; + } + /* Scan the fields */ for (i = 0; i < nfields; i++) { *************** *** 663,710 **** getAnotherTuple(PGconn *conn, int msgLength) if (pqGetInt(&vlen, 4, conn)) return EOF; if (vlen == -1) ! { ! /* null field */ ! tup[i].value = result->null_field; ! tup[i].len = NULL_LEN; ! continue; ! } ! if (vlen < 0) vlen = 0; - if (tup[i].value == NULL) - { - bool isbinary = (result->attDescs[i].format != 0); ! tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary); ! if (tup[i].value == NULL) ! goto outOfMemory; ! } ! tup[i].len = vlen; ! /* read in the value */ ! if (vlen > 0) ! if (pqGetnchar((char *) (tup[i].value), vlen, conn)) ! return EOF; ! /* we have to terminate this ourselves */ ! tup[i].value[vlen] = '\0'; } ! /* Success! Store the completed tuple in the result */ ! if (!pqAddTuple(result, tup)) ! goto outOfMemory; ! /* and reset for a new message */ ! conn->curTuple = NULL; return 0; ! outOfMemory: /* * Replace partially constructed result with an error result. First * discard the old result to try to win back some memory. */ pqClearAsyncResult(conn); ! printfPQExpBuffer(&conn->errorMessage, ! libpq_gettext("out of memory for query result\n")); pqSaveErrorResult(conn); /* Discard the failed message by pretending we read it */ --- 665,710 ---- if (pqGetInt(&vlen, 4, conn)) return EOF; if (vlen == -1) ! vlen = NULL_LEN; ! else if (vlen < 0) vlen = 0; ! /* ! * rowbuf[i].value always points to the next address of the ! * length field even if the value is NULL, to allow safe ! * size estimates and data copy. ! */ ! rowbuf[i].value = conn->inBuffer + conn->inCursor; ! rowbuf[i].len = vlen; ! ! /* Skip to the next length field */ ! if (vlen > 0 && pqSkipnchar(vlen, conn)) ! return EOF; } ! /* Success! Pass the completed row values to rowProcessor */ ! if (!conn->rowProcessor(result, conn->rowProcessorParam, rowbuf)) ! goto rowProcessError; return 0; ! rowProcessError: /* * Replace partially constructed result with an error result. First * discard the old result to try to win back some memory. */ pqClearAsyncResult(conn); ! resetPQExpBuffer(&conn->errorMessage); ! ! /* ! * If error message is passed from addTupleFunc, set it into ! * PGconn, assume out of memory if not. ! */ ! appendPQExpBufferStr(&conn->errorMessage, ! result->rowProcessorErrMsg ? ! result->rowProcessorErrMsg : ! libpq_gettext("out of memory for query result\n")); pqSaveErrorResult(conn); /* Discard the failed message by pretending we read it */ *** a/src/interfaces/libpq/libpq-fe.h --- b/src/interfaces/libpq/libpq-fe.h *************** *** 149,154 **** typedef struct pgNotify --- 149,165 ---- struct pgNotify *next; /* list link */ } PGnotify; + /* PGrowValue points a column value of in network buffer. + * Value is a string without null termination and length len. + * NULL is represented as len < 0, value points then to place + * where value would have been. + */ + typedef struct pgRowValue + { + int len; /* length in bytes of the value */ + char *value; /* actual value, without null termination */ + } PGrowValue; + /* Function types for notice-handling callbacks */ typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res); typedef void (*PQnoticeProcessor) (void *arg, const char *message); *************** *** 416,421 **** extern PGPing PQping(const char *conninfo); --- 427,463 ---- extern PGPing PQpingParams(const char *const * keywords, const char *const * values, int expand_dbname); + /* + * Typedef for alternative row processor. + * + * Columns array will contain PQnfields() entries, each one + * pointing to particular column data in network buffer. + * This function is supposed to copy data out from there + * and store somewhere. NULL is signified with len<0. + * + * This function must return 1 for success and must return 0 for + * failure and may set error message by PQsetRowProcessorErrMsg. It + * is assumed by caller as out of memory when the error message is not + * set on failure. This function is assumed not to throw any + * exception. + */ + typedef int (*PQrowProcessor)(PGresult *res, void *param, + PGrowValue *columns); + + /* + * Set alternative row data processor for PGconn. + * + * By registering this function, pg_result disables its own result + * store and calls it for rows one by one. + * + * func is row processor function. See the typedef RowProcessor. + * + * rowProcessorParam is the contextual variable that passed to + * RowProcessor. + */ + extern void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, + void *rowProcessorParam); + /* Force the write buffer to be written (or at least try) */ extern int PQflush(PGconn *conn); *************** *** 454,459 **** extern char *PQcmdTuples(PGresult *res); --- 496,502 ---- extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num); extern int PQgetlength(const PGresult *res, int tup_num, int field_num); extern int PQgetisnull(const PGresult *res, int tup_num, int field_num); + extern void PQsetRowProcessorErrMsg(PGresult *res, char *msg); extern int PQnparams(const PGresult *res); extern Oid PQparamtype(const PGresult *res, int param_num); *** a/src/interfaces/libpq/libpq-int.h --- b/src/interfaces/libpq/libpq-int.h *************** *** 209,214 **** struct pg_result --- 209,217 ---- PGresult_data *curBlock; /* most recently allocated block */ int curOffset; /* start offset of free space in block */ int spaceLeft; /* number of free bytes remaining in block */ + + /* temp etorage for message from row processor callback */ + char *rowProcessorErrMsg; }; /* PGAsyncStatusType defines the state of the query-execution state machine */ *************** *** 398,404 **** struct pg_conn /* Status for asynchronous result construction */ PGresult *result; /* result being constructed */ - PGresAttValue *curTuple; /* tuple currently being read */ #ifdef USE_SSL bool allow_ssl_try; /* Allowed to try SSL negotiation */ --- 401,406 ---- *************** *** 443,448 **** struct pg_conn --- 445,458 ---- /* Buffer for receiving various parts of messages */ PQExpBufferData workBuffer; /* expansible string */ + + /* + * Read column data from network buffer. + */ + PQrowProcessor rowProcessor;/* Function pointer */ + void *rowProcessorParam; /* Contextual parameter for rowProcessor */ + PGrowValue *rowBuf; /* Buffer for passing values to rowProcessor */ + int rowBufLen; /* Number of columns allocated in rowBuf */ }; /* PGcancel stores all data necessary to cancel a connection. A copy of this *************** *** 560,565 **** extern int pqGets(PQExpBuffer buf, PGconn *conn); --- 570,576 ---- extern int pqGets_append(PQExpBuffer buf, PGconn *conn); extern int pqPuts(const char *s, PGconn *conn); extern int pqGetnchar(char *s, size_t len, PGconn *conn); + extern int pqSkipnchar(size_t len, PGconn *conn); extern int pqPutnchar(const char *s, size_t len, PGconn *conn); extern int pqGetInt(int *result, size_t bytes, PGconn *conn); extern int pqPutInt(int value, size_t bytes, PGconn *conn);
*** a/doc/src/sgml/libpq.sgml --- b/doc/src/sgml/libpq.sgml *************** *** 7233,7238 **** int PQisthreadsafe(); --- 7233,7443 ---- </sect1> + <sect1 id="libpq-altrowprocessor"> + <title>Alternative row processor</title> + + <indexterm zone="libpq-altrowprocessor"> + <primary>PGresult</primary> + <secondary>PGconn</secondary> + </indexterm> + + <para> + As the standard usage, rows are stored into <type>PQresult</type> + until full resultset is received. Then such completely-filled + <type>PQresult</type> is passed to user. This behaviour can be + changed by registering alternative row processor function, + that will see each row data as soon as it is received + from network. It has the option of processing the data + immediately, or storing it into custom container. + </para> + + <para> + Note - as row processor sees rows as they arrive, it cannot know + whether the SQL statement actually finishes successfully on server + or not. So some care must be taken to get proper + transactionality. + </para> + + <variablelist> + <varlistentry id="libpq-pqsetrowprocessor"> + <term> + <function>PQsetRowProcessor</function> + <indexterm> + <primary>PQsetRowProcessor</primary> + </indexterm> + </term> + + <listitem> + <para> + Sets a callback function to process each row. + <synopsis> + void PQsetRowProcessor(PGconn *conn, PQrowProcessor func, void *param); + </synopsis> + </para> + + <para> + <variablelist> + <varlistentry> + <term><parameter>conn</parameter></term> + <listitem> + <para> + The connection object to set the row processor function. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>func</parameter></term> + <listitem> + <para> + Storage handler function to set. NULL means to use the + default processor. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>param</parameter></term> + <listitem> + <para> + A pointer to contextual parameter passed + to <parameter>func</parameter>. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqrowprocessor"> + <term> + <type>PQrowProcessor</type> + <indexterm> + <primary>PQrowProcessor</primary> + </indexterm> + </term> + + <listitem> + <para> + The type for the row processor callback function. + <synopsis> + int (*PQrowProcessor)(PGresult *res, void *param, PGrowValue *columns); + + typedef struct + { + int len; /* length in bytes of the value */ + char *value; /* actual value, without null termination */ + } PGrowValue; + </synopsis> + </para> + + <para> + The <parameter>columns</parameter> array will have PQnfields() + elements, each one pointing to column value in network buffer. + </para> + + <para> + This function must process or copy row values away from network + buffer before it returns, as next row might overwrite them. + </para> + + <para> + This function must return 1 for success, and 0 for failure. + On failure this function should set the error message + with <function>PGsetRowProcessorErrMsg</function> if the cause + is other than out of memory. This funcion must not throw any + exception. + </para> + <variablelist> + <varlistentry> + + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object. + </para> + </listitem> + </varlistentry> + <varlistentry> + + <term><parameter>param</parameter></term> + <listitem> + <para> + Extra parameter that was given to <function>PQsetRowProcessor</function>. + </para> + </listitem> + </varlistentry> + <varlistentry> + + <term><parameter>columns</parameter></term> + <listitem> + <para> + Column values of the row to process. Column values + are located in network buffer, the processor must + copy them out from there. + </para> + <para> + Column values are not null-terminated, so processor cannot + use C string functions on them directly. + </para> + </listitem> + </varlistentry> + </variablelist> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqsetrowprocessorerrmsg"> + <term> + <function>PQsetRowProcessorErrMsg</function> + <indexterm> + <primary>PQsetRowProcessorErrMsg</primary> + </indexterm> + </term> + <listitem> + <para> + Set the message for the error occurred + in <type>PQrowProcessor</type>. If this message is not set, the + caller assumes the error to be out of memory. + <synopsis> + void PQsetRowProcessorErrMsg(PGresult *res, char *msg) + </synopsis> + </para> + <para> + <variablelist> + <varlistentry> + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object + passed to <type>PQrowProcessor</type>. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>mes</parameter></term> + <listitem> + <para> + Error message. This will be copied internally so there is + no need to care of the scope. + </para> + <para> + If <parameter>res</parameter> already has a message previously + set, it will be overritten. Set NULL to cancel the the costom + message. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + </sect1> + + <sect1 id="libpq-build"> <title>Building <application>libpq</application> Programs</title>
*** a/contrib/dblink/dblink.c --- b/contrib/dblink/dblink.c *************** *** 63,73 **** typedef struct remoteConn bool newXactForCursor; /* Opened a transaction for a cursor */ } remoteConn; /* * Internal declarations */ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async); - static void materializeResult(FunctionCallInfo fcinfo, PGresult *res); static remoteConn *getConnectionByName(const char *name); static HTAB *createConnHash(void); static void createNewConnection(const char *name, remoteConn *rconn); --- 63,85 ---- bool newXactForCursor; /* Opened a transaction for a cursor */ } remoteConn; + typedef struct storeInfo + { + Tuplestorestate *tuplestore; + int nattrs; + MemoryContext oldcontext; + AttInMetadata *attinmeta; + char** valbuf; + int *valbuflen; + bool error_occurred; + bool nummismatch; + ErrorData *edata; + } storeInfo; + /* * Internal declarations */ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async); static remoteConn *getConnectionByName(const char *name); static HTAB *createConnHash(void); static void createNewConnection(const char *name, remoteConn *rconn); *************** *** 90,95 **** static char *escape_param_str(const char *from); --- 102,111 ---- static void validate_pkattnums(Relation rel, int2vector *pkattnums_arg, int32 pknumatts_arg, int **pkattnums, int *pknumatts); + static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo); + static void finishStoreInfo(storeInfo *sinfo); + static int storeHandler(PGresult *res, void *param, PGrowValue *columns); + /* Global */ static remoteConn *pconn = NULL; *************** *** 503,508 **** dblink_fetch(PG_FUNCTION_ARGS) --- 519,525 ---- char *curname = NULL; int howmany = 0; bool fail = true; /* default to backward compatible */ + storeInfo storeinfo; DBLINK_INIT; *************** *** 559,573 **** dblink_fetch(PG_FUNCTION_ARGS) --- 576,611 ---- appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname); /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec below + */ + initStoreInfo(&storeinfo, fcinfo); + PQsetRowProcessor(conn, storeHandler, &storeinfo); + + /* * Try to execute the query. Note that since libpq uses malloc, the * PGresult will be long-lived even though we are still in a short-lived * memory context. */ res = PQexec(conn, buf.data); + finishStoreInfo(&storeinfo); + if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { + /* finishStoreInfo saves the fields referred to below. */ + if (storeinfo.nummismatch) + { + /* This is only for backward compatibility */ + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + else if (storeinfo.edata) + ReThrowError(storeinfo.edata); + dblink_res_error(conname, res, "could not fetch from cursor", fail); return (Datum) 0; } *************** *** 579,586 **** dblink_fetch(PG_FUNCTION_ARGS) (errcode(ERRCODE_INVALID_CURSOR_NAME), errmsg("cursor \"%s\" does not exist", curname))); } - materializeResult(fcinfo, res); return (Datum) 0; } --- 617,624 ---- (errcode(ERRCODE_INVALID_CURSOR_NAME), errmsg("cursor \"%s\" does not exist", curname))); } + PQclear(res); return (Datum) 0; } *************** *** 640,645 **** dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) --- 678,684 ---- remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible */ bool freeconn = false; + storeInfo storeinfo; /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) *************** *** 715,878 **** dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) rsinfo->setResult = NULL; rsinfo->setDesc = NULL; /* synchronous query, or async result retrieval */ if (!is_async) res = PQexec(conn, sql); else - { res = PQgetResult(conn); - /* NULL means we're all done with the async results */ - if (!res) - return (Datum) 0; - } ! /* if needed, close the connection to the database and cleanup */ ! if (freeconn) ! PQfinish(conn); ! if (!res || ! (PQresultStatus(res) != PGRES_COMMAND_OK && ! PQresultStatus(res) != PGRES_TUPLES_OK)) { ! dblink_res_error(conname, res, "could not execute query", fail); ! return (Datum) 0; } - materializeResult(fcinfo, res); return (Datum) 0; } - /* - * Materialize the PGresult to return them as the function result. - * The res will be released in this function. - */ static void ! materializeResult(FunctionCallInfo fcinfo, PGresult *res) { ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; ! Assert(rsinfo->returnMode == SFRM_Materialize); ! PG_TRY(); { ! TupleDesc tupdesc; ! bool is_sql_cmd = false; ! int ntuples; ! int nfields; ! if (PQresultStatus(res) == PGRES_COMMAND_OK) ! { ! is_sql_cmd = true; ! /* ! * need a tuple descriptor representing one TEXT column to return ! * the command status string as our result tuple ! */ ! tupdesc = CreateTemplateTupleDesc(1, false); ! TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", ! TEXTOID, -1, 0); ! ntuples = 1; ! nfields = 1; ! } ! else ! { ! Assert(PQresultStatus(res) == PGRES_TUPLES_OK); ! is_sql_cmd = false; ! /* get a tuple descriptor for our result type */ ! switch (get_call_result_type(fcinfo, NULL, &tupdesc)) ! { ! case TYPEFUNC_COMPOSITE: ! /* success */ ! break; ! case TYPEFUNC_RECORD: ! /* failed to determine actual type of RECORD */ ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("function returning record called in context " ! "that cannot accept type record"))); ! break; ! default: ! /* result type isn't composite */ ! elog(ERROR, "return type must be a row type"); ! break; ! } ! /* make sure we have a persistent copy of the tupdesc */ ! tupdesc = CreateTupleDescCopy(tupdesc); ! ntuples = PQntuples(res); ! nfields = PQnfields(res); } ! /* ! * check result and tuple descriptor have the same number of columns ! */ ! if (nfields != tupdesc->natts) ! ereport(ERROR, ! (errcode(ERRCODE_DATATYPE_MISMATCH), ! errmsg("remote query result rowtype does not match " ! "the specified FROM clause rowtype"))); ! if (ntuples > 0) ! { ! AttInMetadata *attinmeta; ! Tuplestorestate *tupstore; ! MemoryContext oldcontext; ! int row; ! char **values; ! ! attinmeta = TupleDescGetAttInMetadata(tupdesc); ! ! oldcontext = MemoryContextSwitchTo( ! rsinfo->econtext->ecxt_per_query_memory); ! tupstore = tuplestore_begin_heap(true, false, work_mem); ! rsinfo->setResult = tupstore; ! rsinfo->setDesc = tupdesc; ! MemoryContextSwitchTo(oldcontext); ! values = (char **) palloc(nfields * sizeof(char *)); ! /* put all tuples into the tuplestore */ ! for (row = 0; row < ntuples; row++) ! { ! HeapTuple tuple; ! if (!is_sql_cmd) ! { ! int i; ! for (i = 0; i < nfields; i++) ! { ! if (PQgetisnull(res, row, i)) ! values[i] = NULL; ! else ! values[i] = PQgetvalue(res, row, i); ! } ! } else ! { ! values[0] = PQcmdStatus(res); ! } ! ! /* build the tuple and put it into the tuplestore. */ ! tuple = BuildTupleFromCStrings(attinmeta, values); ! tuplestore_puttuple(tupstore, tuple); } ! /* clean up and return the tuplestore */ ! tuplestore_donestoring(tupstore); } ! PQclear(res); } PG_CATCH(); { ! /* be sure to release the libpq result */ ! PQclear(res); ! PG_RE_THROW(); } PG_END_TRY(); } /* --- 754,993 ---- rsinfo->setResult = NULL; rsinfo->setDesc = NULL; + + /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec/PQgetResult below + */ + initStoreInfo(&storeinfo, fcinfo); + PQsetRowProcessor(conn, storeHandler, &storeinfo); + /* synchronous query, or async result retrieval */ if (!is_async) res = PQexec(conn, sql); else res = PQgetResult(conn); ! finishStoreInfo(&storeinfo); ! /* NULL res from async get means we're all done with the results */ ! if (res || !is_async) { ! if (freeconn) ! PQfinish(conn); ! ! if (!res || ! (PQresultStatus(res) != PGRES_COMMAND_OK && ! PQresultStatus(res) != PGRES_TUPLES_OK)) ! { ! /* finishStoreInfo saves the fields referred to below. */ ! if (storeinfo.nummismatch) ! { ! /* This is only for backward compatibility */ ! ereport(ERROR, ! (errcode(ERRCODE_DATATYPE_MISMATCH), ! errmsg("remote query result rowtype does not match " ! "the specified FROM clause rowtype"))); ! } ! else if (storeinfo.edata) ! ReThrowError(storeinfo.edata); ! ! dblink_res_error(conname, res, "could not execute query", fail); ! return (Datum) 0; ! } } + PQclear(res); return (Datum) 0; } static void ! initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo) { ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + int i; ! switch (get_call_result_type(fcinfo, NULL, &tupdesc)) ! { ! case TYPEFUNC_COMPOSITE: ! /* success */ ! break; ! case TYPEFUNC_RECORD: ! /* failed to determine actual type of RECORD */ ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("function returning record called in context " ! "that cannot accept type record"))); ! break; ! default: ! /* result type isn't composite */ ! elog(ERROR, "return type must be a row type"); ! break; ! } ! sinfo->oldcontext = MemoryContextSwitchTo( ! rsinfo->econtext->ecxt_per_query_memory); ! ! /* make sure we have a persistent copy of the tupdesc */ ! tupdesc = CreateTupleDescCopy(tupdesc); ! ! sinfo->error_occurred = FALSE; ! sinfo->nummismatch = FALSE; ! sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc); ! sinfo->edata = NULL; ! sinfo->nattrs = tupdesc->natts; ! sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem); ! sinfo->valbuf = NULL; ! sinfo->valbuflen = NULL; ! ! /* Preallocate memory of same size with c string array for values. */ ! sinfo->valbuf = (char **)malloc(sinfo->nattrs * sizeof(char*)); ! if (sinfo->valbuf) ! sinfo->valbuflen = (int *)malloc(sinfo->nattrs * sizeof(int)); ! if (sinfo->valbuflen == NULL) { ! if (sinfo->valbuf) ! free(sinfo->valbuf); ! ereport(ERROR, ! (errcode(ERRCODE_OUT_OF_MEMORY), ! errmsg("out of memory"))); ! } ! for (i = 0 ; i < sinfo->nattrs ; i++) ! { ! sinfo->valbuf[i] = NULL; ! sinfo->valbuflen[i] = -1; ! } ! rsinfo->setResult = sinfo->tuplestore; ! rsinfo->setDesc = tupdesc; ! } ! static void ! finishStoreInfo(storeInfo *sinfo) ! { ! int i; ! if (sinfo->valbuf) ! { ! for (i = 0 ; i < sinfo->nattrs ; i++) ! { ! if (sinfo->valbuf[i]) ! free(sinfo->valbuf[i]); } + free(sinfo->valbuf); + sinfo->valbuf = NULL; + } ! if (sinfo->valbuflen) ! { ! free(sinfo->valbuflen); ! sinfo->valbuflen = NULL; ! } ! MemoryContextSwitchTo(sinfo->oldcontext); ! } ! static int ! storeHandler(PGresult *res, void *param, PGrowValue *columns) ! { ! storeInfo *sinfo = (storeInfo *)param; ! HeapTuple tuple; ! int fields = PQnfields(res); ! int i; ! char *cstrs[PQnfields(res)]; ! if (sinfo->error_occurred) ! return FALSE; ! if (sinfo->nattrs != fields) ! { ! sinfo->error_occurred = TRUE; ! sinfo->nummismatch = TRUE; ! finishStoreInfo(sinfo); ! ! /* This error will be processed in ! * dblink_record_internal(). So do not set error message ! * here. */ ! return FALSE; ! } ! /* ! * value input functions assumes that the input string is ! * terminated by zero. We should make the values to be so. ! */ ! for(i = 0 ; i < fields ; i++) ! { ! int len = columns[i].len; ! if (len < 0) ! cstrs[i] = NULL; ! else ! { ! char *tmp = sinfo->valbuf[i]; ! int tmplen = sinfo->valbuflen[i]; ! /* ! * Divide calls to malloc and realloc so that things will ! * go fine even on the systems of which realloc() does not ! * accept NULL as old memory block. ! * ! * Also try to (re)allocate in bigger steps to ! * avoid flood of allocations on weird data. ! */ ! if (tmp == NULL) ! { ! tmplen = len + 1; ! if (tmplen < 64) ! tmplen = 64; ! tmp = (char *)malloc(tmplen); ! } ! else if (tmplen < len + 1) ! { ! if (len + 1 > tmplen * 2) ! tmplen = len + 1; else ! tmplen = tmplen * 2; ! tmp = (char *)realloc(tmp, tmplen); } ! /* ! * sinfo->valbuf[n] will be freed in finishStoreInfo() ! * when realloc returns NULL. ! */ ! if (tmp == NULL) ! return FALSE; ! ! sinfo->valbuf[i] = tmp; ! sinfo->valbuflen[i] = tmplen; ! ! cstrs[i] = sinfo->valbuf[i]; ! memcpy(cstrs[i], columns[i].value, len); ! cstrs[i][len] = '\0'; } + } ! PG_TRY(); ! { ! tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs); ! tuplestore_puttuple(sinfo->tuplestore, tuple); } PG_CATCH(); { ! MemoryContext context; ! /* ! * Store exception for later ReThrow and cancel the exception. ! */ ! sinfo->error_occurred = TRUE; ! context = MemoryContextSwitchTo(sinfo->oldcontext); ! sinfo->edata = CopyErrorData(); ! MemoryContextSwitchTo(context); ! FlushErrorState(); ! return FALSE; } PG_END_TRY(); + + return TRUE; } /*
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers