Hi, Andrei!

Looks better!

There are no major problems, but see the comments below. There are also a
few suggestions on how to simplify the code.

On Nov 05, Andrei Elkin wrote:
> revision-id: d282f5c55609469cd74d7390f70c7d922c778711 (mariadb-10.1.35-93-gd282f5c5560)
> parent(s): 2a576f71c5d3c7aacef564e5b1251f83bde48f51
> author: Andrei Elkin <andrei.el...@mariadb.com>
> committer: Andrei Elkin <andrei.el...@mariadb.com>
> timestamp: 2018-10-21 23:42:00 +0300
> message:
> 
> MDEV-10963 Fragmented BINLOG query
> 
> diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test
> new file mode 100644
> index 00000000000..bdf41c94c76
> --- /dev/null
> +++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test
> @@ -0,0 +1,50 @@
> +--source include/have_debug.inc
> +--source include/have_log_bin.inc
> +--source include/have_binlog_format_row.inc

you don't need to include have_log_bin if you include
have_binlog_format_row.
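
That is, the test could start with just

  --source include/have_debug.inc
  --source include/have_binlog_format_row.inc

(have_binlog_format_row.inc sources have_log_bin.inc itself).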

> +
> +--let $MYSQLD_DATADIR= `select @@datadir`
> +--let $max_size=1024
> +
> +CREATE TABLE t (a TEXT);
> +# events of interest are guaranteed to stay in 000001 log
> +RESET MASTER;
> +--eval INSERT INTO t SET a=repeat('a', $max_size)

eh? why did you do it with let/eval instead of a simple SQL statement?
you don't use $max_size anywhere else.
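
E.g. simply

  INSERT INTO t SET a=repeat('a', 1024);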

> +SELECT a from t into @a;
> +FLUSH LOGS;
> +DELETE FROM t;
> +
> +--exec $MYSQL_BINLOG --debug-binlog-row-event-max-encoded-size=256 $MYSQLD_DATADIR/master-bin.000001 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
> +
> +--let $assert_text= BINLOG is fragmented
> +--let $assert_select= BINLOG @binlog_fragment_0, @binlog_fragment_1
> +--let $assert_count= 1
> +--let $assert_file= $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
> +--source include/assert_grep.inc

no, please, use search_pattern_in_file.inc instead.
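
Something like this should do (assuming the usual SEARCH_FILE/SEARCH_PATTERN
interface of that include file):

  --let SEARCH_FILE= $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
  --let SEARCH_PATTERN= BINLOG @binlog_fragment_0, @binlog_fragment_1
  --source include/search_pattern_in_file.inc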

> +
> +--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
> +
> +SELECT a LIKE @a as 'true' FROM t;
> +SELECT @binlog_fragment_0, @binlog_fragment_1 as 'NULL';

that makes no sense: @binlog_fragment_0 and _1 were set in a separate
client session. You cannot test whether they were cleared there by
looking at the values here.

> +
> +# improper syntax error
> +--echo BINLOG number-of-fragments must be exactly two
> +--error ER_PARSE_ERROR
> +BINLOG @binlog_fragment;
> +--error ER_PARSE_ERROR
> +BINLOG @binlog_fragment, @binlog_fragment, @binlog_fragment;
> +
> +# corrupted fragments error check (to the expected error code notice,
> +# the same error code occurs in a similar unfragmented case)
> +SET @binlog_fragment_0='012345';
> +SET @binlog_fragment_1='012345';
> +--error ER_SYNTAX_ERROR
> +BINLOG @binlog_fragment_0, @binlog_fragment_1;
> +
> +# Not existing fragment is not allowed
> +SET @binlog_fragment_0='012345';
> +--error ER_WRONG_TYPE_FOR_VAR
> +BINLOG @binlog_fragment_0, @binlog_fragment_not_exist;
> +
> +--echo # Cleanup
> +--remove_file $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
> +DROP TABLE t;
> diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c
> --- a/mysys/mf_iocache2.c
> +++ b/mysys/mf_iocache2.c
> @@ -22,52 +22,53 @@
>  #include <stdarg.h>
>  #include <m_ctype.h>
>  
> -/*
> -  Copy contents of an IO_CACHE to a file.
> -
> -  SYNOPSIS
> -    my_b_copy_to_file()
> -    cache  IO_CACHE to copy from
> -    file   File to copy to
> -
> -  DESCRIPTION
> -    Copy the contents of the cache to the file. The cache will be
> -    re-inited to a read cache and will read from the beginning of the
> -    cache.
> -
> -    If a failure to write fully occurs, the cache is only copied
> -    partially.
> +/**
> +  Copy the cache to the file. Copying can be constrained to @c count
> +  number of bytes when the parameter is less than SIZE_T_MAX.  The
> +  cache will be optionally re-inited to a read cache and will read
> +  from the beginning of the cache.  If a failure to write fully
> +  occurs, the cache is only copied partially.
>  
>    TODO
> -    Make this function solid by handling partial reads from the cache
> -    in a correct manner: it should be atomic.
> -
> -  RETURN VALUE
> -    0  All OK
> -    1  An error occurred
> +  Make this function solid by handling partial reads from the cache
> +  in a correct manner: it should be atomic.
> +
> +  @param cache      IO_CACHE to copy from
> +  @param file       File to copy to
> +  @param do_reinit  whether to turn the cache to read mode
> +  @param count      the copied size or the max of the type
> +                    when the whole cache is to be copied.
> +  @return
> +         0          All OK
> +         1          An error occurred
>  */
>  int
> -my_b_copy_to_file(IO_CACHE *cache, FILE *file)
> +my_b_copy_to_file(IO_CACHE *cache, FILE *file,
> +                  my_bool do_reinit,
> +                  size_t count)
>  {
> -  size_t bytes_in_cache;
> +  size_t curr_write, bytes_in_cache;
>    DBUG_ENTER("my_b_copy_to_file");
>  
>    /* Reinit the cache to read from the beginning of the cache */
> -  if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
> +  if (do_reinit && reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))

generally, when there's a function that is always called with a
constant (compile-time) argument, I prefer to split the code
compile-time too, if it isn't too much trouble. In this case it would
mean a new function like

  int my_b_copy_all_to_file(IO_CACHE *cache, FILE *file)
  {
    if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
      return 1;
    return my_b_copy_to_file(cache, file, SIZE_T_MAX);
  }

and all old code will be changed to use my_b_copy_all_to_file().
Old my_b_copy_to_file() won't need to do reinit_io_cache() anymore and
your code will use it directly.
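
Roughly, the remaining my_b_copy_to_file() would then shrink to
(a sketch based on the loop in your patch, DBUG macros omitted):

  int my_b_copy_to_file(IO_CACHE *cache, FILE *file, size_t count)
  {
    size_t curr_write, bytes_in_cache= my_b_bytes_in_cache(cache);
    do
    {
      /* write no more than the remaining count bytes */
      curr_write= MY_MIN(bytes_in_cache, count);
      if (my_fwrite(file, cache->read_pos, curr_write,
                    MYF(MY_WME | MY_NABP)) == (size_t) -1)
        return 1;
      cache->read_pos+= curr_write;
      count-= curr_write;
    } while (count && (bytes_in_cache= my_b_fill(cache)));
    return cache->error == -1;
  }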

>      DBUG_RETURN(1);
>    bytes_in_cache= my_b_bytes_in_cache(cache);
>    do
>    {
> -    if (my_fwrite(file, cache->read_pos, bytes_in_cache,
> +    curr_write= MY_MIN(bytes_in_cache, count);
> +    if (my_fwrite(file, cache->read_pos, curr_write,
>                    MYF(MY_WME | MY_NABP)) == (size_t) -1)
>        DBUG_RETURN(1);
> -  } while ((bytes_in_cache= my_b_fill(cache)));
> +
> +    cache->read_pos += curr_write;
> +    count -= curr_write;
> +  } while (count && (bytes_in_cache= my_b_fill(cache)));
>    if(cache->error == -1)
>      DBUG_RETURN(1);
>    DBUG_RETURN(0);
>  }
>  
> -
>  my_off_t my_b_append_tell(IO_CACHE* info)
>  {
>    /*
> diff --git a/sql/log_event.cc b/sql/log_event.cc
> index e07b7002398..aeca794f0cd 100644
> --- a/sql/log_event.cc
> +++ b/sql/log_event.cc
> @@ -10474,12 +10488,151 @@ void Rows_log_event::pack_info(Protocol *protocol)
>  #endif
>  
>  #ifdef MYSQL_CLIENT
> +/**
> +  Print an event "body" cache to @c file, possibly in multiple fragments.
> +  Each fragment is optionally wrapped per @c do_wrap to produce an SQL statement.
> +
> +  @param file      a file to print to
> +  @param body      the "body" IO_CACHE of event
> +  @param do_wrap   whether to wrap base64-encoded strings with
> +                   SQL cover.
> +  The function signals on any error through setting @c body->error to -1.
> +*/
> +void copy_cache_to_file_wrapped(FILE *file,
> +                                IO_CACHE *body,
> +                                bool do_wrap,
> +                                const char *delimiter)
> +{
> +  uint n_frag= 1;
> +  const char* before_frag= NULL;
> +  char* after_frag= NULL;
> +  char* after_last= NULL;
> +  /*
> +    2 fragments can always represent near 1GB row-based
> +    base64-encoded event as two strings each of size less than
> +    max(max_allowed_packet).  Greater number of fragments does not
> +    save from potential need to tweak (increase) @@max_allowed_packet
> +    before to process the fragments. So 2 is safe and enough.
> +  */
> +  const char fmt_last_frag2[]=
> +    "\nBINLOG @binlog_fragment_0, @binlog_fragment_1%s\n";
> +  const char fmt_before_frag[]= "\nSET /* ONE_SHOT */ @binlog_fragment_%d ='\n";

this ONE_SHOT is confusing, even if in a comment. Better not to do it :)
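
e.g. simply

  const char fmt_before_frag[]= "\nSET @binlog_fragment_%d ='\n";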

> +  /*
> +    Buffer to hold computed formatted strings according to specifiers.
> +    The sizes may depend on an actual fragment number size in terms of
> +    decimal signs so its maximum is estimated (not precisely yet safely) below.
> +  */
> +  char buf[sizeof(fmt_before_frag) + sizeof(fmt_last_frag2)
> +           + ((sizeof(n_frag) * 8)/3 + 1)                // max of decimal index
> +           + sizeof(PRINT_EVENT_INFO::max_delimiter_len) + 3]; // delim, \n and 0

sizeof(max_delimiter_len) ? it's sizeof(uint), right? Did you mean

  sizeof(PRINT_EVENT_INFO::delimiter)

or simply

  PRINT_EVENT_INFO::max_delimiter_len

without sizeof?

> +
> +  if (do_wrap)
> +  {
> +    after_frag= (char*) my_malloc(sizeof(buf), MYF(MY_WME));
> +    sprintf(after_frag, "'%s\n", delimiter);
> +    if (my_b_tell(body) > opt_binlog_rows_event_max_encoded_size)
> +      n_frag= 2;
> +    if (n_frag > 1)
> +    {
> +      before_frag= fmt_before_frag;
> +      after_last= (char*) my_malloc(sizeof(buf), MYF(MY_WME));
> +      sprintf(after_last, fmt_last_frag2, (char*) delimiter);
> +    }
> +    else
> +    {
> +      before_frag= "\nBINLOG '\n"; // single "fragment"
> +    }
> +  }
> +
> +  size_t total_size= my_b_tell(body), total_written= 0;
> +  size_t frag_size= total_size / n_frag + 1, curr_size;
> +
> +  if (reinit_io_cache(body, READ_CACHE, 0L, FALSE, FALSE))
> +  {
> +    body->error= -1;
> +    goto err;
> +  }
> +
> +  for (uint i= 0; i < n_frag; i++, total_written += curr_size)
> +  {
> +    curr_size= i < n_frag - 1 ? frag_size : total_size - total_written;
> +
> +    DBUG_ASSERT(i < n_frag - 1 || curr_size <= frag_size);
> +
> +    if (before_frag)
> +    {
> +      sprintf(buf, before_frag, i);
> +      my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
> +    }
> +    if (my_b_copy_to_file(body, file, FALSE, curr_size))
> +    {
> +      body->error= -1;
> +      goto err;
> +    }
> +    if (after_frag)
> +    {
> +      sprintf(buf, after_frag, NULL);
> +      my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
> +    }
> +  }

Hmm, dunno. I suspect you can do it three times shorter and five times
easier to read if you don't try to generalize it for an arbitrary
number of fragments with arbitrary prefixes and suffixes. Just

  if (my_b_tell(body) < opt_binlog_rows_event_max_encoded_size - margin)
  {
    my_fprintf(file, "BINLOG '");
    my_b_copy_all_to_file(body, file);
    my_fprintf(file, "'%s\n", delimiter);
  }
  else
  {
    reinit_io_cache(body, READ_CACHE, 0L, FALSE, FALSE);
    my_fprintf(file, "SET @binlog_fragment_0='");
    my_b_copy_to_file(body, file, opt_binlog_rows_event_max_encoded_size);
    my_fprintf(file, "'%s\nSET @binlog_fragment_1='", delimiter);
    my_b_copy_to_file(body, file, SIZE_T_MAX);
    my_fprintf(file, "'%s\nBINLOG @binlog_fragment_0, @binlog_fragment_1%s\n",
               delimiter, delimiter);
  }

See?

> +
> +  if (after_last)
> +  {
> +    sprintf(buf, after_last, n_frag);
> +    my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
> +  }
> +  reinit_io_cache(body, WRITE_CACHE, 0, FALSE, TRUE);
> +
> +err:
> +  my_free(after_frag);
> +  my_free(after_last);
> +}
> +
> +/**
> +  The function invokes base64 encoder to run on the current
> +  event string and store the result into two caches.
> +  When the event ends the current statement the caches are copied into
> +  the argument file.
> +  Copying is also concerned how to wrap the event, specifically to produce
> +  a valid SQL syntax.
> +  When the encoded data size is within max(MAX_ALLOWED_PACKET)
> +  a regular BINLOG query is composed. Otherwise it is built as fragmented
> +
> +    SET @binlog_fragment_0='...';
> +    SET @binlog_fragment_1='...';
> +    BINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_1);
> +
> +  where fragments are represented by a sequence of "indexed" user
> +  variables.
> +  Two more statements are composed as well
> +
> +    SET @binlog_fragment_0=NULL;
> +    SET @binlog_fragment_1=NULL;
> +
> +  to promptly release memory.

No, they aren't. No such statements are composed; in your patch the
variables are only reset server-side, in binlog_defragment().

> +
> +  NOTE.

@note

> +  If any changes made don't forget to duplicate them to
> +  Old_rows_log_event as long as it's supported.
> +
> +  @param file               pointer to IO_CACHE
> +  @param print_event_info   pointer to print_event_info specializing
> +                            what out of and how to print the event
> +  @param name               the name of a table that the event operates on
> +
> +  The function signals on any error of cache access through setting
> +  that cache's @c error to -1.
> +*/
>  void Rows_log_event::print_helper(FILE *file,
>                                    PRINT_EVENT_INFO *print_event_info,
>                                    char const *const name)
>  {
>    IO_CACHE *const head= &print_event_info->head_cache;
>    IO_CACHE *const body= &print_event_info->body_cache;
> +  bool do_print_encoded=
> +    print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS &&
> +    !print_event_info->short_form;
> +
>    if (!print_event_info->short_form)
>    {
>      bool const last_stmt_event= get_flags(STMT_END_F);
> diff --git a/sql/log_event.h b/sql/log_event.h
> index 90900f63533..28277e659d2 100644
> --- a/sql/log_event.h
> +++ b/sql/log_event.h
> @@ -749,6 +749,7 @@ typedef struct st_print_event_info
>      that was printed.  We cache these so that we don't have to print
>      them if they are unchanged.
>    */
> +  static const uint max_delimiter_len= 16;

why did you introduce this max_delimiter_len, if all you use
is sizeof(delimiter) anyway? (and even that is not needed)
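
i.e. you could keep

  char delimiter[16];

and, where a size is really needed, use sizeof(PRINT_EVENT_INFO::delimiter).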

>    // TODO: have the last catalog here ??
>    char db[FN_REFLEN+1]; // TODO: make this a LEX_STRING when thd->db is
>    bool flags2_inited;
> @@ -798,7 +799,7 @@ typedef struct st_print_event_info
>    bool printed_fd_event;
>    my_off_t hexdump_from;
>    uint8 common_header_len;
> -  char delimiter[16];
> +  char delimiter[max_delimiter_len];
>  
>    uint verbose;
>    table_mapping m_table_map;
> diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
> index 91cf038907e..b4e3342d8f3 100644
> --- a/sql/sql_binlog.cc
> +++ b/sql/sql_binlog.cc
> @@ -28,6 +28,70 @@
>                                                  // START_EVENT_V3,
>                                                  // Log_event_type,
>                                                  // Log_event
> +
> +/**
> +  Copy fragments into the standard placeholder thd->lex->comment.str.
> +
> +  Compute the size of the (still) encoded total,
> +  allocate and then copy fragments one after another.
> +  The size can exceed max(max_allowed_packet) which is not a
> +  problem as no String instance is created off this char array.
> +
> +  @param thd  THD handle
> +  @return
> +     0        at success,
> +    -1        otherwise.
> +*/
> +int binlog_defragment(THD *thd)
> +{
> +  user_var_entry *entry[2];
> +  LEX_STRING name[2]= { thd->lex->comment, thd->lex->ident };
> +
> +  /* compute the total size */
> +  thd->lex->comment.str= NULL;
> +  thd->lex->comment.length= 0;
> +  for (uint k= 0; k < 2; k++)
> +  {
> +    entry[k]=
> +      (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name[k].str,
> +                                       name[k].length);
> +    if (!entry[k] || entry[k]->type != STRING_RESULT)
> +    {
> +      my_error(ER_WRONG_TYPE_FOR_VAR, MYF(0), name[k].str);
> +      return -1;
> +    }
> +    thd->lex->comment.length += entry[k]->length;
> +  }
> +
> +  thd->lex->comment.str=                            // to be freed by the caller
> +    (char *) my_malloc(thd->lex->comment.length, MYF(MY_WME));
> +  if (!thd->lex->comment.str)
> +  {
> +    my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1);
> +    return -1;
> +  }
> +
> +  /* fragments are merged into allocated buf while the user var:s get reset */
> +  size_t gathered_length= 0;
> +  for (uint k=0; k < 2; k++)
> +  {
> +    memcpy(thd->lex->comment.str + gathered_length, entry[k]->value, entry[k]->length);
> +    gathered_length += entry[k]->length;
> +    if (update_hash(entry[k], true, NULL, 0, STRING_RESULT, &my_charset_bin, 0))
> +    {
> +      my_printf_error(ER_WRONG_TYPE_FOR_VAR,
> +                      "%s: BINLOG fragment user "
> +                      "variable '%s' could not be unset", MYF(0),
> +                      ER_THD(thd, ER_WRONG_TYPE_FOR_VAR), entry[k]->value);
> +    }

I don't see how update_hash(entry[k], true, ...) can ever fail, so
there's no need to pretend that it can.
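
so it can simply be

  (void) update_hash(entry[k], true, NULL, 0, STRING_RESULT, &my_charset_bin, 0);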

> +  }
> +
> +  DBUG_ASSERT(gathered_length == thd->lex->comment.length);
> +
> +  return 0;
> +}
> +
> +
>  /**
>    Execute a BINLOG statement.
>  
> @@ -119,6 +175,23 @@ void mysql_client_binlog_statement(THD* thd)
>    rli->sql_driver_thd= thd;
>    rli->no_storage= TRUE;
>  
> +  if (unlikely(is_fragmented= thd->lex->comment.str && thd->lex->ident.str))
> +    if (binlog_defragment(thd))
> +      goto end;
> +
> +  if (!(coded_len= thd->lex->comment.length))
> +  {
> +    my_error(ER_SYNTAX_ERROR, MYF(0));
> +    goto end;
> +  }
> +
> +  decoded_len= base64_needed_decoded_length(coded_len);
> +  if (!(buf= (char *) my_malloc(decoded_len, MYF(MY_WME))))
> +  {
> +    my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1);
> +    goto end;
> +  }
> +

Technically, it should be possible to decode base64 in-place and avoid
allocating a second 3GB buffer. But let's not do it in this MDEV :)

>    for (char const *strptr= thd->lex->comment.str ;
>         strptr < thd->lex->comment.str + thd->lex->comment.length ; )
>    {

Regards,
Sergei
Chief Architect MariaDB
and secur...@mariadb.org
