This is an automated email from the ASF dual-hosted git repository.

taozex pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 34a6b8e9f6 [hotfix][connector-v2-hbase] fix and optimize hbase source problem (#7148)
34a6b8e9f6 is described below

commit 34a6b8e9f62d9e04da554062b46949c7075176ed
Author: Jast <ad...@hadoop.wiki>
AuthorDate: Wed Jul 31 14:43:11 2024 +0800

    [hotfix][connector-v2-hbase] fix and optimize hbase source problem (#7148)
    
    * [hotfix][improve][doc] optimize connector hbase source
    
    * [doc] add dependent document
    
    * [doc] update dependent document
    
    * [improve] improve static use
    
    * [hotfix] add test case
    
    * [hotfix] add test case
    
    ---------
    
    Co-authored-by: Jia Fan <fanjiaemi...@qq.com>
---
 docs/en/connector-v2/source/Hbase.md               | 109 +++++++++--------
 docs/zh/connector-v2/source/Hbase.md               |  96 +++++++++++++++
 docs/zh/connector-v2/source/common-options.md      |  81 +++++++++++++
 .../seatunnel/hbase/config/HbaseConfig.java        |  27 ++++-
 .../seatunnel/hbase/config/HbaseParameters.java    |  24 +++-
 .../connectors/seatunnel/hbase/sink/HbaseSink.java |   2 +-
 .../seatunnel/hbase/source/HbaseSource.java        |   6 +-
 .../seatunnel/hbase/source/HbaseSourceFactory.java |   1 -
 .../seatunnel/hbase/source/HbaseSourceReader.java  |  30 +++--
 .../seatunnel/e2e/connector/hbase/HbaseIT.java     |  37 ++++--
 .../hbase-source-to-assert-with-batch-query.conf   | 132 +++++++++++++++++++++
 11 files changed, 455 insertions(+), 90 deletions(-)

diff --git a/docs/en/connector-v2/source/Hbase.md b/docs/en/connector-v2/source/Hbase.md
index 677b827fb2..753d68eb6e 100644
--- a/docs/en/connector-v2/source/Hbase.md
+++ b/docs/en/connector-v2/source/Hbase.md
@@ -1,12 +1,12 @@
 # Hbase
 
-> Hbase source connector
+> Hbase Source Connector
 
 ## Description
 
-Read data from Apache Hbase.
+Reads data from Apache Hbase.
 
-## Key features
+## Key Features
 
 - [x] [batch](../../concept/connector-v2-features.md)
 - [ ] [stream](../../concept/connector-v2-features.md)
@@ -17,75 +17,80 @@ Read data from Apache Hbase.
 
 ## Options
 
-|        name        |  type  | required | default value |
-|--------------------|--------|----------|---------------|
-| zookeeper_quorum   | string | yes      | -             |
-| table              | string | yes      | -             |
-| query_columns      | list   | yes      | -             |
-| schema             | config | yes      | -             |
-| hbase_extra_config | string | no       | -             |
-| common-options     |        | no       | -             |
+|        Name        |  Type   | Required | Default |
+|--------------------|---------|----------|---------|
+| zookeeper_quorum   | string  | Yes      | -       |
+| table              | string  | Yes      | -       |
+| schema             | config  | Yes      | -       |
+| hbase_extra_config | string  | No       | -       |
+| caching            | int     | No       | -1      |
+| batch              | int     | No       | -1      |
+| cache_blocks       | boolean | No       | false   |
+| common-options     |         | No       | -       |
 
 ### zookeeper_quorum [string]
 
-The zookeeper cluster host of hbase, example: "hadoop001:2181,hadoop002:2181,hadoop003:2181"
+The zookeeper quorum for Hbase cluster hosts, e.g., "hadoop001:2181,hadoop002:2181,hadoop003:2181".
 
 ### table [string]
 
-The table name you want to write, example: "seatunnel"
-
-### query_columns [list]
-
-The column name which you want to query in the table. If you want to query the rowkey column, please set "rowkey" in query_columns.
-Other column format should be: columnFamily:columnName, example: ["rowkey", "columnFamily1:column1", "columnFamily1:column1", "columnFamily2:column1"]
+The name of the table to read from, e.g., "seatunnel".
 
 ### schema [config]
 
-Hbase uses byte arrays for storage. Therefore, you need to configure data types for each column in a table. For more information, see: [guide](../../concept/schema-feature.md#how-to-declare-type-supported).
+Hbase stores data in byte arrays. Therefore, you need to configure the data types for each column in the table. For more information, see: [guide](../../concept/schema-feature.md#how-to-declare-type-supported).
 
 ### hbase_extra_config [config]
 
-The extra configuration of hbase
+Additional configurations for Hbase.
+
+### caching
+
+The caching parameter sets the number of rows fetched per server trip during scans. This reduces round-trips between client and server, improving scan efficiency. Default: -1.
+
+### batch
+
+The batch parameter sets the maximum number of columns returned per scan call. This is useful for rows with many columns, to avoid fetching excessive data at once, thus saving memory and improving performance. Default: -1.
 
-### common options
+### cache_blocks
 
-Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details
+The cache_blocks parameter determines whether to cache data blocks during scans. By default, HBase caches data blocks during scans. Setting this to false reduces memory usage during scans. Default in SeaTunnel: false.
 
-## Examples
+### common-options
+
+Common parameters for Source plugins, refer to [Common Source Options](common-options.md).
+
+## Example
 
 ```bash
 source {
   Hbase {
-      zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
-      table = "seatunnel_test"
-      query_columns=["rowkey", "columnFamily1:column1", "columnFamily1:column1", "columnFamily2:column1"]
-      schema = {
-            columns = [
-                  {
-                     name = rowkey
-                     type = string
-                  },
-                  {
-                     name = "columnFamily1:column1"
-                     type = boolean
-                  },
-                  {
-                     name = "columnFamily1:column1"
-                     type = double
-                  },
-                  {
-                     name = "columnFamily2:column1"
-                     type = bigint
-                  }
-            ]
-      }
+    zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181" 
+    table = "seatunnel_test" 
+    caching = 1000 
+    batch = 100 
+    cache_blocks = false 
+    schema = {
+      columns = [
+        { 
+          name = "rowkey" 
+          type = string 
+        },
+        {
+          name = "columnFamily1:column1"
+          type = boolean
+        },
+        {
+          name = "columnFamily1:column2" 
+          type = double
+        },
+        {
+          name = "columnFamily2:column1"
+          type = bigint
+        }
+      ]
+    }
   }
 }
 ```
 
-## Changelog
-
-### next version
-
-- Add Hbase Source Connector
-
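
As a side note on the three scan options documented above: they map directly onto the HBase client `Scan` API, which is exactly what the `HbaseSourceReader` change further down in this diff does. A minimal standalone sketch, assuming a standard hbase-client dependency and a hypothetical table name:

```java
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

import java.io.IOException;

public class ScanTuningSketch {
    // Apply the connector's caching/batch/cache_blocks options to a Scan
    // before opening the scanner; the connector passes -1 through unchanged,
    // which mirrors the HBase client's own "not set" defaults.
    static ResultScanner openScanner(
            Connection connection, String tableName, int caching, int batch, boolean cacheBlocks)
            throws IOException {
        Scan scan = new Scan();
        scan.setCaching(caching); // rows fetched per server round trip
        scan.setBatch(batch); // max cells returned per call, useful for wide rows
        scan.setCacheBlocks(cacheBlocks); // false avoids polluting the block cache on large scans
        Table table = connection.getTable(TableName.valueOf(tableName));
        return table.getScanner(scan);
    }
}
```
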
diff --git a/docs/zh/connector-v2/source/Hbase.md b/docs/zh/connector-v2/source/Hbase.md
new file mode 100644
index 0000000000..5f15a30b99
--- /dev/null
+++ b/docs/zh/connector-v2/source/Hbase.md
@@ -0,0 +1,96 @@
+# Hbase
+
+> Hbase Source Connector
+
+## Description
+
+Reads data from Apache Hbase.
+
+## Key Features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+|        Name        |  Type   | Required | Default |
+|--------------------|---------|----------|---------|
+| zookeeper_quorum   | string  | Yes      | -       |
+| table              | string  | Yes      | -       |
+| schema             | config  | Yes      | -       |
+| hbase_extra_config | string  | No       | -       |
+| caching            | int     | No       | -1      |
+| batch              | int     | No       | -1      |
+| cache_blocks       | boolean | No       | false   |
+| common-options     |         | No       | -       |
+
+### zookeeper_quorum [string]
+
+The zookeeper quorum for Hbase cluster hosts, e.g., "hadoop001:2181,hadoop002:2181,hadoop003:2181".
+
+### table [string]
+
+The name of the table to read from, e.g., "seatunnel".
+
+### schema [config]
+
+Hbase stores data in byte arrays. Therefore, you need to configure the data types for each column in the table. For more information, see: [guide](../../concept/schema-feature.md#how-to-declare-type-supported).
+
+### hbase_extra_config [config]
+
+Additional configurations for Hbase.
+
+### caching
+
+The caching parameter sets the number of rows fetched per server trip during scans. This reduces round-trips between client and server, improving scan efficiency. Default: -1.
+
+### batch
+
+The batch parameter sets the maximum number of columns returned per scan call. This is useful for rows with many columns, to avoid fetching excessive data at once, thus saving memory and improving performance. Default: -1.
+
+### cache_blocks
+
+The cache_blocks parameter determines whether to cache data blocks during scans. By default, HBase caches data blocks during scans. Setting this to false reduces memory usage during scans. Default in SeaTunnel: false.
+
+### common-options
+
+Common parameters for Source plugins, refer to [Common Source Options](common-options.md).
+
+## Example
+
+```bash
+source {
+  Hbase {
+    zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181" 
+    table = "seatunnel_test" 
+    caching = 1000 
+    batch = 100 
+    cache_blocks = false 
+    schema = {
+      columns = [
+        { 
+          name = "rowkey" 
+          type = string 
+        },
+        {
+          name = "columnFamily1:column1"
+          type = boolean
+        },
+        {
+          name = "columnFamily1:column2" 
+          type = double
+        },
+        {
+          name = "columnFamily2:column1"
+          type = bigint
+        }
+      ]
+    }
+  }
+}
+```
+
diff --git a/docs/zh/connector-v2/source/common-options.md b/docs/zh/connector-v2/source/common-options.md
new file mode 100644
index 0000000000..902dca2c19
--- /dev/null
+++ b/docs/zh/connector-v2/source/common-options.md
@@ -0,0 +1,81 @@
+# Source Common Options
+
+> Common parameters for Source connectors
+
+|       Name        |  Type  | Required | Default | Description |
+|-------------------|--------|----------|---------|-------------|
+| result_table_name | String | No       | -       | When `result_table_name` is not specified, the data processed by this plugin is not registered as a data set `(dataStream/dataset)` that other plugins can access directly, also called a temporary table `(table)`.<br/>When `result_table_name` is specified, the data processed by this plugin is registered as such a data set, and other plugins can access it directly by referencing this name in `source_table_name`. |
+| parallelism       | Int    | No       | -       | When `parallelism` is not specified, the `parallelism` of the environment is used by default.<br/>When `parallelism` is specified, it overrides the `parallelism` setting of the environment. |
+
+# Important Note
+
+When `result_table_name` is used in a job configuration, the `source_table_name` parameter must also be set.
+
+## Task Examples
+
+### Simple Example
+
+> Register a stream or batch data source, returning the table name `fake_table` at registration time
+
+```bash
+source {
+    FakeSourceStream {
+        result_table_name = "fake_table"
+    }
+}
+```
+
+### Complex Example
+
+> Transform the Fake data source and write it to two different sinks
+
+```bash
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        age = "int"
+        c_timestamp = "timestamp"
+        c_date = "date"
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_decimal = "decimal(30, 8)"
+        c_row = {
+          c_row = {
+            c_int = int
+          }
+        }
+      }
+    }
+  }
+}
+
+transform {
+  Sql {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    # The query table name must match the 'source_table_name' field
+    query = "select id, regexp_replace(name, '.+', 'b') as name, age+1 as age, pi() as pi, c_timestamp, c_date, c_map, c_array, c_decimal, c_row from fake"
+  }
+  # The SQL transform supports basic functions and conditional operations,
+  # but not complex SQL operations, such as multi-source-table/row JOINs and aggregations
+}
+
+sink {
+  Console {
+    source_table_name = "fake1"
+  }
+  Console {
+    source_table_name = "fake"
+  }
+}
+```
+
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
index 88c068bee1..44a5640ffe 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
@@ -42,12 +42,6 @@ public class HbaseConfig {
                     .noDefaultValue()
                     .withDescription("Hbase rowkey column");
 
-    public static final Option<List<String>> QUERY_COLUMNS =
-            Options.key("query_columns")
-                    .listType()
-                    .noDefaultValue()
-                    .withDescription("query Hbase columns");
-
     public static final Option<String> ROWKEY_DELIMITER =
             Options.key("rowkey_delimiter")
                     .stringType()
@@ -104,6 +98,27 @@ public class HbaseConfig {
                     .withDescription(
                             "The expiration time configuration for writing 
hbase data. The default value is -1, indicating no expiration time.");
 
+    public static final Option<Boolean> HBASE_CACHE_BLOCKS_CONFIG =
+            Options.key("cache_blocks")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "When it is false, data blocks are not cached. 
When it is true, data blocks are cached. This value should be set to false when 
scanning a large amount of data to reduce memory consumption. The default value 
is false");
+
+    public static final Option<Integer> HBASE_CACHING_CONFIG =
+            Options.key("caching")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            "Set the number of rows read from the server each 
time can reduce the number of round trips between the client and the server, 
thereby improving performance. The default value is -1.");
+
+    public static final Option<Integer> HBASE_BATCH_CONFIG =
+            Options.key("batch")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            "Set the batch size to control the maximum number 
of cells returned each time, thereby controlling the amount of data returned by 
a single RPC call. The default value is -1.");
+
     public enum NullMode {
         SKIP,
         EMPTY;
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
index 490e248107..c25f04b375 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -30,10 +30,12 @@ import java.util.Map;
 
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
+import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_BATCH_CONFIG;
+import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_CACHE_BLOCKS_CONFIG;
+import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_CACHING_CONFIG;
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_TTL_CONFIG;
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
-import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.QUERY_COLUMNS;
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER;
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
@@ -60,8 +62,14 @@ public class HbaseParameters implements Serializable {
 
     private Map<String, String> hbaseExtraConfig;
 
+    @Builder.Default private int caching = HBASE_CACHING_CONFIG.defaultValue();
+
+    @Builder.Default private int batch = HBASE_BATCH_CONFIG.defaultValue();
+
     @Builder.Default private Long ttl = HBASE_TTL_CONFIG.defaultValue();
 
+    @Builder.Default private boolean cacheBlocks = HBASE_CACHE_BLOCKS_CONFIG.defaultValue();
+
     @Builder.Default private String rowkeyDelimiter = ROWKEY_DELIMITER.defaultValue();
 
     @Builder.Default private HbaseConfig.NullMode nullMode = NULL_MODE.defaultValue();
@@ -72,7 +80,7 @@ public class HbaseParameters implements Serializable {
 
     @Builder.Default private HbaseConfig.EnCoding enCoding = ENCODING.defaultValue();
 
-    public static HbaseParameters buildWithConfig(Config pluginConfig) {
+    public static HbaseParameters buildWithSinkConfig(Config pluginConfig) {
         HbaseParametersBuilder builder = HbaseParameters.builder();
 
         // required parameters
@@ -113,18 +121,26 @@ public class HbaseParameters implements Serializable {
         return builder.build();
     }
 
-    public static HbaseParameters buildWithSinkConfig(Config pluginConfig) {
+    public static HbaseParameters buildWithSourceConfig(Config pluginConfig) {
         HbaseParametersBuilder builder = HbaseParameters.builder();
 
         // required parameters
         builder.zookeeperQuorum(pluginConfig.getString(ZOOKEEPER_QUORUM.key()));
         builder.table(pluginConfig.getString(TABLE.key()));
-        builder.columns(pluginConfig.getStringList(QUERY_COLUMNS.key()));
 
         if (pluginConfig.hasPath(HBASE_EXTRA_CONFIG.key())) {
             Config extraConfig = pluginConfig.getConfig(HBASE_EXTRA_CONFIG.key());
             builder.hbaseExtraConfig(TypesafeConfigUtils.configToMap(extraConfig));
         }
+        if (pluginConfig.hasPath(HBASE_CACHING_CONFIG.key())) {
+            builder.caching(pluginConfig.getInt(HBASE_CACHING_CONFIG.key()));
+        }
+        if (pluginConfig.hasPath(HBASE_BATCH_CONFIG.key())) {
+            builder.batch(pluginConfig.getInt(HBASE_BATCH_CONFIG.key()));
+        }
+        if (pluginConfig.hasPath(HBASE_CACHE_BLOCKS_CONFIG.key())) {
+            builder.cacheBlocks(pluginConfig.getBoolean(HBASE_CACHE_BLOCKS_CONFIG.key()));
+        }
         return builder.build();
     }
 }
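
One detail worth calling out in the diff above: the new fields rely on Lombok's `@Builder.Default`, because with `@Builder` a plain field initializer is silently ignored when the object is constructed through the builder. A minimal illustration of that behavior (hypothetical class, not part of this commit):

```java
import lombok.Builder;
import lombok.Getter;

@Getter
@Builder
public class ScanSettings {
    // Without @Builder.Default, ScanSettings.builder().build() would yield
    // caching == 0 (the plain int default) instead of the intended -1.
    @Builder.Default private int caching = -1;
    @Builder.Default private int batch = -1;
    @Builder.Default private boolean cacheBlocks = false;

    public static void main(String[] args) {
        ScanSettings settings = ScanSettings.builder().build();
        System.out.println(settings.getCaching()); // prints -1
    }
}
```
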
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
index 848e1e8205..4f7b929223 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java
@@ -79,7 +79,7 @@ public class HbaseSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
                             "PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SINK, result.getMsg()));
         }
-        this.hbaseParameters = HbaseParameters.buildWithConfig(pluginConfig);
+        this.hbaseParameters = HbaseParameters.buildWithSinkConfig(pluginConfig);
         if (hbaseParameters.getFamilyNames().size() == 0) {
             throw new HbaseConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
index 869e33f623..3aca316151 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSource.java
@@ -44,7 +44,6 @@ import com.google.common.collect.Lists;
 
 import java.util.List;
 
-import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.QUERY_COLUMNS;
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
 import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
 
@@ -68,8 +67,7 @@ public class HbaseSource
     HbaseSource(Config pluginConfig) {
         this.pluginConfig = pluginConfig;
         CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig, ZOOKEEPER_QUORUM.key(), TABLE.key(), QUERY_COLUMNS.key());
+                CheckConfigUtil.checkAllExists(pluginConfig, ZOOKEEPER_QUORUM.key(), TABLE.key());
         if (!result.isSuccess()) {
             throw new HbaseConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -77,7 +75,7 @@ public class HbaseSource
                             "PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
-        this.hbaseParameters = HbaseParameters.buildWithSinkConfig(pluginConfig);
+        this.hbaseParameters = HbaseParameters.buildWithSourceConfig(pluginConfig);
         this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
         this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
     }
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
index 4eec3e0048..2de385dbd1 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceFactory.java
@@ -45,7 +45,6 @@ public class HbaseSourceFactory implements TableSourceFactory {
         return OptionRule.builder()
                 .required(HbaseConfig.ZOOKEEPER_QUORUM)
                 .required(HbaseConfig.TABLE)
-                .required(HbaseConfig.QUERY_COLUMNS)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
index 556374844e..526ac826db 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -39,13 +40,13 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Deque;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.stream.Collectors;
 
 @Slf4j
 public class HbaseSourceReader implements SourceReader<SeaTunnelRow, HbaseSourceSplit> {
@@ -54,7 +55,6 @@ public class HbaseSourceReader implements SourceReader<SeaTunnelRow, HbaseSource
 
     private final transient Map<String, byte[][]> namesMap;
 
-    private final Set<String> columnFamilies = new LinkedHashSet<>();
     private final SourceReader.Context context;
     private final SeaTunnelRowType seaTunnelRowType;
     private volatile boolean noMoreSplit = false;
@@ -74,16 +74,17 @@ public class HbaseSourceReader implements SourceReader<SeaTunnelRow, HbaseSource
         this.seaTunnelRowType = seaTunnelRowType;
         this.namesMap = Maps.newConcurrentMap();
 
-        this.columnNames = hbaseParameters.getColumns();
+        this.columnNames =
+                Arrays.asList(seaTunnelRowType.getFieldNames()).stream()
+                        .filter(name -> !ROW_KEY.equals(name))
+                        .collect(Collectors.toList());
         // Check if input column names are in format: [ columnFamily:column ].
         this.columnNames.stream()
-                .peek(
+                .forEach(
                         column ->
                                 Preconditions.checkArgument(
-                                        (column.contains(":") && column.split(":").length == 2)
-                                                || this.ROW_KEY.equalsIgnoreCase(column),
-                                        "Invalid column names, it should be [ColumnFamily:Column] format"))
-                .forEach(column -> this.columnFamilies.add(column.split(":")[0]));
+                                        column.contains(":") && column.split(":").length == 2,
+                                        "Invalid column names, it should be [ColumnFamily:Column] format"));
 
         connection = HbaseConnectionUtil.getHbaseConnection(hbaseParameters);
     }
@@ -122,6 +123,15 @@ public class HbaseSourceReader implements SourceReader<SeaTunnelRow, HbaseSource
                     Scan scan = new Scan();
                     scan.withStartRow(split.getStartRow(), true);
                     scan.withStopRow(split.getEndRow(), true);
+                    scan.setCacheBlocks(hbaseParameters.isCacheBlocks());
+                    scan.setCaching(hbaseParameters.getCaching());
+                    scan.setBatch(hbaseParameters.getBatch());
+                    for (String columnName : this.columnNames) {
+                        String[] columnNameSplit = columnName.split(":");
+                        scan.addColumn(
+                                Bytes.toBytes(columnNameSplit[0]),
+                                Bytes.toBytes(columnNameSplit[1]));
+                    }
                     this.currentScanner =
                             this.connection
                                     .getTable(TableName.valueOf(hbaseParameters.getTable()))
@@ -152,7 +162,7 @@ public class HbaseSourceReader implements SourceReader<SeaTunnelRow, HbaseSource
             byte[] bytes;
             try {
                 // handle rowkey column
-                if (this.ROW_KEY.equals(columnName)) {
+                if (ROW_KEY.equals(columnName)) {
                     bytes = result.getRow();
                 } else {
                     byte[][] arr = this.namesMap.get(columnName);
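
The net effect of the reader change above is that query columns are now derived from the declared schema instead of a separate `query_columns` list: the rowkey field is filtered out, and every remaining field name must be in `columnFamily:column` form. A small standalone sketch of that derivation, using hypothetical field names:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ColumnDerivationSketch {
    public static void main(String[] args) {
        // Field names as they would appear in the declared schema.
        String[] fieldNames = {"rowkey", "info:age", "info:c_double"};

        // Drop the rowkey field; everything else becomes a scan column.
        List<String> columnNames =
                Arrays.stream(fieldNames)
                        .filter(name -> !"rowkey".equals(name))
                        .collect(Collectors.toList());

        // Validate the columnFamily:column format, as the reader does.
        columnNames.forEach(
                column -> {
                    if (!(column.contains(":") && column.split(":").length == 2)) {
                        throw new IllegalArgumentException(
                                "Invalid column names, it should be [ColumnFamily:Column] format");
                    }
                });
        System.out.println(columnNames); // [info:age, info:c_double]
    }
}
```
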
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
index 13a7a8805a..85ceef9235 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java
@@ -93,18 +93,7 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
 
     @TestTemplate
     public void testHbaseSink(TestContainer container) throws IOException, InterruptedException {
-        deleteData(table);
-        Container.ExecResult sinkExecResult = container.executeJob("/fake-to-hbase.conf");
-        Assertions.assertEquals(0, sinkExecResult.getExitCode());
-        Table hbaseTable = hbaseConnection.getTable(table);
-        Scan scan = new Scan();
-        ResultScanner scanner = hbaseTable.getScanner(scan);
-        ArrayList<Result> results = new ArrayList<>();
-        for (Result result : scanner) {
-            results.add(result);
-        }
-        Assertions.assertEquals(results.size(), 5);
-        scanner.close();
+        fakeToHbase(container);
         Container.ExecResult sourceExecResult = container.executeJob("/hbase-to-assert.conf");
         Assertions.assertEquals(0, sourceExecResult.getExitCode());
     }
@@ -177,6 +166,30 @@ public class HbaseIT extends TestSuiteBase implements TestResource {
         Assertions.assertEquals(cf2Count, 5);
     }
 
+    @TestTemplate
+    public void testHbaseSourceWithBatchQuery(TestContainer container)
+            throws IOException, InterruptedException {
+        fakeToHbase(container);
+        Container.ExecResult sourceExecResult =
+                container.executeJob("/hbase-source-to-assert-with-batch-query.conf");
+        Assertions.assertEquals(0, sourceExecResult.getExitCode());
+    }
+
+    private void fakeToHbase(TestContainer container) throws IOException, InterruptedException {
+        deleteData(table);
+        Container.ExecResult sinkExecResult = container.executeJob("/fake-to-hbase.conf");
+        Assertions.assertEquals(0, sinkExecResult.getExitCode());
+        Table hbaseTable = hbaseConnection.getTable(table);
+        Scan scan = new Scan();
+        ResultScanner scanner = hbaseTable.getScanner(scan);
+        ArrayList<Result> results = new ArrayList<>();
+        for (Result result : scanner) {
+            results.add(result);
+        }
+        Assertions.assertEquals(results.size(), 5);
+        scanner.close();
+    }
+
     private void deleteData(TableName table) throws IOException {
         Table hbaseTable = hbaseConnection.getTable(table);
         Scan scan = new Scan();
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-to-assert-with-batch-query.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-to-assert-with-batch-query.conf
new file mode 100644
index 0000000000..c89cf28e25
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/resources/hbase-source-to-assert-with-batch-query.conf
@@ -0,0 +1,132 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Hbase {
+      zookeeper_quorum = "hbase_e2e:2181"
+      table = "seatunnel_test"
+      query_columns=["rowkey", "info:age", "info:c_double", 
"info:c_boolean","info:c_bigint","info:c_smallint","info:c_tinyint","info:c_float"]
+      caching = 1000
+      batch = 100
+      cache_blocks = false
+      schema = {
+            columns = [
+                  {
+                     name = rowkey
+                     type = string
+                  },
+                  {
+                     name = "info:age"
+                     type = int
+                  },
+                  {
+                     name = "info:c_double"
+                     type = double
+                  },
+                  {
+                     name = "info:c_boolean"
+                     type = boolean
+                  },
+                  {
+                     name = "info:c_bigint"
+                     type = bigint
+                  },
+                  {
+                     name = "info:c_smallint"
+                     type = smallint
+                  },
+                  {
+                     name = "info:c_tinyint"
+                     type = tinyint
+                  },
+                  {
+                     name = "info:c_float"
+                     type = float
+                  }
+             ]
+       }
+    }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = rowkey
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = "info:c_boolean"
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = "info:c_double"
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = "info:c_bigint"
+          field_type = bigint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = "info:age"
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file

