Hi,

I created a proof of concecpt patch for postgresql JDBC driver that allows the 
caller to do pipelining of requests within a transaction. The pipelining here 
means same as for HTTP: the client can send the next execution already before 
waiting for the response of the previous request to be fully processed.

The goal is to reduce the effects of latency between server and client. The 
pipelining allowed my test with localhost postgresql and jdbc test that queries 
a single value from database 200 times to get a more than 20% speed-up. The 
pipelining version processes the responses every 10 queries. With actual 
latency between the server and client larger speed-ups are of course possible.

I think pipelining + jsonb support would make postgresql even more competive 
key/value and document store.

Example use case:
1) insert to shopping cart table, and 3 inserts shopping cart rows table in one 
pipeline.
  - only one round trip penalty instead of 4
2) query shopping cart row and query shopping cart rows in one pipeline 
  - only one round trip penalty instead of 2

The only alternative way to reduce the round-trip latency is to make every 
operation in single round-trip and that can only be done with pl functions and 
by passing in more complex objects, for example the whole shopping cart with 
rows as json data.

What kind of problems could pipelining cause (assuming we limit it to rather 
simple use cases only)?

-Mikko
From c662b8865c58cba714655148401ac86a21c10f3c Mon Sep 17 00:00:00 2001
From: Mikko Tiihonen <mikko.tiiho...@nitorcreations.com>
Date: Sat, 1 Nov 2014 15:43:19 +0200
Subject: [PATCH] Example pipelined single-shot query

---
 org/postgresql/core/QueryExecutor.java             |  13 +++
 org/postgresql/core/v2/QueryExecutorImpl.java      |   5 +
 org/postgresql/core/v3/QueryExecutorImpl.java      |  41 +++++++
 org/postgresql/jdbc2/AbstractJdbc2Statement.java   |  79 +++++++++++++
 .../test/jdbc2/PipelineExecutionTest.java          | 129 +++++++++++++++++++++
 5 files changed, 267 insertions(+)
 create mode 100644 org/postgresql/test/jdbc2/PipelineExecutionTest.java

diff --git a/org/postgresql/core/QueryExecutor.java b/org/postgresql/core/QueryExecutor.java
index e80a23c..b8e46a6 100644
--- a/org/postgresql/core/QueryExecutor.java
+++ b/org/postgresql/core/QueryExecutor.java
@@ -8,6 +8,7 @@
 */
 package org.postgresql.core;
 
+import java.io.IOException;
 import java.sql.SQLException;
 
 import org.postgresql.copy.CopyOperation;
@@ -101,6 +102,16 @@ public interface QueryExecutor {
     static int QUERY_NO_BINARY_TRANSFER = 256;
 
     /**
+     * Flag for pipeline executions with responses read later.
+     */
+    static int QUERY_PIPELINE = 512;
+
+    /**
+     * Flag for pipeline executions with responses read later.
+     */
+    static int QUERY_DEQUEUE_PIPELINE = 1024;
+
+    /**
      * Execute a Query, passing results to a provided ResultHandler.
      *
      * @param query the query to execute; must be a query returned from
@@ -125,6 +136,8 @@ public interface QueryExecutor {
                  int flags)
     throws SQLException;
 
+    void processPipelinedResult(ResultHandler handler) throws SQLException;
+
     /**
      * Execute several Query, passing results to a provided ResultHandler.
      *
diff --git a/org/postgresql/core/v2/QueryExecutorImpl.java b/org/postgresql/core/v2/QueryExecutorImpl.java
index 33c0048..5a6f607 100644
--- a/org/postgresql/core/v2/QueryExecutorImpl.java
+++ b/org/postgresql/core/v2/QueryExecutorImpl.java
@@ -616,4 +616,9 @@ public class QueryExecutorImpl implements QueryExecutor {
     public CopyOperation startCopy(String sql, boolean suppressBegin) throws SQLException {
         throw new PSQLException(GT.tr("Copy not implemented for protocol version 2"), PSQLState.NOT_IMPLEMENTED);
     }
+
+    @Override
+    public void processPipelinedResult(ResultHandler handler) throws SQLException {
+        throw new PSQLException(GT.tr("Copy not implemented for protocol version 2"), PSQLState.NOT_IMPLEMENTED);
+    }
 }
diff --git a/org/postgresql/core/v3/QueryExecutorImpl.java b/org/postgresql/core/v3/QueryExecutorImpl.java
index 966a6f6..7297764 100644
--- a/org/postgresql/core/v3/QueryExecutorImpl.java
+++ b/org/postgresql/core/v3/QueryExecutorImpl.java
@@ -11,6 +11,7 @@ package org.postgresql.core.v3;
 import org.postgresql.core.*;
 
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.HashMap;
 import java.util.Properties;
@@ -1713,7 +1714,33 @@ public class QueryExecutorImpl implements QueryExecutor {
         }
     }
 
+    public void processPipelinedResult(ResultHandler handler) throws SQLException {
+        ResultHandlerHolder holder;
+        while ((holder = pipelineResultHandlers.remove(0)) != null) {
+            try {
+                processResults(holder.handler, holder.flags & (~QUERY_PIPELINE) | QUERY_DEQUEUE_PIPELINE);
+            } catch (IOException e) {
+                protoConnection.close();
+                handler.handleError(new PSQLException(GT.tr("An I/O error occurred while sending to the backend."), PSQLState.CONNECTION_FAILURE, e));
+            }
+            holder.handler.handleCompletion();
+
+            if (holder.handler == handler) {
+                return;
+            }
+        }
+    }
+
     protected void processResults(ResultHandler handler, int flags) throws IOException {
+        if ((flags & QUERY_PIPELINE) != 0) {
+            pipelineResultHandlers.add(new ResultHandlerHolder(handler, flags));
+            return;
+        } else {
+            if (!pipelineResultHandlers.isEmpty() && (flags & QUERY_DEQUEUE_PIPELINE) == 0) {
+                handler.handleError(new PSQLException(GT.tr("Pipelining still in progress pending."), PSQLState.TRANSACTION_STATE_INVALID));
+                return;
+            }
+        }
         boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0;
         boolean bothRowsAndStatus = (flags & QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS) != 0;
 
@@ -2019,6 +2046,10 @@ public class QueryExecutorImpl implements QueryExecutor {
                     failedQuery.unprepare();
                 }
 
+                if ((flags & QUERY_DEQUEUE_PIPELINE) != 0 || parseIndex < pendingExecuteQueue.size()) {
+                    break;
+                }
+
                 pendingParseQueue.clear();              // No more ParseComplete messages expected.
                 pendingDescribeStatementQueue.clear();  // No more ParameterDescription messages expected.
                 pendingDescribePortalQueue.clear();     // No more RowDescription messages expected.
@@ -2282,6 +2313,16 @@ public class QueryExecutorImpl implements QueryExecutor {
     private final ArrayList pendingExecuteQueue = new ArrayList(); // list of {SimpleQuery,Portal} object arrays
     private final ArrayList pendingDescribeStatementQueue = new ArrayList(); // list of {SimpleQuery, SimpleParameterList, Boolean} object arrays
     private final ArrayList pendingDescribePortalQueue = new ArrayList(); // list of SimpleQuery
+    private final List<ResultHandlerHolder> pipelineResultHandlers = new LinkedList<ResultHandlerHolder>();
+
+    static class ResultHandlerHolder {
+        final ResultHandler handler;
+        final int flags;
+        public ResultHandlerHolder(ResultHandler handler, int flags) {
+            this.handler = handler;
+            this.flags = flags;
+        }
+    }
 
     private long nextUniqueID = 1;
     private final ProtocolConnectionImpl protoConnection;
diff --git a/org/postgresql/jdbc2/AbstractJdbc2Statement.java b/org/postgresql/jdbc2/AbstractJdbc2Statement.java
index b50aa7a..7260d1e 100644
--- a/org/postgresql/jdbc2/AbstractJdbc2Statement.java
+++ b/org/postgresql/jdbc2/AbstractJdbc2Statement.java
@@ -20,6 +20,10 @@ import java.util.Map;
 import java.util.TimerTask;
 import java.util.TimeZone;
 import java.util.Calendar;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.postgresql.Driver;
 import org.postgresql.largeobject.*;
@@ -33,6 +37,8 @@ import org.postgresql.util.PSQLState;
 import org.postgresql.util.PGobject;
 import org.postgresql.util.GT;
 
+import com.sun.corba.se.spi.legacy.connection.GetEndPointInfoAgainException;
+
 /**
  * This class defines methods of the jdbc2 specification.
  * The real Statement class (for jdbc2) is org.postgresql.jdbc2.Jdbc2Statement
@@ -197,6 +203,50 @@ public abstract class AbstractJdbc2Statement implements BaseStatement
         return false;
     }
 
+    class ResultSetFutureFuture implements Future<ResultSet> {
+        private StatementResultHandler handler;
+
+        public ResultSetFutureFuture(StatementResultHandler handler) {
+            this.handler = handler;
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return false;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return false;
+        }
+
+        @Override
+        public boolean isDone() {
+            return false;
+        }
+
+        @Override
+        public ResultSet get() throws InterruptedException, ExecutionException {
+            try {
+                connection.getQueryExecutor().processPipelinedResult(handler);
+            } catch (SQLException e) {
+                throw new ExecutionException(e);
+            }
+            handleResults(handler);
+
+            if (result.getNext() != null)
+                throw new ExecutionException(new PSQLException(GT.tr("Multiple ResultSets were returned by the query."),
+                                        PSQLState.TOO_MANY_RESULTS));
+
+            return result.getResultSet();
+        }
+
+        @Override
+        public ResultSet get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            return get();
+        }
+    }
+
     //
     // ResultHandler implementations for updates, queries, and either-or.
     //
@@ -294,6 +344,26 @@ public abstract class AbstractJdbc2Statement implements BaseStatement
     }
 
     /*
+     * Execute a SQL statement that returns a single ResultSet
+     *
+     * @param sql typically a static SQL SELECT statement
+     * @return a ResulSet that contains the data produced by the query
+     * @exception SQLException if a database access error occurs
+     */
+    public Future<java.sql.ResultSet> pipelineQuery(String p_sql) throws SQLException
+    {
+        if (preparedQuery != null)
+            throw new PSQLException(GT.tr("Can''t use query methods that take a query string on a PreparedStatement."),
+                                    PSQLState.WRONG_OBJECT_TYPE);
+
+        checkClosed();
+        p_sql = replaceProcessing(p_sql);
+        Query simpleQuery = connection.getQueryExecutor().createSimpleQuery(p_sql);
+        StatementResultHandler handler = asyncExecute(simpleQuery, null, QueryExecutor.QUERY_ONESHOT | QueryExecutor.QUERY_PIPELINE);
+        return new ResultSetFutureFuture(handler);
+    }
+
+    /*
      * A Prepared SQL query is executed and its ResultSet is returned
      *
      * @return a ResultSet that contains the data produced by the
@@ -514,6 +584,11 @@ public abstract class AbstractJdbc2Statement implements BaseStatement
     }
 
     protected void execute(Query queryToExecute, ParameterList queryParameters, int flags) throws SQLException {
+        StatementResultHandler handler = asyncExecute(queryToExecute, queryParameters, flags);
+        handleResults(handler);
+    }
+
+    protected StatementResultHandler asyncExecute(Query queryToExecute, ParameterList queryParameters, int flags) throws SQLException {
         closeForNextExecution();
 
         // Enable cursor-based resultset if possible.
@@ -573,6 +648,10 @@ public abstract class AbstractJdbc2Statement implements BaseStatement
         {
             killTimer();
         }
+        return handler;
+    }
+
+    protected void handleResults(StatementResultHandler handler) {
         result = firstUnclosedResult = handler.getResults();
 
         if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways)
diff --git a/org/postgresql/test/jdbc2/PipelineExecutionTest.java b/org/postgresql/test/jdbc2/PipelineExecutionTest.java
new file mode 100644
index 0000000..8d251b3
--- /dev/null
+++ b/org/postgresql/test/jdbc2/PipelineExecutionTest.java
@@ -0,0 +1,129 @@
+/*-------------------------------------------------------------------------
+ *
+ * Copyright (c) 2004-2014, PostgreSQL Global Development Group
+ *
+ *
+ *-------------------------------------------------------------------------
+ */
+package org.postgresql.test.jdbc2;
+
+import static java.lang.System.currentTimeMillis;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import junit.framework.TestCase;
+
+import org.postgresql.jdbc2.AbstractJdbc2Statement;
+import org.postgresql.test.TestUtil;
+
+/*
+ * TestCase to test the pipelining of queries.
+ */
+public class PipelineExecutionTest extends TestCase {
+    private static final int ITERATIONS = 50;
+    private static final int COUNT = 200;
+
+    private Connection con;
+
+    /*
+     * Constructor
+     */
+    public PipelineExecutionTest(String name) {
+        super(name);
+    }
+
+    // Set up the fixture for this testcase: the tables for this test.
+    protected void setUp() throws Exception {
+        con = TestUtil.openDB();
+        TestUtil.createTable(con, "test_p", "key int4, val text");
+        Statement st = con.createStatement();
+        for (int i = 0; i < COUNT; ++i) {
+            st.executeUpdate("insert into test_p (key, val) values (" + i + ", 'text value " + i + "')");
+        }
+        TestUtil.closeDB(con);
+    }
+
+    // Tear down the fixture for this test case.
+    protected void tearDown() throws Exception {
+        TestUtil.closeDB(con);
+        con = TestUtil.openDB();
+        TestUtil.dropTable(con, "test_p");
+        TestUtil.closeDB(con);
+    }
+
+    public void testNonPipelinedExecuteQuery() throws Exception {
+        con = TestUtil.openDB();
+        long min = Long.MAX_VALUE;
+        for (int n=0; n<50; ++n) {
+            long start = currentTimeMillis();
+            Statement stat = con.createStatement();
+            for (int i = 0; i < COUNT; ++i) {
+                ResultSet rs = stat.executeQuery("select val from test_p where key = " + i);
+                rs.next();
+                assertEquals("text value " + i, rs.getString(1));
+                rs.close();
+            }
+            stat.close();
+            long stop = currentTimeMillis();
+            min = Math.min(min, stop-start);
+            Thread.sleep(50);
+        }
+        
+        System.out.printf("Normal min: %dms%n", min);
+    }
+
+    public void testPipelinedExecuteQuery() throws Exception {
+        con = TestUtil.openDB();
+
+        long min = Long.MAX_VALUE;
+        for (int n=0; n<ITERATIONS; ++n) {
+            long start = currentTimeMillis();
+            AbstractJdbc2Statement stat = (AbstractJdbc2Statement) con.createStatement();
+            Deque<Future<ResultSet>> results = new ArrayDeque<Future<ResultSet>>();
+
+            int count = 0;
+            for (int i = 0; i < COUNT; ++i) {
+                results.add(stat.pipelineQuery("select val from test_p where key = " + i));
+                if (results.size() >= 10) {
+                    count = processFutureResults(results, count);
+                }
+            }
+
+            // proceess any left-over results
+            processFutureResults(results, count);
+            long stop = currentTimeMillis();
+            min = Math.min(min, stop-start);
+            Thread.sleep(50);
+        }
+
+        System.out.printf("Pipeline min: %dms%n", min);
+        
+        // verify that sync query works after pipelined requests
+        Statement stat = con.createStatement();
+        ResultSet rs = stat.executeQuery("select val from test_p where key = 0");
+        rs.next();
+        assertEquals("text value 0", rs.getString(1));
+        rs.close();
+
+        stat.close();
+    }
+
+    private int processFutureResults(Deque<Future<ResultSet>> results, int count) throws InterruptedException,
+            ExecutionException, SQLException {
+        Future<ResultSet> future;
+        while ((future = results.pollFirst()) != null) {
+            ResultSet rs = future.get();
+            rs.next();
+            assertEquals("text value " + (count++), rs.getString(1));
+            rs.close();
+        }
+        return count;
+    }
+}
-- 
2.1.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to