Hi, Andrei! Looks better!
There are no major problems, but see comments below. There are a few suggestions on how to simplify the code. On Nov 05, Andrei Elkin wrote: > revision-id: d282f5c55609469cd74d7390f70c7d922c778711 > (mariadb-10.1.35-93-gd282f5c5560) > parent(s): 2a576f71c5d3c7aacef564e5b1251f83bde48f51 > author: Andrei Elkin <andrei.el...@mariadb.com> > committer: Andrei Elkin <andrei.el...@mariadb.com> > timestamp: 2018-10-21 23:42:00 +0300 > message: > > MDEV-10963 Fragmented BINLOG query > > diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test > b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test > new file mode 100644 > index 00000000000..bdf41c94c76 > --- /dev/null > +++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test > @@ -0,0 +1,50 @@ > +--source include/have_debug.inc > +--source include/have_log_bin.inc > +--source include/have_binlog_format_row.inc you don't need to include have_log_bin, if you include have_binlog_format_row. > + > +--let $MYSQLD_DATADIR= `select @@datadir` > +--let $max_size=1024 > + > +CREATE TABLE t (a TEXT); > +# events of interest are guaranteed to stay in 000001 log > +RESET MASTER; > +--eval INSERT INTO t SET a=repeat('a', $max_size) eh? why did you do it with let/eval instead of a simple sql statement? you don't use $max_size anywhere else. > +SELECT a from t into @a; > +FLUSH LOGS; > +DELETE FROM t; > + > +--exec $MYSQL_BINLOG --debug-binlog-row-event-max-encoded-size=256 > $MYSQLD_DATADIR/master-bin.000001 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql > + > +--let $assert_text= BINLOG is fragmented > +--let $assert_select= BINLOG @binlog_fragment_0, @binlog_fragment_1 > +--let $assert_count= 1 > +--let $assert_file= $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql > +--source include/assert_grep.inc no, please, use search_pattern_in_file.inc instead. 
> + > +--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql > + > +SELECT a LIKE @a as 'true' FROM t; > +SELECT @binlog_fragment_0, @binlog_fragment_1 as 'NULL'; that makes no sense, @binlog_fragment_0 and _1 were set in a separate client session. You cannot test whether they were cleared or not there, by looking at the values here > + > +# improper syntax error > +--echo BINLOG number-of-fragments must be exactly two > +--error ER_PARSE_ERROR > +BINLOG @binlog_fragment; > +--error ER_PARSE_ERROR > +BINLOG @binlog_fragment, @binlog_fragment, @binlog_fragment; > + > +# corrupted fragments error check (to the expected error code notice, > +# the same error code occurs in a similar unfragmented case) > +SET @binlog_fragment_0='012345'; > +SET @binlog_fragment_1='012345'; > +--error ER_SYNTAX_ERROR > +BINLOG @binlog_fragment_0, @binlog_fragment_1; > + > +# Not existing fragment is not allowed > +SET @binlog_fragment_0='012345'; > +--error ER_WRONG_TYPE_FOR_VAR > +BINLOG @binlog_fragment_0, @binlog_fragment_not_exist; > + > +--echo # Cleanup > +--remove_file $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql > +DROP TABLE t; > diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c > --- a/mysys/mf_iocache2.c > +++ b/mysys/mf_iocache2.c > @@ -22,52 +22,53 @@ > #include <stdarg.h> > #include <m_ctype.h> > > -/* > - Copy contents of an IO_CACHE to a file. > - > - SYNOPSIS > - my_b_copy_to_file() > - cache IO_CACHE to copy from > - file File to copy to > - > - DESCRIPTION > - Copy the contents of the cache to the file. The cache will be > - re-inited to a read cache and will read from the beginning of the > - cache. > - > - If a failure to write fully occurs, the cache is only copied > - partially. > +/** > + Copy the cache to the file. Copying can be constrained to @c count > + number of bytes when the parameter is less than SIZE_T_MAX. The > + cache will be optionally re-inited to a read cache and will read > + from the beginning of the cache. 
If a failure to write fully > + occurs, the cache is only copied partially. > > TODO > - Make this function solid by handling partial reads from the cache > - in a correct manner: it should be atomic. > - > - RETURN VALUE > - 0 All OK > - 1 An error occurred > + Make this function solid by handling partial reads from the cache > + in a correct manner: it should be atomic. > + > + @param cache IO_CACHE to copy from > + @param file File to copy to > + @param do_reinit whether to turn the cache to read mode > + @param count the copied size or the max of the type > + when the whole cache is to be copied. > + @return > + 0 All OK > + 1 An error occurred > */ > int > -my_b_copy_to_file(IO_CACHE *cache, FILE *file) > +my_b_copy_to_file(IO_CACHE *cache, FILE *file, > + my_bool do_reinit, > + size_t count) > { > - size_t bytes_in_cache; > + size_t curr_write, bytes_in_cache; > DBUG_ENTER("my_b_copy_to_file"); > > /* Reinit the cache to read from the beginning of the cache */ > - if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE)) > + if (do_reinit && reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE)) generally, when there's a function that is always called with a constant (compile-time) argument, I prefer to split the code compile-time too, if it isn't too much trouble. In this case it would mean a new function like int my_b_copy_all_to_file(IO_CACHE *cache, FILE *file) { if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE) return 1; return my_b_copy_to_file(cache, file, SIZE_T_MAX); } and all old code will be changed to use my_b_copy_all_to_file(). Old my_b_copy_to_file() won't need to do reinit_io_cache() anymore and your code will use it directly. 
> DBUG_RETURN(1); > bytes_in_cache= my_b_bytes_in_cache(cache); > do > { > - if (my_fwrite(file, cache->read_pos, bytes_in_cache, > + curr_write= MY_MIN(bytes_in_cache, count); > + if (my_fwrite(file, cache->read_pos, curr_write, > MYF(MY_WME | MY_NABP)) == (size_t) -1) > DBUG_RETURN(1); > - } while ((bytes_in_cache= my_b_fill(cache))); > + > + cache->read_pos += curr_write; > + count -= curr_write; > + } while (count && (bytes_in_cache= my_b_fill(cache))); > if(cache->error == -1) > DBUG_RETURN(1); > DBUG_RETURN(0); > } > > - > my_off_t my_b_append_tell(IO_CACHE* info) > { > /* > diff --git a/sql/log_event.cc b/sql/log_event.cc > index e07b7002398..aeca794f0cd 100644 > --- a/sql/log_event.cc > +++ b/sql/log_event.cc > @@ -10474,12 +10488,151 @@ void Rows_log_event::pack_info(Protocol *protocol) > #endif > > #ifdef MYSQL_CLIENT > +/** > + Print an event "body" cache to @c file possibly in multiple fragements. > + Each fragement is optionally per @c do_wrap to procude an SQL statement. > + > + @param file a file to print to > + @param body the "body" IO_CACHE of event > + @param do_wrap whether to wrap base64-encoded strings with > + SQL cover. > + The function signals on any error through setting @c body->error to -1. > +*/ > +void copy_cache_to_file_wrapped(FILE *file, > + IO_CACHE *body, > + bool do_wrap, > + const char *delimiter) > +{ > + uint n_frag= 1; > + const char* before_frag= NULL; > + char* after_frag= NULL; > + char* after_last= NULL; > + /* > + 2 fragments can always represent near 1GB row-based > + base64-encoded event as two strings each of size less than > + max(max_allowed_packet). Greater number of fragments does not > + save from potential need to tweak (increase) @@max_allowed_packet > + before to process the fragments. So 2 is safe and enough. 
> + */ > + const char fmt_last_frag2[]= > + "\nBINLOG @binlog_fragment_0, @binlog_fragment_1%s\n"; > + const char fmt_before_frag[]= "\nSET /* ONE_SHOT */ @binlog_fragment_%d > ='\n"; this ONE_SHOT is confusing, even if in a comment. Better not to do it :) > + /* > + Buffer to hold computed formatted strings according to specifiers. > + The sizes may depend on an actual fragment number size in terms of > decimal > + signs so its maximum is estimated (not precisely yet safely) below. > + */ > + char buf[sizeof(fmt_before_frag) + sizeof(fmt_last_frag2) > + + ((sizeof(n_frag) * 8)/3 + 1) // max of decimal > index > + + sizeof(PRINT_EVENT_INFO::max_delimiter_len) + 3]; // delim, \n > and 0 sizeof(max_delimiter_len) ? it's sizeof(uint), right? Did you mean sizeof(PRINT_EVENT_INFO::delimiter) or simply PRINT_EVENT_INFO::max_delimiter_len without sizeof? > + > + if (do_wrap) > + { > + after_frag= (char*) my_malloc(sizeof(buf), MYF(MY_WME)); > + sprintf(after_frag, "'%s\n", delimiter); > + if (my_b_tell(body) > opt_binlog_rows_event_max_encoded_size) > + n_frag= 2; > + if (n_frag > 1) > + { > + before_frag= fmt_before_frag; > + after_last= (char*) my_malloc(sizeof(buf), MYF(MY_WME)); > + sprintf(after_last, fmt_last_frag2, (char*) delimiter); > + } > + else > + { > + before_frag= "\nBINLOG '\n"; // single "fragment" > + } > + } > + > + size_t total_size= my_b_tell(body), total_written= 0; > + size_t frag_size= total_size / n_frag + 1, curr_size; > + > + if (reinit_io_cache(body, READ_CACHE, 0L, FALSE, FALSE)) > + { > + body->error= -1; > + goto err; > + } > + > + for (uint i= 0; i < n_frag; i++, total_written += curr_size) > + { > + curr_size= i < n_frag - 1 ? 
frag_size : total_size - total_written; > + > + DBUG_ASSERT(i < n_frag - 1 || curr_size <= frag_size); > + > + if (before_frag) > + { > + sprintf(buf, before_frag, i); > + my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP)); > + } > + if (my_b_copy_to_file(body, file, FALSE, curr_size)) > + { > + body->error= -1; > + goto err; > + } > + if (after_frag) > + { > + sprintf(buf, after_frag, NULL); > + my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP)); > + } > + } Hmm, dunno. I suspect you can do it three times shorter and five times easier to read if you wouldn't try to generalize it for an arbitrary number of fragments with arbitrary prefixes and suffixes. Just if (my_b_tell(body) < opt_binlog_rows_event_max_encoded_size - margin) { my_fprintf(file, "BINLOG '"); my_b_copy_to_file(body, file); my_fprintf(file, "'%s\n", delimiter); } else { my_fprintf(file, "SET @binlog_fragment_0='"); my_b_copy_to_file(body, file, opt_binlog_rows_event_max_encoded_size); my_fprintf(file, "'%s\nSET @binlog_fragment_1='", delimiter); my_b_copy_to_file(body, file, SIZE_T_MAX); my_fprintf(file, "'%s\nBINLOG @binlog_fragment_0, @binlog_fragment_1%s\n", delimiter, delimiter); } See? > + > + if (after_last) > + { > + sprintf(buf, after_last, n_frag); > + my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP)); > + } > + reinit_io_cache(body, WRITE_CACHE, 0, FALSE, TRUE); > + > +err: > + my_free(after_frag); > + my_free(after_last); > +} > + > +/** > + The function invokes base64 encoder to run on the current > + event string and store the result into two caches. > + When the event ends the current statement the caches are is copied into > + the argument file. > + Copying is also concerned how to wrap the event, specifically to produce > + a valid SQL syntax. > + When the encoded data size is within max(MAX_ALLOWED_PACKET) > + a regular BINLOG query is composed. 
Otherwise it is build as fragmented > + > + SET @binlog_fragment_0='...'; > + SET @binlog_fragment_1='...'; > + BINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_1); > + > + where fragments are represented by a sequence of "indexed" user > + variables. > + Two more statements are composed as well > + > + SET @binlog_fragment_0=NULL; > + SET @binlog_fragment_1=NULL; > + > + to promptly release memory. No, they aren't > + > + NOTE. @note > + If any changes made don't forget to duplicate them to > + Old_rows_log_event as long as it's supported. > + > + @param file pointer to IO_CACHE > + @param print_event_info pointer to print_event_info specializing > + what out of and how to print the event > + @param name the name of a table that the event operates on > + > + The function signals on any error of cache access through setting > + that cache's @c error to -1. > +*/ > void Rows_log_event::print_helper(FILE *file, > PRINT_EVENT_INFO *print_event_info, > char const *const name) > { > IO_CACHE *const head= &print_event_info->head_cache; > IO_CACHE *const body= &print_event_info->body_cache; > + bool do_print_encoded= > + print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS && > + !print_event_info->short_form; > + > if (!print_event_info->short_form) > { > bool const last_stmt_event= get_flags(STMT_END_F); > diff --git a/sql/log_event.h b/sql/log_event.h > index 90900f63533..28277e659d2 100644 > --- a/sql/log_event.h > +++ b/sql/log_event.h > @@ -749,6 +749,7 @@ typedef struct st_print_event_info > that was printed. We cache these so that we don't have to print > them if they are unchanged. > */ > + static const uint max_delimiter_len= 16; why did you introduce this max_delimiter_len, if all you use is sizeof(delimiter) anyway? (and even that is not needed) > // TODO: have the last catalog here ?? 
> char db[FN_REFLEN+1]; // TODO: make this a LEX_STRING when thd->db is > bool flags2_inited; > @@ -798,7 +799,7 @@ typedef struct st_print_event_info > bool printed_fd_event; > my_off_t hexdump_from; > uint8 common_header_len; > - char delimiter[16]; > + char delimiter[max_delimiter_len]; > > uint verbose; > table_mapping m_table_map; > diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc > index 91cf038907e..b4e3342d8f3 100644 > --- a/sql/sql_binlog.cc > +++ b/sql/sql_binlog.cc > @@ -28,6 +28,70 @@ > // START_EVENT_V3, > // Log_event_type, > // Log_event > + > +/** > + Copy fragments into the standard placeholder thd->lex->comment.str. > + > + Compute the size of the (still) encoded total, > + allocate and then copy fragments one after another. > + The size can exceed max(max_allowed_packet) which is not a > + problem as no String instance is created off this char array. > + > + @param thd THD handle > + @return > + 0 at success, > + -1 otherwise. > +*/ > +int binlog_defragment(THD *thd) > +{ > + user_var_entry *entry[2]; > + LEX_STRING name[2]= { thd->lex->comment, thd->lex->ident }; > + > + /* compute the total size */ > + thd->lex->comment.str= NULL; > + thd->lex->comment.length= 0; > + for (uint k= 0; k < 2; k++) > + { > + entry[k]= > + (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name[k].str, > + name[k].length); > + if (!entry[k] || entry[k]->type != STRING_RESULT) > + { > + my_error(ER_WRONG_TYPE_FOR_VAR, MYF(0), name[k].str); > + return -1; > + } > + thd->lex->comment.length += entry[k]->length; > + } > + > + thd->lex->comment.str= // to be freed by the > caller > + (char *) my_malloc(thd->lex->comment.length, MYF(MY_WME)); > + if (!thd->lex->comment.str) > + { > + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); > + return -1; > + } > + > + /* fragments are merged into allocated buf while the user var:s get reset > */ > + size_t gathered_length= 0; > + for (uint k=0; k < 2; k++) > + { > + memcpy(thd->lex->comment.str + gathered_length, 
entry[k]->value, > entry[k]->length); > + gathered_length += entry[k]->length; > + if (update_hash(entry[k], true, NULL, 0, STRING_RESULT, &my_charset_bin, > 0)) > + { > + my_printf_error(ER_WRONG_TYPE_FOR_VAR, > + "%s: BINLOG fragment user " > + "variable '%s' could not be unset", MYF(0), > + ER_THD(thd, ER_WRONG_TYPE_FOR_VAR), entry[k]->value); > + } I don't see how update_hash(entry[k], true, ...) can ever fail, so there's no need to pretend that it can. > + } > + > + DBUG_ASSERT(gathered_length == thd->lex->comment.length); > + > + return 0; > +} > + > + > /** > Execute a BINLOG statement. > > @@ -119,6 +175,23 @@ void mysql_client_binlog_statement(THD* thd) > rli->sql_driver_thd= thd; > rli->no_storage= TRUE; > > + if (unlikely(is_fragmented= thd->lex->comment.str && thd->lex->ident.str)) > + if (binlog_defragment(thd)) > + goto end; > + > + if (!(coded_len= thd->lex->comment.length)) > + { > + my_error(ER_SYNTAX_ERROR, MYF(0)); > + goto end; > + } > + > + decoded_len= base64_needed_decoded_length(coded_len); > + if (!(buf= (char *) my_malloc(decoded_len, MYF(MY_WME)))) > + { > + my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1); > + goto end; > + } > + Technically, it should be possible to decode base64 in-place and avoid allocating a second 3GB buffer. But let's not do it in this MDEV :) > for (char const *strptr= thd->lex->comment.str ; > strptr < thd->lex->comment.str + thd->lex->comment.length ; ) > { Regards, Sergei Chief Architect MariaDB and secur...@mariadb.org _______________________________________________ Mailing list: https://launchpad.net/~maria-developers Post to : maria-developers@lists.launchpad.net Unsubscribe : https://launchpad.net/~maria-developers More help : https://help.launchpad.net/ListHelp