This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 15530d278 [Hotfix][Connector-V2][ES] Source deserializer error and
inappropriate (#4233)
15530d278 is described below
commit 15530d2785ff98e8560951f59876104e7d6f321c
Author: wyc <[email protected]>
AuthorDate: Sat Apr 1 17:30:12 2023 +0800
[Hotfix][Connector-V2][ES] Source deserializer error and inappropriate
(#4233)
* [Fix][Connector-V2][ES]source deserializer error and inappropriate
* [WIP][Connector-V2][ES]try fix e2e
* [WIP][Connector-V2][ES]try fix e2e
* [WIP][Connector-V2][ES]try fix e2e
* [WIP][Connector-V2][ES]try fix e2e
* [Fix][Connector-V2][ES] ES special types (e.g. the ip type) may result in a
null pointer exception.
* [Fix][Connector-V2][ES] Fix the parsing exception when an ES date field uses
the epoch_millis format
---
.../source/DefaultSeaTunnelRowDeserializer.java | 17 ++++++++++++++++-
.../e2e/connector/elasticsearch/ElasticsearchIT.java | 14 ++++++++++----
2 files changed, 26 insertions(+), 5 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
index 0cd06dcaa..5a4858df9 100644
---
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
+++
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/source/DefaultSeaTunnelRowDeserializer.java
@@ -21,6 +21,7 @@ import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingExcep
import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
@@ -37,7 +38,9 @@ import
org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.Elastic
import java.lang.reflect.Array;
import java.math.BigDecimal;
+import java.time.Instant;
import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.HashMap;
@@ -111,7 +114,12 @@ public class DefaultSeaTunnelRowDeserializer implements
SeaTunnelRowDeserializer
value = recursiveGet(rowRecord.getDoc(), fieldName);
if (value != null) {
seaTunnelDataType = rowTypeInfo.getFieldType(i);
- seaTunnelFields[i] = convertValue(seaTunnelDataType,
value.toString());
+ if (value instanceof TextNode) {
+ seaTunnelFields[i] =
+ convertValue(seaTunnelDataType, ((TextNode)
value).textValue());
+ } else {
+ seaTunnelFields[i] = convertValue(seaTunnelDataType,
value.toString());
+ }
}
}
} catch (Exception ex) {
@@ -188,6 +196,13 @@ public class DefaultSeaTunnelRowDeserializer implements
SeaTunnelRowDeserializer
}
private LocalDateTime parseDate(String fieldValue) {
+ // handle strings of timestamp type
+ try {
+ long ts = Long.parseLong(fieldValue);
+ return LocalDateTime.ofInstant(Instant.ofEpochMilli(ts),
ZoneId.systemDefault());
+ } catch (NumberFormatException e) {
+ // no op
+ }
String formatDate = fieldValue.replace("T", " ");
if (fieldValue.length() == "yyyyMMdd".length()
|| fieldValue.length() == "yyyy-MM-dd".length()) {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index ec9e77847..793fa2513 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -44,10 +44,10 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.math.BigDecimal;
-import java.net.UnknownHostException;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -131,8 +131,7 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
}
- private List<String> generateTestDataSet()
- throws JsonProcessingException, UnknownHostException {
+ private List<String> generateTestDataSet() throws JsonProcessingException {
String[] fields =
new String[] {
"c_map",
@@ -170,7 +169,7 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
BigDecimal.valueOf(11, 1),
"test".getBytes(),
LocalDate.now().toString(),
- LocalDateTime.now().toString()
+ System.currentTimeMillis()
};
for (int j = 0; j < fields.length; j++) {
doc.put(fields[j], values[j]);
@@ -215,6 +214,13 @@ public class ElasticsearchIT extends TestSuiteBase
implements TestResource {
x.remove("_index");
x.remove("_type");
x.remove("_id");
+ // I don’t know if converting the test cases in
this way complies with
+ // the CI specification
+ x.replace(
+ "c_timestamp",
+
LocalDateTime.parse(x.get("c_timestamp").toString())
+ .toInstant(ZoneOffset.UTC)
+ .toEpochMilli());
});
List<String> docs =
scrollResult.getDocs().stream()