David Rowley <david.row...@2ndquadrant.com> writes:
> [ combinefn_for_string_and_array_aggs_v7.patch ]
I spent a fair amount of time hacking on this with intent to commit, but just as I was getting to code that I liked, I started to have second thoughts about whether this is a good idea at all. I quote from the fine manual:

The aggregate functions array_agg, json_agg, jsonb_agg, json_object_agg, jsonb_object_agg, string_agg, and xmlagg, as well as similar user-defined aggregate functions, produce meaningfully different result values depending on the order of the input values. This ordering is unspecified by default, but can be controlled by writing an ORDER BY clause within the aggregate call, as shown in Section 4.2.7. Alternatively, supplying the input values from a sorted subquery will usually work ...

I do not think it is accidental that these aggregates are exactly the ones that do not have parallelism support today. Rather, that's because you just about always have an interest in the order in which the inputs get aggregated, which is something that parallel aggregation cannot support.

I fear that what will happen, if we commit this, is that something like 0.01% of the users of array_agg and string_agg will be pleased, another maybe 20% will be unaffected because they wrote ORDER BY which prevents parallel aggregation, and the remaining 80% will scream because we broke their queries. Telling them they should've written ORDER BY isn't going to cut it, IMO, when the benefit of that breakage will accrue only to some very tiny fraction of use-cases.

In short, I think we ought to reject this.

Just in case I'm outvoted, attached is what I'd gotten done so far. The main noncosmetic changes I'd made were to improve the caching logic (it's silly to set up a lookup cache and then not cache the fmgr_info lookup) and to not cheat quite as much on the StringInfo passed down to the element typreceive function.
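To make the ordering argument concrete, here is a minimal stand-alone C sketch (not PostgreSQL code) of the delimiter-plus-cursor scheme the attached patch uses for string_agg: the transition step stores a delimiter before every value, including the first, remembers the first delimiter's length, and the final step strips it, so the combine step can simply append one partial state to another. SketchStrAgg and the sketch_* functions are hypothetical names; the real patch keeps this state in a StringInfo and uses its cursor field for the stripped length.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical stand-in for the StringInfo-based string_agg transition
 * state.  "cursor" holds the length of the leading delimiter that the
 * final step strips, mirroring the patch's use of StringInfo.cursor.
 */
typedef struct SketchStrAgg
{
    char       *data;       /* NUL-terminated accumulated bytes */
    size_t      len;        /* bytes used, excluding the terminator */
    size_t      cap;        /* bytes allocated */
    size_t      cursor;     /* length of the first delimiter */
    int         has_value;  /* nonzero once any value was aggregated */
} SketchStrAgg;

static void
sketch_init(SketchStrAgg *s)
{
    memset(s, 0, sizeof(*s));
}

/* Append n bytes, growing the buffer by doubling. */
static void
sketch_append(SketchStrAgg *s, const char *p, size_t n)
{
    if (s->len + n + 1 > s->cap)
    {
        size_t      newcap = s->cap ? s->cap : 16;

        while (s->len + n + 1 > newcap)
            newcap *= 2;
        s->data = realloc(s->data, newcap);
        if (s->data == NULL)
            abort();
        s->cap = newcap;
    }
    memcpy(s->data + s->len, p, n);
    s->len += n;
    s->data[s->len] = '\0';
}

/* Transition: store the delimiter before every value, even the first. */
static void
sketch_trans(SketchStrAgg *s, const char *value, const char *delim)
{
    if (!s->has_value)
    {
        s->cursor = strlen(delim);
        s->has_value = 1;
    }
    sketch_append(s, delim, strlen(delim));
    sketch_append(s, value, strlen(value));
}

/* Combine: append s2 verbatim; s1's cursor changes only if s1 was empty. */
static void
sketch_combine(SketchStrAgg *s1, const SketchStrAgg *s2)
{
    if (!s2->has_value)
        return;
    if (!s1->has_value)
    {
        s1->cursor = s2->cursor;
        s1->has_value = 1;
    }
    sketch_append(s1, s2->data, s2->len);
}

/* Final: strip the first delimiter; NULL plays the role of SQL NULL. */
static char *
sketch_final(const SketchStrAgg *s)
{
    size_t      n;
    char       *out;

    if (!s->has_value)
        return NULL;
    assert(s->cursor <= s->len);
    n = s->len - s->cursor;
    out = malloc(n + 1);
    if (out == NULL)
        abort();
    memcpy(out, s->data + s->cursor, n);
    out[n] = '\0';
    return out;
}
```

Feeding two hypothetical worker states holding a,b and c,d (delimiter ",") to sketch_combine in the two possible orders yields "a,b,c,d" and "c,d,a,b" respectively. Each worker's internal order survives, but the overall order depends on the order in which partial states are combined, which is exactly what a query relying on input order without ORDER BY would notice.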
There isn't any other place, I don't think, where we don't honor the expectation that StringInfos have trailing null bytes, and some places may depend on it --- array_recv does.

The main thing that remains undone is to get some test coverage --- AFAICS, none of these new functions get exercised in the standard regression tests.

I'm also a bit unhappy that the patch introduces code into array_userfuncs.c that knows everything there is to know about the contents of ArrayBuildState and ArrayBuildStateArr. Previously that knowledge was pretty well localized in arrayfuncs.c. I wonder if it'd be better to move the new combinefuncs and serial/deserial funcs into arrayfuncs.c. Or perhaps export primitives from there that could be used to build them.

regards, tom lane
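For readers unfamiliar with the trailing-NUL point: the patch's array_agg_deserialize avoids copying each element by handing the element type's receive function a phony StringInfo that points into the middle of the input buffer, temporarily writing a '\0' just past the element so the convention still holds. A minimal stand-alone C sketch of that technique follows. SketchBuf stands in for StringInfoData; recv_decimal and recv_item are hypothetical names; the 4-byte big-endian length prefix mirrors what pq_getmsgint reads; and the ASCII-decimal element format is an assumption chosen so that the receive function visibly relies on the terminator.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/*
 * Hypothetical, trimmed-down stand-in for PostgreSQL's StringInfoData.
 * Invariant (as for a real StringInfo): data[len] == '\0'.
 */
typedef struct SketchBuf
{
    char       *data;
    int         len;
    int         maxlen;     /* allocated size, always > len */
    int         cursor;     /* read position */
} SketchBuf;

/* Hypothetical element "receive" function type. */
typedef long (*sketch_recv_fn) (SketchBuf *elem);

/*
 * Example receive function for ASCII decimal elements.  strtol() stops
 * only at a non-digit, so it depends on the element being NUL-terminated.
 */
static long
recv_decimal(SketchBuf *elem)
{
    assert(elem->data[elem->len] == '\0');
    return strtol(elem->data, NULL, 10);
}

/*
 * Read one item prefixed by a 4-byte big-endian length and hand it to
 * recvfn through a phony buffer pointing into msg, without copying.
 * Returns 0 on success, -1 if the message is too short.
 */
static int
recv_item(SketchBuf *msg, sketch_recv_fn recvfn, long *out)
{
    const unsigned char *p;
    uint32_t    itemlen;
    SketchBuf   elem;
    char        csave;

    if (msg->len - msg->cursor < 4)
        return -1;
    p = (const unsigned char *) msg->data + msg->cursor;
    itemlen = ((uint32_t) p[0] << 24) | ((uint32_t) p[1] << 16) |
        ((uint32_t) p[2] << 8) | (uint32_t) p[3];
    msg->cursor += 4;
    if (itemlen > (uint32_t) (msg->len - msg->cursor))
        return -1;              /* "insufficient data left in message" */

    elem.data = &msg->data[msg->cursor];
    elem.len = (int) itemlen;
    elem.maxlen = (int) itemlen + 1;
    elem.cursor = 0;
    msg->cursor += (int) itemlen;

    /*
     * Temporarily terminate the item in place.  This byte is either the
     * start of the next item or the outer buffer's own trailing NUL, so it
     * is always inside the allocation.  Restore it afterwards.
     */
    csave = msg->data[msg->cursor];
    msg->data[msg->cursor] = '\0';
    *out = recvfn(&elem);
    msg->data[msg->cursor] = csave;
    return 0;
}
```

The scribble is safe only because the outer buffer keeps its own terminator at data[len]: when the last item ends exactly at len, the temporary '\0' lands on that terminator rather than past the allocation.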
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 7b1a85f..faa4939 100644 *** a/doc/src/sgml/func.sgml --- b/doc/src/sgml/func.sgml *************** NULL baz</literallayout>(3 rows)</entry> *** 13358,13364 **** <entry> array of the argument type </entry> ! <entry>No</entry> <entry>input values, including nulls, concatenated into an array</entry> </row> --- 13358,13364 ---- <entry> array of the argument type </entry> ! <entry>Yes</entry> <entry>input values, including nulls, concatenated into an array</entry> </row> *************** NULL baz</literallayout>(3 rows)</entry> *** 13372,13378 **** <entry> same as argument data type </entry> ! <entry>No</entry> <entry>input arrays concatenated into array of one higher dimension (inputs must all have same dimensionality, and cannot be empty or NULL)</entry> --- 13372,13378 ---- <entry> same as argument data type </entry> ! <entry>Yes</entry> <entry>input arrays concatenated into array of one higher dimension (inputs must all have same dimensionality, and cannot be empty or NULL)</entry> *************** NULL baz</literallayout>(3 rows)</entry> *** 13633,13639 **** <entry> same as argument types </entry> ! <entry>No</entry> <entry>input values concatenated into a string, separated by delimiter</entry> </row> --- 13633,13639 ---- <entry> same as argument types </entry> ! 
<entry>Yes</entry> <entry>input values concatenated into a string, separated by delimiter</entry> </row> diff --git a/src/backend/utils/adt/array_userfuncs.c b/src/backend/utils/adt/array_userfuncs.c index cb7a6b6..0007a28 100644 *** a/src/backend/utils/adt/array_userfuncs.c --- b/src/backend/utils/adt/array_userfuncs.c *************** *** 13,24 **** --- 13,44 ---- #include "postgres.h" #include "catalog/pg_type.h" + #include "libpq/pqformat.h" #include "common/int.h" #include "utils/array.h" + #include "utils/datum.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/typcache.h" + /* + * SerialIOData + * Used for caching element-type data in array_agg_serialize + */ + typedef struct SerialIOData + { + FmgrInfo typsend; + } SerialIOData; + + /* + * DeserialIOData + * Used for caching element-type data in array_agg_deserialize + */ + typedef struct DeserialIOData + { + FmgrInfo typreceive; + Oid typioparam; + } DeserialIOData; static Datum array_position_common(FunctionCallInfo fcinfo); *************** array_agg_transfn(PG_FUNCTION_ARGS) *** 499,504 **** --- 519,838 ---- } Datum + array_agg_combine(PG_FUNCTION_ARGS) + { + ArrayBuildState *state1; + ArrayBuildState *state2; + MemoryContext agg_context; + MemoryContext old_context; + int i; + + if (!AggCheckCallContext(fcinfo, &agg_context)) + elog(ERROR, "aggregate function called in non-aggregate context"); + + state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildState *) PG_GETARG_POINTER(0); + state2 = PG_ARGISNULL(1) ? 
NULL : (ArrayBuildState *) PG_GETARG_POINTER(1); + + if (state2 == NULL) + { + /* + * NULL state2 is easy, just return state1, which we know is already + * in the agg_context + */ + if (state1 == NULL) + PG_RETURN_NULL(); + PG_RETURN_POINTER(state1); + } + + if (state1 == NULL) + { + /* We must copy state2's data into the agg_context */ + state1 = initArrayResultWithSize(state2->element_type, agg_context, + false, state2->alen); + + old_context = MemoryContextSwitchTo(agg_context); + + for (i = 0; i < state2->nelems; i++) + { + if (!state2->dnulls[i]) + state1->dvalues[i] = datumCopy(state2->dvalues[i], + state1->typbyval, + state1->typlen); + else + state1->dvalues[i] = (Datum) 0; + } + + MemoryContextSwitchTo(old_context); + + memcpy(state1->dnulls, state2->dnulls, sizeof(bool) * state2->nelems); + + state1->nelems = state2->nelems; + + PG_RETURN_POINTER(state1); + } + else if (state2->nelems > 0) + { + /* We only need to combine the two states if state2 has any elements */ + int reqsize = state1->nelems + state2->nelems; + MemoryContext oldContext = MemoryContextSwitchTo(state1->mcontext); + + Assert(state1->element_type == state2->element_type); + + /* Enlarge state1 arrays if needed */ + if (state1->alen < reqsize) + { + /* Use a power of 2 size rather than allocating just reqsize */ + while (state1->alen < reqsize) + state1->alen *= 2; + + state1->dvalues = (Datum *) repalloc(state1->dvalues, + state1->alen * sizeof(Datum)); + state1->dnulls = (bool *) repalloc(state1->dnulls, + state1->alen * sizeof(bool)); + } + + /* Copy in the state2 elements to the end of the state1 arrays */ + for (i = 0; i < state2->nelems; i++) + { + if (!state2->dnulls[i]) + state1->dvalues[i + state1->nelems] = + datumCopy(state2->dvalues[i], + state1->typbyval, + state1->typlen); + else + state1->dvalues[i + state1->nelems] = (Datum) 0; + } + + memcpy(&state1->dnulls[state1->nelems], state2->dnulls, + sizeof(bool) * state2->nelems); + + state1->nelems = reqsize; + + 
MemoryContextSwitchTo(oldContext); + } + + PG_RETURN_POINTER(state1); + } + + /* + * array_agg_serialize + * Serialize ArrayBuildState into bytea. + */ + Datum + array_agg_serialize(PG_FUNCTION_ARGS) + { + ArrayBuildState *state; + StringInfoData buf; + bytea *result; + + /* cannot be called directly because of internal-type argument */ + Assert(AggCheckCallContext(fcinfo, NULL)); + + state = (ArrayBuildState *) PG_GETARG_POINTER(0); + + pq_begintypsend(&buf); + + /* + * element_type. Putting this first is more convenient in deserialization + */ + pq_sendint32(&buf, state->element_type); + + /* + * nelems -- send first so we know how large to make the dvalues and + * dnulls array during deserialization. + */ + pq_sendint64(&buf, state->nelems); + + /* alen can be decided during deserialization */ + + /* typlen */ + pq_sendint16(&buf, state->typlen); + + /* typbyval */ + pq_sendbyte(&buf, state->typbyval); + + /* typalign */ + pq_sendbyte(&buf, state->typalign); + + /* dnulls */ + pq_sendbytes(&buf, (char *) state->dnulls, sizeof(bool) * state->nelems); + + /* + * dvalues. By agreement with array_agg_deserialize, when the element + * type is byval, we just transmit the Datum array as-is, including any + * null elements. For by-ref types, we must invoke the element type's + * send function, and we skip null elements (which is why the nulls flags + * must be sent first). 
+ */ + if (state->typbyval) + pq_sendbytes(&buf, (char *) state->dvalues, + sizeof(Datum) * state->nelems); + else + { + SerialIOData *iodata; + int i; + + /* Avoid repeat catalog lookups for typsend function */ + iodata = (SerialIOData *) fcinfo->flinfo->fn_extra; + if (iodata == NULL) + { + Oid typsend; + bool typisvarlena; + + iodata = (SerialIOData *) + MemoryContextAlloc(fcinfo->flinfo->fn_mcxt, + sizeof(SerialIOData)); + getTypeBinaryOutputInfo(state->element_type, &typsend, + &typisvarlena); + fmgr_info_cxt(typsend, &iodata->typsend, + fcinfo->flinfo->fn_mcxt); + fcinfo->flinfo->fn_extra = (void *) iodata; + } + + for (i = 0; i < state->nelems; i++) + { + bytea *outputbytes; + + if (state->dnulls[i]) + continue; + outputbytes = SendFunctionCall(&iodata->typsend, + state->dvalues[i]); + pq_sendint32(&buf, VARSIZE(outputbytes) - VARHDRSZ); + pq_sendbytes(&buf, VARDATA(outputbytes), + VARSIZE(outputbytes) - VARHDRSZ); + } + } + + result = pq_endtypsend(&buf); + + PG_RETURN_BYTEA_P(result); + } + + Datum + array_agg_deserialize(PG_FUNCTION_ARGS) + { + bytea *sstate; + ArrayBuildState *result; + StringInfoData buf; + Oid element_type; + int64 nelems; + const char *temp; + + if (!AggCheckCallContext(fcinfo, NULL)) + elog(ERROR, "aggregate function called in non-aggregate context"); + + sstate = PG_GETARG_BYTEA_PP(0); + + /* + * Copy the bytea into a StringInfo so that we can "receive" it using the + * standard recv-function infrastructure. 
+ */ + initStringInfo(&buf); + appendBinaryStringInfo(&buf, + VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); + + /* element_type */ + element_type = pq_getmsgint(&buf, 4); + + /* nelems */ + nelems = pq_getmsgint64(&buf); + + /* Create output ArrayBuildState with the needed number of elements */ + result = initArrayResultWithSize(element_type, CurrentMemoryContext, + false, nelems); + result->nelems = nelems; + + /* typlen */ + result->typlen = pq_getmsgint(&buf, 2); + + /* typbyval */ + result->typbyval = pq_getmsgbyte(&buf); + + /* typalign */ + result->typalign = pq_getmsgbyte(&buf); + + /* dnulls */ + temp = pq_getmsgbytes(&buf, sizeof(bool) * nelems); + memcpy(result->dnulls, temp, sizeof(bool) * nelems); + + /* dvalues --- see comment in array_agg_serialize */ + if (result->typbyval) + { + temp = pq_getmsgbytes(&buf, sizeof(Datum) * nelems); + memcpy(result->dvalues, temp, sizeof(Datum) * nelems); + } + else + { + DeserialIOData *iodata; + int i; + + /* Avoid repeat catalog lookups for typreceive function */ + iodata = (DeserialIOData *) fcinfo->flinfo->fn_extra; + if (iodata == NULL) + { + Oid typreceive; + + iodata = (DeserialIOData *) + MemoryContextAlloc(fcinfo->flinfo->fn_mcxt, + sizeof(DeserialIOData)); + getTypeBinaryInputInfo(element_type, &typreceive, + &iodata->typioparam); + fmgr_info_cxt(typreceive, &iodata->typreceive, + fcinfo->flinfo->fn_mcxt); + fcinfo->flinfo->fn_extra = (void *) iodata; + } + + for (i = 0; i < nelems; i++) + { + int itemlen; + StringInfoData elem_buf; + char csave; + + if (result->dnulls[i]) + { + result->dvalues[i] = (Datum) 0; + continue; + } + + itemlen = pq_getmsgint(&buf, 4); + if (itemlen < 0 || itemlen > (buf.len - buf.cursor)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION), + errmsg("insufficient data left in message"))); + + /* + * Rather than copying data around, we just set up a phony + * StringInfo pointing to the correct portion of the input buffer. 
+ * We assume we can scribble on the input buffer so as to maintain + * the convention that StringInfos have a trailing null. + */ + elem_buf.data = &buf.data[buf.cursor]; + elem_buf.maxlen = itemlen + 1; + elem_buf.len = itemlen; + elem_buf.cursor = 0; + + buf.cursor += itemlen; + + csave = buf.data[buf.cursor]; + buf.data[buf.cursor] = '\0'; + + /* Now call the element's receiveproc */ + result->dvalues[i] = ReceiveFunctionCall(&iodata->typreceive, + &elem_buf, + iodata->typioparam, + -1); + + buf.data[buf.cursor] = csave; + } + } + + pq_getmsgend(&buf); + pfree(buf.data); + + PG_RETURN_POINTER(result); + } + + Datum array_agg_finalfn(PG_FUNCTION_ARGS) { Datum result; *************** array_agg_array_transfn(PG_FUNCTION_ARGS *** 578,583 **** --- 912,1216 ---- } Datum + array_agg_array_combine(PG_FUNCTION_ARGS) + { + ArrayBuildStateArr *state1; + ArrayBuildStateArr *state2; + MemoryContext agg_context; + MemoryContext old_context; + + if (!AggCheckCallContext(fcinfo, &agg_context)) + elog(ERROR, "aggregate function called in non-aggregate context"); + + state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildStateArr *) PG_GETARG_POINTER(0); + state2 = PG_ARGISNULL(1) ? 
NULL : (ArrayBuildStateArr *) PG_GETARG_POINTER(1); + + if (state2 == NULL) + { + /* + * NULL state2 is easy, just return state1, which we know is already + * in the agg_context + */ + if (state1 == NULL) + PG_RETURN_NULL(); + PG_RETURN_POINTER(state1); + } + + if (state1 == NULL) + { + /* We must copy state2's data into the agg_context */ + old_context = MemoryContextSwitchTo(agg_context); + + state1 = initArrayResultArr(state2->array_type, InvalidOid, + agg_context, false); + + state1->abytes = state2->abytes; + state1->data = (char *) palloc(state1->abytes); + + if (state2->nullbitmap) + { + int size = (state2->aitems + 7) / 8; + + state1->nullbitmap = (bits8 *) palloc(size); + memcpy(state1->nullbitmap, state2->nullbitmap, size); + } + + memcpy(state1->data, state2->data, state2->nbytes); + state1->nbytes = state2->nbytes; + state1->aitems = state2->aitems; + state1->nitems = state2->nitems; + state1->ndims = state2->ndims; + memcpy(state1->dims, state2->dims, sizeof(state2->dims)); + memcpy(state1->lbs, state2->lbs, sizeof(state2->lbs)); + state1->array_type = state2->array_type; + state1->element_type = state2->element_type; + + MemoryContextSwitchTo(old_context); + + PG_RETURN_POINTER(state1); + } + + /* We only need to combine the two states if state2 has any items */ + else if (state2->nitems > 0) + { + MemoryContext oldContext; + int reqsize = state1->nbytes + state2->nbytes; + int i; + + /* + * Check the states are compatible with each other. Ensure we use the + * same error messages that are listed in accumArrayResultArr so that + * the same error is shown as would have been if we'd not used the + * combine function for the aggregation. + */ + if (state1->ndims != state2->ndims) + ereport(ERROR, + (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR), + errmsg("cannot accumulate arrays of different dimensionality"))); + + /* Check dimensions match ignoring the first dimension. 
+ */ + for (i = 1; i < state1->ndims; i++) + { + if (state1->dims[i] != state2->dims[i] || state1->lbs[i] != state2->lbs[i]) + ereport(ERROR, + (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR), + errmsg("cannot accumulate arrays of different dimensionality"))); + } + + oldContext = MemoryContextSwitchTo(state1->mcontext); + + /* + * If there's not enough space in state1 then we'll need to reallocate + * more. + */ + if (state1->abytes < reqsize) + { + /* use a power of 2 size rather than allocating just reqsize */ + while (state1->abytes < reqsize) + state1->abytes *= 2; + + state1->data = (char *) repalloc(state1->data, state1->abytes); + } + + /* + * As in accumArrayResultArr, the null bitmap must be maintained if + * either state has one; array_bitmap_copy treats a NULL source bitmap + * as all non-null. + */ + if (state1->nullbitmap || state2->nullbitmap) + { + int newnitems = state1->nitems + state2->nitems; + + if (state1->nullbitmap == NULL) + { + /* + * First input with nulls; we must retrospectively handle any + * previous inputs by marking all their items non-null. + */ + state1->aitems = 256; + while (state1->aitems <= newnitems) + state1->aitems *= 2; + state1->nullbitmap = (bits8 *) palloc((state1->aitems + 7) / 8); + array_bitmap_copy(state1->nullbitmap, 0, + NULL, 0, + state1->nitems); + } + else if (newnitems > state1->aitems) + { + while (state1->aitems < newnitems) + state1->aitems *= 2; + + state1->nullbitmap = (bits8 *) + repalloc(state1->nullbitmap, (state1->aitems + 7) / 8); + } + array_bitmap_copy(state1->nullbitmap, state1->nitems, + state2->nullbitmap, 0, + state2->nitems); + } + + memcpy(state1->data + state1->nbytes, state2->data, state2->nbytes); + state1->nbytes += state2->nbytes; + state1->nitems += state2->nitems; + + state1->dims[0] += state2->dims[0]; + /* remaining dims already match, per test above */ + + Assert(state1->array_type == state2->array_type); + Assert(state1->element_type == state2->element_type); + + MemoryContextSwitchTo(oldContext); + } + + PG_RETURN_POINTER(state1); + } + + /* + * array_agg_array_serialize + * Serialize ArrayBuildStateArr into bytea.
+ */ + Datum + array_agg_array_serialize(PG_FUNCTION_ARGS) + { + ArrayBuildStateArr *state; + StringInfoData buf; + bytea *result; + + /* cannot be called directly because of internal-type argument */ + Assert(AggCheckCallContext(fcinfo, NULL)); + + state = (ArrayBuildStateArr *) PG_GETARG_POINTER(0); + + pq_begintypsend(&buf); + + /* + * element_type. Putting this first is more convenient in deserialization + * so that we can init the new state sooner. + */ + pq_sendint32(&buf, state->element_type); + + /* array_type */ + pq_sendint32(&buf, state->array_type); + + /* nbytes */ + pq_sendint32(&buf, state->nbytes); + + /* data */ + pq_sendbytes(&buf, state->data, state->nbytes); + + /* abytes */ + pq_sendint32(&buf, state->abytes); + + /* aitems */ + pq_sendint32(&buf, state->aitems); + + /* nullbitmap */ + if (state->nullbitmap) + { + Assert(state->aitems > 0); + pq_sendbytes(&buf, (char *) state->nullbitmap, (state->aitems + 7) / 8); + } + + /* nitems */ + pq_sendint32(&buf, state->nitems); + + /* ndims */ + pq_sendint32(&buf, state->ndims); + + /* dims: XXX should we just send ndims elements? */ + pq_sendbytes(&buf, (char *) state->dims, sizeof(state->dims)); + + /* lbs */ + pq_sendbytes(&buf, (char *) state->lbs, sizeof(state->lbs)); + + result = pq_endtypsend(&buf); + + PG_RETURN_BYTEA_P(result); + } + + Datum + array_agg_array_deserialize(PG_FUNCTION_ARGS) + { + bytea *sstate; + ArrayBuildStateArr *result; + StringInfoData buf; + Oid element_type; + Oid array_type; + int nbytes; + const char *temp; + + /* cannot be called directly because of internal-type argument */ + Assert(AggCheckCallContext(fcinfo, NULL)); + + sstate = PG_GETARG_BYTEA_PP(0); + + /* + * Copy the bytea into a StringInfo so that we can "receive" it using the + * standard recv-function infrastructure. 
+ */ + initStringInfo(&buf); + appendBinaryStringInfo(&buf, + VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate)); + + /* element_type */ + element_type = pq_getmsgint(&buf, 4); + + /* array_type */ + array_type = pq_getmsgint(&buf, 4); + + /* nbytes */ + nbytes = pq_getmsgint(&buf, 4); + + result = initArrayResultArr(array_type, element_type, + CurrentMemoryContext, false); + + result->abytes = 1024; + while (result->abytes < nbytes) + result->abytes *= 2; + + result->data = (char *) palloc(result->abytes); + + /* data */ + temp = pq_getmsgbytes(&buf, nbytes); + memcpy(result->data, temp, nbytes); + result->nbytes = nbytes; + + /* abytes: skip the sender's value, which need not match the buffer allocated above */ + (void) pq_getmsgint(&buf, 4); + + /* aitems: might be 0 */ + result->aitems = pq_getmsgint(&buf, 4); + + /* nullbitmap */ + if (result->aitems > 0) + { + int size = (result->aitems + 7) / 8; + + result->nullbitmap = (bits8 *) palloc(size); + temp = pq_getmsgbytes(&buf, size); + memcpy(result->nullbitmap, temp, size); + } + else + result->nullbitmap = NULL; + + /* nitems */ + result->nitems = pq_getmsgint(&buf, 4); + + /* ndims */ + result->ndims = pq_getmsgint(&buf, 4); + + /* dims */ + temp = pq_getmsgbytes(&buf, sizeof(result->dims)); + memcpy(result->dims, temp, sizeof(result->dims)); + + /* lbs */ + temp = pq_getmsgbytes(&buf, sizeof(result->lbs)); + memcpy(result->lbs, temp, sizeof(result->lbs)); + + pq_getmsgend(&buf); + pfree(buf.data); + + PG_RETURN_POINTER(result); + } + + Datum array_agg_array_finalfn(PG_FUNCTION_ARGS) { Datum result; diff --git a/src/backend/utils/adt/arrayfuncs.c b/src/backend/utils/adt/arrayfuncs.c index 0cbdbe5..1210704 100644 *** a/src/backend/utils/adt/arrayfuncs.c --- b/src/backend/utils/adt/arrayfuncs.c *************** array_insert_slice(ArrayType *destArray, *** 4996,5007 **** * subcontext=false. * * In cases when the array build states have different lifetimes, using a ! * single memory context is impractical. Instead, pass subcontext=true so that !
* the array build states can be freed individually. */ ArrayBuildState * initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext) { ArrayBuildState *astate; MemoryContext arr_context = rcontext; --- 4996,5025 ---- * subcontext=false. * * In cases when the array build states have different lifetimes, using a ! * single memory context is impractical. Instead, pass subcontext=true so ! * that the array build states can be freed individually. */ ArrayBuildState * initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext) { + /* + * When using a subcontext, we can afford to start with a somewhat larger + * initial array size. Without subcontexts, we'd better hope that most of + * the states stay small ... + */ + return initArrayResultWithSize(element_type, rcontext, subcontext, + subcontext ? 64 : 8); + } + + /* + * initArrayResultWithSize + * As initArrayResult, but allow the initial size of the allocated arrays + * to be specified. + */ + ArrayBuildState * + initArrayResultWithSize(Oid element_type, MemoryContext rcontext, + bool subcontext, int initsize) + { ArrayBuildState *astate; MemoryContext arr_context = rcontext; *************** initArrayResult(Oid element_type, Memory *** 5015,5021 **** MemoryContextAlloc(arr_context, sizeof(ArrayBuildState)); astate->mcontext = arr_context; astate->private_cxt = subcontext; ! astate->alen = (subcontext ? 64 : 8); /* arbitrary starting array size */ astate->dvalues = (Datum *) MemoryContextAlloc(arr_context, astate->alen * sizeof(Datum)); astate->dnulls = (bool *) --- 5033,5039 ---- MemoryContextAlloc(arr_context, sizeof(ArrayBuildState)); astate->mcontext = arr_context; astate->private_cxt = subcontext; ! 
astate->alen = initsize; astate->dvalues = (Datum *) MemoryContextAlloc(arr_context, astate->alen * sizeof(Datum)); astate->dnulls = (bool *) diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c index 4346410..fff89e7 100644 *** a/src/backend/utils/adt/varlena.c --- b/src/backend/utils/adt/varlena.c *************** bytea_string_agg_transfn(PG_FUNCTION_ARG *** 453,481 **** state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); ! /* Append the value unless null. */ if (!PG_ARGISNULL(1)) { bytea *value = PG_GETARG_BYTEA_PP(1); ! /* On the first time through, we ignore the delimiter. */ if (state == NULL) state = makeStringAggState(fcinfo); ! else if (!PG_ARGISNULL(2)) { bytea *delim = PG_GETARG_BYTEA_PP(2); ! appendBinaryStringInfo(state, VARDATA_ANY(delim), VARSIZE_ANY_EXHDR(delim)); } ! appendBinaryStringInfo(state, VARDATA_ANY(value), VARSIZE_ANY_EXHDR(value)); } /* * The transition type for string_agg() is declared to be "internal", * which is a pass-by-value type the same size as a pointer. */ ! PG_RETURN_POINTER(state); } Datum --- 453,502 ---- state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); ! /* Append the value unless null, preceding it with the delimiter. */ if (!PG_ARGISNULL(1)) { bytea *value = PG_GETARG_BYTEA_PP(1); + bool isfirst = false; ! /* ! * We store the delimiter for each aggregated item, even the first ! * one, though you might think that could be discarded immediately. ! * Otherwise, partial aggregation can't work, because the combine ! * function needs the delimiter for the first item in the second ! * aggregated state. (IOW, what we now think is the first item might ! * not be first overall.) The finalfn is responsible for stripping ! * off the first delimiter. So that it can tell how much to strip, we ! * store the length of the first delimiter in the StringInfo's cursor ! * field, which we don't otherwise need here. ! */ if (state == NULL) + { state = makeStringAggState(fcinfo); ! 
isfirst = true; ! } ! ! if (!PG_ARGISNULL(2)) { bytea *delim = PG_GETARG_BYTEA_PP(2); ! appendBinaryStringInfo(state, VARDATA_ANY(delim), ! VARSIZE_ANY_EXHDR(delim)); ! if (isfirst) ! state->cursor = VARSIZE_ANY_EXHDR(delim); } ! appendBinaryStringInfo(state, VARDATA_ANY(value), ! VARSIZE_ANY_EXHDR(value)); } /* * The transition type for string_agg() is declared to be "internal", * which is a pass-by-value type the same size as a pointer. */ ! if (state) ! PG_RETURN_POINTER(state); ! PG_RETURN_NULL(); } Datum *************** bytea_string_agg_finalfn(PG_FUNCTION_ARG *** 490,500 **** if (state != NULL) { bytea *result; ! result = (bytea *) palloc(state->len + VARHDRSZ); ! SET_VARSIZE(result, state->len + VARHDRSZ); ! memcpy(VARDATA(result), state->data, state->len); PG_RETURN_BYTEA_P(result); } else --- 511,523 ---- if (state != NULL) { + /* As per comment in transfn, strip data before the cursor position */ bytea *result; + int strippedlen = state->len - state->cursor; ! result = (bytea *) palloc(strippedlen + VARHDRSZ); ! SET_VARSIZE(result, strippedlen + VARHDRSZ); ! memcpy(VARDATA(result), &state->data[state->cursor], strippedlen); PG_RETURN_BYTEA_P(result); } else *************** string_agg_transfn(PG_FUNCTION_ARGS) *** 4653,4675 **** state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); ! /* Append the value unless null. */ if (!PG_ARGISNULL(1)) { ! /* On the first time through, we ignore the delimiter. */ if (state == NULL) state = makeStringAggState(fcinfo); ! else if (!PG_ARGISNULL(2)) ! appendStringInfoText(state, PG_GETARG_TEXT_PP(2)); /* delimiter */ ! appendStringInfoText(state, PG_GETARG_TEXT_PP(1)); /* value */ } /* * The transition type for string_agg() is declared to be "internal", * which is a pass-by-value type the same size as a pointer. */ ! PG_RETURN_POINTER(state); } Datum --- 4676,4846 ---- state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); ! /* Append the value unless null, preceding it with the delimiter. 
*/ if (!PG_ARGISNULL(1)) { ! text *value = PG_GETARG_TEXT_PP(1); ! bool isfirst = false; ! ! /* ! * We store the delimiter for each aggregated item, even the first ! * one, though you might think that could be discarded immediately. ! * Otherwise, partial aggregation can't work, because the combine ! * function needs the delimiter for the first item in the second ! * aggregated state. (IOW, what we now think is the first item might ! * not be first overall.) The finalfn is responsible for stripping ! * off the first delimiter. So that it can tell how much to strip, we ! * store the length of the first delimiter in the StringInfo's cursor ! * field, which we don't otherwise need here. ! */ if (state == NULL) + { state = makeStringAggState(fcinfo); ! isfirst = true; ! } ! if (!PG_ARGISNULL(2)) ! { ! text *delim = PG_GETARG_TEXT_PP(2); ! ! appendStringInfoText(state, delim); ! if (isfirst) ! state->cursor = VARSIZE_ANY_EXHDR(delim); ! } ! ! appendStringInfoText(state, value); } /* * The transition type for string_agg() is declared to be "internal", * which is a pass-by-value type the same size as a pointer. */ ! if (state) ! PG_RETURN_POINTER(state); ! PG_RETURN_NULL(); ! } ! ! /* ! * string_agg_combine ! * Aggregate combine function for string_agg(text) and string_agg(bytea) ! */ ! Datum ! string_agg_combine(PG_FUNCTION_ARGS) ! { ! StringInfo state1; ! StringInfo state2; ! MemoryContext agg_context; ! ! if (!AggCheckCallContext(fcinfo, &agg_context)) ! elog(ERROR, "aggregate function called in non-aggregate context"); ! ! state1 = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0); ! state2 = PG_ARGISNULL(1) ? NULL : (StringInfo) PG_GETARG_POINTER(1); ! ! if (state2 == NULL) ! { ! /* ! * NULL state2 is easy, just return state1, which we know is already ! * in the agg_context ! */ ! if (state1 == NULL) ! PG_RETURN_NULL(); ! PG_RETURN_POINTER(state1); ! } ! ! if (state1 == NULL) ! { ! /* We must copy state2's data into the agg_context */ ! 
MemoryContext old_context;
!
! 		old_context = MemoryContextSwitchTo(agg_context);
! 		state1 = makeStringAggState(fcinfo);
! 		appendBinaryStringInfo(state1, state2->data, state2->len);
! 		state1->cursor = state2->cursor;
! 		MemoryContextSwitchTo(old_context);
! 	}
! 	else if (state2->len > 0)
! 	{
! 		/* Combine ... state1->cursor does not change in this case */
! 		appendBinaryStringInfo(state1, state2->data, state2->len);
! 	}
!
! 	PG_RETURN_POINTER(state1);
! }
!
! /*
!  * string_agg_serialize
!  *		Aggregate serialize function for string_agg(text) and string_agg(bytea)
!  *
!  * This is strict, so we need not handle NULL input
!  */
! Datum
! string_agg_serialize(PG_FUNCTION_ARGS)
! {
! 	StringInfo state;
! 	StringInfoData buf;
! 	bytea *result;
!
! 	/* cannot be called directly because of internal-type argument */
! 	Assert(AggCheckCallContext(fcinfo, NULL));
!
! 	state = (StringInfo) PG_GETARG_POINTER(0);
!
! 	pq_begintypsend(&buf);
!
! 	/* cursor */
! 	pq_sendint(&buf, state->cursor, 4);
!
! 	/* data */
! 	pq_sendbytes(&buf, state->data, state->len);
!
! 	result = pq_endtypsend(&buf);
!
! 	PG_RETURN_BYTEA_P(result);
! }
!
! /*
!  * string_agg_deserialize
!  *		Aggregate deserial function for string_agg(text) and string_agg(bytea)
!  *
!  * This is strict, so we need not handle NULL input
!  */
! Datum
! string_agg_deserialize(PG_FUNCTION_ARGS)
! {
! 	bytea *sstate;
! 	StringInfo result;
! 	StringInfoData buf;
! 	char *data;
! 	int datalen;
!
! 	/* cannot be called directly because of internal-type argument */
! 	Assert(AggCheckCallContext(fcinfo, NULL));
!
! 	sstate = PG_GETARG_BYTEA_PP(0);
!
! 	/*
! 	 * Copy the bytea into a StringInfo so that we can "receive" it using the
! 	 * standard recv-function infrastructure.
! 	 */
! 	initStringInfo(&buf);
! 	appendBinaryStringInfo(&buf,
! 						   VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
!
! 	result = makeStringAggState(fcinfo);
!
! 	/* cursor */
! 	result->cursor = pq_getmsgint(&buf, 4);
!
! 	/* data */
! 	datalen = VARSIZE_ANY_EXHDR(sstate) - 4;
! 	data = (char *) pq_getmsgbytes(&buf, datalen);
! 	appendBinaryStringInfo(result, data, datalen);
!
! 	pq_getmsgend(&buf);
! 	pfree(buf.data);
!
! 	PG_RETURN_POINTER(result);
  }

  Datum
*************** string_agg_finalfn(PG_FUNCTION_ARGS)
*** 4683,4689 ****
  	state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);

  	if (state != NULL)
! 		PG_RETURN_TEXT_P(cstring_to_text_with_len(state->data, state->len));
  	else
  		PG_RETURN_NULL();
  }
--- 4854,4864 ----
  	state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);

  	if (state != NULL)
! 	{
! 		/* As per comment in transfn, strip data before the cursor position */
! 		PG_RETURN_TEXT_P(cstring_to_text_with_len(&state->data[state->cursor],
! 												  state->len - state->cursor));
! 	}
  	else
  		PG_RETURN_NULL();
  }
diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h
index 125bb5b..b624f4e 100644
*** a/src/include/catalog/pg_aggregate.h
--- b/src/include/catalog/pg_aggregate.h
*************** DATA(insert ( 2243 n 0 bitor - bitor
*** 300,313 ****
  DATA(insert ( 2901 n 0 xmlconcat2 - - - - - - - f f r r 0 142 0 0 0 _null_ _null_ ));

  /* array */
! DATA(insert ( 2335 n 0 array_agg_transfn array_agg_finalfn - - - - - - t f r r 0 2281 0 0 0 _null_ _null_ ));
! DATA(insert ( 4053 n 0 array_agg_array_transfn array_agg_array_finalfn - - - - - - t f r r 0 2281 0 0 0 _null_ _null_ ));

  /* text */
! DATA(insert ( 3538 n 0 string_agg_transfn string_agg_finalfn - - - - - - f f r r 0 2281 0 0 0 _null_ _null_ ));

  /* bytea */
! DATA(insert ( 3545 n 0 bytea_string_agg_transfn bytea_string_agg_finalfn - - - - - - f f r r 0 2281 0 0 0 _null_ _null_ ));

  /* json */
  DATA(insert ( 3175 n 0 json_agg_transfn json_agg_finalfn - - - - - - f f r r 0 2281 0 0 0 _null_ _null_ ));
--- 300,313 ----
  DATA(insert ( 2901 n 0 xmlconcat2 - - - - - - - f f r r 0 142 0 0 0 _null_ _null_ ));

  /* array */
! DATA(insert ( 2335 n 0 array_agg_transfn array_agg_finalfn array_agg_combine array_agg_serialize array_agg_deserialize - - - t f r r 0 2281 0 0 0 _null_ _null_ ));
! DATA(insert ( 4053 n 0 array_agg_array_transfn array_agg_array_finalfn array_agg_array_combine array_agg_array_serialize array_agg_array_deserialize - - - t f r r 0 2281 0 0 0 _null_ _null_ ));

  /* text */
! DATA(insert ( 3538 n 0 string_agg_transfn string_agg_finalfn string_agg_combine string_agg_serialize string_agg_deserialize - - - f f r r 0 2281 0 0 0 _null_ _null_ ));

  /* bytea */
! DATA(insert ( 3545 n 0 bytea_string_agg_transfn bytea_string_agg_finalfn string_agg_combine string_agg_serialize string_agg_deserialize - - - f f r r 0 2281 0 0 0 _null_ _null_ ));

  /* json */
  DATA(insert ( 3175 n 0 json_agg_transfn json_agg_finalfn - - - - - - f f r r 0 2281 0 0 0 _null_ _null_ ));
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index bfc9009..84cdc44 100644
*** a/src/include/catalog/pg_proc.h
--- b/src/include/catalog/pg_proc.h
*************** DATA(insert OID = 3168 ( array_replace
*** 942,953 ****
--- 942,965 ----
  DESCR("replace any occurrences of an element in an array");
  DATA(insert OID = 2333 ( array_agg_transfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2776" _null_ _null_ _null_ _null_ _null_ array_agg_transfn _null_ _null_ _null_ ));
  DESCR("aggregate transition function");
+ DATA(insert OID = 3423 ( array_agg_combine PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2281" _null_ _null_ _null_ _null_ _null_ array_agg_combine _null_ _null_ _null_ ));
+ DESCR("aggregate combine function");
+ DATA(insert OID = 3424 ( array_agg_serialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ array_agg_serialize _null_ _null_ _null_ ));
+ DESCR("aggregate serial function");
+ DATA(insert OID = 3425 ( array_agg_deserialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 2 0 2281 "17 2281" _null_ _null_ _null_ _null_ _null_ array_agg_deserialize _null_ _null_ _null_ ));
+ DESCR("aggregate deserial function");
  DATA(insert OID = 2334 ( array_agg_finalfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2277 "2281 2776" _null_ _null_ _null_ _null_ _null_ array_agg_finalfn _null_ _null_ _null_ ));
  DESCR("aggregate final function");
  DATA(insert OID = 2335 ( array_agg PGNSP PGUID 12 1 0 0 0 a f f f f i s 1 0 2277 "2776" _null_ _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ ));
  DESCR("concatenate aggregate input into an array");
  DATA(insert OID = 4051 ( array_agg_array_transfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2277" _null_ _null_ _null_ _null_ _null_ array_agg_array_transfn _null_ _null_ _null_ ));
  DESCR("aggregate transition function");
+ DATA(insert OID = 3426 ( array_agg_array_combine PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2281" _null_ _null_ _null_ _null_ _null_ array_agg_array_combine _null_ _null_ _null_ ));
+ DESCR("aggregate combine function");
+ DATA(insert OID = 3427 ( array_agg_array_serialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ array_agg_array_serialize _null_ _null_ _null_ ));
+ DESCR("aggregate serial function");
+ DATA(insert OID = 3428 ( array_agg_array_deserialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 2 0 2281 "17 2281" _null_ _null_ _null_ _null_ _null_ array_agg_array_deserialize _null_ _null_ _null_ ));
+ DESCR("aggregate deserial function");
  DATA(insert OID = 4052 ( array_agg_array_finalfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2277 "2281 2277" _null_ _null_ _null_ _null_ _null_ array_agg_array_finalfn _null_ _null_ _null_ ));
  DESCR("aggregate final function");
  DATA(insert OID = 4053 ( array_agg PGNSP PGUID 12 1 0 0 0 a f f f f i s 1 0 2277 "2277" _null_ _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ ));
*************** DESCR("aggregate final function");
*** 2710,2715 ****
--- 2722,2733 ----

  DATA(insert OID = 3535 ( string_agg_transfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 3 0 2281 "2281 25 25" _null_ _null_ _null_ _null_ _null_ string_agg_transfn _null_ _null_ _null_ ));
  DESCR("aggregate transition function");
+ DATA(insert OID = 3429 ( string_agg_combine PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2281" _null_ _null_ _null_ _null_ _null_ string_agg_combine _null_ _null_ _null_ ));
+ DESCR("aggregate combine function");
+ DATA(insert OID = 3430 ( string_agg_serialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ string_agg_serialize _null_ _null_ _null_ ));
+ DESCR("aggregate serial function");
+ DATA(insert OID = 3431 ( string_agg_deserialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 2 0 2281 "17 2281" _null_ _null_ _null_ _null_ _null_ string_agg_deserialize _null_ _null_ _null_ ));
+ DESCR("aggregate deserial function");
  DATA(insert OID = 3536 ( string_agg_finalfn PGNSP PGUID 12 1 0 0 0 f f f f f i s 1 0 25 "2281" _null_ _null_ _null_ _null_ _null_ string_agg_finalfn _null_ _null_ _null_ ));
  DESCR("aggregate final function");
  DATA(insert OID = 3538 ( string_agg PGNSP PGUID 12 1 0 0 0 a f f f f i s 2 0 25 "25 25" _null_ _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ ));
diff --git a/src/include/utils/array.h b/src/include/utils/array.h
index afbb532..f99e05a 100644
*** a/src/include/utils/array.h
--- b/src/include/utils/array.h
*************** extern bool array_contains_nulls(ArrayTy
*** 393,398 ****
--- 393,401 ----

  extern ArrayBuildState *initArrayResult(Oid element_type,
  				MemoryContext rcontext, bool subcontext);
+ extern ArrayBuildState *initArrayResultWithSize(Oid element_type,
+ 						MemoryContext rcontext,
+ 						bool subcontext, int initsize);
  extern ArrayBuildState *accumArrayResult(ArrayBuildState *astate,
  				 Datum dvalue, bool disnull,
  				 Oid element_type,