Preparatory patch for pre-computing binlog checksums without holding
LOCK_log.

The existing code for MYSQL_BIN_LOG::write_cache() was needlessly complex
and very hard to understand and modify for handling the new case where
pre-computed checksums are already present in the IO_CACHE.

Greatly simplify the logic by replacing the (implicit) state machine with
direct code that pulls the events one by one from the IO_CACHE. This removes
a lot of state flags and avoids duplicate code for handling full vs. split
headers.

This also removes the need for the CacheWriter class. As a bonus, this fixes
the bug that CacheWriter::write() completely ignored write errors, and a
truncated cache is now reported as ER_ERROR_ON_READ instead of at most
tripping a debug assertion. No other functional changes are made by this
patch; the rest is code cleanup.

Signed-off-by: Kristian Nielsen <kniel...@knielsen-hq.org>
---
 sql/log.cc | 237 ++++++++++++++++++++---------------------------------
 1 file changed, 89 insertions(+), 148 deletions(-)

diff --git a/sql/log.cc b/sql/log.cc
index bcb89e79dd0..1ab90389a37 100644
--- a/sql/log.cc
+++ b/sql/log.cc
@@ -7485,40 +7485,6 @@ uint MYSQL_BIN_LOG::next_file_id()
   return res;
 }
 
-class CacheWriter: public Log_event_writer
-{
-public:
-  size_t remains;
-
-  CacheWriter(THD *thd_arg, IO_CACHE *file_arg,
-              enum enum_binlog_checksum_alg checksum_alg,
-              Binlog_crypt_data *cr)
-    : Log_event_writer(file_arg, 0, checksum_alg, cr), remains(0), thd(thd_arg),
-      first(true)
-  {
-  }
-
-  ~CacheWriter()
-  { status_var_add(thd->status_var.binlog_bytes_written, bytes_written); }
-
-  int write(uchar* pos, size_t len)
-  {
-    DBUG_ENTER("CacheWriter::write");
-    if (first)
-      write_header(pos, len);
-    else
-      write_data(pos, len);
-
-    remains -= len;
-    if ((first= !remains))
-      write_footer();
-    DBUG_RETURN(0);
-  }
-private:
-  THD *thd;
-  bool first;
-};
-
 /*
   Write the contents of a cache to the binary log.
 
@@ -7543,13 +7509,16 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
   mysql_mutex_assert_owner(&LOCK_log);
   if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
     DBUG_RETURN(ER_ERROR_ON_WRITE);
-  size_t length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
-  size_t val;
+  /* Amount of remaining bytes in the IO_CACHE read buffer. */
+  size_t length= my_b_bytes_in_cache(cache);
+  size_t group;
   size_t end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t
-  uchar header[LOG_EVENT_HEADER_LEN];
-  CacheWriter writer(thd, &log_file,
-                     (enum_binlog_checksum_alg)binlog_checksum_options,
-                     &crypto);
+  uchar header_buf[LOG_EVENT_HEADER_LEN];
+  Log_event_writer writer(&log_file, 0,
+                          (enum_binlog_checksum_alg)binlog_checksum_options,
+                          &crypto);
+  uint checksum_len= writer.checksum_len;
+  int err= 0;
 
   if (crypto.scheme)
   {
@@ -7575,128 +7544,100 @@ int MYSQL_BIN_LOG::write_cache(THD *thd, IO_CACHE *cache)
   */
 
   group= (size_t)my_b_tell(&log_file);
-  hdr_offs= carry= 0;
-
-  do
+  for (;;)
   {
     /*
-      if we only got a partial header in the last iteration,
-      get the other half now and process a full header.
+      Empty cache at an event boundary means we are done (but empty cache
+      elsewhere is an error).
     */
-    if (unlikely(carry > 0))
-    {
-      DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN);
-      size_t tail= LOG_EVENT_HEADER_LEN - carry;
-
-      /* assemble both halves */
-      memcpy(&header[carry], (char *)cache->read_pos, tail);
-
-      uint32 len= uint4korr(header + EVENT_LEN_OFFSET);
-      writer.remains= len;
-
-      /* fix end_log_pos */
-      end_log_pos_inc += writer.checksum_len;
-      val= uint4korr(header + LOG_POS_OFFSET) + group + end_log_pos_inc;
-      int4store(header + LOG_POS_OFFSET, val);
-
-      /* fix len */
-      len+= writer.checksum_len;
-      int4store(header + EVENT_LEN_OFFSET, len);
-
-      if (writer.write(header, LOG_EVENT_HEADER_LEN))
-        DBUG_RETURN(ER_ERROR_ON_WRITE);
+    if (length == 0 &&
+        (length= my_b_fill(cache)) == 0)
+      break;
 
-      cache->read_pos+= tail;
-      length-= tail;
-      carry= 0;
+    DBUG_EXECUTE_IF("fail_binlog_write_1",
+                    {
+                      errno= 28;
+                      goto error_in_write;
+                    });
 
-      /* next event header at ... */
-      hdr_offs= len - LOG_EVENT_HEADER_LEN - writer.checksum_len;
+    /* Get a full header, using local buffer if split in the IO_CACHE. */
+    uchar *header;
+    if (likely(length > LOG_EVENT_HEADER_LEN))
+    {
+      header= cache->read_pos;
+      cache->read_pos+= LOG_EVENT_HEADER_LEN;
+      length-= LOG_EVENT_HEADER_LEN;
     }
-
-    /* if there is anything to write, process it. */
-
-    if (likely(length > 0))
+    else
     {
-      DBUG_EXECUTE_IF("fail_binlog_write_1",
-                      errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE););
-      /*
-        process all event-headers in this (partial) cache.
-        if next header is beyond current read-buffer,
-        we'll get it later (though not necessarily in the
-        very next iteration, just "eventually").
-      */
-
-      if (hdr_offs >= length)
+      size_t sofar= length;
+      size_t remain= LOG_EVENT_HEADER_LEN - sofar;
+      header= &header_buf[0];
+      memcpy(header, cache->read_pos, sofar);
+      cache->read_pos+= sofar;
+      do
       {
-        if (writer.write(cache->read_pos, length))
-          DBUG_RETURN(ER_ERROR_ON_WRITE);
-      }
-
-      while (hdr_offs < length)
+        length= my_b_fill(cache);
+        if (!length)
+          goto error_in_read;
+        size_t chunk= std::min(length, remain);
+        memcpy(header + sofar, cache->read_pos, chunk);
+        sofar+= chunk;
+        remain-= chunk;
+        cache->read_pos+= chunk;
+        length-= chunk;
+      } while (unlikely(remain > 0));
+    }
+
+    /* Adjust the length and end_log_pos appropriately. */
+    uint ev_len= uint4korr(header + EVENT_LEN_OFFSET); // netto len
+    DBUG_ASSERT(ev_len >= LOG_EVENT_HEADER_LEN);
+    if (unlikely(ev_len < LOG_EVENT_HEADER_LEN))
+      goto error_in_read;
+    int4store(header + EVENT_LEN_OFFSET, ev_len + checksum_len);
+    end_log_pos_inc += checksum_len;
+    size_t val= uint4korr(header + LOG_POS_OFFSET) + group + end_log_pos_inc;
+    int4store(header + LOG_POS_OFFSET, val);
+
+    /* Write the header to the binlog. */
+    if (writer.write_header(header, LOG_EVENT_HEADER_LEN))
+      goto error_in_write;
+    ev_len-= LOG_EVENT_HEADER_LEN;
+
+    /* Write the rest of the event. */
+    while (ev_len > 0)
+    {
+      if (length == 0)
       {
-        /*
-          finish off with remains of the last event that crawls
-          from previous into the current buffer
-        */
-        if (writer.remains != 0)
-        {
-          if (writer.write(cache->read_pos, hdr_offs))
-            DBUG_RETURN(ER_ERROR_ON_WRITE);
-        }
-
-        /*
-          partial header only? save what we can get, process once
-          we get the rest.
-        */
-        if (hdr_offs + LOG_EVENT_HEADER_LEN > length)
-        {
-          carry= length - hdr_offs;
-          memcpy(header, (char *)cache->read_pos + hdr_offs, carry);
-          length= hdr_offs;
-        }
-        else
-        {
-          /* we've got a full event-header, and it came in one piece */
-          uchar *ev= (uchar *)cache->read_pos + hdr_offs;
-          uint ev_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len
-          uchar *log_pos= ev + LOG_POS_OFFSET;
-
-          end_log_pos_inc += writer.checksum_len;
-          /* fix end_log_pos */
-          val= uint4korr(log_pos) + group + end_log_pos_inc;
-          int4store(log_pos, val);
-
-          /* fix length */
-          int4store(ev + EVENT_LEN_OFFSET, ev_len + writer.checksum_len);
-
-          writer.remains= ev_len;
-          if (writer.write(ev, MY_MIN(ev_len, length - hdr_offs)))
-            DBUG_RETURN(ER_ERROR_ON_WRITE);
+        length= my_b_fill(cache);
+        if (!length)
+          goto error_in_read;
+      }
+      uint chunk= std::min(ev_len, (uint)length);
+      if (writer.write_data(cache->read_pos, chunk))
+        goto error_in_write;
+      cache->read_pos+= chunk;
+      length-= chunk;
+      ev_len-= chunk;
+    }
 
-          /* next event header at ... */
-          hdr_offs += ev_len; // incr by the netto len
+    if (writer.write_footer())
+      goto error_in_write;
 
-          DBUG_ASSERT(!writer.checksum_len || writer.remains == 0 || hdr_offs >= length);
-        }
-      }
+  }
+  goto end;    // All OK
 
-      /*
-        Adjust hdr_offs. Note that it may still point beyond the segment
-        read in the next iteration; if the current event is very long,
-        it may take a couple of read-iterations (and subsequent adjustments
-        of hdr_offs) for it to point into the then-current segment.
-        If we have a split header (!carry), hdr_offs will be set at the
-        beginning of the next iteration, overwriting the value we set here:
-      */
-      hdr_offs -= length;
-    }
-  } while ((length= my_b_fill(cache)));
+error_in_write:
+  err= ER_ERROR_ON_WRITE;
+  goto end;
 
-  DBUG_ASSERT(carry == 0);
-  DBUG_ASSERT(!writer.checksum_len || writer.remains == 0);
+error_in_read:
+  err= ER_ERROR_ON_READ;
+  goto end;
 
-  DBUG_RETURN(0);                               // All OK
+end:
+  status_var_add(thd->status_var.binlog_bytes_written, writer.bytes_written);
+  DBUG_RETURN(err);
 }
 
 /*
-- 
2.30.2

_______________________________________________
commits mailing list -- commits@lists.mariadb.org
To unsubscribe send an email to commits-le...@lists.mariadb.org

Reply via email to