Sergei, hello.

> Hi, Andrei!
>
> Looks better!
There's never a limit to improvements though :-). Thanks for checking and
for the insightful comments, as usual! I am publishing an updated patch that
covers all of them. I reply and comment on a few of your questions below.

Could you also please clarify one point, namely:

>> -my_b_copy_to_file(IO_CACHE *cache, FILE *file)
>> +my_b_copy_to_file(IO_CACHE *cache, FILE *file,
>> +                  my_bool do_reinit,
>> +                  size_t count)
>>  {
>> -  size_t bytes_in_cache;
>> +  size_t curr_write, bytes_in_cache;
>>    DBUG_ENTER("my_b_copy_to_file");
>>
>>    /* Reinit the cache to read from the beginning of the cache */
>> -  if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
>> +  if (do_reinit && reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
>
> generally, when there's a function that is always called with a
> constant (compile-time) argument, I prefer to split the code
> compile-time too, if it isn't too much trouble. In this case it would
> mean a new function like
>
>   int my_b_copy_all_to_file(IO_CACHE *cache, FILE *file)
>   {
>     if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
>       return 1;
>     return my_b_copy_to_file(cache, file, SIZE_T_MAX);
>   }

Why is that your preference, specifically? I looked around for something
like https://en.wikipedia.org/wiki/Compile_time_function_execution ... but
I somewhat doubt you meant that. Or did you really?

Now to my acknowledgments and comments...

> There are no major problems, but see comments below. There are a few
> suggestions on how to simplify the code.
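Meanwhile, to make sure we are talking about the same pattern, here is a toy
sketch of the "compile-time split" as I understand it (all names here are
invented for illustration; this is not the real IO_CACHE API). The run-time
boolean disappears from the worker's signature, and each call site picks the
right entry point when the code is compiled:

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Toy stand-in for a cache: just a byte buffer with a read position. */
struct toy_cache { const char *buf; size_t pos, len; };

/* Reset the read position -- the analogue of reinit_io_cache(). */
static void toy_reinit(struct toy_cache *c) { c->pos = 0; }

/* The "always bounded" worker: copies up to count bytes from the
   current position; no reinit flag is passed at run time. */
static size_t toy_copy_to_buf(struct toy_cache *c, char *out, size_t count)
{
  size_t n = c->len - c->pos;
  if (n > count)
    n = count;
  memcpy(out, c->buf + c->pos, n);
  c->pos += n;
  return n;
}

/* The compile-time split: a thin wrapper that always reinits and copies
   everything, so no caller ever passes a constant TRUE/FALSE flag. */
static size_t toy_copy_all_to_buf(struct toy_cache *c, char *out)
{
  toy_reinit(c);
  return toy_copy_to_buf(c, out, (size_t) -1); /* SIZE_T_MAX analogue */
}
```

Is that the intent, essentially: old callers switch to the `_all_` wrapper and
the new fragmenting code calls the bounded worker directly?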
>
> On Nov 05, Andrei Elkin wrote:
>> revision-id: d282f5c55609469cd74d7390f70c7d922c778711 (mariadb-10.1.35-93-gd282f5c5560)
>> parent(s): 2a576f71c5d3c7aacef564e5b1251f83bde48f51
>> author: Andrei Elkin <andrei.el...@mariadb.com>
>> committer: Andrei Elkin <andrei.el...@mariadb.com>
>> timestamp: 2018-10-21 23:42:00 +0300
>> message:
>>
>> MDEV-10963 Fragmented BINLOG query
>>
>> diff --git a/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test
>> new file mode 100644
>> index 00000000000..bdf41c94c76
>> --- /dev/null
>> +++ b/mysql-test/suite/binlog/t/binlog_mysqlbinlog_row_frag.test
>> @@ -0,0 +1,50 @@
>> +--source include/have_debug.inc
>> +--source include/have_log_bin.inc
>> +--source include/have_binlog_format_row.inc
>
> you don't need to include have_log_bin, if you include
> have_binlog_format_row.

ack

>
>> +
>> +--let $MYSQLD_DATADIR= `select @@datadir`
>> +--let $max_size=1024
>> +
>> +CREATE TABLE t (a TEXT);
>> +# events of interest are guaranteed to stay in 000001 log
>> +RESET MASTER;
>> +--eval INSERT INTO t SET a=repeat('a', $max_size)
>
> eh? why did you do it with let/eval instead of a simple sql statement?
> you don't use $max_size anywhere else.

It was left in inadvertently from a previous patch.

>
>> +SELECT a from t into @a;
>> +FLUSH LOGS;
>> +DELETE FROM t;
>> +
>> +--exec $MYSQL_BINLOG --debug-binlog-row-event-max-encoded-size=256 $MYSQLD_DATADIR/master-bin.000001 > $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
>> +
>> +--let $assert_text= BINLOG is fragmented
>> +--let $assert_select= BINLOG @binlog_fragment_0, @binlog_fragment_1
>> +--let $assert_count= 1
>> +--let $assert_file= $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
>> +--source include/assert_grep.inc
>
> no, please, use search_pattern_in_file.inc instead.
ack

>
>> +
>> +--exec $MYSQL test < $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
>> +
>> +SELECT a LIKE @a as 'true' FROM t;
>> +SELECT @binlog_fragment_0, @binlog_fragment_1 as 'NULL';
>
> that makes no sense, @binlog_fragment_0 and _1 were set in a separate
> client session. You cannot test whether they were cleared or not there,
> by looking at the values here

You caught me here! :-). In the new patch I relocate the check to the
other test file.

>
>> +
>> +# improper syntax error
>> +--echo BINLOG number-of-fragments must be exactly two
>> +--error ER_PARSE_ERROR
>> +BINLOG @binlog_fragment;
>> +--error ER_PARSE_ERROR
>> +BINLOG @binlog_fragment, @binlog_fragment, @binlog_fragment;
>> +
>> +# corrupted fragments error check (as to the expected error code, note
>> +# that the same error code occurs in a similar unfragmented case)
>> +SET @binlog_fragment_0='012345';
>> +SET @binlog_fragment_1='012345';
>> +--error ER_SYNTAX_ERROR
>> +BINLOG @binlog_fragment_0, @binlog_fragment_1;
>> +
>> +# A non-existing fragment is not allowed
>> +SET @binlog_fragment_0='012345';
>> +--error ER_WRONG_TYPE_FOR_VAR
>> +BINLOG @binlog_fragment_0, @binlog_fragment_not_exist;
>> +
>> +--echo # Cleanup
>> +--remove_file $MYSQLTEST_VARDIR/tmp/mysqlbinlog.sql
>> +DROP TABLE t;
>> diff --git a/mysys/mf_iocache2.c b/mysys/mf_iocache2.c
>> --- a/mysys/mf_iocache2.c
>> +++ b/mysys/mf_iocache2.c
>> @@ -22,52 +22,53 @@
>>  #include <stdarg.h>
>>  #include <m_ctype.h>
>>
>> -/*
>> -  Copy contents of an IO_CACHE to a file.
>> -
>> -  SYNOPSIS
>> -    my_b_copy_to_file()
>> -    cache  IO_CACHE to copy from
>> -    file   File to copy to
>> -
>> -  DESCRIPTION
>> -    Copy the contents of the cache to the file. The cache will be
>> -    re-inited to a read cache and will read from the beginning of the
>> -    cache.
>> -
>> -    If a failure to write fully occurs, the cache is only copied
>> -    partially.
>> +/**
>> +  Copy the cache to the file. Copying can be constrained to @c count
>> +  number of bytes when the parameter is less than SIZE_T_MAX. The
>> +  cache will be optionally re-inited to a read cache and will read
>> +  from the beginning of the cache. If a failure to write fully
>> +  occurs, the cache is only copied partially.
>>
>>  TODO
>> -  Make this function solid by handling partial reads from the cache
>> -  in a correct manner: it should be atomic.
>> +  Make this function solid by handling partial reads from the cache
>> +  in a correct manner: it should be atomic.
>> +
>> +  @param cache     IO_CACHE to copy from
>> +  @param file      File to copy to
>> +  @param do_reinit whether to turn the cache to read mode
>> +  @param count     the copied size, or the max of the type
>> +                   when the whole cache is to be copied.
>> +  @return
>> +    0  All OK
>> +    1  An error occurred
>> */
>> int
>> -my_b_copy_to_file(IO_CACHE *cache, FILE *file)
>> +my_b_copy_to_file(IO_CACHE *cache, FILE *file,
>> +                  my_bool do_reinit,
>> +                  size_t count)
>>  {
>> -  size_t bytes_in_cache;
>> +  size_t curr_write, bytes_in_cache;
>>    DBUG_ENTER("my_b_copy_to_file");
>>
>>    /* Reinit the cache to read from the beginning of the cache */
>> -  if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
>> +  if (do_reinit && reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
>
> generally, when there's a function that is always called with a
> constant (compile-time) argument, I prefer to split the code
> compile-time too, if it isn't too much trouble. In this case it would
> mean a new function like
>
>   int my_b_copy_all_to_file(IO_CACHE *cache, FILE *file)
>   {
>     if (reinit_io_cache(cache, READ_CACHE, 0L, FALSE, FALSE))
>       return 1;
>     return my_b_copy_to_file(cache, file, SIZE_T_MAX);
>   }
>
> and all old code will be changed to use my_b_copy_all_to_file().
> Old my_b_copy_to_file() won't need to do reinit_io_cache() anymore and
> your code will use it directly.
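Agreed on the split. While reworking it, I also reduced the count bookkeeping
of the copy loop to a toy model to convince myself it is right. Here the
chunks stand in for successive my_b_fill() refills; all names are invented
for the sketch:

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Toy model of the bounded copy loop in my_b_copy_to_file(): the source
   arrives in chunks (as from my_b_fill()), and at most `count` bytes in
   total must be written out. */
struct chunk { const char *data; size_t len; };

static size_t copy_chunks_bounded(const struct chunk *chunks, size_t n_chunks,
                                  char *out, size_t count)
{
  size_t written = 0;
  for (size_t i = 0; i < n_chunks && count; i++)
  {
    size_t bytes_in_cache = chunks[i].len;
    /* the MY_MIN(bytes_in_cache, count) step from the patch */
    size_t curr_write = bytes_in_cache < count ? bytes_in_cache : count;
    memcpy(out + written, chunks[i].data, curr_write);
    written += curr_write;
    count   -= curr_write;  /* count reaching 0 terminates the loop */
  }
  return written;
}
```

With count = SIZE_T_MAX this degenerates to the old copy-everything behaviour,
which is exactly what the `_all_` wrapper relies on.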
>
>>       DBUG_RETURN(1);
>>   bytes_in_cache= my_b_bytes_in_cache(cache);
>>   do
>>   {
>> -    if (my_fwrite(file, cache->read_pos, bytes_in_cache,
>> +    curr_write= MY_MIN(bytes_in_cache, count);
>> +    if (my_fwrite(file, cache->read_pos, curr_write,
>>                   MYF(MY_WME | MY_NABP)) == (size_t) -1)
>>       DBUG_RETURN(1);
>> -  } while ((bytes_in_cache= my_b_fill(cache)));
>> +
>> +    cache->read_pos += curr_write;
>> +    count -= curr_write;
>> +  } while (count && (bytes_in_cache= my_b_fill(cache)));
>>   if(cache->error == -1)
>>     DBUG_RETURN(1);
>>   DBUG_RETURN(0);
>> }
>>
>> -
>> my_off_t my_b_append_tell(IO_CACHE* info)
>> {
>>   /*
>> diff --git a/sql/log_event.cc b/sql/log_event.cc
>> index e07b7002398..aeca794f0cd 100644
>> --- a/sql/log_event.cc
>> +++ b/sql/log_event.cc
>> @@ -10474,12 +10488,151 @@ void Rows_log_event::pack_info(Protocol *protocol)
>>  #endif
>>
>>  #ifdef MYSQL_CLIENT
>> +/**
>> +  Print an event "body" cache to @c file, possibly in multiple fragments.
>> +  Each fragment is optionally wrapped, per @c do_wrap, to produce an SQL
>> +  statement.
>> +
>> +  @param file     a file to print to
>> +  @param body     the "body" IO_CACHE of the event
>> +  @param do_wrap  whether to wrap base64-encoded strings with
>> +                  an SQL cover.
>> +  The function signals any error by setting @c body->error to -1.
>> +*/
>> +void copy_cache_to_file_wrapped(FILE *file,
>> +                                IO_CACHE *body,
>> +                                bool do_wrap,
>> +                                const char *delimiter)
>> +{
>> +  uint n_frag= 1;
>> +  const char* before_frag= NULL;
>> +  char* after_frag= NULL;
>> +  char* after_last= NULL;
>> +  /*
>> +    2 fragments can always represent a near-1GB row-based
>> +    base64-encoded event as two strings, each of size less than
>> +    max(max_allowed_packet). A greater number of fragments does not
>> +    save us from a potential need to tweak (increase) @@max_allowed_packet
>> +    before processing the fragments. So 2 is safe and enough.
>> +  */
>> +  const char fmt_last_frag2[]=
>> +    "\nBINLOG @binlog_fragment_0, @binlog_fragment_1%s\n";
>> +  const char fmt_before_frag[]= "\nSET /* ONE_SHOT */ @binlog_fragment_%d ='\n";
>
> this ONE_SHOT is confusing, even if in a comment. Better not to do it :)

ack

>
>> +  /*
>> +    Buffer to hold computed formatted strings according to specifiers.
>> +    The sizes may depend on the actual fragment number's size in terms of
>> +    decimal digits, so its maximum is estimated (not precisely, yet safely)
>> +    below.
>> +  */
>> +  char buf[sizeof(fmt_before_frag) + sizeof(fmt_last_frag2)
>> +           + ((sizeof(n_frag) * 8)/3 + 1)                      // max of decimal index
>> +           + sizeof(PRINT_EVENT_INFO::max_delimiter_len) + 3]; // delim, \n and 0
>
> sizeof(max_delimiter_len) ? it's sizeof(uint), right? Did you mean
>
>   sizeof(PRINT_EVENT_INFO::delimiter)
>
> or simply
>
>   PRINT_EVENT_INFO::max_delimiter_len
>
> without sizeof?

Indeed!

>
>> +
>> +  if (do_wrap)
>> +  {
>> +    after_frag= (char*) my_malloc(sizeof(buf), MYF(MY_WME));
>> +    sprintf(after_frag, "'%s\n", delimiter);
>> +    if (my_b_tell(body) > opt_binlog_rows_event_max_encoded_size)
>> +      n_frag= 2;
>> +    if (n_frag > 1)
>> +    {
>> +      before_frag= fmt_before_frag;
>> +      after_last= (char*) my_malloc(sizeof(buf), MYF(MY_WME));
>> +      sprintf(after_last, fmt_last_frag2, (char*) delimiter);
>> +    }
>> +    else
>> +    {
>> +      before_frag= "\nBINLOG '\n";  // single "fragment"
>> +    }
>> +  }
>> +
>> +  size_t total_size= my_b_tell(body), total_written= 0;
>> +  size_t frag_size= total_size / n_frag + 1, curr_size;
>> +
>> +  if (reinit_io_cache(body, READ_CACHE, 0L, FALSE, FALSE))
>> +  {
>> +    body->error= -1;
>> +    goto err;
>> +  }
>> +
>> +  for (uint i= 0; i < n_frag; i++, total_written += curr_size)
>> +  {
>> +    curr_size= i < n_frag - 1 ? frag_size : total_size - total_written;
>> +
>> +    DBUG_ASSERT(i < n_frag - 1 || curr_size <= frag_size);
>> +
>> +    if (before_frag)
>> +    {
>> +      sprintf(buf, before_frag, i);
>> +      my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
>> +    }
>> +    if (my_b_copy_to_file(body, file, FALSE, curr_size))
>> +    {
>> +      body->error= -1;
>> +      goto err;
>> +    }
>> +    if (after_frag)
>> +    {
>> +      sprintf(buf, after_frag, NULL);
>> +      my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
>> +    }
>> +  }
>
> Hmm, dunno. I suspect you can do it three times shorter and five times
> easier to read if you wouldn't try to generalize it for an arbitrary
> number of fragments with arbitrary prefixes and suffixes. Just
>
>   if (my_b_tell(body) < opt_binlog_rows_event_max_encoded_size - margin)
>   {
>     my_fprintf(file, "BINLOG '");
>     my_b_copy_to_file(body, file);
>     my_fprintf(file, "'%s\n", delimiter);
>   }
>   else
>   {
>     my_fprintf(file, "SET @binlog_fragment_0='");
>     my_b_copy_to_file(body, file, opt_binlog_rows_event_max_encoded_size);
>     my_fprintf(file, "'%s\nSET @binlog_fragment_1='", delimiter);
>     my_b_copy_to_file(body, file, SIZE_T_MAX);
>     my_fprintf(file, "'%s\nBINLOG @binlog_fragment_0, @binlog_fragment_1%s\n",
>                delimiter, delimiter);
>   }
>
> See?

Very true. I did not consider this simple method, just "historically":
it all started with the idea of an arbitrary number of fragments. As a
result, there is no loop in the new patch.

>
>> +
>> +  if (after_last)
>> +  {
>> +    sprintf(buf, after_last, n_frag);
>> +    my_fwrite(file, (uchar*) buf, strlen(buf), MYF(MY_WME | MY_NABP));
>> +  }
>> +  reinit_io_cache(body, WRITE_CACHE, 0, FALSE, TRUE);
>> +
>> +err:
>> +  my_free(after_frag);
>> +  my_free(after_last);
>> +}
>> +
>> +/**
>> +  The function invokes the base64 encoder to run on the current
>> +  event string and store the result into two caches.
>> +  When the event ends the current statement, the caches are copied into
>> +  the argument file.
>> +  Copying is also concerned with how to wrap the event, specifically to
>> +  produce valid SQL syntax.
>> +  When the encoded data size is within max(MAX_ALLOWED_PACKET),
>> +  a regular BINLOG query is composed. Otherwise it is built as fragmented:
>> +
>> +    SET @binlog_fragment_0='...';
>> +    SET @binlog_fragment_1='...';
>> +    BINLOG DEFRAGMENT(@binlog_fragment_0, @binlog_fragment_1);
>> +
>> +  where fragments are represented by a sequence of "indexed" user
>> +  variables.
>> +  Two more statements are composed as well
>> +
>> +    SET @binlog_fragment_0=NULL;
>> +    SET @binlog_fragment_1=NULL;
>> +
>> +  to promptly release memory.
>
> No, they aren't

ack

>
>> +
>> +  NOTE.
>
> @note
>
>> +  If any changes are made, don't forget to duplicate them to
>> +  Old_rows_log_event as long as it's supported.
>> +
>> +  @param file              pointer to IO_CACHE
>> +  @param print_event_info  pointer to print_event_info, specifying
>> +                           what of the event to print and how
>> +  @param name              the name of a table that the event operates on
>> +
>> +  The function signals any error of cache access by setting
>> +  that cache's @c error to -1.
>> +*/
>> void Rows_log_event::print_helper(FILE *file,
>>                                   PRINT_EVENT_INFO *print_event_info,
>>                                   char const *const name)
>> {
>>   IO_CACHE *const head= &print_event_info->head_cache;
>>   IO_CACHE *const body= &print_event_info->body_cache;
>> +  bool do_print_encoded=
>> +    print_event_info->base64_output_mode != BASE64_OUTPUT_DECODE_ROWS &&
>> +    !print_event_info->short_form;
>> +
>>   if (!print_event_info->short_form)
>>   {
>>     bool const last_stmt_event= get_flags(STMT_END_F);
>> diff --git a/sql/log_event.h b/sql/log_event.h
>> index 90900f63533..28277e659d2 100644
>> --- a/sql/log_event.h
>> +++ b/sql/log_event.h
>> @@ -749,6 +749,7 @@ typedef struct st_print_event_info
>>     that was printed. We cache these so that we don't have to print
>>     them if they are unchanged.
>>   */
>> +  static const uint max_delimiter_len= 16;
>
> why did you introduce this max_delimiter_len, if all you use
> is sizeof(delimiter) anyway? (and even that is not needed)

I use the newly introduced constant in the new patch; it is renamed,
though, to hint at its 'size' rather than 'length' semantics.

>
>>   // TODO: have the last catalog here ??
>>   char db[FN_REFLEN+1]; // TODO: make this a LEX_STRING when thd->db is
>>   bool flags2_inited;
>> @@ -798,7 +799,7 @@ typedef struct st_print_event_info
>>   bool printed_fd_event;
>>   my_off_t hexdump_from;
>>   uint8 common_header_len;
>> -  char delimiter[16];
>> +  char delimiter[max_delimiter_len];
>>
>>   uint verbose;
>>   table_mapping m_table_map;
>> diff --git a/sql/sql_binlog.cc b/sql/sql_binlog.cc
>> index 91cf038907e..b4e3342d8f3 100644
>> --- a/sql/sql_binlog.cc
>> +++ b/sql/sql_binlog.cc
>> @@ -28,6 +28,70 @@
>>                                 // START_EVENT_V3,
>>                                 // Log_event_type,
>>                                 // Log_event
>> +
>> +/**
>> +  Copy fragments into the standard placeholder thd->lex->comment.str.
>> +
>> +  Compute the size of the (still) encoded total,
>> +  allocate, and then copy the fragments one after another.
>> +  The size can exceed max(max_allowed_packet), which is not a
>> +  problem as no String instance is created off this char array.
>> +
>> +  @param thd  THD handle
>> +  @return
>> +    0  at success,
>> +    -1 otherwise.
>> +*/
>> +int binlog_defragment(THD *thd)
>> +{
>> +  user_var_entry *entry[2];
>> +  LEX_STRING name[2]= { thd->lex->comment, thd->lex->ident };
>> +
>> +  /* compute the total size */
>> +  thd->lex->comment.str= NULL;
>> +  thd->lex->comment.length= 0;
>> +  for (uint k= 0; k < 2; k++)
>> +  {
>> +    entry[k]=
>> +      (user_var_entry*) my_hash_search(&thd->user_vars, (uchar*) name[k].str,
>> +                                       name[k].length);
>> +    if (!entry[k] || entry[k]->type != STRING_RESULT)
>> +    {
>> +      my_error(ER_WRONG_TYPE_FOR_VAR, MYF(0), name[k].str);
>> +      return -1;
>> +    }
>> +    thd->lex->comment.length += entry[k]->length;
>> +  }
>> +
>> +  thd->lex->comment.str=                        // to be freed by the caller
>> +    (char *) my_malloc(thd->lex->comment.length, MYF(MY_WME));
>> +  if (!thd->lex->comment.str)
>> +  {
>> +    my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1);
>> +    return -1;
>> +  }
>> +
>> +  /* fragments are merged into the allocated buf while the user vars get reset */
>> +  size_t gathered_length= 0;
>> +  for (uint k=0; k < 2; k++)
>> +  {
>> +    memcpy(thd->lex->comment.str + gathered_length, entry[k]->value,
>> +           entry[k]->length);
>> +    gathered_length += entry[k]->length;
>> +    if (update_hash(entry[k], true, NULL, 0, STRING_RESULT, &my_charset_bin, 0))
>> +    {
>> +      my_printf_error(ER_WRONG_TYPE_FOR_VAR,
>> +                      "%s: BINLOG fragment user "
>> +                      "variable '%s' could not be unset", MYF(0),
>> +                      ER_THD(thd, ER_WRONG_TYPE_FOR_VAR), entry[k]->value);
>> +    }
>
> I don't see how update_hash(entry[k], true, ...) can ever fail, so
> there's no need to pretend that it can.

Right. I did not look into its branches. The 'true' one does not error out.

>
>> +  }
>> +
>> +  DBUG_ASSERT(gathered_length == thd->lex->comment.length);
>> +
>> +  return 0;
>> +}
>> +
>> +
>> /**
>>   Execute a BINLOG statement.
>>
>> @@ -119,6 +175,23 @@ void mysql_client_binlog_statement(THD* thd)
>>   rli->sql_driver_thd= thd;
>>   rli->no_storage= TRUE;
>>
>> +  if (unlikely(is_fragmented= thd->lex->comment.str && thd->lex->ident.str))
>> +    if (binlog_defragment(thd))
>> +      goto end;
>> +
>> +  if (!(coded_len= thd->lex->comment.length))
>> +  {
>> +    my_error(ER_SYNTAX_ERROR, MYF(0));
>> +    goto end;
>> +  }
>> +
>> +  decoded_len= base64_needed_decoded_length(coded_len);
>> +  if (!(buf= (char *) my_malloc(decoded_len, MYF(MY_WME))))
>> +  {
>> +    my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR), 1);
>> +    goto end;
>> +  }
>> +
>
> Technically, it should be possible to decode base64 in-place and avoid
> allocating a second 3GB buffer. But let's not do it in this MDEV :)

In my defense, I can only repeat the no-limit argument :-).

A huge bunch of thanks!

Andrei

>
>>   for (char const *strptr= thd->lex->comment.str ;
>>        strptr < thd->lex->comment.str + thd->lex->comment.length ; )
>>   {
>
> Regards,
> Sergei
> Chief Architect MariaDB
> and secur...@mariadb.org
>
> _______________________________________________
> Mailing list: https://launchpad.net/~maria-developers
> Post to     : maria-developers@lists.launchpad.net
> Unsubscribe : https://launchpad.net/~maria-developers
> More help   : https://help.launchpad.net/ListHelp
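P.S. For the record, on the in-place decoding remark: it should indeed be
safe, because every 4 encoded characters yield at most 3 decoded bytes, so
the write position can never overtake the read position. A stripped-down
sketch (a toy decoder, not the server's base64 API, and with no validation
of malformed input):

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Map one base64 character to its 6-bit value, -1 for padding/invalid. */
static int b64_val(char c)
{
  if (c >= 'A' && c <= 'Z') return c - 'A';
  if (c >= 'a' && c <= 'z') return c - 'a' + 26;
  if (c >= '0' && c <= '9') return c - '0' + 52;
  if (c == '+') return 62;
  if (c == '/') return 63;
  return -1;
}

/* Decode base64 in place: each 4-char input group becomes at most 3
   output bytes, so the out index never overtakes the in index.
   Returns the decoded length. */
static size_t b64_decode_inplace(char *buf, size_t len)
{
  size_t in = 0, out = 0;
  while (in + 3 < len)
  {
    int a = b64_val(buf[in]),     b = b64_val(buf[in + 1]);
    int c = b64_val(buf[in + 2]), d = b64_val(buf[in + 3]);
    in += 4;
    buf[out++] = (char) ((a << 2) | (b >> 4));
    if (c >= 0) buf[out++] = (char) (((b & 0xf) << 4) | (c >> 2));
    if (d >= 0) buf[out++] = (char) (((c & 0x3) << 6) | d);
  }
  return out;
}
```

So the second 3GB buffer is avoidable in principle; agreed it belongs in a
separate MDEV.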