On Sat, Oct 08, 2022 at 02:11:38PM +0900, Michael Paquier wrote: > Using an ereport(NOTICE) to show the data reported in the callback is > fine by me. How about making the module a bit more modular, by > passing as argument a regclass and building a list of arguments with > it? You may want to hold the AccessShareLock on the relation until > the end of the transaction in this example.
Yeah, that makes more sense. It actually simplifies things a bit, too. -- Nathan Bossart Amazon Web Services: https://aws.amazon.com
>From 0b45607988e28b405c7795de0c7f82f51d4662e5 Mon Sep 17 00:00:00 2001 From: Nathan Bossart <nathandboss...@gmail.com> Date: Tue, 2 Aug 2022 16:15:01 -0700 Subject: [PATCH v4 1/1] Support COPY TO callback functions. --- src/backend/commands/copy.c | 2 +- src/backend/commands/copyto.c | 18 +++++-- src/include/commands/copy.h | 3 +- src/test/modules/Makefile | 1 + src/test/modules/meson.build | 1 + .../modules/test_copy_callbacks/.gitignore | 4 ++ src/test/modules/test_copy_callbacks/Makefile | 23 +++++++++ .../expected/test_copy_callbacks.out | 12 +++++ .../modules/test_copy_callbacks/meson.build | 34 ++++++++++++++ .../sql/test_copy_callbacks.sql | 4 ++ .../test_copy_callbacks--1.0.sql | 8 ++++ .../test_copy_callbacks/test_copy_callbacks.c | 47 +++++++++++++++++++ .../test_copy_callbacks.control | 4 ++ src/tools/pgindent/typedefs.list | 1 + 14 files changed, 157 insertions(+), 5 deletions(-) create mode 100644 src/test/modules/test_copy_callbacks/.gitignore create mode 100644 src/test/modules/test_copy_callbacks/Makefile create mode 100644 src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out create mode 100644 src/test/modules/test_copy_callbacks/meson.build create mode 100644 src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks.c create mode 100644 src/test/modules/test_copy_callbacks/test_copy_callbacks.control diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 49924e476a..db4c9dbc23 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -310,7 +310,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, cstate = BeginCopyTo(pstate, rel, query, relid, stmt->filename, stmt->is_program, - stmt->attlist, stmt->options); + NULL, stmt->attlist, stmt->options); *processed = DoCopyTo(cstate); /* copy from database to file */ 
EndCopyTo(cstate); } diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index fca29a9a10..a7b8ec030d 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -51,6 +51,7 @@ typedef enum CopyDest { COPY_FILE, /* to file (or a piped program) */ COPY_FRONTEND, /* to frontend */ + COPY_CALLBACK, /* to callback function */ } CopyDest; /* @@ -85,6 +86,7 @@ typedef struct CopyToStateData List *attnumlist; /* integer list of attnums to copy */ char *filename; /* filename, or NULL for STDOUT */ bool is_program; /* is 'filename' a program to popen? */ + copy_data_dest_cb data_dest_cb; /* function for writing data */ CopyFormatOptions opts; Node *whereClause; /* WHERE condition (or NULL) */ @@ -247,6 +249,9 @@ CopySendEndOfRow(CopyToState cstate) /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); break; + case COPY_CALLBACK: + cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len); + break; } /* Update the progress */ @@ -344,11 +349,12 @@ BeginCopyTo(ParseState *pstate, Oid queryRelId, const char *filename, bool is_program, + copy_data_dest_cb data_dest_cb, List *attnamelist, List *options) { CopyToState cstate; - bool pipe = (filename == NULL); + bool pipe = (filename == NULL && data_dest_cb == NULL); TupleDesc tupDesc; int num_phys_attrs; MemoryContext oldcontext; @@ -656,7 +662,13 @@ BeginCopyTo(ParseState *pstate, cstate->copy_dest = COPY_FILE; /* default */ - if (pipe) + if (data_dest_cb) + { + progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; + cstate->copy_dest = COPY_CALLBACK; + cstate->data_dest_cb = data_dest_cb; + } + else if (pipe) { progress_vals[1] = PROGRESS_COPY_TYPE_PIPE; @@ -769,7 +781,7 @@ EndCopyTo(CopyToState cstate) uint64 DoCopyTo(CopyToState cstate) { - bool pipe = (cstate->filename == NULL); + bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL); bool fe_copy = (pipe && whereToSendOutput == DestRemote); TupleDesc tupDesc; 
int num_phys_attrs; diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 3f6677b132..b77b935005 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -66,6 +66,7 @@ typedef struct CopyFromStateData *CopyFromState; typedef struct CopyToStateData *CopyToState; typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); +typedef void (*copy_data_dest_cb) (void *data, int len); extern void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, @@ -91,7 +92,7 @@ extern DestReceiver *CreateCopyDestReceiver(void); */ extern CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *raw_query, Oid queryRelId, const char *filename, bool is_program, - List *attnamelist, List *options); + copy_data_dest_cb data_dest_cb, List *attnamelist, List *options); extern void EndCopyTo(CopyToState cstate); extern uint64 DoCopyTo(CopyToState cstate); extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel, diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index 6c31c8707c..7b3f292965 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -15,6 +15,7 @@ SUBDIRS = \ snapshot_too_old \ spgist_name_ops \ test_bloomfilter \ + test_copy_callbacks \ test_ddl_deparse \ test_extensions \ test_ginpostinglist \ diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build index a80e6e2ce2..c2e5f5ffd5 100644 --- a/src/test/modules/meson.build +++ b/src/test/modules/meson.build @@ -9,6 +9,7 @@ subdir('snapshot_too_old') subdir('spgist_name_ops') subdir('ssl_passphrase_callback') subdir('test_bloomfilter') +subdir('test_copy_callbacks') subdir('test_ddl_deparse') subdir('test_extensions') subdir('test_ginpostinglist') diff --git a/src/test/modules/test_copy_callbacks/.gitignore b/src/test/modules/test_copy_callbacks/.gitignore new file mode 100644 index 0000000000..5dcb3ff972 --- /dev/null +++ b/src/test/modules/test_copy_callbacks/.gitignore @@ -0,0 
+1,4 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ diff --git a/src/test/modules/test_copy_callbacks/Makefile b/src/test/modules/test_copy_callbacks/Makefile new file mode 100644 index 0000000000..6b0a0efc37 --- /dev/null +++ b/src/test/modules/test_copy_callbacks/Makefile @@ -0,0 +1,23 @@ +# src/test/modules/test_copy_callbacks/Makefile + +MODULE_big = test_copy_callbacks +OBJS = \ + $(WIN32RES) \ + test_copy_callbacks.o +PGFILEDESC = "test_copy_callbacks - example use of COPY callbacks" + +EXTENSION = test_copy_callbacks +DATA = test_copy_callbacks--1.0.sql + +REGRESS = test_copy_callbacks + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_copy_callbacks +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out b/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out new file mode 100644 index 0000000000..3c4c504ef8 --- /dev/null +++ b/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out @@ -0,0 +1,12 @@ +CREATE EXTENSION test_copy_callbacks; +CREATE TABLE public.test (a INT, b INT, c INT); +INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +SELECT test_copy_to_callback('public.test'::pg_catalog.regclass); +NOTICE: COPY TO callback called with data "1 2 3" and length 5 +NOTICE: COPY TO callback called with data "12 34 56" and length 8 +NOTICE: COPY TO callback called with data "123 456 789" and length 11 + test_copy_to_callback +----------------------- + +(1 row) + diff --git a/src/test/modules/test_copy_callbacks/meson.build b/src/test/modules/test_copy_callbacks/meson.build new file mode 100644 index 0000000000..0f1ec47951 --- /dev/null +++ b/src/test/modules/test_copy_callbacks/meson.build @@ -0,0 +1,34 @@ +# FIXME: prevent install during main install, but 
not during test :/ + +test_copy_callbacks_sources = files( + 'test_copy_callbacks.c', +) + +if host_system == 'windows' + test_copy_callbacks_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'test_copy_callbacks', + '--FILEDESC', 'test_copy_callbacks - example use of COPY callbacks',]) +endif + +test_copy_callbacks = shared_module('test_copy_callbacks', + test_copy_callbacks_sources, + kwargs: pg_mod_args, +) +testprep_targets += test_copy_callbacks + +install_data( + 'test_copy_callbacks.control', + 'test_copy_callbacks--1.0.sql', + kwargs: contrib_data_args, +) + +tests += { + 'name': 'test_copy_callbacks', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'regress': { + 'sql': [ + 'test_copy_callbacks', + ], + }, +} diff --git a/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql b/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql new file mode 100644 index 0000000000..2deffba635 --- /dev/null +++ b/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql @@ -0,0 +1,4 @@ +CREATE EXTENSION test_copy_callbacks; +CREATE TABLE public.test (a INT, b INT, c INT); +INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +SELECT test_copy_to_callback('public.test'::pg_catalog.regclass); diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql b/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql new file mode 100644 index 0000000000..215cf3fad6 --- /dev/null +++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql @@ -0,0 +1,8 @@ +/* src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_copy_callbacks" to load this file. 
\quit + +CREATE FUNCTION test_copy_to_callback(pg_catalog.regclass) + RETURNS pg_catalog.void + AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks.c b/src/test/modules/test_copy_callbacks/test_copy_callbacks.c new file mode 100644 index 0000000000..a18f20a5be --- /dev/null +++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks.c @@ -0,0 +1,47 @@ +/*-------------------------------------------------------------------------- + * + * test_copy_callbacks.c + * Code for testing COPY callbacks. + * + * Copyright (c) 2022, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/test/modules/test_copy_callbacks/test_copy_callbacks.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/table.h" +#include "commands/copy.h" +#include "fmgr.h" +#include "nodes/makefuncs.h" +#include "utils/rel.h" + +PG_MODULE_MAGIC; + +static void +to_cb(void *data, int len) +{ + ereport(NOTICE, + (errmsg("COPY TO callback called with data \"%s\" and length %d", + (char *) data, len))); +} + +PG_FUNCTION_INFO_V1(test_copy_to_callback); +Datum +test_copy_to_callback(PG_FUNCTION_ARGS) +{ + Relation rel = table_open(PG_GETARG_OID(0), AccessShareLock); + CopyToState cstate; + + cstate = BeginCopyTo(NULL, rel, NULL, RelationGetRelid(rel), NULL, NULL, + to_cb, NIL, NIL); + (void) DoCopyTo(cstate); + EndCopyTo(cstate); + + table_close(rel, AccessShareLock); + + PG_RETURN_VOID(); +} diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks.control b/src/test/modules/test_copy_callbacks/test_copy_callbacks.control new file mode 100644 index 0000000000..b7ce3f12ff --- /dev/null +++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks.control @@ -0,0 +1,4 @@ +comment = 'Test code for COPY callbacks' +default_version = '1.0' +module_pathname = '$libdir/test_copy_callbacks' +relocatable = true diff --git a/src/tools/pgindent/typedefs.list 
b/src/tools/pgindent/typedefs.list index 97c9bc1861..d9b839c979 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3177,6 +3177,7 @@ compare_context config_var_value contain_aggs_of_level_context convert_testexpr_context +copy_data_dest_cb copy_data_source_cb core_YYSTYPE core_yy_extra_type -- 2.25.1