On Wed, Jan 20, 2016 at 7:57 AM, Craig Ringer <cr...@2ndquadrant.com> wrote:
> On 15 January 2016 at 16:30, Shulgin, Oleksandr
> <oleksandr.shul...@zalando.de> wrote:
>
>> I'd like to propose generic functions (probably in an extension, or in
>> core if not possible otherwise) to facilitate streaming existing data
>> from the database *in the same format* that one would get if these were
>> the changes decoded by a logical decoding plugin.
>
> So effectively produce synthetic logical decoding callbacks to run a bunch
> of fake INSERTs, presumably with a fake xid etc?

Exactly.

>> The idea is to use a snapshot returned from the CREATE_REPLICATION_SLOT
>> command of the replication protocol to get a consistent snapshot of the
>> database, then start listening to new changes on the slot.
>
> My impression is that you want to avoid the current step of "synchronize
> database initial contents" when using logical decoding for replication.

Yes, but...

> But I guess you're looking to then populate that empty schema in-band via
> logical decoding, rather than having to do a --data-only dump or use COPY.
> Right?
>
> That won't help you for schema; presumably you'd still do a pg_dump
> --schema-only | pg_restore for that.
>
> Just like when restoring a --data-only dump or using COPY you'd have to
> disable FKs during sync, but that's pretty much unavoidable.

All of this implies another *postgres* database on the receiving side,
which is not necessarily the case for my research.

>> The way this initial export phase is implemented there is by providing a
>> SQL-callable set-returning function which uses SPI to run "SELECT *
>> FROM mytable" behind the scenes and feeds the resulting tuples through
>> the INSERT callback of the logical decoding plugin, which lives in the
>> same loadable module as this SQL function.
>
> o_O
>
> What about the reorder buffer, the logical decoding memory context, etc?

As shown by the POC patch, it is rather straightforward to achieve.

>> The Bottled Water logical decoding plugin uses a binary protocol based
>> on the Avro data serialization library.  As an experiment I was adding
>> support for a JSON output format to it, and for that I had to
>> re-implement the aforementioned SRF for exporting the initial data, so
>> that it converts the tuples to JSON instead.
>
> Have you taken a look at what's been done with pglogical and
> pglogical_output?
>
> We've got extensible protocol support there, and if Avro offers compelling
> benefits over the current binary serialization I'm certainly interested in
> hearing about it.

This is what I'm going to benchmark.  With the generic function I can just
create two slots: one for pglogical and another one for BottledWater/Avro,
and see which one performs better when forced to stream a few TB worth of
INSERTs through the change callback.

>> What do you say?
>
> Interesting idea.  As outlined I think it sounds pretty fragile though; I
> really, really don't like the idea of lying to the insert callback by
> passing it a fake insert with (presumably) fake reorder buffer txn, etc.

Fair enough.  However, for performance testing it might not be that bad,
even if none of this lands in the actual API.

> What we've done in pglogical is take a --schema-only dump then, on the
> downstream, attach to the exported snapshot and use COPY ... TO STDOUT
> over a libpq connection to the upstream, and feed that to COPY ... FROM
> STDIN on another libpq connection to "ourselves", i.e. the downstream.
> Unless Petr changed it to use COPY innards directly on the downstream; I
> know he talked about it but haven't checked if he did.
>
> Anyway, either way it's not pretty and requires a sideband non-replication
> connection to sync initial state.  The upside is that it can be relatively
> easily parallelized for faster sync using multiple connections.

I've also measured that, to have a baseline for comparing it to decoding
performance.  (Rough sketches of the snapshot handoff, the export loop and
the COPY pump follow below.)
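To keep us concrete, here is roughly how I picture the snapshot handoff in
libpq terms.  This is a sketch only: connection strings, slot and plugin
names are placeholders, and most error handling is omitted.

    /*
     * Sketch: create a logical slot over a replication connection, then
     * attach a second, regular session to the slot's exported snapshot.
     */
    #include <stdio.h>
    #include <libpq-fe.h>

    int
    main(void)
    {
        /* note replication=database: this is a walsender connection */
        PGconn     *repl = PQconnectdb("dbname=mydb replication=database");
        PGconn     *sql = PQconnectdb("dbname=mydb");
        PGresult   *res;
        char        cmd[256];

        if (PQstatus(repl) != CONNECTION_OK || PQstatus(sql) != CONNECTION_OK)
            return 1;

        /* one row back: slot_name, consistent_point, snapshot_name, plugin */
        res = PQexec(repl,
                     "CREATE_REPLICATION_SLOT myslot LOGICAL test_decoding");
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
            return 1;

        /*
         * The snapshot stays importable only while the walsender session
         * that exported it sits there without running another command.
         */
        snprintf(cmd, sizeof(cmd), "SET TRANSACTION SNAPSHOT '%s'",
                 PQgetvalue(res, 0, 2));
        PQclear(res);

        PQclear(PQexec(sql, "BEGIN ISOLATION LEVEL REPEATABLE READ"));
        PQclear(PQexec(sql, cmd));

        /*
         * Everything read in this transaction now matches the slot's start
         * point: stream the existing rows here, COMMIT, and only then start
         * consuming changes from the slot.
         */
        PQfinish(sql);
        PQfinish(repl);
        return 0;
    }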
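The export function itself boils down to a loop like the one below.  This
is a skeleton of the approach, not the POC code: the SPI calls are real
API, but plugin_emit_insert_tuple() is a hypothetical stand-in for the
plugin's path that formats one tuple and pushes it through its INSERT
callback, and I return void here for brevity where the real thing is an
SRF.

    /*
     * Skeleton of the export function's core loop, not the actual POC
     * code.  plugin_emit_insert_tuple() is a hypothetical helper standing
     * in for the plugin's tuple-to-INSERT-callback path.  PG_MODULE_MAGIC
     * lives elsewhere in the loadable module.
     */
    #include "postgres.h"
    #include "executor/spi.h"
    #include "fmgr.h"
    #include "utils/builtins.h"

    /* hypothetical: format one tuple as the plugin's INSERT message */
    extern void plugin_emit_insert_tuple(TupleDesc tupdesc, HeapTuple tuple);

    PG_FUNCTION_INFO_V1(stream_relation);

    Datum
    stream_relation(PG_FUNCTION_ARGS)
    {
        char       *relname = text_to_cstring(PG_GETARG_TEXT_PP(0));
        char        query[1024];
        uint64      i;

        /* real code must quote the identifier, e.g. quote_identifier() */
        snprintf(query, sizeof(query), "SELECT * FROM %s", relname);

        SPI_connect();

        /*
         * read_only = true: scan under the transaction's (imported)
         * snapshot.  A real implementation would use SPI_cursor_open()
         * and fetch in batches rather than materializing a TB-sized
         * table in memory.
         */
        SPI_execute(query, true, 0);

        for (i = 0; i < SPI_processed; i++)
            plugin_emit_insert_tuple(SPI_tuptable->tupdesc,
                                     SPI_tuptable->vals[i]);

        SPI_finish();
        PG_RETURN_VOID();
    }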
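And the COPY-based sync you describe is essentially the standard
PQgetCopyData()/PQputCopyData() pump.  A minimal sketch, assuming src and
dst are already-open connections and "mytable" exists on both ends:

    /*
     * Minimal sketch of pglogical-style table sync: COPY ... TO STDOUT on
     * the upstream fed into COPY ... FROM STDIN on the downstream over two
     * ordinary libpq connections.  Error handling is abbreviated.
     */
    #include <libpq-fe.h>

    static int
    copy_table(PGconn *src, PGconn *dst)
    {
        PGresult   *res;
        char       *buf;
        int         len;

        res = PQexec(src, "COPY mytable TO STDOUT");
        if (PQresultStatus(res) != PGRES_COPY_OUT)
            return -1;
        PQclear(res);

        res = PQexec(dst, "COPY mytable FROM STDIN");
        if (PQresultStatus(res) != PGRES_COPY_IN)
            return -1;
        PQclear(res);

        /* pump rows; PQgetCopyData() returns -1 at end, -2 on error */
        while ((len = PQgetCopyData(src, &buf, 0)) > 0)
        {
            if (PQputCopyData(dst, buf, len) != 1)
                return -1;
            PQfreemem(buf);
        }
        if (len != -1)
            return -1;

        PQputCopyEnd(dst, NULL);
        res = PQgetResult(dst);     /* outcome of the FROM STDIN side */
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
            return -1;
        PQclear(res);
        PQclear(PQgetResult(src));  /* drain the TO STDOUT side */
        return 0;
    }

It parallelizes exactly the way you describe: one such pump per table,
each over its own pair of connections.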
> To what extent are you setting up a true logical decoding context here?

It is done the exact same way pg_logical_slot_get/peek_changes() do it.

> Where does the xact info come from? The commit record? etc.

palloc0()

> You're presumably not forming a reorder buffer then decoding it since it
> could create a massive tempfile on disk, so are you just dummying this
> info up?

In my experience, it doesn't.  We know it's going to be a "committed xact",
so we don't really need to queue the changes up before we see a "commit"
record.

> Or hoping the plugin won't look at it?

Pretty much. :-)

> The functionality is good and I think that for the SQL level you'd have
> to use SET TRANSACTION SNAPSHOT as you show.  But I think it should
> really be usable from the replication protocol too - and should try to
> keep the state as close to that of a normal decoding session as possible.
> We'd at least need a new walsender protocol command equivalent that took
> the snapshot identifier, relation info and the other decoding params
> instead of a slot name.  Or, ideally, a variant on START_REPLICATION ...
> LOGICAL ... that omits SLOT and instead takes TABLES as an argument, with
> a list of relation(s) to sync.  Unlike normal START_REPLICATION ...
> LOGICAL ... it'd return to walsender protocol mode on completion, like
> the phys rep protocol does when it's time for a timeline switch.

I've had similar thoughts.  Another consideration is that we might
introduce modes for acquiring the slot: Exclusive and Shared access (can
be implemented with LWLocks?), so that peek_changes() and
stream_relation() could acquire the slot in Shared access mode, thus
allowing parallel queries, while START_REPLICATION and get_changes()
would require Exclusive access.

> Rather than lie to the insert callback I'd really rather define a new
> logical decoding callback for copying initial records.  It doesn't get
> any xact info (since it's not useful/relevant) or a full reorder buffer.
> No ReorderBufferChange is passed; instead we pass something like a
> ReorderBufferSync that contains the new tuple ReorderBufferTupleBuf,
> origin id, origin lsn and commit timestamp (if known) and the RelFileNode
> affected.  The LogicalDecodingContext that's set up for the callback gets
> ctx->reorder = NULL.  There's no ReorderBufferTxn argument and none is
> defined.
>
> Since it's a new callback the plugin knows the rules, knows it's getting
> initial state data to sync over, etc.  It doesn't have to try to guess if
> it's seeing a real insert and act differently with respect to xact
> identity etc.
>
> Obviously that's 9.6 material at the soonest, and only 9.6 if it could be
> done ... well, right about now.  So that won't meet your immediate needs,
> but I think the same is true of the interface you propose above.

That can be a good approach going forward, yes.
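To make sure I read you correctly, here is a speculative header sketch of
that callback.  None of these names exist anywhere; the struct and typedef
just mirror your description:

    /*
     * Speculative sketch of the proposed initial-copy callback, mirroring
     * the description above.  Nothing here exists in core; all names are
     * placeholders for discussion.
     */
    #include "postgres.h"
    #include "access/xlogdefs.h"
    #include "replication/logical.h"
    #include "replication/origin.h"
    #include "replication/reorderbuffer.h"
    #include "storage/relfilenode.h"
    #include "utils/rel.h"
    #include "utils/timestamp.h"

    /*
     * Passed instead of a ReorderBufferChange: just the tuple plus its
     * provenance.  No ReorderBufferTxn, because no transaction is being
     * replayed.
     */
    typedef struct ReorderBufferSync
    {
        ReorderBufferTupleBuf *tuple;       /* the row being synced */
        RepOriginId     origin_id;          /* if known */
        XLogRecPtr      origin_lsn;         /* if known */
        TimestampTz     commit_time;        /* if known, else 0 */
        RelFileNode     node;               /* relation affected */
    } ReorderBufferSync;

    /*
     * New, optional output plugin callback.  The LogicalDecodingContext
     * it receives has ctx->reorder == NULL, so a plugin implementing it
     * has opted in and never needs to guess whether an insert is "real".
     */
    typedef void (*LogicalDecodeSyncTupleCB) (struct LogicalDecodingContext *ctx,
                                              Relation relation,
                                              ReorderBufferSync *sync);

That would also make the rules explicit for plugin authors, as you say.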
> What I suggest doing in the mean time is specifying a new callback
> function interface for tuple copies as described above, to be implemented
> by logical decoding modules that support this extension.  In each
> decoding plugin we then define a SQL-callable function with 'internal'
> return type that returns a pointer to the callback, so you can obtain the
> hook function address via a fmgr call via pg_proc.  The callback would
> expect a state much like I describe above, and we'd use a SQL-callable
> function like what you outlined to set up a fake logical decoding state
> for it, complete with decoding context etc.  Probably copying & pasting a
> moderately painful amount of the logical decoding guts into an ext in the
> process :( since I don't think you can easily set up much of the decoding
> state using the decoding backend code without having a slot to use.
> Still, that'd let us prototype this and prove the idea for inclusion in
> 9.7 (?) in-core while retaining the capability via an extension for
> earlier versions.
>
> You'd have to do much of the same hoop jumping to call an arbitrary
> output plugin's insert callback directly, if not more.
>
> Alternately, you could just use COPY ;)

Thanks for the thoughtful reply!  I'm going to experiment with my toy code
a bit more, while keeping in mind what a more workable approach could look
like.

--
Alex