This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 9f82266dbf9 [improve](restore) Compress restore job to reduce editlog size (#42422) 9f82266dbf9 is described below commit 9f82266dbf9b36b7220976157e61a3c7907245fd Author: walter <w41te...@gmail.com> AuthorDate: Thu Oct 31 10:06:40 2024 +0800 [improve](restore) Compress restore job to reduce editlog size (#42422) --- .../main/java/org/apache/doris/common/Config.java | 8 ++++ .../java/org/apache/doris/backup/AbstractJob.java | 6 +-- .../java/org/apache/doris/backup/RestoreJob.java | 53 ++++++++++++++++++++++ 3 files changed, 64 insertions(+), 3 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 437be27a8aa..51ba0593afa 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1482,6 +1482,14 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int max_backup_restore_job_num_per_db = 10; + /** + * A internal config, to reduce the restore job size during serialization by compress. + * + * WARNING: Once this option is enabled and a restore is performed, the FE version cannot be rolled back. + */ + @ConfField(mutable = false) + public static boolean restore_job_compressed_serialization = false; + /** * Control the max num of tablets per backup job involved. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java index d2f1dd11979..a7ef5330af3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java @@ -40,7 +40,7 @@ import java.util.Map; public abstract class AbstractJob implements Writable { public enum JobType { - BACKUP, RESTORE + BACKUP, RESTORE, RESTORE_COMPRESSED } protected JobType type; @@ -160,8 +160,8 @@ public abstract class AbstractJob implements Writable { JobType type = JobType.valueOf(Text.readString(in)); if (type == JobType.BACKUP) { job = new BackupJob(); - } else if (type == JobType.RESTORE) { - job = new RestoreJob(); + } else if (type == JobType.RESTORE || type == JobType.RESTORE_COMPRESSED) { + job = new RestoreJob(type); } else { throw new IOException("Unknown job type: " + type.name()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 28bcba4adf3..d8f0da5ef8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -98,8 +98,12 @@ import com.google.common.collect.Table.Cell; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; import java.util.HashMap; import java.util.List; @@ -107,6 +111,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; public class RestoreJob extends AbstractJob { private static final String PROP_RESERVE_REPLICA = RestoreStmt.PROP_RESERVE_REPLICA; @@ -195,6 +201,10 @@ public class RestoreJob extends AbstractJob { super(JobType.RESTORE); } + public RestoreJob(JobType jobType) { + super(jobType); + } + public RestoreJob(String label, String backupTs, long dbId, String dbName, BackupJobInfo jobInfo, boolean allowLoad, ReplicaAllocation replicaAlloc, long timeoutMs, int metaVersion, boolean reserveReplica, boolean reserveDynamicPartitionEnable, boolean isBeingSynced, boolean isCleanTables, @@ -2421,8 +2431,31 @@ public class RestoreJob extends AbstractJob { @Override public void write(DataOutput out) throws IOException { + if (Config.restore_job_compressed_serialization) { + type = JobType.RESTORE_COMPRESSED; + } super.write(out); + if (Config.restore_job_compressed_serialization) { + type = JobType.RESTORE; + + ByteArrayOutputStream bytesStream = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipStream = new GZIPOutputStream(bytesStream)) { + try (DataOutputStream stream = new DataOutputStream(gzipStream)) { + writeOthers(stream); + stream.flush(); + } + } + Text text = new Text(bytesStream.toByteArray()); + if (LOG.isDebugEnabled() || text.getLength() > (100 << 20)) { + LOG.info("restore job serialized size {}", text.getLength()); + } + text.write(out); + } else { + writeOthers(out); + } + } + private void writeOthers(DataOutput out) throws IOException { Text.writeString(out, backupTimestamp); jobInfo.write(out); out.writeBoolean(allowLoad); @@ -2495,7 +2528,27 @@ public class RestoreJob extends AbstractJob { @Override public void readFields(DataInput in) throws IOException { super.readFields(in); + if (type == JobType.RESTORE_COMPRESSED) { + type = JobType.RESTORE; + + Text text = new Text(); + text.readFields(in); + if (LOG.isDebugEnabled() || text.getLength() > (100 << 20)) { + LOG.info("read restore job compressed size {}", text.getLength()); + } + + ByteArrayInputStream bytesStream = new ByteArrayInputStream(text.getBytes()); + try (GZIPInputStream gzipStream = new GZIPInputStream(bytesStream)) { + try (DataInputStream stream = new DataInputStream(gzipStream)) { + readOthers(stream); + } + } + } else { + readOthers(in); + } + } + private void readOthers(DataInput in) throws IOException { backupTimestamp = Text.readString(in); jobInfo = BackupJobInfo.read(in); allowLoad = in.readBoolean(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org