On 4/12/20 11:04 AM, Robert Haas wrote:
On Sun, Apr 12, 2020 at 10:09 AM Magnus Hagander <mag...@hagander.net> wrote:
There are certainly cases for it. They might not have to be the same 
connection, but still the same session, meaning that before the first file you 
perform some step of authentication, get a token, and then use that for all the 
files. You'd need somewhere to maintain that state, even if it doesn't happen 
to be a socket. But there are definitely plenty of cases where keeping an open 
socket can be a huge performance gain -- especially when it comes to not 
re-negotiating encryption etc.

Hmm, OK.

When we implemented connection-sharing for S3 in pgBackRest it was a significant performance boost, even for large files, since they must be uploaded in parts. The same goes for files transferred over SSH, though in this case the overhead is per-file and can be mitigated with ControlMaster.

We originally (late 2013) implemented everything with command-line tools during the POC phase. The idea was to get something viable quickly and then improve as needed. At the time our config file had entries something like this:

[global:command]
compress=/usr/bin/gzip --stdout %file%
decompress=/usr/bin/gzip -dc %file%
checksum=/usr/bin/shasum %file% | awk '{print $1}'
manifest=/opt/local/bin/gfind %path% -printf '%P\t%y\t%u\t%g\t%m\t%T@\t%i\t%s\t%l\n'
psql=/Library/PostgreSQL/9.3/bin/psql -X %option%

[db]
psql_options=--cluster=9.3/main

[db:command:option]
psql=--port=6001

These appear to be for MacOS, but Linux would be similar.
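
For illustration, placeholders such as %file% in entries like these could be expanded with a small helper along the following lines. This is only a sketch: commandExpand() is a hypothetical name, not code from the POC, and it does no shell quoting, so a real implementation would still have to deal with file names containing spaces or quotes.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: copy pattern into result, replacing every occurrence of
   token (e.g. "%file%") with value. Returns 0 on success or -1 if the expanded
   command (including the terminating NUL) does not fit in result. */
int
commandExpand(const char *pattern, const char *token, const char *value, char *result, size_t resultSize)
{
    size_t tokenSize = strlen(token);
    size_t valueSize = strlen(value);
    size_t used = 0;

    assert(tokenSize > 0);

    while (*pattern != '\0')
    {
        /* By default copy a single character from the pattern */
        const char *source = pattern;
        size_t copySize = 1;

        if (strncmp(pattern, token, tokenSize) == 0)
        {
            /* Token found, so copy the value instead and skip the token */
            source = value;
            copySize = valueSize;
            pattern += tokenSize;
        }
        else
            pattern++;

        /* Always leave room for the terminating NUL */
        if (used + copySize >= resultSize)
            return -1;

        memcpy(result + used, source, copySize);
        used += copySize;
    }

    if (used >= resultSize)
        return -1;

    result[used] = '\0';
    return 0;
}
```

Calling commandExpand() with the compress entry above, the token "%file%" and a relative file name yields the command string that would then be handed to the shell.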

This *did* work, but it was really hard to debug when things went wrong, the per-file cost was high, and the slight differences between the command-line tools on different platforms were maddening. For example, lots of versions of 'find' would error if a file disappeared while building the manifest, which is a pretty common occurrence in PostgreSQL (most newer distros had an option to fix this). I know that doesn't apply here, but it's an example. Also, debugging was complicated with so many processes: with any degree of parallelism the process list got pretty crazy, fsync was not happening, etc. It's been a long time but I don't have any good memories of the solution that used all command-line tools.

Once we had a POC that solved our basic problem, i.e. backing up about 50TB of data reasonably efficiently, we immediately started working on a version that did not rely on command-line tools and we never looked back. Currently the only command-line tool we use is ssh.

I'm sure it would be possible to create a solution that worked better than ours, but I'm pretty certain it would still be hard for users to make it work correctly and to prove it worked correctly.

For compression and encryption, it could perhaps be as simple as "the command has to 
be a pipe on both input and output" and basically send the response back to 
pg_basebackup.

But that won't help if the target is to relocate things...

Right. And, also, it forces things to be sequential in a way I'm not
too happy about. Like, if we have some kind of parallel backup, which
I hope we will, then you can imagine (among other possibilities)
getting files for each tablespace concurrently, and piping them
through the output command concurrently. But if we emit the result in
a tarfile, then it has to be sequential; there's just no other choice.
I think we should try to come up with something that can work in a
multi-threaded environment.

That is one way to go for it -- and in a case like that, I'd suggest the shell script 
interface would be an implementation of the other API. A number of times through the 
years I've bounced ideas around for what to do with archive_command with different people 
(never quite to the level of "it's time to write a patch"), and it's mostly 
come down to some sort of shlib api where in turn we'd ship a backwards compatible 
implementation that would behave like archive_command. I'd envision something similar 
here.

I agree. Let's imagine that there are a conceptually unlimited number
of "targets" and "filters". Targets and filters accept data via the
same API, but a target is expected to dispose of the data, whereas a
filter is expected to pass it, via that same API, to a subsequent
filter or target. So filters could include things like "gzip", "lz4",
and "encrypt-with-rot13", whereas targets would include things like
"file" (the thing we have today - write my data into some local
files!), "shell" (which writes my data to a shell command, as
originally proposed), and maybe eventually things like "netbackup" and
"s3". Ideally this will all eventually be via a loadable module
interface so that third-party filters and targets can be fully
supported, but perhaps we could consider that an optional feature for
v1. Note that there is quite a bit of work to do here just to
reorganize the code.
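
To make the distinction concrete, here is a minimal sketch of filters and targets sharing one API. Everything in it (Sink, rot13Write, MemoryTarget, memoryWrite) is a hypothetical name invented for illustration and is not taken from pg_basebackup or pgBackRest. A filter transforms the data and forwards it through the same write() call, while a target has no successor and simply disposes of the data, here by storing it in memory.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical common interface: every stage accepts data through write() */
typedef struct Sink Sink;

struct Sink
{
    void (*write)(Sink *self, const unsigned char *data, size_t size);
    Sink *next;                                     /* Next filter or target, NULL for a target */
    void *state;                                    /* Provider-specific state */
};

/* Filter: apply rot13 and pass the result on via the same API */
void
rot13Write(Sink *self, const unsigned char *data, size_t size)
{
    unsigned char buffer[256];

    assert(self->next != NULL);

    while (size > 0)
    {
        size_t chunk = size < sizeof(buffer) ? size : sizeof(buffer);

        for (size_t i = 0; i < chunk; i++)
        {
            unsigned char c = data[i];

            if (c >= 'a' && c <= 'z')
                c = (unsigned char)('a' + (c - 'a' + 13) % 26);
            else if (c >= 'A' && c <= 'Z')
                c = (unsigned char)('A' + (c - 'A' + 13) % 26);

            buffer[i] = c;
        }

        self->next->write(self->next, buffer, chunk);
        data += chunk;
        size -= chunk;
    }
}

/* Target: dispose of the data by appending it to a fixed-size memory area */
typedef struct MemoryTarget
{
    char data[1024];
    size_t used;
} MemoryTarget;

void
memoryWrite(Sink *self, const unsigned char *data, size_t size)
{
    MemoryTarget *target = self->state;

    assert(target->used + size <= sizeof(target->data));
    memcpy(target->data + target->used, data, size);
    target->used += size;
}
```

A chain is built by pointing each filter's next at the following stage, so adding another filter does not require any change to the target.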

I would expect that we would want to provide a flexible way for a
target or filter to be passed options from the pg_basebackup command
line. So one might for example write this:

pg_basebackup --filter='lz4 -9' --filter='encrypt-with-rot13
rotations=2' --target='shell ssh rhaas@depository pgfile
create-exclusive - %f.lz4'

The idea is that the first word of the filter or target identifies
which one should be used, and the rest is just options text in
whatever form the provider cares to accept them; but with some
%<character> substitutions allowed, for things like the file name.
(The aforementioned escaping problems for things like filenames with
spaces in them still need to be sorted out, but this is just a sketch,
so while I think it's quite solvable, I am going to refrain from
proposing a precise solution here.)
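
As a rough sketch of the "first word names the provider, the rest is options" rule, the split could look like the following. ProviderSpec and providerSpecParse() are hypothetical names, and the %<character> substitutions and escaping are deliberately left out, just as they are left open above.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical result of splitting a --filter or --target argument */
typedef struct ProviderSpec
{
    char name[64];                                  /* First word, e.g. "lz4" or "shell" */
    const char *options;                            /* Rest of the text, "" if there is none */
} ProviderSpec;

/* Split spec into provider name and option text. The options pointer refers
   into spec, so spec must outlive result. Returns 0 on success or -1 if the
   name is empty or too long. */
int
providerSpecParse(const char *spec, ProviderSpec *result)
{
    assert(spec != NULL && result != NULL);

    /* Skip leading spaces */
    while (*spec == ' ')
        spec++;

    /* The name runs up to the first space or the end of the string */
    size_t nameSize = strcspn(spec, " ");

    if (nameSize == 0 || nameSize >= sizeof(result->name))
        return -1;

    memcpy(result->name, spec, nameSize);
    result->name[nameSize] = '\0';

    /* Everything after the separating spaces is passed through untouched */
    const char *options = spec + nameSize;

    while (*options == ' ')
        options++;

    result->options = options;
    return 0;
}
```

The option text is handed to the provider verbatim, which matches the idea that each filter or target decides for itself what form its options take.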

This targets-and-filters design is basically the solution we have landed on after many iterations.

We implement two types of filters, In and InOut. The In filters process data and produce a result, e.g. SHA1, size, page checksum, etc. The InOut filters modify data, e.g. compression, encryption. Yeah, the names could probably be better...
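
A stripped-down illustration of the two kinds may help. The types and functions below (SizeFilter, XorFilter and friends) are hypothetical and much simpler than the real pgBackRest filters: the In filter only observes the data and accumulates a result, while the InOut filter writes modified data to an output buffer. The XOR "cipher" is a toy stand-in for real encryption.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* In filter: observes the data and accumulates a result (here the total size) */
typedef struct SizeFilter
{
    uint64_t size;
} SizeFilter;

void
sizeFilterIn(SizeFilter *this, const unsigned char *input, size_t inputSize)
{
    assert(this != NULL);

    (void)input;                                    /* Content is not inspected, only counted */
    this->size += inputSize;
}

/* InOut filter: produces modified data (a toy XOR transform) */
typedef struct XorFilter
{
    unsigned char key;
} XorFilter;

void
xorFilterInOut(const XorFilter *this, const unsigned char *input, size_t inputSize, unsigned char *output)
{
    assert(this != NULL && output != NULL);

    /* Output must have room for inputSize bytes */
    for (size_t i = 0; i < inputSize; i++)
        output[i] = (unsigned char)(input[i] ^ this->key);
}
```

Both filters can see the same input in one pass, which is how a size or hash can be computed while the data is being compressed or encrypted.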

I have attached our filter interface (filter.intern.h) as a concrete example of how this works.
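
The part of that interface that tends to need an example is inputSame. The sketch below uses a hypothetical hex-encoding filter (HexFilter) and a simplified Buf type, neither of which is pgBackRest code, to show the idea: when the output buffer fills before the input is consumed, the filter reports inputSame and the caller passes the same input again with a fresh output buffer.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, simplified buffer (not pgBackRest's Buffer type) */
typedef struct Buf
{
    unsigned char *data;
    size_t size;                                    /* Capacity */
    size_t used;                                    /* Bytes currently in use */
} Buf;

typedef struct HexFilter
{
    size_t inputPos;                                /* Bytes of the current input already encoded */
    bool inputSame;                                 /* Must the same input be passed again? */
} HexFilter;

/* InOut processing: each input byte becomes two output bytes */
void
hexFilterInOut(HexFilter *this, const Buf *input, Buf *output)
{
    static const char digit[] = "0123456789abcdef";

    assert(this != NULL && input != NULL && output != NULL);

    while (this->inputPos < input->used && output->size - output->used >= 2)
    {
        unsigned char byte = input->data[this->inputPos++];

        output->data[output->used++] = (unsigned char)digit[byte >> 4];
        output->data[output->used++] = (unsigned char)digit[byte & 0x0f];
    }

    /* If input remains then the output was too small, so ask for the same input again */
    this->inputSame = this->inputPos < input->used;

    if (!this->inputSame)
        this->inputPos = 0;
}

/* Caller side: keep passing the same input until the filter has consumed it.
   Returns the number of bytes written to result. */
size_t
hexEncodeAll(const Buf *input, unsigned char *result, size_t resultSize)
{
    HexFilter filter = {0, false};
    unsigned char chunk[4];                         /* Deliberately tiny output buffer */
    size_t total = 0;

    do
    {
        Buf output = {chunk, sizeof(chunk), 0};

        hexFilterInOut(&filter, input, &output);

        assert(total + output.used <= resultSize);
        memcpy(result + total, chunk, output.used);
        total += output.used;
    }
    while (filter.inputSame);

    return total;
}
```

This toy filter holds no data back once its input is consumed, so it has no need for a done function; a compression filter that buffers internally would.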

We call 'targets' storage and have a standard interface for creating storage drivers. I have also attached our storage interface (storage.intern.h) as a concrete example of how this works.

Note that for just performing backup this is overkill, but once you consider verify this is pretty much the minimum storage interface needed, according to our experience.
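
As a rough picture of how a driver with only the required functions can coexist with one that also has optional ones, consider the sketch below. All names (DriverInterface, Store, ToyDriver) are hypothetical and the interface is cut down to two functions. Treating a missing optional function as a no-op is an assumption made for this example, not a description of how pgBackRest handles it.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, heavily reduced driver interface */
typedef struct DriverInterface
{
    /* Required function */
    bool (*exists)(void *driver, const char *file);

    /* Optional function, NULL when the storage has no paths to sync */
    void (*pathSync)(void *driver, const char *path);
} DriverInterface;

typedef struct Store
{
    void *driver;
    DriverInterface interface;
} Store;

Store
storeNew(void *driver, DriverInterface interface)
{
    /* Required functions must always be provided */
    assert(interface.exists != NULL);

    return (Store){driver, interface};
}

bool
storeExists(const Store *this, const char *file)
{
    return this->interface.exists(this->driver, file);
}

/* Returns true if the driver performed a sync, false if it does not support one */
bool
storePathSync(const Store *this, const char *path)
{
    if (this->interface.pathSync == NULL)
        return false;

    this->interface.pathSync(this->driver, path);
    return true;
}

/* Toy driver resembling an object store: it knows one file and has no paths */
typedef struct ToyDriver
{
    const char *onlyFile;
} ToyDriver;

bool
toyExists(void *driver, const char *file)
{
    return strcmp(((ToyDriver *)driver)->onlyFile, file) == 0;
}
```

The generic layer is the only place that needs to know which functions are optional, so calling code can be written once against Store.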

Regards,
--
-David
da...@pgmasters.net
/***********************************************************************************************************************************
IO Filter Interface Internal

Two types of filters are implemented using this interface:  In and InOut.

In filters accept input and produce a result, but do not modify the input.  An example is the IoSize filter which counts all bytes
that pass through it.

InOut filters accept input and produce output (and perhaps a result).  Because the input/output buffers may not be the same size the
filter must be prepared to accept the same input again (by implementing IoFilterInputSame) if the output buffer is too small to
accept all processed data.  If the filter holds state even when inputSame is false then it may also implement IoFilterDone to
indicate that the filter should be flushed (by passing NULL inputs) after all input has been processed.  InOut filters should strive
to fill the output buffer as much as possible, i.e., if the output buffer is not full after processing then inputSame should be
false.  An example is the IoBuffer filter which buffers data between unequally sized input/output buffers.

Each filter has a type that allows it to be identified in the filter list.
***********************************************************************************************************************************/
#ifndef COMMON_IO_FILTER_FILTER_INTERN_H
#define COMMON_IO_FILTER_FILTER_INTERN_H

#include "common/io/filter/filter.h"
#include "common/type/variantList.h"

/***********************************************************************************************************************************
Constructors
***********************************************************************************************************************************/
typedef struct IoFilterInterface
{
    // Indicates that filter processing is done.  This is used for filters that have additional data to be flushed even after all
    // input has been processed.  Compression and encryption filters will usually need to implement done.  If done is not
    // implemented then it will always return true if all input has been consumed, i.e. if inputSame returns false.
    bool (*done)(const void *driver);

    // Processing function for filters that do not produce output.  Note that result must be implemented in this case (or else what
    // would be the point).
    void (*in)(void *driver, const Buffer *);

    // Processing function for filters that produce output.  InOut filters will typically implement inputSame and may also implement
    // done.
    void (*inOut)(void *driver, const Buffer *, Buffer *);

    // InOut filters must be prepared for an output buffer that is too small to accept all the processed output.  In this case the
    // filter must implement inputSame and set it to true when there is more output to be produced for a given input.  On the next
    // call to inOut the same input will be passed along with a fresh output buffer with space for more processed output.
    bool (*inputSame)(const void *driver);

    // If the filter produces a result then this function must be implemented to return the result.  A result can be anything that
    // is not processed output, e.g. a count of total bytes or a cryptographic hash.
    Variant *(*result)(void *driver);
} IoFilterInterface;

#define ioFilterNewP(type, driver, paramList, ...)                                                                                 \
    ioFilterNew(type, driver, paramList, (IoFilterInterface){__VA_ARGS__})

IoFilter *ioFilterNew(const String *type, void *driver, VariantList *paramList, IoFilterInterface);

/***********************************************************************************************************************************
Functions
***********************************************************************************************************************************/
// Filter input only (a result is expected)
void ioFilterProcessIn(IoFilter *this, const Buffer *input);

// Filter input and produce output
void ioFilterProcessInOut(IoFilter *this, const Buffer *input, Buffer *output);

// Move filter to a new parent mem context
IoFilter *ioFilterMove(IoFilter *this, MemContext *parentNew);

/***********************************************************************************************************************************
Getters/Setters
***********************************************************************************************************************************/
// Is the filter done?
bool ioFilterDone(const IoFilter *this);

// Driver for the filter
void *ioFilterDriver(IoFilter *this);

// Does the filter need the same input again? If the filter cannot get all its output into the output buffer then it may need access
// to the same input again.
bool ioFilterInputSame(const IoFilter *this);

// Interface for the filter
const IoFilterInterface *ioFilterInterface(const IoFilter *this);

// Does filter produce output? All InOut filters produce output.
bool ioFilterOutput(const IoFilter *this);

// List of filter parameters
const VariantList *ioFilterParamList(const IoFilter *this);

/***********************************************************************************************************************************
Macros for function logging
***********************************************************************************************************************************/
#define FUNCTION_LOG_IO_FILTER_INTERFACE_TYPE                                                                                      \
    IoFilterInterface *
#define FUNCTION_LOG_IO_FILTER_INTERFACE_FORMAT(value, buffer, bufferSize)                                                         \
    objToLog(&value, "IoFilterInterface", buffer, bufferSize)

#endif
/***********************************************************************************************************************************
Storage Interface Internal

Storage drivers are implemented using this interface.

The interface has required and optional functions. Currently the optional functions are only implemented by the Posix driver which
can store either a repository or a PostgreSQL cluster. Drivers that are intended to store repositories only need to implement the
required functions.

The behavior of required functions is further modified by storage features defined by the StorageFeature enum. Details are included
in the description of each function.
***********************************************************************************************************************************/
#ifndef STORAGE_STORAGE_INTERN_H
#define STORAGE_STORAGE_INTERN_H

#include "common/type/param.h"
#include "storage/read.intern.h"
#include "storage/storage.h"
#include "storage/write.intern.h"

/***********************************************************************************************************************************
Default file and path modes
***********************************************************************************************************************************/
#define STORAGE_MODE_FILE_DEFAULT                                   0640
#define STORAGE_MODE_PATH_DEFAULT                                   0750

/***********************************************************************************************************************************
Error messages
***********************************************************************************************************************************/
#define STORAGE_ERROR_READ_CLOSE                                    "unable to close file '%s' after read"
#define STORAGE_ERROR_READ_OPEN                                     "unable to open file '%s' for read"
#define STORAGE_ERROR_READ_MISSING                                  "unable to open missing file '%s' for read"

#define STORAGE_ERROR_INFO                                          "unable to get info for path/file '%s'"
#define STORAGE_ERROR_INFO_MISSING                                  "unable to get info for missing path/file '%s'"

#define STORAGE_ERROR_LIST_INFO                                     "unable to list file info for path '%s'"
#define STORAGE_ERROR_LIST_INFO_MISSING                             "unable to list file info for missing path '%s'"

#define STORAGE_ERROR_PATH_REMOVE                                   "unable to remove path '%s'"
#define STORAGE_ERROR_PATH_REMOVE_FILE                              "unable to remove file '%s'"
#define STORAGE_ERROR_PATH_REMOVE_MISSING                           "unable to remove missing path '%s'"

#define STORAGE_ERROR_PATH_SYNC                                     "unable to sync path '%s'"
#define STORAGE_ERROR_PATH_SYNC_CLOSE                               "unable to close path '%s' after sync"
#define STORAGE_ERROR_PATH_SYNC_OPEN                                "unable to open path '%s' for sync"
#define STORAGE_ERROR_PATH_SYNC_MISSING                             "unable to sync missing path '%s'"

#define STORAGE_ERROR_WRITE_CLOSE                                   "unable to close file '%s' after write"
#define STORAGE_ERROR_WRITE_OPEN                                    "unable to open file '%s' for write"
#define STORAGE_ERROR_WRITE_MISSING                                 "unable to open file '%s' for write in missing path"
#define STORAGE_ERROR_WRITE_SYNC                                    "unable to sync file '%s' after write"

/***********************************************************************************************************************************
Path expression callback function type - used to modify paths based on expressions enclosed in <>
***********************************************************************************************************************************/
typedef String *StoragePathExpressionCallback(const String *expression, const String *path);

/***********************************************************************************************************************************
Required interface functions
***********************************************************************************************************************************/
// Get information about a file/link/path
//
// The level parameter controls the amount of information that will be returned. See the StorageInfo type and StorageInfoLevel enum
// for details about information that must be provided at each level. The driver should only return the amount of information
// requested even if more is available. All drivers must implement the storageInfoLevelExists and storageInfoLevelBasic levels. Only
// drivers with the storageFeatureInfoDetail feature must implement the storageInfoLevelDetail level.
typedef struct StorageInterfaceInfoParam
{
    VAR_PARAM_HEADER;

    // Should symlinks be followed?  Only required on storage that supports symlinks.
    bool followLink;
} StorageInterfaceInfoParam;

typedef StorageInfo StorageInterfaceInfo(
    void *thisVoid, const String *file, StorageInfoLevel level, StorageInterfaceInfoParam param);

#define storageInterfaceInfoP(thisVoid, file, level, ...)                                                                          \
    STORAGE_COMMON_INTERFACE(thisVoid).info(thisVoid, file, level, (StorageInterfaceInfoParam){VAR_PARAM_INIT, __VA_ARGS__})

// ---------------------------------------------------------------------------------------------------------------------------------
// Create a file read object.  The file should not be opened immediately -- open() will be called on the IoRead interface when the
// file needs to be opened.
typedef struct StorageInterfaceNewReadParam
{
    VAR_PARAM_HEADER;

    // Is the file compressible? This is used when the file must be moved across a network and temporary compression is helpful.
    bool compressible;

    // Limit bytes read from the file. NULL for no limit.
    const Variant *limit;
} StorageInterfaceNewReadParam;

typedef StorageRead *StorageInterfaceNewRead(
    void *thisVoid, const String *file, bool ignoreMissing, StorageInterfaceNewReadParam param);

#define storageInterfaceNewReadP(thisVoid, file, ignoreMissing, ...)                                                               \
    STORAGE_COMMON_INTERFACE(thisVoid).newRead(                                                                                    \
        thisVoid, file, ignoreMissing, (StorageInterfaceNewReadParam){VAR_PARAM_INIT, __VA_ARGS__})

// ---------------------------------------------------------------------------------------------------------------------------------
// Create a file write object.  The file should not be opened immediately -- open() will be called on the IoWrite interface when the
// file needs to be opened.
typedef struct StorageInterfaceNewWriteParam
{
    VAR_PARAM_HEADER;

    // File/path mode for storage that supports Posix-style permissions.  modePath is only used in conjunction with createPath.
    mode_t modeFile;
    mode_t modePath;

    // User/group name
    const String *user;
    const String *group;

    // Modified time
    time_t timeModified;

    // Will paths be created as needed?
    bool createPath;

    // Sync file/path when required by the storage
    bool syncFile;
    bool syncPath;

    // Ensure the file is written atomically. If this is false it's OK to write atomically if that's all the storage supports
    // (e.g. S3). Non-atomic writes are used in some places where there is a performance advantage and atomicity is not needed.
    bool atomic;

    // Is the file compressible?  This is used when the file must be moved across a network and temporary compression is helpful.
    bool compressible;
} StorageInterfaceNewWriteParam;

typedef StorageWrite *StorageInterfaceNewWrite(void *thisVoid, const String *file, StorageInterfaceNewWriteParam param);

#define storageInterfaceNewWriteP(thisVoid, file, ...)                                                                             \
    STORAGE_COMMON_INTERFACE(thisVoid).newWrite(thisVoid, file, (StorageInterfaceNewWriteParam){VAR_PARAM_INIT, __VA_ARGS__})

// ---------------------------------------------------------------------------------------------------------------------------------
// Get info for a path and all paths/files in the path (does not recurse)
//
// See storageInterfaceInfoP() for usage of the level parameter.
typedef struct StorageInterfaceInfoListParam
{
    VAR_PARAM_HEADER;

    // Regular expression used to filter the results. The expression is always checked in the callback passed to
    // storageInterfaceInfoListP() so checking the expression in the driver is entirely optional. The driver should only use the
    // expression if it can improve performance or limit network transfer.
    //
    // Partial matching of the expression is fine as long as nothing that should match is excluded, e.g. it is OK to prefix match
    // using the prefix returned from regExpPrefix(). This may cause extra results to be sent to the callback but won't exclude
    // anything that matches the expression exactly.
    const String *expression;
} StorageInterfaceInfoListParam;

typedef bool StorageInterfaceInfoList(
    void *thisVoid, const String *path, StorageInfoLevel level, StorageInfoListCallback callback, void *callbackData,
    StorageInterfaceInfoListParam param);

#define storageInterfaceInfoListP(thisVoid, path, level, callback, callbackData, ...)                                              \
    STORAGE_COMMON_INTERFACE(thisVoid).infoList(                                                                                   \
        thisVoid, path, level, callback, callbackData, (StorageInterfaceInfoListParam){VAR_PARAM_INIT, __VA_ARGS__})

// ---------------------------------------------------------------------------------------------------------------------------------
// Remove a path (and optionally recurse)
typedef struct StorageInterfacePathRemoveParam
{
    VAR_PARAM_HEADER;
} StorageInterfacePathRemoveParam;

typedef bool StorageInterfacePathRemove(void *thisVoid, const String *path, bool recurse, StorageInterfacePathRemoveParam param);

#define storageInterfacePathRemoveP(thisVoid, path, recurse, ...)                                                                  \
    STORAGE_COMMON_INTERFACE(thisVoid).pathRemove(                                                                                 \
        thisVoid, path, recurse, (StorageInterfacePathRemoveParam){VAR_PARAM_INIT, __VA_ARGS__})

// ---------------------------------------------------------------------------------------------------------------------------------
// Remove a file
typedef struct StorageInterfaceRemoveParam
{
    VAR_PARAM_HEADER;

    // Error when the file to delete is missing
    bool errorOnMissing;
} StorageInterfaceRemoveParam;

typedef void StorageInterfaceRemove(void *thisVoid, const String *file, StorageInterfaceRemoveParam param);

#define storageInterfaceRemoveP(thisVoid, file, ...)                                                                               \
    STORAGE_COMMON_INTERFACE(thisVoid).remove(thisVoid, file, (StorageInterfaceRemoveParam){VAR_PARAM_INIT, __VA_ARGS__})

/***********************************************************************************************************************************
Optional interface functions
***********************************************************************************************************************************/
// Move a path/file atomically
typedef struct StorageInterfaceMoveParam
{
    VAR_PARAM_HEADER;
} StorageInterfaceMoveParam;

typedef bool StorageInterfaceMove(void *thisVoid, StorageRead *source, StorageWrite *destination, StorageInterfaceMoveParam param);

#define storageInterfaceMoveP(thisVoid, source, destination, ...)                                                                  \
    STORAGE_COMMON_INTERFACE(thisVoid).move(                                                                                       \
        thisVoid, source, destination, (StorageInterfaceMoveParam){VAR_PARAM_INIT, __VA_ARGS__})

// ---------------------------------------------------------------------------------------------------------------------------------
// Create a path
typedef struct StorageInterfacePathCreateParam
{
    VAR_PARAM_HEADER;
} StorageInterfacePathCreateParam;

typedef void StorageInterfacePathCreate(
    void *thisVoid, const String *path, bool errorOnExists, bool noParentCreate, mode_t mode,
    StorageInterfacePathCreateParam param);

#define storageInterfacePathCreateP(thisVoid, path, errorOnExists, noParentCreate, mode, ...)                                      \
    STORAGE_COMMON_INTERFACE(thisVoid).pathCreate(                                                                                 \
        thisVoid, path, errorOnExists, noParentCreate, mode, (StorageInterfacePathCreateParam){VAR_PARAM_INIT, __VA_ARGS__})

// ---------------------------------------------------------------------------------------------------------------------------------
// Sync a path
typedef struct StorageInterfacePathSyncParam
{
    VAR_PARAM_HEADER;
} StorageInterfacePathSyncParam;

typedef void StorageInterfacePathSync(void *thisVoid, const String *path, StorageInterfacePathSyncParam param);

#define storageInterfacePathSyncP(thisVoid, path, ...)                                                                             \
    STORAGE_COMMON_INTERFACE(thisVoid).pathSync(thisVoid, path, (StorageInterfacePathSyncParam){VAR_PARAM_INIT, __VA_ARGS__})

/***********************************************************************************************************************************
Constructors
***********************************************************************************************************************************/
typedef struct StorageInterface
{
    // Features implemented by the storage driver
    uint64_t feature;

    // Required functions
    StorageInterfaceInfo *info;
    StorageInterfaceInfoList *infoList;
    StorageInterfaceNewRead *newRead;
    StorageInterfaceNewWrite *newWrite;
    StorageInterfacePathRemove *pathRemove;
    StorageInterfaceRemove *remove;

    // Optional functions
    StorageInterfaceMove *move;
    StorageInterfacePathCreate *pathCreate;
    StorageInterfacePathSync *pathSync;
} StorageInterface;

#define storageNewP(type, path, modeFile, modePath, write, pathExpressionFunction, driver, ...)                                    \
    storageNew(type, path, modeFile, modePath, write, pathExpressionFunction, driver, (StorageInterface){__VA_ARGS__})

Storage *storageNew(
    const String *type, const String *path, mode_t modeFile, mode_t modePath, bool write,
    StoragePathExpressionCallback pathExpressionFunction, void *driver, StorageInterface interface);

/***********************************************************************************************************************************
Common members to include in every storage driver and macros to extract the common elements
***********************************************************************************************************************************/
#define STORAGE_COMMON_MEMBER                                                                                                      \
    StorageInterface interface                                      /* Storage interface */

typedef struct StorageCommon
{
    STORAGE_COMMON_MEMBER;
} StorageCommon;

#define STORAGE_COMMON(thisVoid)                                                                                                   \
    ((const StorageCommon *)thisVoid)
#define STORAGE_COMMON_INTERFACE(thisVoid)                                                                                         \
    (STORAGE_COMMON(thisVoid)->interface)

/***********************************************************************************************************************************
Getters/Setters
***********************************************************************************************************************************/
// Storage driver
void *storageDriver(const Storage *this);

// Storage interface
StorageInterface storageInterface(const Storage *this);

/***********************************************************************************************************************************
Macros for function logging
***********************************************************************************************************************************/
#define FUNCTION_LOG_STORAGE_INTERFACE_TYPE                                                                                        \
    StorageInterface
#define FUNCTION_LOG_STORAGE_INTERFACE_FORMAT(value, buffer, bufferSize)                                                           \
    objToLog(&value, "StorageInterface", buffer, bufferSize)

#endif
