On 2022-Jan-10, Alvaro Herrera wrote: > Hmm. So you're saying that we should only raise errors about the column > list if we are publishing UPDATE or DELETE, but otherwise let the > replica identity be anything. OK, I'll see if I can come up with a > reasonable set of rules ...
This is an attempt to do it that way. Now you can add a table to a publication without regard for how the column filter compares to the replica identity, as long as the publication does not include updates and deletes. -- Álvaro Herrera Valdivia, Chile — https://www.EnterpriseDB.com/ "La fuerza no está en los medios físicos sino que reside en una voluntad indomable" (Gandhi)
>From e3350af9023b5181b70ce480d039b3df46e9c019 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Tue, 11 Jan 2022 15:46:07 -0300 Subject: [PATCH v17] Support column lists for logical replication of tables Add the capability of specifying a column list for individual tables as part of a publication. Columns not in the list are not published. This enables replicating to a table with only a subset of the columns. If no column list is specified, all the columns are replicated, as previously Author: Rahila Syed <rahilasye...@gmail.com> Discussion: https://postgr.es/m/CAH2L28vddB_NFdRVpuyRBJEBWjz4BSyTB=_ektnrh8nj1jf...@mail.gmail.com --- doc/src/sgml/catalogs.sgml | 13 + doc/src/sgml/protocol.sgml | 4 +- doc/src/sgml/ref/alter_publication.sgml | 20 +- doc/src/sgml/ref/create_publication.sgml | 11 +- src/backend/catalog/pg_publication.c | 353 +++++++++++++++++++- src/backend/commands/publicationcmds.c | 67 +++- src/backend/commands/tablecmds.c | 87 ++++- src/backend/nodes/copyfuncs.c | 1 + src/backend/nodes/equalfuncs.c | 1 + src/backend/parser/gram.y | 60 +++- src/backend/replication/logical/proto.c | 66 ++-- src/backend/replication/logical/tablesync.c | 119 ++++++- src/backend/replication/pgoutput/pgoutput.c | 118 ++++++- src/bin/pg_dump/pg_dump.c | 41 ++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 26 +- src/bin/psql/tab-complete.c | 2 + src/include/catalog/pg_publication.h | 6 + src/include/catalog/pg_publication_rel.h | 3 + src/include/nodes/parsenodes.h | 4 +- src/include/replication/logicalproto.h | 6 +- src/test/regress/expected/publication.out | 59 +++- src/test/regress/sql/publication.sql | 39 ++- src/test/subscription/t/028_column_list.pl | 164 +++++++++ 24 files changed, 1184 insertions(+), 87 deletions(-) create mode 100644 src/test/subscription/t/028_column_list.pl diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 03e2537b07..b7b75f64a2 100644 --- a/doc/src/sgml/catalogs.sgml +++ 
b/doc/src/sgml/catalogs.sgml @@ -6311,6 +6311,19 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l Reference to relation </para></entry> </row> + + <row> + <entry role="catalog_table_entry"><para role="column_definition"> + <structfield>prattrs</structfield> <type>int2vector</type> + (references <link linkend="catalog-pg-attribute"><structname>pg_attribute</structname></link>.<structfield>attnum</structfield>) + </para> + <para> + This is an array of values that indicates which table columns are + part of the publication. For example a value of <literal>1 3</literal> + would mean that the first and the third table columns are published. + A null value indicates that all attributes are published. + </para></entry> + </row> </tbody> </tgroup> </table> diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 34a7034282..5bc2e7a591 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -6877,7 +6877,9 @@ Relation </listitem> </varlistentry> </variablelist> - Next, the following message part appears for each column (except generated columns): + Next, the following message part appears for each column (except + generated columns and other columns that don't appear in the column + filter list, for tables that have one): <variablelist> <varlistentry> <term> diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index bb4ef5e5e2..d0e97243b8 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -25,12 +25,13 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD <replace ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET <replaceable class="parameter">publication_object</replaceable> [, ...] ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP <replaceable class="parameter">publication_object</replaceable> [, ...] 
ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) +ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ALTER TABLE <replaceable class="parameter">table_name</replaceable> SET COLUMNS { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | ALL } ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER } ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable> <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase> - TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ] + TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [, ... ] ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ] </synopsis> </refsynopsisdiv> @@ -62,6 +63,11 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r command retain their previous settings. </para> + <para> + The <literal>ALTER TABLE ... SET COLUMNS</literal> variant allows changing + the set of columns that are included in the publication. + </para> + <para> The remaining variants change the owner and the name of the publication. </para> @@ -110,6 +116,8 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r specified, the table and all its descendant tables (if any) are affected. Optionally, <literal>*</literal> can be specified after the table name to explicitly indicate that descendant tables are included. + Optionally, a column list can be specified. 
See <xref + linkend="sql-createpublication"/> for details. </para> </listitem> </varlistentry> @@ -164,9 +172,15 @@ ALTER PUBLICATION noinsert SET (publish = 'update, delete'); </para> <para> - Add some tables to the publication: + Add tables to the publication: <programlisting> -ALTER PUBLICATION mypublication ADD TABLE users, departments; +ALTER PUBLICATION mypublication ADD TABLE users (user_id, firstname), departments; +</programlisting></para> + + <para> + Change the set of columns published for a table: +<programlisting> +ALTER PUBLICATION mypublication ALTER TABLE users SET COLUMNS (user_id, firstname, lastname); </programlisting></para> <para> diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index d805e8e77a..73a23cbb02 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -28,7 +28,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase> - TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ] + TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ ( <replaceable class="parameter">column_name</replaceable> [, ... ] ) ] [, ... ] ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ] </synopsis> </refsynopsisdiv> @@ -78,6 +78,15 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable> publication, so they are never explicitly added to the publication. </para> + <para> + When a column list is specified, only the listed columns are replicated; + any other columns are ignored for the purpose of replication through + this publication. If no column list is specified, all columns of the + table are replicated through this publication, including any columns + added later. 
If a column list is specified, it must include the replica + identity columns if the publication publishes updates or deletes. + </para> + <para> Only persistent base tables and partitioned tables can be part of a publication. Temporary tables, unlogged tables, foreign tables, diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 2992a2e0c6..ea12b52b83 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -45,13 +45,26 @@ #include "utils/rel.h" #include "utils/syscache.h" + +static void check_publication_columns(Publication *pub, Relation targetrel, + Bitmapset *columns); +static void publication_translate_columns(Relation targetrel, List *columns, + int *natts, AttrNumber **attrs, + Bitmapset **attset); + /* - * Check if relation can be in given publication and throws appropriate - * error if not. + * Check if relation can be in given publication and that the column + * filter is sensible, and throws appropriate error if not. + * + * columns is the bitmapset of attribute numbers given in the column list, + * or NULL if it was not specified. 
*/ static void -check_publication_add_relation(Relation targetrel) +check_publication_add_relation(Publication *pub, Relation targetrel, + Bitmapset *columns) { + bool replidentfull = (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); + /* Must be a regular or partitioned table */ if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE) @@ -82,6 +95,73 @@ check_publication_add_relation(Relation targetrel) errmsg("cannot add relation \"%s\" to publication", RelationGetRelationName(targetrel)), errdetail("This operation is not supported for unlogged tables."))); + + /* Make sure the column list checks out */ + if (columns != NULL) + { + /* + * Even if the user listed all columns in the column list, we cannot + * allow a column list to be specified when REPLICA IDENTITY is FULL; + * that would cause problems if a new column is added later, because + * the new column would have to be included (because of being part of + * the replica identity) but it's technically not allowed (because of + * not being in the publication's column list yet). So reject this + * case altogether. + */ + if (replidentfull) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("invalid column list for publishing relation \"%s\"", + RelationGetRelationName(targetrel)), + errdetail("Cannot specify a column list on relations with REPLICA IDENTITY FULL.")); + + check_publication_columns(pub, targetrel, columns); + } +} + +/* + * Enforce that the column list can only leave out columns that aren't + * forced to be sent. + * + * No column can be excluded if REPLICA IDENTITY is FULL (since all the + * columns need to be sent regardless); and in other cases, the columns in + * the REPLICA IDENTITY cannot be left out. 
+ */ +static void +check_publication_columns(Publication *pub, Relation targetrel, Bitmapset *columns) +{ + if (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot change column set for relation \"%s\"", + RelationGetRelationName(targetrel)), + errdetail("Cannot specify column list on relations with REPLICA IDENTITY FULL.")); + + if (pub->pubactions.pubupdate || pub->pubactions.pubdelete) + { + Bitmapset *idattrs; + int x; + + idattrs = RelationGetIndexAttrBitmap(targetrel, + INDEX_ATTR_BITMAP_IDENTITY_KEY); + + /* + * We have to test membership the hard way, because the values returned by + * RelationGetIndexAttrBitmap are offset. + */ + x = -1; + while ((x = bms_next_member(idattrs, x)) >= 0) + { + if (!bms_is_member(x + FirstLowInvalidHeapAttributeNumber, columns)) + ereport(ERROR, + errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("invalid column list for publishing relation \"%s\"", + RelationGetRelationName(targetrel)), + errdetail("All columns in REPLICA IDENTITY must be present in the column list.")); + } + + bms_free(idattrs); + } } /* @@ -289,6 +369,9 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, Oid relid = RelationGetRelid(targetrel->relation); Oid pubreloid; Publication *pub = GetPublication(pubid); + Bitmapset *attset = NULL; + AttrNumber *attarray; + int natts = 0; ObjectAddress myself, referenced; List *relids = NIL; @@ -314,7 +397,14 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, RelationGetRelationName(targetrel->relation), pub->name))); } - check_publication_add_relation(targetrel->relation); + /* Translate column names to numbers and verify suitability */ + publication_translate_columns(targetrel->relation, + targetrel->columns, + &natts, &attarray, &attset); + + check_publication_add_relation(pub, targetrel->relation, attset); + + bms_free(attset); /* Form a tuple. 
*/ memset(values, 0, sizeof(values)); @@ -327,6 +417,15 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, ObjectIdGetDatum(pubid); values[Anum_pg_publication_rel_prrelid - 1] = ObjectIdGetDatum(relid); + if (targetrel->columns) + { + int2vector *prattrs; + + prattrs = buildint2vector(attarray, natts); + values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(prattrs); + } + else + nulls[Anum_pg_publication_rel_prattrs - 1] = true; tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -337,6 +436,13 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, /* Register dependencies as needed */ ObjectAddressSet(myself, PublicationRelRelationId, pubreloid); + /* Add dependency on the columns, if any are listed */ + for (int i = 0; i < natts; i++) + { + ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + } + pfree(attarray); /* Add dependency on the publication */ ObjectAddressSet(referenced, PublicationRelationId, pubid); recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); @@ -364,6 +470,155 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, return myself; } +/* + * Update the column list for a relation in a publication. 
+ */ +void +publication_set_table_columns(Relation pubrel, HeapTuple pubreltup, + Relation targetrel, List *columns) +{ + Bitmapset *attset; + AttrNumber *attarray; + HeapTuple copytup; + int natts; + bool nulls[Natts_pg_publication_rel]; + bool replaces[Natts_pg_publication_rel]; + Datum values[Natts_pg_publication_rel]; + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + replaces[Anum_pg_publication_rel_prattrs - 1] = true; + + deleteDependencyRecordsForClass(PublicationRelationId, + ((Form_pg_publication_rel) GETSTRUCT(pubreltup))->oid, + RelationRelationId, + DEPENDENCY_AUTO); + + if (columns == NULL) + { + nulls[Anum_pg_publication_rel_prattrs - 1] = true; + } + else + { + ObjectAddress myself, + referenced; + int2vector *prattrs; + Publication *pub; + + pub = GetPublication(((Form_pg_publication_rel) GETSTRUCT(pubreltup))->prpubid); + + publication_translate_columns(targetrel, columns, + &natts, &attarray, &attset); + + /* + * Make sure the column list checks out. XXX this should occur at + * caller in publicationcmds.c, not here. 
+ */ + check_publication_columns(pub, targetrel, attset); + bms_free(attset); + /* XXX "pub" is leaked here */ + + prattrs = buildint2vector(attarray, natts); + values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(prattrs); + + /* Add dependencies on the new list of columns */ + ObjectAddressSet(myself, PublicationRelRelationId, + ((Form_pg_publication_rel) GETSTRUCT(pubreltup))->oid); + for (int i = 0; i < natts; i++) + { + ObjectAddressSubSet(referenced, RelationRelationId, + RelationGetRelid(targetrel), attarray[i]); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + } + } + + copytup = heap_modify_tuple(pubreltup, RelationGetDescr(pubrel), + values, nulls, replaces); + + CatalogTupleUpdate(pubrel, &pubreltup->t_self, copytup); + + heap_freetuple(copytup); +} + +/* qsort comparator for attnums */ +static int +compare_int16(const void *a, const void *b) +{ + int av = *(const int16 *) a; + int bv = *(const int16 *) b; + + /* this can't overflow if int is wider than int16 */ + return (av - bv); +} + +/* + * Translate a list of column names to an array of attribute numbers + * and a Bitmapset with them; verify that each attribute is appropriate + * to have in a publication column list. Other checks are done later; + * see check_publication_columns. + * + * Note that the attribute numbers are *not* offset by + * FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this + * is okay. + */ +static void +publication_translate_columns(Relation targetrel, List *columns, int *natts, + AttrNumber **attrs, Bitmapset **attset) +{ + AttrNumber *attarray; + Bitmapset *set = NULL; + ListCell *lc; + int n = 0; + TupleDesc tupdesc = RelationGetDescr(targetrel); + + /* + * Translate list of columns to attnums. We prohibit system attributes and + * make sure there are no duplicate columns. 
+ */ + attarray = palloc(sizeof(AttrNumber) * list_length(columns)); + foreach(lc, columns) + { + char *colname = strVal(lfirst(lc)); + AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname); + + if (attnum == InvalidAttrNumber) + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + colname, RelationGetRelationName(targetrel))); + + if (!AttrNumberIsForUserDefinedAttr(attnum)) + ereport(ERROR, + errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot reference system column \"%s\" in publication column list", + colname)); + + if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated) + ereport(ERROR, + errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot reference generated column \"%s\" in publication column list", + colname)); + + if (bms_is_member(attnum, set)) + ereport(ERROR, + errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("duplicate column \"%s\" in publication column list", + colname)); + + set = bms_add_member(set, attnum); + attarray[n++] = attnum; + } + + /* Be tidy, so that the catalog representation is always sorted */ + qsort(attarray, n, sizeof(AttrNumber), compare_int16); + + *natts = n; + *attrs = attarray; + *attset = set; +} + /* * Insert new publication / schema mapping. */ @@ -471,6 +726,96 @@ GetRelationPublications(Oid relid) return result; } +/* + * Gets a list of OIDs of all partial-column publications of the given + * relation, that is, those that specify a column list. 
+ */ +List * +GetRelationColumnPartialPublications(Oid relid) +{ + CatCList *pubrellist; + List *pubs = NIL; + + pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP, + ObjectIdGetDatum(relid)); + for (int i = 0; i < pubrellist->n_members; i++) + { + HeapTuple tup = &pubrellist->members[i]->tuple; + bool isnull; + + (void) SysCacheGetAttr(PUBLICATIONRELMAP, tup, + Anum_pg_publication_rel_prattrs, + &isnull); + if (isnull) + continue; + + pubs = lappend_oid(pubs, + ((Form_pg_publication_rel) GETSTRUCT(tup))->prpubid); + } + + ReleaseSysCacheList(pubrellist); + + return pubs; +} + +/* FIXME maybe these two routines should be in lsyscache.c */ +/* Return the set of actions that the given publication includes */ +void +GetActionsInPublication(Oid pubid, PublicationActions *actions) +{ + HeapTuple pub; + Form_pg_publication pubForm; + + pub = SearchSysCache1(PUBLICATIONOID, + ObjectIdGetDatum(pubid)); + if (!HeapTupleIsValid(pub)) + elog(ERROR, "cache lookup failed for publication %u", pubid); + + pubForm = (Form_pg_publication) GETSTRUCT(pub); + actions->pubinsert = pubForm->pubinsert; + actions->pubupdate = pubForm->pubupdate; + actions->pubdelete = pubForm->pubdelete; + actions->pubtruncate = pubForm->pubtruncate; + + ReleaseSysCache(pub); +} + +/* + * For a relation in a publication that is known to have a non-null column + * list, return the list of attribute numbers that are in it. 
+ */ +List * +GetRelationColumnListInPublication(Oid relid, Oid pubid) +{ + HeapTuple tup; + Datum adatum; + bool isnull; + ArrayType *arr; + int nelems; + int16 *elems; + List *attnos = NIL; + + tup = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(pubid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for rel %u of publication %u", relid, pubid); + adatum = SysCacheGetAttr(PUBLICATIONRELMAP, tup, + Anum_pg_publication_rel_prattrs, &isnull); + if (isnull) + elog(ERROR, "found unexpected null in pg_publication_rel.prattrs"); + arr = DatumGetArrayTypeP(adatum); + nelems = ARR_DIMS(arr)[0]; + elems = (int16 *) ARR_DATA_PTR(arr); + + for (int i = 0; i < nelems; i++) + attnos = lappend_oid(attnos, elems[i]); + + ReleaseSysCache(tup); + + return attnos; +} + /* * Gets list of relation oids for a publication. * diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 3ab1bdeae1..aa41940d1b 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -376,6 +376,46 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) return myself; } +/* + * Change the column list of a relation in a publication + */ +static void +PublicationSetColumns(AlterPublicationStmt *stmt, + Form_pg_publication pubform, PublicationTable *table) +{ + Relation rel, + urel; + HeapTuple tup; + ObjectAddress obj, + secondary; + + rel = table_open(PublicationRelRelationId, RowExclusiveLock); + urel = table_openrv(table->relation, ShareUpdateExclusiveLock); + + tup = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(RelationGetRelid(urel)), + ObjectIdGetDatum(pubform->oid)); + if (!HeapTupleIsValid(tup)) + ereport(ERROR, + errmsg("relation \"%s\" is not already in publication \"%s\"", + table->relation->relname, + NameStr(pubform->pubname))); + + publication_set_table_columns(rel, tup, urel, table->columns); + + ObjectAddressSet(obj, 
PublicationRelationId, + ((Form_pg_publication_rel) GETSTRUCT(tup))->oid); + ObjectAddressSet(secondary, RelationRelationId, RelationGetRelid(urel)); + EventTriggerCollectSimpleCommand(obj, secondary, (Node *) stmt); + + ReleaseSysCache(tup); + + table_close(rel, RowExclusiveLock); + table_close(urel, NoLock); + + InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0); +} + /* * Change options of a publication. */ @@ -523,6 +563,14 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, } else if (stmt->action == AP_DropObjects) PublicationDropTables(pubid, rels, false); + else if (stmt->action == AP_SetColumns) + { + Assert(schemaidlist == NIL); + Assert(list_length(tables) == 1); + + PublicationSetColumns(stmt, pubform, + linitial_node(PublicationTable, tables)); + } else /* AP_SetObjects */ { List *oldrelids = GetPublicationRelations(pubid, @@ -562,7 +610,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, pubrel = palloc(sizeof(PublicationRelInfo)); pubrel->relation = oldrel; - + /* This is not needed to delete a table */ + pubrel->columns = NIL; delrels = lappend(delrels, pubrel); } } @@ -622,7 +671,7 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, } else if (stmt->action == AP_DropObjects) PublicationDropSchemas(pubform->oid, schemaidlist, false); - else /* AP_SetObjects */ + else if (stmt->action == AP_SetObjects) { List *oldschemaids = GetPublicationSchemas(pubform->oid); List *delschemas = NIL; @@ -645,6 +694,10 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, */ PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt); } + else + { + /* Nothing to do for AP_SetColumns */ + } } /* @@ -934,6 +987,8 @@ OpenTableList(List *tables) pub_rel = palloc(sizeof(PublicationRelInfo)); pub_rel->relation = rel; + pub_rel->columns = t->columns; + rels = lappend(rels, pub_rel); relids = lappend_oid(relids, myrelid); @@ -967,8 +1022,11 @@ OpenTableList(List *tables) /* find_all_inheritors already got lock */ rel 
= table_open(childrelid, NoLock); + pub_rel = palloc(sizeof(PublicationRelInfo)); pub_rel->relation = rel; + pub_rel->columns = t->columns; + rels = lappend(rels, pub_rel); relids = lappend_oid(relids, childrelid); } @@ -1076,6 +1134,11 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) Relation rel = pubrel->relation; Oid relid = RelationGetRelid(rel); + if (pubrel->columns) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("column list must not be specified in ALTER PUBLICATION ... DROP")); + prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, ObjectIdGetDatum(relid), ObjectIdGetDatum(pubid)); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 1f0654c2f5..893223b437 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -8364,6 +8364,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName, bool missing_ok, LOCKMODE lockmode, ObjectAddresses *addrs) { + Oid relid = RelationGetRelid(rel); HeapTuple tuple; Form_pg_attribute targetatt; AttrNumber attnum; @@ -8383,7 +8384,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName, /* * get the number of the attribute */ - tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName); + tuple = SearchSysCacheAttName(relid, colName); if (!HeapTupleIsValid(tuple)) { if (!missing_ok) @@ -8437,13 +8438,42 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName, ReleaseSysCache(tuple); + /* + * Also, if the column is used in the column list of a publication, + * disallow the drop if the DROP is RESTRICT. We don't do anything if the + * DROP is CASCADE, which means that the dependency mechanism will remove + * the relation from the publication. 
+ */ + if (behavior == DROP_RESTRICT) + { + List *pubs; + ListCell *lc; + + pubs = GetRelationColumnPartialPublications(relid); + foreach(lc, pubs) + { + Oid pubid = lfirst_oid(lc); + List *published_cols; + + published_cols = + GetRelationColumnListInPublication(relid, pubid); + + if (list_member_oid(published_cols, attnum)) + ereport(ERROR, + errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot drop column \"%s\" because it is part of publication \"%s\"", + colName, get_publication_name(pubid, false)), + errhint("Specify CASCADE or use ALTER PUBLICATION to remove the column from the publication.")); + } + } + /* * Propagate to children as appropriate. Unlike most other ALTER * routines, we have to do this one level of recursion at a time; we can't * use find_all_inheritors to do it in one pass. */ children = - find_inheritance_children(RelationGetRelid(rel), lockmode); + find_inheritance_children(relid, lockmode); if (children) { @@ -8531,7 +8561,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName, /* Add object to delete */ object.classId = RelationRelationId; - object.objectId = RelationGetRelid(rel); + object.objectId = relid; object.objectSubId = attnum; add_exact_object_address(&object, addrs); @@ -15841,6 +15871,7 @@ relation_mark_replica_identity(Relation rel, char ri_type, Oid indexOid, CatalogTupleUpdate(pg_index, &pg_index_tuple->t_self, pg_index_tuple); InvokeObjectPostAlterHookArg(IndexRelationId, thisIndexOid, 0, InvalidOid, is_internal); + /* * Invalidate the relcache for the table, so that after we commit * all sessions will refresh the table's replica identity index @@ -15863,6 +15894,11 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode Oid indexOid; Relation indexRel; int key; + List *pubs; + Bitmapset *indexed_cols = NULL; + ListCell *lc; + + pubs = GetRelationColumnPartialPublications(RelationGetRelid(rel)); if (stmt->identity_type == REPLICA_IDENTITY_DEFAULT) { @@ -15871,11 +15907,16 
@@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode } else if (stmt->identity_type == REPLICA_IDENTITY_FULL) { + if (pubs != NIL) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set REPLICA IDENTITY FULL when publications contain relations that specify column lists")); relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true); return; } else if (stmt->identity_type == REPLICA_IDENTITY_NOTHING) { + /* XXX not sure what's the right check for publications here */ relation_mark_replica_identity(rel, stmt->identity_type, InvalidOid, true); return; } @@ -15960,6 +16001,46 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode errmsg("index \"%s\" cannot be used as replica identity because column \"%s\" is nullable", RelationGetRelationName(indexRel), NameStr(attr->attname)))); + + /* + * Collect columns used, in case we have any publications that we need + * to vet. System attributes are disallowed so no need to subtract + * FirstLowInvalidHeapAttributeNumber. + */ + indexed_cols = bms_add_member(indexed_cols, attno); + } + + /* + * Check partial-column publications. For those that include UPDATE and + * DELETE, we must enforce that the columns in the replica identity are + * included in the column list. For publications that only include INSERT + * and TRUNCATE, we don't need to restrict the replica identity. 
+ */ + foreach(lc, pubs) + { + Oid pubid = lfirst_oid(lc); + List *published_cols; + PublicationActions actions; + + GetActionsInPublication(pubid, &actions); + /* No need to worry about this one */ + if (!actions.pubupdate && !actions.pubdelete) + continue; + + published_cols = + GetRelationColumnListInPublication(RelationGetRelid(rel), pubid); + + for (key = 0; key < IndexRelationGetNumberOfKeyAttributes(indexRel); key++) + { + int16 attno = indexRel->rd_index->indkey.values[key]; + + if (!list_member_oid(published_cols, attno)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("index \"%s\" cannot be used because publication \"%s\" does not include all indexed columns", + RelationGetRelationName(indexRel), + get_publication_name(pubid, false))); + } } /* This index is suitable for use as a replica identity. Mark it. */ diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 456d563f34..aa333fcdf5 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4834,6 +4834,7 @@ _copyPublicationTable(const PublicationTable *from) PublicationTable *newnode = makeNode(PublicationTable); COPY_NODE_FIELD(relation); + COPY_NODE_FIELD(columns); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 53beef1488..3119f7836c 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2312,6 +2312,7 @@ static bool _equalPublicationTable(const PublicationTable *a, const PublicationTable *b) { COMPARE_NODE_FIELD(relation); + COMPARE_NODE_FIELD(columns); return true; } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 879018377b..c1f3d6a8c8 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9740,12 +9740,13 @@ CreatePublicationStmt: * relation_expr here. 
*/ PublicationObjSpec: - TABLE relation_expr + TABLE relation_expr opt_column_list { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_TABLE; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = $2; + $$->pubtable->columns = $3; } | ALL TABLES IN_P SCHEMA ColId { @@ -9760,28 +9761,38 @@ PublicationObjSpec: $$->pubobjtype = PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA; $$->location = @5; } - | ColId + | ColId opt_column_list { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; - $$->name = $1; + if ($2 != NULL) + { + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = makeRangeVar(NULL, $1, @1); + $$->pubtable->columns = $2; + $$->name = NULL; + } + else + $$->name = $1; $$->location = @1; } - | ColId indirection + | ColId indirection opt_column_list { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner); + $$->pubtable->columns = $3; $$->location = @1; } /* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */ - | extended_relation_expr + | extended_relation_expr opt_column_list { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = $1; + $$->pubtable->columns = $2; } | CURRENT_SCHEMA { @@ -9807,6 +9818,9 @@ pub_obj_list: PublicationObjSpec * * ALTER PUBLICATION name SET pub_obj [, ...] * + * ALTER PUBLICATION name SET COLUMNS table_name (column[, ...]) + * ALTER PUBLICATION name SET COLUMNS table_name ALL + * * pub_obj is one of: * * TABLE table_name [, ...] 
@@ -9840,6 +9854,32 @@ AlterPublicationStmt: n->action = AP_SetObjects; $$ = (Node *)n; } + | ALTER PUBLICATION name ALTER TABLE relation_expr SET COLUMNS '(' columnList ')' + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + PublicationObjSpec *obj = makeNode(PublicationObjSpec); + obj->pubobjtype = PUBLICATIONOBJ_TABLE; + obj->pubtable = makeNode(PublicationTable); + obj->pubtable->relation = $6; + obj->pubtable->columns = $10; + n->pubname = $3; + n->pubobjects = list_make1(obj); + n->action = AP_SetColumns; + $$ = (Node *) n; + } + | ALTER PUBLICATION name ALTER TABLE relation_expr SET COLUMNS ALL + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + PublicationObjSpec *obj = makeNode(PublicationObjSpec); + obj->pubobjtype = PUBLICATIONOBJ_TABLE; + obj->pubtable = makeNode(PublicationTable); + obj->pubtable->relation = $6; + obj->pubtable->columns = NIL; + n->pubname = $3; + n->pubobjects = list_make1(obj); + n->action = AP_SetColumns; + $$ = (Node *) n; + } | ALTER PUBLICATION name DROP pub_obj_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); @@ -17444,6 +17484,16 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner) else if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLES_IN_SCHEMA || pubobj->pubobjtype == PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA) { + /* + * This can happen if a column list is specified in a continuation + * for a schema entry; reject it. + */ + if (pubobj->pubtable) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("column specification not allowed for schemas"), + parser_errposition(pubobj->location)); + /* * We can distinguish between the different type of schema * objects based on whether name and pubtable is set. 
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 953942692c..e6da46d83e 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -29,9 +29,11 @@ #define TRUNCATE_CASCADE (1<<0) #define TRUNCATE_RESTART_SEQS (1<<1) -static void logicalrep_write_attrs(StringInfo out, Relation rel); +static void logicalrep_write_attrs(StringInfo out, Relation rel, + Bitmapset *columns); static void logicalrep_write_tuple(StringInfo out, Relation rel, - HeapTuple tuple, bool binary); + HeapTuple tuple, bool binary, + Bitmapset *columns); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -398,7 +400,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) */ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, - HeapTuple newtuple, bool binary) + HeapTuple newtuple, bool binary, Bitmapset *columns) { pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); @@ -410,7 +412,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newtuple, binary, columns); } /* @@ -442,7 +444,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) */ void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, - HeapTuple oldtuple, HeapTuple newtuple, bool binary) + HeapTuple oldtuple, HeapTuple newtuple, bool binary, + Bitmapset *columns) { pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE); @@ -463,11 +466,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, 
oldtuple, binary, columns); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newtuple, binary, columns); } /* @@ -536,7 +539,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldtuple, binary, NULL); } /* @@ -651,7 +654,8 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, * Write relation description to the output stream. */ void -logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) +logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, + Bitmapset *columns) { char *relname; @@ -673,7 +677,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) pq_sendbyte(out, rel->rd_rel->relreplident); /* send the attribute info */ - logicalrep_write_attrs(out, rel); + logicalrep_write_attrs(out, rel, columns); } /* @@ -749,7 +753,8 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) * Write a tuple to the outputstream, in the most efficient format possible. */ static void -logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary) +logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, + bool binary, Bitmapset *columns) { TupleDesc desc; Datum values[MaxTupleAttributeNumber]; @@ -761,7 +766,13 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar for (i = 0; i < desc->natts; i++) { - if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped || att->attgenerated) + continue; + + /* Don't count attributes that are not to be sent. 
*/ + if (columns != NULL && !bms_is_member(att->attnum, columns)) continue; nliveatts++; } @@ -783,6 +794,10 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar if (att->attisdropped || att->attgenerated) continue; + /* Ignore attributes that are not to be sent. */ + if (columns != NULL && !bms_is_member(att->attnum, columns)) + continue; + if (isnull[i]) { pq_sendbyte(out, LOGICALREP_COLUMN_NULL); @@ -904,7 +919,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) * Write relation attribute metadata to the stream. */ static void -logicalrep_write_attrs(StringInfo out, Relation rel) +logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) { TupleDesc desc; int i; @@ -914,20 +929,24 @@ logicalrep_write_attrs(StringInfo out, Relation rel) desc = RelationGetDescr(rel); - /* send number of live attributes */ - for (i = 0; i < desc->natts; i++) - { - if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) - continue; - nliveatts++; - } - pq_sendint16(out, nliveatts); - /* fetch bitmap of REPLICATION IDENTITY attributes */ replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); if (!replidentfull) idattrs = RelationGetIdentityKeyBitmap(rel); + /* send number of live attributes */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped || att->attgenerated) + continue; + if (columns != NULL && !bms_is_member(att->attnum, columns)) + continue; + nliveatts++; + } + pq_sendint16(out, nliveatts); + /* send the attributes */ for (i = 0; i < desc->natts; i++) { @@ -936,7 +955,8 @@ logicalrep_write_attrs(StringInfo out, Relation rel) if (att->attisdropped || att->attgenerated) continue; - + if (columns != NULL && !bms_is_member(att->attnum, columns)) + continue; /* REPLICA IDENTITY FULL means all columns are sent as part of key. 
*/ if (replidentfull || bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e596b69d46..a7befd712a 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -112,6 +112,7 @@ #include "storage/ipc.h" #include "storage/lmgr.h" #include "utils/acl.h" +#include "utils/array.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -699,17 +700,20 @@ fetch_remote_table_info(char *nspname, char *relname, WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; - Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID}; + Oid tableRow[] = {OIDOID, CHAROID, CHAROID, BOOLOID}; + Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID}; bool isnull; int natt; + ListCell *lc; + bool am_partition; + Bitmapset *included_cols = NULL; lrel->nspname = nspname; lrel->relname = relname; /* First fetch Oid and replica identity. */ initStringInfo(&cmd); - appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind" + appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind, c.relispartition" " FROM pg_catalog.pg_class c" " INNER JOIN pg_catalog.pg_namespace n" " ON (c.relnamespace = n.oid)" @@ -739,14 +743,19 @@ fetch_remote_table_info(char *nspname, char *relname, Assert(!isnull); lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); Assert(!isnull); + am_partition = DatumGetBool(slot_getattr(slot, 4, &isnull)); + Assert(!isnull); ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); - /* Now fetch columns. */ + /* + * Now fetch column names and types. 
+ */ resetStringInfo(&cmd); appendStringInfo(&cmd, - "SELECT a.attname," + "SELECT a.attnum," + " a.attname," " a.atttypid," " a.attnum = ANY(i.indkey)" " FROM pg_catalog.pg_attribute a" @@ -774,16 +783,91 @@ fetch_remote_table_info(char *nspname, char *relname, lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid)); lrel->attkeys = NULL; + /* + * In server versions 15 and higher, obtain the publication column list, + * if any. + */ + if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000) + { + WalRcvExecResult *pubres; + TupleTableSlot *slot; + Oid attrsRow[] = {INT2OID}; + StringInfoData publications; + bool first = true; + + initStringInfo(&publications); + foreach(lc, MySubscription->publications) + { + if (!first) + appendStringInfo(&publications, ", "); + appendStringInfoString(&publications, quote_literal_cstr(strVal(lfirst(lc)))); + first = false; + } + + resetStringInfo(&cmd); + appendStringInfo(&cmd, + " SELECT pg_catalog.unnest(prattrs)\n" + " FROM pg_catalog.pg_publication p JOIN\n" + " pg_catalog.pg_publication_rel pr ON (p.oid = pr.prpubid)\n" + " WHERE p.pubname IN (%s) AND\n", + publications.data); + if (!am_partition) + appendStringInfo(&cmd, "prrelid = %u", lrel->remoteid); + else + appendStringInfo(&cmd, + "prrelid IN (SELECT relid\n" + " FROM pg_catalog.pg_partition_tree(pg_catalog.pg_partition_root(%u)))", + lrel->remoteid); + + pubres = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, + lengthof(attrsRow), attrsRow); + + if (pubres->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch attribute info for table \"%s.%s\" from publisher: %s", + nspname, relname, pubres->err))); + + slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot)) + { + AttrNumber attnum; + + attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull)); + if (isnull) + continue; + included_cols = 
bms_add_member(included_cols, attnum); + } + ExecDropSingleTupleTableSlot(slot); + pfree(publications.data); + walrcv_clear_result(pubres); + } + + /* + * Store the columns as a list of names. Ignore those that are not + * present in the column list, if there is one. + */ natt = 0; slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { - lrel->attnames[natt] = - TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + char *rel_colname; + AttrNumber attnum; + + attnum = DatumGetInt16(slot_getattr(slot, 1, &isnull)); Assert(!isnull); - lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull)); + + if (included_cols != NULL && !bms_is_member(attnum, included_cols)) + continue; + + rel_colname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); Assert(!isnull); - if (DatumGetBool(slot_getattr(slot, 3, &isnull))) + + lrel->attnames[natt] = rel_colname; + lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + + if (DatumGetBool(slot_getattr(slot, 4, &isnull))) lrel->attkeys = bms_add_member(lrel->attkeys, natt); /* Should never happen. */ @@ -793,12 +877,12 @@ fetch_remote_table_info(char *nspname, char *relname, ExecClearTuple(slot); } + ExecDropSingleTupleTableSlot(slot); - - lrel->natts = natt; - walrcv_clear_result(res); pfree(cmd.data); + + lrel->natts = natt; } /* @@ -831,8 +915,17 @@ copy_table(Relation rel) /* Start copy on the publisher. 
*/ initStringInfo(&cmd); if (lrel.relkind == RELKIND_RELATION) - appendStringInfo(&cmd, "COPY %s TO STDOUT", + { + appendStringInfo(&cmd, "COPY %s (", quote_qualified_identifier(lrel.nspname, lrel.relname)); + for (int i = 0; i < lrel.natts; i++) + { + appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); + if (i < lrel.natts - 1) + appendStringInfoString(&cmd, ", "); + } + appendStringInfo(&cmd, ") TO STDOUT"); + } else { /* diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index af8d51aee9..bdab0b1c8d 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -15,16 +15,19 @@ #include "access/tupconvert.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_rel_d.h" #include "commands/defrem.h" #include "fmgr.h" #include "replication/logical.h" #include "replication/logicalproto.h" #include "replication/origin.h" #include "replication/pgoutput.h" +#include "utils/builtins.h" #include "utils/int8.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/rel.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -81,7 +84,8 @@ static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, - LogicalDecodingContext *ctx); + LogicalDecodingContext *ctx, + Bitmapset *columns); static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); @@ -130,6 +134,13 @@ typedef struct RelationSyncEntry * having identical TupleDesc. */ TupleConversionMap *map; + + /* + * Set of columns included in the publication, or NULL if all columns are + * included implicitly. Note that the attnums in this list are not + * shifted by FirstLowInvalidHeapAttributeNumber. 
+ */ + Bitmapset *columns; } RelationSyncEntry; /* Map used to remember which relation schemas we sent. */ @@ -570,11 +581,11 @@ maybe_send_schema(LogicalDecodingContext *ctx, } MemoryContextSwitchTo(oldctx); - send_relation_and_attrs(ancestor, xid, ctx); + send_relation_and_attrs(ancestor, xid, ctx, relentry->columns); RelationClose(ancestor); } - send_relation_and_attrs(relation, xid, ctx); + send_relation_and_attrs(relation, xid, ctx, relentry->columns); if (in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid); @@ -587,7 +598,8 @@ maybe_send_schema(LogicalDecodingContext *ctx, */ static void send_relation_and_attrs(Relation relation, TransactionId xid, - LogicalDecodingContext *ctx) + LogicalDecodingContext *ctx, + Bitmapset *columns) { TupleDesc desc = RelationGetDescr(relation); int i; @@ -610,13 +622,17 @@ send_relation_and_attrs(Relation relation, TransactionId xid, if (att->atttypid < FirstGenbkiObjectId) continue; + /* Skip this attribute if it's not present in the column list */ + if (columns != NULL && !bms_is_member(att->attnum, columns)) + continue; + OutputPluginPrepareWrite(ctx, false); logicalrep_write_typ(ctx->out, xid, att->atttypid); OutputPluginWrite(ctx, false); } OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, xid, relation); + logicalrep_write_rel(ctx->out, xid, relation, columns); OutputPluginWrite(ctx, false); } @@ -693,7 +709,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginPrepareWrite(ctx, true); logicalrep_write_insert(ctx->out, xid, relation, tuple, - data->binary); + data->binary, relentry->columns); OutputPluginWrite(ctx, true); break; } @@ -722,7 +738,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginPrepareWrite(ctx, true); logicalrep_write_update(ctx->out, xid, relation, oldtuple, - newtuple, data->binary); + newtuple, data->binary, relentry->columns); OutputPluginWrite(ctx, true); break; } @@ -1120,6 +1136,7 @@ 
get_rel_sync_entry(PGOutputData *data, Oid relid) { RelationSyncEntry *entry; bool found; + Oid ancestor_id; MemoryContext oldctx; Assert(RelationSyncCache != NULL); @@ -1140,6 +1157,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; entry->publish_as_relid = InvalidOid; + entry->columns = NULL; entry->map = NULL; /* will be set by maybe_send_schema() if * needed */ } @@ -1175,13 +1193,16 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) /* * Build publication cache. We can't use one provided by relcache as - * relcache considers all publications given relation is in, but here - * we only need to consider ones that the subscriber requested. + * relcache considers all publications that the given relation is in, + * but here we only need to consider ones that the subscriber + * requested. */ foreach(lc, data->publications) { Publication *pub = lfirst(lc); bool publish = false; + bool ancestor_published = false; + bool all_columns = false; if (pub->alltables) { @@ -1192,8 +1213,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) if (!publish) { - bool ancestor_published = false; - /* * For a partition, check if any of the ancestors are * published. If so, note down the topmost ancestor that is @@ -1219,6 +1238,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) pub->oid)) { ancestor_published = true; + ancestor_id = ancestor; if (pub->pubviaroot) publish_as_relid = ancestor; } @@ -1232,9 +1252,13 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) } /* + * If the relation is to be published, determine actions to + * publish, and list of columns, if appropriate. + * * Don't publish changes for partitioned tables, because - * publishing those of its partitions suffices, unless partition - * changes won't be published due to pubviaroot being set. + * publishing those of its partitions suffices. 
(However, ignore + * this if partition changes are not to be published due to + * pubviaroot being set.) */ if (publish && (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot)) @@ -1243,10 +1267,74 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; + + /* + * Obtain columns published by this publication, and add them + * to the list for this rel. Note that if at least one + * publication has an empty column list, that means to publish + * everything; so if we saw a publication that includes all + * columns, skip this. + */ + if (!all_columns) + { + HeapTuple pub_rel_tuple; + Oid relid; + + relid = ancestor_published ? ancestor_id : publish_as_relid; + + pub_rel_tuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(pub->oid)); + + if (HeapTupleIsValid(pub_rel_tuple)) + { + Datum pub_rel_cols; + bool isnull; + + pub_rel_cols = SysCacheGetAttr(PUBLICATIONRELMAP, + pub_rel_tuple, + Anum_pg_publication_rel_prattrs, + &isnull); + if (isnull) + { + /* + * If we see a publication with no columns, reset the + * list and ignore further ones. + */ + all_columns = true; + bms_free(entry->columns); + entry->columns = NULL; + } + else if (!isnull) + { + ArrayType *arr; + int nelems; + int16 *elems; + + arr = DatumGetArrayTypeP(pub_rel_cols); + nelems = ARR_DIMS(arr)[0]; + elems = (int16 *) ARR_DATA_PTR(arr); + + /* XXX is there a danger of memory leak here? beware */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + for (int i = 0; i < nelems; i++) + entry->columns = bms_add_member(entry->columns, + elems[i]); + MemoryContextSwitchTo(oldctx); + } + + ReleaseSysCache(pub_rel_tuple); + } + } } + /* + * If we've seen all action bits, and we know that all columns are + * published, there's no reason to look at further publications. 
+ */ if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && - entry->pubactions.pubdelete && entry->pubactions.pubtruncate) + entry->pubactions.pubdelete && entry->pubactions.pubtruncate && + all_columns) break; } @@ -1343,6 +1431,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) entry->schema_sent = false; list_free(entry->streamed_txns); entry->streamed_txns = NIL; + bms_free(entry->columns); + entry->columns = NULL; if (entry->map) { /* diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 92ab95724d..d13570f5aa 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4042,6 +4042,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) int i_oid; int i_prpubid; int i_prrelid; + int i_prattrs; int i, j, ntups; @@ -4053,8 +4054,13 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) /* Collect all publication membership info. */ appendPQExpBufferStr(query, - "SELECT tableoid, oid, prpubid, prrelid " - "FROM pg_catalog.pg_publication_rel"); + "SELECT tableoid, oid, prpubid, prrelid"); + if (fout->remoteVersion >= 150000) + appendPQExpBufferStr(query, ", prattrs"); + else + appendPQExpBufferStr(query, ", NULL as prattrs"); + appendPQExpBufferStr(query, + " FROM pg_catalog.pg_publication_rel"); res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); ntups = PQntuples(res); @@ -4063,6 +4069,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) i_oid = PQfnumber(res, "oid"); i_prpubid = PQfnumber(res, "prpubid"); i_prrelid = PQfnumber(res, "prrelid"); + i_prattrs = PQfnumber(res, "prattrs"); /* this allocation may be more than we need */ pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo)); @@ -4104,6 +4111,28 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) pubrinfo[j].publication = pubinfo; pubrinfo[j].pubtable = tbinfo; + if (!PQgetisnull(res, i, i_prattrs)) + { + char **attnames; + int nattnames; + PQExpBuffer 
attribs; + + if (!parsePGArray(PQgetvalue(res, i, i_prattrs), + &attnames, &nattnames)) + fatal("could not parse %s array", "prattrs"); + attribs = createPQExpBuffer(); + for (int k = 0; k < nattnames; k++) + { + if (k > 0) + appendPQExpBufferStr(attribs, ", "); + + appendPQExpBufferStr(attribs, fmtId(attnames[k])); + } + pubrinfo[i].pubrattrs = attribs->data; + } + else + pubrinfo[j].pubrattrs = NULL; + /* Decide whether we want to dump it */ selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout); @@ -4178,10 +4207,12 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo) query = createPQExpBuffer(); - appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY", + appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY ", fmtId(pubinfo->dobj.name)); - appendPQExpBuffer(query, " %s;\n", - fmtQualifiedDumpable(tbinfo)); + appendPQExpBufferStr(query, fmtQualifiedDumpable(tbinfo)); + if (pubrinfo->pubrattrs) + appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs); + appendPQExpBufferStr(query, ";\n"); /* * There is no point in creating a drop query as the drop is done by table diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 066a129ee5..857f2891fc 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -630,6 +630,7 @@ typedef struct _PublicationRelInfo DumpableObject dobj; PublicationInfo *publication; TableInfo *pubtable; + char *pubrattrs; } PublicationRelInfo; /* diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 8587b19160..37faf4bef4 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5823,7 +5823,7 @@ listPublications(const char *pattern) */ static bool addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg, - bool singlecol, printTableContent *cont) + bool as_schema, printTableContent *cont) { PGresult *res; int count = 0; @@ -5840,10 +5840,14 @@ addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg, for (i = 0; i < count; i++) { - if 
(!singlecol) + if (!as_schema) /* as table */ + { printfPQExpBuffer(buf, " \"%s.%s\"", PQgetvalue(res, i, 0), PQgetvalue(res, i, 1)); - else + if (!PQgetisnull(res, i, 2)) + appendPQExpBuffer(buf, " (%s)", PQgetvalue(res, i, 2)); + } + else /* as schema */ printfPQExpBuffer(buf, " \"%s\"", PQgetvalue(res, i, 0)); printTableAddFooter(cont, buf->data); @@ -5971,8 +5975,20 @@ describePublications(const char *pattern) { /* Get the tables for the specified publication */ printfPQExpBuffer(&buf, - "SELECT n.nspname, c.relname\n" - "FROM pg_catalog.pg_class c,\n" + "SELECT n.nspname, c.relname, \n"); + if (pset.sversion >= 150000) + appendPQExpBufferStr(&buf, + " CASE WHEN pr.prattrs IS NOT NULL THEN\n" + " pg_catalog.array_to_string" + "(ARRAY(SELECT attname\n" + " FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n" + " pg_catalog.pg_attribute\n" + " WHERE attrelid = c.oid AND attnum = prattrs[s]), ', ')\n" + " ELSE NULL END AS columns"); + else + appendPQExpBufferStr(&buf, "NULL as columns"); + appendPQExpBuffer(&buf, + "\nFROM pg_catalog.pg_class c,\n" " pg_catalog.pg_namespace n,\n" " pg_catalog.pg_publication_rel pr\n" "WHERE c.relnamespace = n.oid\n" diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 39be6f556a..20c852cdf9 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1657,6 +1657,8 @@ psql_completion(const char *text, int start, int end) /* ALTER PUBLICATION <name> ADD */ else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD")) COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE"); + else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD", "TABLE")) + COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL); /* ALTER PUBLICATION <name> DROP */ else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP")) COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE"); diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 
841b9b6c25..70f73d01dc 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -86,6 +86,7 @@ typedef struct Publication typedef struct PublicationRelInfo { Relation relation; + List *columns; } PublicationRelInfo; extern Publication *GetPublication(Oid pubid); @@ -109,8 +110,11 @@ typedef enum PublicationPartOpt } PublicationPartOpt; extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); +extern List *GetRelationColumnPartialPublications(Oid relid); +extern List *GetRelationColumnListInPublication(Oid relid, Oid pubid); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(bool pubviaroot); +extern void GetActionsInPublication(Oid pubid, PublicationActions *actions); extern List *GetPublicationSchemas(Oid pubid); extern List *GetSchemaPublications(Oid schemaid); extern List *GetSchemaPublicationRelations(Oid schemaid, @@ -127,6 +131,8 @@ extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *tar bool if_not_exists); extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists); +extern void publication_set_table_columns(Relation pubrel, HeapTuple pubreltup, + Relation targetrel, List *columns); extern Oid get_publication_oid(const char *pubname, bool missing_ok); extern char *get_publication_name(Oid pubid, bool missing_ok); diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h index 0ff3716225..151644b870 100644 --- a/src/include/catalog/pg_publication_rel.h +++ b/src/include/catalog/pg_publication_rel.h @@ -31,6 +31,9 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) Oid oid; /* oid */ Oid prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ Oid prrelid BKI_LOOKUP(pg_class); /* Oid of the relation */ +#ifdef CATALOG_VARLEN /* variable-length fields start here */ + int2vector prattrs; +#endif } FormData_pg_publication_rel; /* ---------------- diff 
--git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 413e7c85a1..096a3c1fe0 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3642,6 +3642,7 @@ typedef struct PublicationTable { NodeTag type; RangeVar *relation; /* relation to be published */ + List *columns; /* List of columns in a publication table */ } PublicationTable; /* @@ -3678,7 +3679,8 @@ typedef enum AlterPublicationAction { AP_AddObjects, /* add objects to publication */ AP_DropObjects, /* remove objects from publication */ - AP_SetObjects /* set list of objects */ + AP_SetObjects, /* set list of objects */ + AP_SetColumns /* change list of columns for a table */ } AlterPublicationAction; typedef struct AlterPublicationStmt diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 22fffaca62..fcbed4ed2d 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -207,11 +207,11 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin, extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); extern void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, - bool binary); + bool binary, Bitmapset *columns); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple, bool binary); + HeapTuple newtuple, bool binary, Bitmapset *columns); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); @@ -228,7 +228,7 @@ extern List *logicalrep_read_truncate(StringInfo in, extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); extern void 
logicalrep_write_rel(StringInfo out, TransactionId xid, - Relation rel); + Relation rel, Bitmapset *columns); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); extern void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid); diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 12c5f67080..af98090f8f 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -165,8 +165,54 @@ Publications: regress_publication_user | t | t | t | f | f | f (1 row) -DROP TABLE testpub_tbl2; -DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema; +CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text, + d int generated always as (a + length(b)) stored); +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x); -- error +ERROR: column "x" of relation "testpub_tbl5" does not exist +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c); -- error +ERROR: invalid column list for publishing relation "testpub_tbl5" +DETAIL: All columns in REPLICA IDENTITY must be present in the column list. +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d); -- error +ERROR: cannot reference generated column "d" in publication column list +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); -- ok +ALTER TABLE testpub_tbl5 DROP COLUMN c; -- no dice +ERROR: cannot drop column "c" because it is part of publication "testpub_fortable" +HINT: Specify CASCADE or use ALTER PUBLICATION to remove the column from the publication. 
+/* not all replica identities are good enough */ +CREATE UNIQUE INDEX testpub_tbl5_b_key ON testpub_tbl5 (b, c); +ALTER TABLE testpub_tbl5 ALTER b SET NOT NULL, ALTER c SET NOT NULL; +ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key; -- nope +ERROR: index "testpub_tbl5_b_key" cannot be used because publication "testpub_fortable" does not include all indexed columns +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; +ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key; -- ok, but ... +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); -- no dice +ERROR: invalid column list for publishing relation "testpub_tbl5" +DETAIL: All columns in REPLICA IDENTITY must be present in the column list. +/* But if upd/del are not published, it works OK */ +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_table_ins WITH (publish = 'insert, truncate'); +RESET client_min_messages; +ALTER PUBLICATION testpub_table_ins ADD TABLE testpub_tbl5 (a); -- ok +\dRp+ testpub_table_ins + Publication testpub_table_ins + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root +--------------------------+------------+---------+---------+---------+-----------+---------- + regress_publication_user | f | t | f | f | t | f +Tables: + "public.testpub_tbl5" (a) + +CREATE TABLE testpub_tbl6 (a int, b text, c text); +ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL; +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c); -- error +ERROR: invalid column list for publishing relation "testpub_tbl6" +DETAIL: Cannot specify a column list on relations with REPLICA IDENTITY FULL. +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6; -- ok +ALTER PUBLICATION testpub_fortable + ALTER TABLE testpub_tbl6 SET COLUMNS (a, b, c); -- error +ERROR: cannot change column set for relation "testpub_tbl6" +DETAIL: Cannot specify column list on relations with REPLICA IDENTITY FULL. 
+DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6; +DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema, testpub_table_ins; CREATE TABLE testpub_tbl3 (a int); CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3); SET client_min_messages = 'ERROR'; @@ -670,6 +716,15 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_tes Tables from schemas: "pub_test1" +-- Verify that it fails to add a schema with a column specification +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b); +ERROR: syntax error at or near "(" +LINE 1: ...TION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b); + ^ +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b); +ERROR: column specification not allowed for schemas +LINE 1: ... testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b)... + ^ -- cleanup pub_test1 schema for invalidation tests ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1; DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 56dd358554..e0d50e3f69 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -89,8 +89,39 @@ SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_forall \d+ testpub_tbl2 \dRp+ testpub_foralltables -DROP TABLE testpub_tbl2; -DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema; +CREATE TABLE testpub_tbl5 (a int PRIMARY KEY, b text, c text, + d int generated always as (a + length(b)) stored); +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, x); -- error +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c); -- error +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d); -- error +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); -- 
ok +ALTER TABLE testpub_tbl5 DROP COLUMN c; -- no dice + +/* not all replica identities are good enough */ +CREATE UNIQUE INDEX testpub_tbl5_b_key ON testpub_tbl5 (b, c); +ALTER TABLE testpub_tbl5 ALTER b SET NOT NULL, ALTER c SET NOT NULL; +ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key; -- nope +ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5; + +ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key; -- ok, but ... +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c); -- no dice + +/* But if upd/del are not published, it works OK */ +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_table_ins WITH (publish = 'insert, truncate'); +RESET client_min_messages; +ALTER PUBLICATION testpub_table_ins ADD TABLE testpub_tbl5 (a); -- ok +\dRp+ testpub_table_ins + +CREATE TABLE testpub_tbl6 (a int, b text, c text); +ALTER TABLE testpub_tbl6 REPLICA IDENTITY FULL; +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6 (a, b, c); -- error +ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl6; -- ok +ALTER PUBLICATION testpub_fortable + ALTER TABLE testpub_tbl6 SET COLUMNS (a, b, c); -- error + +DROP TABLE testpub_tbl2, testpub_tbl5, testpub_tbl6; +DROP PUBLICATION testpub_foralltables, testpub_fortable, testpub_forschema, testpub_table_ins; CREATE TABLE testpub_tbl3 (a int); CREATE TABLE testpub_tbl3a (b text) INHERITS (testpub_tbl3); @@ -362,6 +393,10 @@ ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA non_existent_schem ALTER PUBLICATION testpub1_forschema SET ALL TABLES IN SCHEMA pub_test1, pub_test1; \dRp+ testpub1_forschema +-- Verify that it fails to add a schema with a column specification +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo (a, b); +ALTER PUBLICATION testpub1_forschema ADD ALL TABLES IN SCHEMA foo, bar (a, b); + -- cleanup pub_test1 schema for invalidation tests ALTER PUBLICATION testpub2_forschema DROP ALL TABLES IN SCHEMA pub_test1; 
DROP PUBLICATION testpub3_forschema, testpub4_forschema, testpub5_forschema, testpub6_forschema, testpub_fortable;
diff --git a/src/test/subscription/t/028_column_list.pl b/src/test/subscription/t/028_column_list.pl
new file mode 100644
index 0000000000..5a4f022f26
--- /dev/null
+++ b/src/test/subscription/t/028_column_list.pl
@@ -0,0 +1,177 @@
+# Copyright (c) 2022, PostgreSQL Global Development Group
+
+# Test partial-column publication of tables
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 9;
+
+# setup
+
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_logical_replication_workers = 6));
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab1 (a int PRIMARY KEY, \"B\" int, c int)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b varchar, c int);
+	INSERT INTO tab2 VALUES (2, 'foo', 2);");
+# Test with weird column names
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, B varchar, \"c'\" int)");
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part (a int PRIMARY KEY, b text, c timestamptz) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)");
+# Test replication with multi-level partition
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part (a int PRIMARY KEY, b text) PARTITION BY LIST (a)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_1_1 PARTITION OF test_part FOR VALUES IN (1,2,3)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab3 (\"a'\" int PRIMARY KEY, \"c'\" int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab2 (a int PRIMARY KEY, b varchar)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_2_1 PARTITION OF test_part FOR VALUES IN (4,5,6) PARTITION BY LIST (a)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_part_2_2 PARTITION OF test_part_2_1 FOR VALUES IN (4,5)");
+
+# Test create publication with a column list
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub1 FOR TABLE tab1(a, \"B\"), tab3(\"a'\",\"c'\"), test_part(a,b)");
+
+my $result = $node_publisher->safe_psql('postgres',
+	"SELECT relname, prattrs FROM pg_publication_rel pb INNER JOIN pg_class pc ON pb.prrelid = pc.oid ORDER BY relname;");
+is($result, qq(tab1|1 2
+tab3|1 3
+test_part|1 2), 'publication relation updated');
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"
+);
+# Initial sync.  wait_for_catchup() alone does not guarantee that the initial
+# table copy has finished, so also poll until every table is synced.
+$node_publisher->wait_for_catchup('sub1');
+my $synced_query =
+	"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab1 VALUES (1,2,3)");
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab3 VALUES (1,2,3)");
+# Test for replication of partition data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (1,'abc', '2021-07-04 12:00:00')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (2,'bcd', '2021-07-03 11:12:13')");
+# Test for replication of multi-level partition data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (4,'abc', '2021-07-04 12:00:00')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_part VALUES (5,'bcd', '2021-07-03 11:12:13')");
+
+# The inserts above must be applied before the subscriber is inspected
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab1");
+is($result, qq(1|2|), 'insert on column tab1.c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab3");
+is($result, qq(1|3), 'insert on column tab3.b is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM test_part ORDER BY a");
+is($result, qq(1|abc\n2|bcd\n4|abc\n5|bcd), 'insert on all columns is replicated');
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab1 SET c = 5 where a = 1");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab1");
+is($result, qq(1|2|), 'update on column tab1.c is not replicated');
+
+# Verify user-defined types
+$node_publisher->safe_psql('postgres',
+	qq{CREATE TYPE test_typ AS ENUM ('blue', 'red');
+	CREATE TABLE test_tab4 (a INT PRIMARY KEY, b test_typ, c int, d text);
+	ALTER PUBLICATION pub1 ADD TABLE test_tab4 (a, b, d);
+	});
+$node_subscriber->safe_psql('postgres',
+	qq{CREATE TYPE test_typ AS ENUM ('blue', 'red');
+	CREATE TABLE test_tab4 (a INT PRIMARY KEY, b test_typ, d text);
+	});
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab4 VALUES (1, 'red', 3, 'oh my');");
+
+# Test alter publication with a column list
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION pub1 ADD TABLE tab2(a, b)");
+
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION"
+);
+# Wait for the newly added tables (tab2, test_tab4) to finish their initial copy
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab2 VALUES (1,'abc',3)");
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab2 SET c = 5 where a = 2");
+
+$node_publisher->wait_for_catchup('sub1');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab2 WHERE a = 1");
+is($result, qq(1|abc), 'insert on column tab2.c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM tab2 WHERE a = 2");
+is($result, qq(2|foo), 'update on column tab2.c is not replicated');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT * FROM test_tab4");
+is($result, qq(1|red|oh my), 'insert on table with user-defined type');
+
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab5 (a int PRIMARY KEY, b int, c int, d int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab5 (a int PRIMARY KEY, b int, d int)");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub2 FOR TABLE tab5 (a, b)");
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub3 FOR TABLE tab5 (a, d)");
+$node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2, pub3");
+$node_publisher->wait_for_catchup('sub2');
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (1, 11, 111, 1111)");
+$node_publisher->safe_psql('postgres', "INSERT INTO tab5 VALUES (2, 22, 222, 2222)");
+$node_publisher->wait_for_catchup('sub2');
+is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab5 ORDER BY a;"),
+	qq(1|11|1111
+2|22|2222),
+	'overlapping publications with overlapping column lists');
-- 
2.30.2