Replication works via the COPY API, so to stop replication walsender.c waits for CopyDone. My communication looks like this:

FE=> StartReplication(query: START_REPLICATION SLOT pgjdbc_logical_replication_slot LOGICAL 0/18FCFD0 ("include-xids" 'false', "skip-empty-xacts" 'true'))
FE=> Query(CopyStart)
<=BE CopyBothResponse
FE=> StandbyStatusUpdate(received: 0/18FCFD0, flushed: 0/0, applied: 0/0, clock: Tue May 03 22:13:14 MSK 2016)
FE=> CopyData(34)
<=BE CopyData
<=BE CopyData
<=BE Keepalive(lastServerWal: 0/18FCFD0, clock: Tue May 03 22:13:14 MSK 2016 needReply: false)
<=BE XLogData(currWal: 0/0, lastServerWal: 0/0, clock: 0)
<=BE CopyData
<=BE XLogData(currWal: 0/18FD0A0, lastServerWal: 0/18FD0A0, clock: 0)
<=BE CopyData
<=BE XLogData(currWal: 0/18FD1B0, lastServerWal: 0/18FD1B0, clock: 0)
FE=> StopReplication
<=BE CopyData
FE=> CopyDone
<=BE CopyDone
<=BE CopyData
... after a few seconds
<=BE CommandStatus(COPY 0)
<=BE ReadyForQuery(I)

The main idea is that I want to keep working with the same connection
without closing it.

2016-05-06 19:45 GMT+03:00 Oleksandr Shulgin <oleksandr.shul...@zalando.de>:

> On Fri, May 6, 2016 at 5:23 PM, Vladimir Gordiychuk <fol...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> While implementing the logical replication protocol for pgjdbc
>> (https://github.com/pgjdbc/pgjdbc/pull/550) I ran into strange
>> behavior in *walsender.c*:
>>
>> 1. Once the WAL consumer has caught up with the master and its state
>>    has changed to streaming, replication cannot be completed normally
>>    by sending a CopyDone message until a new WAL record is generated.
>>    This happens because logical decoding sits in *WalSndWaitForWal*
>>    until the next WAL record becomes available. That function does
>>    receive messages from the consumer, and even replies to CopyDone
>>    with CopyDone, but it does not exit its loop, so the consumer can
>>    wait a long time for the CommandStatus and ReadyForQuery packets.
>> 2. Logical decoding ignores messages from the consumer while it is
>>    decoding a transaction and writing it to the socket
>>    (*WalSndWriteData*). This affects long transactions with many
>>    changes. For example, if we start decoding a transaction that
>>    inserts 1 million records and decide to stop replication after
>>    consuming 1% of its data, the stop will not take effect until all
>>    million records have been sent to the consumer.
>>
> How exactly are you stopping the replication? If you just stop reading
> you're likely to hit some problems, but what if you also close the
> connection? I don't think there is any other supported way to do it.
>
> I was working last year on adding support for the replication protocol
> to the Python driver: https://github.com/psycopg/psycopg2/pull/322 It
> would be nice if you could skim through this implementation, since it
> didn't receive a whole lot of review.
>
> Cheers.
> --
> Alex
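
To make the stop sequence from the trace above concrete, here is a minimal Java sketch of what the frontend has to do after it sends CopyDone if it wants to reuse the connection: keep reading, discard any CopyData still in flight, and return only once ReadyForQuery arrives. The class and method names are hypothetical and this is not pgjdbc code; the backend is simulated by a list of message type bytes ('d' CopyData, 'c' CopyDone, 'C' CommandComplete, 'Z' ReadyForQuery) instead of a real socket.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration only; not part of pgjdbc.
final class StopReplicationSketch {
    // Backend message type bytes from the PostgreSQL wire protocol.
    static final char COPY_DATA = 'd';
    static final char COPY_DONE = 'c';
    static final char COMMAND_COMPLETE = 'C';
    static final char READY_FOR_QUERY = 'Z';

    /**
     * Consumes the backend messages that arrive after the frontend has sent
     * CopyDone. Returns how many CopyData messages were discarded before
     * ReadyForQuery was reached.
     */
    static int drainUntilReady(Iterator<Character> backend) {
        int discarded = 0;
        boolean copyDoneSeen = false;
        boolean commandCompleteSeen = false;
        while (backend.hasNext()) {
            char type = backend.next();
            switch (type) {
                case COPY_DATA:
                    discarded++; // late WAL data or keepalive, no longer needed
                    break;
                case COPY_DONE:
                    copyDoneSeen = true; // backend acknowledged the stop
                    break;
                case COMMAND_COMPLETE:
                    commandCompleteSeen = true; // "COPY 0" in the trace
                    break;
                case READY_FOR_QUERY:
                    if (!copyDoneSeen || !commandCompleteSeen) {
                        throw new IllegalStateException("ReadyForQuery arrived too early");
                    }
                    return discarded; // connection can be reused from here
                default:
                    throw new IllegalStateException("unexpected message type: " + type);
            }
        }
        throw new IllegalStateException("stream ended before ReadyForQuery");
    }

    public static void main(String[] args) {
        // Backend messages seen after "FE=> CopyDone" in the trace:
        // CopyDone, CopyData, CommandStatus(COPY 0), ReadyForQuery(I).
        List<Character> afterCopyDone = Arrays.asList('c', 'd', 'C', 'Z');
        System.out.println(drainUntilReady(afterCopyDone.iterator()));
    }
}
```

The sketch only models the client side. The delay reported in this thread is on the server side, between the backend's CopyDone and its CommandStatus, and a client loop like this one cannot shorten it.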