This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a67a2f7abd [Improve][Transform-V2] Improve sql transform exception to
locate error expression (#9227)
a67a2f7abd is described below
commit a67a2f7abd427f1fe1f2c6339062fe314e82b064
Author: misi <[email protected]>
AuthorDate: Wed May 28 21:21:37 2025 +0800
[Improve][Transform-V2] Improve sql transform exception to locate error
expression (#9227)
---
.../seatunnel/engine/e2e/JobExecutionIT.java | 2 +-
.../transform/exception/TransformCommonError.java | 14 +++++
.../exception/TransformCommonErrorCode.java | 7 ++-
.../transform/exception/TransformException.java | 5 ++
.../transform/sql/zeta/ZetaSQLEngine.java | 19 +++++--
.../seatunnel/transform/sql/SQLTransformTest.java | 61 +++++++++++++++++++++-
6 files changed, 99 insertions(+), 9 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 86ec21a375..1cb5333042 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -174,7 +174,7 @@ public class JobExecutionIT {
JobResult result = clientJobProxy.getJobResultCache();
Assertions.assertEquals(result.getStatus(), JobStatus.FAILED);
- Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException"));
+ Assertions.assertTrue(result.getError().contains("java.lang.NumberFormatException"));
}
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
index f0d12efd8e..46e6bf833c 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java
@@ -23,11 +23,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.EXPRESSION_EXECUTE_ERROR;
import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELDS_NOT_FOUND;
import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELD_NOT_FOUND;
import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_TABLE_NOT_FOUND;
import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.METADATA_FIELDS_NOT_FOUND;
import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.METADATA_MAPPING_FIELD_EXISTS;
+import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.WHERE_STATEMENT_ERROR;
/** The common error of SeaTunnel transform. Please refer {@link CommonError} */
public class TransformCommonError {
@@ -67,4 +69,16 @@ public class TransformCommonError {
params.put("transform", transform);
return new TransformException(INPUT_TABLE_NOT_FOUND, params);
}
+
+ public static TransformException sqlExpressionError(String expression, Throwable cause) {
+ Map<String, String> params = new HashMap<>();
+ params.put("expression", expression);
+ return new TransformException(EXPRESSION_EXECUTE_ERROR, params, cause);
+ }
+
+ public static TransformException sqlWhereStatementError(String wherebody, Throwable cause) {
+ Map<String, String> params = new HashMap<>();
+ params.put("wherebody", wherebody);
+ return new TransformException(WHERE_STATEMENT_ERROR, params, cause);
+ }
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
index f02450341e..20a2c30bf8 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java
@@ -34,7 +34,12 @@ public enum TransformCommonErrorCode implements SeaTunnelErrorCode {
"The metadata mapping field '<field>' of '<transform>' transform already exists in upstream schema"),
INPUT_TABLE_NOT_FOUND(
"TRANSFORM_COMMON-05",
- "The input table '<table>' of '<transform>' transform not found in upstream schema");
+ "The input table '<table>' of '<transform>' transform not found in upstream schema"),
+ EXPRESSION_EXECUTE_ERROR(
+ "TRANSFORM_COMMON-06", "The expression '<expression>' of SQL transform execute failed"),
+ WHERE_STATEMENT_ERROR(
+ "TRANSFORM_COMMON-07",
+ "The where statement '<wherebody>' of SQL transform execute failed");
private final String code;
private final String description;
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java
index 77467a7bd2..149bc96e9c 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java
@@ -30,4 +30,9 @@ public class TransformException extends SeaTunnelRuntimeException {
TransformException(SeaTunnelErrorCode seaTunnelErrorCode, Map<String, String> params) {
super(seaTunnelErrorCode, params);
}
+
+ TransformException(
+ SeaTunnelErrorCode seaTunnelErrorCode, Map<String, String> params, Throwable cause) {
+ super(seaTunnelErrorCode, params, cause);
+ }
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index 9ce552c289..2a1481aaee 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.transform.exception.TransformCommonError;
import org.apache.seatunnel.transform.exception.TransformException;
import org.apache.seatunnel.transform.sql.SQLEngine;
@@ -240,9 +241,13 @@ public class ZetaSQLEngine implements SQLEngine {
Object[] inputFields = scanTable(inputRow);
// Filter
- boolean retain = zetaSQLFilter.executeFilter(selectBody.getWhere(), inputFields);
- if (!retain) {
- return null;
+ try {
+ boolean retain = zetaSQLFilter.executeFilter(selectBody.getWhere(), inputFields);
+ if (!retain) {
+ return null;
+ }
+ } catch (Exception e) {
+ throw TransformCommonError.sqlWhereStatementError(selectBody.getWhere().toString(), e);
}
// Project
@@ -280,8 +285,12 @@ public class ZetaSQLEngine implements SQLEngine {
}
} else {
Expression expression = selectItem.getExpression();
- fields[idx] = zetaSQLFunction.computeForValue(expression, inputFields);
- idx++;
+ try {
+ fields[idx] = zetaSQLFunction.computeForValue(expression, inputFields);
+ idx++;
+ } catch (Exception e) {
+ throw TransformCommonError.sqlExpressionError(expression.toString(), e);
+ }
}
}
return fields;
diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
index afd6fde0ed..0d88aace0f 100644
--- a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
+++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/SQLTransformTest.java
@@ -397,8 +397,11 @@ public class SQLTransformTest {
new SeaTunnelRow(new Object[] {Integer.valueOf(1), 3, "false"}));
} catch (Exception e) {
Assertions.assertEquals(
- "ErrorCode:[COMMON-05], ErrorDescription:[Unsupported operation] - Unsupported CAST AS Boolean: 3",
+ "ErrorCode:[TRANSFORM_COMMON-06], ErrorDescription:[The expression 'cast(`int` AS boolean)' of SQL transform execute failed]",
e.getMessage());
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-05], ErrorDescription:[Unsupported operation] - Unsupported CAST AS Boolean: 3",
+ e.getCause().getMessage());
throw e;
}
});
@@ -411,8 +414,11 @@ public class SQLTransformTest {
new SeaTunnelRow(new Object[] {Integer.valueOf(1), 0, "false333"}));
} catch (Exception e) {
Assertions.assertEquals(
- "ErrorCode:[COMMON-05], ErrorDescription:[Unsupported operation] - Unsupported CAST AS Boolean: false333",
+ "ErrorCode:[TRANSFORM_COMMON-06], ErrorDescription:[The expression 'cast(`string` AS boolean)' of SQL transform execute failed]",
e.getMessage());
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-05], ErrorDescription:[Unsupported operation] - Unsupported CAST AS Boolean: false333",
+ e.getCause().getMessage());
throw e;
}
});
@@ -441,4 +447,55 @@ public class SQLTransformTest {
Assertions.assertEquals(true, result.get(0).getField(1));
Assertions.assertEquals(false, result.get(0).getField(2));
}
+
+ @Test
+ public void testExpressionErrorField() {
+ String tableName = "test";
+ String[] fields = new String[] {"FIELD1", "FIELD2", "FIELD3"};
+ SeaTunnelDataType[] fieldTypes =
+ new SeaTunnelDataType[] {
+ BasicType.INT_TYPE, BasicType.DOUBLE_TYPE, BasicType.STRING_TYPE
+ };
+ CatalogTable table =
+ CatalogTableUtil.getCatalogTable(
+ tableName, new SeaTunnelRowType(fields, fieldTypes));
+ String sqlQuery =
+ "select "
+ + "CAST(`FIELD1` AS STRING) AS FIELD1, "
+ + "CAST(`FIELD1` AS decimal(22,4)) AS FIELD2, "
+ + "CAST(`FIELD3` AS decimal(22,0)) AS FIELD3 "
+ + "from dual";
+
+ ReadonlyConfig config = ReadonlyConfig.fromMap(Collections.singletonMap("query", sqlQuery));
+ SQLTransform sqlTransform = new SQLTransform(config, table);
+ Assertions.assertThrows(
+ TransformException.class,
+ () -> {
+ try {
+ sqlTransform.transformRow(
+ new SeaTunnelRow(new Object[] {1, 123.123, "true"}));
+ } catch (Exception e) {
+ Assertions.assertEquals(
+ "ErrorCode:[TRANSFORM_COMMON-06], ErrorDescription:[The expression 'CAST(`FIELD3` AS decimal (22, 0))' of SQL transform execute failed]",
+ e.getMessage());
+ throw e;
+ }
+ });
+ sqlQuery = "select * from dual where FIELD1/0 > 10";
+ config = ReadonlyConfig.fromMap(Collections.singletonMap("query", sqlQuery));
+ SQLTransform sqlTransform2 = new SQLTransform(config, table);
+ Assertions.assertThrows(
+ TransformException.class,
+ () -> {
+ try {
+ sqlTransform2.transformRow(
+ new SeaTunnelRow(new Object[] {1, 123.123, "true"}));
+ } catch (Exception e) {
+ Assertions.assertEquals(
+ "ErrorCode:[TRANSFORM_COMMON-07], ErrorDescription:[The where statement 'FIELD1 / 0 > 10' of SQL transform execute failed]",
+ e.getMessage());
+ throw e;
+ }
+ });
+ }
}