This implements the basics to do replication from a single catalog on a
catalog-enabled master ("the cloud") to a (not necessarily catalog-enabled)
slave ("on-premise"). Such slave must only see its own events, eg. events
from the catalog of the connecting user.

Changes done:

 - Extend the GTID event with the catalog (a follow-up patch will change
   replication slave-side to use this when applying events).

 - In the dump thread, when a catalog user does a binlog dump (eg.
   connecting slave), filter events so only events from that catalog are
   sent.

 - Extend the server version of the connector library to be able to connect
   to a specific catalog.

 - Add a Master_catalog option to CHANGE MASTER to make a slave connect to a
   user in a specific catalog.

Open issues:

 - Possibly there needs to be a way for the catalog superuser to control
   which catalogs allow the COM_BINLOG_DUMP command.

 - In GTID replication, how in the single-catalog slave to maintain the GTID
   position in domains that it does not receive events from. One option is
   to update the GTID position (eg. with the GTID_LIST events) in the other
   domains; this way the slave will have the correct global GTID position,
   but it will not be possible to use multi-source to replicate two catalogs
   individually to the same server. Another option is to binlog each catalog
   in their own domain(s) and do master-side filtering on domain_id; then
   the master will ignore irrelevant domains for a connecting catalog slave
   (and not report "position too old" in those domains), and the slave will
   only have its own domains in its GTID position.

Signed-off-by: Kristian Nielsen <kniel...@knielsen-hq.org>
---
 include/mysql.h                            |   7 +-
 include/sql_common.h                       |   3 +-
 mysql-test/suite/rpl/r/rpl_catalogs.result | 137 +++++++++++++++++++++
 mysql-test/suite/rpl/t/rpl_catalogs.test   | 119 ++++++++++++++++++
 sql-common/client.c                        |  36 +++++-
 sql/lex.h                                  |   1 +
 sql/lex_string.h                           |   2 +-
 sql/log_event.cc                           |  20 +++
 sql/log_event.h                            |  13 +-
 sql/log_event_server.cc                    |  88 ++++++++++++-
 sql/privilege.h                            |   7 +-
 sql/rpl_mi.cc                              |  15 ++-
 sql/rpl_mi.h                               |   1 +
 sql/slave.cc                               |   3 +
 sql/sql_lex.h                              |   4 +-
 sql/sql_repl.cc                            |  37 ++++++
 sql/sql_yacc.yy                            |   6 +
 17 files changed, 483 insertions(+), 16 deletions(-)
 create mode 100644 mysql-test/suite/rpl/r/rpl_catalogs.result
 create mode 100644 mysql-test/suite/rpl/t/rpl_catalogs.test

diff --git a/include/mysql.h b/include/mysql.h
index a66dcc7bd02..44ca2099694 100644
--- a/include/mysql.h
+++ b/include/mysql.h
@@ -189,14 +189,15 @@ enum mysql_option
   /* MariaDB options */
   MYSQL_PROGRESS_CALLBACK=5999,
   MYSQL_OPT_NONBLOCK,
-  MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY
+  MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY,
+  MARIADB_OPT_CATALOG=7029
 };
 
 /**
   @todo remove the "extension", move st_mysql_options completely
   out of mysql.h
 */
-struct st_mysql_options_extention; 
+struct st_mysql_options_extension;
 
 struct st_mysql_options {
   unsigned int connect_timeout, read_timeout, write_timeout;
@@ -231,7 +232,7 @@ struct st_mysql_options {
   void (*local_infile_end)(void *);
   int (*local_infile_error)(void *, char *, unsigned int);
   void *local_infile_userdata;
-  struct st_mysql_options_extention *extension;
+  struct st_mysql_options_extension *extension;
 };
 
 enum mysql_status 
diff --git a/include/sql_common.h b/include/sql_common.h
index ad5ab7e19af..c261ee9fe94 100644
--- a/include/sql_common.h
+++ b/include/sql_common.h
@@ -28,7 +28,7 @@ extern const char     *cant_connect_sqlstate;
 extern const char      *not_error_sqlstate;
 
 
-struct st_mysql_options_extention {
+struct st_mysql_options_extension {
   char *plugin_dir;
   char *default_auth;
   char *ssl_crl;                               /* PEM CRL file */
@@ -41,6 +41,7 @@ struct st_mysql_options_extention {
                           uint proc_info_length);
   HASH connection_attributes;
   size_t connection_attributes_length;
+  char *catalog;
 };
 
 typedef struct st_mysql_methods
diff --git a/mysql-test/suite/rpl/r/rpl_catalogs.result 
b/mysql-test/suite/rpl/r/rpl_catalogs.result
new file mode 100644
index 00000000000..4b25fd20e6c
--- /dev/null
+++ b/mysql-test/suite/rpl/r/rpl_catalogs.result
@@ -0,0 +1,137 @@
+include/master-slave.inc
+[connection master]
+*** Create some catalogs to work with.
+connection master_config;
+SET SESSION sql_log_bin= 0;
+CREATE CATALOG foo;
+CREATE CATALOG bar;
+CREATE CATALOG baz;
+SET SESSION sql_log_bin= 1;
+connection slave_config;
+SET SESSION sql_log_bin= 0;
+CREATE CATALOG foo;
+CREATE CATALOG bar;
+CREATE CATALOG baz;
+SET SESSION sql_log_bin= 1;
+*** Replicate a few events full-catalogs -> full-catalogs
+connection master_config;
+USE CATALOG foo;
+CREATE DATABASE db1;
+use db1;
+CREATE TABLE t1 (a INT PRIMARY KEY);
+INSERT INTO t1 VALUES (1);
+USE CATALOG baz;
+CREATE DATABASE db3;
+use db3;
+CREATE TABLE t3 (a INT PRIMARY KEY);
+INSERT INTO t3 VALUES (1);
+sync_slave_with_master slave;
+connection slave_config;
+USE CATALOG foo;
+use db1;
+SELECT * FROM t1 ORDER BY a;
+a
+1
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+a
+1
+*** Create a normal slave user to replicate single catalog
+connection slave_config;
+include/stop_slave.inc
+CHANGE MASTER TO master_user='rpl_bar', master_password='bar_pw',
+master_catalog="bar";
+connection master_config;
+USE CATALOG bar;
+CREATE USER rpl_bar@localhost;
+GRANT replication slave, select ON *.* TO rpl_bar@localhost IDENTIFIED BY 
"bar_pw";
+CREATE DATABASE db2;
+use db2;
+CREATE TABLE t2 (a INT PRIMARY KEY, b INT);
+INSERT INTO t2(a) VALUES (1), (2), (3), (4), (5);
+INSERT INTO t2 VALUES (6, 0), (7, 1), (8, 2);
+USE CATALOG foo;
+INSERT INTO db1.t1 VALUES (10), (20), (30);
+USE CATALOG baz;
+INSERT INTO db3.t3 VALUES (10), (20), (30);
+USE CATALOG bar;
+use db2;
+UPDATE t2 SET b=a*a WHERE b IS NULL;
+USE CATALOG bar;
+SELECT * FROM db2.t2 ORDER BY a;
+a      b
+1      1
+2      4
+3      9
+4      16
+5      25
+6      0
+7      1
+8      2
+USE CATALOG foo;
+SELECT * FROM db1.t1 ORDER BY a;
+a
+1
+10
+20
+30
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+a
+1
+10
+20
+30
+USE CATALOG def;
+use test;
+connect con1,localhost,rpl_bar,bar_pw,bar.db2;
+SELECT * FROM t2 ORDER BY a;
+a      b
+1      1
+2      4
+3      9
+4      16
+5      25
+6      0
+7      1
+8      2
+disconnect con1;
+connection master;
+save_master_pos;
+connection slave_config;
+include/start_slave.inc
+connection slave;
+sync_with_master;
+connection slave_config;
+USE CATALOG bar;
+SELECT * FROM db2.t2 ORDER BY a;
+a      b
+1      1
+2      4
+3      9
+4      16
+5      25
+6      0
+7      1
+8      2
+USE CATALOG foo;
+SELECT * FROM db1.t1 ORDER BY a;
+a
+1
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+a
+1
+*** Clean up.
+connection slave_config;
+include/stop_slave.inc
+CHANGE MASTER TO master_user='root', master_password='', master_catalog='';
+include/start_slave.inc
+connection master_config;
+USE CATALOG def;
+use test;
+DROP CATALOG foo;
+DROP CATALOG bar;
+DROP CATALOG baz;
+connection master;
+include/rpl_end.inc
diff --git a/mysql-test/suite/rpl/t/rpl_catalogs.test 
b/mysql-test/suite/rpl/t/rpl_catalogs.test
new file mode 100644
index 00000000000..939e68b920d
--- /dev/null
+++ b/mysql-test/suite/rpl/t/rpl_catalogs.test
@@ -0,0 +1,119 @@
+--source include/have_catalogs.inc
+--source include/master-slave.inc
+
+--echo *** Create some catalogs to work with.
+--connection master_config
+# ToDo: It seems CREATE CATALOG cannot replicate currently.
+# So don't binlog it for now.
+SET SESSION sql_log_bin= 0;
+CREATE CATALOG foo;
+CREATE CATALOG bar;
+CREATE CATALOG baz;
+SET SESSION sql_log_bin= 1;
+
+--connection slave_config
+SET SESSION sql_log_bin= 0;
+CREATE CATALOG foo;
+CREATE CATALOG bar;
+CREATE CATALOG baz;
+SET SESSION sql_log_bin= 1;
+
+
+--echo *** Replicate a few events full-catalogs -> full-catalogs
+
+--connection master_config
+USE CATALOG foo;
+CREATE DATABASE db1;
+use db1;
+CREATE TABLE t1 (a INT PRIMARY KEY);
+INSERT INTO t1 VALUES (1);
+
+USE CATALOG baz;
+CREATE DATABASE db3;
+use db3;
+CREATE TABLE t3 (a INT PRIMARY KEY);
+INSERT INTO t3 VALUES (1);
+
+--sync_slave_with_master slave
+
+--connection slave_config
+USE CATALOG foo;
+use db1;
+SELECT * FROM t1 ORDER BY a;
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+
+
+--echo *** Create a normal slave user to replicate single catalog
+--connection slave_config
+--source include/stop_slave.inc
+CHANGE MASTER TO master_user='rpl_bar', master_password='bar_pw',
+  master_catalog="bar";
+
+--connection master_config
+USE CATALOG bar;
+CREATE USER rpl_bar@localhost;
+GRANT replication slave, select ON *.* TO rpl_bar@localhost IDENTIFIED BY 
"bar_pw";
+
+CREATE DATABASE db2;
+use db2;
+CREATE TABLE t2 (a INT PRIMARY KEY, b INT);
+INSERT INTO t2(a) VALUES (1), (2), (3), (4), (5);
+INSERT INTO t2 VALUES (6, 0), (7, 1), (8, 2);
+
+# Do something in other catalogs, see that it is not replicated.
+USE CATALOG foo;
+INSERT INTO db1.t1 VALUES (10), (20), (30);
+USE CATALOG baz;
+INSERT INTO db3.t3 VALUES (10), (20), (30);
+
+USE CATALOG bar;
+use db2;
+UPDATE t2 SET b=a*a WHERE b IS NULL;
+
+USE CATALOG bar;
+SELECT * FROM db2.t2 ORDER BY a;
+USE CATALOG foo;
+SELECT * FROM db1.t1 ORDER BY a;
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+
+USE CATALOG def;
+use test;
+
+connect(con1,localhost,rpl_bar,bar_pw,bar.db2);
+SELECT * FROM t2 ORDER BY a;
+--disconnect con1
+
+--connection master
+--save_master_pos
+
+--connection slave_config
+--source include/start_slave.inc
+--connection slave
+--sync_with_master
+
+--connection slave_config
+USE CATALOG bar;
+SELECT * FROM db2.t2 ORDER BY a;
+USE CATALOG foo;
+SELECT * FROM db1.t1 ORDER BY a;
+USE CATALOG baz;
+SELECT * FROM db3.t3 ORDER BY a;
+
+--echo *** Clean up.
+
+--connection slave_config
+--source include/stop_slave.inc
+CHANGE MASTER TO master_user='root', master_password='', master_catalog='';
+--source include/start_slave.inc
+
+--connection master_config
+USE CATALOG def;
+use test;
+DROP CATALOG foo;
+DROP CATALOG bar;
+DROP CATALOG baz;
+
+--connection master
+--source include/rpl_end.inc
diff --git a/sql-common/client.c b/sql-common/client.c
index ba804ce2a7c..711f8733c9f 100644
--- a/sql-common/client.c
+++ b/sql-common/client.c
@@ -99,6 +99,7 @@ extern my_bool using_catalogs;
 
 
 #define CONNECT_TIMEOUT 0
+#define MAX_CATALOG_NAME 65   /* Including terminating '\0' */
 
 #include "client_settings.h"
 #include <ssl_compat.h>
@@ -836,9 +837,9 @@ static int add_init_command(struct st_mysql_options 
*options, const char *cmd)
 
 
 #define ALLOCATE_EXTENSIONS(OPTS)                                \
-      (OPTS)->extension= (struct st_mysql_options_extention *)   \
+      (OPTS)->extension= (struct st_mysql_options_extension *)   \
         my_malloc(key_memory_mysql_options,                      \
-                  sizeof(struct st_mysql_options_extention),     \
+                  sizeof(struct st_mysql_options_extension),     \
                   MYF(MY_WME | MY_ZEROFILL))                     \
 
 
@@ -2089,7 +2090,8 @@ static int send_client_reply_packet(MCPVIO_EXT *mpvio,
     see end= buff+32 below, fixed size of the packet is 32 bytes.
      +9 because data is a length encoded binary where meta data size is max 9.
   */
-  buff_size= 33 + USERNAME_LENGTH + data_len + 9 + NAME_LEN + NAME_LEN + 
connect_attrs_len + 9;
+  buff_size= 33 + USERNAME_LENGTH + data_len + 9 + NAME_LEN + NAME_LEN +
+    connect_attrs_len + 9 + MAX_CATALOG_NAME;
   buff= my_alloca(buff_size);
 
   mysql->client_flag|= mysql->options.client_flag;
@@ -2122,10 +2124,19 @@ static int send_client_reply_packet(MCPVIO_EXT *mpvio,
   if (mysql->client_flag & CLIENT_PROTOCOL_41)
   {
     /* 4.1 server and 4.1 client has a 32 byte option flag */
+    if (!(mysql->server_capabilities & CLIENT_MYSQL))
+      mysql->client_flag&= ~CLIENT_MYSQL;
     int4store(buff,mysql->client_flag);
     int4store(buff+4, net->max_packet_size);
     buff[8]= (char) mysql->charset->number;
     bzero(buff+9, 32-9);
+    if (!(mysql->server_capabilities & CLIENT_MYSQL))
+    {
+      /* ToDo: Should this check if the server has the catalog capability? */
+      uint client_extended_cap= mysql->options.extension->catalog ?
+        (uint)((MARIADB_CLIENT_CONNECT_CATALOG) >> 32) : (uint)0;
+      int4store(buff + 28, client_extended_cap);
+    }
     end= buff+32;
   }
   else
@@ -2274,6 +2285,17 @@ static int send_client_reply_packet(MCPVIO_EXT *mpvio,
 
   end= (char *) send_client_connect_attrs(mysql, (uchar *) end);
 
+  /* Add catalog */
+  if (mysql->options.extension->catalog)
+  {
+    size_t len= strlen(mysql->options.extension->catalog);
+    if (len >= MAX_CATALOG_NAME)
+      len= MAX_CATALOG_NAME - 1;
+    end= (char*)write_length_encoded_string4(
+                     (uchar*)end, buff_size - (end - buff),
+                     (uchar *)mysql->options.extension->catalog, len);
+  }
+
   /* Write authentication package */
   if (my_net_write(net, (uchar*) buff, (size_t) (end-buff)) || net_flush(net))
   {
@@ -3332,6 +3354,7 @@ static void mysql_close_free_options(MYSQL *mysql)
     my_free(mysql->options.extension->plugin_dir);
     my_free(mysql->options.extension->default_auth);
     my_hash_free(&mysql->options.extension->connection_attributes);
+    my_free(mysql->options.extension->catalog);
     my_free(mysql->options.extension);
   }
   bzero((char*) &mysql->options,sizeof(mysql->options));
@@ -3850,9 +3873,9 @@ mysql_options(MYSQL *mysql,enum mysql_option option, 
const void *arg)
     break;
   case MYSQL_PROGRESS_CALLBACK:
     if (!mysql->options.extension)
-      mysql->options.extension= (struct st_mysql_options_extention *)
+      mysql->options.extension= (struct st_mysql_options_extension *)
         my_malloc(key_memory_mysql_options,
-                  sizeof(struct st_mysql_options_extention),
+                  sizeof(struct st_mysql_options_extension),
                   MYF(MY_WME | MY_ZEROFILL));
     if (mysql->options.extension)
       mysql->options.extension->report_progress= 
@@ -3918,6 +3941,9 @@ mysql_options(MYSQL *mysql,enum mysql_option option, 
const void *arg)
       }
     }
     break;
+  case MARIADB_OPT_CATALOG:
+    EXTENSION_SET_STRING(&mysql->options, catalog, arg);
+    break;
   case MYSQL_SHARED_MEMORY_BASE_NAME:
   default:
     DBUG_RETURN(1);
diff --git a/sql/lex.h b/sql/lex.h
index bf0223ce544..9771c0e39ba 100644
--- a/sql/lex.h
+++ b/sql/lex.h
@@ -367,6 +367,7 @@ SYMBOL symbols[] = {
   { "LOOP",             SYM(LOOP_SYM)},
   { "LOW_PRIORITY",    SYM(LOW_PRIORITY)},
   { "MASTER",           SYM(MASTER_SYM)},
+  { "MASTER_CATALOG",   SYM(MASTER_CATALOG_SYM)},
   { "MASTER_CONNECT_RETRY",           SYM(MASTER_CONNECT_RETRY_SYM)},
   { "MASTER_DELAY",     SYM(MASTER_DELAY_SYM)},
   { "MASTER_GTID_POS",  SYM(MASTER_GTID_POS_SYM)},
diff --git a/sql/lex_string.h b/sql/lex_string.h
index e7a732346c4..f49e567c64c 100644
--- a/sql/lex_string.h
+++ b/sql/lex_string.h
@@ -71,7 +71,7 @@ static inline bool lex_string_cmp(CHARSET_INFO *charset, 
const LEX_CSTRING *a,
 }
 
 /*
-  Compare to LEX_CSTRING's and return 0 if equal
+  Compare two LEX_CSTRING's and return 0 if equal
 */
 
 static inline bool cmp(const LEX_CSTRING *a, const LEX_CSTRING *b)
diff --git a/sql/log_event.cc b/sql/log_event.cc
index 992054c7d17..004010e9d99 100644
--- a/sql/log_event.cc
+++ b/sql/log_event.cc
@@ -2448,6 +2448,26 @@ Gtid_log_event::Gtid_log_event(const uchar *buf, uint 
event_len,
       sa_seq_no= uint8korr(buf);
       buf+= 8;
     }
+    if (flags_extra & FL_CATALOG)
+    {
+      if (unlikely(buf - buf_0) >= event_len)
+      {
+        seq_no= 0;
+        return;
+      }
+      uint32_t cat_len= *buf++;
+      if (unlikely(cat_len > MAX_CATALOG_NAME) ||
+          unlikely(buf - buf_0 + cat_len) >= event_len)
+      {
+        seq_no= 0;
+        return;
+      }
+      memcpy(cat_name_buf, buf, cat_len);
+      cat_name_int.str= cat_name_buf;
+      cat_name_int.length= cat_len;
+      cat_name= &cat_name_int;
+      buf+= cat_len;
+    }
   }
   /*
     the strict '<' part of the assert corresponds to extra zero-padded
diff --git a/sql/log_event.h b/sql/log_event.h
index bd318f147d7..2f866efca7c 100644
--- a/sql/log_event.h
+++ b/sql/log_event.h
@@ -3250,13 +3250,16 @@ class Gtid_log_event: public Log_event
 public:
   uint64 seq_no;
   uint64 commit_id;
-  uint32 domain_id;
   uint64 sa_seq_no;   // start alter identifier for CA/RA
+  const LEX_CSTRING *cat_name;  // Points either to catalog object or own 
buffer
 #ifdef MYSQL_SERVER
   event_xid_t xid;
 #else
   event_mysql_xid_t xid;
 #endif
+  LEX_CSTRING cat_name_int;
+  uint32 domain_id;
+  char cat_name_buf[MAX_CATALOG_NAME];
   uchar flags2;
   /*
     More flags area placed after the regular flags2's area. The type
@@ -3309,11 +3312,15 @@ class Gtid_log_event: public Log_event
     FL_EXTRA_MULTI_ENGINE_E1 is set for event group comprising a transaction
     involving multiple storage engines. No flag and extra data are added
     to the event when the transaction involves only one engine.
+
+    FL_CATALOG is set when a catalog name is included in the GTID (happens
+    when not the default catalog).
   */
   static const uchar FL_EXTRA_MULTI_ENGINE_E1= 1;
   static const uchar FL_START_ALTER_E1= 2;
   static const uchar FL_COMMIT_ALTER_E1= 4;
   static const uchar FL_ROLLBACK_ALTER_E1= 8;
+  static const uchar FL_CATALOG= 16;
 
 #ifdef MYSQL_SERVER
   Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool 
standalone,
@@ -3346,6 +3353,10 @@ class Gtid_log_event: public Log_event
                    enum enum_binlog_checksum_alg checksum_alg,
                    uint32 *domain_id, uint32 *server_id, uint64 *seq_no,
                    uchar *flags2, const Format_description_log_event *fdev);
+  static bool peek_catalog(const uchar *event_start, size_t event_len,
+                           const Format_description_log_event *fdev,
+                           enum enum_binlog_checksum_alg checksum_alg,
+                           uchar *out_flags2, LEX_CSTRING *out_catname);
 #endif
 };
 
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc
index ff3a5bda8e5..22ad0a8b4a6 100644
--- a/sql/log_event_server.cc
+++ b/sql/log_event_server.cc
@@ -2936,6 +2936,13 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 
seq_no_arg,
     if (extra_engines > 0)
       flags_extra|= FL_EXTRA_MULTI_ENGINE_E1;
   }
+  const SQL_CATALOG *cat= thd_arg->catalog;
+  if (cat != default_catalog())
+  {
+    flags_extra|= FL_CATALOG;
+    cat_name= &cat->name;
+  }
+
   if (thd->get_binlog_flags_for_alter())
   {
     flags_extra |= thd->get_binlog_flags_for_alter();
@@ -2982,10 +2989,80 @@ Gtid_log_event::peek(const uchar *event_start, size_t 
event_len,
 }
 
 
+/*
+  Obtain the catalog (if any) in the GTID (without constructing the full
+  object).
+
+  This is a separate function from Gtid_log_event::peek(), since this function
+  needs to do a lot of parsing of flags etc. to know where the catalog is, and
+  this overhead is not wanted in the often-used Gtid_log_event::peek(). But if
+  more peek-functionality would be needed in the future, it could make sense to
+  add it to this function which already has the parsing overhead.
+
+  Returns true if error (malformed or short event), false if ok. Returns the
+  name of the default catalog if catalog is not included explicitly in the 
GTID.
+
+  Note that the returned out_catname will point into the passed-in packet
+  memory, so will only be valid as long as the packet memory is!
+*/
+bool
+Gtid_log_event::peek_catalog(const uchar *event_start, size_t event_len,
+                             const Format_description_log_event *fdev,
+                             enum enum_binlog_checksum_alg checksum_alg,
+                             uchar *out_flags2, LEX_CSTRING *out_catname)
+{
+  if (checksum_alg == BINLOG_CHECKSUM_ALG_CRC32)
+  {
+    if (event_len > BINLOG_CHECKSUM_LEN)
+      event_len-= BINLOG_CHECKSUM_LEN;
+    else
+      event_len= 0;
+  }
+  else
+    DBUG_ASSERT(checksum_alg == BINLOG_CHECKSUM_ALG_UNDEF ||
+                checksum_alg == BINLOG_CHECKSUM_ALG_OFF);
+
+  if (event_len < (uint32)fdev->common_header_len + GTID_HEADER_LEN)
+    return true;
+  const uchar *p= event_start + fdev->common_header_len;
+  const uchar *p_end= event_start + event_len;
+  uchar flags2= *out_flags2= p[12];
+  p+= 13;    /* seq_no, domain_id, and flags2. */
+  if (flags2 & FL_GROUP_COMMIT_ID)
+    p+= 8;
+  if (flags2 & (FL_PREPARED_XA | FL_COMPLETED_XA))
+  {
+    if (p + 6 > p_end)
+      return true;
+    p+= 6 + p[4] + p[5];
+  }
+  uchar flags_extra;
+  if (p >= p_end || !((flags_extra= *p) & FL_CATALOG))
+  {
+    *out_catname= default_catalog_name;
+    return false;
+  }
+  ++p;
+
+  if (flags_extra & FL_EXTRA_MULTI_ENGINE_E1)
+    ++p;
+  if (flags_extra & (FL_COMMIT_ALTER_E1 | FL_ROLLBACK_ALTER_E1))
+    p+= 8;
+
+  uchar cat_len;
+  if (p >= p_end || (p + (cat_len= *p)) >= p_end)
+    return true;
+  out_catname->str= (const char *)p+1;
+  out_catname->length= cat_len;
+
+  return false;
+}
+
+
 bool
 Gtid_log_event::write()
 {
-  uchar buf[GTID_HEADER_LEN+2+sizeof(XID) + /* flags_extra: */ 1+4];
+  uchar buf[GTID_HEADER_LEN+2+sizeof(XID) + /* flags_extra: */ 
1+1+8+MAX_CATALOG_NAME];
   size_t write_len= 13;
 
   int8store(buf, seq_no);
@@ -3026,6 +3103,15 @@ Gtid_log_event::write()
     write_len+= 8;
   }
 
+  if (flags_extra & FL_CATALOG)
+  {
+    uint32_t cat_len= std::min(cat_name->length, (size_t)(MAX_CATALOG_NAME-1));
+    DBUG_ASSERT(cat_name->length <= MAX_CATALOG_NAME-1);
+    buf[write_len++]= cat_len;
+    memcpy(buf + write_len, cat_name->str, cat_len);
+    write_len+= cat_len;
+  }
+
   if (write_len < GTID_HEADER_LEN)
   {
     bzero(buf+write_len, GTID_HEADER_LEN-write_len);
diff --git a/sql/privilege.h b/sql/privilege.h
index 953ffb177c1..16d7f91f27d 100644
--- a/sql/privilege.h
+++ b/sql/privilege.h
@@ -305,7 +305,12 @@ constexpr privilege_t CATALOG_ACLS=
   CATALOG_ACL |
   SHUTDOWN_ACL |
   CREATE_TABLESPACE_ACL |
-  REPL_SLAVE_ACL |
+  /*
+    ToDo: REPL_SLAVE_ACL is needed to be able to replicate from a single
+    catalog to an on-premise slave. However, we may need a way for the catalog
+    superuser to control replication access for a catalog.
+  */
+//  REPL_SLAVE_ACL |
   BINLOG_ADMIN_ACL |
   BINLOG_MONITOR_ACL |
 //  BINLOG_REPLAY_ACL |
diff --git a/sql/rpl_mi.cc b/sql/rpl_mi.cc
index 3c698f27a19..6a68254052a 100644
--- a/sql/rpl_mi.cc
+++ b/sql/rpl_mi.cc
@@ -47,6 +47,7 @@ Master_info::Master_info(LEX_CSTRING *connection_name_arg,
 {
   char *tmp;
   host[0] = 0; user[0] = 0; password[0] = 0;
+  catalog_name[0]= 0;
   ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
   ssl_cipher[0]= 0; ssl_key[0]= 0;
   ssl_crl[0]= 0; ssl_crlpath[0]= 0;
@@ -644,6 +645,17 @@ file '%s')", fname);
             }
             seen_ignore_domain_ids= true;
           }
+          else if (got_eq && !strcmp(buf, "master_catalog"))
+          {
+            if (init_strvar_from_file(mi->catalog_name,
+                                      sizeof(mi->catalog_name),
+                                      &mi->file, ""))
+
+            {
+              sql_print_error("Failed to initialize master info 
do_domain_ids");
+              goto errwithmsg;
+            }
+          }
           else if (!got_eq && !strcmp(buf, "END_MARKER"))
           {
             /*
@@ -817,6 +829,7 @@ int flush_master_info(Master_info* mi,
               "using_gtid=%d\n"
               "do_domain_ids=%s\n"
               "ignore_domain_ids=%s\n"
+              "master_catalog=%s\n"
               "END_MARKER\n",
               LINES_IN_MASTER_INFO,
               mi->master_log_name, llstr(mi->master_log_pos, lbuf),
@@ -827,7 +840,7 @@ int flush_master_info(Master_info* mi,
               heartbeat_buf, "", ignore_server_ids_buf,
               "", 0,
               mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid,
-              do_domain_ids_buf, ignore_domain_ids_buf);
+              do_domain_ids_buf, ignore_domain_ids_buf, mi->catalog_name);
   err= flush_io_cache(file);
   if (sync_masterinfo_period && !err &&
       ++(mi->sync_counter) >= sync_masterinfo_period)
diff --git a/sql/rpl_mi.h b/sql/rpl_mi.h
index 6058b7fb34c..98f26a4d050 100644
--- a/sql/rpl_mi.h
+++ b/sql/rpl_mi.h
@@ -213,6 +213,7 @@ class Master_info : public Slave_reporting_capability
   /* the variables below are needed because we can change masters on the fly */
   char master_log_name[FN_REFLEN+6]; /* Room for multi-*/
   char host[HOSTNAME_LENGTH*SYSTEM_CHARSET_MBMAXLEN+1];
+  char catalog_name[MAX_CATALOG_NAME];
   char user[USERNAME_LENGTH+1];
   char password[MAX_PASSWORD_LENGTH*SYSTEM_CHARSET_MBMAXLEN+1];
   LEX_CSTRING connection_name;                 /* User supplied connection 
name */
diff --git a/sql/slave.cc b/sql/slave.cc
index 28b183fbd69..02f58777732 100644
--- a/sql/slave.cc
+++ b/sql/slave.cc
@@ -7140,6 +7140,9 @@ static int connect_to_master(THD* thd, MYSQL* mysql, 
Master_info* mi,
   if (opt_plugin_dir_ptr && *opt_plugin_dir_ptr)
     mysql_options(mysql, MYSQL_PLUGIN_DIR, opt_plugin_dir_ptr);
 
+  if (mi->catalog_name[0])
+    mysql_options(mysql, MARIADB_OPT_CATALOG, mi->catalog_name);
+
   /* we disallow empty users */
   if (mi->user[0] == 0)
   {
diff --git a/sql/sql_lex.h b/sql/sql_lex.h
index adcf45aea93..2d540c948a2 100644
--- a/sql/sql_lex.h
+++ b/sql/sql_lex.h
@@ -487,7 +487,7 @@ struct LEX_MASTER_INFO
   DYNAMIC_ARRAY repl_ignore_server_ids;
   DYNAMIC_ARRAY repl_do_domain_ids;
   DYNAMIC_ARRAY repl_ignore_domain_ids;
-  const char *host, *user, *password, *log_file_name;
+  const char *host, *catalog, *user, *password, *log_file_name;
   const char *ssl_key, *ssl_cert, *ssl_ca, *ssl_capath, *ssl_cipher;
   const char *ssl_crl, *ssl_crlpath;
   const char *relay_log_name;
@@ -533,7 +533,7 @@ struct LEX_MASTER_INFO
       delete_dynamic(&repl_ignore_domain_ids);
     }
 
-    host= user= password= log_file_name= ssl_key= ssl_cert= ssl_ca=
+    host= catalog= user= password= log_file_name= ssl_key= ssl_cert= ssl_ca=
       ssl_capath= ssl_cipher= ssl_crl= ssl_crlpath= relay_log_name= NULL;
     pos= relay_log_pos= server_id= port= connect_retry= 0;
     heartbeat_period= 0;
diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc
index dc27ab9ff8b..c3c6eadc0b3 100644
--- a/sql/sql_repl.cc
+++ b/sql/sql_repl.cc
@@ -129,6 +129,7 @@ struct binlog_send_info {
   slave_connection_state *until_gtid_state;
   slave_connection_state until_gtid_state_obj;
   Format_description_log_event *fdev;
+  const SQL_CATALOG *catalog_filter;
   int mariadb_slave_capability;
   enum_gtid_skip_type gtid_skip_group;
   enum_gtid_until_state gtid_until_group;
@@ -167,6 +168,7 @@ struct binlog_send_info {
                    char *lfn)
     : thd(thd_arg), net(&thd_arg->net), packet(packet_arg),
       log_file_name(lfn), until_gtid_state(NULL), fdev(NULL),
+      catalog_filter(NULL),
       gtid_skip_group(GTID_SKIP_NOT), gtid_until_group(GTID_UNTIL_NOT_DONE),
       flags(flags_arg), current_checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF),
       slave_gtid_strict_mode(false), send_fake_gtid_list(false),
@@ -459,6 +461,17 @@ inline void fix_checksum(enum_binlog_checksum_alg 
checksum_alg, String *packet,
 }
 
 
+static const SQL_CATALOG *get_catalog_filter(THD *thd)
+{
+  if (!using_catalogs)
+    return nullptr;
+  if ((thd->security_ctx->master_access & CATALOG_ACL) &&
+      thd->catalog == default_catalog())
+    return nullptr;
+  return thd->catalog;
+}
+
+
 static user_var_entry * get_binlog_checksum_uservar(THD * thd)
 {
   LEX_CSTRING name=  { STRING_WITH_LEN("master_binlog_checksum")};
@@ -1751,6 +1764,26 @@ send_event_to_slave(binlog_send_info *info, 
Log_event_type event_type,
     }
   }
 
+  if (info->catalog_filter && event_type == GTID_EVENT)
+  {
+    uchar flags2;
+    LEX_CSTRING cat_name;
+    if (ev_offset > len || Gtid_log_event::peek_catalog(
+          (uchar*) packet->ptr()+ev_offset, len - ev_offset,
+          info->fdev, current_checksum_alg, &flags2, &cat_name))
+    {
+      info->error= ER_MASTER_FATAL_ERROR_READING_BINLOG;
+      return "Failed to read Gtid_log_event: corrupt binlog";
+    }
+
+    if (cmp(&info->catalog_filter->name, &cat_name))
+    {
+      /* Skip this event group as it doesn't match the user's catalog. */
+      info->gtid_skip_group= (flags2 & Gtid_log_event::FL_STANDALONE ?
+                              GTID_SKIP_STANDALONE : GTID_SKIP_TRANSACTION);
+    }
+  }
+
   /* Skip GTID event groups until we reach slave position within a domain_id. 
*/
   if (event_type == GTID_EVENT && info->using_gtid_state)
   {
@@ -2128,6 +2161,7 @@ static int init_binlog_sender(binlog_send_info *info,
   /** init last pos */
   info->last_pos= *pos;
 
+  info->catalog_filter= get_catalog_filter(thd);
   info->current_checksum_alg= get_binlog_checksum_value_at_connect(thd);
   info->mariadb_slave_capability= get_mariadb_slave_capability(thd);
   info->using_gtid_state= get_slave_connect_state(thd, &connect_gtid_state);
@@ -3729,6 +3763,9 @@ bool change_master(THD* thd, Master_info* mi, bool 
*master_info_added)
 
   if (get_string_parameter(mi->host, lex_mi->host, sizeof(mi->host)-1,
                            "MASTER_HOST", system_charset_info) ||
+      get_string_parameter(mi->catalog_name, lex_mi->catalog,
+                           sizeof(mi->catalog_name)-1, "MASTER_CATALOG",
+                           system_charset_info) ||
       get_string_parameter(mi->user, lex_mi->user, sizeof(mi->user)-1,
                            "MASTER_USER", system_charset_info) ||
       get_string_parameter(mi->password, lex_mi->password,
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index 98e18d32e0f..8e3c93cbe51 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -923,6 +923,7 @@ bool my_yyoverflow(short **a, YYSTYPE **b, size_t 
*yystacksize);
 %token  <kwd>  LOCKS_SYM
 %token  <kwd>  LOGFILE_SYM
 %token  <kwd>  LOGS_SYM
+%token  <kwd>  MASTER_CATALOG_SYM
 %token  <kwd>  MASTER_CONNECT_RETRY_SYM
 %token  <kwd>  MASTER_DELAY_SYM
 %token  <kwd>  MASTER_GTID_POS_SYM
@@ -2126,6 +2127,10 @@ master_def:
           {
             Lex->mi.host = $3.str;
           }
+        | MASTER_CATALOG_SYM '=' TEXT_STRING_sys
+          {
+            Lex->mi.catalog = $3.str;
+          }
         | MASTER_USER_SYM '=' TEXT_STRING_sys
           {
             Lex->mi.user = $3.str;
@@ -15988,6 +15993,7 @@ keyword_sp_var_and_label:
         | MASTER_HEARTBEAT_PERIOD_SYM
         | MASTER_GTID_POS_SYM
         | MASTER_HOST_SYM
+        | MASTER_CATALOG_SYM
         | MASTER_PORT_SYM
         | MASTER_LOG_FILE_SYM
         | MASTER_LOG_POS_SYM
-- 
2.39.2

_______________________________________________
commits mailing list -- commits@lists.mariadb.org
To unsubscribe send an email to commits-le...@lists.mariadb.org

Reply via email to