On 4/12/20 6:37 PM, Andres Freund wrote:
Hi,

On 2020-04-12 17:57:05 -0400, David Steele wrote:
On 4/12/20 3:17 PM, Andres Freund wrote:
[proposal outline]

This is pretty much what pgBackRest does. We call them "local" processes and
they do most of the work during backup/restore/archive-get/archive-push.

Hah. I swear, I didn't look.

I believe you. If you spend enough time thinking about this (and we've spent a lot) then I think this is where you arrive.

The obvious problem with that proposal is that we don't want to
unnecessarily store the incoming data on the system pg_basebackup is
running on, just for the subcommand to get access to them. More on that
in a second.

We also implement "remote" processes so the local processes can get data
that doesn't happen to be local, i.e. on a remote PostgreSQL cluster.

What is the interface between those? I.e. do the files have to be
spooled as a whole locally?

Currently we use SSH to talk to a remote, but we are planning on using our own TLS servers in the future. We don't spool anything -- the file is streamed from the PostgreSQL server (via remote protocol if needed) to the repo (which could also be remote, e.g. S3) without spooling to disk. We have buffers, of course, which are configurable with the buffer-size option.
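
For illustration only, a minimal sketch (not pgBackRest code) of the idea: stream a source file to a destination through a fixed-size buffer with plain POSIX read/write so nothing is spooled to disk. BUFFER_SIZE stands in for a configurable option like buffer-size.

#include <errno.h>
#include <unistd.h>

#define BUFFER_SIZE 65536

/* Copy src_fd to dst_fd using only a bounded in-memory buffer */
static int
streamCopy(int src_fd, int dst_fd)
{
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read;

    while ((bytes_read = read(src_fd, buffer, sizeof(buffer))) > 0)
    {
        ssize_t written = 0;

        // Handle partial and interrupted writes
        while (written < bytes_read)
        {
            ssize_t result = write(dst_fd, buffer + written, bytes_read - written);

            if (result < 0)
            {
                if (errno == EINTR)
                    continue;

                return -1;
            }

            written += result;
        }
    }

    return bytes_read < 0 ? -1 : 0;
}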

There's various ways we could address the issue for how the subcommand
can access the file data. The most flexible probably would be to rely on
exchanging file descriptors between basebackup and the subprocess (these
days all supported platforms have that, I think).  Alternatively we
could invoke the subcommand before really starting the backup, and ask
how many files it'd like to receive in parallel, and restart the
subcommand with that number of file descriptors open.
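
For illustration, a rough sketch of the descriptor-exchange idea: pass an already-open file descriptor to the subprocess over a Unix-domain socket with SCM_RIGHTS. The helper name is hypothetical and this is not from any existing patch.

#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>

/* Send one open file descriptor to a peer connected via a unix socket pair */
static int
sendFd(int sock, int fd_to_send)
{
    char dummy = 'F';
    struct iovec iov = {.iov_base = &dummy, .iov_len = 1};

    union
    {
        struct cmsghdr hdr;
        char buf[CMSG_SPACE(sizeof(int))];
    } control;

    memset(&control, 0, sizeof(control));

    struct msghdr msg = {0};
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    msg.msg_control = control.buf;
    msg.msg_controllen = sizeof(control.buf);

    // The descriptor travels in ancillary data; the one-byte payload is just a carrier
    struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
    cmsg->cmsg_level = SOL_SOCKET;
    cmsg->cmsg_type = SCM_RIGHTS;
    cmsg->cmsg_len = CMSG_LEN(sizeof(int));
    memcpy(CMSG_DATA(cmsg), &fd_to_send, sizeof(int));

    return sendmsg(sock, &msg, 0) < 0 ? -1 : 0;
}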

We don't exchange FDs. Each local is responsible for getting the data from
PostgreSQL or the repo based on knowing the data source and a path. For
pg_basebackup, however, I'd imagine each local would want a replication
connection with the ability to request specific files that were passed to it
by the main process.

I don't like this much. It'll push more complexity into each of the
"targets" and we can't easily share that complexity. And also, needing
to request individual files will add a lot of back/forth, and thus
latency issues. The server would always have to pre-send a list of
files, we'd have to deal with those files vanishing, etc.

Sure, unless we had a standard interface to "get a file from the PostgreSQL cluster", which is what pgBackRest has via the storage interface.
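
As a rough sketch of what such an interface might look like (hypothetical types and names, not the actual pgBackRest storage interface): the caller asks for a file by path and does not care whether the bytes come from local disk, SSH, or an object store.

#include <stddef.h>

/* A driver-agnostic way to read a file from the cluster */
typedef struct StorageDriver
{
    void *(*open)(void *driverData, const char *path);
    size_t (*read)(void *file, void *buffer, size_t size);
    void (*close)(void *file);
    void *driverData;
} StorageDriver;

/* Stream one file through whatever driver is configured, handing chunks to a sink */
static void
storageGetFile(const StorageDriver *driver, const char *path, void (*sink)(const void *data, size_t size))
{
    void *file = driver->open(driver->driverData, path);
    char buffer[65536];
    size_t bytes;

    while ((bytes = driver->read(file, buffer, sizeof(buffer))) > 0)
        sink(buffer, bytes);

    driver->close(file);
}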

Attached is our implementation for "backupFile". I think it's pretty concise considering what it does. Most of it is dedicated to checksum deltas and backup resume. The straight copy with filters starts at line 189.

[2] yes, I already hear json. A line-delimited format would have some advantages though.

We use JSON, but each protocol request/response is linefeed-delimited. So
for example here's what it looks like when the main process requests a local
process to backup a specific file:

{"{"cmd":"backupFile","param":["base/32768/33001",true,65536,null,true,0,"pg_data/base/32768/33001",false,0,3,"20200412-213313F",false,null]}"}

And the local responds with:

{"{"out":[1,65536,65536,"6bf316f11d28c28914ea9be92c00de9bea6d9a6b",{"align":true,"error":[0,[3,5],7],"valid":false}]}"}

As long as it's line delimited, I don't really care :)

Agreed.

We are considering a move to HTTP since lots of services (e.g. S3, GCS,
Azure, etc.) require it (so we implement it) and we're not sure it makes
sense to maintain our own protocol format. That said, we'd still prefer to
use JSON for our payloads (like GCS) rather than XML (as S3 does).

I'm not quite sure what you mean here? You mean actual requests for each
of what currently are lines? If so, that sounds *terrible*.

I know it sounds like a lot, but in practice the local (currently) only performs four operations: backup file, restore file, push file to archive, and get file from archive. In that context a little protocol overhead won't be noticed, so if it means removing redundant code I'm all for it. That said, we have not done this yet -- it's just under consideration.

Regards,
--
-David
da...@pgmasters.net
/***********************************************************************************************************************************
Backup File
***********************************************************************************************************************************/
#include "build.auto.h"

#include <string.h>

#include "command/backup/file.h"
#include "command/backup/pageChecksum.h"
#include "common/crypto/cipherBlock.h"
#include "common/crypto/hash.h"
#include "common/debug.h"
#include "common/io/filter/group.h"
#include "common/io/filter/size.h"
#include "common/io/io.h"
#include "common/log.h"
#include "common/regExp.h"
#include "common/type/convert.h"
#include "postgres/interface.h"
#include "storage/helper.h"

/***********************************************************************************************************************************
Helper functions
***********************************************************************************************************************************/
static unsigned int
segmentNumber(const String *pgFile)
{
    FUNCTION_TEST_BEGIN();
        FUNCTION_TEST_PARAM(STRING, pgFile);
    FUNCTION_TEST_END();

    // Determine which segment number this is by checking for a numeric extension.  No extension means segment 0.
    FUNCTION_TEST_RETURN(regExpMatchOne(STRDEF("\\.[0-9]+$"), pgFile) ? cvtZToUInt(strrchr(strPtr(pgFile), '.') + 1) : 0);
}

/**********************************************************************************************************************************/
BackupFileResult
backupFile(
    const String *pgFile, bool pgFileIgnoreMissing, uint64_t pgFileSize, const String *pgFileChecksum, bool pgFileChecksumPage,
    uint64_t pgFileChecksumPageLsnLimit, const String *repoFile, bool repoFileHasReference, CompressType repoFileCompressType,
    int repoFileCompressLevel, const String *backupLabel, bool delta, CipherType cipherType, const String *cipherPass)
{
    FUNCTION_LOG_BEGIN(logLevelDebug);
        FUNCTION_LOG_PARAM(STRING, pgFile);                         // Database file to copy to the repo
        FUNCTION_LOG_PARAM(BOOL, pgFileIgnoreMissing);              // Is it OK if the database file is missing?
        FUNCTION_LOG_PARAM(UINT64, pgFileSize);                     // Size of the database file
        FUNCTION_LOG_PARAM(STRING, pgFileChecksum);                 // Checksum to verify the database file
        FUNCTION_LOG_PARAM(BOOL, pgFileChecksumPage);               // Should page checksums be validated
        FUNCTION_LOG_PARAM(UINT64, pgFileChecksumPageLsnLimit);     // Upper LSN limit to which page checksums must be valid
        FUNCTION_LOG_PARAM(STRING, repoFile);                       // Destination in the repo to copy the pg file
        FUNCTION_LOG_PARAM(BOOL, repoFileHasReference);             // Does the repo file exist in a prior backup in the set?
        FUNCTION_LOG_PARAM(ENUM, repoFileCompressType);             // Compress type for repo file
        FUNCTION_LOG_PARAM(INT,  repoFileCompressLevel);            // Compression level for repo file
        FUNCTION_LOG_PARAM(STRING, backupLabel);                    // Label of current backup
        FUNCTION_LOG_PARAM(BOOL, delta);                            // Is the delta option on?
        FUNCTION_LOG_PARAM(ENUM, cipherType);                       // Encryption type
        FUNCTION_TEST_PARAM(STRING, cipherPass);                    // Password to access the repo file if encrypted
    FUNCTION_LOG_END();

    ASSERT(pgFile != NULL);
    ASSERT(repoFile != NULL);
    ASSERT(backupLabel != NULL);
    ASSERT((cipherType == cipherTypeNone && cipherPass == NULL) || (cipherType != cipherTypeNone && cipherPass != NULL));

    // Backup file results
    BackupFileResult result = {.backupCopyResult = backupCopyResultCopy};

    MEM_CONTEXT_TEMP_BEGIN()
    {
        // Generate complete repo path and add compression extension if needed
        const String *repoPathFile = strNewFmt(
            STORAGE_REPO_BACKUP "/%s/%s%s", strPtr(backupLabel), strPtr(repoFile), strPtr(compressExtStr(repoFileCompressType)));

        // If checksum is defined then the file needs to be checked. If delta option then check the DB and possibly the repo, else
        // just check the repo.
        if (pgFileChecksum != NULL)
        {
            // Does the file in pg match the checksum and size passed?
            bool pgFileMatch = false;

            // If delta, then check the DB checksum and possibly the repo. If the checksum does not match in either case then
            // recopy.
            if (delta)
            {
                // Generate checksum/size for the pg file. Only read as many bytes as passed in pgFileSize.  If the file has grown
                // since the manifest was built we don't need to consider the extra bytes since they will be replayed from WAL
                // during recovery.
                IoRead *read = storageReadIo(
                    storageNewReadP(storagePg(), pgFile, .ignoreMissing = pgFileIgnoreMissing, .limit = VARUINT64(pgFileSize)));
                ioFilterGroupAdd(ioReadFilterGroup(read), cryptoHashNew(HASH_TYPE_SHA1_STR));
                ioFilterGroupAdd(ioReadFilterGroup(read), ioSizeNew());

                // If the pg file exists check the checksum/size
                if (ioReadDrain(read))
                {
                    const String *pgTestChecksum = varStr(
                        ioFilterGroupResult(ioReadFilterGroup(read), CRYPTO_HASH_FILTER_TYPE_STR));
                    uint64_t pgTestSize = varUInt64Force(ioFilterGroupResult(ioReadFilterGroup(read), SIZE_FILTER_TYPE_STR));

                    // Does the pg file match?
                    if (pgFileSize == pgTestSize && strEq(pgFileChecksum, pgTestChecksum))
                    {
                        pgFileMatch = true;

                        // If it matches and is a reference to a previous backup then no need to copy the file
                        if (repoFileHasReference)
                        {
                            MEM_CONTEXT_PRIOR_BEGIN()
                            {
                                result.backupCopyResult = backupCopyResultNoOp;
                                result.copySize = pgTestSize;
                                result.copyChecksum = strDup(pgTestChecksum);
                            }
                            MEM_CONTEXT_PRIOR_END();
                        }
                    }
                }
                // Else the source file is missing from the database so skip this file
                else
                    result.backupCopyResult = backupCopyResultSkip;
            }

            // If this is not a delta backup or it is and the file exists and the checksum from the DB matches, then also test the
            // checksum of the file in the repo (unless it is in a prior backup) and if the checksum doesn't match, then there may
            // be corruption in the repo, so recopy
            if (!delta || !repoFileHasReference)
            {
                // If this is a delta backup and the file is missing from the DB, then remove it from the repo (backupManifestUpdate
                // will remove it from the manifest)
                if (result.backupCopyResult == backupCopyResultSkip)
                {
                    storageRemoveP(storageRepoWrite(), repoPathFile);
                }
                else if (!delta || pgFileMatch)
                {
                    // Check the repo file in a try block because on error (e.g. missing or corrupt file that can't be decrypted or
                    // decompressed) we should recopy rather than ending the backup.
                    TRY_BEGIN()
                    {
                        // Generate checksum/size for the repo file
                        IoRead *read = storageReadIo(storageNewReadP(storageRepo(), repoPathFile));

                        if (cipherType != cipherTypeNone)
                        {
                            ioFilterGroupAdd(
                                ioReadFilterGroup(read), cipherBlockNew(cipherModeDecrypt, cipherType, BUFSTR(cipherPass), NULL));
                        }

                        // Decompress the file if compressed
                        if (repoFileCompressType != compressTypeNone)
                            ioFilterGroupAdd(ioReadFilterGroup(read), decompressFilter(repoFileCompressType));

                        ioFilterGroupAdd(ioReadFilterGroup(read), cryptoHashNew(HASH_TYPE_SHA1_STR));
                        ioFilterGroupAdd(ioReadFilterGroup(read), ioSizeNew());

                        ioReadDrain(read);

                        // Test checksum/size
                        const String *pgTestChecksum = varStr(
                            ioFilterGroupResult(ioReadFilterGroup(read), CRYPTO_HASH_FILTER_TYPE_STR));
                        uint64_t pgTestSize = varUInt64Force(ioFilterGroupResult(ioReadFilterGroup(read), SIZE_FILTER_TYPE_STR));

                        // No need to recopy if checksum/size match
                        if (pgFileSize == pgTestSize && strEq(pgFileChecksum, pgTestChecksum))
                        {
                            MEM_CONTEXT_PRIOR_BEGIN()
                            {
                                result.backupCopyResult = backupCopyResultChecksum;
                                result.copySize = pgTestSize;
                                result.copyChecksum = strDup(pgTestChecksum);
                            }
                            MEM_CONTEXT_PRIOR_END();
                        }
                        // Else recopy when repo file is not as expected
                        else
                            result.backupCopyResult = backupCopyResultReCopy;
                    }
                    // Recopy on any kind of error
                    CATCH_ANY()
                    {
                        result.backupCopyResult = backupCopyResultReCopy;
                    }
                    TRY_END();
                }
            }
        }

        // Copy the file
        if (result.backupCopyResult == backupCopyResultCopy || result.backupCopyResult == backupCopyResultReCopy)
        {
            // Is the file compressible during the copy?
            bool compressible = repoFileCompressType == compressTypeNone && cipherType == cipherTypeNone;

            // Setup pg file for read. Only read as many bytes as passed in pgFileSize.  If the file is growing it does no good to
            // copy data past the end of the size recorded in the manifest since those blocks will need to be replayed from WAL
            // during recovery.
            StorageRead *read = storageNewReadP(
                storagePg(), pgFile, .ignoreMissing = pgFileIgnoreMissing, .compressible = compressible,
                .limit = VARUINT64(pgFileSize));
            ioFilterGroupAdd(ioReadFilterGroup(storageReadIo(read)), cryptoHashNew(HASH_TYPE_SHA1_STR));
            ioFilterGroupAdd(ioReadFilterGroup(storageReadIo(read)), ioSizeNew());

            // Add page checksum filter
            if (pgFileChecksumPage)
            {
                ioFilterGroupAdd(
                    ioReadFilterGroup(storageReadIo(read)), pageChecksumNew(segmentNumber(pgFile), PG_SEGMENT_PAGE_DEFAULT,
                    pgFileChecksumPageLsnLimit));
            }

            // Add compression
            if (repoFileCompressType != compressTypeNone)
            {
                ioFilterGroupAdd(
                    ioReadFilterGroup(storageReadIo(read)), compressFilter(repoFileCompressType, repoFileCompressLevel));
            }

            // If there is a cipher then add the encrypt filter
            if (cipherType != cipherTypeNone)
            {
                ioFilterGroupAdd(
                    ioReadFilterGroup(
                        storageReadIo(read)), cipherBlockNew(cipherModeEncrypt, cipherType, BUFSTR(cipherPass), NULL));
            }

            // Setup the repo file for write
            StorageWrite *write = storageNewWriteP(storageRepoWrite(), repoPathFile, .compressible = compressible);
            ioFilterGroupAdd(ioWriteFilterGroup(storageWriteIo(write)), ioSizeNew());

            // Open the source and destination and copy the file
            if (storageCopy(read, write))
            {
                MEM_CONTEXT_PRIOR_BEGIN()
                {
                    // Get sizes and checksum
                    result.copySize = varUInt64Force(
                        ioFilterGroupResult(ioReadFilterGroup(storageReadIo(read)), SIZE_FILTER_TYPE_STR));
                    result.copyChecksum = strDup(
                        varStr(ioFilterGroupResult(ioReadFilterGroup(storageReadIo(read)), CRYPTO_HASH_FILTER_TYPE_STR)));
                    result.repoSize =
                        varUInt64Force(ioFilterGroupResult(ioWriteFilterGroup(storageWriteIo(write)), SIZE_FILTER_TYPE_STR));

                    // Get results of page checksum validation
                    if (pgFileChecksumPage)
                    {
                        result.pageChecksumResult = kvDup(
                            varKv(ioFilterGroupResult(ioReadFilterGroup(storageReadIo(read)), PAGE_CHECKSUM_FILTER_TYPE_STR)));
                    }
                }
                MEM_CONTEXT_PRIOR_END();
            }
            // Else if source file is missing and the read setup indicated ignore a missing file, the database removed it so skip it
            else
                result.backupCopyResult = backupCopyResultSkip;
        }

        // If the file was copied get the repo size only if the storage can store the files with a different size than what was
        // written. This has to be checked after the file is at rest because filesystem compression may affect the actual repo size
        // and this cannot be calculated in stream.
        //
        // If the file was checksummed then get the size in all cases since we don't already have it.
        if (((result.backupCopyResult == backupCopyResultCopy || result.backupCopyResult == backupCopyResultReCopy) &&
                storageFeature(storageRepo(), storageFeatureCompress)) ||
            result.backupCopyResult == backupCopyResultChecksum)
        {
            result.repoSize = storageInfoP(storageRepo(), repoPathFile).size;
        }
    }
    MEM_CONTEXT_TEMP_END();

    FUNCTION_LOG_RETURN(BACKUP_FILE_RESULT, result);
}
