Hi, In <CAEG8a3K9dE2gt3+K+h=dwtqmenr84aeyuys+cty3sr3laed...@mail.gmail.com> "Re: Make COPY format extendable: Extract COPY TO format implementations" on Wed, 6 Dec 2023 15:11:34 +0800, Junwang Zhao <zhjw...@gmail.com> wrote:
> For the extra curly braces, I mean the following code block in > CopyToFormatBinaryStart: > > + { <-- I thought this is useless? > + /* Generate header for a binary copy */ > + int32 tmp; > + > + /* Signature */ > + CopySendData(cstate, BinarySignature, 11); > + /* Flags field */ > + tmp = 0; > + CopySendInt32(cstate, tmp); > + /* No header extension */ > + tmp = 0; > + CopySendInt32(cstate, tmp); > + } Oh, I see. I've removed them and attached the v3 patch. In general, I don't change variable names and so on in this patch; I just move code. But I also removed the "tmp" variable in this case because I think that the name isn't suitable for a larger scope. (I think that "tmp" is acceptable in a small scope like the above code.) New code: /* Generate header for a binary copy */ /* Signature */ CopySendData(cstate, BinarySignature, 11); /* Flags field */ CopySendInt32(cstate, 0); /* No header extension */ CopySendInt32(cstate, 0); Thanks, -- kou
>From 9fe0087d9a6a79a7d1a7d0af63eb16abadbf0d4a Mon Sep 17 00:00:00 2001 From: Sutou Kouhei <k...@clear-code.com> Date: Mon, 4 Dec 2023 12:32:54 +0900 Subject: [PATCH v3] Extract COPY TO format implementations This is a part of making COPY format extendable. See also these past discussions: * New Copy Formats - avro/orc/parquet: https://www.postgresql.org/message-id/flat/20180210151304.fonjztsynewldfba%40gmail.com * Make COPY extendable in order to support Parquet and other formats: https://www.postgresql.org/message-id/flat/CAJ7c6TM6Bz1c3F04Cy6%2BSzuWfKmr0kU8c_3Stnvh_8BR0D6k8Q%40mail.gmail.com This doesn't change the current behavior. This just introduces CopyToFormatOps, which holds function pointers for a format implementation (like TupleTableSlotOps), and uses it for the existing "text", "csv" and "binary" format implementations. Note that CopyToFormatOps can't be used from extensions yet because CopySend*() aren't exported yet. Extensions can't send formatted data to a destination without CopySend*(). The CopySend*() functions will be exported by subsequent patches. Here is a benchmark result with/without this change because there was a discussion that we should care about performance regression: https://www.postgresql.org/message-id/3741749.1655952719%40sss.pgh.pa.us > I think that step 1 ought to be to convert the existing formats into > plug-ins, and demonstrate that there's no significant loss of > performance.
You can see that there is no significant loss of performance: Data: Random 32 bit integers: CREATE TABLE data (int32 integer); INSERT INTO data SELECT random() * 10000 FROM generate_series(1, ${n_records}); The number of records: 100K, 1M and 10M 100K without this change: format,elapsed time (ms) text,22.527 csv,23.822 binary,24.806 100K with this change: format,elapsed time (ms) text,22.919 csv,24.643 binary,24.705 1M without this change: format,elapsed time (ms) text,223.457 csv,233.583 binary,242.687 1M with this change: format,elapsed time (ms) text,224.591 csv,233.964 binary,247.164 10M without this change: format,elapsed time (ms) text,2330.383 csv,2411.394 binary,2590.817 10M with this change: format,elapsed time (ms) text,2231.307 csv,2408.067 binary,2473.617 --- src/backend/commands/copy.c | 8 + src/backend/commands/copyto.c | 377 ++++++++++++++++++++-------------- src/include/commands/copy.h | 27 ++- 3 files changed, 256 insertions(+), 156 deletions(-) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index cfad47b562..27a1add456 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -427,6 +427,8 @@ ProcessCopyOptions(ParseState *pstate, opts_out->file_encoding = -1; + /* Text is the default format. 
*/ + opts_out->to_ops = CopyToFormatOpsText; /* Extract options from the statement node tree */ foreach(option, options) { @@ -442,9 +444,15 @@ ProcessCopyOptions(ParseState *pstate, if (strcmp(fmt, "text") == 0) /* default format */ ; else if (strcmp(fmt, "csv") == 0) + { opts_out->csv_mode = true; + opts_out->to_ops = CopyToFormatOpsCSV; + } else if (strcmp(fmt, "binary") == 0) + { opts_out->binary = true; + opts_out->to_ops = CopyToFormatOpsBinary; + } else ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index c66a047c4a..8f51090a03 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -131,6 +131,228 @@ static void CopySendEndOfRow(CopyToState cstate); static void CopySendInt32(CopyToState cstate, int32 val); static void CopySendInt16(CopyToState cstate, int16 val); +/* + * CopyToFormatOps implementations. + */ + +/* + * CopyToFormatOps implementation for "text" and "csv". CopyToFormatText*() + * refer cstate->opts.csv_mode and change their behavior. We can split this + * implementation and stop referring cstate->opts.csv_mode later. + */ + +static void +CopyToFormatTextSendEndOfRow(CopyToState cstate) +{ + switch (cstate->copy_dest) + { + case COPY_FILE: + /* Default line termination depends on platform */ +#ifndef WIN32 + CopySendChar(cstate, '\n'); +#else + CopySendString(cstate, "\r\n"); +#endif + break; + case COPY_FRONTEND: + /* The FE/BE protocol uses \n as newline for all platforms */ + CopySendChar(cstate, '\n'); + break; + default: + break; + } + CopySendEndOfRow(cstate); +} + +static void +CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc) +{ + int num_phys_attrs; + ListCell *cur; + + num_phys_attrs = tupDesc->natts; + /* Get info about the columns we need to process. 
*/ + cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Oid out_func_oid; + bool isvarlena; + Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); + + getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena); + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + } + + /* + * For non-binary copy, we need to convert null_print to file + * encoding, because it will be sent directly with CopySendString. + */ + if (cstate->need_transcoding) + cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, + cstate->opts.null_print_len, + cstate->file_encoding); + + /* if a header has been requested send the line */ + if (cstate->opts.header_line) + { + bool hdr_delim = false; + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + char *colname; + + if (hdr_delim) + CopySendChar(cstate, cstate->opts.delim[0]); + hdr_delim = true; + + colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); + + if (cstate->opts.csv_mode) + CopyAttributeOutCSV(cstate, colname, false, + list_length(cstate->attnumlist) == 1); + else + CopyAttributeOutText(cstate, colname); + } + + CopyToFormatTextSendEndOfRow(cstate); + } +} + +static void +CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + bool need_delim = false; + FmgrInfo *out_functions = cstate->out_functions; + ListCell *cur; + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; + + if (need_delim) + CopySendChar(cstate, cstate->opts.delim[0]); + need_delim = true; + + if (isnull) + CopySendString(cstate, cstate->opts.null_print_client); + else + { + char *string; + + string = OutputFunctionCall(&out_functions[attnum - 1], value); + if (cstate->opts.csv_mode) + CopyAttributeOutCSV(cstate, string, + cstate->opts.force_quote_flags[attnum - 1], + 
list_length(cstate->attnumlist) == 1); + else + CopyAttributeOutText(cstate, string); + } + } + + CopyToFormatTextSendEndOfRow(cstate); +} + +static void +CopyToFormatTextEnd(CopyToState cstate) +{ +} + +/* + * CopyToFormatOps implementation for "binary". + */ + +static void +CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc) +{ + int num_phys_attrs; + ListCell *cur; + + num_phys_attrs = tupDesc->natts; + /* Get info about the columns we need to process. */ + cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Oid out_func_oid; + bool isvarlena; + Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); + + getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena); + fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); + } + + /* Generate header for a binary copy */ + /* Signature */ + CopySendData(cstate, BinarySignature, 11); + /* Flags field */ + CopySendInt32(cstate, 0); + /* No header extension */ + CopySendInt32(cstate, 0); +} + +static void +CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot) +{ + FmgrInfo *out_functions = cstate->out_functions; + ListCell *cur; + + /* Binary per-tuple header */ + CopySendInt16(cstate, list_length(cstate->attnumlist)); + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + Datum value = slot->tts_values[attnum - 1]; + bool isnull = slot->tts_isnull[attnum - 1]; + + if (isnull) + CopySendInt32(cstate, -1); + else + { + bytea *outputbytes; + + outputbytes = SendFunctionCall(&out_functions[attnum - 1], value); + CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); + CopySendData(cstate, VARDATA(outputbytes), + VARSIZE(outputbytes) - VARHDRSZ); + } + } + + CopySendEndOfRow(cstate); +} + +static void +CopyToFormatBinaryEnd(CopyToState cstate) +{ + /* Generate trailer for a binary copy */ + CopySendInt16(cstate, -1); + /* Need to flush out the trailer */ + 
CopySendEndOfRow(cstate); +} + +const CopyToFormatOps CopyToFormatOpsText = { + .start = CopyToFormatTextStart, + .one_row = CopyToFormatTextOneRow, + .end = CopyToFormatTextEnd, +}; + +/* + * We can use the same CopyToFormatOps for both of "text" and "csv" because + * CopyToFormatText*() refer cstate->opts.csv_mode and change their + * behavior. We can split the implementations and stop referring + * cstate->opts.csv_mode later. + */ +const CopyToFormatOps CopyToFormatOpsCSV = CopyToFormatOpsText; + +const CopyToFormatOps CopyToFormatOpsBinary = { + .start = CopyToFormatBinaryStart, + .one_row = CopyToFormatBinaryOneRow, + .end = CopyToFormatBinaryEnd, +}; /* * Send copy start/stop messages for frontend copies. These have changed @@ -198,16 +420,6 @@ CopySendEndOfRow(CopyToState cstate) switch (cstate->copy_dest) { case COPY_FILE: - if (!cstate->opts.binary) - { - /* Default line termination depends on platform */ -#ifndef WIN32 - CopySendChar(cstate, '\n'); -#else - CopySendString(cstate, "\r\n"); -#endif - } - if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file) != 1 || ferror(cstate->copy_file)) @@ -242,10 +454,6 @@ CopySendEndOfRow(CopyToState cstate) } break; case COPY_FRONTEND: - /* The FE/BE protocol uses \n as newline for all platforms */ - if (!cstate->opts.binary) - CopySendChar(cstate, '\n'); - /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len); break; @@ -748,8 +956,6 @@ DoCopyTo(CopyToState cstate) bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL); bool fe_copy = (pipe && whereToSendOutput == DestRemote); TupleDesc tupDesc; - int num_phys_attrs; - ListCell *cur; uint64 processed; if (fe_copy) @@ -759,32 +965,11 @@ DoCopyTo(CopyToState cstate) tupDesc = RelationGetDescr(cstate->rel); else tupDesc = cstate->queryDesc->tupDesc; - num_phys_attrs = tupDesc->natts; cstate->opts.null_print_client = cstate->opts.null_print; /* default */ /* We use 
fe_msgbuf as a per-row buffer regardless of copy_dest */ cstate->fe_msgbuf = makeStringInfo(); - /* Get info about the columns we need to process. */ - cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo)); - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - Oid out_func_oid; - bool isvarlena; - Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1); - - if (cstate->opts.binary) - getTypeBinaryOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - else - getTypeOutputInfo(attr->atttypid, - &out_func_oid, - &isvarlena); - fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]); - } - /* * Create a temporary memory context that we can reset once per row to * recover palloc'd memory. This avoids any problems with leaks inside @@ -795,57 +980,7 @@ DoCopyTo(CopyToState cstate) "COPY TO", ALLOCSET_DEFAULT_SIZES); - if (cstate->opts.binary) - { - /* Generate header for a binary copy */ - int32 tmp; - - /* Signature */ - CopySendData(cstate, BinarySignature, 11); - /* Flags field */ - tmp = 0; - CopySendInt32(cstate, tmp); - /* No header extension */ - tmp = 0; - CopySendInt32(cstate, tmp); - } - else - { - /* - * For non-binary copy, we need to convert null_print to file - * encoding, because it will be sent directly with CopySendString. 
- */ - if (cstate->need_transcoding) - cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print, - cstate->opts.null_print_len, - cstate->file_encoding); - - /* if a header has been requested send the line */ - if (cstate->opts.header_line) - { - bool hdr_delim = false; - - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - char *colname; - - if (hdr_delim) - CopySendChar(cstate, cstate->opts.delim[0]); - hdr_delim = true; - - colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname); - - if (cstate->opts.csv_mode) - CopyAttributeOutCSV(cstate, colname, false, - list_length(cstate->attnumlist) == 1); - else - CopyAttributeOutText(cstate, colname); - } - - CopySendEndOfRow(cstate); - } - } + cstate->opts.to_ops.start(cstate, tupDesc); if (cstate->rel) { @@ -884,13 +1019,7 @@ DoCopyTo(CopyToState cstate) processed = ((DR_copy *) cstate->queryDesc->dest)->processed; } - if (cstate->opts.binary) - { - /* Generate trailer for a binary copy */ - CopySendInt16(cstate, -1); - /* Need to flush out the trailer */ - CopySendEndOfRow(cstate); - } + cstate->opts.to_ops.end(cstate); MemoryContextDelete(cstate->rowcontext); @@ -906,71 +1035,15 @@ DoCopyTo(CopyToState cstate) static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot) { - bool need_delim = false; - FmgrInfo *out_functions = cstate->out_functions; MemoryContext oldcontext; - ListCell *cur; - char *string; MemoryContextReset(cstate->rowcontext); oldcontext = MemoryContextSwitchTo(cstate->rowcontext); - if (cstate->opts.binary) - { - /* Binary per-tuple header */ - CopySendInt16(cstate, list_length(cstate->attnumlist)); - } - /* Make sure the tuple is fully deconstructed */ slot_getallattrs(slot); - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - Datum value = slot->tts_values[attnum - 1]; - bool isnull = slot->tts_isnull[attnum - 1]; - - if (!cstate->opts.binary) - { - if (need_delim) - CopySendChar(cstate, cstate->opts.delim[0]); - need_delim = 
true; - } - - if (isnull) - { - if (!cstate->opts.binary) - CopySendString(cstate, cstate->opts.null_print_client); - else - CopySendInt32(cstate, -1); - } - else - { - if (!cstate->opts.binary) - { - string = OutputFunctionCall(&out_functions[attnum - 1], - value); - if (cstate->opts.csv_mode) - CopyAttributeOutCSV(cstate, string, - cstate->opts.force_quote_flags[attnum - 1], - list_length(cstate->attnumlist) == 1); - else - CopyAttributeOutText(cstate, string); - } - else - { - bytea *outputbytes; - - outputbytes = SendFunctionCall(&out_functions[attnum - 1], - value); - CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ); - CopySendData(cstate, VARDATA(outputbytes), - VARSIZE(outputbytes) - VARHDRSZ); - } - } - } - - CopySendEndOfRow(cstate); + cstate->opts.to_ops.one_row(cstate, slot); MemoryContextSwitchTo(oldcontext); } diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index f2cca0b90b..6b5231b2f3 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -30,6 +30,28 @@ typedef enum CopyHeaderChoice COPY_HEADER_MATCH, } CopyHeaderChoice; +/* These are private in commands/copy[from|to].c */ +typedef struct CopyFromStateData *CopyFromState; +typedef struct CopyToStateData *CopyToState; + +/* Routines for a COPY TO format implementation. */ +typedef struct CopyToFormatOps +{ + /* Called when COPY TO is started. This will send a header. */ + void (*start) (CopyToState cstate, TupleDesc tupDesc); + + /* Copy one row for COPY TO. */ + void (*one_row) (CopyToState cstate, TupleTableSlot *slot); + + /* Called when COPY TO is ended. This will send a trailer. */ + void (*end) (CopyToState cstate); +} CopyToFormatOps; + +/* Predefined CopyToFormatOps for "text", "csv" and "binary". */ +extern PGDLLIMPORT const CopyToFormatOps CopyToFormatOpsText; +extern PGDLLIMPORT const CopyToFormatOps CopyToFormatOpsCSV; +extern PGDLLIMPORT const CopyToFormatOps CopyToFormatOpsBinary; + /* * A struct to hold COPY options, in a parsed form. 
All of these are related * to formatting, except for 'freeze', which doesn't really belong here, but @@ -63,12 +85,9 @@ typedef struct CopyFormatOptions bool *force_null_flags; /* per-column CSV FN flags */ bool convert_selectively; /* do selective binary conversion? */ List *convert_select; /* list of column names (can be NIL) */ + CopyToFormatOps to_ops; /* how to format to */ } CopyFormatOptions; -/* These are private in commands/copy[from|to].c */ -typedef struct CopyFromStateData *CopyFromState; -typedef struct CopyToStateData *CopyToState; - typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); typedef void (*copy_data_dest_cb) (void *data, int len); -- 2.40.1