This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit c2bd30a1b3b49ccc7770ec5ab4adeb0b75f40240
Author: Fucun Chu <[email protected]>
AuthorDate: Tue Aug 31 09:50:44 2021 +0800

    IMPALA-5741: Initial support for reading tiny RDBMS tables
    
    This patch uses the "external data source" mechanism in Impala to
    implement data source for querying JDBC.
    It has some limitations due to the restrictions of "external data
    source":
      - It is not distributed, i.e., the fragment is unpartitioned. The
        queries are executed on the coordinator.
      - Queries which read following data types from external JDBC tables
        are not supported:
        BINARY, CHAR, DATETIME, and COMPLEX.
      - Only support binary predicates with operators =, !=, <=, >=,
        <, > to be pushed to RDBMS.
      - Following data types are not supported for predicates:
        DECIMAL, TIMESTAMP, DATE, and BINARY.
      - External tables with complex types of columns are not supported.
      - Support is limited to the following databases:
        MySQL, Postgres, Oracle, MSSQL, H2, DB2, and JETHRO_DATA.
      - Catalog V2 is not supported (IMPALA-7131).
      - DataSource objects are not persistent (IMPALA-12375).
    
    Additional fixes are planned on top of this patch.
    
    Source files under jdbc/conf, jdbc/dao and jdbc/exception are
    replicated from Hive JDBC Storage Handler.
    
    In order to query the RDBMS tables, the following steps should be
    followed (note that existing data source table will be rebuilt):
    1. Make sure the Impala cluster has been started.
    
    2. Copy the jar files of JDBC drivers and the data source library into
    HDFS.
    ${IMPALA_HOME}/testdata/bin/copy-ext-data-sources.sh
    
    3. Create an `alltypes` table in the Postgres database.
    ${IMPALA_HOME}/testdata/bin/load-ext-data-sources.sh
    
    4. Create data source tables (alltypes_jdbc_datasource and
    alltypes_jdbc_datasource_2).
    ${IMPALA_HOME}/bin/impala-shell.sh -f\
      ${IMPALA_HOME}/testdata/bin/create-ext-data-source-table.sql
    
    5. The data source tables created in the last step are now ready to
    be queried. There is no need to restart the Impala cluster.
    
    Testing:
     - Added unit-test for Postgres and ran unit-test with JDBC driver
       postgresql-42.5.1.jar.
     - Ran manual unit-test for MySQL with JDBC driver
       mysql-connector-j-8.1.0.jar.
     - Ran core tests successfully.
    
    Change-Id: I8244e978c7717c6f1452f66f1630b6441392e7d2
    Reviewed-on: http://gerrit.cloudera.org:8080/17842
    Reviewed-by: Wenzhe Zhou <[email protected]>
    Reviewed-by: Kurt Deschler <[email protected]>
    Reviewed-by: Riza Suminto <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 bin/rat_exclude_files.txt                          |   1 +
 .../extdatasource/ExternalDataSourceExecutor.java  |  27 ++
 .../org/apache/impala/service/FrontendTest.java    |   6 +-
 java/ext-data-source/jdbc/pom.xml                  |  84 +++++
 .../impala/extdatasource/jdbc/JdbcDataSource.java  | 338 +++++++++++++++++++++
 .../org/apache/impala/extdatasource/jdbc/README.md |  60 ++++
 .../extdatasource/jdbc/conf/DatabaseType.java      |  28 ++
 .../extdatasource/jdbc/conf/JdbcStorageConfig.java |  68 +++++
 .../jdbc/conf/JdbcStorageConfigManager.java        |  98 ++++++
 .../jdbc/dao/DB2DatabaseAccessor.java              |  45 +++
 .../extdatasource/jdbc/dao/DatabaseAccessor.java   |  33 ++
 .../jdbc/dao/DatabaseAccessorFactory.java          |  75 +++++
 .../jdbc/dao/GenericJdbcDatabaseAccessor.java      | 299 ++++++++++++++++++
 .../extdatasource/jdbc/dao/JdbcRecordIterator.java | 188 ++++++++++++
 .../jdbc/dao/JethroDatabaseAccessor.java           |  42 +++
 .../jdbc/dao/MsSqlDatabaseAccessor.java            |  47 +++
 .../jdbc/dao/MySqlDatabaseAccessor.java            |  49 +++
 .../jdbc/dao/OracleDatabaseAccessor.java           |  55 ++++
 .../jdbc/dao/PostgresDatabaseAccessor.java         |  45 +++
 .../exception/JdbcDatabaseAccessException.java     |  39 +++
 .../jdbc/util/QueryConditionUtil.java              | 103 +++++++
 .../extdatasource/jdbc/JdbcDataSourceTest.java     | 253 +++++++++++++++
 .../jdbc/src/test/resources/log4j.properties       |  22 +-
 .../jdbc/src/test/resources/test_script.sql        |  47 +++
 java/ext-data-source/pom.xml                       |   1 +
 testdata/bin/copy-ext-data-sources.sh              |  56 ++++
 testdata/bin/create-data-source-table.sql          |  47 ---
 testdata/bin/create-ext-data-source-table.sql      |  98 ++++++
 testdata/bin/create-load-data.sh                   |   6 +-
 testdata/bin/load-ext-data-sources.sh              |  83 +++++
 .../queries/QueryTest/jdbc-data-source.test        | 105 +++++++
 tests/query_test/test_ext_data_sources.py          |  84 +++++
 tests/query_test/test_queries.py                   |   4 -
 33 files changed, 2469 insertions(+), 67 deletions(-)

diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt
index fa4c7f0f1..c00efa683 100644
--- a/bin/rat_exclude_files.txt
+++ b/bin/rat_exclude_files.txt
@@ -136,6 +136,7 @@ fe/src/test/resources/hbase-jaas-client.conf.template
 fe/src/test/resources/hbase-jaas-server.conf.template
 fe/src/test/resources/users.ldif
 java/.mvn/maven.config
+java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/README.md
 java/toolchains.xml.tmpl
 testdata/AllTypesError/*.txt
 testdata/AllTypesErrorNoNulls/*.txt
diff --git 
a/fe/src/main/java/org/apache/impala/extdatasource/ExternalDataSourceExecutor.java
 
b/fe/src/main/java/org/apache/impala/extdatasource/ExternalDataSourceExecutor.java
index cd0644b7d..ff041314c 100644
--- 
a/fe/src/main/java/org/apache/impala/extdatasource/ExternalDataSourceExecutor.java
+++ 
b/fe/src/main/java/org/apache/impala/extdatasource/ExternalDataSourceExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.impala.extdatasource;
 
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -83,6 +84,9 @@ public class ExternalDataSourceExecutor {
   // Protects cachedClasses_, numClassCacheHits_, and numClassCacheMisses_.
   private final static Object cachedClassesLock_ = new Object();
 
+  // setup by ctor() and cleared by release()
+  private URLClassLoader classLoader_;
+
   private final ApiVersion apiVersion_;
   private final ExternalDataSource dataSource_;
   private final String jarPath_;
@@ -156,6 +160,8 @@ public class ExternalDataSourceExecutor {
         // Only cache the class if the init string starts with 
CACHE_CLASS_PREFIX
         if (initString_ != null && initString_.startsWith(CACHE_CLASS_PREFIX)) 
{
           cachedClasses_.put(cacheMapKey, c);
+        } else {
+          classLoader_ = loader;
         }
         if (LOG.isTraceEnabled()) {
           LOG.trace("Loaded jar for class {} at path {}", className_, 
jarPath_);
@@ -168,6 +174,27 @@ public class ExternalDataSourceExecutor {
     return c;
   }
 
+  @Override
+  protected void finalize() throws Throwable {
+    release();
+    super.finalize();
+  }
+
+  /**
+   * Release the class loader we have created if the class is not cached.
+   */
+  public void release() {
+    if (classLoader_ != null) {
+      try {
+        classLoader_.close();
+      } catch (IOException e) {
+        // Log and ignore.
+        LOG.warn("Error closing the URLClassloader.", e);
+      }
+      classLoader_ = null;
+    }
+  }
+
   public byte[] prepare(byte[] thriftParams) throws ImpalaException {
     TPrepareParams params = new TPrepareParams();
     JniUtil.deserializeThrift(protocolFactory_, params, thriftParams);
diff --git a/fe/src/test/java/org/apache/impala/service/FrontendTest.java 
b/fe/src/test/java/org/apache/impala/service/FrontendTest.java
index 72c66b723..a62ba3ad1 100644
--- a/fe/src/test/java/org/apache/impala/service/FrontendTest.java
+++ b/fe/src/test/java/org/apache/impala/service/FrontendTest.java
@@ -143,13 +143,17 @@ public class FrontendTest extends FrontendTestBase {
     // HiveServer2 GetTables has 5 columns.
     assertEquals(5, resp.schema.columns.size());
     assertEquals(5, resp.rows.get(0).colVals.size());
-    assertEquals(3, resp.rows.size());
+    assertEquals(5, resp.rows.size());
     assertEquals("alltypes_datasource",
         resp.rows.get(0).colVals.get(2).string_val.toLowerCase());
     assertEquals("alltypes_date_partition",
         resp.rows.get(1).colVals.get(2).string_val.toLowerCase());
     assertEquals("alltypes_date_partition_2",
         resp.rows.get(2).colVals.get(2).string_val.toLowerCase());
+    assertEquals("alltypes_jdbc_datasource",
+        resp.rows.get(3).colVals.get(2).string_val.toLowerCase());
+    assertEquals("alltypes_jdbc_datasource_2",
+        resp.rows.get(4).colVals.get(2).string_val.toLowerCase());
   }
 
   @Test
diff --git a/java/ext-data-source/jdbc/pom.xml 
b/java/ext-data-source/jdbc/pom.xml
new file mode 100644
index 000000000..fa5a9db8d
--- /dev/null
+++ b/java/ext-data-source/jdbc/pom.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.impala</groupId>
+    <artifactId>impala-data-source</artifactId>
+    <version>4.4.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>impala-data-source-jdbc</artifactId>
+  <name>Apache Impala External Data Source JDBC Library</name>
+  <description>JDBC External Data Source</description>
+  <packaging>jar</packaging>
+  <url>.</url>
+
+  <properties>
+    <commons-dbcp2.version>2.9.0</commons-dbcp2.version>
+    <h2database.version>1.3.166</h2database.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.impala</groupId>
+      <artifactId>impala-data-source-api</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.impala</groupId>
+      <artifactId>impala-frontend</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-dbcp2</artifactId>
+      <version>${commons-dbcp2.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+      <version>${h2database.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.11.0</version>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
new file mode 100644
index 000000000..d69ee9f50
--- /dev/null
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
@@ -0,0 +1,338 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc;
+
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfig;
+import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfigManager;
+import org.apache.impala.extdatasource.jdbc.dao.DatabaseAccessor;
+import org.apache.impala.extdatasource.jdbc.dao.DatabaseAccessorFactory;
+import org.apache.impala.extdatasource.jdbc.dao.JdbcRecordIterator;
+import 
org.apache.impala.extdatasource.jdbc.exception.JdbcDatabaseAccessException;
+import org.apache.impala.extdatasource.jdbc.util.QueryConditionUtil;
+import org.apache.impala.extdatasource.thrift.TBinaryPredicate;
+import org.apache.impala.extdatasource.thrift.TCloseParams;
+import org.apache.impala.extdatasource.thrift.TCloseResult;
+import org.apache.impala.extdatasource.thrift.TColumnDesc;
+import org.apache.impala.extdatasource.thrift.TComparisonOp;
+import org.apache.impala.extdatasource.thrift.TGetNextParams;
+import org.apache.impala.extdatasource.thrift.TGetNextResult;
+import org.apache.impala.extdatasource.thrift.TOpenParams;
+import org.apache.impala.extdatasource.thrift.TOpenResult;
+import org.apache.impala.extdatasource.thrift.TPrepareParams;
+import org.apache.impala.extdatasource.thrift.TPrepareResult;
+import org.apache.impala.extdatasource.thrift.TRowBatch;
+import org.apache.impala.extdatasource.thrift.TTableSchema;
+import org.apache.impala.extdatasource.v1.ExternalDataSource;
+import org.apache.impala.thrift.TColumnData;
+import org.apache.impala.thrift.TErrorCode;
+import org.apache.impala.thrift.TStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * The Jdbc data source returns the data of the underlying database configured 
by
+ * initString.
+ */
+public class JdbcDataSource implements ExternalDataSource {
+
+  private final static Logger LOG = 
LoggerFactory.getLogger(JdbcDataSource.class);
+
+  /**
+   * @see org.apache.impala.extdatasource.ExternalDataSourceExecutor
+   */
+  private final static String CACHE_CLASS_PREFIX = "CACHE_CLASS::";
+
+  private static final TStatus STATUS_OK =
+          new TStatus(TErrorCode.OK, Lists.newArrayList());
+
+  private boolean eos_;
+  private int batchSize_;
+  private TTableSchema schema_;
+  private DataSourceState state_;
+  // Handle to identify JdbcDataSource object.
+  // It's assigned with random UUID value in open() API, and returned to the 
caller.
+  // In getNext() and close() APIs, compare this value with the handle value 
passed in
+  // input parameters to make sure the object is valid.
+  private String scanHandle_;
+  // Set to true if initString started with "CACHE_CLASS::".
+  // It is passed to DatabaseAccessor::close() to indicate if dataSourceCache 
should be
+  // cleaned when DatabaseAccessor object is closed.
+  private boolean cacheClass_ = false;
+
+  // Properties of external jdbc table, converted from initString which is 
specified in
+  // create table statement.
+  // Supported properties are defined in JdbcStorageConfig.
+  private Configuration tableConfig_;
+  private DatabaseAccessor dbAccessor_ = null;
+  // iterator_ is used when schema_.getColsSize() does not equal 0.
+  private JdbcRecordIterator iterator_ = null;
+  // currRow_ and totalNumberOfRecords_ are used when schema_.getColsSize() 
equals 0.
+  private int currRow_;
+  private long totalNumberOfRecords_ = 0;
+
+  // Enumerates the states of the data source, which indicates which 
ExternalDataSource
+  // API has been called. The states are checked in each API to make sure that 
the APIs
+  // are called in right order, e.g. state transitions must be in the below 
order:
+  // CREATED -> OPENED -> CLOSED.
+  // Note that the ExternalDataSourceExecutors of frontend and backend will 
create
+  // separate JdbcDataSource objects so that the state of JdbcDataSource which 
is set
+  // by frontend will not be transferred to backend. The prepare() is called 
by frontend,
+  // open(), getNext() and close() are called by backend. We don't need to 
change state
+  // in prepare() since the state will not be transferred to other APIs, and 
the input
+  // state for open() must be 'CREATED'.
+  private enum DataSourceState {
+    // The object is created.
+    CREATED,
+    // The open() API is called.
+    OPENED,
+    // The close() API is called.
+    CLOSED
+  }
+
+  public JdbcDataSource() {
+    eos_ = false;
+    currRow_ = 0;
+    state_ = DataSourceState.CREATED;
+  }
+
+  @Override
+  public TPrepareResult prepare(TPrepareParams params) {
+    Preconditions.checkState(state_ == DataSourceState.CREATED);
+    if (!convertInitStringToConfiguration(params.getInit_string())) {
+      return new TPrepareResult(
+          new TStatus(TErrorCode.INTERNAL_ERROR,
+              Lists.newArrayList("Invalid init_string value")));
+    }
+    List<Integer> acceptedPredicates = 
acceptedPredicates(params.getPredicates());
+    return new TPrepareResult(STATUS_OK)
+            .setAccepted_conjuncts(acceptedPredicates);
+  }
+
+  @Override
+  public TOpenResult open(TOpenParams params) {
+    Preconditions.checkState(state_ == DataSourceState.CREATED);
+    state_ = DataSourceState.OPENED;
+    batchSize_ = params.getBatch_size();
+    schema_ = params.getRow_schema();
+    // 1. Check init string again because the call in prepare() was from
+    // the frontend and used a different instance of this JdbcDataSource class.
+    if (!convertInitStringToConfiguration(params.getInit_string())) {
+      return new TOpenResult(
+          new TStatus(TErrorCode.INTERNAL_ERROR,
+              Lists.newArrayList("Invalid init_string value")));
+    }
+    // 2. Build the query and execute it
+    try {
+      dbAccessor_ = DatabaseAccessorFactory.getAccessor(tableConfig_);
+      buildQueryAndExecute(params);
+    } catch (JdbcDatabaseAccessException e) {
+      return new TOpenResult(
+          new TStatus(TErrorCode.INTERNAL_ERROR, 
Lists.newArrayList(e.getMessage())));
+    }
+    scanHandle_ = UUID.randomUUID().toString();
+    return new TOpenResult(STATUS_OK).setScan_handle(scanHandle_);
+  }
+
+  @Override
+  public TGetNextResult getNext(TGetNextParams params) {
+    Preconditions.checkState(state_ == DataSourceState.OPENED);
+    Preconditions.checkArgument(params.getScan_handle().equals(scanHandle_));
+    if (eos_) return new TGetNextResult(STATUS_OK).setEos(eos_);
+
+    List<TColumnData> cols = Lists.newArrayList();
+    long numRows = 0;
+    if (schema_.getColsSize() != 0) {
+      if (iterator_ == null) {
+        return new TGetNextResult(
+            new TStatus(TErrorCode.INTERNAL_ERROR,
+                Lists.newArrayList("Iterator of JDBC resultset is null")));
+      }
+      for (int i = 0; i < schema_.getColsSize(); ++i) {
+        cols.add(new TColumnData().setIs_null(Lists.newArrayList()));
+      }
+
+      boolean hasNext = true;
+      try {
+        while (numRows < batchSize_ && (hasNext = iterator_.hasNext())) {
+          iterator_.next(schema_.getCols(), cols);
+          ++numRows;
+        }
+      } catch (Exception e) {
+        hasNext = false;
+      }
+      if (!hasNext) eos_ = true;
+    } else { // for count(*)
+      if (currRow_ + batchSize_ <= totalNumberOfRecords_) {
+        numRows = batchSize_;
+      } else {
+        numRows = totalNumberOfRecords_ - currRow_;
+      }
+      currRow_ += numRows;
+      if (currRow_ == totalNumberOfRecords_) eos_ = true;
+    }
+    return new TGetNextResult(STATUS_OK).setEos(eos_)
+        .setRows(new TRowBatch().setCols(cols).setNum_rows(numRows));
+  }
+
+  @Override
+  public TCloseResult close(TCloseParams params) {
+    Preconditions.checkState(state_ == DataSourceState.OPENED);
+    Preconditions.checkArgument(params.getScan_handle().equals(scanHandle_));
+    try {
+      if (iterator_ != null) iterator_.close();
+      if (dbAccessor_ != null) dbAccessor_.close(!cacheClass_);
+      state_ = DataSourceState.CLOSED;
+      return new TCloseResult(STATUS_OK);
+    } catch (Exception e) {
+      return new TCloseResult(
+          new TStatus(TErrorCode.INTERNAL_ERROR, 
Lists.newArrayList(e.getMessage())));
+    }
+  }
+
+  protected boolean convertInitStringToConfiguration(String initString) {
+    Preconditions.checkState(initString != null);
+    if (tableConfig_ == null) {
+      try {
+        TypeReference<HashMap<String, String>> typeRef
+            = new TypeReference<HashMap<String, String>>() {
+        };
+        if (initString.startsWith(CACHE_CLASS_PREFIX)) {
+          initString = initString.substring(CACHE_CLASS_PREFIX.length());
+          cacheClass_ = true;
+        }
+        Map<String, String> config = new ObjectMapper().readValue(initString, 
typeRef);
+        tableConfig_ = 
JdbcStorageConfigManager.convertMapToConfiguration(config);
+      } catch (JsonProcessingException e) {
+        String errorMessage = String
+            .format("Invalid JSON from initString_ '%s'", initString);
+        LOG.error(errorMessage, e);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private List<Integer> acceptedPredicates(List<List<TBinaryPredicate>> 
predicates) {
+    // Return the indexes of accepted predicates.
+    List<Integer> acceptedPredicates = Lists.newArrayList();
+    if (predicates == null || predicates.isEmpty()) {
+      return acceptedPredicates;
+    }
+    for (int i = 0; i < predicates.size(); ++i) {
+      boolean accepted = true;
+      for (TBinaryPredicate predicate : predicates.get(i)) {
+        // Don't support 'IS DISTINCT FROM' and 'IS NOT DISTINCT FROM' 
operators now.
+        if (predicate.getOp() == TComparisonOp.DISTINCT_FROM
+            || predicate.getOp() == TComparisonOp.NOT_DISTINCT) {
+          accepted = false;
+          break;
+        }
+      }
+      if (accepted) acceptedPredicates.add(i);
+    }
+    return acceptedPredicates;
+  }
+
+  private void buildQueryAndExecute(TOpenParams params)
+      throws JdbcDatabaseAccessException {
+    Map<String, String> columnMapping = getColumnMapping(tableConfig_
+        .get(JdbcStorageConfig.COLUMN_MAPPING.getPropertyName()));
+    // Build query statement
+    StringBuilder sb = new StringBuilder("SELECT ");
+    String project;
+    // If the cols size equals 0, it is a 'select count(*) from tbl' statement.
+    if (schema_.getColsSize() == 0) {
+      project = "*";
+    } else {
+      project =
+          schema_.getCols().stream().map(
+              TColumnDesc::getName).map(
+              name -> columnMapping.getOrDefault(name, name))
+              .collect(Collectors.joining(", "));
+    }
+    sb.append(project);
+    sb.append(" FROM ");
+    // Make jdbc table name to be quoted with double quotes if columnMapping 
is not empty
+    String jdbcTableName = 
tableConfig_.get(JdbcStorageConfig.TABLE.getPropertyName());
+    if (!columnMapping.isEmpty() && jdbcTableName.charAt(0) != '\"') {
+      StringBuilder sb2 = new StringBuilder("\"");
+      sb2.append(jdbcTableName);
+      sb2.append("\"");
+      jdbcTableName = sb2.toString();
+    }
+    sb.append(jdbcTableName);
+    String condition = QueryConditionUtil
+        .buildCondition(params.getPredicates(), columnMapping);
+    if (StringUtils.isNotBlank(condition)) {
+      sb.append(" WHERE ").append(condition);
+    }
+    // Execute query and get iterator
+    tableConfig_.set(JdbcStorageConfig.QUERY.getPropertyName(), sb.toString());
+
+    if (schema_.getColsSize() != 0) {
+      int limit = -1;
+      if (params.isSetLimit()) limit = (int) params.getLimit();
+      iterator_ = dbAccessor_.getRecordIterator(tableConfig_, limit, 0);
+    } else {
+      totalNumberOfRecords_ = 
dbAccessor_.getTotalNumberOfRecords(tableConfig_);
+    }
+  }
+
+  /*
+   * Return Impala-to-X column mapping, or empty if it is not set.
+   *
+   */
+  private Map<String, String> getColumnMapping(String columnMapping) {
+    if ((columnMapping == null) || (columnMapping.trim().isEmpty())) {
+      return Maps.newHashMap();
+    }
+
+    Map<String, String> columnMap = Maps.newHashMap();
+    String[] mappingPairs = columnMapping.split(",");
+    for (String mapPair : mappingPairs) {
+      String[] columns = mapPair.split("=");
+      // Make jdbc column name to be quoted with double quotes
+      String jdbcColumnName = columns[1].trim();
+      if (!jdbcColumnName.isEmpty() && jdbcColumnName.charAt(0) != '\"') {
+        StringBuilder sb = new StringBuilder("\"");
+        sb.append(jdbcColumnName);
+        sb.append("\"");
+        jdbcColumnName = sb.toString();
+      }
+      columnMap.put(columns[0].trim(), jdbcColumnName);
+    }
+
+    return columnMap;
+  }
+}
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/README.md
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/README.md
new file mode 100644
index 000000000..9bf02f7ea
--- /dev/null
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/README.md
@@ -0,0 +1,60 @@
+This JDBC External Data Source library is implemented with the "External Data 
Source" mechanism
+to query JDBC tables from the Impala server.
+
+The following two source files contain Impala-specific logic:
+    JdbcDataSource.java
+    util/QueryConditionUtil.java
+
+Other source files, which add support for accessing external database tables 
through JDBC
+drivers, are replicated from the Hive JDBC Storage Handler with some modifications:
+(https://github.com/apache/hive/tree/master/jdbc-handler/src/main/java/org/apache/hive/storage/jdbc)
+    conf/DatabaseType.java
+        remove dbType for HIVE, DERBY, and METASTORE.
+    conf/JdbcStorageConfig.java
+        don't use org.apache.hadoop.hive.conf.Constants
+    conf/JdbcStorageConfigManager.java
+        add functions: convertMapToConfiguration(), getQueryToExecute(),
+            getOrigQueryToExecute()
+        remove functions: copyConfigurationToJob(), countNonNull(),
+            getPasswordFromProperties(), copySecretsToJob(),
+            convertPropertiesToConfiguration(), resolveMetadata(), 
getMetastoreDatabaseType(),
+            getMetastoreConnectionURL(), getMetastoreDriver(), 
getMetastoreJdbcUser(),
+            getMetastoreJdbcPasswd().
+        modify functions: checkRequiredPropertiesAreDefined()
+    dao/DB2DatabaseAccessor.java
+        remove function constructQuery()
+    dao/DatabaseAccessor.java
+        remove the following APIs:
+            getColumnNames(), getColumnTypes(), getRecordWriter(), getBounds(),
+            needColumnQuote().
+        remove following parameters for API getRecordIterator():
+           'partitionColumn', 'lowerBound', and 'upperBound'.
+    dao/DatabaseAccessorFactory.java
+        remove dbType for HIVE, DERBY, and METASTORE.
+    dao/GenericJdbcDatabaseAccessor.java
+        add member variable: dataSourceCache
+        remove member variable typeInfoTranslator
+        remove functions: getColumnMetadata(), getColumnMetadata(), 
getColumnNames(),
+            getColumnTypes(), getColNamesFromRS(),  getColTypesFromRS(),
+            getMetaDataQuery(), getRecordWriter(), constructQuery(),
+            addBoundaryToQuery(), removeDbcpPrefix(), getFromProperties(), 
getBounds(),
+            quote(), needColumnQuote(), getQualifiedTableName(), 
selectAllFromTable(),
+            finalize().
+        remove following parameters for API getRecordIterator():
+           'partitionColumn', 'lowerBound', and 'upperBound'.
+        Modify functions: close().
+    dao/JdbcRecordIterator.java
+        remove member variable accessor, remove parameter 'accessor' from ctor,
+        remove functions: remove()
+        modify functions: next(), close()
+    dao/JethroDatabaseAccessor.java
+        remove getMetaDataQuery()
+    dao/MsSqlDatabaseAccessor.java
+    dao/MySqlDatabaseAccessor.java
+        remove function needColumnQuote()
+    dao/OracleDatabaseAccessor.java
+        remove function constructQuery()
+    dao/PostgresDatabaseAccessor.java
+    exception/JdbcDatabaseAccessException.java
+        renamed from exception/HiveJdbcDatabaseAccessException.java
+
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
new file mode 100644
index 000000000..9b30350bc
--- /dev/null
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/DatabaseType.java
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.conf;
+
+public enum DatabaseType {
+  MYSQL,
+  H2,
+  DB2,
+  ORACLE,
+  POSTGRES,
+  MSSQL,
+  JETHRO_DATA
+}
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
new file mode 100644
index 000000000..0e1ac5ab3
--- /dev/null
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfig.java
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.conf;
+
+
+/**
+ * Table properties understood by the JDBC external data source. Each constant carries
+ * the property name used in the create table statement and whether the property must
+ * be present.
+ */
+public enum JdbcStorageConfig {
+  // The database from which the external table comes, such as MySQL, ORACLE, POSTGRES,
+  // and MSSQL, etc.
+  DATABASE_TYPE("database.type", true),
+  // JDBC connection string, including the database type, IP address, port number, and
+  // database name. For example, "jdbc:postgresql://127.0.0.1:5432/functional"
+  JDBC_URL("jdbc.url", true),
+  // Class name of JDBC driver. For example, "org.postgresql.Driver"
+  JDBC_DRIVER_CLASS("jdbc.driver", true),
+  // Driver URL for downloading the Jar file package that is used to access the
+  // external database.
+  JDBC_DRIVER_URL("driver.url", true),
+  // Username for accessing the external database.
+  DBCP_USERNAME("dbcp.username", false),
+  // Password of the user.
+  DBCP_PASSWORD("dbcp.password", false),
+  // Number of rows to fetch in a batch.
+  JDBC_FETCH_SIZE("jdbc.fetch.size", false),
+  // SQL query which specifies how to get data from the external database.
+  // Users need to specify either "table" or "query" in the create table statement.
+  QUERY("query", false),
+  // Name of the external table to be mapped in Impala.
+  // NOTE(review): marked as required, which contradicts the "either table or query"
+  // statement above -- a query-only definition is rejected by the required-property
+  // check. Confirm which of the two is intended.
+  TABLE("table", true),
+  // Mapping of column names between external table and Impala.
+  COLUMN_MAPPING("column.mapping", false);
+
+  // Key of the property as written in the table properties.
+  private final String propertyName;
+  // Whether the property must be present; assigned once in the constructor.
+  private final boolean required;
+
+  JdbcStorageConfig(String propertyName, boolean required) {
+    this.propertyName = propertyName;
+    this.required = required;
+  }
+
+  /** Returns the property key, e.g. "jdbc.url". */
+  public String getPropertyName() {
+    return propertyName;
+  }
+
+  /** Returns true if the property has to be defined for the table to be usable. */
+  public boolean isRequired() {
+    return required;
+  }
+
+}
+
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfigManager.java
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfigManager.java
new file mode 100644
index 000000000..e5e2e972f
--- /dev/null
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/conf/JdbcStorageConfigManager.java
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.conf;
+
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main configuration handler class. Validates the table properties of a JDBC table,
+ * converts them into a Hadoop Configuration and derives the SQL text to run against
+ * the external database.
+ */
+public class JdbcStorageConfigManager {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(JdbcStorageConfigManager.class);
+
+  /**
+   * Copies every entry of 'props' into a new Configuration after validating them.
+   *
+   * @param props table properties keyed by property name
+   * @return a Configuration holding all entries of 'props'
+   * @throws IllegalArgumentException if the database type is unknown or a required
+   *     property is missing
+   */
+  public static Configuration convertMapToConfiguration(Map<String, String> props) {
+    checkRequiredPropertiesAreDefined(props);
+    Configuration conf = new Configuration();
+
+    for (Entry<String, String> entry : props.entrySet()) {
+      conf.set(entry.getKey(), entry.getValue());
+    }
+
+    return conf;
+  }
+
+  /**
+   * Verifies that the database type names a DatabaseType constant and that every
+   * property flagged as required is present.
+   */
+  private static void checkRequiredPropertiesAreDefined(Map<String, String> props) {
+
+    try {
+      String dbTypeName = props.get(JdbcStorageConfig.DATABASE_TYPE.getPropertyName());
+      // Upper-case before the lookup so that validation accepts the same spellings as
+      // DatabaseAccessorFactory.getAccessor(Configuration), which also upper-cases.
+      // A missing value raises a NullPointerException here and is reported as an
+      // unknown type as well.
+      DatabaseType.valueOf(dbTypeName.toUpperCase());
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Unknown database type.", e);
+    }
+    // Check the required parameters
+    for (JdbcStorageConfig config : JdbcStorageConfig.values()) {
+      if (config.isRequired() && !props.containsKey(config.getPropertyName())) {
+        throw new IllegalArgumentException(String.format("Required config '%s' was not "
+            + "present!", config.getPropertyName()));
+      }
+    }
+  }
+
+  /** Returns the value stored in 'config' under the property name of 'key'. */
+  public static String getConfigValue(JdbcStorageConfig key, Configuration config) {
+    return config.get(key.getPropertyName());
+  }
+
+  /**
+   * Returns the query derived from the original table definition. The table name takes
+   * precedence: if it is set the result is 'select * from tbl', otherwise the value of
+   * the query property (which may be null).
+   */
+  public static String getOrigQueryToExecute(Configuration config) {
+    String query;
+    String tableName = config.get(JdbcStorageConfig.TABLE.getPropertyName());
+    if (tableName != null) {
+      // We generate query as 'select * from tbl'
+      query = "select * from " + tableName;
+    } else {
+      query = config.get(JdbcStorageConfig.QUERY.getPropertyName());
+    }
+
+    return query;
+  }
+
+  /**
+   * Returns the query to run. Unlike getOrigQueryToExecute(), the query property takes
+   * precedence here; only when it is absent is 'select * from tbl' generated from the
+   * table name.
+   */
+  public static String getQueryToExecute(Configuration config) {
+    String query = config.get(JdbcStorageConfig.QUERY.getPropertyName());
+    if (query != null) {
+      // Query has been defined, return it
+      return query;
+    }
+
+    // We generate query as 'select * from tbl'
+    String tableName = config.get(JdbcStorageConfig.TABLE.getPropertyName());
+    query = "select * from " + tableName;
+
+    return query;
+  }
+}
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DB2DatabaseAccessor.java
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DB2DatabaseAccessor.java
new file mode 100644
index 000000000..d8766429f
--- /dev/null
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DB2DatabaseAccessor.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.dao;
+
+/**
+ * Data accessor for DB2. Appends plain "LIMIT n" / "LIMIT n OFFSET m" clauses instead
+ * of the JDBC escape syntax used by the generic accessor; the original author notes
+ * that this mirrors PostgresDatabaseAccessor because the drivers behave alike.
+ */
+public class DB2DatabaseAccessor extends GenericJdbcDatabaseAccessor {
+
+  /**
+   * Appends a limit/offset clause to 'sql'. A zero offset delegates to
+   * addLimitToQuery(); a limit of -1 with a non-zero offset returns 'sql' unchanged.
+   */
+  @Override
+  protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
+    if (offset == 0) {
+      return addLimitToQuery(sql, limit);
+    }
+    return (limit == -1) ? sql : sql + " LIMIT " + limit + " OFFSET " + offset;
+  }
+
+  /** Appends " LIMIT n" to 'sql' unless 'limit' is -1, which means no limit. */
+  @Override
+  protected String addLimitToQuery(String sql, int limit) {
+    return (limit == -1) ? sql : sql + " LIMIT " + limit;
+  }
+}
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessor.java
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessor.java
new file mode 100644
index 000000000..597ae9984
--- /dev/null
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessor.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.dao;
+
+
+import org.apache.hadoop.conf.Configuration;
+import 
org.apache.impala.extdatasource.jdbc.exception.JdbcDatabaseAccessException;
+
+/**
+ * Access layer between the JDBC data source and one external database. Instances are
+ * created by DatabaseAccessorFactory; GenericJdbcDatabaseAccessor is the base
+ * implementation.
+ */
+public interface DatabaseAccessor {
+
+  /**
+   * Returns the number of rows produced by the query configured in 'conf'. The generic
+   * implementation wraps that query in a SELECT COUNT(*).
+   *
+   * @throws JdbcDatabaseAccessException if the count cannot be obtained
+   */
+  int getTotalNumberOfRecords(Configuration conf)
+      throws JdbcDatabaseAccessException;
+
+  /**
+   * Runs the query configured in 'conf' and returns an iterator over its result set.
+   *
+   * @param limit maximum number of rows; the implementations in this package treat -1
+   *     as "no limit"
+   * @param offset number of leading rows to skip; 0 means start at the first row
+   * @throws JdbcDatabaseAccessException if the query cannot be executed
+   */
+  JdbcRecordIterator getRecordIterator(Configuration conf, int limit, int 
offset)
+      throws JdbcDatabaseAccessException;
+
+  /**
+   * Releases the accessor. In the generic implementation 'cleanCache' additionally
+   * invalidates the data source cache shared by all accessors.
+   */
+  void close(boolean cleanCache);
+}
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java
new file mode 100644
index 000000000..5415b5763
--- /dev/null
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessorFactory.java
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.dao;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.impala.extdatasource.jdbc.conf.DatabaseType;
+import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfig;
+
+/**
+ * Static factory that maps a database type to the DatabaseAccessor implementation
+ * handling that database's SQL dialect.
+ */
+public class DatabaseAccessorFactory {
+
+  // Utility class, not meant to be instantiated.
+  private DatabaseAccessorFactory() {
+  }
+
+  /**
+   * Returns a new accessor for 'dbType'. Types without a dedicated implementation are
+   * served by GenericJdbcDatabaseAccessor.
+   */
+  public static DatabaseAccessor getAccessor(DatabaseType dbType) {
+    switch (dbType) {
+      case MYSQL:
+        return new MySqlDatabaseAccessor();
+      case JETHRO_DATA:
+        return new JethroDatabaseAccessor();
+      case POSTGRES:
+        return new PostgresDatabaseAccessor();
+      case ORACLE:
+        return new OracleDatabaseAccessor();
+      case MSSQL:
+        return new MsSqlDatabaseAccessor();
+      case DB2:
+        return new DB2DatabaseAccessor();
+      default:
+        return new GenericJdbcDatabaseAccessor();
+    }
+  }
+
+  /**
+   * Returns a new accessor for the database type stored in 'conf'. The configured name
+   * is upper-cased before it is resolved to a DatabaseType constant.
+   */
+  public static DatabaseAccessor getAccessor(Configuration conf) {
+    String typeName = conf.get(JdbcStorageConfig.DATABASE_TYPE.getPropertyName());
+    return getAccessor(DatabaseType.valueOf(typeName.toUpperCase()));
+  }
+
+}
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
new file mode 100644
index 000000000..5f74a2a6e
--- /dev/null
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
@@ -0,0 +1,299 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.dao;
+
+
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import javax.sql.DataSource;
+
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.apache.commons.dbcp2.BasicDataSourceFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfig;
+import org.apache.impala.extdatasource.jdbc.conf.JdbcStorageConfigManager;
+import 
org.apache.impala.extdatasource.jdbc.exception.JdbcDatabaseAccessException;
+import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TCacheJarResult;
+import org.apache.impala.thrift.TErrorCode;
+import org.apache.impala.thrift.TStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+
+/**
+ * A data accessor that should in theory work with all JDBC compliant database 
drivers.
+ */
+public class GenericJdbcDatabaseAccessor implements DatabaseAccessor {
+
+  protected static final Logger LOG = LoggerFactory
+      .getLogger(GenericJdbcDatabaseAccessor.class);
+
+  // Prefix of the configuration keys that are forwarded to the connection pool.
+  protected static final String DBCP_CONFIG_PREFIX = "dbcp";
+  // Fetch size used when 'jdbc.fetch.size' is not configured.
+  protected static final int DEFAULT_FETCH_SIZE = 1000;
+  // Cached data sources expire this many seconds after their last access.
+  protected static final int CACHE_EXPIRE_TIMEOUT_S = 1800;
+  // Maximum number of data sources kept in the cache.
+  protected static final int CACHE_SIZE = 100;
+
+  // Data source used by this accessor; set on first use by initializeDatabaseSource().
+  protected DataSource dbcpDataSource = null;
+  // Cache datasource for sharing
+  // Static, hence shared by all accessor instances. initializeDatabaseSource() keys the
+  // entries by "<jdbc url>.<username>". The removal listener closes a BasicDataSource
+  // when its entry leaves the cache and logs (rather than rethrows) a failure to close.
+  public static Cache<String, DataSource> dataSourceCache = CacheBuilder
+      .newBuilder()
+      .removalListener((RemovalListener<String, DataSource>) notification -> {
+        DataSource ds = notification.getValue();
+        if (ds instanceof BasicDataSource) {
+          BasicDataSource dbcpDs = (BasicDataSource) ds;
+          try {
+            dbcpDs.close();
+            LOG.info("Close datasource for '{}'.", notification.getKey());
+          } catch (SQLException e) {
+            LOG.warn("Caught exception during datasource cleanup.", e);
+          }
+        }
+      })
+      .expireAfterAccess(CACHE_EXPIRE_TIMEOUT_S, TimeUnit.SECONDS)
+      .maximumSize(CACHE_SIZE)
+      .build();
+
+  @Override
+  public int getTotalNumberOfRecords(Configuration conf)
+      throws JdbcDatabaseAccessException {
+    Connection conn = null;
+    PreparedStatement ps = null;
+    ResultSet rs = null;
+
+    try {
+      initializeDatabaseSource(conf);
+      String sql = JdbcStorageConfigManager.getQueryToExecute(conf);
+      // TODO: If a target database cannot flatten this view query, try to text
+      // replace the generated "select *".
+      String countQuery = "SELECT COUNT(*) FROM (" + sql + ") tmptable";
+      LOG.info("Query to execute is [{}]", countQuery);
+
+      conn = dbcpDataSource.getConnection();
+      ps = conn.prepareStatement(countQuery);
+      rs = ps.executeQuery();
+      if (rs.next()) {
+        return rs.getInt(1);
+      } else {
+        LOG.warn("The count query '{}' did not return any results.", 
countQuery);
+        throw new JdbcDatabaseAccessException(
+            "Count query did not return any results.");
+      }
+    } catch (JdbcDatabaseAccessException he) {
+      throw he;
+    } catch (Exception e) {
+      LOG.error("Caught exception while trying to get the number of records", 
e);
+      throw new JdbcDatabaseAccessException(e);
+    } finally {
+      cleanupResources(conn, ps, rs);
+    }
+  }
+
+
+  /**
+   * Executes the configured query, restricted by 'limit' and 'offset', and hands the
+   * open connection, statement and result set over to the returned iterator, which is
+   * then responsible for closing them. On failure everything opened so far is released
+   * here.
+   *
+   * @throws JdbcDatabaseAccessException if the query cannot be prepared or executed
+   */
+  @Override
+  public JdbcRecordIterator getRecordIterator(Configuration conf, int limit, int offset)
+      throws JdbcDatabaseAccessException {
+    Connection connection = null;
+    PreparedStatement statement = null;
+    ResultSet resultSet = null;
+
+    try {
+      initializeDatabaseSource(conf);
+      String limitedQuery = addLimitAndOffsetToQuery(
+          JdbcStorageConfigManager.getQueryToExecute(conf), limit, offset);
+      LOG.info("Query to execute is [{}]", limitedQuery);
+
+      connection = dbcpDataSource.getConnection();
+      statement = connection.prepareStatement(
+          limitedQuery, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+      statement.setFetchSize(getFetchSize(conf));
+      resultSet = statement.executeQuery();
+
+      return new JdbcRecordIterator(connection, statement, resultSet, conf);
+    } catch (Exception e) {
+      LOG.error("Caught exception while trying to execute query", e);
+      cleanupResources(connection, statement, resultSet);
+      String message = "Caught exception while trying to execute query:" + e.getMessage();
+      throw new JdbcDatabaseAccessException(message, e);
+    }
+  }
+
+
+  /**
+   * Drops this accessor's reference to its data source. When 'cleanCache' is true all
+   * cached data sources are invalidated as well, which closes them through the cache's
+   * removal listener.
+   *
+   * The cache object itself is kept: it is static and shared by every accessor, so
+   * setting it to null would make the next initializeDatabaseSource() call in this JVM
+   * fail with a NullPointerException.
+   */
+  @Override
+  public void close(boolean cleanCache) {
+    dbcpDataSource = null;
+    if (cleanCache && dataSourceCache != null) {
+      dataSourceCache.invalidateAll();
+    }
+  }
+
+  /**
+   * Appends a limit and/or offset clause in JDBC escape syntax to a query string.
+   *
+   * @param sql the query to extend
+   * @param limit maximum number of rows, or -1 for no limit
+   * @param offset number of rows to skip; 0 delegates to addLimitToQuery()
+   * @return 'sql' followed by the escape clause, if any
+   */
+  protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
+    if (offset == 0) {
+      return addLimitToQuery(sql, limit);
+    }
+    String clause = (limit != -1)
+        ? "{LIMIT " + limit + " OFFSET " + offset + "}"
+        : "{OFFSET " + offset + "}";
+    return sql + " " + clause;
+  }
+
+  /**
+   * Appends a "{LIMIT n}" JDBC escape clause to 'sql'. A limit of -1 means no limit
+   * and returns 'sql' unchanged.
+   */
+  protected String addLimitToQuery(String sql, int limit) {
+    return (limit == -1) ? sql : sql + " {LIMIT " + limit + "}";
+  }
+
+  /**
+   * Closes the result set, the statement and the connection, in that order. Null
+   * arguments are skipped, and a failure to close one resource is logged without
+   * preventing the remaining ones from being closed.
+   */
+  protected void cleanupResources(Connection conn, PreparedStatement ps, ResultSet rs) {
+    if (rs != null) {
+      try {
+        rs.close();
+      } catch (SQLException e) {
+        LOG.warn("Caught exception during resultset cleanup.", e);
+      }
+    }
+
+    if (ps != null) {
+      try {
+        ps.close();
+      } catch (SQLException e) {
+        LOG.warn("Caught exception during statement cleanup.", e);
+      }
+    }
+
+    if (conn != null) {
+      try {
+        conn.close();
+      } catch (SQLException e) {
+        LOG.warn("Caught exception during connection cleanup.", e);
+      }
+    }
+  }
+
+  /**
+   * Makes sure 'dbcpDataSource' is set, taking the data source from the shared cache
+   * or creating and caching it on a miss. Does nothing if the field is already set.
+   *
+   * @param conf table configuration providing the JDBC url, driver and pool properties
+   * @throws ExecutionException declared for the cache lookup, whose loader can throw,
+   *     e.g. when the driver jar cannot be cached locally
+   */
+  protected void initializeDatabaseSource(Configuration conf)
+      throws ExecutionException {
+    if (dbcpDataSource == null) {
+      // The field is checked again under the instance lock before it is assigned.
+      synchronized (this) {
+        if (dbcpDataSource == null) {
+          Properties props = getConnectionPoolProperties(conf);
+          String jdbcUrl = props.getProperty("url");
+          // "-" stands in for the user name when none is configured.
+          String username = props.getProperty("username", "-");
+          // One cached data source per (url, username) pair.
+          String cacheMapKey = String.format("%s.%s", jdbcUrl, username);
+          dbcpDataSource = dataSourceCache.get(cacheMapKey,
+              () -> {
+                LOG.info("Datasource for '{}' was not cached. "
+                    + "Loading now.", cacheMapKey);
+                BasicDataSource basicDataSource =
+                    BasicDataSourceFactory.createDataSource(props);
+                // Put jdbc driver to cache
+                String driverUrl = props.getProperty("driverUrl");
+                TCacheJarResult cacheResult = FeSupport.CacheJar(driverUrl);
+                TStatus cacheJarStatus = cacheResult.getStatus();
+                if (cacheJarStatus.getStatus_code() != TErrorCode.OK) {
+                  throw new JdbcDatabaseAccessException(String.format(
+                      "Unable to cache jdbc driver jar at location '%s'. " +
+                      "Check that the file exists and is readable. Message: 
%s",
+                      driverUrl, cacheJarStatus.getError_msgs()));
+                }
+                String driverLocalPath = cacheResult.getLocal_path();
+                // Create class loader for jdbc driver and set it for the
+                // BasicDataSource object so that the driver class could be loaded
+                // from jar file without searching classpath.
+                URL driverJarUrl = new File(driverLocalPath).toURI().toURL();
+                URLClassLoader driverLoader =
+                    URLClassLoader.newInstance( new URL[] { driverJarUrl },
+                        getClass().getClassLoader());
+                basicDataSource.setDriverClassLoader(driverLoader);
+                return basicDataSource;
+              });
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds the properties used to create the connection pool: the defaults from
+   * getDefaultDBCPProperties(), overridden by user properties whose key starts with
+   * "dbcp." (the prefix is stripped), plus the url, driver class and driver jar
+   * location taken from the table configuration.
+   *
+   * @param conf table configuration
+   * @return properties for BasicDataSourceFactory
+   */
+  protected Properties getConnectionPoolProperties(Configuration conf) {
+    // Create the default properties object
+    Properties dbProperties = getDefaultDBCPProperties();
+
+    // user properties
+    // Configuration.getValByRegex() selects keys in which the pattern is found
+    // anywhere, so the pattern is anchored to select only keys that start with
+    // "dbcp.". The previous pattern "dbcp\.*" also matched keys that merely contain
+    // "dbcp".
+    String prefixPattern = "^" + DBCP_CONFIG_PREFIX + "\\.";
+    Map<String, String> userProperties = conf.getValByRegex(prefixPattern);
+    if ((userProperties != null) && (!userProperties.isEmpty())) {
+      for (Entry<String, String> entry : userProperties.entrySet()) {
+        dbProperties.put(entry.getKey().replaceFirst(prefixPattern, ""),
+            entry.getValue());
+      }
+    }
+
+    // essential properties
+    dbProperties.put("url", conf.get(JdbcStorageConfig.JDBC_URL.getPropertyName()));
+    dbProperties.put("driverClassName",
+        conf.get(JdbcStorageConfig.JDBC_DRIVER_CLASS.getPropertyName()));
+    dbProperties.put("driverUrl",
+        conf.get(JdbcStorageConfig.JDBC_DRIVER_URL.getPropertyName()));
+    dbProperties.put("type", "javax.sql.DataSource");
+    return dbProperties;
+  }
+
+  /**
+   * Returns the default connection pool settings: at most 3 connections, no idle
+   * connections kept, a 10 second wait for a free connection and an eviction run
+   * every 30 seconds. Users can override them with 'dbcp.*' table properties.
+   */
+  protected Properties getDefaultDBCPProperties() {
+    Properties props = new Properties();
+    // Don't set 'initialSize', otherwise the driver class will be loaded in
+    // BasicDataSourceFactory.createDataSource() before the class loader is set
+    // by calling BasicDataSource.setDriverClassLoader.
+    // props.put("initialSize", "1");
+    // The pool is commons-dbcp2, which names these settings 'maxTotal' and
+    // 'maxWaitMillis'. The DBCP 1.x names 'maxActive' and 'maxWait' are not applied by
+    // DBCP2, so the limits were silently left at the library defaults.
+    props.put("maxTotal", "3");
+    props.put("maxIdle", "0");
+    props.put("maxWaitMillis", "10000");
+    props.put("timeBetweenEvictionRunsMillis", "30000");
+    return props;
+  }
+
+  /**
+   * Returns the configured 'jdbc.fetch.size', or DEFAULT_FETCH_SIZE if it is not set.
+   */
+  protected int getFetchSize(Configuration conf) {
+    String fetchSizeKey = JdbcStorageConfig.JDBC_FETCH_SIZE.getPropertyName();
+    return conf.getInt(fetchSizeKey, DEFAULT_FETCH_SIZE);
+  }
+
+}
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/JdbcRecordIterator.java
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/JdbcRecordIterator.java
new file mode 100644
index 000000000..73dafcbaf
--- /dev/null
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/JdbcRecordIterator.java
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.dao;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+import 
org.apache.impala.extdatasource.jdbc.exception.JdbcDatabaseAccessException;
+import org.apache.impala.extdatasource.thrift.TColumnDesc;
+import org.apache.impala.extdatasource.util.SerializationUtils;
+import org.apache.impala.thrift.TColumnData;
+import org.apache.impala.thrift.TColumnType;
+import org.apache.impala.thrift.TScalarType;
+import org.apache.impala.thrift.TTypeNodeType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An iterator that allows iterating through a SQL resultset. Includes methods 
to clear up
+ * resources.
+ */
+public class JdbcRecordIterator {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(JdbcRecordIterator.class);
+
+  // JDBC resources handed over by the accessor; all three are closed in close().
+  private final Connection conn;
+  private final PreparedStatement ps;
+  private final ResultSet rs;
+  // Column names of the result set in column order, read from its metadata in the
+  // constructor.
+  private final List<String> jdbcColumnNames;
+
+  /**
+   * Wraps an executed query. The iterator keeps the three JDBC resources until close()
+   * is called and reads the column names from the result set metadata.
+   *
+   * @param conn connection the statement was created on
+   * @param ps the executed statement
+   * @param rs result set to iterate over
+   * @param conf table configuration; currently not used by the iterator
+   * @throws JdbcDatabaseAccessException if the result set metadata cannot be read
+   */
+  public JdbcRecordIterator(Connection conn, PreparedStatement ps, ResultSet rs,
+      Configuration conf) throws JdbcDatabaseAccessException {
+    this.conn = conn;
+    this.ps = ps;
+    this.rs = rs;
+
+    try {
+      ResultSetMetaData metadata = rs.getMetaData();
+      int numColumns = metadata.getColumnCount();
+      List<String> columnNames = new ArrayList<>(numColumns);
+      // JDBC column indexes are 1-based. The column types used to be collected here
+      // too, but the list was never read.
+      for (int i = 1; i <= numColumns; i++) {
+        columnNames.add(metadata.getColumnName(i));
+      }
+      jdbcColumnNames = columnNames;
+    } catch (Exception e) {
+      LOGGER.error("Error while trying to get column names.", e);
+      throw new JdbcDatabaseAccessException(
+          "Error while trying to get column names: " + e.getMessage(), e);
+    }
+    LOGGER.debug("Iterator ColumnNames = {}", jdbcColumnNames);
+  }
+
+  public boolean hasNext() throws JdbcDatabaseAccessException {
+    try {
+      return rs.next();
+    } catch (Exception e) {
+      LOGGER.warn("hasNext() threw exception", e);
+      throw new JdbcDatabaseAccessException(
+          "Error while retrieving next batch of rows: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Appends the values of the current result set row to 'colDatas', one value per
+   * column. Column i of the row is converted according to the type in colDescs[i]; a
+   * SQL NULL only sets the column's is_null flag.
+   *
+   * @param colDescs descriptors of the requested columns, in result set column order
+   * @param colDatas output columns; must have the same size as 'colDescs'
+   * @throws UnsupportedOperationException for non-scalar columns and for the scalar
+   *     types BINARY, CHAR, DATETIME, INVALID_TYPE and NULL_TYPE
+   */
+  public void next(List<TColumnDesc> colDescs, List<TColumnData> colDatas) {
+    Preconditions.checkState(colDescs.size() == colDatas.size());
+    for (int i = 0; i < colDescs.size(); ++i) {
+      TColumnType type = colDescs.get(i).getType();
+      TColumnData colData = colDatas.get(i);
+      if (type.types.get(0).getType() != TTypeNodeType.SCALAR) {
+        // Unsupported non-scalar type.
+        throw new UnsupportedOperationException("Unsupported column type: " +
+            type.types.get(0).getType());
+      }
+      Preconditions.checkState(type.getTypesSize() == 1);
+      TScalarType scalarType = type.types.get(0).scalar_type;
+      try {
+        Object value = rs.getObject(i + 1);
+        if (value == null) {
+          colData.addToIs_null(true);
+          continue;
+        }
+        switch (scalarType.type) {
+          case TINYINT:
+            colData.addToByte_vals(rs.getByte(i + 1));
+            break;
+          case SMALLINT:
+            colData.addToShort_vals(rs.getShort(i + 1));
+            break;
+          case INT:
+            colData.addToInt_vals(rs.getInt(i + 1));
+            break;
+          case DATE:
+            // Dates are passed as the number of days since the epoch.
+            LocalDate localDate = Instant.ofEpochMilli(rs.getDate(i + 1).getTime())
+                .atZone(ZoneId.systemDefault())
+                .toLocalDate();
+            colData.addToInt_vals((int) localDate.toEpochDay());
+            break;
+          case BIGINT:
+            colData.addToLong_vals(rs.getLong(i + 1));
+            break;
+          case DOUBLE:
+            colData.addToDouble_vals(rs.getDouble(i + 1));
+            break;
+          case FLOAT:
+            colData.addToDouble_vals(rs.getFloat(i + 1));
+            break;
+          case STRING:
+            colData.addToString_vals(rs.getString(i + 1));
+            break;
+          case BOOLEAN:
+            colData.addToBool_vals(rs.getBoolean(i + 1));
+            break;
+          case TIMESTAMP:
+            // Use UTC time zone instead of system default time zone
+            colData.addToBinary_vals(
+                SerializationUtils.encodeTimestamp(rs.getTimestamp(i + 1,
+                    Calendar.getInstance(TimeZone.getTimeZone(ZoneOffset.UTC)))));
+            break;
+          case DECIMAL:
+            // Encode the complete value. It used to be narrowed with byteValue()
+            // first, which dropped the fraction and wrapped anything outside
+            // [-128, 127].
+            BigDecimal val = rs.getBigDecimal(i + 1);
+            // NOTE(review): assumes the consumer interprets the encoded bytes using the
+            // scale declared for the column, hence the value is brought to that scale
+            // first -- confirm against the reader of binary_vals.
+            if (scalarType.isSetScale() && val.scale() != scalarType.getScale()) {
+              val = val.setScale(scalarType.getScale(), java.math.RoundingMode.HALF_UP);
+            }
+            colData.addToBinary_vals(SerializationUtils.encodeDecimal(val));
+            break;
+          case BINARY:
+          case CHAR:
+          case DATETIME:
+          case INVALID_TYPE:
+          case NULL_TYPE:
+          default:
+            // Unsupported.
+            throw new UnsupportedOperationException("Unsupported column type: " +
+                scalarType.getType());
+        }
+        colData.addToIs_null(false);
+      } catch (SQLException e) {
+        // Best effort: a value that cannot be read is returned as NULL, but the
+        // failure is logged so that it does not go unnoticed.
+        LOGGER.warn("Failed to read column {} of the current row, returning NULL.",
+            i + 1, e);
+        colData.addToIs_null(true);
+      }
+    }
+  }
+
+  /**
+   * Release all DB resources
+   */
+  public void close() throws JdbcDatabaseAccessException {
+    try {
+      rs.close();
+      ps.close();
+      conn.close();
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while trying to close database objects", 
e);
+      throw new JdbcDatabaseAccessException(
+          "Error while releasing database resources: " + e.getMessage(), e);
+    }
+  }
+
+
+}
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/JethroDatabaseAccessor.java
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/JethroDatabaseAccessor.java
new file mode 100644
index 000000000..8cc2cefe8
--- /dev/null
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/JethroDatabaseAccessor.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.dao;
+
+/**
+ * JethroData specific data accessor. This is needed because JethroData JDBC drivers do
+ * not support generic LIMIT and OFFSET escape functions, and have some special
+ * optimization for getting the query metadata using limit 0.
+ */
+public class JethroDatabaseAccessor extends GenericJdbcDatabaseAccessor {
+
+  /**
+   * Adds a row limit and an offset to 'sql' using the "LIMIT offset,limit" syntax.
+   *
+   * @param sql the query to restrict.
+   * @param limit the maximum number of rows to return, or -1 to leave the query as is.
+   * @param offset the number of leading rows to skip; 0 delegates to
+   *     {@link #addLimitToQuery(String, int)}.
+   * @return the restricted query.
+   */
+  @Override
+  protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
+    if (offset == 0) {
+      return addLimitToQuery(sql, limit);
+    }
+    // The sibling accessors return the query unchanged for a limit of -1; do the same
+    // here instead of emitting "LIMIT <offset>,-1".
+    if (limit == -1) {
+      return sql;
+    }
+    return sql + " LIMIT " + offset + "," + limit;
+  }
+
+  /**
+   * Adds a row limit to 'sql' by wrapping it in a sub-select.
+   *
+   * @param sql the query to restrict.
+   * @param limit the maximum number of rows to return, or -1 to leave the query as is.
+   * @return the restricted query.
+   */
+  @Override
+  protected String addLimitToQuery(String sql, int limit) {
+    // Same convention as the sibling accessors: -1 leaves the query untouched rather
+    // than producing "limit -1".
+    if (limit == -1) {
+      return sql;
+    }
+    return "Select * from (" + sql + ") as \"tmp\" limit " + limit;
+  }
+
+}
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/MsSqlDatabaseAccessor.java
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/MsSqlDatabaseAccessor.java
new file mode 100644
index 000000000..bdbc61701
--- /dev/null
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/MsSqlDatabaseAccessor.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.dao;
+
+/**
+ * MSSQL specific data accessor. This is needed because MSSQL JDBC drivers do 
not support
+ * generic LIMIT and OFFSET escape functions
+ */
+public class MsSqlDatabaseAccessor extends GenericJdbcDatabaseAccessor {
+
+  @Override
+  protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) 
{
+    if (offset == 0) {
+      return addLimitToQuery(sql, limit);
+    } else {
+      if (limit == -1) {
+        return sql;
+      }
+      // Order by is not necessary, but MS SQL require it to use FETCH
+      return sql + " ORDER BY 1 OFFSET " + offset + " ROWS FETCH NEXT " + limit
+          + " ROWS ONLY";
+    }
+  }
+
+  @Override
+  protected String addLimitToQuery(String sql, int limit) {
+    if (limit == -1) {
+      return sql;
+    }
+    return sql + " {LIMIT " + limit + "}";
+  }
+}
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/MySqlDatabaseAccessor.java
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/MySqlDatabaseAccessor.java
new file mode 100644
index 000000000..cd9ada13b
--- /dev/null
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/MySqlDatabaseAccessor.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.dao;
+
+/**
+ * Data accessor for MySQL. A dedicated accessor is needed because MySQL JDBC drivers do
+ * not support generic LIMIT and OFFSET escape functions.
+ */
+public class MySqlDatabaseAccessor extends GenericJdbcDatabaseAccessor {
+
+  /**
+   * Restricts 'sql' to 'limit' rows starting after the first 'offset' rows, using the
+   * "LIMIT offset,limit" syntax. With a zero offset this is the same as
+   * {@link #addLimitToQuery(String, int)}; with a limit of -1 and a non-zero offset the
+   * query is returned unchanged.
+   */
+  @Override
+  protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
+    if (offset == 0) return addLimitToQuery(sql, limit);
+    if (limit == -1) return sql;
+    return sql + " LIMIT " + offset + "," + limit;
+  }
+
+  /**
+   * Restricts 'sql' to at most 'limit' rows. A limit of -1 returns the query unchanged.
+   */
+  @Override
+  protected String addLimitToQuery(String sql, int limit) {
+    return limit == -1 ? sql : sql + " LIMIT " + limit;
+  }
+
+}
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/OracleDatabaseAccessor.java
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/OracleDatabaseAccessor.java
new file mode 100644
index 000000000..bae31f20c
--- /dev/null
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/OracleDatabaseAccessor.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.dao;
+
+/**
+ * Oracle specific data accessor. This is needed because Oracle JDBC drivers 
do not
+ * support generic LIMIT and OFFSET escape functions
+ */
+public class OracleDatabaseAccessor extends GenericJdbcDatabaseAccessor {
+
+  // Random column name to reduce the chance of conflict
+  static final String ROW_NUM_COLUMN_NAME = "dummy_rownum_col_rn1938392";
+
+  @Override
+  protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) 
{
+    if (offset == 0) {
+      return addLimitToQuery(sql, limit);
+    } else {
+      if (limit == -1) {
+        return sql;
+      }
+      // A simple ROWNUM > offset and ROWNUM <= (offset + limit) won't work, 
it will
+      // return nothing
+      return "SELECT * FROM (SELECT t.*, ROWNUM AS " + ROW_NUM_COLUMN_NAME + " 
FROM ("
+          + sql + ") t) WHERE "
+          + ROW_NUM_COLUMN_NAME + " >" + offset + " AND " + 
ROW_NUM_COLUMN_NAME + " <="
+          + (offset + limit);
+    }
+  }
+
+
+  @Override
+  protected String addLimitToQuery(String sql, int limit) {
+    if (limit == -1) {
+      return sql;
+    }
+    return "SELECT * FROM (" + sql + ") WHERE ROWNUM <= " + limit;
+  }
+
+}
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/PostgresDatabaseAccessor.java
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/PostgresDatabaseAccessor.java
new file mode 100644
index 000000000..9ec0d7ab6
--- /dev/null
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/dao/PostgresDatabaseAccessor.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.dao;
+
+/**
+ * Data accessor for Postgres. Postgres JDBC drivers do not support generic LIMIT and
+ * OFFSET escape functions.
+ */
+public class PostgresDatabaseAccessor extends GenericJdbcDatabaseAccessor {
+
+  /**
+   * Restricts 'sql' to 'limit' rows starting after the first 'offset' rows, using the
+   * "LIMIT n OFFSET m" syntax. With a zero offset this is the same as
+   * {@link #addLimitToQuery(String, int)}; with a limit of -1 and a non-zero offset the
+   * query is returned unchanged.
+   */
+  @Override
+  protected String addLimitAndOffsetToQuery(String sql, int limit, int offset) {
+    if (offset == 0) return addLimitToQuery(sql, limit);
+    if (limit == -1) return sql;
+    return sql + " LIMIT " + limit + " OFFSET " + offset;
+  }
+
+  /**
+   * Restricts 'sql' to at most 'limit' rows. A limit of -1 returns the query unchanged.
+   */
+  @Override
+  protected String addLimitToQuery(String sql, int limit) {
+    return limit == -1 ? sql : sql + " LIMIT " + limit;
+  }
+}
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/exception/JdbcDatabaseAccessException.java
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/exception/JdbcDatabaseAccessException.java
new file mode 100644
index 000000000..c85e4bb86
--- /dev/null
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/exception/JdbcDatabaseAccessException.java
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.exception;
+
+/**
+ * Checked exception raised when accessing the underlying database over JDBC fails.
+ */
+public class JdbcDatabaseAccessException extends Exception {
+
+  private static final long serialVersionUID = -4106595742876276803L;
+
+  /** Creates an exception with neither a detail message nor a cause. */
+  public JdbcDatabaseAccessException() {
+  }
+
+  /** Creates an exception with the given detail message. */
+  public JdbcDatabaseAccessException(String message) {
+    super(message);
+  }
+
+  /** Creates an exception wrapping the given cause. */
+  public JdbcDatabaseAccessException(Throwable cause) {
+    super(cause);
+  }
+
+  /** Creates an exception with the given detail message and cause. */
+  public JdbcDatabaseAccessException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git 
a/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/util/QueryConditionUtil.java
 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/util/QueryConditionUtil.java
new file mode 100644
index 000000000..4f7d58156
--- /dev/null
+++ 
b/java/ext-data-source/jdbc/src/main/java/org/apache/impala/extdatasource/jdbc/util/QueryConditionUtil.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc.util;
+
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+
+import org.apache.impala.analysis.BinaryPredicate;
+import org.apache.impala.analysis.BinaryPredicate.Operator;
+import org.apache.impala.extdatasource.thrift.TBinaryPredicate;
+import org.apache.impala.extdatasource.thrift.TComparisonOp;
+import org.apache.impala.thrift.TColumnValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Translates the impala query predicates into a condition that can be run on the
+ * underlying database
+ */
+public class QueryConditionUtil {
+
+  private final static Logger LOG = LoggerFactory.getLogger(QueryConditionUtil.class);
+
+  /**
+   * Builds a SQL condition from the given predicates. Each inner list is rendered as a
+   * parenthesized group of comparisons joined with OR, and the groups are joined with
+   * AND.
+   *
+   * @param predicates the predicates to translate, one inner list per OR-group.
+   * @param columnMapping maps the column names used in the predicates to the column
+   *     names to use in the condition; names without an entry are used as is.
+   * @return the condition text; an empty string if 'predicates' is empty.
+   * @throws IllegalArgumentException if a predicate uses a comparison operator or a
+   *     value type that cannot be translated.
+   */
+  public static String buildCondition(List<List<TBinaryPredicate>> predicates,
+      Map<String, String> columnMapping) {
+    List<String> condition = Lists.newArrayList();
+    for (List<TBinaryPredicate> tBinaryPredicates : predicates) {
+      StringJoiner joiner = new StringJoiner(" OR ", "(", ")");
+      for (TBinaryPredicate predicate : tBinaryPredicates) {
+        String name = predicate.getCol().getName();
+        name = columnMapping.getOrDefault(name, name);
+        String op = converse(predicate.getOp());
+        // converse() returns null for an operator it cannot map. Fail loudly instead of
+        // formatting the text "null" into the condition sent to the database.
+        if (op == null) {
+          throw new IllegalArgumentException(
+              "Unsupported comparison operator: " + predicate.getOp());
+        }
+        String value = getTColumnValueAsString(predicate.getValue());
+        joiner.add(String.format("%s %s %s", name, op, value));
+      }
+      condition.add(joiner.toString());
+    }
+    return Joiner.on(" AND ").join(condition);
+  }
+
+  /**
+   * Return the value of a defined field as a string. If the "value" is null or the type
+   * is not supported, an exception is thrown.
+   *
+   * String values are returned as a single-quoted SQL literal with embedded single
+   * quotes doubled, so that a quote inside the value cannot terminate the literal.
+   *
+   * @throws IllegalStateException if 'value' is null.
+   * @throws IllegalArgumentException if none of the supported fields is set.
+   * @see org.apache.impala.planner.DataSourceScanNode#literalToColumnValue
+   */
+  public static String getTColumnValueAsString(TColumnValue value) {
+    Preconditions.checkState(value != null);
+    StringBuilder sb = new StringBuilder();
+    if (value.isSetBool_val()) {
+      sb.append(value.bool_val);
+    } else if (value.isSetByte_val()) {
+      sb.append(value.byte_val);
+    } else if (value.isSetShort_val()) {
+      sb.append(value.short_val);
+    } else if (value.isSetInt_val()) {
+      sb.append(value.int_val);
+    } else if (value.isSetLong_val()) {
+      sb.append(value.long_val);
+    } else if (value.isSetDouble_val()) {
+      sb.append(value.double_val);
+    } else if (value.isSetString_val()) {
+      // Double embedded single quotes (standard SQL escaping); without this a value
+      // such as O'Brien yields a malformed or injectable condition.
+      // NOTE(review): databases that also treat backslash as an escape character inside
+      // string literals may need additional handling - confirm per database type.
+      sb.append('\'').append(value.string_val.replace("'", "''")).append('\'');
+    } else {
+      // TODO: Support data types of DECIMAL, TIMESTAMP, DATE and binary for predicates.
+      // Keep in-sync with DataSourceScanNode.literalToColumnValue().
+      throw new IllegalArgumentException("Unsupported data type.");
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Returns the string form of the BinaryPredicate.Operator whose thrift operator is
+   * 'op', or null if there is no such operator.
+   *
+   * @see BinaryPredicate.Operator
+   */
+  public static String converse(TComparisonOp op) {
+    for (Operator operator : BinaryPredicate.Operator.values()) {
+      if (operator.getThriftOp() == op) {
+        return operator.toString();
+      }
+    }
+    return null;
+  }
+}
diff --git 
a/java/ext-data-source/jdbc/src/test/java/org/apache/impala/extdatasource/jdbc/JdbcDataSourceTest.java
 
b/java/ext-data-source/jdbc/src/test/java/org/apache/impala/extdatasource/jdbc/JdbcDataSourceTest.java
new file mode 100644
index 000000000..229367c8d
--- /dev/null
+++ 
b/java/ext-data-source/jdbc/src/test/java/org/apache/impala/extdatasource/jdbc/JdbcDataSourceTest.java
@@ -0,0 +1,253 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.extdatasource.jdbc;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.impala.extdatasource.thrift.TBinaryPredicate;
+import org.apache.impala.extdatasource.thrift.TCloseParams;
+import org.apache.impala.extdatasource.thrift.TCloseResult;
+import org.apache.impala.extdatasource.thrift.TColumnDesc;
+import org.apache.impala.extdatasource.thrift.TComparisonOp;
+import org.apache.impala.extdatasource.thrift.TGetNextParams;
+import org.apache.impala.extdatasource.thrift.TGetNextResult;
+import org.apache.impala.extdatasource.thrift.TOpenParams;
+import org.apache.impala.extdatasource.thrift.TOpenResult;
+import org.apache.impala.extdatasource.thrift.TPrepareParams;
+import org.apache.impala.extdatasource.thrift.TPrepareResult;
+import org.apache.impala.extdatasource.thrift.TRowBatch;
+import org.apache.impala.extdatasource.thrift.TTableSchema;
+import org.apache.impala.thrift.TColumnData;
+import org.apache.impala.thrift.TColumnType;
+import org.apache.impala.thrift.TColumnValue;
+import org.apache.impala.thrift.TErrorCode;
+import org.apache.impala.thrift.TPrimitiveType;
+import org.apache.impala.thrift.TScalarType;
+import org.apache.impala.thrift.TTypeNode;
+import org.apache.impala.thrift.TTypeNodeType;
+import org.apache.impala.thrift.TUniqueId;
+import org.junit.Assert;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Drives a JdbcDataSource through init, prepare, open, getNext and close against an
+ * in-memory H2 database populated from test_script.sql.
+ *
+ * The test methods share state through static fields and therefore have to run in name
+ * order, which is what the numeric prefixes and @FixMethodOrder are for.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class JdbcDataSourceTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JdbcDataSourceTest.class);
+
+  private static String initString_ = "CACHE_CLASS::{\"database.type\":\"H2\", "
+      + "\"jdbc.url\":\"jdbc:h2:mem:test;MODE=MySQL;INIT=runscript from "
+      + "'classpath:test_script.sql'\", "
+      + "\"jdbc.driver\":\"org.h2.Driver\", "
+      + "\"table\":\"test_strategy\","
+      + "\"column.mapping\":\"id=strategy_id\"}";
+
+  // Share data between tests
+  private static JdbcDataSource jdbcDataSource_ = new JdbcDataSource();
+  private static String scanHandle_;
+  private static TTableSchema schema_;
+  private static List<List<TBinaryPredicate>> predicates_ = Lists.newArrayList();
+  private static List<List<TBinaryPredicate>> acceptedPredicates_ =
+      Lists.newArrayList();
+  private static long expectReturnRows_ = 5L;
+
+  @Test
+  public void test01Init() {
+    // Predicate "id <= 3".
+    TColumnDesc col = scalarColumn("id", TPrimitiveType.INT);
+    TColumnValue value = new TColumnValue();
+    value.setInt_val(3);
+    TBinaryPredicate idPredicate = new TBinaryPredicate().setCol(col)
+        .setOp(TComparisonOp.LE).setValue(value);
+
+    // predicates filter
+    predicates_.add(Lists.newArrayList(idPredicate));
+    expectReturnRows_ = 3L;
+    LOG.info("setup predicates:{}, expectReturnRows: {}", predicates_,
+        expectReturnRows_);
+
+    boolean ret = jdbcDataSource_.convertInitStringToConfiguration(initString_);
+    Assert.assertTrue(ret);
+  }
+
+  @Test
+  public void test02Prepare() {
+    TPrepareParams params = new TPrepareParams();
+    params.setTable_name("test_strategy");
+    params.setInit_string(initString_);
+    params.setPredicates(predicates_);
+    TPrepareResult resp = jdbcDataSource_.prepare(params);
+    Assert.assertEquals(TErrorCode.OK, resp.getStatus().status_code);
+    if (resp.isSetAccepted_conjuncts()) {
+      acceptedPredicates_ = Lists.newArrayList();
+      // @see org.apache.impala.planner.DataSourceScanNode#removeAcceptedConjuncts
+      List<Integer> acceptedPredicatesIdx = resp.getAccepted_conjuncts();
+      // Because predicates_ is modified in place using positional indexes from
+      // acceptedPredicatesIdx, we remove the accepted predicates in reverse order.
+      for (int i = acceptedPredicatesIdx.size() - 1; i >= 0; --i) {
+        // Unbox to int on purpose: this must select List.remove(int index), not
+        // List.remove(Object).
+        int acceptedPredIdx = acceptedPredicatesIdx.get(i);
+        acceptedPredicates_.add(predicates_.remove(acceptedPredIdx));
+      }
+      // Restore the original order of the accepted predicates, which were collected
+      // in reverse above.
+      acceptedPredicates_ = Lists.reverse(acceptedPredicates_);
+    }
+    if (resp.isSetNum_rows_estimate()) {
+      long estimate = resp.getNum_rows_estimate();
+      Assert.assertEquals(5, estimate);
+    }
+  }
+
+  @Test
+  public void test03Open() {
+    TUniqueId queryId = new TUniqueId();
+    queryId.hi = 0xfeedbeeff00d7777L;
+    queryId.lo = 0x2020202020202020L;
+    schema_ = initSchema();
+
+    TOpenParams params = new TOpenParams();
+    params.setQuery_id(queryId);
+    params.setTable_name("test_strategy");
+    params.setInit_string(initString_);
+    params.setRow_schema(schema_);
+    params.setBatch_size(5);
+    params.setPredicates(acceptedPredicates_);
+    TOpenResult resp = jdbcDataSource_.open(params);
+    Assert.assertEquals(TErrorCode.OK, resp.getStatus().status_code);
+    scanHandle_ = resp.getScan_handle();
+    Assert.assertTrue(StringUtils.isNoneBlank(scanHandle_));
+  }
+
+  @Test
+  public void test04GetNext() {
+    TGetNextParams params = new TGetNextParams();
+    params.setScan_handle(scanHandle_);
+    boolean eos;
+    long totalNumRows = 0;
+    // Fetch batches until the data source reports end-of-stream.
+    do {
+      TGetNextResult resp = jdbcDataSource_.getNext(params);
+      Assert.assertEquals(TErrorCode.OK, resp.getStatus().status_code);
+      eos = resp.isEos();
+      TRowBatch rowBatch = resp.getRows();
+      totalNumRows += rowBatch.getNum_rows();
+      List<TColumnData> cols = rowBatch.getCols();
+      Assert.assertEquals(schema_.getColsSize(), cols.size());
+    } while (!eos);
+    Assert.assertEquals(expectReturnRows_, totalNumRows);
+  }
+
+  @Test
+  public void test05Close() {
+    TCloseParams params = new TCloseParams();
+    params.setScan_handle(scanHandle_);
+    TCloseResult resp = jdbcDataSource_.close(params);
+    Assert.assertEquals(TErrorCode.OK, resp.getStatus().status_code);
+  }
+
+  /**
+   * Returns the row schema requested from the data source. It is a subset of the
+   * columns of the test_strategy table (strategy_id int, name string, referrer string,
+   * landing string, priority int, implementation string, last_modified timestamp),
+   * with strategy_id requested under the name "id".
+   */
+  private static TTableSchema initSchema() {
+    TTableSchema schema = new TTableSchema();
+    schema.addToCols(scalarColumn("id", TPrimitiveType.INT));
+    schema.addToCols(scalarColumn("name", TPrimitiveType.STRING));
+    schema.addToCols(scalarColumn("priority", TPrimitiveType.INT));
+    schema.addToCols(scalarColumn("implementation", TPrimitiveType.STRING));
+    schema.addToCols(scalarColumn("last_modified", TPrimitiveType.TIMESTAMP));
+    return schema;
+  }
+
+  /**
+   * Returns a column descriptor named 'name' whose type is the scalar type 'type'.
+   */
+  private static TColumnDesc scalarColumn(String name, TPrimitiveType type) {
+    TScalarType scalarType = new TScalarType();
+    scalarType.setType(type);
+    TTypeNode typeNode = new TTypeNode();
+    typeNode.setType(TTypeNodeType.SCALAR);
+    typeNode.setScalar_type(scalarType);
+    TColumnType colType = new TColumnType();
+    colType.setTypes(Lists.newArrayList(typeNode));
+    return new TColumnDesc().setName(name).setType(colType);
+  }
+
+  /**
+   * Prints every column descriptor next to its data to stdout. Debugging aid.
+   */
+  public static void printData(List<TColumnDesc> colDescs, List<TColumnData> colDatas) {
+    for (int i = 0; i < colDatas.size(); ++i) {
+      TColumnDesc colDesc = colDescs.get(i);
+      TColumnData colData = colDatas.get(i);
+      System.out.println("idx: " + i);
+      System.out.println(" Name: " + colDesc);
+      System.out.println(" Data: " + colData);
+    }
+  }
+
+}
diff --git a/testdata/bin/copy-data-sources.sh 
b/java/ext-data-source/jdbc/src/test/resources/log4j.properties
old mode 100755
new mode 100644
similarity index 58%
rename from testdata/bin/copy-data-sources.sh
rename to java/ext-data-source/jdbc/src/test/resources/log4j.properties
index c218e15dd..31e2a489e
--- a/testdata/bin/copy-data-sources.sh
+++ b/java/ext-data-source/jdbc/src/test/resources/log4j.properties
@@ -1,5 +1,3 @@
-#!/bin/bash
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -16,17 +14,15 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
-# This script copies the test data source library into hdfs.
-
-set -euo pipefail
-. $IMPALA_HOME/bin/report_build_error.sh
-setup_report_build_error
 
-. ${IMPALA_HOME}/bin/impala-config.sh > /dev/null 2>&1
+# Define some default values that can be overridden by system properties
+# Don't use hadoop.root.logger because Hadoop's config scripts override it
+impala.hadoop.root.logger=INFO,console
 
-hadoop fs -mkdir -p ${FILESYSTEM_PREFIX}/test-warehouse/data-sources/
+# Define the root logger to the system property "impala.hadoop.root.logger".
+log4j.rootLogger=${impala.hadoop.root.logger}
 
-hadoop fs -put -f \
-  
${IMPALA_HOME}/java/ext-data-source/test/target/impala-data-source-test-*.jar \
-  ${FILESYSTEM_PREFIX}/test-warehouse/data-sources/test-data-source.jar
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p 
%c{2}: %m%n
diff --git a/java/ext-data-source/jdbc/src/test/resources/test_script.sql 
b/java/ext-data-source/jdbc/src/test/resources/test_script.sql
new file mode 100644
index 000000000..f88cb5144
--- /dev/null
+++ b/java/ext-data-source/jdbc/src/test/resources/test_script.sql
@@ -0,0 +1,47 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- Fixture table for JdbcDataSourceTest. The test references this script through the
+-- "INIT=runscript from 'classpath:test_script.sql'" option of its H2 JDBC URL, which
+-- also sets MODE=MySQL.
+-- Start from a clean table.
+DROP TABLE IF EXISTS test_strategy;
+
+-- The test's column.mapping setting maps the name "id" to strategy_id.
+CREATE TABLE IF NOT EXISTS test_strategy (
+  strategy_id int(11) NOT NULL,
+  name varchar(50) NOT NULL,
+  referrer varchar(1024) DEFAULT NULL,
+  landing varchar(1024) DEFAULT NULL,
+  priority int(11) DEFAULT NULL,
+  implementation varchar(512) DEFAULT NULL,
+  last_modified timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+  PRIMARY KEY (strategy_id)
+);
+
+
+INSERT INTO test_strategy (strategy_id, name, referrer, landing, priority, 
implementation,
+                           last_modified)
+VALUES (1, 'S1', 'aaa', 'abc', 1000, NULL, '2012-05-08 15:01:15');
+INSERT INTO test_strategy (strategy_id, name, referrer, landing, priority, 
implementation,
+                           last_modified)
+VALUES (2, 'S2', 'bbb', 'def', 990, NULL, '2012-05-08 15:01:15');
+INSERT INTO test_strategy (strategy_id, name, referrer, landing, priority, 
implementation,
+                           last_modified)
+VALUES (3, 'S3', 'ccc', 'ghi', 1000, NULL, '2012-05-08 15:01:15');
+INSERT INTO test_strategy (strategy_id, name, referrer, landing, priority, 
implementation,
+                           last_modified)
+VALUES (4, 'S4', 'ddd', 'jkl', 980, NULL, '2012-05-08 15:01:15');
+INSERT INTO test_strategy (strategy_id, name, referrer, landing, priority, 
implementation,
+                           last_modified)
+VALUES (5, 'S5', 'eee', NULL, NULL, NULL, '2012-05-08 15:01:15');
diff --git a/java/ext-data-source/pom.xml b/java/ext-data-source/pom.xml
index febc41a05..d16b0882d 100644
--- a/java/ext-data-source/pom.xml
+++ b/java/ext-data-source/pom.xml
@@ -33,5 +33,6 @@
     <module>api</module>
     <module>sample</module>
     <module>test</module>
+    <module>jdbc</module>
   </modules>
 </project>
diff --git a/testdata/bin/copy-ext-data-sources.sh 
b/testdata/bin/copy-ext-data-sources.sh
new file mode 100755
index 000000000..7b651914f
--- /dev/null
+++ b/testdata/bin/copy-ext-data-sources.sh
@@ -0,0 +1,56 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Copies the external data source jars and the Postgres JDBC driver into HDFS.
+
+set -euo pipefail
+. "$IMPALA_HOME/bin/report_build_error.sh"
+setup_report_build_error
+
+. "${IMPALA_HOME}/bin/impala-config.sh" > /dev/null 2>&1
+
+EXT_DATA_SOURCE_SRC_PATH=${IMPALA_HOME}/java/ext-data-source
+EXT_DATA_SOURCES_HDFS_PATH=${FILESYSTEM_PREFIX}/test-warehouse/data-sources
+JDBC_DRIVERS_HDFS_PATH=${EXT_DATA_SOURCES_HDFS_PATH}/jdbc-drivers
+
+hadoop fs -mkdir -p "${EXT_DATA_SOURCES_HDFS_PATH}"
+hadoop fs -mkdir -p "${JDBC_DRIVERS_HDFS_PATH}"
+
+# Copy libraries of external data sources to HDFS (globs stay unquoted to expand)
+hadoop fs -put -f \
+  "${EXT_DATA_SOURCE_SRC_PATH}"/test/target/impala-data-source-test-*.jar \
+  "${EXT_DATA_SOURCES_HDFS_PATH}/test-data-source.jar"
+
+echo "Copied" "${EXT_DATA_SOURCE_SRC_PATH}"/test/target/impala-data-source-test-*.jar \
+  "into HDFS" "${EXT_DATA_SOURCES_HDFS_PATH}"
+
+hadoop fs -put -f \
+  "${EXT_DATA_SOURCE_SRC_PATH}"/jdbc/target/impala-data-source-jdbc-*.jar \
+  "${EXT_DATA_SOURCES_HDFS_PATH}/jdbc-data-source.jar"
+
+echo "Copied" "${EXT_DATA_SOURCE_SRC_PATH}"/jdbc/target/impala-data-source-jdbc-*.jar \
+  "into HDFS" "${EXT_DATA_SOURCES_HDFS_PATH}"
+
+# Copy Postgres JDBC driver to HDFS
+hadoop fs -put -f \
+  "${IMPALA_HOME}"/fe/target/dependency/postgresql-*.jar \
+  "${JDBC_DRIVERS_HDFS_PATH}/postgresql-jdbc.jar"
+
+echo "Copied" "${IMPALA_HOME}"/fe/target/dependency/postgresql-*.jar \
+  "into HDFS" "${JDBC_DRIVERS_HDFS_PATH}"
diff --git a/testdata/bin/create-data-source-table.sql 
b/testdata/bin/create-data-source-table.sql
deleted file mode 100644
index 5d9c03256..000000000
--- a/testdata/bin/create-data-source-table.sql
+++ /dev/null
@@ -1,47 +0,0 @@
---
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements.  See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership.  The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License.  You may obtain a copy of the License at
---
---   http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing,
--- software distributed under the License is distributed on an
--- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
--- KIND, either express or implied.  See the License for the
--- specific language governing permissions and limitations
--- under the License.
-
--- Create test data sources and tables
-
-USE functional;
-
-DROP DATA SOURCE IF EXISTS AllTypesDataSource;
-CREATE DATA SOURCE AllTypesDataSource
-LOCATION '/test-warehouse/data-sources/test-data-source.jar'
-CLASS 'org.apache.impala.extdatasource.AllTypesDataSource'
-API_VERSION 'V1';
-
-DROP TABLE IF EXISTS alltypes_datasource;
-CREATE TABLE alltypes_datasource (
-  id INT,
-  bool_col BOOLEAN,
-  tinyint_col TINYINT,
-  smallint_col SMALLINT,
-  int_col INT,
-  bigint_col BIGINT,
-  float_col FLOAT,
-  double_col DOUBLE,
-  timestamp_col TIMESTAMP,
-  string_col STRING,
-  dec_col1 DECIMAL(9,0),
-  dec_col2 DECIMAL(10,0),
-  dec_col3 DECIMAL(20,10),
-  dec_col4 DECIMAL(38,37),
-  dec_col5 DECIMAL(10,5),
-  date_col DATE)
-PRODUCED BY DATA SOURCE AllTypesDataSource("TestInitString");
diff --git a/testdata/bin/create-ext-data-source-table.sql 
b/testdata/bin/create-ext-data-source-table.sql
new file mode 100644
index 000000000..84bd02e17
--- /dev/null
+++ b/testdata/bin/create-ext-data-source-table.sql
@@ -0,0 +1,98 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- Create test data sources and tables
+
+USE functional;
+
+DROP DATA SOURCE IF EXISTS AllTypesDataSource;
+CREATE DATA SOURCE AllTypesDataSource
+LOCATION '/test-warehouse/data-sources/test-data-source.jar'
+CLASS 'org.apache.impala.extdatasource.AllTypesDataSource'
+API_VERSION 'V1';
+
+DROP TABLE IF EXISTS alltypes_datasource;
+CREATE TABLE alltypes_datasource (
+  id INT,
+  bool_col BOOLEAN,
+  tinyint_col TINYINT,
+  smallint_col SMALLINT,
+  int_col INT,
+  bigint_col BIGINT,
+  float_col FLOAT,
+  double_col DOUBLE,
+  timestamp_col TIMESTAMP,
+  string_col STRING,
+  dec_col1 DECIMAL(9,0),
+  dec_col2 DECIMAL(10,0),
+  dec_col3 DECIMAL(20,10),
+  dec_col4 DECIMAL(38,37),
+  dec_col5 DECIMAL(10,5),
+  date_col DATE)
+PRODUCED BY DATA SOURCE AllTypesDataSource("TestInitString");
+
+DROP DATA SOURCE IF EXISTS JdbcDataSource;
+CREATE DATA SOURCE JdbcDataSource
+LOCATION '/test-warehouse/data-sources/jdbc-data-source.jar'
+CLASS 'org.apache.impala.extdatasource.jdbc.JdbcDataSource'
+API_VERSION 'V1';
+
+DROP TABLE IF EXISTS alltypes_jdbc_datasource;
+CREATE TABLE alltypes_jdbc_datasource (
+ id INT,
+ bool_col BOOLEAN,
+ tinyint_col TINYINT,
+ smallint_col SMALLINT,
+ int_col INT,
+ bigint_col BIGINT,
+ float_col FLOAT,
+ double_col DOUBLE,
+ date_string_col STRING,
+ string_col STRING,
+ timestamp_col TIMESTAMP)
+PRODUCED BY DATA SOURCE JdbcDataSource(
+'{"database.type":"POSTGRES",
+"jdbc.url":"jdbc:postgresql://localhost:5432/functional",
+"jdbc.driver":"org.postgresql.Driver",
+"driver.url":"/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar",
+"dbcp.username":"hiveuser",
+"dbcp.password":"password",
+"table":"alltypes"}');
+
+DROP TABLE IF EXISTS alltypes_jdbc_datasource_2;
+CREATE TABLE alltypes_jdbc_datasource_2 (
+ id INT,
+ bool_col BOOLEAN,
+ tinyint_col TINYINT,
+ smallint_col SMALLINT,
+ int_col INT,
+ bigint_col BIGINT,
+ float_col FLOAT,
+ double_col DOUBLE,
+ date_string_col STRING,
+ string_col STRING,
+ timestamp_col TIMESTAMP)
+PRODUCED BY DATA SOURCE JdbcDataSource(
+'{"database.type":"POSTGRES",
+"jdbc.url":"jdbc:postgresql://localhost:5432/functional",
+"jdbc.driver":"org.postgresql.Driver",
+"driver.url":"hdfs://localhost:20500/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar",
+"dbcp.username":"hiveuser",
+"dbcp.password":"password",
+"table":"AllTypesWithQuote",
+"column.mapping":"id=id, bool_col=Bool_col, tinyint_col=Tinyint_col, smallint_col=Smallint_col, int_col=Int_col, bigint_col=Bigint_col, float_col=Float_col, double_col=Double_col, date_string_col=Date_string_col, string_col=String_col, timestamp_col=Timestamp_col"}');
diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index e9d6183a6..9d87c06cf 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -478,10 +478,12 @@ function custom-post-load-steps {
 
 function copy-and-load-ext-data-source {
   # Copy the test data source library into HDFS
-  ${IMPALA_HOME}/testdata/bin/copy-data-sources.sh
+  ${IMPALA_HOME}/testdata/bin/copy-ext-data-sources.sh
+  # Load the underlying data of the data source
+  ${IMPALA_HOME}/testdata/bin/load-ext-data-sources.sh
   # Create data sources table.
   ${IMPALA_HOME}/bin/impala-shell.sh -i ${IMPALAD} -f\
-    ${IMPALA_HOME}/testdata/bin/create-data-source-table.sql
+    ${IMPALA_HOME}/testdata/bin/create-ext-data-source-table.sql
 }
 
 function check-hdfs-health {
diff --git a/testdata/bin/load-ext-data-sources.sh 
b/testdata/bin/load-ext-data-sources.sh
new file mode 100755
index 000000000..0476bdae2
--- /dev/null
+++ b/testdata/bin/load-ext-data-sources.sh
@@ -0,0 +1,83 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# This script creates and loads the JDBC data source target tables in Postgres.
+
+set -euo pipefail
+. "$IMPALA_HOME/bin/report_build_error.sh"
+setup_report_build_error
+
+. "${IMPALA_HOME}/bin/impala-config.sh" > /dev/null 2>&1
+
+# Recreate the Postgres database "functional"; dropdb may fail if it is absent
+dropdb -U hiveuser functional || true
+createdb -U hiveuser functional
+
+# Create jdbc table
+cat > /tmp/jdbc_alltypes.sql <<__EOT__
+DROP TABLE IF EXISTS alltypes;
+CREATE TABLE alltypes
+(
+    id              INT,
+    bool_col        BOOLEAN,
+    tinyint_col     SMALLINT,
+    smallint_col    SMALLINT,
+    int_col         INT,
+    bigint_col      BIGINT,
+    float_col       FLOAT,
+    double_col      DOUBLE PRECISION,
+    date_string_col VARCHAR(8),
+    string_col      VARCHAR(10),
+    timestamp_col   TIMESTAMP
+);
+__EOT__
+sudo -u postgres psql -U hiveuser -d functional -f /tmp/jdbc_alltypes.sql
+
+# Create jdbc table with case sensitive names for table and columns.
+cat > /tmp/jdbc_alltypes_with_quote.sql <<__EOT__
+DROP TABLE IF EXISTS "AllTypesWithQuote";
+CREATE TABLE "AllTypesWithQuote"
+(
+    "id"            INT,
+    "Bool_col"      BOOLEAN,
+    "Tinyint_col"   SMALLINT,
+    "Smallint_col"  SMALLINT,
+    "Int_col"       INT,
+    "Bigint_col"    BIGINT,
+    "Float_col"     FLOAT,
+    "Double_col"    DOUBLE PRECISION,
+    "Date_string_col" VARCHAR(8),
+    "String_col"    VARCHAR(10),
+    "Timestamp_col" TIMESTAMP
+);
+__EOT__
+sudo -u postgres psql -U hiveuser -d functional -f /tmp/jdbc_alltypes_with_quote.sql
+
+# Load the same CSV data into both jdbc tables
+cat "${IMPALA_HOME}"/testdata/target/AllTypes/* > /tmp/jdbc_alltypes.csv
+loadCmd="COPY alltypes FROM '/tmp/jdbc_alltypes.csv' DELIMITER ',' CSV"
+sudo -u postgres psql -d functional -c "$loadCmd"
+
+loadCmd="COPY \"AllTypesWithQuote\" FROM '/tmp/jdbc_alltypes.csv' DELIMITER ',' CSV"
+sudo -u postgres psql -d functional -c "$loadCmd"
+
+
+# Clean tmp files (-f: a missing file must not abort the script under set -e)
+rm -f /tmp/jdbc_alltypes.*
+rm -f /tmp/jdbc_alltypes_with_quote.*
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source.test 
b/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source.test
new file mode 100644
index 000000000..2bd068234
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/jdbc-data-source.test
@@ -0,0 +1,105 @@
+====
+---- QUERY
+# Test the jdbc data source
+---- QUERY
+# count(*) with a predicate evaluated by Impala
+select count(*) from alltypes_jdbc_datasource
+where float_col = 0 and string_col is not NULL
+---- RESULTS
+730
+---- TYPES
+BIGINT
+====
+---- QUERY
+# count(*) with no predicates has no materialized slots
+select count(*) from alltypes_jdbc_datasource
+---- RESULTS
+7300
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Gets all types including a row with a NULL value. The predicate pushed to
+# the data source.
+select *
+from alltypes_jdbc_datasource
+where id > 10 and int_col< 5 limit 5
+---- RESULTS
+11,false,1,1,1,10,1.100000023841858,10.1,'01/02/09','1',2009-01-02 
00:11:00.450000000
+12,true,2,2,2,20,2.200000047683716,20.2,'01/02/09','2',2009-01-02 
00:12:00.460000000
+13,false,3,3,3,30,3.299999952316284,30.3,'01/02/09','3',2009-01-02 
00:13:00.480000000
+14,true,4,4,4,40,4.400000095367432,40.4,'01/02/09','4',2009-01-02 
00:14:00.510000000
+20,true,0,0,0,0,0,0,'01/03/09','0',2009-01-03 00:20:00.900000000
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, 
TIMESTAMP
+====
+---- QUERY
+# Gets specified columns.
+select id, bool_col, smallint_col, float_col, double_col, date_string_col
+from alltypes_jdbc_datasource
+where id > 10 and int_col< 5 limit 5
+---- RESULTS
+11,false,1,1.100000023841858,10.1,'01/02/09'
+12,true,2,2.200000047683716,20.2,'01/02/09'
+13,false,3,3.299999952316284,30.3,'01/02/09'
+14,true,4,4.400000095367432,40.4,'01/02/09'
+20,true,0,0,0,'01/03/09'
+---- TYPES
+INT, BOOLEAN, SMALLINT, FLOAT, DOUBLE, STRING
+====
+---- QUERY
+# Gets specified columns from external jdbc table with case sensitive column 
names
+# and table name.
+select id, bool_col, smallint_col, float_col, double_col, date_string_col
+from alltypes_jdbc_datasource_2
+where id > 10 and int_col< 5 limit 5
+---- RESULTS
+11,false,1,1.100000023841858,10.1,'01/02/09'
+12,true,2,2.200000047683716,20.2,'01/02/09'
+13,false,3,3.299999952316284,30.3,'01/02/09'
+14,true,4,4.400000095367432,40.4,'01/02/09'
+20,true,0,0,0,'01/03/09'
+---- TYPES
+INT, BOOLEAN, SMALLINT, FLOAT, DOUBLE, STRING
+====
+---- QUERY
+# Inner join with a non jdbc table
+select a.id, b.int_col
+from alltypes_jdbc_datasource a inner join functional.alltypes b on (a.id = 
b.id)
+where a.id = 1
+---- RESULTS
+1,1
+---- TYPES
+INT, INT
+====
+---- QUERY
+# Inner join with another jdbc table
+select a.id, b.int_col
+from alltypes_jdbc_datasource a inner join alltypes_jdbc_datasource_2 b on 
(a.id = b.id)
+where a.id < 3 group by a.id, b.int_col
+---- RESULTS
+0,0
+1,1
+2,2
+---- TYPES
+INT, INT
+====
+---- QUERY
+# Cross join
+select a.id, b.id
+from alltypes_jdbc_datasource a cross join alltypes_jdbc_datasource b
+where (a.id < 3 and b.id < 3)
+order by a.id, b.id limit 10
+---- RESULTS
+0,0
+0,1
+0,2
+1,0
+1,1
+1,2
+2,0
+2,1
+2,2
+---- TYPES
+INT, INT
+====
diff --git a/tests/query_test/test_ext_data_sources.py 
b/tests/query_test/test_ext_data_sources.py
new file mode 100644
index 000000000..b32d39cd9
--- /dev/null
+++ b/tests/query_test/test_ext_data_sources.py
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.skip import SkipIfCatalogV2
+from tests.common.test_dimensions import create_uncompressed_text_dimension
+
+
+class TestExtDataSources(ImpalaTestSuite):
+  """Impala query tests for external data sources."""
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestExtDataSources, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+
+  def _get_tbl_properties(self, table_name):
+    """Extracts the table properties mapping from the output of DESCRIBE FORMATTED"""
+    return self._get_properties('Table Parameters:', table_name)
+
+  def _get_properties(self, section_name, name, is_db=False):
+    """Extracts the db/table properties mapping from the output of DESCRIBE FORMATTED"""
+    result = self.client.execute("describe {0} formatted {1}".format(
+      "database" if is_db else "", name))
+    match = False
+    properties = dict()
+    for row in result.data:
+      fields = row.split("\t")
+      if fields[0] != '':
+        # A non-empty first field starts a new section.
+        if match:
+          # The matching section has ended; stop scanning.
+          break
+        match = section_name in fields[0]
+      elif match:
+        if fields[1] == 'NULL':
+          break
+        properties[fields[1].rstrip()] = fields[2].rstrip()
+    return properties
+
+  @SkipIfCatalogV2.data_sources_unsupported()
+  def test_verify_jdbc_table_properties(self, vector):
+    jdbc_tbl_name = "functional.alltypes_jdbc_datasource"
+    properties = self._get_tbl_properties(jdbc_tbl_name)
+    # Verify data source related table properties
+    assert properties['__IMPALA_DATA_SOURCE_NAME'] == 'jdbcdatasource'
+    assert properties['__IMPALA_DATA_SOURCE_LOCATION'] == \
+        'hdfs://localhost:20500/test-warehouse/data-sources/jdbc-data-source.jar'
+    assert properties['__IMPALA_DATA_SOURCE_CLASS'] == \
+        'org.apache.impala.extdatasource.jdbc.JdbcDataSource'
+    assert properties['__IMPALA_DATA_SOURCE_API_VERSION'] == 'V1'
+    assert 'database.type\\":\\"POSTGRES' \
+        in properties['__IMPALA_DATA_SOURCE_INIT_STRING']
+    assert 'table\\":\\"alltypes' \
+        in properties['__IMPALA_DATA_SOURCE_INIT_STRING']
+
+  @SkipIfCatalogV2.data_sources_unsupported()
+  def test_data_source_tables(self, vector):
+    self.run_test_case('QueryTest/data-source-tables', vector)
+
+  @SkipIfCatalogV2.data_sources_unsupported()
+  def test_jdbc_data_source(self, vector):
+    self.run_test_case('QueryTest/jdbc-data-source', vector)
diff --git a/tests/query_test/test_queries.py b/tests/query_test/test_queries.py
index 1c7d9de67..0ae78b304 100644
--- a/tests/query_test/test_queries.py
+++ b/tests/query_test/test_queries.py
@@ -236,10 +236,6 @@ class TestQueriesTextTables(ImpalaTestSuite):
     vector.get_value('exec_option')['abort_on_error'] = 1
     self.run_test_case('QueryTest/strict-mode-abort', vector)
 
-  @SkipIfCatalogV2.data_sources_unsupported()
-  def test_data_source_tables(self, vector):
-    self.run_test_case('QueryTest/data-source-tables', vector)
-
   def test_range_constant_propagation(self, vector):
     self.run_test_case('QueryTest/range-constant-propagation', vector)
 

Reply via email to