This is an automated email from the ASF dual-hosted git repository. fanjia pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new f248f97703 [Improve][Formats] Refactor exception catch for `ignoreParseErrors`. (#6065) f248f97703 is described below commit f248f97703b94ba187711b88014f81cc7a18c622 Author: Chengyu Yan <cheney...@hotmail.com> AuthorDate: Mon Dec 25 15:34:27 2023 +0800 [Improve][Formats] Refactor exception catch for `ignoreParseErrors`. (#6065) --- release-note.md | 1 + .../seatunnel/format/json/JsonDeserializationSchema.java | 13 +++++++------ .../apache/seatunnel/format/json/JsonToRowConverters.java | 4 ++-- .../format/json/canal/CanalJsonDeserializationSchema.java | 4 ++-- .../json/debezium/DebeziumJsonDeserializationSchema.java | 4 ++-- .../format/json/ogg/OggJsonDeserializationSchema.java | 8 ++++---- 6 files changed, 18 insertions(+), 16 deletions(-) diff --git a/release-note.md b/release-note.md index 8e29b08365..07bb3d11d5 100644 --- a/release-note.md +++ b/release-note.md @@ -105,6 +105,7 @@ - [Json] Remove assert key word. (#5919) - [Formats] Replace CommonErrorCodeDeprecated.JSON_OPERATION_FAILED. (#5948) +- [Formats] Refactor exception catch for `ignoreParseErrors`. (#6065) ### Connector-V2 diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java index 90387c08ef..1f3925192c 100644 --- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.format.json; +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.seatunnel.shade.com.fasterxml.jackson.core.json.JsonReadFeature; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.DeserializationFeature; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; @@ -132,11 +133,11 @@ public class JsonDeserializationSchema implements DeserializationSchema<SeaTunne } try { return (SeaTunnelRow) runtimeConverter.convert(jsonNode); - } catch (Throwable t) { + } catch (RuntimeException e) { if (ignoreParseErrors) { return null; } - throw CommonError.jsonOperationError(FORMAT, jsonNode.toString(), t); + throw CommonError.jsonOperationError(FORMAT, jsonNode.toString(), e); } } @@ -151,22 +152,22 @@ public class JsonDeserializationSchema implements DeserializationSchema<SeaTunne private JsonNode convertBytes(byte[] message) { try { return objectMapper.readTree(message); - } catch (Throwable t) { + } catch (IOException | RuntimeException e) { if (ignoreParseErrors) { return NullNode.getInstance(); } - throw CommonError.jsonOperationError(FORMAT, new String(message), t); + throw CommonError.jsonOperationError(FORMAT, new String(message), e); } } private JsonNode convert(String message) { try { return objectMapper.readTree(message); - } catch (Throwable t) { + } catch (JsonProcessingException | RuntimeException e) { if (ignoreParseErrors) { return NullNode.getInstance(); } - throw CommonError.jsonOperationError(FORMAT, new String(message), t); + throw CommonError.jsonOperationError(FORMAT, new String(message), e); } } diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java index aee3c1a896..648b2c98af 100644 --- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java @@ -394,9 +394,9 @@ public class JsonToRowConverters implements Serializable { } try { return converter.convert(jsonNode); - } catch (Throwable t) { + } catch (RuntimeException e) { if (!ignoreParseErrors) { - throw t; + throw e; } return null; } diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java index 96482291ec..c33e586c7d 100644 --- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/canal/CanalJsonDeserializationSchema.java @@ -169,9 +169,9 @@ public class CanalJsonDeserializationSchema implements DeserializationSchema<Sea } else { throw new IllegalStateException(format("Unknown operation type '%s'.", type)); } - } catch (Throwable t) { + } catch (RuntimeException e) { if (!ignoreParseErrors) { - throw CommonError.jsonOperationError(FORMAT, jsonNode.toString(), t); + throw CommonError.jsonOperationError(FORMAT, jsonNode.toString(), e); } } } diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java index 8d68fb379a..f51a5171fd 100644 --- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java @@ -120,10 +120,10 @@ public class DebeziumJsonDeserializationSchema implements DeserializationSchema< } else { throw new IllegalStateException(format("Unknown operation type '%s'.", op)); } - } catch (Throwable t) { + } catch (RuntimeException e) { // a big try catch to protect the processing. if (!ignoreParseErrors) { - throw CommonError.jsonOperationError(FORMAT, new String(message), t); + throw CommonError.jsonOperationError(FORMAT, new String(message), e); } } } diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java index 36e6e541c4..c618a120df 100644 --- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/ogg/OggJsonDeserializationSchema.java @@ -176,9 +176,9 @@ public class OggJsonDeserializationSchema implements DeserializationSchema<SeaTu } else { throw new IllegalStateException(format("Unknown operation type '%s'.", op)); } - } catch (Throwable t) { + } catch (RuntimeException e) { if (!ignoreParseErrors) { - throw CommonError.jsonOperationError(FORMAT, jsonNode.toString(), t); + throw CommonError.jsonOperationError(FORMAT, jsonNode.toString(), e); } } } @@ -192,9 +192,9 @@ public class OggJsonDeserializationSchema implements DeserializationSchema<SeaTu ObjectNode jsonNode; try { jsonNode = convertBytes(message); - } catch (Throwable cause) { + } catch (RuntimeException e) { if (!ignoreParseErrors) { - throw cause; + throw e; } else { return; }