This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new adafd80255 [feature][connector-v2-hbase-sink] Support Connector v2 HBase sink TTL data writing (#7116) adafd80255 is described below commit adafd802551b9acf9c942965e4f67986a92b07a8 Author: Jast <745925...@qq.com> AuthorDate: Sat Jul 6 10:00:04 2024 +0800 [feature][connector-v2-hbase-sink] Support Connector v2 HBase sink TTL data writing (#7116) --- docs/en/connector-v2/sink/Hbase.md | 5 +++++ docs/zh/connector-v2/sink/Hbase.md | 5 +++++ .../seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java | 7 +++++++ .../connectors/seatunnel/hbase/config/HbaseParameters.java | 6 ++++++ .../seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java | 3 +++ 5 files changed, 26 insertions(+) diff --git a/docs/en/connector-v2/sink/Hbase.md b/docs/en/connector-v2/sink/Hbase.md index 58c0a16c34..51cb4b3362 100644 --- a/docs/en/connector-v2/sink/Hbase.md +++ b/docs/en/connector-v2/sink/Hbase.md @@ -26,6 +26,7 @@ Output data to Hbase | encoding | string | no | utf8 | | hbase_extra_config | string | no | - | | common-options | | no | - | +| ttl | long | no | - | ### zookeeper_quorum [string] @@ -95,6 +96,10 @@ The encoding of string field, support [`utf8`, `gbk`], default `utf8` The extra configuration of hbase +### ttl [long] + +Hbase writes data TTL time, the default is based on the TTL set in the table, unit: milliseconds + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details diff --git a/docs/zh/connector-v2/sink/Hbase.md b/docs/zh/connector-v2/sink/Hbase.md index 9e79ed9799..a9839dbafa 100644 --- a/docs/zh/connector-v2/sink/Hbase.md +++ b/docs/zh/connector-v2/sink/Hbase.md @@ -26,6 +26,7 @@ | encoding | string | no | utf8 | | hbase_extra_config | string | no | - | | common-options | | no | - | +| ttl | long | no | - | ### zookeeper_quorum [string] @@ -95,6 +96,10 @@ hbase 客户端的写入缓冲区大小,默认 8 * 1024 * 1024 hbase扩展配置 +### ttl [long] + +hbase 写入数据 TTL 时间,默认以表设置的TTL为准,单位毫秒 + ### 常见选项 Sink 插件常用参数,详见 Sink 常用选项 [Sink Common Options](common-options.md) diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java index 565f1b4b48..88c068bee1 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java @@ -97,6 +97,13 @@ public class HbaseConfig { .noDefaultValue() .withDescription("Hbase extra config"); + public static final Option<Long> HBASE_TTL_CONFIG = + Options.key("ttl") + .longType() + .defaultValue(-1L) + .withDescription( + "The expiration time configuration for writing hbase data. The default value is -1, indicating no expiration time."); + public enum NullMode { SKIP, EMPTY; diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java index 858030fe2a..490e248107 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java @@ -31,6 +31,7 @@ import java.util.Map; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG; +import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_TTL_CONFIG; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.QUERY_COLUMNS; import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS; @@ -59,6 +60,8 @@ public class HbaseParameters implements Serializable { private Map<String, String> hbaseExtraConfig; + @Builder.Default private Long ttl = HBASE_TTL_CONFIG.defaultValue(); + @Builder.Default private String rowkeyDelimiter = ROWKEY_DELIMITER.defaultValue(); @Builder.Default private HbaseConfig.NullMode nullMode = NULL_MODE.defaultValue(); @@ -80,6 +83,9 @@ public class HbaseParameters implements Serializable { TypesafeConfigUtils.configToMap(pluginConfig.getConfig(FAMILY_NAME.key()))); // optional parameters + if (pluginConfig.hasPath(HBASE_TTL_CONFIG.key())) { + builder.ttl(pluginConfig.getLong(HBASE_TTL_CONFIG.key())); + } if (pluginConfig.hasPath(ROWKEY_DELIMITER.key())) { builder.rowkeyDelimiter(pluginConfig.getString(ROWKEY_DELIMITER.key())); } diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java index 0455245c67..72722e582e 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java @@ -117,6 +117,9 @@ public class HbaseSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> { timestamp = (Long) row.getField(versionColumnIndex); } Put put = new Put(rowkey, timestamp); + if (hbaseParameters.getTtl() != -1 && hbaseParameters.getTtl() > 0) { + put.setTTL(hbaseParameters.getTtl()); + } if (!hbaseParameters.isWalWrite()) { put.setDurability(Durability.SKIP_WAL); }