On Fri, Jan 22, 2021 at 10:01 AM vignesh C <vignes...@gmail.com> wrote:
>
> Thanks Rahila for your comments. Please find my thoughts below:
>
> On Wed, Jan 20, 2021 at 6:27 PM Rahila Syed <rahilasye...@gmail.com> wrote:
> >
> > Hi Vignesh,
> >
> >>
> >> I have handled the above scenario(drop schema should automatically
> >> remove the schema entry from publication schema relation) & addition
> >> of tests in the new v2 patch attached.
> >> Thoughts?
> >
> >
> > Please see some initial comments:
> >
> > 1. I think there should be more tests to show that the schema data is
> > actually replicated
> > to the subscriber. Currently, I am not seeing the data being replicated
> > when I use FOR SCHEMA.
> >
> I will fix this issue and include more tests in my next version of the patch.

I have modified the patch to handle this and added a few more tests.

> > 2. How does replication behave when a table is added or removed from a
> > subscribed schema
> > using ALTER TABLE SET SCHEMA?
> >
> I would like to keep the behavior similar to the table behavior. I
> will post more details for this along with my next version of the
> patch.
>

If a table is moved to a different schema, its data will no longer be
sent to the subscriber after the schema change. When a new table is
added to a published schema, the publisher will send the table data,
but the subscriber will not apply the changes. If the change needs to
be reflected, the subscription should be refreshed using
"ALTER SUBSCRIPTION mysub1 REFRESH PUBLICATION". The relation will be
reflected in the subscriber's relation list once the subscription is
refreshed. If a table is dropped, there is no impact on the subscriber;
the relation will be present in pg_subscription_rel after the
subscription is refreshed.

> > 3. Can we have a default schema like a public or current schema that gets
> > replicated in case the user didn't
> > specify one, this can be handy to replicate current schema tables.
> >
> It looks like a good use case, I will check on the feasibility of this
> and try to implement this.

This can be done; I will handle it later.

> > 4. + The fourth, fifth and sixth variants change which schemas are part
> > of the
> > + publication. The <literal>SET TABLE</literal> clause will replace the
> > list
> > + of schemas in the publication with the specified one. The <literal>ADD
> >
> > There is a typo above s/SET TABLE/SET SCHEMA
> I will fix this in the next version of the patch.

Modified it.

I have moved the tests and documentation into a separate patch to make
review easier. Attached is the v3 patch with the fixes.
Thoughts?

Regards,
Vignesh
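To make the behavior described for comment 2 concrete, here is a toy, self-contained C model of the publisher-side decision. It is only a sketch and not backend code: ToyPublication, ToyTable and toy_table_is_published are hypothetical names invented for illustration, and plain arrays stand in for the backend List type. The patch itself makes this decision in get_rel_sync_entry() using GetPublicationSchemas() and list_member_oid().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;

/* Hypothetical stand-in for a FOR SCHEMA publication: a list of schema OIDs. */
typedef struct ToyPublication
{
	const Oid  *schemas;
	size_t		nschemas;
} ToyPublication;

/* Hypothetical stand-in for a table: its own OID and its current schema. */
typedef struct ToyTable
{
	Oid			relid;
	Oid			schema;
} ToyTable;

/*
 * Return true if the table's current schema is one of the schemas listed
 * in the publication.  The answer depends only on the schema the table is
 * in right now, so moving the table to another schema changes the result.
 */
static bool
toy_table_is_published(const ToyPublication *pub, const ToyTable *tbl)
{
	assert(pub != NULL && tbl != NULL);

	for (size_t i = 0; i < pub->nschemas; i++)
	{
		if (pub->schemas[i] == tbl->schema)
			return true;
	}
	return false;
}
```

With schemas 100 and 200 in the publication, a table in schema 100 passes the check. Once its schema field is changed to 300 (the toy equivalent of ALTER TABLE ... SET SCHEMA), the same check fails, which corresponds to "its data will no longer be sent to the subscriber". The subscriber-side refresh step is not modeled here.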
From 72255fcb52cfba2d284b8402b64c118160b35b9f Mon Sep 17 00:00:00 2001 From: Vignesh C <vignes...@enterprisedb.com> Date: Sun, 31 Jan 2021 22:47:38 +0530 Subject: [PATCH v3 1/2] Added schema level support for publication. This patch adds schema level support for publication. User can specify multiple schemas with schema option. When user specifies schema option, then the tables present in the schema specified will be selected by publisher for sending the data to subscriber. --- src/backend/catalog/Makefile | 4 +- src/backend/catalog/aclchk.c | 2 + src/backend/catalog/dependency.c | 9 + src/backend/catalog/objectaddress.c | 138 +++++++++++++++ src/backend/catalog/pg_publication.c | 134 +++++++++++++- src/backend/commands/alter.c | 1 + src/backend/commands/event_trigger.c | 4 + src/backend/commands/publicationcmds.c | 266 +++++++++++++++++++++++++++- src/backend/commands/seclabel.c | 1 + src/backend/commands/tablecmds.c | 1 + src/backend/parser/gram.y | 83 ++++++--- src/backend/replication/pgoutput/pgoutput.c | 12 ++ src/backend/utils/cache/syscache.c | 23 +++ src/bin/pg_dump/common.c | 3 + src/bin/pg_dump/pg_backup_archiver.c | 3 +- src/bin/pg_dump/pg_dump.c | 155 +++++++++++++++- src/bin/pg_dump/pg_dump.h | 17 ++ src/bin/pg_dump/pg_dump_sort.c | 7 + src/bin/psql/describe.c | 110 +++++++++++- src/include/catalog/dependency.h | 1 + src/include/catalog/pg_publication.h | 16 +- src/include/catalog/pg_publication_schema.h | 49 +++++ src/include/commands/publicationcmds.h | 1 + src/include/nodes/parsenodes.h | 3 + src/include/utils/syscache.h | 2 + src/test/regress/expected/publication.out | 100 +++++------ src/test/regress/expected/sanity_check.out | 1 + 27 files changed, 1055 insertions(+), 91 deletions(-) create mode 100644 src/include/catalog/pg_publication_schema.h diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index c85f0ca..dc8a9eb 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -67,8 +67,8 @@ 
CATALOG_HEADERS := \ pg_foreign_table.h pg_policy.h pg_replication_origin.h \ pg_default_acl.h pg_init_privs.h pg_seclabel.h pg_shseclabel.h \ pg_collation.h pg_partitioned_table.h pg_range.h pg_transform.h \ - pg_sequence.h pg_publication.h pg_publication_rel.h pg_subscription.h \ - pg_subscription_rel.h + pg_sequence.h pg_publication.h pg_publication_rel.h pg_publication_schema.h \ + pg_subscription.h pg_subscription_rel.h GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index f3c1ca1..8322d50 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -3417,6 +3417,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_DEFACL: case OBJECT_DOMCONSTRAINT: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROLE: case OBJECT_RULE: case OBJECT_TABCONSTRAINT: @@ -3556,6 +3557,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_DEFACL: case OBJECT_DOMCONSTRAINT: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROLE: case OBJECT_TRANSFORM: case OBJECT_TSPARSER: diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 2140151..94ed1f4 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -49,6 +49,7 @@ #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_statistic_ext.h" #include "catalog/pg_subscription.h" @@ -180,6 +181,7 @@ static const Oid object_classes[] = { PolicyRelationId, /* OCLASS_POLICY */ PublicationRelationId, /* OCLASS_PUBLICATION */ PublicationRelRelationId, /* OCLASS_PUBLICATION_REL */ + PublicationSchemaRelationId, /* OCLASS_PUBLICATION_SCHEMA */ SubscriptionRelationId, /* OCLASS_SUBSCRIPTION */ TransformRelationId /* OCLASS_TRANSFORM */ }; @@ -1549,6 +1551,10 
@@ doDeletion(const ObjectAddress *object, int flags) RemovePublicationRelById(object->objectId); break; + case OCLASS_PUBLICATION_SCHEMA: + RemovePublicationSchemaById(object->objectId); + break; + case OCLASS_CAST: case OCLASS_COLLATION: case OCLASS_CONVERSION: @@ -2982,6 +2988,9 @@ getObjectClass(const ObjectAddress *object) case PublicationRelRelationId: return OCLASS_PUBLICATION_REL; + case PublicationSchemaRelationId: + return OCLASS_PUBLICATION_SCHEMA; + case SubscriptionRelationId: return OCLASS_SUBSCRIPTION; diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c index 6d88b69..02661c4 100644 --- a/src/backend/catalog/objectaddress.c +++ b/src/backend/catalog/objectaddress.c @@ -49,6 +49,7 @@ #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_statistic_ext.h" #include "catalog/pg_subscription.h" @@ -828,6 +829,10 @@ static const struct object_type_map { "publication relation", OBJECT_PUBLICATION_REL }, + /* OCLASS_PUBLICATION_SCHEMA */ + { + "publication schema", OBJECT_PUBLICATION_SCHEMA + }, /* OCLASS_SUBSCRIPTION */ { "subscription", OBJECT_SUBSCRIPTION @@ -874,6 +879,9 @@ static ObjectAddress get_object_address_usermapping(List *object, static ObjectAddress get_object_address_publication_rel(List *object, Relation *relp, bool missing_ok); +static ObjectAddress get_object_address_publication_schema(List *object, + bool missing_ok); + static ObjectAddress get_object_address_defacl(List *object, bool missing_ok); static const ObjectPropertyType *get_object_property_data(Oid class_id); @@ -1117,6 +1125,10 @@ get_object_address(ObjectType objtype, Node *object, &relation, missing_ok); break; + case OBJECT_PUBLICATION_SCHEMA: + address = get_object_address_publication_schema(castNode(List, object), + missing_ok); + break; case OBJECT_DEFACL: address = 
get_object_address_defacl(castNode(List, object), missing_ok); @@ -1935,6 +1947,51 @@ get_object_address_publication_rel(List *object, } /* + * Find the ObjectAddress for a publication schema. The first element of + * the object parameter is the schema name, the second is the + * publication name. + */ +static ObjectAddress +get_object_address_publication_schema(List *object, bool missing_ok) +{ + ObjectAddress address; + char *pubname; + Publication *pub; + char *schemaname; + Oid schemaoid; + + ObjectAddressSet(address, PublicationSchemaRelationId, InvalidOid); + + /* fetch publication name and schema oid from input list */ + schemaname = strVal(linitial(object)); + pubname = strVal(lsecond(object)); + + schemaoid = get_namespace_oid(schemaname, false); + + /* Now look up the pg_publication tuple */ + pub = GetPublicationByName(pubname, missing_ok); + if (!pub) + return address; + + /* Find the publication schema mapping in syscache. */ + address.objectId = + GetSysCacheOid2(PUBLICATIONSCHEMAMAP, Anum_pg_publication_schema_oid, + ObjectIdGetDatum(schemaoid), + ObjectIdGetDatum(pub->oid)); + if (!OidIsValid(address.objectId)) + { + if (!missing_ok) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("publication schema \"%u\" in publication \"%s\" does not exist", + schemaoid, pubname))); + return address; + } + + return address; +} + +/* * Find the ObjectAddress for a default ACL. 
*/ static ObjectAddress @@ -2206,6 +2263,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_CAST: case OBJECT_USER_MAPPING: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_DEFACL: case OBJECT_TRANSFORM: if (list_length(args) != 1) @@ -2298,6 +2356,9 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_PUBLICATION_REL: objnode = (Node *) list_make2(name, linitial(args)); break; + case OBJECT_PUBLICATION_SCHEMA: + objnode = (Node *) list_make2(linitial(name), linitial(args)); + break; case OBJECT_USER_MAPPING: objnode = (Node *) list_make2(linitial(name), linitial(args)); break; @@ -3897,6 +3958,40 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok) break; } + case OCLASS_PUBLICATION_SCHEMA: + { + HeapTuple tup; + char *pubname; + Form_pg_publication_schema prform; + char *nspname; + + tup = SearchSysCache1(PUBLICATIONSCHEMA, + ObjectIdGetDatum(object->objectId)); + if (!HeapTupleIsValid(tup)) + { + if (!missing_ok) + elog(ERROR, "cache lookup failed for publication schema %u", + object->objectId); + break; + } + + prform = (Form_pg_publication_schema) GETSTRUCT(tup); + pubname = get_publication_name(prform->prpubid, false); + nspname = get_namespace_name(prform->prnspcid); + if (!nspname) + { + if (!missing_ok) + elog(ERROR, "cache lookup failed for schema %u", + object->objectId); + break; + } + + appendStringInfo(&buffer, _("publication of schema %s in publication %s"), + nspname, pubname); + ReleaseSysCache(tup); + break; + } + case OCLASS_SUBSCRIPTION: { char *subname = get_subscription_name(object->objectId, @@ -4470,6 +4565,10 @@ getObjectTypeDescription(const ObjectAddress *object, bool missing_ok) appendStringInfoString(&buffer, "publication relation"); break; + case OCLASS_PUBLICATION_SCHEMA: + appendStringInfoString(&buffer, "publication schema"); + break; + case OCLASS_SUBSCRIPTION: appendStringInfoString(&buffer, "subscription"); break; @@ -5705,6 +5804,45 @@ getObjectIdentityParts(const ObjectAddress 
*object, break; } + case OCLASS_PUBLICATION_SCHEMA: + { + HeapTuple tup; + char *pubname; + char *nspname; + Form_pg_publication_schema prform; + + tup = SearchSysCache1(PUBLICATIONSCHEMA, + ObjectIdGetDatum(object->objectId)); + if (!HeapTupleIsValid(tup)) + { + if (!missing_ok) + elog(ERROR, "cache lookup failed for publication schema %u", + object->objectId); + break; + } + + prform = (Form_pg_publication_schema) GETSTRUCT(tup); + pubname = get_publication_name(prform->prpubid, false); + nspname = get_namespace_name(prform->prnspcid); + if (!nspname) + { + if (!missing_ok) + elog(ERROR, "cache lookup failed for schema %u", + object->objectId); + break; + } + + appendStringInfo(&buffer, "%s in publication %s", nspname, pubname); + + if (objargs) + *objargs = list_make1(pubname); + if (objname) + *objname = list_make1(nspname); + + ReleaseSysCache(tup); + break; + } + case OCLASS_SUBSCRIPTION: { char *subname; diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 5f8e1c6..4d46935 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -28,8 +28,10 @@ #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" #include "catalog/pg_inherits.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_type.h" #include "funcapi.h" #include "miscadmin.h" @@ -214,6 +216,76 @@ publication_add_relation(Oid pubid, Relation targetrel, return myself; } +/* + * Insert new publication / schema mapping. 
+ */ +ObjectAddress +publication_add_schema(Oid pubid, Oid schemaoid, bool if_not_exists) +{ + Relation rel; + HeapTuple tup; + Datum values[Natts_pg_publication_schema]; + bool nulls[Natts_pg_publication_schema]; + Oid prrelid; + Publication *pub = GetPublication(pubid); + ObjectAddress myself, + referenced; + + rel = table_open(PublicationSchemaRelationId, RowExclusiveLock); + + /* + * Check for duplicates. Note that this does not really prevent + * duplicates, it's here just to provide nicer error message in common + * case. The real protection is the unique key on the catalog. + */ + if (SearchSysCacheExists2(PUBLICATIONSCHEMAMAP, ObjectIdGetDatum(schemaoid), + ObjectIdGetDatum(pubid))) + { + table_close(rel, RowExclusiveLock); + + if (if_not_exists) + return InvalidObjectAddress; + + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("schema \"%s\" is already member of publication \"%s\"", + get_namespace_name(schemaoid), pub->name))); + } + + /* Form a tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + + prrelid = GetNewOidWithIndex(rel, PublicationSchemaObjectIndexId, + Anum_pg_publication_schema_oid); + values[Anum_pg_publication_schema_oid - 1] = ObjectIdGetDatum(prrelid); + values[Anum_pg_publication_schema_prpubid - 1] = + ObjectIdGetDatum(pubid); + values[Anum_pg_publication_schema_prnspcid - 1] = + ObjectIdGetDatum(schemaoid); + + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); + + /* Insert tuple into catalog. */ + CatalogTupleInsert(rel, tup); + heap_freetuple(tup); + + ObjectAddressSet(myself, PublicationSchemaRelationId, prrelid); + + /* Add dependency on the publication */ + ObjectAddressSet(referenced, PublicationRelationId, pubid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + + /* Add dependency on the schema */ + ObjectAddressSet(referenced, NamespaceRelationId, schemaoid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + + /* Close the table. 
*/ + table_close(rel, RowExclusiveLock); + + return myself; +} + /* Gets list of publication oids for a relation */ List * GetRelationPublications(Oid relid) @@ -305,6 +377,47 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) } /* + * Gets list of schema oids for a publication. + * + * This should only be used for normal publications. + */ +List * +GetPublicationSchemas(Oid pubid) +{ + List *result; + Relation pubrelsrel; + ScanKeyData scankey; + SysScanDesc scan; + HeapTuple tup; + + /* Find all schemas associated with the publication. */ + pubrelsrel = table_open(PublicationSchemaRelationId, AccessShareLock); + + ScanKeyInit(&scankey, + Anum_pg_publication_schema_prpubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(pubid)); + + scan = systable_beginscan(pubrelsrel, PublicationSchemaPrnspcidPrpubidIndexId, + true, NULL, 1, &scankey); + + result = NIL; + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_publication_schema pubrel; + + pubrel = (Form_pg_publication_schema) GETSTRUCT(tup); + + result = lappend_oid(result, pubrel->prnspcid); + } + + systable_endscan(scan); + table_close(pubrelsrel, AccessShareLock); + + return result; +} + +/* * Gets list of publication oids for publications marked as FOR ALL TABLES. */ List * @@ -349,13 +462,16 @@ GetAllTablesPublications(void) * root partitioned tables. 
*/ List * -GetAllTablesPublicationRelations(bool pubviaroot) +GetAllTablesPublicationRelations(Publication *publication) { Relation classRel; ScanKeyData key[1]; TableScanDesc scan; HeapTuple tuple; List *result = NIL; + bool pubviaroot = publication->pubviaroot; + List *pubschemas = GetPublicationSchemas(publication->oid); + classRel = table_open(RelationRelationId, AccessShareLock); @@ -371,6 +487,16 @@ GetAllTablesPublicationRelations(bool pubviaroot) Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); Oid relid = relForm->oid; + /* + * If schema is specified by the user, check if the relation is present + * in one of the schema specified. + */ + if (pubschemas) + { + if (!list_member_oid(pubschemas, relForm->relnamespace)) + continue; + } + if (is_publishable_class(relid, relForm) && !(relForm->relispartition && pubviaroot)) result = lappend_oid(result, relid); @@ -431,6 +557,8 @@ GetPublication(Oid pubid) pub->pubactions.pubdelete = pubform->pubdelete; pub->pubactions.pubtruncate = pubform->pubtruncate; pub->pubviaroot = pubform->pubviaroot; + pub->pubtables = pubform->pubtables; + pub->pubschemas = pubform->pubschemas; ReleaseSysCache(tup); @@ -530,8 +658,8 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * replicated using leaf partition identity and schema, so we only * need those. */ - if (publication->alltables) - tables = GetAllTablesPublicationRelations(publication->pubviaroot); + if (publication->alltables || publication->pubschemas) + tables = GetAllTablesPublicationRelations(publication); else tables = GetPublicationRelations(publication->oid, publication->pubviaroot ? 
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index 2924949..e7c2745 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -661,6 +661,7 @@ AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid, case OCLASS_POLICY: case OCLASS_PUBLICATION: case OCLASS_PUBLICATION_REL: + case OCLASS_PUBLICATION_SCHEMA: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: /* ignore object types that don't have schema-qualified names */ diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index 5bde507..4673e8e 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -974,6 +974,7 @@ EventTriggerSupportsObjectType(ObjectType obtype) case OBJECT_PROCEDURE: case OBJECT_PUBLICATION: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROUTINE: case OBJECT_RULE: case OBJECT_SCHEMA: @@ -1051,6 +1052,7 @@ EventTriggerSupportsObjectClass(ObjectClass objclass) case OCLASS_POLICY: case OCLASS_PUBLICATION: case OCLASS_PUBLICATION_REL: + case OCLASS_PUBLICATION_SCHEMA: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: return true; @@ -2120,6 +2122,7 @@ stringify_grant_objtype(ObjectType objtype) case OBJECT_POLICY: case OBJECT_PUBLICATION: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROLE: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: @@ -2202,6 +2205,7 @@ stringify_adefprivs_objtype(ObjectType objtype) case OBJECT_POLICY: case OBJECT_PUBLICATION: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROLE: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 95c253c..6bb6a74 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -25,7 +25,9 @@ #include "catalog/objectaddress.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" +#include 
"catalog/pg_namespace.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" @@ -53,6 +55,9 @@ static void CloseTableList(List *rels); static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt); static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); +static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, + AlterPublicationStmt *stmt); +static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok); static void parse_publication_options(List *options, @@ -212,6 +217,10 @@ CreatePublication(CreatePublicationStmt *stmt) BoolGetDatum(pubactions.pubtruncate); values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root); + values[Anum_pg_publication_pubtables -1] = + stmt->tables ? BoolGetDatum(true) : BoolGetDatum(false); + values[Anum_pg_publication_pubschemas - 1] = + stmt->schemas ? BoolGetDatum(true) : BoolGetDatum(false); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -226,6 +235,23 @@ CreatePublication(CreatePublicationStmt *stmt) /* Make the changes visible. 
*/ CommandCounterIncrement(); + if (stmt->schemas) + { + ListCell *cell; + List *schemaoidlist = NIL; + Relation nspcrel; + + nspcrel = table_open(NamespaceRelationId, ShareUpdateExclusiveLock); + foreach(cell, stmt->schemas) + { + char *schema = strVal(lfirst(cell)); + schemaoidlist = lappend_oid(schemaoidlist, get_namespace_oid(schema, false)); + } + + PublicationAddSchemas(puboid, schemaoidlist, true, NULL); + table_close(nspcrel, ShareUpdateExclusiveLock); + } + if (stmt->tables) { List *rels; @@ -252,6 +278,30 @@ CreatePublication(CreatePublicationStmt *stmt) return myself; } +static void +UpdatePublicationTupleValue(Relation rel, HeapTuple tup, int col, bool value) +{ + bool nulls[Natts_pg_publication]; + bool replaces[Natts_pg_publication]; + Datum values[Natts_pg_publication]; + + /* Everything ok, form a new tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + values[col - 1] = BoolGetDatum(value); + replaces[col - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + + /* Update the catalog. */ + CatalogTupleUpdate(rel, &tup->t_self, tup); + + CommandCounterIncrement(); +} + /* * Change options of a publication. 
*/ @@ -370,14 +420,34 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, NameStr(pubform->pubname)), errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); + if (pubform->pubschemas) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR SCHEMA", + NameStr(pubform->pubname)), + errdetail("Tables cannot be added to or dropped from FOR SCHEMA publications."))); + Assert(list_length(stmt->tables) > 0); rels = OpenTableList(stmt->tables); if (stmt->tableAction == DEFELEM_ADD) + { PublicationAddTables(pubid, rels, false, stmt); + if (!pubform->pubtables) + UpdatePublicationTupleValue(rel, tup, Anum_pg_publication_pubtables, + true); + } else if (stmt->tableAction == DEFELEM_DROP) + { + List *tables; PublicationDropTables(pubid, rels, false); + tables = GetPublicationRelations(pubid, PUBLICATION_PART_ROOT); + if (!list_length(tables)) + UpdatePublicationTupleValue(rel, tup, Anum_pg_publication_pubtables, + false); + + } else /* DEFELEM_SET */ { List *oldrelids = GetPublicationRelations(pubid, @@ -422,16 +492,116 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, PublicationAddTables(pubid, rels, true, stmt); CloseTableList(delrels); + + /* Update pubtables col to true */ + if (!pubform->pubtables) + UpdatePublicationTupleValue(rel, tup, Anum_pg_publication_pubtables, + true); } CloseTableList(rels); } /* + * Alter the publication schemas. + * + * Add/Remove/Set the schemas to/from publication. + */ +static void +AlterPublicationSchemas(AlterPublicationStmt *stmt, Relation rel, + HeapTuple tup, Form_pg_publication pubform) +{ + List *schemaoidlist = NIL; + ListCell *cell; + + /* Check that user is allowed to manipulate the publication tables. 
*/ + if (pubform->puballtables) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL TABLES", + NameStr(pubform->pubname)), + errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."))); + + if (pubform->pubtables) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR TABLE", + NameStr(pubform->pubname)), + errdetail("Schemas cannot be added to or dropped from FOR TABLE publications."))); + + /* Convert the text list into oid list. */ + foreach(cell, stmt->schemas) + { + char *schema = strVal(lfirst(cell)); + schemaoidlist = lappend_oid(schemaoidlist, get_namespace_oid(schema, false)); + } + + if (stmt->tableAction == DEFELEM_ADD) + { + PublicationAddSchemas(pubform->oid, schemaoidlist, false, stmt); + if (!pubform->pubschemas) + UpdatePublicationTupleValue(rel, tup, Anum_pg_publication_pubschemas, + true); + } + else if (stmt->tableAction == DEFELEM_DROP) + { + List *schemas; + PublicationDropSchemas(pubform->oid, schemaoidlist, false); + schemas = GetPublicationSchemas(pubform->oid); + if (!list_length(schemas)) + UpdatePublicationTupleValue(rel, tup, Anum_pg_publication_pubschemas, + false); + } + else + { + List *oldschemaids = GetPublicationSchemas(pubform->oid); + List *delschemas = NIL; + ListCell *oldlc; + + /* Identify which schemas should be dropped. */ + foreach(oldlc, oldschemaids) + { + Oid oldrelid = lfirst_oid(oldlc); + ListCell *newlc; + bool found = false; + + foreach(newlc, schemaoidlist) + { + Oid newschemaid = lfirst_oid(newlc); + + if (newschemaid == oldrelid) + { + found = true; + break; + } + } + + if (!found) + delschemas = lappend_oid(delschemas, oldrelid); + } + + /* And drop them. */ + PublicationDropSchemas(pubform->oid, delschemas, true); + + /* + * Don't bother calculating the difference for adding, we'll catch and + * skip existing ones when doing catalog update. 
+ */ + PublicationAddSchemas(pubform->oid, schemaoidlist, true, stmt); + + if (!pubform->pubschemas) + UpdatePublicationTupleValue(rel, tup, Anum_pg_publication_pubschemas, + true); + } + + return; +} + +/* * Alter the existing publication. * - * This is dispatcher function for AlterPublicationOptions and - * AlterPublicationTables. + * This is dispatcher function for AlterPublicationOptions, + * AlterPublicationSchemas and AlterPublicationTables. */ void AlterPublication(AlterPublicationStmt *stmt) @@ -460,6 +630,8 @@ AlterPublication(AlterPublicationStmt *stmt) if (stmt->options) AlterPublicationOptions(stmt, rel, tup); + else if (stmt->schemas) + AlterPublicationSchemas(stmt, rel, tup, pubform); else AlterPublicationTables(stmt, rel, tup); @@ -499,6 +671,30 @@ RemovePublicationRelById(Oid proid) } /* + * Remove schema from publication by mapping OID. + */ +void +RemovePublicationSchemaById(Oid proid) +{ + Relation rel; + HeapTuple tup; + + rel = table_open(PublicationSchemaRelationId, RowExclusiveLock); + + tup = SearchSysCache1(PUBLICATIONSCHEMA, ObjectIdGetDatum(proid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for publication schema %u", + proid); + + CatalogTupleDelete(rel, &tup->t_self); + + ReleaseSysCache(tup); + + table_close(rel, RowExclusiveLock); +} + +/* * Open relations specified by a RangeVar list. * The returned tables are locked in ShareUpdateExclusiveLock mode in order to * add them to a publication. @@ -633,6 +829,39 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, } /* + * Add listed schemas to the publication. + */ +static void +PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, + AlterPublicationStmt *stmt) +{ + ListCell *lc; + + Assert(!stmt || !stmt->for_all_tables); + + foreach(lc, schemas) + { + Oid schemaoid = lfirst_oid(lc); + ObjectAddress obj; + + /* Must be owner of the schema or superuser. 
*/ + if (!pg_namespace_ownercheck(schemaoid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA, + get_namespace_name(schemaoid)); + + obj = publication_add_schema(pubid, schemaoid, if_not_exists); + if (stmt) + { + EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, + (Node *) stmt); + + InvokeObjectPostCreateHook(PublicationSchemaRelationId, + obj.objectId, 0); + } + } +} + +/* * Remove listed tables from the publication. */ static void @@ -667,6 +896,39 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) } /* + * Remove listed schemas from the publication. + */ +static void +PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok) +{ + ObjectAddress obj; + ListCell *lc; + Oid prid; + + foreach(lc, schemas) + { + Oid schemaoid = lfirst_oid(lc); + + prid = GetSysCacheOid2(PUBLICATIONSCHEMAMAP, Anum_pg_publication_schema_oid, + ObjectIdGetDatum(schemaoid), + ObjectIdGetDatum(pubid)); + if (!OidIsValid(prid)) + { + if (missing_ok) + continue; + + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("schema \"%s\" is not part of the publication", + get_namespace_name(schemaoid)))); + } + + ObjectAddressSet(obj, PublicationSchemaRelationId, prid); + performDeletion(&obj, DROP_CASCADE, 0); + } +} + +/* * Internal workhorse for changing a publication owner */ static void diff --git a/src/backend/commands/seclabel.c b/src/backend/commands/seclabel.c index 6906714..b108b64 100644 --- a/src/backend/commands/seclabel.c +++ b/src/backend/commands/seclabel.c @@ -80,6 +80,7 @@ SecLabelSupportsObjectType(ObjectType objtype) case OBJECT_OPFAMILY: case OBJECT_POLICY: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: case OBJECT_TABCONSTRAINT: diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 36747e7..f4593af 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -11709,6 +11709,7 @@ 
ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, case OCLASS_EVENT_TRIGGER: case OCLASS_PUBLICATION: case OCLASS_PUBLICATION_REL: + case OCLASS_PUBLICATION_SCHEMA: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 7574d54..88081fd 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -421,7 +421,6 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <list> group_by_list %type <node> group_by_item empty_grouping_set rollup_clause cube_clause %type <node> grouping_sets_clause -%type <node> opt_publication_for_tables publication_for_tables %type <list> opt_fdw_options fdw_options %type <defelt> fdw_option @@ -9466,46 +9465,49 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec /***************************************************************************** * - * CREATE PUBLICATION name [ FOR TABLE ] [ WITH options ] + * CREATE PUBLICATION name [WITH options] * + * CREATE PUBLICATION name FOR ALL TABLES [WITH options] + * + * CREATE PUBLICATION name FOR TABLE table [, table2] [WITH options] + * + * CREATE PUBLICATION name FOR SCHEMA schema [, schema2] [WITH options] *****************************************************************************/ CreatePublicationStmt: - CREATE PUBLICATION name opt_publication_for_tables opt_definition + CREATE PUBLICATION name opt_definition { CreatePublicationStmt *n = makeNode(CreatePublicationStmt); n->pubname = $3; - n->options = $5; - if ($4 != NULL) - { - /* FOR TABLE */ - if (IsA($4, List)) - n->tables = (List *)$4; - /* FOR ALL TABLES */ - else - n->for_all_tables = true; - } + n->options = $4; $$ = (Node *)n; } - ; - -opt_publication_for_tables: - publication_for_tables { $$ = $1; } - | /* EMPTY */ { $$ = NULL; } - ; - -publication_for_tables: - FOR TABLE relation_expr_list + | CREATE PUBLICATION name FOR ALL TABLES opt_definition { - $$ = (Node *) $3; + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + 
n->pubname = $3; + n->options = $7; + n->for_all_tables = true; + $$ = (Node *)n; } - | FOR ALL TABLES + | CREATE PUBLICATION name FOR TABLE relation_expr_list opt_definition { - $$ = (Node *) makeInteger(true); + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + n->pubname = $3; + n->options = $7; + n->tables = (List *)$6; + $$ = (Node *)n; + } + | CREATE PUBLICATION name FOR SCHEMA name_list opt_definition + { + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + n->pubname = $3; + n->options = $7; + n->schemas = (List *)$6; + $$ = (Node *)n; } ; - /***************************************************************************** * * ALTER PUBLICATION name SET ( options ) @@ -9516,6 +9518,11 @@ publication_for_tables: * * ALTER PUBLICATION name SET TABLE table [, table2] * + * ALTER PUBLICATION name ADD SCHEMA schema [, schema2] + * + * ALTER PUBLICATION name DROP SCHEMA schema [, schema2] + * + * ALTER PUBLICATION name SET SCHEMA schema [, schema2] *****************************************************************************/ AlterPublicationStmt: @@ -9550,6 +9557,30 @@ AlterPublicationStmt: n->tableAction = DEFELEM_DROP; $$ = (Node *)n; } + | ALTER PUBLICATION name ADD_P SCHEMA name_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->schemas = $6; + n->tableAction = DEFELEM_ADD; + $$ = (Node *)n; + } + | ALTER PUBLICATION name SET SCHEMA name_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->schemas = $6; + n->tableAction = DEFELEM_SET; + $$ = (Node *)n; + } + | ALTER PUBLICATION name DROP SCHEMA name_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->schemas = $6; + n->tableAction = DEFELEM_DROP; + $$ = (Node *)n; + } ; /***************************************************************************** diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 
79765f9..458a04b 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1005,6 +1005,18 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) publish_as_relid = llast_oid(get_partition_ancestors(relid)); } + if (pub->pubschemas) + { + Oid schemaId = get_rel_namespace(relid); + List *pubschemas = GetPublicationSchemas(pub->oid); + if (list_member_oid(pubschemas, schemaId)) + { + publish = true; + if (pub->pubviaroot && am_partition) + publish_as_relid = llast_oid(get_partition_ancestors(relid)); + } + } + if (!publish) { bool ancestor_published = false; diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index e4dc4ee..953ab6f 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -50,6 +50,7 @@ #include "catalog/pg_partitioned_table.h" #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_range.h" #include "catalog/pg_replication_origin.h" @@ -650,6 +651,28 @@ static const struct cachedesc cacheinfo[] = { }, 64 }, + {PublicationSchemaRelationId, /* PUBLICATIONSCHEMA */ + PublicationSchemaObjectIndexId, + 1, + { + Anum_pg_publication_schema_oid, + 0, + 0, + 0 + }, + 64 + }, + {PublicationSchemaRelationId, /* PUBLICATIONSCHEMAMAP */ + PublicationSchemaPrnspcidPrpubidIndexId, + 2, + { + Anum_pg_publication_schema_prnspcid, + Anum_pg_publication_schema_prpubid, + 0, + 0 + }, + 64 + }, {RangeRelationId, /* RANGEMULTIRANGE */ RangeMultirangeTypidIndexId, 1, diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c index b0f02bc..17b132f 100644 --- a/src/bin/pg_dump/common.c +++ b/src/bin/pg_dump/common.c @@ -257,6 +257,9 @@ getSchemaData(Archive *fout, int *numTablesPtr) pg_log_info("reading publication membership"); getPublicationTables(fout, tblinfo, numTables); + pg_log_info("reading publication schemas"); +
getPublicationSchemas(fout, nspinfo, numNamespaces); + pg_log_info("reading subscriptions"); getSubscriptions(fout); diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 1f82c64..e21acb8 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -2847,7 +2847,8 @@ _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH) */ if (ropt->no_publications && (strcmp(te->desc, "PUBLICATION") == 0 || - strcmp(te->desc, "PUBLICATION TABLE") == 0)) + strcmp(te->desc, "PUBLICATION TABLE") == 0 || + strcmp(te->desc, "PUBLICATION SCHEMA") == 0)) return 0; /* If it's a security label, maybe ignore it */ diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 798d145..3924e92 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3882,6 +3882,8 @@ getPublications(Archive *fout, int *numPublications) int i_pubdelete; int i_pubtruncate; int i_pubviaroot; + int i_pubtables; + int i_pubschemas; int i, ntups; @@ -3896,7 +3898,14 @@ getPublications(Archive *fout, int *numPublications) resetPQExpBuffer(query); /* Get the publications. 
*/ - if (fout->remoteVersion >= 130000) + if (fout->remoteVersion >= 140000) + appendPQExpBuffer(query, + "SELECT p.tableoid, p.oid, p.pubname, " + "(%s p.pubowner) AS rolname, " + "p.puballtables, p.pubschemas, p.pubtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot " + "FROM pg_publication p", + username_subquery); + else if (fout->remoteVersion >= 130000) appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "(%s p.pubowner) AS rolname, " @@ -3932,6 +3941,8 @@ getPublications(Archive *fout, int *numPublications) i_pubdelete = PQfnumber(res, "pubdelete"); i_pubtruncate = PQfnumber(res, "pubtruncate"); i_pubviaroot = PQfnumber(res, "pubviaroot"); + i_pubtables = PQfnumber(res, "pubtables"); + i_pubschemas = PQfnumber(res, "pubschemas"); pubinfo = pg_malloc(ntups * sizeof(PublicationInfo)); @@ -3956,6 +3967,10 @@ getPublications(Archive *fout, int *numPublications) (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0); pubinfo[i].pubviaroot = (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0); + pubinfo[i].pubtables = + (strcmp(PQgetvalue(res, i, i_pubtables), "t") == 0); + pubinfo[i].pubschemas = + (strcmp(PQgetvalue(res, i, i_pubschemas), "t") == 0); if (strlen(pubinfo[i].rolname) == 0) pg_log_warning("owner of publication \"%s\" appears to be invalid", @@ -4066,6 +4081,102 @@ dumpPublication(Archive *fout, PublicationInfo *pubinfo) } /* + * getPublicationSchemas + * get information about publication membership for dumpable schemas. 
+ */ +void +getPublicationSchemas(Archive *fout, NamespaceInfo nspinfo[], int numSchemas) +{ + PQExpBuffer query; + PGresult *res; + PublicationSchemaInfo *pubrinfo; + DumpOptions *dopt = fout->dopt; + int i_schemaoid; + int i_oid; + int i_pubname; + int i_pubid; + int i, + j, + ntups; + + if (dopt->no_publications || fout->remoteVersion < 140000) + return; + + query = createPQExpBuffer(); + + for (i = 0; i < numSchemas; i++) + { + NamespaceInfo *nsinfo = &nspinfo[i]; + PublicationInfo *pubinfo; + + /* + * Ignore publication membership of schemas whose definitions are not + * to be dumped. + */ + if (!(nsinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)) + continue; + + pg_log_info("reading publication membership for schema \"%s\"", + nsinfo->dobj.name); + + resetPQExpBuffer(query); + + /* Get the publication membership for the schema. */ + appendPQExpBuffer(query, + "SELECT pr.prnspcid, pr.oid, p.pubname, p.oid AS pubid " + "FROM pg_publication_schema pr, pg_publication p " + "WHERE pr.prnspcid = '%u'" + " AND p.oid = pr.prpubid", + nsinfo->dobj.catId.oid); + res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); + + ntups = PQntuples(res); + + if (ntups == 0) + { + /* + * Schema is not a member of any publication. Clean up and move on to the next schema.
+ */ + PQclear(res); + continue; + } + + i_schemaoid = PQfnumber(res, "prnspcid"); + i_oid = PQfnumber(res, "oid"); + i_pubname = PQfnumber(res, "pubname"); + i_pubid = PQfnumber(res, "pubid"); + + + pubrinfo = pg_malloc(ntups * sizeof(PublicationSchemaInfo)); + + for (j = 0; j < ntups; j++) + { + Oid prpubid = atooid(PQgetvalue(res, j, i_pubid)); + + pubinfo = findPublicationByOid(prpubid); + if (pubinfo == NULL) + continue; + + pubrinfo[j].dobj.objType = DO_PUBLICATION_SCHEMA; + pubrinfo[j].dobj.catId.tableoid = + atooid(PQgetvalue(res, j, i_schemaoid)); + pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid)); + AssignDumpId(&pubrinfo[j].dobj); + pubrinfo[j].dobj.namespace = nsinfo->dobj.namespace; + pubrinfo[j].dobj.name = nsinfo->dobj.name; + pubrinfo[j].pubname = pg_strdup(PQgetvalue(res, j, i_pubname)); + pubrinfo[j].pubschema = nsinfo; + pubrinfo[j].publication = pubinfo; + + /* Decide whether we want to dump it */ + selectDumpablePublicationTable(&(pubrinfo[j].dobj), fout); + } + PQclear(res); + } + destroyPQExpBuffer(query); +} + +/* * getPublicationTables * get information about publication membership for dumpable tables. 
*/ @@ -4153,6 +4264,44 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) } /* + * dumpPublicationSchema + * dump the definition of the given publication schema mapping + */ +static void +dumpPublicationSchema(Archive *fout, PublicationSchemaInfo *pubrinfo) +{ + NamespaceInfo *schemainfo = pubrinfo->pubschema; + PublicationInfo *pubinfo = pubrinfo->publication; + PQExpBuffer query; + char *tag; + + if (!(pubrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)) + return; + + tag = psprintf("%s %s", pubrinfo->pubname, schemainfo->dobj.name); + + query = createPQExpBuffer(); + + appendPQExpBuffer(query, "ALTER PUBLICATION %s ", fmtId(pubrinfo->pubname)); + appendPQExpBuffer(query, "ADD SCHEMA %s;\n", fmtId(schemainfo->dobj.name)); + + /* + * There is no point in creating drop query as the drop is done by schema + * drop. + */ + ArchiveEntry(fout, pubrinfo->dobj.catId, pubrinfo->dobj.dumpId, + ARCHIVE_OPTS(.tag = tag, + .namespace = schemainfo->dobj.name, + .owner = pubinfo->rolname, + .description = "PUBLICATION SCHEMA", + .section = SECTION_POST_DATA, + .createStmt = query->data)); + + free(tag); + destroyPQExpBuffer(query); +} + +/* * dumpPublicationTable * dump the definition of the given publication table mapping */ @@ -10293,6 +10442,9 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj) case DO_PUBLICATION_REL: dumpPublicationTable(fout, (PublicationRelInfo *) dobj); break; + case DO_PUBLICATION_SCHEMA: + dumpPublicationSchema(fout, (PublicationSchemaInfo *) dobj); + break; case DO_SUBSCRIPTION: dumpSubscription(fout, (SubscriptionInfo *) dobj); break; @@ -18429,6 +18581,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs, case DO_POLICY: case DO_PUBLICATION: case DO_PUBLICATION_REL: + case DO_PUBLICATION_SCHEMA: case DO_SUBSCRIPTION: /* Post-data objects: must come after the post-data boundary */ addObjectDependency(dobj, postDataBound->dumpId); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 
1290f96..f3c05ad 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -81,6 +81,7 @@ typedef enum DO_POLICY, DO_PUBLICATION, DO_PUBLICATION_REL, + DO_PUBLICATION_SCHEMA, DO_SUBSCRIPTION } DumpableObjectType; @@ -614,6 +615,8 @@ typedef struct _PublicationInfo bool pubdelete; bool pubtruncate; bool pubviaroot; + bool pubtables; + bool pubschemas; } PublicationInfo; /* @@ -628,6 +631,18 @@ typedef struct _PublicationRelInfo } PublicationRelInfo; /* + * The PublicationSchemaInfo struct is used to represent publication schema + * mapping. + */ +typedef struct PublicationSchemaInfo +{ + DumpableObject dobj; + NamespaceInfo *pubschema; + char *pubname; + PublicationInfo *publication; +} PublicationSchemaInfo; + +/* * The SubscriptionInfo struct is used to represent subscription. */ typedef struct _SubscriptionInfo @@ -732,6 +747,8 @@ extern PublicationInfo *getPublications(Archive *fout, int *numPublications); extern void getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables); +extern void getPublicationSchemas(Archive *fout, NamespaceInfo nspinfo[], + int numSchemas); extern void getSubscriptions(Archive *fout); #endif /* PG_DUMP_H */ diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c index 46461fb..13a6fcd 100644 --- a/src/bin/pg_dump/pg_dump_sort.c +++ b/src/bin/pg_dump/pg_dump_sort.c @@ -82,6 +82,7 @@ enum dbObjectTypePriorities PRIO_POLICY, PRIO_PUBLICATION, PRIO_PUBLICATION_REL, + PRIO_PUBLICATION_SCHEMA, PRIO_SUBSCRIPTION, PRIO_DEFAULT_ACL, /* done in ACL pass */ PRIO_EVENT_TRIGGER, /* must be next to last! 
*/ @@ -135,6 +136,7 @@ static const int dbObjectTypePriority[] = PRIO_POLICY, /* DO_POLICY */ PRIO_PUBLICATION, /* DO_PUBLICATION */ PRIO_PUBLICATION_REL, /* DO_PUBLICATION_REL */ + PRIO_PUBLICATION_SCHEMA, /* DO_PUBLICATION_SCHEMA */ PRIO_SUBSCRIPTION /* DO_SUBSCRIPTION */ }; @@ -1477,6 +1479,11 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize) "PUBLICATION TABLE (ID %d OID %u)", obj->dumpId, obj->catId.oid); return; + case DO_PUBLICATION_SCHEMA: + snprintf(buf, bufsize, + "PUBLICATION SCHEMA (ID %d OID %u)", + obj->dumpId, obj->catId.oid); + return; case DO_SUBSCRIPTION: snprintf(buf, bufsize, "SUBSCRIPTION (ID %d OID %u)", diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 20af5a9..f9a1283 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -2884,6 +2884,39 @@ describeOneTableDetails(const char *schemaname, printTableAddFooter(&cont, buf.data); } PQclear(result); + + if (pset.sversion >= 140000) + { + int pub_schema_tuples; + printfPQExpBuffer(&buf, + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + "WHERE p.oid in \n" + "(SELECT s.prpubid FROM \n" + "pg_catalog.pg_class c, \n" + "pg_catalog.pg_publication_schema s \n" + "where c.oid = '%s' AND \n" + "c.relnamespace = s.prnspcid)", + oid); + result = PSQLexec(buf.data); + if (!result) + goto error_return; + else + pub_schema_tuples = PQntuples(result); + + if (!tuples && pub_schema_tuples > 0) + printTableAddFooter(&cont, _("Publications:")); + + /* Might be an empty set - that's ok */ + for (i = 0; i < pub_schema_tuples; i++) + { + printfPQExpBuffer(&buf, " \"%s\"", + PQgetvalue(result, i, 0)); + + printTableAddFooter(&cont, buf.data); + } + PQclear(result); + } } } @@ -5829,7 +5862,7 @@ listPublications(const char *pattern) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; - static const bool translate_columns[] = {false, false, false, false, false, false, false, false}; + static const bool translate_columns[] = {false, 
false, false, false, false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -5864,7 +5897,12 @@ listPublications(const char *pattern) appendPQExpBuffer(&buf, ",\n pubviaroot AS \"%s\"", gettext_noop("Via root")); - + if (pset.sversion >= 140000) + appendPQExpBuffer(&buf, + ",\n pubtables AS \"%s\",\n" + " pubschemas AS \"%s\"", + gettext_noop("Tables"), + gettext_noop("Schemas")); appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_publication\n"); @@ -5906,6 +5944,8 @@ describePublications(const char *pattern) PGresult *res; bool has_pubtruncate; bool has_pubviaroot; + bool has_pubtables; + bool has_pubschemas; if (pset.sversion < 100000) { @@ -5919,6 +5959,8 @@ describePublications(const char *pattern) has_pubtruncate = (pset.sversion >= 110000); has_pubviaroot = (pset.sversion >= 130000); + has_pubtables = (pset.sversion >= 140000); + has_pubschemas = (pset.sversion >= 140000); initPQExpBuffer(&buf); @@ -5932,6 +5974,13 @@ describePublications(const char *pattern) if (has_pubviaroot) appendPQExpBufferStr(&buf, ", pubviaroot"); + if (has_pubtables) + appendPQExpBufferStr(&buf, + ", pubtables"); + if (has_pubschemas) + appendPQExpBufferStr(&buf, + ", pubschemas"); + appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_publication\n"); @@ -5974,6 +6023,8 @@ describePublications(const char *pattern) char *pubid = PQgetvalue(res, i, 0); char *pubname = PQgetvalue(res, i, 1); bool puballtables = strcmp(PQgetvalue(res, i, 3), "t") == 0; + bool pubtables = false; + bool pubschemas = false; int j; PQExpBufferData title; printTableOpt myopt = pset.popt.topt; @@ -5983,6 +6034,16 @@ describePublications(const char *pattern) ncols++; if (has_pubviaroot) ncols++; + if (has_pubtables) + { + pubtables = strcmp(PQgetvalue(res, i, 9), "t") == 0; + ncols++; + } + if (has_pubschemas) + { + pubschemas = strcmp(PQgetvalue(res, i, 10), "t") == 0; + ncols++; + } initPQExpBuffer(&title); printfPQExpBuffer(&title, _("Publication %s"), pubname); @@ -5997,6 +6058,10 @@
describePublications(const char *pattern) printTableAddHeader(&cont, gettext_noop("Truncates"), true, align); if (has_pubviaroot) printTableAddHeader(&cont, gettext_noop("Via root"), true, align); + if (has_pubtables) + printTableAddHeader(&cont, gettext_noop("Pubtables"), true, align); + if (has_pubschemas) + printTableAddHeader(&cont, gettext_noop("Pubschemas"), true, align); printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false); @@ -6007,8 +6072,13 @@ describePublications(const char *pattern) printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false); if (has_pubviaroot) printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false); + if (has_pubtables) + printTableAddCell(&cont, PQgetvalue(res, i, 9), false, false); + if (has_pubschemas) + printTableAddCell(&cont, PQgetvalue(res, i, 10), false, false); - if (!puballtables) + /* Prior to version 14 check was based on all tables */ + if ((has_pubtables && pubtables) || (!has_pubtables && !puballtables)) { printfPQExpBuffer(&buf, "SELECT n.nspname, c.relname\n" @@ -6045,6 +6115,40 @@ describePublications(const char *pattern) } PQclear(tabres); } + else if (pubschemas) + { + printfPQExpBuffer(&buf, + "SELECT n.nspname\n" + "FROM pg_catalog.pg_namespace n,\n" + " pg_catalog.pg_publication_schema ps\n" + "WHERE n.oid = ps.prnspcid\n" + " AND ps.prpubid = '%s'\n" + "ORDER BY 1", pubid); + + tabres = PSQLexec(buf.data); + if (!tabres) + { + printTableCleanup(&cont); + PQclear(res); + termPQExpBuffer(&buf); + termPQExpBuffer(&title); + return false; + } + else + tables = PQntuples(tabres); + + if (tables > 0) + printTableAddFooter(&cont, _("Schemas:")); + + for (j = 0; j < tables; j++) + { + printfPQExpBuffer(&buf, " \"%s\"", + PQgetvalue(tabres, j, 0)); + + printTableAddFooter(&cont, buf.data); + } + PQclear(tabres); + } printTable(&cont, pset.queryFout, false, pset.logfile); printTableCleanup(&cont); diff --git a/src/include/catalog/dependency.h 
b/src/include/catalog/dependency.h index f272e2c..11af0c8 100644 --- a/src/include/catalog/dependency.h +++ b/src/include/catalog/dependency.h @@ -131,6 +131,7 @@ typedef enum ObjectClass OCLASS_POLICY, /* pg_policy */ OCLASS_PUBLICATION, /* pg_publication */ OCLASS_PUBLICATION_REL, /* pg_publication_rel */ + OCLASS_PUBLICATION_SCHEMA, /* pg_publication_schema */ OCLASS_SUBSCRIPTION, /* pg_subscription */ OCLASS_TRANSFORM /* pg_transform */ } ObjectClass; diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 0dd50fe..da3bd01 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -20,6 +20,7 @@ #include "catalog/genbki.h" #include "catalog/objectaddress.h" #include "catalog/pg_publication_d.h" +#include "utils/array.h" /* ---------------- * pg_publication definition. cpp turns this into @@ -54,6 +55,12 @@ CATALOG(pg_publication,6104,PublicationRelationId) /* true if partition changes are published using root schema */ bool pubviaroot; + + /* indicates publication is for specific tables */ + bool pubtables; + + /* indicates publication is for specific schema tables */ + bool pubschemas; } FormData_pg_publication; /* ---------------- @@ -83,6 +90,8 @@ typedef struct Publication bool alltables; bool pubviaroot; PublicationActions pubactions; + bool pubtables; + bool pubschemas; } Publication; extern Publication *GetPublication(Oid pubid); @@ -106,15 +115,16 @@ typedef enum PublicationPartOpt } PublicationPartOpt; extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); +extern List *GetPublicationSchemas(Oid pubid); extern List *GetAllTablesPublications(void); -extern List *GetAllTablesPublicationRelations(bool pubviaroot); +extern List *GetAllTablesPublicationRelations(Publication *publication); extern bool is_publishable_relation(Relation rel); extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, bool if_not_exists); - +extern 
ObjectAddress publication_add_schema(Oid pubid, Oid schemaoid, + bool if_not_exists); extern Oid get_publication_oid(const char *pubname, bool missing_ok); extern char *get_publication_name(Oid pubid, bool missing_ok); - #endif /* PG_PUBLICATION_H */ diff --git a/src/include/catalog/pg_publication_schema.h b/src/include/catalog/pg_publication_schema.h new file mode 100644 index 0000000..73d5815 --- /dev/null +++ b/src/include/catalog/pg_publication_schema.h @@ -0,0 +1,49 @@ +/*------------------------------------------------------------------------- + * + * pg_publication_schema.h + * definition of the system catalog for mappings between schemas and + * publications (pg_publication_schema) + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/catalog/pg_publication_schema.h + * + * NOTES + * The Catalog.pm module reads this file and derives schema + * information. + * + *------------------------------------------------------------------------- + */ +#ifndef PG_PUBLICATION_SCHEMA_H +#define PG_PUBLICATION_SCHEMA_H + +#include "catalog/genbki.h" +#include "catalog/pg_publication_schema_d.h" + + +/* ---------------- + * pg_publication_schema definition. cpp turns this into + * typedef struct FormData_pg_publication_schema + * ---------------- + */ +CATALOG(pg_publication_schema,8901,PublicationSchemaRelationId) +{ + Oid oid; /* oid */ + Oid prpubid; /* Oid of the publication */ + Oid prnspcid; /* Oid of the schema */ +} FormData_pg_publication_schema; + +/* ---------------- + * Form_pg_publication_schema corresponds to a pointer to a tuple with + * the format of pg_publication_schema relation. 
+ * ---------------- + */ +typedef FormData_pg_publication_schema *Form_pg_publication_schema; + +DECLARE_UNIQUE_INDEX(pg_publication_schema_oid_index, 8902, on pg_publication_schema using btree(oid oid_ops)); +#define PublicationSchemaObjectIndexId 8902 +DECLARE_UNIQUE_INDEX(pg_publication_schema_prrelid_prpubid_index, 8903, on pg_publication_schema using btree(prnspcid oid_ops, prpubid oid_ops)); +#define PublicationSchemaPrnspcidPrpubidIndexId 8903 + +#endif /* PG_PUBLICATION_SCHEMA_H */ diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index 00e2e62..581fedb 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -21,6 +21,7 @@ extern ObjectAddress CreatePublication(CreatePublicationStmt *stmt); extern void AlterPublication(AlterPublicationStmt *stmt); extern void RemovePublicationRelById(Oid proid); +extern void RemovePublicationSchemaById(Oid proid); extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId); extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index dc2bb40..8f9639e 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -1747,6 +1747,7 @@ typedef enum ObjectType OBJECT_PROCEDURE, OBJECT_PUBLICATION, OBJECT_PUBLICATION_REL, + OBJECT_PUBLICATION_SCHEMA, OBJECT_ROLE, OBJECT_ROUTINE, OBJECT_RULE, @@ -3523,6 +3524,7 @@ typedef struct CreatePublicationStmt List *options; /* List of DefElem nodes */ List *tables; /* Optional list of tables to add */ bool for_all_tables; /* Special publication for all tables in db */ + List *schemas; /* Optional list of schemas */ } CreatePublicationStmt; typedef struct AlterPublicationStmt @@ -3537,6 +3539,7 @@ typedef struct AlterPublicationStmt List *tables; /* List of tables to add/drop */ bool for_all_tables; /* Special publication for all tables in db */ DefElemAction tableAction; /* What
action to perform with the tables */ + List *schemas; /* Optional list of schemas */ } AlterPublicationStmt; typedef struct CreateSubscriptionStmt diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h index d74a348..1ba2952 100644 --- a/src/include/utils/syscache.h +++ b/src/include/utils/syscache.h @@ -79,6 +79,8 @@ enum SysCacheIdentifier PUBLICATIONOID, PUBLICATIONREL, PUBLICATIONRELMAP, + PUBLICATIONSCHEMA, + PUBLICATIONSCHEMAMAP, RANGEMULTIRANGE, RANGETYPE, RELNAMENSP, diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 63d6ab7..4052873 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -28,20 +28,20 @@ ERROR: unrecognized "publish" value: "cluster" CREATE PUBLICATION testpub_xxx WITH (publish_via_partition_root = 'true', publish_via_partition_root = '0'); ERROR: conflicting or redundant options \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------+--------------------------+------------+---------+---------+---------+-----------+---------- - testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f - testpub_default | regress_publication_user | f | f | t | f | f | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Tables | Schemas +--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+--------+--------- + testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f | f | f + testpub_default | regress_publication_user | f | f | t | f | f | f | f | f (2 rows) ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete'); \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------+--------------------------+------------+---------+---------+---------+-----------+---------- - testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f - testpub_default | regress_publication_user | f | t | t | t | f | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Tables | Schemas +--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+--------+--------- + testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f | f | f + testpub_default | regress_publication_user | f | t | t | t | f | f | f | f (2 rows) --- adding tables @@ -85,10 +85,10 @@ Publications: "testpub_foralltables" \dRp+ testpub_foralltables - Publication testpub_foralltables - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | t | t | t | f | f | f + Publication testpub_foralltables + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | t | t | t | f | f | f | f | f (1 row) DROP TABLE testpub_tbl2; @@ -100,19 +100,19 @@ CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3; CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3; RESET client_min_messages; \dRp+ testpub3 - Publication testpub3 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub3 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas 
+--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | t | f Tables: "public.testpub_tbl3" "public.testpub_tbl3a" \dRp+ testpub4 - Publication testpub4 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub4 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | t | f Tables: "public.testpub_tbl3" @@ -131,10 +131,10 @@ ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1); -- only parent is listed as being in publication, not the partition ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; \dRp+ testpub_forparted - Publication testpub_forparted - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_forparted + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | t | f Tables: "public.testpub_parted" @@ -147,10 +147,10 @@ ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1; UPDATE testpub_parted1 SET a = 1; ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true); \dRp+ testpub_forparted - Publication testpub_forparted - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | t + Publication testpub_forparted + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | t | t | f Tables: "public.testpub_parted" @@ -170,10 +170,10 @@ ERROR: relation "testpub_tbl1" is already member of publication "testpub_fortbl CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1; ERROR: publication "testpub_fortbl" already exists \dRp+ testpub_fortbl - Publication testpub_fortbl - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_fortbl + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | t | f Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -211,10 +211,10 @@ Publications: "testpub_fortbl" \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | f | f + Publication testpub_default + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | f | f | t | f Tables: "pub_test.testpub_nopk" 
"public.testpub_tbl1" @@ -258,10 +258,10 @@ DROP TABLE testpub_parted; DROP VIEW testpub_view; DROP TABLE testpub_tbl1; \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | f | f + Publication testpub_default + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | f | f | f | f (1 row) -- fail - must be owner of publication @@ -271,20 +271,20 @@ ERROR: must be owner of publication testpub_default RESET ROLE; ALTER PUBLICATION testpub_default RENAME TO testpub_foo; \dRp testpub_foo - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root --------------+--------------------------+------------+---------+---------+---------+-----------+---------- - testpub_foo | regress_publication_user | f | t | t | t | f | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Tables | Schemas +-------------+--------------------------+------------+---------+---------+---------+-----------+----------+--------+--------- + testpub_foo | regress_publication_user | f | t | t | t | f | f | f | f (1 row) -- rename back to keep the rest simple ALTER PUBLICATION testpub_foo RENAME TO testpub_default; ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2; \dRp testpub_default - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ------------------+---------------------------+------------+---------+---------+---------+-----------+---------- - testpub_default | regress_publication_user2 | f | t | t | t | f | f + List of 
publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Tables | Schemas +-----------------+---------------------------+------------+---------+---------+---------+-----------+----------+--------+--------- + testpub_default | regress_publication_user2 | f | t | t | t | f | f | f | f (1 row) DROP PUBLICATION testpub_default; diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out index d9ce961..fe5a038 100644 --- a/src/test/regress/expected/sanity_check.out +++ b/src/test/regress/expected/sanity_check.out @@ -140,6 +140,7 @@ pg_policy|t pg_proc|t pg_publication|t pg_publication_rel|t +pg_publication_schema|t pg_range|t pg_replication_origin|t pg_rewrite|t -- 1.8.3.1
From e0bb91a040625100c2d2b3af4eba5c991eed6487 Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@enterprisedb.com>
Date: Sun, 31 Jan 2021 22:53:23 +0530
Subject: [PATCH v3 2/2] Tests and documentation for schema level support for publication.

Tests and documentation for schema level support for publication.
---
 doc/src/sgml/ref/alter_publication.sgml      |  45 +++++-
 doc/src/sgml/ref/create_publication.sgml     |  31 ++++-
 src/test/regress/expected/object_address.out |   6 +-
 src/test/regress/expected/publication.out    | 198 ++++++++++++++++++++++++++-
 src/test/regress/sql/object_address.sql      |   3 +
 src/test/regress/sql/publication.sql         |  87 +++++++++++-
 src/test/subscription/t/001_rep_changes.pl   | 131 +++++++++++++++++-
 7 files changed, 494 insertions(+), 7 deletions(-)

diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index faa114b..884cab4 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -24,6 +24,9 @@ PostgreSQL documentation
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> ADD SCHEMA <replaceable class="parameter">schema_name</replaceable> [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET SCHEMA <replaceable class="parameter">schema_name</replaceable> [, ...]
+ALTER PUBLICATION <replaceable class="parameter">name</replaceable> DROP SCHEMA <replaceable class="parameter">schema_name</replaceable> [, ...]
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
 ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
@@ -50,7 +53,18 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
   </para>
 
   <para>
-   The fourth variant of this command listed in the synopsis can change
+   The fourth, fifth and sixth variants change which schemas are part of the
+   publication.  The <literal>SET SCHEMA</literal> clause will replace the list
+   of schemas in the publication with the specified one.  The <literal>ADD
+   SCHEMA</literal> and <literal>DROP SCHEMA</literal> clauses will add and
+   remove one or more schemas from the publication.  Note that adding schemas
+   to a publication that is already subscribed to will require an <literal>ALTER
+   SUBSCRIPTION ... REFRESH PUBLICATION</literal> action on the subscribing side
+   in order to become effective.
+  </para>
+
+  <para>
+   The seventh variant of this command listed in the synopsis can change
    all of the publication properties specified in
    <xref linkend="sql-createpublication"/>.  Properties not mentioned in the
    command retain their previous settings.
@@ -98,6 +112,15 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
    </varlistentry>
 
    <varlistentry>
+    <term><replaceable class="parameter">schema_name</replaceable></term>
+    <listitem>
+     <para>
+      Name of an existing schema.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
     <term><literal>SET ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )</literal></term>
     <listitem>
      <para>
@@ -142,6 +165,26 @@ ALTER PUBLICATION noinsert SET (publish = 'update, delete');
 <programlisting>
 ALTER PUBLICATION mypublication ADD TABLE users, departments;
 </programlisting></para>
+
+  <para>
+   Add some schemas to the publication:
+<programlisting>
+ALTER PUBLICATION sales_publication ADD SCHEMA marketing_june, sales_june;
+</programlisting>
+  </para>
+
+  <para>
+   Drop a schema from the publication:
+<programlisting>
+ALTER PUBLICATION production_quarterly_publication DROP SCHEMA production_july;
+</programlisting>
+  </para>
+
+  <para>
+   Replace the list of schemas in the publication:
+<programlisting>
+ALTER PUBLICATION production_publication SET SCHEMA production_july;
+</programlisting></para>
  </refsect1>
 
  <refsect1>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index ff82fbc..09c079d 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -22,8 +22,9 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
-    [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ...]
-      | FOR ALL TABLES ]
+    [ FOR TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+      | FOR SCHEMA <replaceable class="parameter">schema_name</replaceable> [, ... ]
+      | FOR ALL TABLES ]
     [ WITH ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 </synopsis>
  </refsynopsisdiv>
@@ -100,6 +101,16 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
    </varlistentry>
 
    <varlistentry>
+    <term><literal>FOR SCHEMA</literal></term>
+    <listitem>
+     <para>
+      Marks the publication as one that replicates changes for all tables in
+      the specified list of schemas, including tables created in the future.
+ </para> + </listitem> + </varlistentry> + + <varlistentry> <term><literal>WITH ( <replaceable class="parameter">publication_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] )</literal></term> <listitem> <para> @@ -222,6 +233,22 @@ CREATE PUBLICATION alltables FOR ALL TABLES; <programlisting> CREATE PUBLICATION insert_only FOR TABLE mydata WITH (publish = 'insert'); +</programlisting> + </para> + + <para> + Create a publication that publishes all changes for all the tables present in +production schema: +<programlisting> +CREATE PUBLICATION production_publication FOR SCHEMA production; +</programlisting> + </para> + + <para> + Create a publication that publishes all changes for all the tables present in +marketing and sales schemas: +<programlisting> +CREATE PUBLICATION sales_publication FOR SCHEMA marketing, sales; </programlisting></para> </refsect1> diff --git a/src/test/regress/expected/object_address.out b/src/test/regress/expected/object_address.out index 388097a..49ea22f 100644 --- a/src/test/regress/expected/object_address.out +++ b/src/test/regress/expected/object_address.out @@ -45,6 +45,7 @@ CREATE TRANSFORM FOR int LANGUAGE SQL ( -- suppress warning that depends on wal_level SET client_min_messages = 'ERROR'; CREATE PUBLICATION addr_pub FOR TABLE addr_nsp.gentable; +CREATE PUBLICATION addr_pub_schema FOR SCHEMA addr_nsp; RESET client_min_messages; CREATE SUBSCRIPTION regress_addr_sub CONNECTION '' PUBLICATION bar WITH (connect = false, slot_name = NONE); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... 
REFRESH PUBLICATION to subscribe the tables @@ -428,6 +429,7 @@ WITH objects (type, name, args) AS (VALUES ('access method', '{btree}', '{}'), ('publication', '{addr_pub}', '{}'), ('publication relation', '{addr_nsp, gentable}', '{addr_pub}'), + ('publication schema', '{addr_nsp}', '{addr_pub_schema}'), ('subscription', '{regress_addr_sub}', '{}'), ('statistics object', '{addr_nsp, gentable_stat}', '{}') ) @@ -490,7 +492,8 @@ SELECT (pg_identify_object(addr1.classid, addr1.objid, addr1.objsubid)).*, subscription | | regress_addr_sub | regress_addr_sub | t publication | | addr_pub | addr_pub | t publication relation | | | addr_nsp.gentable in publication addr_pub | t -(49 rows) + publication schema | | | addr_nsp in publication addr_pub_schema | t +(50 rows) --- --- Cleanup resources @@ -502,6 +505,7 @@ drop cascades to foreign table genftable drop cascades to server integer drop cascades to user mapping for regress_addr_user on server integer DROP PUBLICATION addr_pub; +DROP PUBLICATION addr_pub_schema; DROP SUBSCRIPTION regress_addr_sub; DROP SCHEMA addr_nsp CASCADE; NOTICE: drop cascades to 14 other objects diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 4052873..3349de7 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -255,7 +255,6 @@ DROP PUBLICATION testpub2; SET ROLE regress_publication_user; REVOKE CREATE ON DATABASE regression FROM regress_publication_user2; DROP TABLE testpub_parted; -DROP VIEW testpub_view; DROP TABLE testpub_tbl1; \dRp+ testpub_default Publication testpub_default @@ -287,11 +286,208 @@ ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2; testpub_default | regress_publication_user2 | f | t | t | t | f | f | f | f (1 row) +-- CREATE publication with schema +CREATE SCHEMA pub_test1; +CREATE SCHEMA pub_test2; +CREATE SCHEMA pub_test3; +CREATE TABLE pub_test1.tbl1 (id serial primary key, data text); +CREATE TABLE 
pub_test2.tbl1 (id serial primary key, data text); +-- suppress warning that depends on wal_level +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub1_forschema FOR SCHEMA pub_test1; +RESET client_min_messages; +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | f | t +Schemas: + "pub_test1" + +-- suppress warning that depends on wal_level +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub2_forschema FOR SCHEMA pub_test1, pub_test2, pub_test3; +RESET client_min_messages; +\dRp+ testpub2_forschema + Publication testpub2_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | f | t +Schemas: + "pub_test1" + "pub_test2" + "pub_test3" + +--- Check create publication on a schema that does not exist. +CREATE PUBLICATION testpub_forschema FOR SCHEMA non_existent_schema; +ERROR: schema "non_existent_schema" does not exist +--- Check create publication on a object which is not schema. +CREATE PUBLICATION testpub1_forschema1 FOR SCHEMA testpub_view; +ERROR: schema "testpub_view" does not exist +-- Dropping the schema should reflect the change in publication. 
+DROP SCHEMA pub_test3; +\dRp+ testpub2_forschema + Publication testpub2_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | f | t +Schemas: + "pub_test1" + "pub_test2" + +-- Renaming the schema should reflect the change in publication. +ALTER SCHEMA pub_test1 RENAME to pub_test1_renamed; +\dRp+ testpub2_forschema + Publication testpub2_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | f | t +Schemas: + "pub_test1_renamed" + "pub_test2" + +ALTER SCHEMA pub_test1_renamed RENAME to pub_test1; +\dRp+ testpub2_forschema + Publication testpub2_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | f | t +Schemas: + "pub_test1" + "pub_test2" + +-- Alter publication add schema +ALTER PUBLICATION testpub1_forschema ADD SCHEMA pub_test2; +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | f | t +Schemas: + "pub_test1" + "pub_test2" + +-- Add non existent schema +ALTER PUBLICATION testpub1_forschema ADD SCHEMA non_existent_schema; +ERROR: schema "non_existent_schema" does not exist +\dRp+ testpub1_forschema + Publication 
testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | f | t +Schemas: + "pub_test1" + "pub_test2" + +-- Add a schema which is already added to the publication +ALTER PUBLICATION testpub1_forschema ADD SCHEMA pub_test1; +ERROR: schema "pub_test1" is already member of publication "testpub1_forschema" +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | f | t +Schemas: + "pub_test1" + "pub_test2" + +-- Alter publication drop schema +ALTER PUBLICATION testpub1_forschema DROP SCHEMA pub_test2; +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | f | t +Schemas: + "pub_test1" + +-- Drop schema that is not preset in the publication +ALTER PUBLICATION testpub1_forschema DROP SCHEMA pub_test2; +ERROR: schema "pub_test2" is not part of the publication +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | f | t +Schemas: + "pub_test1" + +-- Drop a schema that does not exist in the system +ALTER PUBLICATION testpub1_forschema DROP 
SCHEMA non_existent_schema; +ERROR: schema "non_existent_schema" does not exist +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | f | t +Schemas: + "pub_test1" + +-- Drop all schemas +ALTER PUBLICATION testpub1_forschema DROP SCHEMA pub_test1; +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | f | f +(1 row) + +-- Alter publication set schema +ALTER PUBLICATION testpub1_forschema SET SCHEMA pub_test1; +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | f | t +Schemas: + "pub_test1" + +-- Alter publication set multiple schema +ALTER PUBLICATION testpub1_forschema SET SCHEMA pub_test1, pub_test2; +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | f | t +Schemas: + "pub_test1" + "pub_test2" + +-- Alter publication set non-existent schema +ALTER PUBLICATION testpub1_forschema SET SCHEMA non_existent_schema; +ERROR: schema "non_existent_schema" does not exist +\dRp+ 
testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | f | t +Schemas: + "pub_test1" + "pub_test2" + +-- Alter publication set it with the same schema +ALTER PUBLICATION testpub1_forschema SET SCHEMA pub_test1, pub_test2; +\dRp+ testpub1_forschema + Publication testpub1_forschema + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtables | Pubschemas +--------------------------+------------+---------+---------+---------+-----------+----------+-----------+------------ + regress_publication_user | f | t | t | t | t | f | f | t +Schemas: + "pub_test1" + "pub_test2" + +DROP VIEW testpub_view; DROP PUBLICATION testpub_default; DROP PUBLICATION testpib_ins_trunct; DROP PUBLICATION testpub_fortbl; +DROP PUBLICATION testpub1_forschema; +DROP PUBLICATION testpub2_forschema; DROP SCHEMA pub_test CASCADE; NOTICE: drop cascades to table pub_test.testpub_nopk +DROP SCHEMA pub_test1 CASCADE; +NOTICE: drop cascades to table pub_test1.tbl1 +DROP SCHEMA pub_test2 CASCADE; +NOTICE: drop cascades to table pub_test2.tbl1 RESET SESSION AUTHORIZATION; DROP ROLE regress_publication_user, regress_publication_user2; DROP ROLE regress_publication_user_dummy; diff --git a/src/test/regress/sql/object_address.sql b/src/test/regress/sql/object_address.sql index 2f4f66e..56d9b85 100644 --- a/src/test/regress/sql/object_address.sql +++ b/src/test/regress/sql/object_address.sql @@ -48,6 +48,7 @@ CREATE TRANSFORM FOR int LANGUAGE SQL ( -- suppress warning that depends on wal_level SET client_min_messages = 'ERROR'; CREATE PUBLICATION addr_pub FOR TABLE addr_nsp.gentable; +CREATE PUBLICATION addr_pub_schema FOR SCHEMA addr_nsp; RESET client_min_messages; CREATE SUBSCRIPTION regress_addr_sub CONNECTION '' 
PUBLICATION bar WITH (connect = false, slot_name = NONE); CREATE STATISTICS addr_nsp.gentable_stat ON a, b FROM addr_nsp.gentable; @@ -198,6 +199,7 @@ WITH objects (type, name, args) AS (VALUES ('access method', '{btree}', '{}'), ('publication', '{addr_pub}', '{}'), ('publication relation', '{addr_nsp, gentable}', '{addr_pub}'), + ('publication schema', '{addr_nsp}', '{addr_pub_schema}'), ('subscription', '{regress_addr_sub}', '{}'), ('statistics object', '{addr_nsp, gentable_stat}', '{}') ) @@ -215,6 +217,7 @@ SELECT (pg_identify_object(addr1.classid, addr1.objid, addr1.objsubid)).*, --- DROP FOREIGN DATA WRAPPER addr_fdw CASCADE; DROP PUBLICATION addr_pub; +DROP PUBLICATION addr_pub_schema; DROP SUBSCRIPTION regress_addr_sub; DROP SCHEMA addr_nsp CASCADE; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index d844075..ae23f9f 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -148,7 +148,6 @@ SET ROLE regress_publication_user; REVOKE CREATE ON DATABASE regression FROM regress_publication_user2; DROP TABLE testpub_parted; -DROP VIEW testpub_view; DROP TABLE testpub_tbl1; \dRp+ testpub_default @@ -169,11 +168,97 @@ ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2; \dRp testpub_default +-- CREATE publication with schema +CREATE SCHEMA pub_test1; +CREATE SCHEMA pub_test2; +CREATE SCHEMA pub_test3; +CREATE TABLE pub_test1.tbl1 (id serial primary key, data text); +CREATE TABLE pub_test2.tbl1 (id serial primary key, data text); + +-- suppress warning that depends on wal_level +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub1_forschema FOR SCHEMA pub_test1; +RESET client_min_messages; +\dRp+ testpub1_forschema + +-- suppress warning that depends on wal_level +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub2_forschema FOR SCHEMA pub_test1, pub_test2, pub_test3; +RESET client_min_messages; +\dRp+ testpub2_forschema + +--- Check create 
publication on a schema that does not exist. +CREATE PUBLICATION testpub_forschema FOR SCHEMA non_existent_schema; + +--- Check create publication on a object which is not schema. +CREATE PUBLICATION testpub1_forschema1 FOR SCHEMA testpub_view; + +-- Dropping the schema should reflect the change in publication. +DROP SCHEMA pub_test3; +\dRp+ testpub2_forschema + +-- Renaming the schema should reflect the change in publication. +ALTER SCHEMA pub_test1 RENAME to pub_test1_renamed; +\dRp+ testpub2_forschema + +ALTER SCHEMA pub_test1_renamed RENAME to pub_test1; +\dRp+ testpub2_forschema + +-- Alter publication add schema +ALTER PUBLICATION testpub1_forschema ADD SCHEMA pub_test2; +\dRp+ testpub1_forschema + +-- Add non existent schema +ALTER PUBLICATION testpub1_forschema ADD SCHEMA non_existent_schema; +\dRp+ testpub1_forschema + +-- Add a schema which is already added to the publication +ALTER PUBLICATION testpub1_forschema ADD SCHEMA pub_test1; +\dRp+ testpub1_forschema + +-- Alter publication drop schema +ALTER PUBLICATION testpub1_forschema DROP SCHEMA pub_test2; +\dRp+ testpub1_forschema + +-- Drop schema that is not preset in the publication +ALTER PUBLICATION testpub1_forschema DROP SCHEMA pub_test2; +\dRp+ testpub1_forschema + +-- Drop a schema that does not exist in the system +ALTER PUBLICATION testpub1_forschema DROP SCHEMA non_existent_schema; +\dRp+ testpub1_forschema + +-- Drop all schemas +ALTER PUBLICATION testpub1_forschema DROP SCHEMA pub_test1; +\dRp+ testpub1_forschema + +-- Alter publication set schema +ALTER PUBLICATION testpub1_forschema SET SCHEMA pub_test1; +\dRp+ testpub1_forschema + +-- Alter publication set multiple schema +ALTER PUBLICATION testpub1_forschema SET SCHEMA pub_test1, pub_test2; +\dRp+ testpub1_forschema + +-- Alter publication set non-existent schema +ALTER PUBLICATION testpub1_forschema SET SCHEMA non_existent_schema; +\dRp+ testpub1_forschema + +-- Alter publication set it with the same schema +ALTER PUBLICATION 
testpub1_forschema SET SCHEMA pub_test1, pub_test2; +\dRp+ testpub1_forschema + +DROP VIEW testpub_view; + DROP PUBLICATION testpub_default; DROP PUBLICATION testpib_ins_trunct; DROP PUBLICATION testpub_fortbl; +DROP PUBLICATION testpub1_forschema; +DROP PUBLICATION testpub2_forschema; DROP SCHEMA pub_test CASCADE; +DROP SCHEMA pub_test1 CASCADE; +DROP SCHEMA pub_test2 CASCADE; RESET SESSION AUTHORIZATION; DROP ROLE regress_publication_user, regress_publication_user2; diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index c20fadc..c715340 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 27; +use Test::More tests => 40; # Initialize publisher node my $node_publisher = get_new_node('publisher'); @@ -246,6 +246,135 @@ $node_publisher->safe_psql('postgres', "DROP TABLE temp2"); $node_subscriber->safe_psql('postgres', "DROP TABLE temp1"); $node_subscriber->safe_psql('postgres', "DROP TABLE temp2"); +# Test replication with publications created using FOR SCHEMA option. 
+# Create schemas and tables on publisher
+$node_publisher->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_publisher->safe_psql('postgres', "CREATE SCHEMA sch3");
+$node_publisher->safe_psql('postgres', "CREATE TABLE sch1.temp1 AS SELECT generate_series(1,10) AS a");
+$node_publisher->safe_psql('postgres', "CREATE TABLE sch1.temp2 AS SELECT generate_series(1,10) AS a");
+$node_publisher->safe_psql('postgres', "CREATE TABLE sch2.temp1 AS SELECT generate_series(1,10) AS a");
+$node_publisher->safe_psql('postgres', "CREATE TABLE sch2.temp2 AS SELECT generate_series(1,10) AS a");
+
+# Create schemas and tables on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch3");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch1.temp1 (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch1.temp2 (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch2.temp1 (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch2.temp2 (a int)");
+
+# Setup logical replication for schema sch1 and sch2 that will only be used for
+# this test
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_schema FOR SCHEMA sch1,sch2");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_schema CONNECTION '$publisher_connstr' PUBLICATION tap_pub_schema"
+	);
+
+$node_publisher->wait_for_catchup('tap_sub_schema');
+
+# Check the schema table data is synced up.
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch1.temp1");
+is($result, qq(10|1|10), 'check rows on subscriber catchup');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch1.temp2");
+is($result, qq(10|1|10), 'check rows on subscriber catchup');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch2.temp1");
+is($result, qq(10|1|10), 'check rows on subscriber catchup');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch2.temp2");
+is($result, qq(10|1|10), 'check rows on subscriber catchup');
+
+# Insert data into a few tables and verify that it is replicated.
+$node_publisher->safe_psql('postgres', "INSERT INTO sch1.temp1 VALUES(generate_series(11,20))");
+$node_publisher->safe_psql('postgres', "INSERT INTO sch2.temp1 VALUES(generate_series(11,20))");
+
+$node_publisher->wait_for_catchup('tap_sub_schema');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch1.temp1");
+is($result, qq(20|1|20), 'check rows on subscriber catchup');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch2.temp1");
+is($result, qq(20|1|20), 'check rows on subscriber catchup');
+
+# Create a new table in the published schema and verify that the subscriber
+# does not receive its data before the subscription is refreshed.
+$node_publisher->safe_psql('postgres', "CREATE TABLE SCH1.temp3 AS SELECT generate_series(1,10) AS a");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE SCH1.temp3(a INT)");
+$node_publisher->wait_for_catchup('tap_sub_schema');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM sch1.temp3");
+is($result, qq(0), 'check rows on subscriber catchup');
+
+# Table data should be reflected after refreshing the publication on the
+# subscriber.
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
+$node_publisher->safe_psql('postgres', "INSERT INTO SCH1.temp3 VALUES(11)");
+$node_publisher->wait_for_catchup('tap_sub_schema');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch1.temp3");
+is($result, qq(11|1|11), 'check rows on subscriber catchup');
+
+# Move a table from a published schema to an unpublished schema and verify
+# that data inserted afterwards is not replicated to the subscriber.
+$node_publisher->safe_psql('postgres', "ALTER TABLE SCH1.temp3 SET SCHEMA SCH3");
+$node_publisher->safe_psql('postgres', "INSERT INTO SCH3.temp3 VALUES(11)");
+$node_publisher->wait_for_catchup('tap_sub_schema');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM sch1.temp3");
+is($result, qq(11|1|11), 'check rows on subscriber catchup');
+
+# Verify that the subscription relation list is updated after refresh.
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')");
+is($result, qq(5),
+	'check subscription relation status is not yet dropped on subscriber');
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')");
+is($result, qq(4),
+	'check subscription relation status was dropped on subscriber');
+
+
+# Drop table from the publication schema, verify that subscriber removes the
+# table entry after refresh.
+$node_publisher->safe_psql('postgres', "DROP TABLE SCH1.temp2");
+$node_publisher->wait_for_catchup('tap_sub_schema');
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')");
+is($result, qq(4),
+	'check subscription relation status is not yet dropped on subscriber');
+
+# Table should be removed from pg_subscription_rel after refreshing the
+# publication in subscriber.
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_schema REFRESH PUBLICATION");
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_subscription_rel WHERE srsubid IN (SELECT oid FROM pg_subscription WHERE subname = 'tap_sub_schema')");
+is($result, qq(3),
+	'check subscription relation status was dropped on subscriber');
+
+# Drop subscription as we don't need it anymore
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_schema");
+
+# Drop publications as we don't need them anymore
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_schema");
+
+# Clean up the tables on both publisher and subscriber as we don't need them
+$node_publisher->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_publisher->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+$node_publisher->safe_psql('postgres', "DROP SCHEMA sch3 cascade");
+$node_subscriber->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_subscriber->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+$node_subscriber->safe_psql('postgres', "DROP SCHEMA sch3 cascade");
+
 # add REPLICA IDENTITY FULL so we can update
 $node_publisher->safe_psql('postgres',
 	"ALTER TABLE tab_full REPLICA IDENTITY FULL");
-- 
1.8.3.1
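
For readers who want to try the behaviour the TAP test exercises, the same flow can be sketched in plain SQL. This is only an illustration, and it assumes a server built with this v3 patch applied: the FOR SCHEMA syntax is what the patch proposes and is not assumed to exist in any released PostgreSQL. The object names (sales, sales.orders, sales.refunds, sales_pub, sales_sub) and the connection string are hypothetical.

```sql
-- Publisher: publish every table in one schema
-- (FOR SCHEMA is the syntax proposed by this patch).
CREATE SCHEMA sales;
CREATE TABLE sales.orders (id int PRIMARY KEY, note text);
CREATE PUBLICATION sales_pub FOR SCHEMA sales;

-- Subscriber: the schema and table must already exist.
CREATE SCHEMA sales;
CREATE TABLE sales.orders (id int PRIMARY KEY, note text);
CREATE SUBSCRIPTION sales_sub
    CONNECTION 'host=publisher.example dbname=postgres'  -- hypothetical connection string
    PUBLICATION sales_pub;

-- Publisher: a table created later in the published schema.
CREATE TABLE sales.refunds (id int PRIMARY KEY, amount numeric);

-- Subscriber: create the matching table, then refresh the subscription.
-- As described in this thread, the new table is not applied until the refresh.
CREATE TABLE sales.refunds (id int PRIMARY KEY, amount numeric);
ALTER SUBSCRIPTION sales_sub REFRESH PUBLICATION;

-- Subscriber: list the relations the subscription currently tracks.
SELECT srrelid::regclass AS relation, srsubstate
FROM pg_subscription_rel
WHERE srsubid = (SELECT oid FROM pg_subscription WHERE subname = 'sales_sub');
```

The final query is the same kind of check the test performs against pg_subscription_rel: the set of tracked relations changes only after ALTER SUBSCRIPTION ... REFRESH PUBLICATION, both when a table joins the published schema and when one is dropped from it or moved out of it.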