This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new adafd80255 [feature][connector-v2-hbase-sink] Support Connector v2 
HBase sink TTL data writing (#7116)
adafd80255 is described below

commit adafd802551b9acf9c942965e4f67986a92b07a8
Author: Jast <745925...@qq.com>
AuthorDate: Sat Jul 6 10:00:04 2024 +0800

    [feature][connector-v2-hbase-sink] Support Connector v2 HBase sink TTL data 
writing (#7116)
---
 docs/en/connector-v2/sink/Hbase.md                                 | 5 +++++
 docs/zh/connector-v2/sink/Hbase.md                                 | 5 +++++
 .../seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java   | 7 +++++++
 .../connectors/seatunnel/hbase/config/HbaseParameters.java         | 6 ++++++
 .../seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java | 3 +++
 5 files changed, 26 insertions(+)

diff --git a/docs/en/connector-v2/sink/Hbase.md 
b/docs/en/connector-v2/sink/Hbase.md
index 58c0a16c34..51cb4b3362 100644
--- a/docs/en/connector-v2/sink/Hbase.md
+++ b/docs/en/connector-v2/sink/Hbase.md
@@ -26,6 +26,7 @@ Output data to Hbase
 | encoding           | string  | no       | utf8            |
 | hbase_extra_config | string  | no       | -               |
 | common-options     |         | no       | -               |
+| ttl                | long    | no       | -               |
 
 ### zookeeper_quorum [string]
 
@@ -95,6 +96,10 @@ The encoding of string field, support [`utf8`, `gbk`], 
default `utf8`
 
 The extra configuration of hbase
 
+### ttl [long]
+
+Hbase writes data TTL time, the default is based on the TTL set in the table, 
unit: milliseconds
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details
diff --git a/docs/zh/connector-v2/sink/Hbase.md 
b/docs/zh/connector-v2/sink/Hbase.md
index 9e79ed9799..a9839dbafa 100644
--- a/docs/zh/connector-v2/sink/Hbase.md
+++ b/docs/zh/connector-v2/sink/Hbase.md
@@ -26,6 +26,7 @@
 | encoding           | string  | no   | utf8            |
 | hbase_extra_config | string  | no   | -               |
 | common-options     |         | no   | -               |
+| ttl                | long    | no   | -               |
 
 ### zookeeper_quorum [string]
 
@@ -95,6 +96,10 @@ hbase 客户端的写入缓冲区大小,默认 8 * 1024 * 1024
 
 hbase扩展配置
 
+### ttl [long]
+
+hbase 写入数据 TTL 时间,默认以表设置的TTL为准,单位毫秒
+
 ### 常见选项
 
 Sink 插件常用参数,详见 Sink 常用选项 [Sink Common Options](common-options.md)
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
index 565f1b4b48..88c068bee1 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java
@@ -97,6 +97,13 @@ public class HbaseConfig {
                     .noDefaultValue()
                     .withDescription("Hbase extra config");
 
+    public static final Option<Long> HBASE_TTL_CONFIG =
+            Options.key("ttl")
+                    .longType()
+                    .defaultValue(-1L)
+                    .withDescription(
+                            "The expiration time configuration for writing 
hbase data. The default value is -1, indicating no expiration time.");
+
     public enum NullMode {
         SKIP,
         EMPTY;
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
index 858030fe2a..490e248107 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
+import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_TTL_CONFIG;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.QUERY_COLUMNS;
 import static 
org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
@@ -59,6 +60,8 @@ public class HbaseParameters implements Serializable {
 
     private Map<String, String> hbaseExtraConfig;
 
+    @Builder.Default private Long ttl = HBASE_TTL_CONFIG.defaultValue();
+
     @Builder.Default private String rowkeyDelimiter = 
ROWKEY_DELIMITER.defaultValue();
 
     @Builder.Default private HbaseConfig.NullMode nullMode = 
NULL_MODE.defaultValue();
@@ -80,6 +83,9 @@ public class HbaseParameters implements Serializable {
                 
TypesafeConfigUtils.configToMap(pluginConfig.getConfig(FAMILY_NAME.key())));
 
         // optional parameters
+        if (pluginConfig.hasPath(HBASE_TTL_CONFIG.key())) {
+            builder.ttl(pluginConfig.getLong(HBASE_TTL_CONFIG.key()));
+        }
         if (pluginConfig.hasPath(ROWKEY_DELIMITER.key())) {
             
builder.rowkeyDelimiter(pluginConfig.getString(ROWKEY_DELIMITER.key()));
         }
diff --git 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
index 0455245c67..72722e582e 100644
--- 
a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java
@@ -117,6 +117,9 @@ public class HbaseSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
             timestamp = (Long) row.getField(versionColumnIndex);
         }
         Put put = new Put(rowkey, timestamp);
+        if (hbaseParameters.getTtl() != -1 && hbaseParameters.getTtl() > 0) {
+            put.setTTL(hbaseParameters.getTtl());
+        }
         if (!hbaseParameters.isWalWrite()) {
             put.setDurability(Durability.SKIP_WAL);
         }

Reply via email to