On 4/12/20 11:04 AM, Robert Haas wrote:
On Sun, Apr 12, 2020 at 10:09 AM Magnus Hagander <mag...@hagander.net> wrote:
There are certainly cases for it. It might not be that they have to be the same
connection, but they could still be the same session, meaning that before the first
file you perform some step of authentication, get a token, and then use that token
for all the files. You'd need somewhere to maintain that state, even if it doesn't
happen to be a socket. But there are definitely plenty of cases where keeping an open
socket can be a huge performance gain -- especially when it comes to not
re-negotiating encryption etc.
Hmm, OK.
When we implemented connection-sharing for S3 in pgBackRest it was a
significant performance boost, even for large files, since they must be
uploaded in parts. The same goes for files transferred over SSH, though
in that case the overhead is per-file and can be mitigated with
ControlMaster.
We originally (late 2013) implemented everything with command-line
tools during the POC phase. The idea was to get something viable quickly
and then improve as needed. At the time our config file had entries
something like this:
[global:command]
compress=/usr/bin/gzip --stdout %file%
decompress=/usr/bin/gzip -dc %file%
checksum=/usr/bin/shasum %file% | awk '{print $1}'
manifest=/opt/local/bin/gfind %path% -printf
'%P\t%y\t%u\t%g\t%m\t%T@\t%i\t%s\t%l\n'
psql=/Library/PostgreSQL/9.3/bin/psql -X %option%
[db]
psql_options=--cluster=9.3/main
[db:command:option]
psql=--port=6001
These entries are for macOS, but Linux would be similar.
This *did* work, but it was really hard to debug when things went wrong,
the per-file cost was high, and the slight differences between the
command-line tools on different platforms were maddening. For example,
lots of versions of 'find' would error if a file disappeared while
building the manifest, which is a pretty common occurrence in PostgreSQL
(most newer distros had an option to fix this). I know that doesn't
apply here, but it's an example. Debugging was also complicated by the
sheer number of processes: with any degree of parallelism the process
list got pretty crazy, fsync was not happening, etc. It's been a long
time, but I don't have any good memories of the solution that used all
command-line tools.
Once we had a POC that solved our basic problem, i.e. backing up about
50TB of data reasonably efficiently, we immediately started working on a
version that did not rely on command-line tools and we never looked
back. Currently the only command-line tool we use is ssh.
I'm sure it would be possible to create a solution that worked better
than ours, but I'm pretty certain it would still be hard for users to
make it work correctly and to prove it worked correctly.
For compression and encryption, it could perhaps be as simple as "the command has to
be a pipe on both input and output" and basically send the response back to
pg_basebackup.
But that won't help if the target is to relocate things...
Right. And, also, it forces things to be sequential in a way I'm not
too happy about. Like, if we have some kind of parallel backup, which
I hope we will, then you can imagine (among other possibilities)
getting files for each tablespace concurrently, and piping them
through the output command concurrently. But if we emit the result in
a tarfile, then it has to be sequential; there's just no other choice.
I think we should try to come up with something that can work in a
multi-threaded environment.
That is one way to go for it -- and in a case like that, I'd suggest the shellscript
interface would be an implementation of the other API. A number of times through the
years I've bounced ideas around for what to do with archive_command with different people
(never quite to the level of "it's time to write a patch"), and it has mostly
come down to some sort of shlib API where in turn we'd ship a backwards-compatible
implementation that would behave like archive_command. I'd envision something similar
here.
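To make the shape of that idea a little more concrete, here is a rough sketch of what such a shared-library hook with a backwards-compatible shell implementation could look like. Every name below is hypothetical -- this is not an existing PostgreSQL API, just an illustration of "the shell command is one implementation of the C API":

/* Hypothetical sketch only -- none of these names exist in PostgreSQL. */
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* Callbacks a loadable archive module would provide */
typedef struct ArchiveApi
{
    bool (*archiveFile)(const char *pathName, const char *fileName, void *state);
    void (*shutdown)(void *state);
} ArchiveApi;

/* Backwards-compatible implementation that shells out, much as archive_command does today.
   Real code would do proper %p/%f substitution and quoting; here the segment path is simply
   appended to the configured command for illustration. */
static bool
shellArchiveFile(const char *pathName, const char *fileName, void *state)
{
    (void)fileName;

    const char *commandPrefix = state;              /* e.g. "cp --target-directory=/mnt/archive" */
    char command[8192];

    snprintf(command, sizeof(command), "%s %s", commandPrefix, pathName);
    return system(command) == 0;
}

static const ArchiveApi shellArchiveApi = {.archiveFile = shellArchiveFile, .shutdown = NULL};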
I agree. Let's imagine that there are a conceptually unlimited number
of "targets" and "filters". Targets and filters accept data via the
same API, but a target is expected to dispose of the data, whereas a
filter is expected to pass it, via that same API, to a subsequent
filter or target. So filters could include things like "gzip", "lz4",
and "encrypt-with-rot13", whereas targets would include things like
"file" (the thing we have today - write my data into some local
files!), "shell" (which writes my data to a shell command, as
originally proposed), and maybe eventually things like "netbackup" and
"s3". Ideally this will all eventually be via a loadable module
interface so that third-party filters and targets can be fully
supported, but perhaps we could consider that an optional feature for
v1. Note that there is quite a bit of work to do here just to
reorganize the code.
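For illustration only, the "targets and filters accept data via the same API" idea could be sketched roughly like this (hypothetical names, not actual pg_basebackup code):

/* Hypothetical sketch -- a filter and a target expose the same interface; only the
   disposition of the data differs. */
#include <stddef.h>

typedef struct BackupSink BackupSink;

struct BackupSink
{
    void (*write)(BackupSink *this, const void *data, size_t size);
    void (*finish)(BackupSink *this);

    BackupSink *next;                               /* Set for filters, NULL for targets */
    void *state;
};

/* A trivial filter: forwards the data unchanged to the next sink (a real filter
   would compress/encrypt before forwarding) */
static void
passThroughWrite(BackupSink *this, const void *data, size_t size)
{
    this->next->write(this->next, data, size);
}

/* A trivial target: disposes of the data itself, here by just counting bytes */
static void
byteCountWrite(BackupSink *this, const void *data, size_t size)
{
    (void)data;
    *(size_t *)this->state += size;
}

A chain like lz4 -> encrypt -> s3 would then just be a linked list of sinks, and nothing about such an API forces the chains for different tablespaces to run sequentially.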
I would expect that we would want to provide a flexible way for a
target or filter to be passed options from the pg_basebackup command
line. So one might for example write this:
pg_basebackup --filter='lz4 -9' --filter='encrypt-with-rot13
rotations=2' --target='shell ssh rhaas@depository pgfile
create-exclusive - %f.lz4'
The idea is that the first word of the filter or target identifies
which one should be used, and the rest is just options text in
whatever form the provider cares to accept them; but with some
%<character> substitutions allowed, for things like the file name.
(The aforementioned escaping problems for things like filenames with
spaces in them still need to be sorted out, but this is just a sketch,
so while I think it's quite solvable, I am going to refrain from
proposing a precise solution here.)
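Just to illustrate the "first word selects the provider, the rest is options" convention (how %-substitution and escaping should work is deliberately left open here):

/* Hypothetical sketch: split "lz4 -9" into a provider name and an options string */
#include <stdio.h>
#include <string.h>

int
main(void)
{
    const char *spec = "lz4 -9";
    const char *space = strchr(spec, ' ');
    size_t nameSize = space ? (size_t)(space - spec) : strlen(spec);

    char provider[64];
    snprintf(provider, sizeof(provider), "%.*s", (int)nameSize, spec);

    const char *options = space ? space + 1 : "";
    printf("provider=%s options=%s\n", provider, options);

    return 0;
}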
This is basically the solution we have landed on after many iterations.
We implement two types of filters, In and InOut. The In filters process
data and produce a result, e.g. SHA1, size, page checksum, etc. The
InOut filters modify data, e.g. compression, encryption. Yeah, the names
could probably be better...
I have attached our filter interface (filter.intern.h) as a concrete
example of how this works.
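As a rough illustration of how a driver plugs into that interface, a byte-counting In filter looks more or less like the sketch below. (This is from memory rather than copied from our source, so helper names like memNew(), bufUsed(), varNewUInt64(), and STRDEF() may not match the current code exactly.)

// Sketch of an In filter driver: counts bytes but neither modifies nor forwards them
typedef struct SizeFilter
{
    uint64_t total;                                                 // Bytes seen so far
} SizeFilter;

// In function -- observe the input only
static void
sizeFilterProcess(void *driver, const Buffer *input)
{
    ((SizeFilter *)driver)->total += bufUsed(input);
}

// Result function -- report the accumulated count
static Variant *
sizeFilterResult(void *driver)
{
    return varNewUInt64(((SizeFilter *)driver)->total);
}

// Constructor wires the driver functions into a generic IoFilter
IoFilter *
sizeFilterNew(void)
{
    SizeFilter *driver = memNew(sizeof(SizeFilter));
    driver->total = 0;

    return ioFilterNewP(STRDEF("size"), driver, NULL, .in = sizeFilterProcess, .result = sizeFilterResult);
}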
We call 'targets' storage and have a standard interface for creating
storage drivers. I have also attached our storage interface
(storage.intern.h) as a concrete example of how this works.
Note that for just performing a backup this is overkill, but once you
consider verification this is pretty much the minimum storage interface
needed, in our experience.
Regards,
--
-David
da...@pgmasters.net
/***********************************************************************************************************************************
IO Filter Interface Internal
Two types of filters are implemented using this interface: In and InOut.

In filters accept input and produce a result, but do not modify the input. An example is the IoSize filter which counts all bytes
that pass through it.

InOut filters accept input and produce output (and perhaps a result). Because the input/output buffers may not be the same size
the filter must be prepared to accept the same input again (by implementing IoFilterInputSame) if the output buffer is too small
to accept all processed data. If the filter holds state even when inputSame is false then it may also implement IoFilterDone to
indicate that the filter should be flushed (by passing NULL inputs) after all input has been processed. InOut filters should
strive to fill the output buffer as much as possible, i.e., if the output buffer is not full after processing then inputSame
should be false. An example is the IoBuffer filter which buffers data between unequally sized input/output buffers.

Each filter has a type that allows it to be identified in the filter list.
***********************************************************************************************************************************/
#ifndef COMMON_IO_FILTER_FILTER_INTERN_H
#define COMMON_IO_FILTER_FILTER_INTERN_H
#include "common/io/filter/filter.h"
#include "common/type/variantList.h"
/***********************************************************************************************************************************
Constructors
***********************************************************************************************************************************/
typedef struct IoFilterInterface
{
    // Indicates that filter processing is done. This is used for filters that have additional data to be flushed even after all
    // input has been processed. Compression and encryption filters will usually need to implement done. If done is not
    // implemented then it will always return true if all input has been consumed, i.e. if inputSame returns false.
    bool (*done)(const void *driver);

    // Processing function for filters that do not produce output. Note that result must be implemented in this case (or else what
    // would be the point).
    void (*in)(void *driver, const Buffer *);

    // Processing function for filters that produce output. InOut filters will typically implement inputSame and may also
    // implement done.
    void (*inOut)(void *driver, const Buffer *, Buffer *);

    // InOut filters must be prepared for an output buffer that is too small to accept all the processed output. In this case the
    // filter must implement inputSame and set it to true when there is more output to be produced for a given input. On the next
    // call to inOut the same input will be passed along with a fresh output buffer with space for more processed output.
    bool (*inputSame)(const void *driver);

    // If the filter produces a result then this function must be implemented to return the result. A result can be anything that
    // is not processed output, e.g. a count of total bytes or a cryptographic hash.
    Variant *(*result)(void *driver);
} IoFilterInterface;
#define ioFilterNewP(type, driver, paramList, ...)                                                                                 \
    ioFilterNew(type, driver, paramList, (IoFilterInterface){__VA_ARGS__})

IoFilter *ioFilterNew(const String *type, void *driver, VariantList *paramList, IoFilterInterface);
/***********************************************************************************************************************************
Functions
***********************************************************************************************************************************/
// Filter input only (a result is expected)
void ioFilterProcessIn(IoFilter *this, const Buffer *input);
// Filter input and produce output
void ioFilterProcessInOut(IoFilter *this, const Buffer *input, Buffer *output);
// Move filter to a new parent mem context
IoFilter *ioFilterMove(IoFilter *this, MemContext *parentNew);
/***********************************************************************************************************************************
Getters/Setters
***********************************************************************************************************************************/
// Is the filter done?
bool ioFilterDone(const IoFilter *this);
// Driver for the filter
void *ioFilterDriver(IoFilter *this);
// Does the filter need the same input again? If the filter cannot get all its output into the output buffer then it may need
// access to the same input again.
bool ioFilterInputSame(const IoFilter *this);
// Interface for the filter
const IoFilterInterface *ioFilterInterface(const IoFilter *this);
// Does the filter produce output? All InOut filters produce output.
bool ioFilterOutput(const IoFilter *this);
// List of filter parameters
const VariantList *ioFilterParamList(const IoFilter *this);
/***********************************************************************************************************************************
Macros for function logging
***********************************************************************************************************************************/
#define FUNCTION_LOG_IO_FILTER_INTERFACE_TYPE                                                                                      \
    IoFilterInterface *
#define FUNCTION_LOG_IO_FILTER_INTERFACE_FORMAT(value, buffer, bufferSize)                                                         \
    objToLog(&value, "IoFilterInterface", buffer, bufferSize)
#endif
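For reference, a caller drives an InOut filter along the lines of the simplified sketch below (not our actual code -- in particular it glosses over emptying the output buffer between passes):

// Simplified sketch of the inputSame/done contract described above
static void
filterDrain(IoFilter *filter, const Buffer *input, Buffer *output)
{
    // Keep offering the same input until the filter no longer needs it
    do
    {
        ioFilterProcessInOut(filter, input, output);
        // ...consume output and clear the buffer here before the next pass...
    }
    while (ioFilterInputSame(filter));

    // Once all input has been processed, flush any remaining state with NULL input
    while (!ioFilterDone(filter))
    {
        ioFilterProcessInOut(filter, NULL, output);
        // ...consume output here as well...
    }
}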
/***********************************************************************************************************************************
Storage Interface Internal
Storage drivers are implemented using this interface.
The interface has required and optional functions. Currently the optional functions are only implemented by the Posix driver,
which can store either a repository or a PostgreSQL cluster. Drivers that are intended to store repositories only need to
implement the required functions.

The behavior of required functions is further modified by storage features defined by the StorageFeature enum. Details are
included in the description of each function.
***********************************************************************************************************************************/
#ifndef STORAGE_STORAGE_INTERN_H
#define STORAGE_STORAGE_INTERN_H
#include "common/type/param.h"
#include "storage/read.intern.h"
#include "storage/storage.h"
#include "storage/write.intern.h"
/***********************************************************************************************************************************
Default file and path modes
***********************************************************************************************************************************/
#define STORAGE_MODE_FILE_DEFAULT 0640
#define STORAGE_MODE_PATH_DEFAULT 0750
/***********************************************************************************************************************************
Error messages
***********************************************************************************************************************************/
#define STORAGE_ERROR_READ_CLOSE                                    "unable to close file '%s' after read"
#define STORAGE_ERROR_READ_OPEN                                     "unable to open file '%s' for read"
#define STORAGE_ERROR_READ_MISSING                                  "unable to open missing file '%s' for read"

#define STORAGE_ERROR_INFO                                          "unable to get info for path/file '%s'"
#define STORAGE_ERROR_INFO_MISSING                                  "unable to get info for missing path/file '%s'"

#define STORAGE_ERROR_LIST_INFO                                     "unable to list file info for path '%s'"
#define STORAGE_ERROR_LIST_INFO_MISSING                             "unable to list file info for missing path '%s'"

#define STORAGE_ERROR_PATH_REMOVE                                   "unable to remove path '%s'"
#define STORAGE_ERROR_PATH_REMOVE_FILE                              "unable to remove file '%s'"
#define STORAGE_ERROR_PATH_REMOVE_MISSING                           "unable to remove missing path '%s'"

#define STORAGE_ERROR_PATH_SYNC                                     "unable to sync path '%s'"
#define STORAGE_ERROR_PATH_SYNC_CLOSE                               "unable to close path '%s' after sync"
#define STORAGE_ERROR_PATH_SYNC_OPEN                                "unable to open path '%s' for sync"
#define STORAGE_ERROR_PATH_SYNC_MISSING                             "unable to sync missing path '%s'"

#define STORAGE_ERROR_WRITE_CLOSE                                   "unable to close file '%s' after write"
#define STORAGE_ERROR_WRITE_OPEN                                    "unable to open file '%s' for write"
#define STORAGE_ERROR_WRITE_MISSING                                 "unable to open file '%s' for write in missing path"
#define STORAGE_ERROR_WRITE_SYNC                                    "unable to sync file '%s' after write"
/***********************************************************************************************************************************
Path expression callback function type - used to modify paths based on expressions enclosed in <>
***********************************************************************************************************************************/
typedef String *StoragePathExpressionCallback(const String *expression, const String *path);
/***********************************************************************************************************************************
Required interface functions
***********************************************************************************************************************************/
// Get information about a file/link/path
//
// The level parameter controls the amount of information that will be returned. See the StorageInfo type and StorageInfoLevel
// enum for details about information that must be provided at each level. The driver should only return the amount of
// information requested even if more is available. All drivers must implement the storageInfoLevelExists and
// storageInfoLevelBasic levels. Only drivers with the storageFeatureInfoDetail feature must implement the storageInfoLevelDetail
// level.
typedef struct StorageInterfaceInfoParam
{
    VAR_PARAM_HEADER;

    // Should symlinks be followed? Only required on storage that supports symlinks.
    bool followLink;
} StorageInterfaceInfoParam;

typedef StorageInfo StorageInterfaceInfo(
    void *thisVoid, const String *file, StorageInfoLevel level, StorageInterfaceInfoParam param);

#define storageInterfaceInfoP(thisVoid, file, level, ...)                                                                          \
    STORAGE_COMMON_INTERFACE(thisVoid).info(thisVoid, file, level, (StorageInterfaceInfoParam){VAR_PARAM_INIT, __VA_ARGS__})
// ---------------------------------------------------------------------------------------------------------------------------------
// Create a file read object. The file should not be opened immediately -- open() will be called on the IoRead interface when the
// file needs to be opened.
typedef struct StorageInterfaceNewReadParam
{
    VAR_PARAM_HEADER;

    // Is the file compressible? This is used when the file must be moved across a network and temporary compression is helpful.
    bool compressible;

    // Limit bytes read from the file. NULL for no limit.
    const Variant *limit;
} StorageInterfaceNewReadParam;

typedef StorageRead *StorageInterfaceNewRead(
    void *thisVoid, const String *file, bool ignoreMissing, StorageInterfaceNewReadParam param);

#define storageInterfaceNewReadP(thisVoid, file, ignoreMissing, ...)                                                               \
    STORAGE_COMMON_INTERFACE(thisVoid).newRead(                                                                                    \
        thisVoid, file, ignoreMissing, (StorageInterfaceNewReadParam){VAR_PARAM_INIT, __VA_ARGS__})
// ---------------------------------------------------------------------------------------------------------------------------------
// Create a file write object. The file should not be opened immediately -- open() will be called on the IoWrite interface when
// the file needs to be opened.
typedef struct StorageInterfaceNewWriteParam
{
    VAR_PARAM_HEADER;

    // File/path mode for storage that supports Posix-style permissions. modePath is only used in conjunction with createPath.
    mode_t modeFile;
    mode_t modePath;

    // User/group name
    const String *user;
    const String *group;

    // Modified time
    time_t timeModified;

    // Will paths be created as needed?
    bool createPath;

    // Sync file/path when required by the storage
    bool syncFile;
    bool syncPath;

    // Ensure the file is written atomically. If this is false it's OK to write atomically if that's all the storage supports
    // (e.g. S3). Non-atomic writes are used in some places where there is a performance advantage and atomicity is not needed.
    bool atomic;

    // Is the file compressible? This is used when the file must be moved across a network and temporary compression is helpful.
    bool compressible;
} StorageInterfaceNewWriteParam;

typedef StorageWrite *StorageInterfaceNewWrite(void *thisVoid, const String *file, StorageInterfaceNewWriteParam param);

#define storageInterfaceNewWriteP(thisVoid, file, ...)                                                                             \
    STORAGE_COMMON_INTERFACE(thisVoid).newWrite(thisVoid, file, (StorageInterfaceNewWriteParam){VAR_PARAM_INIT, __VA_ARGS__})
// ---------------------------------------------------------------------------------------------------------------------------------
// Get info for a path and all paths/files in the path (does not recurse)
//
// See storageInterfaceInfoP() for usage of the level parameter.
typedef struct StorageInterfaceInfoListParam
{
    VAR_PARAM_HEADER;

    // Regular expression used to filter the results. The expression is always checked in the callback passed to
    // storageInterfaceInfoListP() so checking the expression in the driver is entirely optional. The driver should only use the
    // expression if it can improve performance or limit network transfer.
    //
    // Partial matching of the expression is fine as long as nothing that should match is excluded, e.g. it is OK to prefix match
    // using the prefix returned from regExpPrefix(). This may cause extra results to be sent to the callback but won't exclude
    // anything that matches the expression exactly.
    const String *expression;
} StorageInterfaceInfoListParam;

typedef bool StorageInterfaceInfoList(
    void *thisVoid, const String *path, StorageInfoLevel level, StorageInfoListCallback callback, void *callbackData,
    StorageInterfaceInfoListParam param);

#define storageInterfaceInfoListP(thisVoid, path, level, callback, callbackData, ...)                                              \
    STORAGE_COMMON_INTERFACE(thisVoid).infoList(                                                                                   \
        thisVoid, path, level, callback, callbackData, (StorageInterfaceInfoListParam){VAR_PARAM_INIT, __VA_ARGS__})
// ---------------------------------------------------------------------------------------------------------------------------------
// Remove a path (and optionally recurse)
typedef struct StorageInterfacePathRemoveParam
{
    VAR_PARAM_HEADER;
} StorageInterfacePathRemoveParam;

typedef bool StorageInterfacePathRemove(void *thisVoid, const String *path, bool recurse, StorageInterfacePathRemoveParam param);

#define storageInterfacePathRemoveP(thisVoid, path, recurse, ...)                                                                  \
    STORAGE_COMMON_INTERFACE(thisVoid).pathRemove(                                                                                 \
        thisVoid, path, recurse, (StorageInterfacePathRemoveParam){VAR_PARAM_INIT, __VA_ARGS__})
// ---------------------------------------------------------------------------------------------------------------------------------
// Remove a file
typedef struct StorageInterfaceRemoveParam
{
    VAR_PARAM_HEADER;

    // Error when the file to delete is missing
    bool errorOnMissing;
} StorageInterfaceRemoveParam;

typedef void StorageInterfaceRemove(void *thisVoid, const String *file, StorageInterfaceRemoveParam param);

#define storageInterfaceRemoveP(thisVoid, file, ...)                                                                               \
    STORAGE_COMMON_INTERFACE(thisVoid).remove(thisVoid, file, (StorageInterfaceRemoveParam){VAR_PARAM_INIT, __VA_ARGS__})
/***********************************************************************************************************************************
Optional interface functions
***********************************************************************************************************************************/
// Move a path/file atomically
typedef struct StorageInterfaceMoveParam
{
    VAR_PARAM_HEADER;
} StorageInterfaceMoveParam;

typedef bool StorageInterfaceMove(void *thisVoid, StorageRead *source, StorageWrite *destination, StorageInterfaceMoveParam param);

#define storageInterfaceMoveP(thisVoid, source, destination, ...)                                                                  \
    STORAGE_COMMON_INTERFACE(thisVoid).move(                                                                                       \
        thisVoid, source, destination, (StorageInterfaceMoveParam){VAR_PARAM_INIT, __VA_ARGS__})
// ---------------------------------------------------------------------------------------------------------------------------------
// Create a path
typedef struct StorageInterfacePathCreateParam
{
    VAR_PARAM_HEADER;
} StorageInterfacePathCreateParam;

typedef void StorageInterfacePathCreate(
    void *thisVoid, const String *path, bool errorOnExists, bool noParentCreate, mode_t mode,
    StorageInterfacePathCreateParam param);

#define storageInterfacePathCreateP(thisVoid, path, errorOnExists, noParentCreate, mode, ...)                                      \
    STORAGE_COMMON_INTERFACE(thisVoid).pathCreate(                                                                                 \
        thisVoid, path, errorOnExists, noParentCreate, mode, (StorageInterfacePathCreateParam){VAR_PARAM_INIT, __VA_ARGS__})
// ---------------------------------------------------------------------------------------------------------------------------------
// Sync a path
typedef struct StorageInterfacePathSyncParam
{
    VAR_PARAM_HEADER;
} StorageInterfacePathSyncParam;

typedef void StorageInterfacePathSync(void *thisVoid, const String *path, StorageInterfacePathSyncParam param);

#define storageInterfacePathSyncP(thisVoid, path, ...)                                                                             \
    STORAGE_COMMON_INTERFACE(thisVoid).pathSync(thisVoid, path, (StorageInterfacePathSyncParam){VAR_PARAM_INIT, __VA_ARGS__})
/***********************************************************************************************************************************
Constructors
***********************************************************************************************************************************/
typedef struct StorageInterface
{
    // Features implemented by the storage driver
    uint64_t feature;

    // Required functions
    StorageInterfaceInfo *info;
    StorageInterfaceInfoList *infoList;
    StorageInterfaceNewRead *newRead;
    StorageInterfaceNewWrite *newWrite;
    StorageInterfacePathRemove *pathRemove;
    StorageInterfaceRemove *remove;

    // Optional functions
    StorageInterfaceMove *move;
    StorageInterfacePathCreate *pathCreate;
    StorageInterfacePathSync *pathSync;
} StorageInterface;
#define storageNewP(type, path, modeFile, modePath, write, pathExpressionFunction, driver, ...)                                    \
    storageNew(type, path, modeFile, modePath, write, pathExpressionFunction, driver, (StorageInterface){__VA_ARGS__})

Storage *storageNew(
    const String *type, const String *path, mode_t modeFile, mode_t modePath, bool write,
    StoragePathExpressionCallback pathExpressionFunction, void *driver, StorageInterface interface);
/***********************************************************************************************************************************
Common members to include in every storage driver and macros to extract the common elements
***********************************************************************************************************************************/
#define STORAGE_COMMON_MEMBER                                                                                                      \
    StorageInterface interface                                      /* Storage interface */

typedef struct StorageCommon
{
    STORAGE_COMMON_MEMBER;
} StorageCommon;

#define STORAGE_COMMON(thisVoid)                                                                                                   \
    ((const StorageCommon *)thisVoid)

#define STORAGE_COMMON_INTERFACE(thisVoid)                                                                                         \
    (STORAGE_COMMON(thisVoid)->interface)
/***********************************************************************************************************************************
Getters/Setters
***********************************************************************************************************************************/
// Storage driver
void *storageDriver(const Storage *this);
// Storage interface
StorageInterface storageInterface(const Storage *this);
/***********************************************************************************************************************************
Macros for function logging
***********************************************************************************************************************************/
#define FUNCTION_LOG_STORAGE_INTERFACE_TYPE                                                                                        \
    StorageInterface
#define FUNCTION_LOG_STORAGE_INTERFACE_FORMAT(value, buffer, bufferSize)                                                           \
    objToLog(&value, "StorageInterface", buffer, bufferSize)
#endif
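And for completeness, the skeleton of a new storage driver built against this interface looks roughly like the sketch below (again from memory, not copied from our source -- the "null" driver and helpers such as memNew()/STRDEF() are illustrative, and most of the required functions are omitted):

// Sketch of a storage driver skeleton; only info() is shown -- the other required
// functions (infoList, newRead, newWrite, pathRemove, remove) would be filled in the same way
typedef struct StorageNull
{
    STORAGE_COMMON_MEMBER;                                          // Must come first so STORAGE_COMMON() works
} StorageNull;

static StorageInfo
storageNullInfo(void *thisVoid, const String *file, StorageInfoLevel level, StorageInterfaceInfoParam param)
{
    (void)thisVoid; (void)file; (void)param;

    // Report that nothing exists on this storage
    return (StorageInfo){.level = level, .exists = false};
}

static const StorageInterface storageInterfaceNull =
{
    .info = storageNullInfo,
    // .infoList = ..., .newRead = ..., .newWrite = ..., .pathRemove = ..., .remove = ...
};

Storage *
storageNullNew(void)
{
    StorageNull *driver = memNew(sizeof(StorageNull));
    driver->interface = storageInterfaceNull;

    return storageNew(
        STRDEF("null"), STRDEF("/"), STORAGE_MODE_FILE_DEFAULT, STORAGE_MODE_PATH_DEFAULT, false, NULL, driver,
        driver->interface);
}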