This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3a2f719d2e [Feature][Transform] Add Zeta SQL UDF context and lifecycle
hooks with e2e/docs updates (#10489)
3a2f719d2e is described below
commit 3a2f719d2e71c712ffc43baa65cc8ed2f7fa175b
Author: cosmosni <[email protected]>
AuthorDate: Tue Mar 17 21:48:23 2026 +0800
[Feature][Transform] Add Zeta SQL UDF context and lifecycle hooks with
e2e/docs updates (#10489)
---
docs/en/transforms/sql-udf.md | 85 ++++++++++++++-
docs/zh/transforms/sql-udf.md | 85 ++++++++++++++-
.../seatunnel/e2e/transform/udf/ExampleUdfIT.java | 8 ++
.../resources/custom_udf_context_lifecycle.conf | 68 ++++++++++++
.../seatunnel/e2e/transform/udf/EncryptUDF.java | 83 +++++++++++++++
.../transform/sql/zeta/ZetaSQLEngine.java | 53 +++++++++-
.../transform/sql/zeta/ZetaSQLFunction.java | 21 ++++
.../seatunnel/transform/sql/zeta/ZetaUDF.java | 29 +++++-
.../transform/sql/zeta/ZetaUDFContext.java | 116 +++++++++++++++++++++
9 files changed, 541 insertions(+), 7 deletions(-)
diff --git a/docs/en/transforms/sql-udf.md b/docs/en/transforms/sql-udf.md
index 7b3d02d105..bc331b7051 100644
--- a/docs/en/transforms/sql-udf.md
+++ b/docs/en/transforms/sql-udf.md
@@ -34,9 +34,48 @@ public interface ZetaUDF {
* @return result value
*/
Object evaluate(List<Object> args);
+
+ /**
+ * Whether current udf requires row level context.
+ */
+ default boolean requiresContext() {
+ return false;
+ }
+
+ /**
+ * Evaluate with row level context.
+ */
+ default Object evaluateWithContext(List<Object> args, ZetaUDFContext
context) {
+ return evaluate(args);
+ }
+
+ /**
+ * Initialize udf resources.
+ */
+ default void open() throws Exception {}
+
+ /**
+ * Release udf resources.
+ */
+ default void close() {}
}
```
+`ZetaUDFContext` provides runtime row-level metadata and fields:
+
+- `getRawTableId()`
+- `getDatabase()`
+- `getSchema()`
+- `getTable()`
+- `getRowKind()`
+- `getAllFields()`
+
+Notes:
+
+- `database/schema/table` parsing follows `TablePath.of(tableId)` semantics.
+- If `tableId` is in an unsupported format, accessing `database/schema/table`
throws `IllegalArgumentException`.
+- Existing UDFs remain backward compatible and continue using
`evaluate(List<Object> args)`.
+
## UDF Implements Example
Add these dependencies and provided scope to your maven project. **Dependency
versions should match the runtime environment.**
@@ -94,6 +133,50 @@ public class ExampleUDF implements ZetaUDF {
Package the UDF project and copy the jar to the path: ${SEATUNNEL_HOME}/lib.
And if your UDF use third party library, you also need put it to
${SEATUNNEL_HOME}/lib.
If you use cluster mode, you need put the lib to all your node's
${SEATUNNEL_HOME}/lib folder and re-start the cluster.
+## Context-aware & lifecycle UDF example
+
+```java
+/** Example UDF that uses the lifecycle hooks together with the row-level context. */
+@AutoService(ZetaUDF.class)
+public class ContextLifecycleUdf implements ZetaUDF {
+
+    // Assigned in open() and cleared in close().
+    private transient String prefix;
+
+    @Override
+    public String functionName() {
+        return "CTX_LIFE";
+    }
+
+    @Override
+    public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> argsType) {
+        return BasicType.STRING_TYPE;
+    }
+
+    @Override
+    public boolean requiresContext() {
+        return true;
+    }
+
+    @Override
+    public void open() {
+        this.prefix = "OPENED";
+    }
+
+    @Override
+    public Object evaluate(List<Object> args) {
+        // evaluate(...) is abstract in ZetaUDF, so it has to be implemented even by a
+        // context-aware UDF; this function only makes sense with a context.
+        throw new UnsupportedOperationException("CTX_LIFE should be called with context");
+    }
+
+    @Override
+    public Object evaluateWithContext(List<Object> args, ZetaUDFContext context) {
+        String arg = args.get(0) == null ? null : String.valueOf(args.get(0));
+        if (arg == null) {
+            return null;
+        }
+        return prefix + ":" + context.getRowKind().shortString() + ":" + arg;
+    }
+
+    @Override
+    public void close() {
+        this.prefix = null;
+    }
+}
+```
+
## Example
The data read from source is a table like this:
@@ -130,4 +213,4 @@ Then the data in result table `fake1` will update to
### new version
-- Add UDF of SQL Transform Connector
+- Add UDF of SQL Transform Connector
\ No newline at end of file
diff --git a/docs/zh/transforms/sql-udf.md b/docs/zh/transforms/sql-udf.md
index fdaaaa393b..ee6c56a159 100644
--- a/docs/zh/transforms/sql-udf.md
+++ b/docs/zh/transforms/sql-udf.md
@@ -34,9 +34,48 @@ public interface ZetaUDF {
* @return result value
*/
Object evaluate(List<Object> args);
+
+ /**
+ * 是否需要行级上下文。
+ */
+ default boolean requiresContext() {
+ return false;
+ }
+
+ /**
+ * 带上下文执行。
+ */
+ default Object evaluateWithContext(List<Object> args, ZetaUDFContext
context) {
+ return evaluate(args);
+ }
+
+ /**
+ * 初始化 UDF 资源。
+ */
+ default void open() throws Exception {}
+
+ /**
+ * 释放 UDF 资源。
+ */
+ default void close() {}
}
```
+`ZetaUDFContext` 提供运行时行级元数据与字段:
+
+- `getRawTableId()`
+- `getDatabase()`
+- `getSchema()`
+- `getTable()`
+- `getRowKind()`
+- `getAllFields()`
+
+说明:
+
+- `database/schema/table` 的解析语义与 `TablePath.of(tableId)` 保持一致。
+- 如果 `tableId` 格式不被支持,访问 `database/schema/table` 时会抛出
`IllegalArgumentException`。
+- 已有 UDF 保持向后兼容,仍可只实现 `evaluate(List<Object> args)`。
+
## UDF 实现示例
将这些依赖项添加到您的 Maven 项目,并使用 provided 作用域。**依赖版本应与运行环境一致。**
@@ -93,6 +132,50 @@ public class ExampleUDF implements ZetaUDF {
打包UDF项目并将jar文件复制到路径:${SEATUNNEL_HOME}/lib
+## 支持上下文与生命周期的 UDF 示例
+
+```java
+/** Example UDF that uses the lifecycle hooks together with the row-level context. */
+@AutoService(ZetaUDF.class)
+public class ContextLifecycleUdf implements ZetaUDF {
+
+    // Assigned in open() and cleared in close().
+    private transient String prefix;
+
+    @Override
+    public String functionName() {
+        return "CTX_LIFE";
+    }
+
+    @Override
+    public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> argsType) {
+        return BasicType.STRING_TYPE;
+    }
+
+    @Override
+    public boolean requiresContext() {
+        return true;
+    }
+
+    @Override
+    public void open() {
+        this.prefix = "OPENED";
+    }
+
+    @Override
+    public Object evaluate(List<Object> args) {
+        // evaluate(...) is abstract in ZetaUDF, so it has to be implemented even by a
+        // context-aware UDF; this function only makes sense with a context.
+        throw new UnsupportedOperationException("CTX_LIFE should be called with context");
+    }
+
+    @Override
+    public Object evaluateWithContext(List<Object> args, ZetaUDFContext context) {
+        String arg = args.get(0) == null ? null : String.valueOf(args.get(0));
+        if (arg == null) {
+            return null;
+        }
+        return prefix + ":" + context.getRowKind().shortString() + ":" + arg;
+    }
+
+    @Override
+    public void close() {
+        this.prefix = null;
+    }
+}
+```
+
## 示例
源端数据读取的表格如下:
@@ -129,4 +212,4 @@ transform {
### 新版本
-- 添加SQL转换连接器的UDF
+- 添加SQL转换连接器的UDF
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-udf/src/test/java/org/apache/seatunnel/e2e/transform/udf/ExampleUdfIT.java
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-udf/src/test/java/org/apache/seatunnel/e2e/transform/udf/ExampleUdfIT.java
index 5cf7f32135..b0d016969c 100644
---
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-udf/src/test/java/org/apache/seatunnel/e2e/transform/udf/ExampleUdfIT.java
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-udf/src/test/java/org/apache/seatunnel/e2e/transform/udf/ExampleUdfIT.java
@@ -42,4 +42,12 @@ public class ExampleUdfIT extends TestSuiteBase {
Container.ExecResult execResult =
container.executeJob("/custom_udf.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}
+
+    /** Runs the context/lifecycle UDF job and expects the job process to exit with code 0. */
+    @TestTemplate
+    public void testCustomUdfContextLifecycle(TestContainer container)
+            throws IOException, InterruptedException {
+        final Container.ExecResult jobResult =
+                container.executeJob("/custom_udf_context_lifecycle.conf");
+        final int exitCode = jobResult.getExitCode();
+        Assertions.assertEquals(0, exitCode);
+    }
}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-udf/src/test/resources/custom_udf_context_lifecycle.conf
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-udf/src/test/resources/custom_udf_context_lifecycle.conf
new file mode 100644
index 0000000000..ec5c8eb707
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-udf/src/test/resources/custom_udf_context_lifecycle.conf
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 10000
+}
+
+source {
+ FakeSource {
+ plugin_output = "fake"
+ schema = {
+ fields {
+ id = "int"
+ name = "string"
+ }
+ }
+ rows = [
+ {fields = [1, "Hello World"], kind = INSERT}
+ ]
+ }
+}
+
+transform {
+ sql {
+ plugin_input = "fake"
+ plugin_output = "fake1"
+ query = "select id, ENCRYPT(name) as name from dual"
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = "fake1"
+ rules = {
+ field_rules = [
+ {
+ field_name = "id"
+ field_type = "int"
+ field_value = [
+ {equals_to = 1}
+ ]
+ },
+ {
+ field_name = "name"
+ field_type = "string"
+ field_value = [
+ {equals_to = "ENC(3135317):Hello World"}
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-udf/src/main/java/org/apache/seatunnel/e2e/transform/udf/EncryptUDF.java
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-udf/src/main/java/org/apache/seatunnel/e2e/transform/udf/EncryptUDF.java
new file mode 100644
index 0000000000..a764652785
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-udf/src/main/java/org/apache/seatunnel/e2e/transform/udf/EncryptUDF.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.transform.udf;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.transform.sql.zeta.ZetaUDF;
+import org.apache.seatunnel.transform.sql.zeta.ZetaUDFContext;
+
+import com.google.auto.service.AutoService;
+
+import java.util.List;
+
+/**
+ * Test UDF registered as {@code ENCRYPT}. It needs the row context because the raw table id of
+ * the current row is mixed into the produced value.
+ */
+@AutoService(ZetaUDF.class)
+public class EncryptUDF implements ZetaUDF {
+
+    // Created by open(), dropped by close().
+    private transient CryptoClient client;
+
+    @Override
+    public String functionName() {
+        return "ENCRYPT";
+    }
+
+    @Override
+    public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> argsType) {
+        return BasicType.STRING_TYPE;
+    }
+
+    @Override
+    public void open() {
+        client = new CryptoClient();
+    }
+
+    @Override
+    public boolean requiresContext() {
+        return true;
+    }
+
+    /** Always fails: this function is only meaningful when a context is supplied. */
+    @Override
+    public Object evaluate(List<Object> args) {
+        throw new UnsupportedOperationException("ENCRYPT should be called with context");
+    }
+
+    /**
+     * Encodes the first argument together with the row's raw table id.
+     *
+     * @return {@code null} when the first argument is {@code null}, otherwise the encoded string
+     * @throws IllegalStateException if {@link #open()} has not been called
+     */
+    @Override
+    public Object evaluateWithContext(List<Object> args, ZetaUDFContext context) {
+        final CryptoClient activeClient = client;
+        if (activeClient == null) {
+            throw new IllegalStateException("open() was not called before evaluateWithContext()");
+        }
+        final Object plain = args.get(0);
+        // The context is only consulted for non-null input.
+        return plain == null ? null : activeClient.encrypt(plain, context.getRawTableId());
+    }
+
+    @Override
+    public void close() {
+        client = null;
+    }
+
+    /** Stand-in for a real crypto client; derives a seed from the table id's hash code. */
+    private static class CryptoClient {
+        private String encrypt(Object value, String tableId) {
+            int keySeed = 0;
+            if (tableId != null) {
+                keySeed = tableId.hashCode();
+            }
+            return "ENC(" + keySeed + "):" + value;
+        }
+    }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index fff51a8998..86a12e6126 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -49,6 +49,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.ServiceLoader;
import java.util.stream.Collectors;
@@ -68,6 +69,8 @@ public class ZetaSQLEngine implements SQLEngine {
private ZetaSQLFunction zetaSQLFunction;
private ZetaSQLFilter zetaSQLFilter;
private ZetaSQLType zetaSQLType;
+ private List<ZetaUDF> udfList = Collections.emptyList();
+ private ZetaUDFContext udfContext;
private Integer allColumnsCount = null;
@@ -84,15 +87,38 @@ public class ZetaSQLEngine implements SQLEngine {
this.inputRowType = inputRowType;
this.sql = sql;
- List<ZetaUDF> udfList = new ArrayList<>();
- ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
- ServiceLoader.load(ZetaUDF.class, classLoader).forEach(udfList::add);
+ udfList = loadUDFs();
+ udfContext = new ZetaUDFContext();
this.zetaSQLType = new ZetaSQLType(inputRowType, udfList);
- this.zetaSQLFunction = new ZetaSQLFunction(inputRowType, zetaSQLType,
udfList);
+ this.zetaSQLFunction = new ZetaSQLFunction(inputRowType, zetaSQLType,
udfList, udfContext);
this.zetaSQLFilter = new ZetaSQLFilter(zetaSQLFunction, zetaSQLType);
parseSQL();
+ openUDFs();
+ }
+
+    /**
+     * Discovers {@link ZetaUDF} implementations through {@link ServiceLoader}, using the current
+     * thread's context class loader.
+     *
+     * @return a new mutable list with one instance per discovered provider
+     */
+    protected List<ZetaUDF> loadUDFs() {
+        ServiceLoader<ZetaUDF> providers =
+                ServiceLoader.load(ZetaUDF.class, Thread.currentThread().getContextClassLoader());
+        List<ZetaUDF> discovered = new ArrayList<>();
+        for (ZetaUDF provider : providers) {
+            discovered.add(provider);
+        }
+        return discovered;
+    }
+
+    /**
+     * Calls {@code open()} on every loaded UDF in list order.
+     *
+     * <p>If one UDF fails to open, the UDFs opened before it are closed again (in reverse order)
+     * and the failure is rethrown as a {@link TransformException}. {@code close()} is not called on
+     * the UDF whose {@code open()} failed.
+     *
+     * @throws TransformException if any UDF throws from {@code open()}; the original exception is
+     *     attached as the cause
+     */
+    private void openUDFs() {
+        for (int i = 0; i < udfList.size(); i++) {
+            ZetaUDF udf = udfList.get(i);
+            try {
+                udf.open();
+            } catch (Exception e) {
+                // Roll back only the UDFs that were opened successfully (indexes 0..i-1).
+                closeUDFs(i - 1);
+                log.error("Open udf {} failed", udf.functionName(), e);
+                // Keep the original exception as the cause so callers see the full stack trace,
+                // not just its message.
+                throw new TransformException(
+                        CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
+                        String.format("Open udf %s failed: %s", udf.functionName(), e.getMessage()),
+                        e);
+            }
+        }
+    }
private void parseSQL() {
@@ -239,6 +265,7 @@ public class ZetaSQLEngine implements SQLEngine {
// ------Physical Query Plan Execution------
// Scan Table
Object[] inputFields = scanTable(inputRow);
+ zetaSQLFunction.updateUDFContext(inputFields, inputRow);
// Filter
try {
@@ -313,4 +340,22 @@ public class ZetaSQLEngine implements SQLEngine {
- allColumnsCnt;
return allColumnsCount;
}
+
+    /** Closes every loaded UDF, last one first; does nothing when no UDF is loaded. */
+    @Override
+    public void close() {
+        final boolean hasUdfs = udfList != null && !udfList.isEmpty();
+        if (hasUdfs) {
+            closeUDFs(udfList.size() - 1);
+        }
+    }
+
+    /**
+     * Closes the UDFs at indexes {@code lastIndex} down to {@code 0}. A failure while closing one
+     * UDF is logged as a warning and the remaining UDFs are still closed.
+     *
+     * @param lastIndex index of the first UDF to close; a negative value closes nothing
+     */
+    private void closeUDFs(int lastIndex) {
+        int index = lastIndex;
+        while (index >= 0) {
+            try {
+                udfList.get(index).close();
+            } catch (Exception closeFailure) {
+                log.warn("Close udf {} failed", udfList.get(index).functionName(), closeFailure);
+            }
+            index--;
+        }
+    }
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
index de1a78eae2..e339434614 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java
@@ -225,13 +225,30 @@ public class ZetaSQLFunction {
private final ZetaSQLFilter zetaSQLFilter;
private final List<ZetaUDF> udfList;
+ private final ZetaUDFContext udfContext;
+ /**
+ * Creates the function evaluator without a UDF context by delegating to the four-argument
+ * constructor with a null context.
+ */
public ZetaSQLFunction(
SeaTunnelRowType inputRowType, ZetaSQLType zetaSQLType,
List<ZetaUDF> udfList) {
+ this(inputRowType, zetaSQLType, udfList, null);
+ }
+
+ /**
+ * Creates the function evaluator.
+ *
+ * @param inputRowType row type of the input rows
+ * @param zetaSQLType type helper, also handed to the internally created ZetaSQLFilter
+ * @param udfList user defined functions that are matched by name during evaluation
+ * @param udfContext context passed to context-aware UDFs; may be null, in which case
+ * updateUDFContext does nothing and UDFs are invoked through evaluate(args)
+ */
+ public ZetaSQLFunction(
+ SeaTunnelRowType inputRowType,
+ ZetaSQLType zetaSQLType,
+ List<ZetaUDF> udfList,
+ ZetaUDFContext udfContext) {
this.inputRowType = inputRowType;
this.zetaSQLType = zetaSQLType;
this.zetaSQLFilter = new ZetaSQLFilter(this, zetaSQLType);
this.udfList = udfList;
+ this.udfContext = udfContext;
+ }
+
+    /**
+     * Points the UDF context at the given row. Does nothing when this instance was created
+     * without a context.
+     *
+     * @param fields field values to expose to UDFs
+     * @param row row supplying the row kind and table id
+     */
+    public void updateUDFContext(Object[] fields, SeaTunnelRow row) {
+        if (udfContext != null) {
+            udfContext.update(fields, row);
+        }
+    }
public Object computeForValue(Expression expression, Object[] inputFields)
{
@@ -648,6 +665,9 @@ public class ZetaSQLFunction {
default:
for (ZetaUDF udf : udfList) {
if (udf.functionName().equalsIgnoreCase(functionName)) {
+ if (udf.requiresContext() && udfContext != null) {
+ return udf.evaluateWithContext(args, udfContext);
+ }
return udf.evaluate(args);
}
}
@@ -847,6 +867,7 @@ public class ZetaSQLFunction {
} else if (expression instanceof Function) {
List<SeaTunnelRow> next = new ArrayList<>();
for (SeaTunnelRow row : seaTunnelRows) {
+ updateUDFContext(row.getFields(), row);
Object splitFieldValue = computeForValue(expression,
row.getFields());
transformExplodeValue(
splitFieldValue,
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDF.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDF.java
index 6d14789c96..7281e9605e 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDF.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDF.java
@@ -19,9 +19,10 @@ package org.apache.seatunnel.transform.sql.zeta;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import java.io.Serializable;
import java.util.List;
-public interface ZetaUDF {
+public interface ZetaUDF extends Serializable {
/**
* Function name
*
@@ -44,4 +45,30 @@ public interface ZetaUDF {
* @return result value
*/
Object evaluate(List<Object> args);
+
+ /**
+ * Whether current udf requires row level context.
+ *
+ * <p>Defaults to {@code false}. ZetaSQLFunction calls {@code evaluateWithContext} only when
+ * this returns {@code true} and it holds a non-null context; in every other case it calls
+ * {@code evaluate}.
+ *
+ * @return true means engine should call evaluateWithContext instead of evaluate
+ */
+ default boolean requiresContext() {
+ return false;
+ }
+
+ /**
+ * Evaluate with row level context.
+ *
+ * <p>The default implementation ignores the context and delegates to {@code evaluate(args)}.
+ *
+ * <p>ZetaSQLEngine creates one ZetaUDFContext and updates that same instance for each row, so
+ * the values read from {@code context} are only valid for the current call.
+ *
+ * @param args input arguments
+ * @param context row context
+ * @return result value
+ */
+ default Object evaluateWithContext(List<Object> args, ZetaUDFContext
context) {
+ return evaluate(args);
+ }
+
+ /**
+ * Initialize udf resources. The default implementation does nothing.
+ *
+ * <p>ZetaSQLEngine calls this on each loaded UDF after the SQL has been parsed. If it throws,
+ * the engine closes the UDFs it had already opened and fails with a TransformException; it does
+ * not call {@code close()} on the UDF whose {@code open()} threw.
+ *
+ * @throws Exception if the resources cannot be initialized
+ */
+ default void open() throws Exception {}
+
+ /**
+ * Release udf resources. The default implementation does nothing.
+ *
+ * <p>ZetaSQLEngine calls this on the loaded UDFs in reverse list order; an exception thrown
+ * here is logged as a warning and the remaining UDFs are still closed.
+ */
+ default void close() {}
}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContext.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContext.java
new file mode 100644
index 0000000000..b42dc9473c
--- /dev/null
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaUDFContext.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.transform.sql.zeta;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * Runtime context for zeta udf execution.
+ *
+ * <p>The object is mutable: {@code update(...)} overwrites the field array, row kind and raw
+ * table id in place and returns {@code this}. The database, schema and table parts are parsed
+ * from the raw table id lazily, on the first getter call after the table id changed.
+ */
+public class ZetaUDFContext {
+    private static final Object[] EMPTY_FIELDS = new Object[0];
+
+    @Nullable private String rawTableId;
+    @Nullable private String database;
+    @Nullable private String schema;
+    @Nullable private String table;
+    // Kept so that repeated getter calls for the same unparsable table id fail the same way.
+    @Nullable private IllegalArgumentException tablePathParseException;
+    private boolean tablePathResolved;
+    private RowKind rowKind = RowKind.INSERT;
+    private Object[] allFields = EMPTY_FIELDS;
+
+    /**
+     * Refreshes the context from {@code row}, exposing the row's own field array.
+     *
+     * @param row current row
+     * @return this context
+     */
+    public ZetaUDFContext update(SeaTunnelRow row) {
+        return update(row.getFields(), row);
+    }
+
+    /**
+     * Refreshes the context from {@code row}, exposing {@code fields} as the field values.
+     *
+     * @param fields field values to expose; {@code null} is replaced by an empty array
+     * @param row row supplying the row kind and the table id
+     * @return this context
+     */
+    public ZetaUDFContext update(Object[] fields, SeaTunnelRow row) {
+        this.allFields = fields == null ? EMPTY_FIELDS : fields;
+        this.rowKind = row.getRowKind();
+        updateTableId(row.getTableId());
+        return this;
+    }
+
+    /** Stores the new table id and discards the parsed parts, unless the id is unchanged. */
+    private void updateTableId(String tableId) {
+        if (Objects.equals(this.rawTableId, tableId)) {
+            return;
+        }
+        this.rawTableId = tableId;
+        this.database = null;
+        this.schema = null;
+        this.table = null;
+        this.tablePathParseException = null;
+        this.tablePathResolved = false;
+    }
+
+    /**
+     * Parses the raw table id once per table id and caches the outcome, including a failure.
+     *
+     * @throws IllegalArgumentException if the raw table id cannot be parsed by TablePath
+     */
+    private void resolveTablePathIfNeeded() {
+        if (tablePathResolved) {
+            if (tablePathParseException != null) {
+                throw tablePathParseException;
+            }
+            return;
+        }
+        tablePathResolved = true;
+
+        // Test rawTableId itself rather than a separate flag. updateTableId() returns early when
+        // the id is unchanged, so a flag maintained there is never set for a fresh context or for
+        // rows whose table id is null, and null would then be handed to TablePath.of(...).
+        if (rawTableId == null) {
+            return;
+        }
+
+        try {
+            TablePath tablePath = TablePath.of(rawTableId);
+            this.database = tablePath.getDatabaseName();
+            this.schema = tablePath.getSchemaName();
+            this.table = tablePath.getTableName();
+        } catch (IllegalArgumentException exception) {
+            this.tablePathParseException = exception;
+            throw exception;
+        }
+    }
+
+    /** Returns the table id exactly as the last row reported it; may be {@code null}. */
+    @Nullable public String getRawTableId() {
+        return rawTableId;
+    }
+
+    /**
+     * Returns the database part of the table id, or {@code null} when there is none or the table
+     * id is {@code null}.
+     *
+     * @throws IllegalArgumentException if the raw table id cannot be parsed by TablePath
+     */
+    @Nullable public String getDatabase() {
+        resolveTablePathIfNeeded();
+        return database;
+    }
+
+    /**
+     * Returns the schema part of the table id, or {@code null} when there is none or the table id
+     * is {@code null}.
+     *
+     * @throws IllegalArgumentException if the raw table id cannot be parsed by TablePath
+     */
+    @Nullable public String getSchema() {
+        resolveTablePathIfNeeded();
+        return schema;
+    }
+
+    /**
+     * Returns the table part of the table id, or {@code null} when the table id is {@code null}.
+     *
+     * @throws IllegalArgumentException if the raw table id cannot be parsed by TablePath
+     */
+    @Nullable public String getTable() {
+        resolveTablePathIfNeeded();
+        return table;
+    }
+
+    /** Returns the row kind of the last row; {@code INSERT} before the first update. */
+    public RowKind getRowKind() {
+        return rowKind;
+    }
+
+    /**
+     * Returns the field array supplied to the last update (the same array, not a copy); an empty
+     * array before the first update or when {@code null} was supplied.
+     */
+    public Object[] getAllFields() {
+        return allFields;
+    }
+}