bernardodemarco commented on code in PR #12758:
URL: https://github.com/apache/cloudstack/pull/12758#discussion_r3391140382


##########
api/src/main/java/org/apache/cloudstack/backup/InternalBackupProvider.java:
##########
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.backup;
+
+import com.cloud.storage.Volume;
+import com.cloud.uservm.UserVm;
+import com.cloud.utils.Pair;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.snapshot.VMSnapshot;
+import org.apache.cloudstack.framework.config.ConfigKey;
+
+import java.util.Set;
+
+public interface InternalBackupProvider extends BackupProvider {
+    String VM_WORK_JOB_HANDLER = InternalBackupService.class.getSimpleName();
+
+    ConfigKey<Integer> backupCompressionTimeout = new ConfigKey<>("Advanced", Integer.class, "backup.compression.timeout", "28800", "Backup compression timeout (in " +
+            "seconds). Will only start counting once the backup compression async job actually starts.", true, ConfigKey.Scope.Cluster);

Review Comment:
   I think it would be worth mentioning in the description that these settings 
currently apply only to KBOSS.



##########
api/src/main/java/org/apache/cloudstack/api/command/user/backup/FinishBackupChainCmd.java:
##########
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.api.command.user.backup;
+
+import com.cloud.exception.ConcurrentOperationException;
+import com.cloud.exception.InsufficientCapacityException;
+import com.cloud.exception.NetworkRuleConflictException;
+import com.cloud.exception.ResourceAllocationException;
+import com.cloud.exception.ResourceUnavailableException;
+import com.cloud.user.Account;
+import com.cloud.vm.VirtualMachine;
+import org.apache.cloudstack.api.ACL;
+import org.apache.cloudstack.api.APICommand;
+import org.apache.cloudstack.api.ApiConstants;
+import org.apache.cloudstack.api.BaseCmd;
+import org.apache.cloudstack.api.Parameter;
+import org.apache.cloudstack.api.ServerApiException;
+import org.apache.cloudstack.api.response.SuccessResponse;
+import org.apache.cloudstack.api.response.VirtualMachineResponse;
+import org.apache.cloudstack.backup.InternalBackupService;
+
+import javax.inject.Inject;
+
+@APICommand(name = "finishBackupChain", description = "Finish backup chain of VM.",
+        responseObject = SuccessResponse.class, since = "4.23.0.0", requestHasSensitiveInfo = false,

Review Comment:
   ```suggestion
   @APICommand(name = "finishBackupChain", description = "Finish backup chain of VM.",
           responseObject = SuccessResponse.class, since = "4.23.0", requestHasSensitiveInfo = false,
   ```



##########
api/src/main/java/org/apache/cloudstack/api/command/user/backup/FinishBackupChainCmd.java:
##########
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.api.command.user.backup;
+
+import com.cloud.exception.ConcurrentOperationException;
+import com.cloud.exception.InsufficientCapacityException;
+import com.cloud.exception.NetworkRuleConflictException;
+import com.cloud.exception.ResourceAllocationException;
+import com.cloud.exception.ResourceUnavailableException;
+import com.cloud.user.Account;
+import com.cloud.vm.VirtualMachine;
+import org.apache.cloudstack.api.ACL;
+import org.apache.cloudstack.api.APICommand;
+import org.apache.cloudstack.api.ApiConstants;
+import org.apache.cloudstack.api.BaseCmd;
+import org.apache.cloudstack.api.Parameter;
+import org.apache.cloudstack.api.ServerApiException;
+import org.apache.cloudstack.api.response.SuccessResponse;
+import org.apache.cloudstack.api.response.VirtualMachineResponse;
+import org.apache.cloudstack.backup.InternalBackupService;
+
+import javax.inject.Inject;
+
+@APICommand(name = "finishBackupChain", description = "Finish backup chain of VM.",
+        responseObject = SuccessResponse.class, since = "4.23.0.0", requestHasSensitiveInfo = false,
+        responseHasSensitiveInfo = false)
+public class FinishBackupChainCmd extends BaseCmd {
+    @Inject
+    private InternalBackupService internalBackupService;
+
+    /////////////////////////////////////////////////////
+    //////////////// API parameters /////////////////////
+    /////////////////////////////////////////////////////
+
+    @ACL
+    @Parameter(name = ApiConstants.VIRTUAL_MACHINE_ID, type = CommandType.UUID, entityType = VirtualMachineResponse.class, required = true,
+            description = "Id of the VM to finish the chain.")

Review Comment:
   ```suggestion
               description = "ID of the VM to finish the chain.")
   ```



##########
plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtValidateKbossVmCommandWrapper.java:
##########
@@ -0,0 +1,215 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package com.cloud.hypervisor.kvm.resource.wrapper;
+
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.to.VirtualMachineTO;
+import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePool;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePoolManager;
+import com.cloud.resource.CommandWrapper;
+import com.cloud.resource.ResourceWrapper;
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.utils.script.Script;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.json.JsonSanitizer;
+import org.apache.cloudstack.backup.ValidateKbossVmAnswer;
+import org.apache.cloudstack.backup.ValidateKbossVmCommand;
+import org.libvirt.Domain;
+import org.libvirt.LibvirtException;
+
+import javax.imageio.ImageIO;
+import java.awt.image.BufferedImage;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Random;
+
+@ResourceWrapper(handles =  ValidateKbossVmCommand.class)
+public class LibvirtValidateKbossVmCommandWrapper extends 
CommandWrapper<ValidateKbossVmCommand, Answer, LibvirtComputingResource> {
+
+    private static final String SCREENSHOT_COMMAND = "virsh screenshot 
--domain %s --file %s";
+    private static final String GUEST_SYNC_COMMAND = "{\"execute\": 
\"guest-sync\", \"arguments\":{\"id\":%s}}";
+    private static final String GUEST_EXEC_COMMAND = "{\"execute\": 
\"guest-exec\", 
\"arguments\":{\"path\":\"%s\",\"arg\":%s,\"capture-output\":true}}";
+    private static final String GUEST_EXEC_STATUS_COMMAND = "{\"execute\": 
\"guest-exec-status\", \"arguments\":{\"pid\":%s}}";
+
+    @Override
+    public Answer execute(ValidateKbossVmCommand command, 
LibvirtComputingResource serverResource) {
+        VirtualMachineTO vmTo = command.getVm();
+        KVMStoragePool secondaryStorage = null;
+        KVMStoragePoolManager storagePoolMgr = 
serverResource.getStoragePoolMgr();
+        try {
+            Domain vm = 
serverResource.getDomain(serverResource.getLibvirtUtilitiesHelper().getConnection(),
 vmTo.getName());
+            secondaryStorage = 
storagePoolMgr.getStoragePoolByURI(command.getBackupDeltaTO().getDataStore().getUrl());
+            logger.info("Validating VM [{}].", vm.getName());
+            boolean bootValidated = waitForBoot(command, vm);
+            String screenshotPath = takeScreenshot(command, vm, 
secondaryStorage, serverResource);
+            String scriptResult = runScript(command, vm);
+            return new  ValidateKbossVmAnswer(command, bootValidated, 
screenshotPath, scriptResult);
+        } catch (LibvirtException e) {
+            logger.error("Received libvirt exception while trying to validate VM [{}].", vmTo.getName(), e);

Review Comment:
   ```suggestion
               logger.error("Received Libvirt exception while trying to validate VM [{}].", vmTo.getName(), e);
   ```



##########
plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtTakeKbossBackupCommandWrapper.java:
##########
@@ -0,0 +1,392 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.hypervisor.kvm.resource.wrapper;
+
+import com.cloud.agent.api.Answer;
+import com.cloud.hypervisor.Hypervisor;
+import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePool;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePoolManager;
+import com.cloud.resource.CommandWrapper;
+import com.cloud.resource.ResourceWrapper;
+import com.cloud.utils.Pair;
+import com.cloud.utils.exception.BackupException;
+import org.apache.cloudstack.backup.TakeKbossBackupAnswer;
+import org.apache.cloudstack.backup.TakeKbossBackupCommand;
+import org.apache.cloudstack.storage.to.BackupDeltaTO;
+import org.apache.cloudstack.storage.to.DeltaMergeTreeTO;
+import org.apache.cloudstack.storage.to.KbossTO;
+import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
+import org.apache.cloudstack.storage.to.VolumeObjectTO;
+import org.apache.cloudstack.utils.qemu.QemuImageOptions;
+import org.apache.cloudstack.utils.qemu.QemuImg;
+import org.apache.cloudstack.utils.qemu.QemuImgException;
+import org.apache.cloudstack.utils.qemu.QemuImgFile;
+import org.libvirt.LibvirtException;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+@ResourceWrapper(handles = TakeKbossBackupCommand.class)
+public class LibvirtTakeKbossBackupCommandWrapper extends 
CommandWrapper<TakeKbossBackupCommand, Answer, LibvirtComputingResource> {
+    @Override
+    public Answer execute(TakeKbossBackupCommand command, 
LibvirtComputingResource resource) {
+        String vmName = command.getVmName();
+        logger.info("Starting backup process for VM [{}].", vmName);
+        List<KbossTO> kbossTOS = command.getKbossTOs();
+        List<Pair<VolumeObjectTO, String>> volumeTosAndNewPaths =
+                kbossTOS.stream().map(kbossTO -> new 
Pair<>(kbossTO.getVolumeObjectTO(), 
kbossTO.getDeltaPathOnPrimary())).collect(Collectors.toList());
+
+        Map<String, Pair<String, Long>> 
mapVolumeUuidToDeltaPathOnSecondaryAndDeltaSize = new HashMap<>();
+        Map<String, String> mapVolumeUuidToNewVolumePath = new HashMap<>();
+
+        KVMStoragePoolManager storagePoolManager = 
resource.getStoragePoolMgr();
+        boolean runningVM = command.isRunningVM();
+
+        try {
+            if (runningVM) {
+                
resource.createDiskOnlyVmSnapshotForRunningVm(volumeTosAndNewPaths, vmName, 
UUID.randomUUID().toString(), command.isQuiesceVm());
+            } else {
+                
resource.createDiskOnlyVMSnapshotOfStoppedVm(volumeTosAndNewPaths, vmName);
+            }
+
+            backupVolumes(command, resource, storagePoolManager, kbossTOS, 
volumeTosAndNewPaths, vmName, runningVM, 
mapVolumeUuidToDeltaPathOnSecondaryAndDeltaSize);
+
+            cleanupVm(command, resource, kbossTOS, vmName, runningVM, 
mapVolumeUuidToNewVolumePath);
+        } catch (BackupException ex) {
+            return new TakeKbossBackupAnswer(command, ex);
+        }
+
+        return new TakeKbossBackupAnswer(command, true, 
mapVolumeUuidToNewVolumePath, mapVolumeUuidToDeltaPathOnSecondaryAndDeltaSize);
+    }
+
+    /**
+     * Backup (copy) volumes to secondary storage. Will also populate the 
mapVolumeUuidToDeltaPathOnSecondaryAndDeltaSize argument.
+     * The timeout for this method is guided by the wait time for the given 
command, if the wait time is bigger than 24 days, there will be an overflow on 
the timeout.
+     * <br/>
+     * If an exception is caught while copying the volumes, will try to 
recover the VM to the previous state so that it is consistent.
+     * */
+    protected void backupVolumes(TakeKbossBackupCommand command, 
LibvirtComputingResource resource, KVMStoragePoolManager storagePoolManager, 
List<KbossTO> kbossTOS,
+            List<Pair<VolumeObjectTO, String>> volumeTosAndNewPaths, String 
vmName, boolean runningVM,
+            Map<String, Pair<String, Long>> 
mapVolumeUuidToDeltaPathOnSecondaryAndDeltaSize) {
+        try {
+            int maxWaitInMillis = command.getWait() * 1000;
+            for (KbossTO kbossTO : kbossTOS) {
+                long startTimeMillis = System.currentTimeMillis();
+                VolumeObjectTO volumeObjectTO = kbossTO.getVolumeObjectTO();
+                String volumeUuid = volumeObjectTO.getUuid();
+
+                logger.debug("Backing up volume [{}].", volumeUuid);
+                Pair<String, Long> deltaPathOnSecondaryAndSize = 
copyBackupDeltaToSecondary(storagePoolManager, kbossTO, 
command.getBackupChainImageStoreUrls(),
+                        command.getImageStoreUrl(), maxWaitInMillis);
+
+                
mapVolumeUuidToDeltaPathOnSecondaryAndDeltaSize.put(volumeUuid, 
deltaPathOnSecondaryAndSize);
+                maxWaitInMillis = calculateRemainingTime(maxWaitInMillis, 
startTimeMillis);
+            }
+        } catch (Exception ex) {
+            logger.error("There has been an exception during the backup 
creation process. We will try to revert the VM [{}] to its previous state. The 
exception is: {}", vmName,
+                    ex.getMessage(), ex);
+            recoverPreviousVmStateAndDeletePartialBackup(resource, 
volumeTosAndNewPaths, vmName, runningVM, 
mapVolumeUuidToDeltaPathOnSecondaryAndDeltaSize, storagePoolManager,
+                    command.getImageStoreUrl());
+
+            throw new BackupException(String.format("There was an exception 
during the backup process for VM [%s], but the VM has been successfully 
normalized.", vmName), ex,
+                    true);
+        }
+    }
+
+    protected int calculateRemainingTime(int maxWaitInMillis, long 
startTimeMillis) throws TimeoutException {
+        maxWaitInMillis -= (int)(System.currentTimeMillis() - startTimeMillis);
+        if (maxWaitInMillis < 0) {
+            throw new TimeoutException("Timeout while converting backups to 
secondary storage.");
+        }
+        return maxWaitInMillis;
+    }
+
+    /**
+     * For each KbossTO, will merge its DeltaMergeTreeTO (if it exists). Also, 
if this is the end of the chain, will also end the chain for the volume.
+     * Will populate the mapVolumeUuidToNewVolumePath argument.
+     * */
+    protected void cleanupVm(TakeKbossBackupCommand command, 
LibvirtComputingResource resource, List<KbossTO> kbossTOS, String vmName, 
boolean runningVM,
+            Map<String, String> mapVolumeUuidToNewVolumePath) {
+        for (KbossTO kbossTO : kbossTOS) {
+            VolumeObjectTO volumeObjectTO = kbossTO.getVolumeObjectTO();
+            String currentVolumePath = volumeObjectTO.getPath();
+            String volumeUuid = volumeObjectTO.getUuid();
+            DeltaMergeTreeTO deltaMergeTreeTO = kbossTO.getDeltaMergeTreeTO();
+            volumeObjectTO.setPath(kbossTO.getDeltaPathOnPrimary());
+
+            if (deltaMergeTreeTO != null) {
+                List<String> snapshotDataStoreVos = 
kbossTO.getVmSnapshotDeltaPaths();
+                mergeBackupDelta(resource, deltaMergeTreeTO, volumeObjectTO, 
vmName, runningVM, volumeUuid, snapshotDataStoreVos.isEmpty());
+            }
+
+            if (command.isEndChain() || command.isIsolated()) {
+                String baseVolumePath = currentVolumePath;
+                if (deltaMergeTreeTO != null && 
deltaMergeTreeTO.getChild().getPath().equals(baseVolumePath)) {
+                    baseVolumePath = deltaMergeTreeTO.getParent().getPath();
+                }
+                endChainForVolume(resource, volumeObjectTO, vmName, runningVM, 
volumeUuid, baseVolumePath);
+                mapVolumeUuidToNewVolumePath.put(volumeUuid, baseVolumePath);
+            } else {
+                mapVolumeUuidToNewVolumePath.put(volumeUuid, 
kbossTO.getDeltaPathOnPrimary());
+            }
+        }
+    }
+
+    /**
+     * Copy the backup delta to the secondary storage. Since we created a 
snapshot on top of the volume, the volume is now the backup delta.
+     * If there were snapshots created after the last backup, they'll be 
copied alongside and merged in the secondary storage.
+     * */
+    protected Pair<String, Long> 
copyBackupDeltaToSecondary(KVMStoragePoolManager storagePoolManager, KbossTO 
kbossTO, List<String> chainImageStoreUrls, String imageStoreUrl,
+            int waitInMillis) {
+        VolumeObjectTO delta = kbossTO.getVolumeObjectTO();
+        String parentDeltaPathOnSecondary = 
kbossTO.getPathBackupParentOnSecondary();
+        List<String> deltaPathsToCopy = kbossTO.getVmSnapshotDeltaPaths();
+        deltaPathsToCopy.add(delta.getPath());
+
+        KVMStoragePool parentImagePool = null;
+        List<KVMStoragePool> chainImagePools = null;
+        KVMStoragePool imagePool = null;
+        long backupSize;
+        final String backupOnSecondary = kbossTO.getDeltaPathOnSecondary();
+        ArrayList<String> temporaryDeltasToRemove = new ArrayList<>();
+        boolean result = false;
+        try {
+            imagePool = storagePoolManager.getStoragePoolByURI(imageStoreUrl);
+            if (chainImageStoreUrls != null) {
+                parentImagePool = 
storagePoolManager.getStoragePoolByURI(chainImageStoreUrls.get(0));
+                chainImagePools = chainImageStoreUrls.subList(1, 
chainImageStoreUrls.size()).stream().map(storagePoolManager::getStoragePoolByURI).collect(Collectors.toList());
+            }
+
+            PrimaryDataStoreTO primaryDataStoreTO = (PrimaryDataStoreTO) 
delta.getDataStore();
+            KVMStoragePool primaryPool = 
storagePoolManager.getStoragePool(primaryDataStoreTO.getPoolType(), 
primaryDataStoreTO.getUuid());
+
+            String topDelta = backupOnSecondary;
+            while (!deltaPathsToCopy.isEmpty()) {
+                String backupDeltaFullPathOnSecondary = 
imagePool.getLocalPathFor(topDelta);
+                temporaryDeltasToRemove.add(backupDeltaFullPathOnSecondary);
+                String parentBackupFullPath = null;
+
+                if (parentDeltaPathOnSecondary != null) {
+                    parentBackupFullPath = 
parentImagePool.getLocalPathFor(parentDeltaPathOnSecondary);
+                }
+
+                String backupDeltaFullPathOnPrimary = 
primaryPool.getLocalPathFor(deltaPathsToCopy.remove(0));
+                convertDeltaToSecondary(backupDeltaFullPathOnPrimary, 
backupDeltaFullPathOnSecondary, parentBackupFullPath, delta.getUuid(), 
waitInMillis);
+
+                if (!deltaPathsToCopy.isEmpty()) {
+                    parentDeltaPathOnSecondary = topDelta;
+                    topDelta = 
getRelativePathOnSecondaryForBackup(delta.getAccountId(), delta.getVolumeId(), 
UUID.randomUUID().toString());
+                    parentImagePool = imagePool;
+                }
+            }
+
+            String backupOnSecondaryFullPath = 
imagePool.getLocalPathFor(backupOnSecondary);
+
+            commitTopDeltaOnBaseBackupOnSecondaryIfNeeded(topDelta, 
backupOnSecondary, imagePool, backupOnSecondaryFullPath, waitInMillis);
+
+            backupSize = Files.size(Path.of(backupOnSecondaryFullPath));
+            result = true;
+        } catch (LibvirtException | QemuImgException | IOException e) {
+            logger.error("Exception while converting backup [{}] to secondary 
storage [{}] due to: [{}].", delta.getPath(), imagePool, e.getMessage(), e);
+            throw new BackupException("Exception while converting backup to 
secondary storage.", e, true);
+        } finally {
+            removeTemporaryDeltas(temporaryDeltasToRemove, result);
+
+            if (parentImagePool != null) {
+                
storagePoolManager.deleteStoragePool(parentImagePool.getType(), 
parentImagePool.getUuid());
+            }
+            if (chainImagePools != null) {
+                chainImagePools.forEach(pool -> 
storagePoolManager.deleteStoragePool(pool.getType(), pool.getUuid()));
+            }
+            if (imagePool != null) {
+                storagePoolManager.deleteStoragePool(imagePool.getType(), 
imagePool.getUuid());
+            }
+        }
+        return new Pair<>(backupOnSecondary, backupSize);
+    }
+
+    /**
+     * If there were VM snapshots created after the last backup, we will have 
copied them alongside the backup delta. If this is the case, we will commit all 
of them into a single
+     * base file so that we are left with one file per volume per backup.
+     * */
+    protected void commitTopDeltaOnBaseBackupOnSecondaryIfNeeded(String 
topDelta, String backupOnSecondary, KVMStoragePool imagePool, String 
backupOnSecondaryFullPath,
+            int waitInMillis) throws LibvirtException, QemuImgException {
+        if (topDelta.equals(backupOnSecondary)) {
+            return;
+        }
+
+        QemuImg qemuImg = new QemuImg(waitInMillis);
+        QemuImgFile topDeltaImg = new 
QemuImgFile(imagePool.getLocalPathFor(topDelta), 
QemuImg.PhysicalDiskFormat.QCOW2);
+        QemuImgFile baseDeltaImg = new QemuImgFile(backupOnSecondaryFullPath, 
QemuImg.PhysicalDiskFormat.QCOW2);
+
+        logger.debug("Committing top delta [{}] on base delta [{}].", 
topDeltaImg, baseDeltaImg);
+        qemuImg.commit(topDeltaImg, baseDeltaImg, true);
+    }
+
+    /**
+     * Will remove any temporary deltas created on secondary storage. If 
result is true, this means that the backup was a success and the first 
"temporary delta" is our backup, so
+     * it will not be removed.
+     * <br/>
+     * There are two uses for this method:<br/>
+     * - If we fail to backup we have to clean up the secondary storage.<br/>
+     * - If we had VM snapshots created after the last backup, we copied 
multiple files to secondary storage, and thus we have to clean them up after 
merging them.
+     * */
+    protected void removeTemporaryDeltas(List<String> temporaryDeltasToRemove, 
boolean result) {
+        if (result) {
+            temporaryDeltasToRemove.remove(0);
+        }
+        logger.debug("Removing temporary deltas [{}].", 
temporaryDeltasToRemove);
+        for (String delta : temporaryDeltasToRemove) {
+            try {
+                Files.deleteIfExists(Path.of(delta));
+            } catch (IOException ex) {
+                logger.error("Failed to remove temporary delta [{}]. Will not 
stop the backup process, but this should be investigated.", delta, ex);
+            }
+        }
+    }
+
+    /**
+     * Converts a delta from primary storage to secondary storage, if a parent 
was given, will set it as the backing file for the delta being copied.
+     *
+     * @param pathDeltaOnPrimary absolute path of the delta to be copied.
+     * @param pathDeltaOnSecondary absolute path of the destination of the 
delta to be copied.
+     * @param pathParentOnSecondary absolute path of the parent delta, if it 
exists.
+     * @param volumeUuid volume uuid, used for logging.
+     * @param waitInMillis timeout in milliseconds.
+     * */
+    protected void convertDeltaToSecondary(String pathDeltaOnPrimary, String 
pathDeltaOnSecondary, String pathParentOnSecondary, String volumeUuid, int 
waitInMillis)
+            throws QemuImgException, LibvirtException {
+        QemuImgFile backupDestination = new QemuImgFile(pathDeltaOnSecondary, 
QemuImg.PhysicalDiskFormat.QCOW2);
+        QemuImgFile backupOrigin = new QemuImgFile(pathDeltaOnPrimary, 
QemuImg.PhysicalDiskFormat.QCOW2);
+        QemuImgFile parentBackup = null;
+
+        if (pathParentOnSecondary != null) {
+            parentBackup = new QemuImgFile(pathParentOnSecondary, 
QemuImg.PhysicalDiskFormat.QCOW2);
+        }
+
+        logger.debug("Converting delta [{}] to [{}] with {}", backupOrigin, 
backupDestination, parentBackup == null ? "no parent." : String.format("parent 
[%s].", parentBackup));
+
+        createDirsIfNeeded(pathDeltaOnSecondary, volumeUuid);
+
+        QemuImg qemuImg = new QemuImg(waitInMillis);
+        qemuImg.convert(backupOrigin, backupDestination, parentBackup, null, 
null,  new QemuImageOptions(backupOrigin.getFormat(), 
backupOrigin.getFileName(), null), null,
+                true, false, false, false, null, null);
+    }
+
+
+    protected void endChainForVolume(LibvirtComputingResource resource, 
VolumeObjectTO volumeObjectTO, String vmName, boolean isVmRunning, String 
volumeUuid, String baseVolumePath)
+            throws BackupException {
+
+        BackupDeltaTO baseVolume = new 
BackupDeltaTO(volumeObjectTO.getDataStore(), Hypervisor.HypervisorType.KVM, 
baseVolumePath);
+        DeltaMergeTreeTO deltaMergeTreeTO = new 
DeltaMergeTreeTO(volumeObjectTO, baseVolume, volumeObjectTO, new ArrayList<>());
+
+        logger.debug("Ending backup chain for volume [{}], the next backup 
will be a full backup.", volumeObjectTO.getUuid());
+
+        mergeBackupDelta(resource, deltaMergeTreeTO, volumeObjectTO, vmName, 
isVmRunning, volumeUuid, false);
+    }
+
+    /**
+     * Tries to recover the previous state of the VM. Should only be called if 
an exception in the backup creation process happened.<br/>
+     * For each volume, will:<br/>
+     *  - Merge back any backup deltas created;
+     *  - Remove the data backed up to the secondary storage;
+     * */
+    protected void 
recoverPreviousVmStateAndDeletePartialBackup(LibvirtComputingResource resource, 
List<Pair<VolumeObjectTO, String>> volumeTosAndNewPaths, String vmName,
+            boolean runningVm, Map<String, Pair<String, Long>> 
mapVolumeUuidToDeltaPathOnSecondaryAndSize, KVMStoragePoolManager 
storagePoolManager, String imageStoreUrl) {
+        for (Pair<VolumeObjectTO, String> volumeObjectTOAndNewPath : 
volumeTosAndNewPaths) {
+            VolumeObjectTO volumeObjectTO = volumeObjectTOAndNewPath.first();
+            String volumeUuid = volumeObjectTO.getUuid();
+
+            BackupDeltaTO oldDelta = new 
BackupDeltaTO(volumeObjectTO.getDataStore(), Hypervisor.HypervisorType.KVM, 
volumeObjectTO.getPath());
+            volumeObjectTO.setPath(volumeObjectTOAndNewPath.second());
+            DeltaMergeTreeTO deltaMergeTreeTO = new 
DeltaMergeTreeTO(volumeObjectTO, oldDelta, volumeObjectTO, new ArrayList<>());
+
+            mergeBackupDelta(resource, deltaMergeTreeTO, volumeObjectTO, 
vmName, runningVm, volumeUuid, false);
+
+            Pair<String, Long> deltaPathOnSecondaryAndSize = 
mapVolumeUuidToDeltaPathOnSecondaryAndSize.get(volumeUuid);
+            if (deltaPathOnSecondaryAndSize == null) {
+                continue;
+            }
+
+            cleanupDeltaOnSecondary(storagePoolManager, imageStoreUrl, 
deltaPathOnSecondaryAndSize.first());
+        }
+    }
+
+    protected void cleanupDeltaOnSecondary(KVMStoragePoolManager 
storagePoolManager, String imageStoreUrl, String deltaPath) {
+        KVMStoragePool imagePool = null;
+
+        try {
+            imagePool = storagePoolManager.getStoragePoolByURI(imageStoreUrl);
+            String fullDeltaPath = imagePool.getLocalPathFor(deltaPath);
+
+            logger.debug("Cleaning up delta at [{}] as part of the post backup 
error normalization effort.", fullDeltaPath);
+
+            Files.deleteIfExists(Path.of(fullDeltaPath));
+        } catch (IOException e) {
+            logger.error("Exception while trying to cleanup delta at [{}].", 
deltaPath, e);
+        } finally {
+            if (imagePool != null) {
+                storagePoolManager.deleteStoragePool(imagePool.getType(), 
imagePool.getUuid());
+            }
+        }
+    }
+
+
+    protected void mergeBackupDelta(LibvirtComputingResource resource, 
DeltaMergeTreeTO deltaMergeTreeTO, VolumeObjectTO volumeObjectTO, String 
vmName, boolean isVmRunning,
+            String volumeUuid, boolean countNewestDeltaAsGrandchild) throws 
BackupException {
+        try {
+            if (isVmRunning) {
+                resource.mergeDeltaForRunningVm(deltaMergeTreeTO, vmName, 
volumeObjectTO);
+            } else {
+                if (countNewestDeltaAsGrandchild) {
+                    deltaMergeTreeTO.addGrandChild(volumeObjectTO);
+                }
+                resource.mergeDeltaForStoppedVm(deltaMergeTreeTO);
+            }
+        } catch (LibvirtException | QemuImgException | IOException e) {
+            logger.error("Exception while merging the last backup delta using 
delta merge tree [{}] for VM [{}] and volume [{}].", deltaMergeTreeTO, vmName, 
volumeUuid, e);
+            throw new BackupException(String.format("Exception during backup 
wrap-up phase for VM [%s].", vmName), e, false);
+        }
+    }
+
+    protected String getRelativePathOnSecondaryForBackup(long accountId, long 
volumeId, String backupPath) {
+        return String.format("%s%s%s%s%s%s%s", "backups", File.separator, 
accountId, File.separator, volumeId, File.separator, backupPath);

Review Comment:
   Wouldn't it be more readable to use String concatenation here?
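
   To make the suggestion concrete, here is a minimal sketch comparing the 
   current `String.format` call with plain concatenation and with 
   `String.join`. The class and method names (`BackupPathSketch`, `withFormat`, 
   `withConcatenation`, `withJoin`) are hypothetical and only for illustration; 
   all three variants should produce the same path.

   ```java
   import java.io.File;

   // Hypothetical helper class, not part of the PR.
   final class BackupPathSketch {
       // Equivalent to the approach in the diff: seven positional placeholders.
       public static String withFormat(long accountId, long volumeId, String backupPath) {
           return String.format("%s%s%s%s%s%s%s", "backups", File.separator, accountId, File.separator,
                   volumeId, File.separator, backupPath);
       }

       // Plain concatenation: the path segments read left to right.
       public static String withConcatenation(long accountId, long volumeId, String backupPath) {
           return "backups" + File.separator + accountId + File.separator + volumeId + File.separator + backupPath;
       }

       // Alternative: let String.join insert the separator between segments.
       public static String withJoin(long accountId, long volumeId, String backupPath) {
           return String.join(File.separator, "backups", String.valueOf(accountId), String.valueOf(volumeId), backupPath);
       }

       public static void main(String[] args) {
           System.out.println(withFormat(2L, 14L, "example-backup"));
           System.out.println(withConcatenation(2L, 14L, "example-backup"));
           System.out.println(withJoin(2L, 14L, "example-backup"));
       }
   }
   ```

   Either alternative avoids having to count placeholders against arguments; 
   which one reads best is a matter of taste.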



##########
api/src/main/java/org/apache/cloudstack/api/command/user/backup/FinishBackupChainCmd.java:
##########
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.cloudstack.api.command.user.backup;
+
+import com.cloud.exception.ConcurrentOperationException;
+import com.cloud.exception.InsufficientCapacityException;
+import com.cloud.exception.NetworkRuleConflictException;
+import com.cloud.exception.ResourceAllocationException;
+import com.cloud.exception.ResourceUnavailableException;
+import com.cloud.user.Account;
+import com.cloud.vm.VirtualMachine;
+import org.apache.cloudstack.api.ACL;
+import org.apache.cloudstack.api.APICommand;
+import org.apache.cloudstack.api.ApiConstants;
+import org.apache.cloudstack.api.BaseCmd;
+import org.apache.cloudstack.api.Parameter;
+import org.apache.cloudstack.api.ServerApiException;
+import org.apache.cloudstack.api.response.SuccessResponse;
+import org.apache.cloudstack.api.response.VirtualMachineResponse;
+import org.apache.cloudstack.backup.InternalBackupService;
+
+import javax.inject.Inject;
+
+@APICommand(name = "finishBackupChain", description = "Finish backup chain of 
VM.",
+        responseObject = SuccessResponse.class, since = "4.23.0.0", 
requestHasSensitiveInfo = false,
+        responseHasSensitiveInfo = false)

Review Comment:
   The API returns a success response even for VMs that are not associated with 
a backup offering. I believe that it would be better to return a more 
meaningful message for instances that are not associated with KBOSS offerings.
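
   As a rough sketch of the kind of check I have in mind (not the actual 
   implementation): the class `FinishChainValidationSketch`, the method 
   `validateVmUsesKboss`, and the provider name `"kboss"` are all assumptions 
   made for illustration, and `IllegalArgumentException` stands in for whatever 
   exception the real code would use (possibly `InvalidParameterValueException`).

   ```java
   // Hypothetical validation helper, not part of the PR.
   final class FinishChainValidationSketch {
       // Assumed provider name; the real identifier may differ.
       static final String KBOSS_PROVIDER = "kboss";

       // Fails with a descriptive message instead of silently reporting success.
       public static void validateVmUsesKboss(String vmUuid, String backupProviderName) {
           if (backupProviderName == null) {
               throw new IllegalArgumentException(
                       String.format("VM [%s] is not assigned to any backup offering.", vmUuid));
           }
           if (!KBOSS_PROVIDER.equalsIgnoreCase(backupProviderName)) {
               throw new IllegalArgumentException(
                       String.format("VM [%s] is assigned to a [%s] backup offering; only KBOSS offerings have backup chains.",
                               vmUuid, backupProviderName));
           }
       }

       public static void main(String[] args) {
           validateVmUsesKboss("vm-with-kboss", "kboss"); // passes silently
           try {
               validateVmUsesKboss("vm-without-offering", null);
           } catch (IllegalArgumentException e) {
               System.out.println(e.getMessage());
           }
       }
   }
   ```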



##########
plugins/hypervisors/kvm/src/main/java/com/cloud/hypervisor/kvm/resource/wrapper/LibvirtTakeKbossBackupCommandWrapper.java:
##########
@@ -0,0 +1,392 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package com.cloud.hypervisor.kvm.resource.wrapper;
+
+import com.cloud.agent.api.Answer;
+import com.cloud.hypervisor.Hypervisor;
+import com.cloud.hypervisor.kvm.resource.LibvirtComputingResource;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePool;
+import com.cloud.hypervisor.kvm.storage.KVMStoragePoolManager;
+import com.cloud.resource.CommandWrapper;
+import com.cloud.resource.ResourceWrapper;
+import com.cloud.utils.Pair;
+import com.cloud.utils.exception.BackupException;
+import org.apache.cloudstack.backup.TakeKbossBackupAnswer;
+import org.apache.cloudstack.backup.TakeKbossBackupCommand;
+import org.apache.cloudstack.storage.to.BackupDeltaTO;
+import org.apache.cloudstack.storage.to.DeltaMergeTreeTO;
+import org.apache.cloudstack.storage.to.KbossTO;
+import org.apache.cloudstack.storage.to.PrimaryDataStoreTO;
+import org.apache.cloudstack.storage.to.VolumeObjectTO;
+import org.apache.cloudstack.utils.qemu.QemuImageOptions;
+import org.apache.cloudstack.utils.qemu.QemuImg;
+import org.apache.cloudstack.utils.qemu.QemuImgException;
+import org.apache.cloudstack.utils.qemu.QemuImgFile;
+import org.libvirt.LibvirtException;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+@ResourceWrapper(handles = TakeKbossBackupCommand.class)
+public class LibvirtTakeKbossBackupCommandWrapper extends 
CommandWrapper<TakeKbossBackupCommand, Answer, LibvirtComputingResource> {
+    @Override
+    public Answer execute(TakeKbossBackupCommand command, 
LibvirtComputingResource resource) {
+        String vmName = command.getVmName();
+        logger.info("Starting backup process for VM [{}].", vmName);
+        List<KbossTO> kbossTOS = command.getKbossTOs();
+        List<Pair<VolumeObjectTO, String>> volumeTosAndNewPaths =
+                kbossTOS.stream().map(kbossTO -> new 
Pair<>(kbossTO.getVolumeObjectTO(), 
kbossTO.getDeltaPathOnPrimary())).collect(Collectors.toList());
+
+        Map<String, Pair<String, Long>> 
mapVolumeUuidToDeltaPathOnSecondaryAndDeltaSize = new HashMap<>();
+        Map<String, String> mapVolumeUuidToNewVolumePath = new HashMap<>();
+
+        KVMStoragePoolManager storagePoolManager = 
resource.getStoragePoolMgr();
+        boolean runningVM = command.isRunningVM();
+
+        try {
+            if (runningVM) {
+                
resource.createDiskOnlyVmSnapshotForRunningVm(volumeTosAndNewPaths, vmName, 
UUID.randomUUID().toString(), command.isQuiesceVm());
+            } else {
+                
resource.createDiskOnlyVMSnapshotOfStoppedVm(volumeTosAndNewPaths, vmName);
+            }
+
+            backupVolumes(command, resource, storagePoolManager, kbossTOS, 
volumeTosAndNewPaths, vmName, runningVM, 
mapVolumeUuidToDeltaPathOnSecondaryAndDeltaSize);
+
+            cleanupVm(command, resource, kbossTOS, vmName, runningVM, 
mapVolumeUuidToNewVolumePath);
+        } catch (BackupException ex) {
+            return new TakeKbossBackupAnswer(command, ex);
+        }
+
+        return new TakeKbossBackupAnswer(command, true, 
mapVolumeUuidToNewVolumePath, mapVolumeUuidToDeltaPathOnSecondaryAndDeltaSize);
+    }
+
+    /**
+     * Backup (copy) volumes to secondary storage. Will also populate the 
mapVolumeUuidToDeltaPathOnSecondaryAndDeltaSize argument.
+     * The timeout for this method is guided by the wait time for the given 
command, if the wait time is bigger than 24 days, there will be an overflow on 
the timeout.
+     * <br/>
+     * If an exception is caught while copying the volumes, will try to 
recover the VM to the previous state so that it is consistent.
+     * */
+    protected void backupVolumes(TakeKbossBackupCommand command, 
LibvirtComputingResource resource, KVMStoragePoolManager storagePoolManager, 
List<KbossTO> kbossTOS,
+            List<Pair<VolumeObjectTO, String>> volumeTosAndNewPaths, String 
vmName, boolean runningVM,
+            Map<String, Pair<String, Long>> 
mapVolumeUuidToDeltaPathOnSecondaryAndDeltaSize) {
+        try {
+            int maxWaitInMillis = command.getWait() * 1000;
+            for (KbossTO kbossTO : kbossTOS) {
+                long startTimeMillis = System.currentTimeMillis();
+                VolumeObjectTO volumeObjectTO = kbossTO.getVolumeObjectTO();
+                String volumeUuid = volumeObjectTO.getUuid();
+
+                logger.debug("Backing up volume [{}].", volumeUuid);
+                Pair<String, Long> deltaPathOnSecondaryAndSize = 
copyBackupDeltaToSecondary(storagePoolManager, kbossTO, 
command.getBackupChainImageStoreUrls(),
+                        command.getImageStoreUrl(), maxWaitInMillis);
+
+                
mapVolumeUuidToDeltaPathOnSecondaryAndDeltaSize.put(volumeUuid, 
deltaPathOnSecondaryAndSize);
+                maxWaitInMillis = calculateRemainingTime(maxWaitInMillis, 
startTimeMillis);
+            }
+        } catch (Exception ex) {
+            logger.error("There has been an exception during the backup 
creation process. We will try to revert the VM [{}] to its previous state. The 
exception is: {}", vmName,
+                    ex.getMessage(), ex);
+            recoverPreviousVmStateAndDeletePartialBackup(resource, 
volumeTosAndNewPaths, vmName, runningVM, 
mapVolumeUuidToDeltaPathOnSecondaryAndDeltaSize, storagePoolManager,
+                    command.getImageStoreUrl());
+
+            throw new BackupException(String.format("There was an exception 
during the backup process for VM [%s], but the VM has been successfully 
normalized.", vmName), ex,
+                    true);
+        }
+    }
+
+    protected int calculateRemainingTime(int maxWaitInMillis, long 
startTimeMillis) throws TimeoutException {
+        maxWaitInMillis -= (int)(System.currentTimeMillis() - startTimeMillis);
+        if (maxWaitInMillis < 0) {
+            throw new TimeoutException("Timeout while converting backups to 
secondary storage.");
+        }
+        return maxWaitInMillis;
+    }
+
+    /**
+     * For each KbossTO, will merge its DeltaMergeTreeTO (if it exists). Also, 
if this is the end of the chain, will also end the chain for the volume.
+     * Will populate the mapVolumeUuidToNewVolumePath argument.
+     * */
+    protected void cleanupVm(TakeKbossBackupCommand command, 
LibvirtComputingResource resource, List<KbossTO> kbossTOS, String vmName, 
boolean runningVM,
+            Map<String, String> mapVolumeUuidToNewVolumePath) {
+        for (KbossTO kbossTO : kbossTOS) {
+            VolumeObjectTO volumeObjectTO = kbossTO.getVolumeObjectTO();
+            String currentVolumePath = volumeObjectTO.getPath();
+            String volumeUuid = volumeObjectTO.getUuid();
+            DeltaMergeTreeTO deltaMergeTreeTO = kbossTO.getDeltaMergeTreeTO();
+            volumeObjectTO.setPath(kbossTO.getDeltaPathOnPrimary());
+
+            if (deltaMergeTreeTO != null) {
+                List<String> snapshotDataStoreVos = 
kbossTO.getVmSnapshotDeltaPaths();
+                mergeBackupDelta(resource, deltaMergeTreeTO, volumeObjectTO, 
vmName, runningVM, volumeUuid, snapshotDataStoreVos.isEmpty());
+            }
+
+            if (command.isEndChain() || command.isIsolated()) {
+                String baseVolumePath = currentVolumePath;
+                if (deltaMergeTreeTO != null && 
deltaMergeTreeTO.getChild().getPath().equals(baseVolumePath)) {
+                    baseVolumePath = deltaMergeTreeTO.getParent().getPath();
+                }
+                endChainForVolume(resource, volumeObjectTO, vmName, runningVM, 
volumeUuid, baseVolumePath);
+                mapVolumeUuidToNewVolumePath.put(volumeUuid, baseVolumePath);
+            } else {
+                mapVolumeUuidToNewVolumePath.put(volumeUuid, 
kbossTO.getDeltaPathOnPrimary());
+            }
+        }
+    }
+
+    /**
+     * Copy the backup delta to the secondary storage. Since we created a 
snapshot on top of the volume, the volume is now the backup delta.
+     * If there were snapshots created after the last backup, they'll be 
copied alongside and merged in the secondary storage.
+     * */
+    protected Pair<String, Long> 
copyBackupDeltaToSecondary(KVMStoragePoolManager storagePoolManager, KbossTO 
kbossTO, List<String> chainImageStoreUrls, String imageStoreUrl,
+            int waitInMillis) {
+        VolumeObjectTO delta = kbossTO.getVolumeObjectTO();
+        String parentDeltaPathOnSecondary = 
kbossTO.getPathBackupParentOnSecondary();
+        List<String> deltaPathsToCopy = kbossTO.getVmSnapshotDeltaPaths();
+        deltaPathsToCopy.add(delta.getPath());
+
+        KVMStoragePool parentImagePool = null;
+        List<KVMStoragePool> chainImagePools = null;
+        KVMStoragePool imagePool = null;
+        long backupSize;
+        final String backupOnSecondary = kbossTO.getDeltaPathOnSecondary();
+        ArrayList<String> temporaryDeltasToRemove = new ArrayList<>();
+        boolean result = false;
+        try {
+            imagePool = storagePoolManager.getStoragePoolByURI(imageStoreUrl);
+            if (chainImageStoreUrls != null) {
+                parentImagePool = 
storagePoolManager.getStoragePoolByURI(chainImageStoreUrls.get(0));
+                chainImagePools = chainImageStoreUrls.subList(1, 
chainImageStoreUrls.size()).stream().map(storagePoolManager::getStoragePoolByURI).collect(Collectors.toList());
+            }
+
+            PrimaryDataStoreTO primaryDataStoreTO = (PrimaryDataStoreTO) 
delta.getDataStore();
+            KVMStoragePool primaryPool = 
storagePoolManager.getStoragePool(primaryDataStoreTO.getPoolType(), 
primaryDataStoreTO.getUuid());
+
+            String topDelta = backupOnSecondary;
+            while (!deltaPathsToCopy.isEmpty()) {
+                String backupDeltaFullPathOnSecondary = 
imagePool.getLocalPathFor(topDelta);
+                temporaryDeltasToRemove.add(backupDeltaFullPathOnSecondary);
+                String parentBackupFullPath = null;
+
+                if (parentDeltaPathOnSecondary != null) {
+                    parentBackupFullPath = 
parentImagePool.getLocalPathFor(parentDeltaPathOnSecondary);
+                }
+
+                String backupDeltaFullPathOnPrimary = 
primaryPool.getLocalPathFor(deltaPathsToCopy.remove(0));
+                convertDeltaToSecondary(backupDeltaFullPathOnPrimary, 
backupDeltaFullPathOnSecondary, parentBackupFullPath, delta.getUuid(), 
waitInMillis);
+
+                if (!deltaPathsToCopy.isEmpty()) {
+                    parentDeltaPathOnSecondary = topDelta;
+                    topDelta = 
getRelativePathOnSecondaryForBackup(delta.getAccountId(), delta.getVolumeId(), 
UUID.randomUUID().toString());
+                    parentImagePool = imagePool;
+                }
+            }
+
+            String backupOnSecondaryFullPath = 
imagePool.getLocalPathFor(backupOnSecondary);
+
+            commitTopDeltaOnBaseBackupOnSecondaryIfNeeded(topDelta, 
backupOnSecondary, imagePool, backupOnSecondaryFullPath, waitInMillis);
+
+            backupSize = Files.size(Path.of(backupOnSecondaryFullPath));
+            result = true;
+        } catch (LibvirtException | QemuImgException | IOException e) {
+            logger.error("Exception while converting backup [{}] to secondary 
storage [{}] due to: [{}].", delta.getPath(), imagePool, e.getMessage(), e);
+            throw new BackupException("Exception while converting backup to 
secondary storage.", e, true);
+        } finally {
+            removeTemporaryDeltas(temporaryDeltasToRemove, result);
+
+            if (parentImagePool != null) {
+                
storagePoolManager.deleteStoragePool(parentImagePool.getType(), 
parentImagePool.getUuid());
+            }
+            if (chainImagePools != null) {
+                chainImagePools.forEach(pool -> 
storagePoolManager.deleteStoragePool(pool.getType(), pool.getUuid()));
+            }
+            if (imagePool != null) {
+                storagePoolManager.deleteStoragePool(imagePool.getType(), 
imagePool.getUuid());
+            }
+        }
+        return new Pair<>(backupOnSecondary, backupSize);
+    }
+
+    /**
+     * If there were VM snapshots created after the last backup, we will have 
copied them alongside the backup delta. If this is the case, we will commit all 
of them into a single
+     * base file so that we are left with one file per volume per backup.
+     * */
+    protected void commitTopDeltaOnBaseBackupOnSecondaryIfNeeded(String 
topDelta, String backupOnSecondary, KVMStoragePool imagePool, String 
backupOnSecondaryFullPath,
+            int waitInMillis) throws LibvirtException, QemuImgException {
+        if (topDelta.equals(backupOnSecondary)) {
+            return;
+        }
+
+        QemuImg qemuImg = new QemuImg(waitInMillis);
+        QemuImgFile topDeltaImg = new 
QemuImgFile(imagePool.getLocalPathFor(topDelta), 
QemuImg.PhysicalDiskFormat.QCOW2);
+        QemuImgFile baseDeltaImg = new QemuImgFile(backupOnSecondaryFullPath, 
QemuImg.PhysicalDiskFormat.QCOW2);
+
+        logger.debug("Committing top delta [{}] on base delta [{}].", 
topDeltaImg, baseDeltaImg);
+        qemuImg.commit(topDeltaImg, baseDeltaImg, true);
+    }
+
+    /**
+     * Will remove any temporary deltas created on secondary storage. If 
result is true, this means that the backup was a success and the first 
"temporary delta" is our backup, so
+     * it will not be removed.
+     * <br/>
+     * There are two uses for this method:<br/>
+     * - If we fail to backup we have to clean up the secondary storage.<br/>
+     * - If we had VM snapshots created after the last backup, we copied 
multiple files to secondary storage, and thus we have to clean them up after 
merging them.
+     * */
+    protected void removeTemporaryDeltas(List<String> temporaryDeltasToRemove, 
boolean result) {
+        if (result) {
+            temporaryDeltasToRemove.remove(0);
+        }
+        logger.debug("Removing temporary deltas [{}].", 
temporaryDeltasToRemove);

Review Comment:
   Since `temporaryDeltasToRemove` is a list, its string representation already 
includes square brackets, so double square brackets are logged here:
   
   ```
   2026-06-11 16:23:54,972 DEBUG 
[resource.wrapper.LibvirtTakeKbossBackupCommandWrapper] 
(AgentRequest-Handler-3:[]) (logid:e836e939) Removing temporary deltas 
[[/mnt/ca6a1a22-9c32-30b7-bcd8-2306a91be230/backups/2/14/00bde721-8fd7-4043-bbe9-f53eeee517a1]].
   ```
   
   ```suggestion
           logger.debug("Removing temporary deltas {}.", 
temporaryDeltasToRemove);
   ```
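
   For reference, a minimal sketch of why the brackets double up. It does not 
   use the real logger: the hypothetical `render` method below only emulates 
   `{}` substitution with `String.replace`, on the assumption that the logger 
   renders a list argument the same way `List.toString()` does.

   ```java
   import java.util.List;

   // Hypothetical demo class, not part of the PR.
   final class DoubleBracketSketch {
       // Emulates placeholder substitution; a stand-in for the logging framework.
       public static String render(String template, Object argument) {
           return template.replace("{}", String.valueOf(argument));
       }

       public static void main(String[] args) {
           List<String> deltas = List.of("/mnt/a");
           // Brackets in the template plus brackets from List.toString().
           System.out.println(render("Removing temporary deltas [{}].", deltas));
           // Only the brackets from List.toString().
           System.out.println(render("Removing temporary deltas {}.", deltas));
       }
   }
   ```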



##########
plugins/backup/kboss/src/main/java/org/apache/cloudstack/backup/KbossBackupProvider.java:
##########
@@ -0,0 +1,2916 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.cloudstack.backup;
+
+import com.cloud.agent.AgentManager;
+import com.cloud.agent.api.Answer;
+import com.cloud.agent.api.Command;
+import com.cloud.agent.api.storage.MergeDiskOnlyVmSnapshotCommand;
+import com.cloud.agent.api.to.DataStoreTO;
+import com.cloud.agent.api.to.DataTO;
+import com.cloud.agent.api.to.VirtualMachineTO;
+import com.cloud.agent.manager.Commands;
+import com.cloud.alert.AlertManager;
+import com.cloud.exception.AgentUnavailableException;
+import com.cloud.exception.InsufficientCapacityException;
+import com.cloud.exception.InvalidParameterValueException;
+import com.cloud.exception.OperationTimedoutException;
+import com.cloud.exception.ResourceAllocationException;
+import com.cloud.exception.ResourceUnavailableException;
+import com.cloud.host.HostVO;
+import com.cloud.host.Status;
+import com.cloud.host.dao.HostDao;
+import com.cloud.hypervisor.Hypervisor;
+import com.cloud.hypervisor.HypervisorGuru;
+import com.cloud.hypervisor.HypervisorGuruManager;
+import com.cloud.resource.ResourceState;
+import com.cloud.storage.DataStoreRole;
+import com.cloud.storage.DiskOfferingVO;
+import com.cloud.storage.Storage;
+import com.cloud.storage.Volume;
+import com.cloud.storage.VolumeApiService;
+import com.cloud.storage.VolumeApiServiceImpl;
+import com.cloud.storage.VolumeVO;
+import com.cloud.storage.dao.DiskOfferingDao;
+import com.cloud.storage.dao.VolumeDao;
+import com.cloud.uservm.UserVm;
+import com.cloud.utils.DateUtil;
+import com.cloud.utils.Pair;
+import com.cloud.utils.Predicate;
+import com.cloud.utils.component.AdapterBase;
+import com.cloud.utils.db.EntityManager;
+import com.cloud.utils.db.Transaction;
+import com.cloud.utils.db.TransactionCallback;
+import com.cloud.utils.db.TransactionLegacy;
+import com.cloud.utils.exception.BackupException;
+import com.cloud.utils.exception.BackupProviderException;
+import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.utils.fsm.NoTransitionException;
+import com.cloud.vm.NicVO;
+import com.cloud.vm.UserVmManager;
+import com.cloud.vm.UserVmVO;
+import com.cloud.vm.VMInstanceDetailVO;
+import com.cloud.vm.VirtualMachine;
+import com.cloud.vm.VirtualMachineManager;
+import com.cloud.vm.VirtualMachineManagerImpl;
+import com.cloud.vm.VirtualMachineProfileImpl;
+import com.cloud.vm.VmDetailConstants;
+import com.cloud.vm.VmWork;
+import com.cloud.vm.VmWorkConstants;
+import com.cloud.vm.VmWorkDeleteBackup;
+import com.cloud.vm.VmWorkRestoreBackup;
+import com.cloud.vm.VmWorkRestoreVolumeBackupAndAttach;
+import com.cloud.vm.VmWorkSerializer;
+import com.cloud.vm.VmWorkTakeBackup;
+import com.cloud.vm.dao.NicDao;
+import com.cloud.vm.dao.UserVmDao;
+import com.cloud.vm.dao.VMInstanceDetailsDao;
+import com.cloud.vm.snapshot.VMSnapshot;
+import com.cloud.vm.snapshot.VMSnapshotDetailsVO;
+import com.cloud.vm.snapshot.VMSnapshotVO;
+import com.cloud.vm.snapshot.dao.VMSnapshotDao;
+import com.cloud.vm.snapshot.dao.VMSnapshotDetailsDao;
+import org.apache.cloudstack.alert.AlertService;
+import org.apache.cloudstack.api.ApiConstants;
+import org.apache.cloudstack.api.command.user.vm.DestroyVMCmd;
+import org.apache.cloudstack.backup.dao.BackupDao;
+import org.apache.cloudstack.backup.dao.BackupDetailsDao;
+import org.apache.cloudstack.backup.dao.BackupOfferingDao;
+import org.apache.cloudstack.backup.dao.BackupOfferingDetailsDao;
+import org.apache.cloudstack.backup.dao.InternalBackupDataStoreDao;
+import org.apache.cloudstack.backup.dao.InternalBackupJoinDao;
+import org.apache.cloudstack.backup.dao.InternalBackupServiceJobDao;
+import org.apache.cloudstack.backup.dao.InternalBackupStoragePoolDao;
+import org.apache.cloudstack.context.CallContext;
+import 
org.apache.cloudstack.engine.orchestration.service.VolumeOrchestrationService;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStore;
+import org.apache.cloudstack.engine.subsystem.api.storage.DataStoreManager;
+import org.apache.cloudstack.engine.subsystem.api.storage.EndPoint;
+import org.apache.cloudstack.engine.subsystem.api.storage.EndPointSelector;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeDataFactory;
+import org.apache.cloudstack.engine.subsystem.api.storage.VolumeInfo;
+import org.apache.cloudstack.framework.config.ConfigKey;
+import org.apache.cloudstack.framework.config.Configurable;
+import org.apache.cloudstack.framework.jobs.AsyncJob;
+import org.apache.cloudstack.framework.jobs.AsyncJobExecutionContext;
+import org.apache.cloudstack.framework.jobs.AsyncJobManager;
+import org.apache.cloudstack.framework.jobs.Outcome;
+import org.apache.cloudstack.framework.jobs.impl.AsyncJobVO;
+import org.apache.cloudstack.framework.jobs.impl.OutcomeImpl;
+import org.apache.cloudstack.framework.jobs.impl.VmWorkJobVO;
+import org.apache.cloudstack.jobs.JobInfo;
+import org.apache.cloudstack.secstorage.heuristics.HeuristicType;
+import org.apache.cloudstack.storage.command.BackupDeleteAnswer;
+import org.apache.cloudstack.storage.command.DeleteCommand;
+import org.apache.cloudstack.storage.datastore.db.ImageStoreDao;
+import org.apache.cloudstack.storage.datastore.db.ImageStoreVO;
+import org.apache.cloudstack.storage.datastore.db.PrimaryDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreDao;
+import org.apache.cloudstack.storage.datastore.db.SnapshotDataStoreVO;
+import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
+import org.apache.cloudstack.storage.heuristics.HeuristicRuleHelper;
+import org.apache.cloudstack.storage.to.BackupDeltaTO;
+import org.apache.cloudstack.storage.to.DeltaMergeTreeTO;
+import org.apache.cloudstack.storage.to.KbossTO;
+import org.apache.cloudstack.storage.to.VolumeObjectTO;
+import org.apache.cloudstack.storage.vmsnapshot.VMSnapshotHelper;
+import org.apache.cloudstack.storage.volume.VolumeObject;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static org.apache.cloudstack.backup.dao.BackupDetailsDao.BACKUP_HASH;
+import static org.apache.cloudstack.backup.dao.BackupDetailsDao.CURRENT;
+import static org.apache.cloudstack.backup.dao.BackupDetailsDao.END_OF_CHAIN;
+import static org.apache.cloudstack.backup.dao.BackupDetailsDao.IMAGE_STORE_ID;
+import static org.apache.cloudstack.backup.dao.BackupDetailsDao.ISOLATED;
+import static org.apache.cloudstack.backup.dao.BackupDetailsDao.PARENT_ID;
+import static 
org.apache.cloudstack.backup.dao.BackupDetailsDao.SCREENSHOT_PATH;
+
+public class KbossBackupProvider extends AdapterBase implements 
InternalBackupProvider, Configurable {
+    protected ConfigKey<Integer> backupChainSize = new ConfigKey<>("Advanced", 
Integer.class, "backup.chain.size", "8", "Determines the max size of a backup 
chain." +
+            " Currently only used by the KBOSS provider. If cloud admins set 
it to 1 , all the backups will be full backups. With values lower than 1, the 
backup chain will be " +
+            "unlimited, unless it is stopped by another process. Please note 
that unlimited backup chains have a higher chance of getting corrupted, as new 
backups will be" +
+            " dependant on all of the older ones.", true, 
ConfigKey.Scope.Zone);
+
+    protected ConfigKey<Integer> backupTimeout = new ConfigKey<>("Advanced", 
Integer.class, "kboss.timeout", "43200", "Timeout, in seconds, to execute KBOSS 
commands. After the " +
+            "command times out, the Management Server will still wait for 
another kboss.timeout seconds to receive a response from the Agent.", true, 
ConfigKey.Scope.Zone);
+
+    @Inject
+    private AsyncJobManager jobManager;
+    @Inject
+    private EntityManager entityManager;
+
+    @Inject
+    private VirtualMachineManager virtualMachineManager;
+
+    @Inject
+    private UserVmDao userVmDao;
+
+    @Inject
+    private VMInstanceDetailsDao vmInstanceDetailsDao;
+
+    @Inject
+    private VMSnapshotHelper vmSnapshotHelper;
+
+    @Inject
+    private SnapshotDataStoreDao snapshotDataStoreDao;
+
+    @Inject
+    private VMSnapshotDao vmSnapshotDao;
+
+    @Inject
+    private VMSnapshotDetailsDao vmSnapshotDetailsDao;
+
+    @Inject
+    private BackupDao backupDao;
+
+    @Inject
+    private InternalBackupJoinDao internalBackupJoinDao;
+
+    @Inject
+    private BackupDetailsDao backupDetailDao;
+
+    @Inject
+    private InternalBackupStoragePoolDao internalBackupStoragePoolDao;
+
+    @Inject
+    private InternalBackupDataStoreDao internalBackupDataStoreDao;
+
+    @Inject
+    private BackupOfferingDao backupOfferingDao;
+
+    @Inject
+    private BackupOfferingDetailsDao backupOfferingDetailsDao;
+
+    @Inject
+    private HeuristicRuleHelper heuristicRuleHelper;
+
+    @Inject
+    private DataStoreManager dataStoreManager;
+
+    @Inject
+    private AgentManager agentManager;
+
+    @Inject
+    private EndPointSelector endPointSelector;
+
+    @Inject
+    private VolumeDao volumeDao;
+
+    @Inject
+    private ImageStoreDao imageStoreDao;
+
+    @Inject
+    private VolumeApiService volumeApiService;
+
+    @Inject
+    private PrimaryDataStoreDao storagePoolDao;
+
+    @Inject
+    private HostDao hostDao;
+
+    @Inject
+    private UserVmManager userVmManager;
+
+    @Inject
+    private VolumeOrchestrationService volumeOrchestrationService;
+
+    @Inject
+    private VolumeDataFactory volumeDataFactory;
+    @Inject
+    private InternalBackupServiceJobDao internalBackupServiceJobDao;
+
+    @Inject
+    private BackupManager backupManager;
+
+    @Inject
+    private DiskOfferingDao diskOfferingDao;
+
+    @Inject
+    private HypervisorGuruManager hypervisorGuruManager;
+
+    @Inject
+    private NicDao nicDao;
+
+    @Inject
+    private AlertManager alertManager;
+
+    protected final List<Backup.Status> validChildStatesToRemoveBackup = 
List.of(Backup.Status.Expunged, Backup.Status.Error, Backup.Status.Failed);
+
+    private final List<Storage.StoragePoolType> supportedStoragePoolTypes = 
List.of(Storage.StoragePoolType.Filesystem, 
Storage.StoragePoolType.NetworkFilesystem,
+            Storage.StoragePoolType.SharedMountPoint);
+
+    private final List<Backup.Status> allowedBackupStatesToRemove = 
List.of(Backup.Status.BackedUp, Backup.Status.Failed, Backup.Status.Error);
+
+    private final List<Backup.Status> allowedBackupStatesToCompress = 
List.of(Backup.Status.BackedUp, Backup.Status.Restoring);
+
+    private final List<Backup.Status> allowedBackupStatesToValidate = 
List.of(Backup.Status.BackedUp, Backup.Status.Restoring);
+
+    private final List<VirtualMachine.State> allowedVmStates = 
Arrays.asList(VirtualMachine.State.Running, VirtualMachine.State.Stopped);
+    @Override
+    public String getDescription() {
+        return "Native Incremental KVM Backup Plugin";
+    }
+
+    @Override
+    public List<BackupOffering> listBackupOfferings(Long zoneId) {
+        return List.of();
+    }
+
+    @Override
+    public boolean isValidProviderOffering(Long zoneId, String uuid) {
+        return true;
+    }
+
+    @Override
+    public boolean assignVMToBackupOffering(VirtualMachine vm, BackupOffering 
backupOffering) {
+        logger.debug("Assigning VM [{}] to KBOSS backup offering with 
name:[{}], uuid: [{}].", vm.getUuid(), backupOffering.getName(), 
backupOffering.getUuid());
+        if (!Hypervisor.HypervisorType.KVM.equals(vm.getHypervisorType())) {
+            logger.error("KVM Native Incremental Backup provider is only 
supported for KVM.");
+            return false;
+        }
+
+        for (VMSnapshotVO vmSnapshotVO : 
vmSnapshotDao.findByVmAndByType(vm.getId(), VMSnapshot.Type.Disk)) {
+            List<VMSnapshotDetailsVO> vmSnapshotDetails = 
vmSnapshotDetailsDao.listDetails(vmSnapshotVO.getId());
+            if (!vmSnapshotDetails.stream().allMatch(vmSnapshotDetailsVO -> 
vmSnapshotDetailsVO.getName().equals(VolumeApiServiceImpl.KVM_FILE_BASED_STORAGE_SNAPSHOT)))
 {
+                logger.error("KBOSS is only supported with disk-only VM 
snapshots using [{}] strategy. Found a disk-only VM snapshot using another 
strategy for the VM.",
+                        VolumeApiServiceImpl.KVM_FILE_BASED_STORAGE_SNAPSHOT);
+                logger.debug("Found VM snapshot details [{}].", () -> 
vmSnapshotDetails.stream().map(VMSnapshotDetailsVO::getName).collect(Collectors.toList()));
+                return false;
+            }
+        }
+
+        return 
CollectionUtils.isEmpty(vmSnapshotDao.findByVmAndByType(vm.getId(), 
VMSnapshot.Type.DiskAndMemory));
+    }
+
+    @Override
+    public boolean removeVMFromBackupOffering(VirtualMachine vm) {
+        logger.info("Removing VM [{}] from KBOSS backup offering.", 
vm.getUuid());
+
+        validateVmState(vm, "remove backup offering", 
VirtualMachine.State.Expunging, VirtualMachine.State.Destroyed);
+        if (endBackupChain(vm)) {
+            return true;
+        }
+        UserVmVO vmVO = userVmDao.findById(vm.getId());
+        logger.error("Failed to merge deltas for VM [{}] during backup 
offering removal process. Changing its state to [{}].", vm, 
VirtualMachine.State.BackupError);
+        vmInstanceDetailsDao.addDetail(vm.getId(), 
ApiConstants.LAST_KNOWN_STATE, vmVO.getState().name(), false);
+        vmVO.setState(VirtualMachine.State.BackupError);
+        userVmDao.update(vmVO.getId(), vmVO);
+
+        return false;
+    }
+
+    @Override
+    public boolean willDeleteBackupsOnOfferingRemoval() {
+        return false;
+    }
+
+    @Override
+    public Pair<Boolean, Backup> takeBackup(VirtualMachine vm, Boolean 
quiesceVm, boolean isolated) {
+        logger.debug("Queueing backup on VM [{}].", vm.getUuid());
+        Outcome<?> outcome = createBackupThroughJobQueue(vm, 
ObjectUtils.defaultIfNull(quiesceVm, false), isolated);
+
+        try {
+            outcome.get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new CloudRuntimeException(String.format("Unable to retrieve 
result from job takeBackup due to [%s]. VM [%s].", e.getMessage(), 
vm.getUuid()), e);
+        }
+
+        Object jobResult = jobManager.unmarshallResultObject(outcome.getJob());
+
+        if (jobResult instanceof BackupProviderException) {
+            throw (BackupProviderException) jobResult;
+        } else if (jobResult instanceof Throwable) {
+            throw new CloudRuntimeException(String.format("Exception while 
taking KVM native incremental backup for VM [%s]. Check the logs for more 
information.", vm.getUuid()));
+        }
+
+        Pair<Boolean, Long> result = (Pair<Boolean, Long>)jobResult;
+        Pair<Boolean, Backup> returnValue = new Pair<>(result.first(), null);
+        if (result.first()) {
+            returnValue.second(backupDao.findById(result.second()));
+        }
+        return returnValue;
+    }
+
+    @Override
+    public Pair<Boolean, Long> orchestrateTakeBackup(Backup backup, boolean 
quiesceVm, boolean isolated) {
+        BackupVO backupVO = (BackupVO) backup;
+        long vmId = backup.getVmId();
+        VirtualMachine userVm = virtualMachineManager.findById(vmId);
+        Long hostId = vmSnapshotHelper.pickRunningHost(vmId);
+        HostVO hostVO = hostDao.findById(hostId);
+
+        if (hostVO.getStatus() != Status.Up || hostVO.getResourceState() != 
ResourceState.Enabled) {
+            backupVO.setStatus(Backup.Status.Failed);
+            backupDao.update(backupVO.getId(), backupVO);
+
+            logger.error("No available host found to create backup [{}] of VM 
[{}]. Setting the backup as Failed.", backupVO.getUuid(), userVm.getUuid());
+            return new Pair<>(Boolean.FALSE,  backup.getId());
+        }
+
+        List<VolumeObjectTO> volumeTOs;
+        try {
+            validateVmState(userVm, "take backup");
+            volumeTOs = vmSnapshotHelper.getVolumeTOList(userVm.getId());
+            validateStorages(volumeTOs, userVm.getUuid());
+        } catch (Exception e) {
+            backupVO.setStatus(Backup.Status.Failed);
+            backupDao.update(backupVO.getId(), backupVO);
+            throw e;
+        }
+
+        logger.info("Starting VM backup process for VM [{}].", 
userVm.getUuid());
+
+        BackupOfferingVO backupOfferingVO = 
backupOfferingDao.findByIdIncludingRemoved(backup.getBackupOfferingId());
+
+        backupVO.setDate(new Date());
+        List<InternalBackupJoinVO> backupChain = 
getBackupJoinParents(backupVO, true);
+        InternalBackupJoinVO parentBackup = null;
+        if (isolated) {
+            setBackupAsIsolated(backupVO);
+        } else {
+            parentBackup = getParentAndSetEndOfChain(backupVO, backupChain, 
backupOfferingVO);
+        }
+        InternalBackupJoinVO newBackupJoin = 
internalBackupJoinDao.findById(backup.getId());
+        boolean fullBackup = parentBackup == null;
+        List<InternalBackupStoragePoolVO> parentBackupDeltasOnPrimary = new 
ArrayList<>();
+        List<InternalBackupDataStoreVO> parentBackupDeltasOnSecondary = new 
ArrayList<>();
+        List<String> chainImageStoreUrls = null;
+        List<KbossTO> kbossTOS = new ArrayList<>();
+        HashMap<String, InternalBackupStoragePoolVO> 
volumeUuidToDeltaPrimaryRef = new HashMap<>();
+        HashMap<String, InternalBackupDataStoreVO> 
volumeUuidToDeltaSecondaryRef = new HashMap<>();
+
+        if (!fullBackup) {
+            parentBackupDeltasOnPrimary = 
internalBackupStoragePoolDao.listByBackupId(parentBackup.getId());
+            parentBackupDeltasOnSecondary = 
internalBackupDataStoreDao.listByBackupId(parentBackup.getId());
+
+            chainImageStoreUrls = getChainImageStoreUrls(backupChain);
+        }
+
+        boolean runningVm = userVm.getState() == VirtualMachine.State.Running;
+        transitVmStateWithoutThrow(userVm, 
VirtualMachine.Event.BackupRequested, hostId);
+        updateBackupStatusToBackingUp(volumeTOs, backupVO);
+
+        DataStore imageStore = 
getImageStoreForBackup(userVm.getDataCenterId(), backupVO);
+        createBasicBackupDetails(imageStore.getId(), fullBackup ? 0L : 
parentBackup.getId(), backupVO);
+
+        List<VMSnapshotVO> succeedingVmSnapshotList = 
getSucceedingVmSnapshotList(parentBackup);
+        VMSnapshotVO succeedingVmSnapshot = succeedingVmSnapshotList.isEmpty() 
? null : succeedingVmSnapshotList.get(0);
+
+        Map<Long, List<SnapshotDataStoreVO>> volumeIdToSnapshotDataStoreList = 
mapVolumesToVmSnapshotReferences(volumeTOs, succeedingVmSnapshotList);
+        for (VolumeObjectTO volumeObjectTO : volumeTOs) {
+            KbossTO kbossTO = new KbossTO(volumeObjectTO, 
volumeIdToSnapshotDataStoreList.getOrDefault(volumeObjectTO.getId(), new 
ArrayList<>()));
+            kbossTOS.add(kbossTO);
+            createDeltaReferences(fullBackup, 
!succeedingVmSnapshotList.isEmpty(), runningVm, backup, 
parentBackupDeltasOnSecondary,
+                    parentBackupDeltasOnPrimary, volumeUuidToDeltaPrimaryRef, 
volumeUuidToDeltaSecondaryRef, succeedingVmSnapshot, kbossTO);
+        }
+
+        TakeKbossBackupCommand command = new TakeKbossBackupCommand(quiesceVm, 
runningVm, newBackupJoin.getEndOfChain(), userVm.getInstanceName(), 
imageStore.getUri(),
+                chainImageStoreUrls, kbossTOS, isolated);
+
+        Answer answer = sendBackupCommand(hostId, command);
+
+        if (answer == null || !answer.getResult()) {
+            processBackupFailure(answer, userVm, hostId, runningVm, backupVO);
+            return new Pair<>(Boolean.FALSE, null);
+        }
+
+        processBackupSuccess(runningVm, volumeTOs, 
volumeUuidToDeltaPrimaryRef, volumeUuidToDeltaSecondaryRef, 
(TakeKbossBackupAnswer)answer, parentBackupDeltasOnPrimary,
+                succeedingVmSnapshotList, backupVO, fullBackup, userVm, 
hostId, newBackupJoin.getEndOfChain(), isolated);
+
+        if (!isolated) {
+            updateCurrentBackup(newBackupJoin);
+        }
+
+        if (offeringSupportsCompression(newBackupJoin)) {
+            compressBackupAsync(newBackupJoin, backup.getZoneId(), 
userVm.getAccountId());
+        } else {
+            validateBackupAsyncIfHasOfferingSupport(newBackupJoin, 
backup.getZoneId(), userVm.getAccountId());
+        }
+        return new Pair<>(Boolean.TRUE, backupVO.getId());
+    }
+
+    @Override
+    public boolean deleteBackup(Backup backup, boolean forced) {
+        logger.debug("Queueing backup [{}] deletion.", backup.getUuid());
+        Outcome<Boolean> outcome = deleteBackupThroughJobQueue(backup, forced);
+
+        try {
+            outcome.get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new CloudRuntimeException(String.format("Unable to retrieve 
result from job deleteBackup due to [%s]. Backup [%s].", e.getMessage(), 
backup.getUuid()), e);
+        }
+
+        Object jobResult = jobManager.unmarshallResultObject(outcome.getJob());
+
+        if (jobResult instanceof Throwable) {
+            if (jobResult instanceof BackupProviderException) {
+                throw (BackupProviderException) jobResult;
+            }
+            throw new CloudRuntimeException(String.format("Exception while 
deleting KVM native incremental backup [%s]. Check the logs for more 
information.", backup.getUuid()));
+        }
+
+        return BooleanUtils.isTrue((Boolean) jobResult);
+    }
+
+    @Override
+    public Boolean orchestrateDeleteBackup(Backup backup, boolean forced) {
+        BackupVO backupVO = (BackupVO) backup;
+
+        VirtualMachine virtualMachine = 
virtualMachineManager.findById(backup.getVmId());
+
+        if (virtualMachine != null) {
+            validateVmState(virtualMachine, "delete backup", 
VirtualMachine.State.Destroyed);
+        }
+
+        logger.info("Starting delete process for backup [{}].", backupVO);
+
+        if (!validateBackupStateForRemoval(backupVO.getId())) {
+            return false;
+        }
+
+        checkErrorBackup(backupVO, virtualMachine);
+        if (deleteFailedBackup(backupVO)) {
+            return true;
+        }
+
+        InternalBackupJoinVO childBackup = 
internalBackupJoinDao.findByParentId(backup.getId());
+
+        if (childBackup != null && 
!validChildStatesToRemoveBackup.contains(childBackup.getStatus())) {
+            logger.debug("Backup [{}] has children that are not in one of the 
following states [{}]; will mark it as removed on the database but the files 
will not be deleted " +
+                    "from secondary storage until the children are also 
expunged.", backup.getUuid(), validChildStatesToRemoveBackup);
+            backupVO.setStatus(Backup.Status.Removed);
+            backupDao.update(backupVO.getId(), backupVO);
+            return true;
+        }
+
+        InternalBackupJoinVO backupJoinVO = 
internalBackupJoinDao.findById(backup.getId());
+        if (backupJoinVO.getCurrent()) {
+            if (!mergeCurrentBackupDeltas(backupJoinVO)) {
+                return false;
+            }
+            InternalBackupJoinVO parent = 
internalBackupJoinDao.findById(backupJoinVO.getParentId());
+            if (parent != null && parent.getStatus() == 
Backup.Status.BackedUp) {
+                backupDetailDao.persist(new BackupDetailVO(parent.getId(), 
END_OF_CHAIN, Boolean.TRUE.toString(), false));
+            }
+        }
+
+        Commands deleteCommands = new Commands(Command.OnError.Continue);
+
+        DataStore dataStore = addBackupDeltasToDeleteCommand(backup.getId(), 
deleteCommands);
+        Pair<List<InternalBackupJoinVO>, InternalBackupJoinVO> 
backupParentsToBeRemovedAndLastAliveBackup = 
getParentsToBeExpungedWithBackupAndAddThemToListOfDeleteCommands(backupVO,
+                deleteCommands);
+
+        EndPoint endPoint = endPointSelector.select(dataStore);
+        if (endPoint == null) {
+            logger.error("Unable to find SSVM to delete backup [{}]. Check if 
SSVM is up for the zone.", backup);
+            throw new CloudRuntimeException(String.format("Unable to delete 
backup [%s]. Please check the logs.", backup.getUuid()));
+        }
+        Answer[] deleteAnswers;
+        try {
+            deleteAnswers = sendBackupCommands(endPoint.getId(), 
deleteCommands);
+        } catch (AgentUnavailableException | OperationTimedoutException e) {
+            throw new CloudRuntimeException(e);
+        }
+
+        List<Long> removedBackupIds = 
backupParentsToBeRemovedAndLastAliveBackup.first().stream().map(InternalBackupJoinVO::getId).collect(Collectors.toList());
+        removedBackupIds.add(backup.getId());
+
+        boolean isFailedSetEmpty = processRemoveBackupFailures(forced, 
deleteAnswers, removedBackupIds, backupJoinVO);
+
+        processRemovedBackups(removedBackupIds);
+
+        if (backupParentsToBeRemovedAndLastAliveBackup.second() != null) {
+            backupDetailDao.persist(new 
BackupDetailVO(backupParentsToBeRemovedAndLastAliveBackup.second().getId(), 
END_OF_CHAIN, Boolean.TRUE.toString(), false));
+        }
+
+        return isFailedSetEmpty;
+    }
+
+    @Override
+    public boolean restoreVMFromBackup(VirtualMachine vm, Backup backup, 
boolean quickRestore, Long hostId) {
+        logger.debug("Queueing backup [{}] restore for VM [{}].", 
backup.getUuid(), vm.getUuid());
+        validateQuickRestore(backup, quickRestore);
+
+        Outcome<Boolean> outcome = restoreVMFromBackupThroughJobQueue(vm, 
backup, quickRestore, hostId);
+
+        try {
+            outcome.get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new CloudRuntimeException(String.format("Unable to retrieve 
result from job restoreVMFromBackup due to [%s]. Backup [%s].", e.getMessage(), 
backup.getUuid()), e);
+        } finally {
+            BackupVO backupVO = backupDao.findById(backup.getId());
+            backupVO.setStatus(Backup.Status.BackedUp);
+            backupDao.update(backupVO.getId(), backupVO);
+        }
+
+        Object jobResult = jobManager.unmarshallResultObject(outcome.getJob());
+
+        handleRestoreException(backup, vm, jobResult);
+
+        return BooleanUtils.isTrue((Boolean) jobResult);
+    }
+
+    @Override
+    public Boolean orchestrateRestoreVMFromBackup(Backup backup, 
VirtualMachine vm, boolean quickRestore, Long hostId, boolean sameVmAsBackup) {
+        logger.info("Starting restore backup process for VM [{}] and backup 
[{}].", vm.getUuid(), backup);

Review Comment:
   It is currently possible to create a new VM from a backup with quick restore 
even when the backup offering does not allow quick restore. It would be great 
to add a validation to this workflow.
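
A minimal sketch of the kind of check meant here, assuming the offering exposes some flag for whether quick restore is allowed. `QuickRestoreValidation`, `OfferingView`, and `isQuickRestoreAllowed()` are hypothetical names for illustration only and are not taken from this PR or from the CloudStack API. The idea is that the create-VM-from-backup path would call the same guard as the regular restore path.

```java
import java.util.Objects;

// Hypothetical stand-ins for illustration; these are not the real CloudStack types.
final class QuickRestoreValidation {

    /** Minimal view of a backup offering; the accessor names are assumptions. */
    interface OfferingView {
        String getUuid();
        boolean isQuickRestoreAllowed();
    }

    /**
     * Rejects a quick restore request when the offering does not allow it.
     * Intended to be shared by every entry point that accepts a quickRestore flag.
     */
    static void validateQuickRestoreAllowed(OfferingView offering, boolean quickRestore) {
        Objects.requireNonNull(offering, "offering");
        if (quickRestore && !offering.isQuickRestoreAllowed()) {
            throw new IllegalArgumentException(String.format(
                    "Backup offering [%s] does not allow quick restore.", offering.getUuid()));
        }
    }

    /** Small factory so the guard can be exercised without any framework wiring. */
    static OfferingView offering(String uuid, boolean quickRestoreAllowed) {
        return new OfferingView() {
            @Override
            public String getUuid() {
                return uuid;
            }

            @Override
            public boolean isQuickRestoreAllowed() {
                return quickRestoreAllowed;
            }
        };
    }
}
```

In the real code the exception type would more likely be whatever the surrounding workflow already throws for invalid parameters; `IllegalArgumentException` is used here only to keep the sketch free of project dependencies.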



##########
ui/src/views/compute/backup/BackupSchedule.vue:
##########
@@ -157,6 +160,11 @@ export default {
           title: this.$t('label.timezone'),
           dataIndex: 'timezone'
         },
+        {
+          key: 'isolated',
+          title: this.$t('label.isolated'),
+          dataIndex: 'isolated'
+        },

Review Comment:
   Here, for Portuguese, the label is displayed as `Isolada`. I believe 
`Isolado` would fit better in this context.
   
   <img width="677" height="563" alt="Image" src="https://github.com/user-attachments/assets/1368a150-beb4-408c-bbdf-26c515dd701b" />
   
   <img width="665" height="330" alt="Image" src="https://github.com/user-attachments/assets/c2d50583-58d3-411d-9f73-6b91a5e870f2" />



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
