This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch schemachange-1.14
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/schemachange-1.14 by this push:
     new b407905  [Feature] Support Light Schema change for flink1.14 (#64)
b407905 is described below

commit b40790537b0fe89d2c4c0aac75ff7476c0345ee2
Author: wudi <676366...@qq.com>
AuthorDate: Wed Sep 28 08:59:55 2022 +0800

    [Feature] Support Light Schema change for flink1.14 (#64)

    Support Light Schema change for flink1.14
---
 flink-doris-connector/pom.xml                      |  45 ++--
 .../doris/flink/cfg/DorisExecutionOptions.java     |   2 +-
 .../org/apache/doris/flink/rest/RestService.java   |   2 +-
 .../apache/doris/flink/sink/HttpGetWithEntity.java |  36 +++
 .../apache/doris/flink/sink/HttpPutBuilder.java    |  12 +-
 .../doris/flink/sink/committer/DorisCommitter.java |   1 +
 .../doris/flink/sink/writer/DorisStreamLoad.java   |   3 +
 .../doris/flink/sink/writer/DorisWriter.java       |   7 +-
 .../sink/writer/JsonDebeziumSchemaSerializer.java  | 264 +++++++++++++++++++++
 .../flink/table/DorisDynamicTableFactory.java      |   2 +-
 .../apache/doris/flink/CDCSchemaChangeExample.java |  87 +++++++
 .../org/apache/doris/flink/DorisSinkExample.java   |  12 +-
 .../apache/doris/flink/DorisSinkSQLExample.java    |   7 +-
 .../apache/doris/flink/DorisSourceSinkExample.java |  40 ++--
 .../writer/TestJsonDebeziumSchemaSerializer.java   | 151 ++++++++++++
 .../doris/flink/source/DorisSourceExampleTest.java |   1 -
 .../flink/source/reader/DorisSourceReaderTest.java |   2 +
 .../doris/flink/utils/DateToStringConverter.java   | 147 ++++++++++++
 18 files changed, 759 insertions(+), 62 deletions(-)

diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 6206b4a..bde8d37 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -62,9 +62,9 @@ under the License.
         </mailingList>
     </mailingLists>
     <properties>
-        <scala.version>${env.scala.version}</scala.version>
-        <flink.version>${env.flink.version}</flink.version>
-        <flink.minor.version>${env.flink.minor.version}</flink.minor.version>
+        <scala.version>2.12</scala.version>
+        <flink.version>1.14.4</flink.version>
+        <flink.minor.version>1.14</flink.minor.version>
         <libthrift.version>0.13.0</libthrift.version>
         <arrow.version>5.0.0</arrow.version>
         <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
@@ -251,33 +251,20 @@ under the License.
             <version>2.13.3</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-web</artifactId>
-            <version>${log4j2.version}</version>
-        </dependency>
-        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-api -->
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-api</artifactId>
-            <version>${log4j2.version}</version>
-        </dependency>
-        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-core</artifactId>
-            <version>${log4j2.version}</version>
-        </dependency>
-        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-slf4j-impl -->
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-slf4j-impl</artifactId>
-            <version>${log4j2.version}</version>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.25</version>
         </dependency>
-        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
-            <version>1.7.9</version>
+            <version>1.7.25</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
         </dependency>
         <!--Test-->
         <dependency>
@@ -310,6 +297,12 @@ under the License.
             </exclusions>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-connector-mysql-cdc</artifactId>
+            <version>2.2.1</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 2daf5e1..102a7ee 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -73,7 +73,7 @@ public class DorisExecutionOptions implements Serializable {
     public static DorisExecutionOptions defaults() {
         Properties properties = new Properties();
         properties.setProperty("format", "json");
-        properties.setProperty("strip_outer_array", "true");
+        properties.setProperty("read_json_by_line", "true");
         return new Builder().setStreamLoadProp(properties).build();
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index 734bfdb..5732dc8 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -442,7 +442,7 @@ public class RestService implements Serializable {
     public static boolean isUniqueKeyType(DorisOptions options, DorisReadOptions readOptions, Logger logger)
             throws DorisRuntimeException {
         try {
-            return "UNIQUE_KEYS_TYPE".equals(getSchema(options, readOptions, logger).getKeysType());
+            return UNIQUE_KEYS_TYPE.equals(getSchema(options, readOptions, logger).getKeysType());
         } catch (Exception e) {
             throw new DorisRuntimeException(e);
         }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpGetWithEntity.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpGetWithEntity.java
new file mode 100644
index 0000000..28b4a65
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpGetWithEntity.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink;
+
+import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
+
+import java.net.URI;
+
+public class HttpGetWithEntity extends HttpEntityEnclosingRequestBase {
+    private final static String METHOD_NAME = "GET";
+
+    @Override
+    public String getMethod() {
+        return METHOD_NAME;
+    }
+
+    public HttpGetWithEntity(final String uri) {
+        super();
+        setURI(URI.create(uri));
+    }
+}
\ No newline at end of file
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
index 6288645..b59f2c1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/HttpPutBuilder.java
@@ -17,13 +17,14 @@
 
 package org.apache.doris.flink.sink;
 
-import org.apache.flink.util.Preconditions;
-
 import org.apache.commons.codec.binary.Base64;
+import org.apache.doris.flink.sink.writer.LoadConstants;
+import org.apache.flink.util.Preconditions;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHeaders;
 import org.apache.http.client.methods.HttpPut;
 import org.apache.http.entity.StringEntity;
+
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
@@ -50,6 +51,13 @@ public class HttpPutBuilder {
         return this;
     }
 
+    public HttpPutBuilder addHiddenColumns(boolean add) {
+        if(add){
+            header.put("hidden_columns", LoadConstants.DORIS_DELETE_SIGN);
+        }
+        return this;
+    }
+
     public HttpPutBuilder enable2PC() {
         header.put("two_phase_commit", "true");
         return this;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
index dfcc0bc..d62e8b4 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java
@@ -90,6 +90,7 @@ public class DorisCommitter implements Committer<DorisCommittable> {
             try {
                 response = httpClient.execute(putBuilder.build());
             } catch (IOException e) {
+                LOG.error("commit transaction failed: ", e);
                 hostPort = RestService.getBackend(dorisOptions, dorisReadOptions, LOG);
                 continue;
             }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index dfb9cb7..bd29d34 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -75,6 +75,7 @@ public class DorisStreamLoad implements Serializable {
     private final String db;
     private final String table;
     private final boolean enable2PC;
+    private final boolean enableDelete;
    private final Properties streamLoadProp;
    private final RecordStream recordStream;
    private Future<CloseableHttpResponse> pendingLoadFuture;
@@ -98,6 +99,7 @@ public class DorisStreamLoad implements Serializable {
         this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
         this.enable2PC = executionOptions.enabled2PC();
         this.streamLoadProp = executionOptions.getStreamLoadProp();
+        this.enableDelete = executionOptions.getDeletable();
         this.httpClient = httpClient;
         this.executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
@@ -233,6 +235,7 @@ public class DorisStreamLoad implements Serializable {
         putBuilder.setUrl(loadUrlStr)
                 .baseAuth(user, passwd)
                 .addCommonHeader()
+                .addHiddenColumns(enableDelete)
                 .setLabel(label)
                 .setEntity(entity)
                 .addProperties(streamLoadProp);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 86ed9ed..aa11df5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -123,7 +124,11 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
     @Override
     public void write(IN in, Context context) throws IOException {
         checkLoadException();
-        dorisStreamLoad.writeRecord(serializer.serialize(in));
+        byte[] serialize = serializer.serialize(in);
+        if(Objects.isNull(serialize)){
+            return;
+        }
+        dorisStreamLoad.writeRecord(serialize);
     }
 
     @Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
new file mode 100644
index 0000000..c3fe987
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/JsonDebeziumSchemaSerializer.java
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.HttpGetWithEntity;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN;
+
+public class JsonDebeziumSchemaSerializer implements DorisRecordSerializer<String> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JsonDebeziumSchemaSerializer.class);
+    private static final String CHECK_SCHEMA_CHANGE_API = "http://%s/api/enable_light_schema_change/%s/%s";
+    private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
+    private static final String OP_READ = "r"; // snapshot read
+    private static final String OP_CREATE = "c"; // insert
+    private static final String OP_UPDATE = "u"; // update
+    private static final String OP_DELETE = "d"; // delete
+
+    private static final String addDropDDLRegex = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+COLUMN\\s+([^\\s]+).*";
+    private final Pattern addDropDDLPattern;
+    private DorisOptions dorisOptions;
+    private ObjectMapper objectMapper = new ObjectMapper();
+
+    public JsonDebeziumSchemaSerializer(DorisOptions dorisOptions, Pattern pattern) {
+        this.dorisOptions = dorisOptions;
+        this.addDropDDLPattern = pattern == null ? Pattern.compile(addDropDDLRegex, Pattern.CASE_INSENSITIVE) : pattern;
+    }
+
+    @Override
+    public byte[] serialize(String record) throws IOException {
+        LOG.debug("received debezium json data {} :", record);
+        JsonNode recordRoot = objectMapper.readTree(record);
+        String op = extractJsonNode(recordRoot, "op");
+        if (Objects.isNull(op)) {
+            //schema change ddl
+            schemaChange(recordRoot);
+            return null;
+        }
+        Map<String, String> valueMap;
+        if (OP_READ.equals(op) || OP_CREATE.equals(op)) {
+            valueMap = extractAfterRow(recordRoot);
+            addDeleteSign(valueMap,false);
+        } else if (OP_UPDATE.equals(op)) {
+            valueMap = extractAfterRow(recordRoot);
+            addDeleteSign(valueMap,false);
+        } else if (OP_DELETE.equals(op)) {
+            valueMap = extractBeforeRow(recordRoot);
+            addDeleteSign(valueMap,true);
+        } else {
+            LOG.error("parse record fail, unknown op {} in {}",op,record);
+            return null;
+        }
+        return objectMapper.writeValueAsString(valueMap).getBytes(StandardCharsets.UTF_8);
+    }
+
+    @VisibleForTesting
+    public boolean schemaChange(JsonNode recordRoot) {
+        boolean status = false;
+        try{
+            boolean doSchemaChange = checkSchemaChange(recordRoot);
+            status = doSchemaChange && execSchemaChange(recordRoot);
+            LOG.info("schema change status:{}", status);
+        }catch (Exception ex){
+            LOG.warn("schema change error :", ex);
+        }
+        return status;
+    }
+
+    private void addDeleteSign(Map<String, String> valueMap, boolean delete) {
+        if(delete){
+            valueMap.put(DORIS_DELETE_SIGN, "1");
+        }else{
+            valueMap.put(DORIS_DELETE_SIGN, "0");
+        }
+    }
+
+    private boolean checkSchemaChange(JsonNode record) throws IOException {
+        String database = extractDatabase(record);
+        String table = extractTable(record);
+        String requestUrl = String.format(CHECK_SCHEMA_CHANGE_API, dorisOptions.getFenodes(), database, table);
+        Map<String,Object> param = buildRequestParam(record);
+        if(param.size() != 2){
+            return false;
+        }
+        HttpGetWithEntity httpGet = new HttpGetWithEntity(requestUrl);
+        httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+        httpGet.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
+        boolean success = handleResponse(httpGet);
+        if (!success) {
+            LOG.warn("schema change can not do table {}.{}",database,table);
+        }
+        return success;
+    }
+
+    /**
+     * Build param
+     * {
+     * "isDropColumn": true,
+     * "columnName" : "column"
+     * }
+     */
+    private Map<String, Object> buildRequestParam(JsonNode record) throws JsonProcessingException {
+        Map<String,Object> params = new HashMap<>();
+        String ddl = extractDDL(record);
+        if(ddl == null){
+            return params;
+        }
+        Matcher matcher = addDropDDLPattern.matcher(ddl);
+        if(matcher.find()){
+            String op = matcher.group(1);
+            String col = matcher.group(2);
+            params.put("isDropColumn", op.equalsIgnoreCase("DROP"));
+            params.put("columnName", col);
+        }
+        return params;
+    }
+
+    private boolean execSchemaChange(JsonNode record) throws IOException {
+        String extractDDL = extractDDL(record);
+        Map<String, String> param = new HashMap<>();
+        param.put("stmt", extractDDL);
+        String database = extractDatabase(record);
+        String requestUrl = String.format(SCHEMA_CHANGE_API, dorisOptions.getFenodes(), database);
+        HttpPost httpPost = new HttpPost(requestUrl);
+        httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+        httpPost.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
+        httpPost.setEntity(new StringEntity(objectMapper.writeValueAsString(param)));
+        boolean success = handleResponse(httpPost);
+        return success;
+    }
+
+    private String extractDatabase(JsonNode record) {
+        return extractJsonNode(record.get("source"), "db");
+    }
+
+    private String extractTable(JsonNode record) {
+        return extractJsonNode(record.get("source"), "table");
+    }
+
+    private boolean handleResponse(HttpUriRequest request) throws IOException {
+        try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
+            CloseableHttpResponse response = httpclient.execute(request);
+            final int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode == 200 && response.getEntity() != null) {
+                String loadResult = EntityUtils.toString(response.getEntity());
+                Map<String, Object> responseMap = objectMapper.readValue(loadResult, Map.class);
+                String code = responseMap.getOrDefault("code", "-1").toString();
+                if (code.equals("0")) {
+                    return true;
+                } else {
+                    LOG.error("schema change response:{}", loadResult);
+                }
+            }
+        }catch(Exception e){
+            LOG.error("http request error,", e);
+        }
+        return false;
+    }
+
+    private String extractJsonNode(JsonNode record, String key) {
+        return record != null && record.get(key) != null ? record.get(key).asText() : null;
+    }
+
+    private Map<String, String> extractBeforeRow(JsonNode record) {
+        return extractRow(record.get("before"));
+    }
+
+    private Map<String, String> extractAfterRow(JsonNode record) {
+        return extractRow(record.get("after"));
+    }
+
+    private Map<String, String> extractRow(JsonNode recordRow) {
+        Map<String, String> recordMap = objectMapper.convertValue(recordRow, new TypeReference<Map<String, String>>() {
+        });
+        return recordMap != null ? recordMap : new HashMap<>();
+    }
+
+    @VisibleForTesting
+    public String extractDDL(JsonNode record) throws JsonProcessingException {
+        String historyRecord = extractJsonNode(record, "historyRecord");
+        if (Objects.isNull(historyRecord)) {
+            return null;
+        }
+        String ddl = extractJsonNode(objectMapper.readTree(historyRecord), "ddl");
+        if (!Objects.isNull(ddl)) {
+            //filter add/drop operation
+            if (addDropDDLPattern.matcher(ddl).matches()) {
+                return ddl;
+            }
+        }
+        LOG.info("parse ddl:{}", ddl);
+        return null;
+    }
+
+    private String authHeader() {
+        return "Basic " + new String(Base64.encodeBase64((dorisOptions.getUsername() + ":" + dorisOptions.getPassword()).getBytes(StandardCharsets.UTF_8)));
+    }
+
+    public static JsonDebeziumSchemaSerializer.Builder builder() {
+        return new JsonDebeziumSchemaSerializer.Builder();
+    }
+
+    /**
+     * Builder for JsonDebeziumSchemaSerializer.
+     */
+    public static class Builder {
+        private DorisOptions dorisOptions;
+        private Pattern addDropDDLPattern;
+
+        public JsonDebeziumSchemaSerializer.Builder setDorisOptions(DorisOptions dorisOptions) {
+            this.dorisOptions = dorisOptions;
+            return this;
+        }
+
+        public JsonDebeziumSchemaSerializer.Builder setPattern(Pattern addDropDDLPattern) {
+            this.addDropDDLPattern = addDropDDLPattern;
+            return this;
+        }
+
+        public JsonDebeziumSchemaSerializer build() {
+            return new JsonDebeziumSchemaSerializer(dorisOptions, addDropDDLPattern);
+        }
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index be00cff..fb44359 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -147,7 +147,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
     private static final ConfigOption<String> SINK_LABEL_PREFIX = ConfigOptions
             .key("sink.label-prefix")
             .stringType()
-            .noDefaultValue()
+            .defaultValue("")
             .withDescription("the unique label prefix.");
     private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
             .key("sink.batch.interval")
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
new file mode 100644
index 0000000..bdc0584
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/CDCSchemaChangeExample.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink;
+
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.DorisSink;
+import org.apache.doris.flink.utils.DateToStringConverter;
+import org.apache.doris.flink.sink.writer.JsonDebeziumSchemaSerializer;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+public class CDCSchemaChangeExample {
+
+    public static void main(String[] args) throws Exception {
+
+        Map<String, Object> customConverterConfigs = new HashMap<>();
+        customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
+        JsonDebeziumDeserializationSchema schema =
+                new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
+
+        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
+                .hostname("127.0.0.1")
+                .port(3306)
+                .databaseList("test") // set captured database
+                .tableList("test.t1") // set captured table
+                .username("root")
+                .password("123456")
+                .debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
+                .deserializer(schema)
+                .serverTimeZone("Asia/Shanghai")
+                .includeSchemaChanges(true) // converts SourceRecord to JSON String
+                .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        // enable checkpoint
+        env.enableCheckpointing(10000);
+//
+        Properties props = new Properties();
+        props.setProperty("format", "json");
+        props.setProperty("read_json_by_line", "true");
+        DorisOptions dorisOptions = DorisOptions.builder()
+                .setFenodes("127.0.0.1:8030")
+                .setTableIdentifier("test.t1")
+                .setUsername("root")
+                .setPassword("").build();
+//
+        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
+        executionBuilder.setLabelPrefix("label-doris" + UUID.randomUUID())
+                .setStreamLoadProp(props).setDeletable(true);
+
+        DorisSink.Builder<String> builder = DorisSink.builder();
+        builder.setDorisReadOptions(DorisReadOptions.builder().build())
+                .setDorisExecutionOptions(executionBuilder.build())
+                .setDorisOptions(dorisOptions)
+                .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());
+
+        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")//.print();
+                .sinkTo(builder.build());
+
+        env.execute("Print MySQL Snapshot + Binlog");
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java
index 9c459b7..daac8bb 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.UUID;
 
 public class DorisSinkExample {
 
@@ -43,6 +44,7 @@ public class DorisSinkExample {
         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         env.enableCheckpointing(10000);
+        env.setParallelism(1);
         env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));
         DorisSink.Builder<String> builder = DorisSink.builder();
@@ -61,12 +63,12 @@ public class DorisSinkExample {
         properties.setProperty("line_delimiter", "\n");
         properties.setProperty("format", "csv");
         DorisOptions.Builder dorisBuilder = DorisOptions.builder();
-        dorisBuilder.setFenodes("127.0.0.1:8040")
-                .setTableIdentifier("db.table")
-                .setUsername("test")
-                .setPassword("test");
+        dorisBuilder.setFenodes("127.0.0.1:8131")
+                .setTableIdentifier("test.tbl")
+                .setUsername("root")
+                .setPassword("");
 
         DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
-        executionBuilder.setLabelPrefix("label-doris")
+        executionBuilder.setLabelPrefix("label-doris" + UUID.randomUUID())
                 .setStreamLoadProp(properties)
                 .setBufferSize(8*1024)
                 .setBufferCount(3);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkSQLExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkSQLExample.java
index 87da571..91ce32b 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkSQLExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkSQLExample.java
@@ -47,15 +47,16 @@ public class DorisSinkSQLExample {
                         ") " +
                         "WITH (\n" +
                         " 'connector' = 'doris',\n" +
-                        " 'fenodes' = 'FE_IP:8030',\n" +
-                        " 'table.identifier' = 'db.table',\n" +
+                        " 'fenodes' = '127.0.0.1:8030',\n" +
+                        " 'table.identifier' = 'test.tbl',\n" +
                         " 'username' = 'root',\n" +
                         " 'password' = '',\n" +
                         " 'sink.properties.format' = 'json',\n" +
                         " 'sink.buffer-count' = '4',\n" +
                         " 'sink.buffer-size' = '4086'," +
                         " 'sink.label-prefix' = 'doris_label',\n" +
-                        " 'sink.properties.read_json_by_line' = 'true'\n" +
+                        " 'sink.properties.read_json_by_line' = 'true',\n" +
+                        " 'sink.properties.function_column.sequence_col' = 'age'\n" +
                         ")");
         tEnv.executeSql("INSERT INTO doris_test_sink select name,age from doris_test");
     }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
index 60524c8..1478885 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
@@ -17,48 +17,46 @@
 
 package org.apache.doris.flink;
 
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
 public class DorisSourceSinkExample {
 
     public static void main(String[] args) {
-        EnvironmentSettings settings = EnvironmentSettings.newInstance()
-                .useBlinkPlanner()
-                .inStreamingMode()
-                .build();
-        TableEnvironment tEnv = TableEnvironment.create(settings);
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
         tEnv.executeSql(
                 "CREATE TABLE doris_test (" +
+                        "id INT," +
                         "name STRING," +
-                        "age INT," +
-                        "price DECIMAL(5,2)," +
-                        "sale DOUBLE" +
"sale DOUBLE" + + "PRIMARY KEY (id) NOT ENFORCED" + ") " + "WITH (\n" + - " 'connector' = 'doris',\n" + - " 'fenodes' = 'FE_IP:8030',\n" + - " 'table.identifier' = 'db.table',\n" + + " 'connector' = 'mysql-cdc',\n" + + " 'hostname' = '127.0.0.1',\n" + + " 'port' = '3306',\n" + " 'username' = 'root',\n" + - " 'password' = ''" + + " 'password' = '123456'," + + " 'database-name' = 'test', " + + " 'table-name' = 'test'" + ")"); tEnv.executeSql( "CREATE TABLE doris_test_sink (" + - "name STRING," + - "age INT," + - "price DECIMAL(5,2)," + - "sale DOUBLE" + + "id INT," + + "name STRING" + ") " + "WITH (\n" + " 'connector' = 'doris',\n" + - " 'fenodes' = 'FE_IP:8030',\n" + - " 'table.identifier' = 'db.table',\n" + + " 'fenodes' = '127.0.0.1:8030',\n" + + " 'table.identifier' = 'test.tbk',\n" + " 'username' = 'root',\n" + " 'password' = '',\n" + " 'sink.properties.format' = 'csv',\n" + " 'sink.label-prefix' = 'doris_csv_table'\n" + ")"); - tEnv.executeSql("INSERT INTO doris_test_sink select name,age,price,sale from doris_test"); + tEnv.executeSql("INSERT INTO doris_test_sink select id,name from doris_test"); } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java new file mode 100644 index 0000000..0d35ae3 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestJsonDebeziumSchemaSerializer.java @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.writer; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.exception.DorisException; +import org.apache.doris.flink.rest.RestService; +import org.apache.doris.flink.rest.models.Field; +import org.apache.doris.flink.rest.models.Schema; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +/** + * test for JsonDebeziumSchemaSerializer. 
+ */
+public class TestJsonDebeziumSchemaSerializer {
+    private static final Logger LOG = LoggerFactory.getLogger(TestJsonDebeziumSchemaSerializer.class);
+    static DorisOptions dorisOptions;
+    static JsonDebeziumSchemaSerializer serializer;
+    static ObjectMapper objectMapper = new ObjectMapper();
+
+    @BeforeClass
+    public static void setUp() {
+        dorisOptions = DorisOptions.builder().setFenodes("127.0.0.1:8030")
+                .setTableIdentifier("test.t1")
+                .setUsername("root")
+                .setPassword("").build();
+        serializer = JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build();
+    }
+
+    @Test
+    public void testSerializeInsert() throws IOException {
+        //insert into t1 VALUES(1,"doris",'2022-01-01','2022-01-01 10:01:02','2022-01-01 10:01:03');
+        byte[] serializedValue = serializer.serialize("{\"before\":null,\"after\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663923840000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":11834,\"row\":0,\"thread\":null,\ [...]
+        Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){});
+        Assert.assertEquals("1", valueMap.get("id"));
+        Assert.assertEquals("doris", valueMap.get("name"));
+        Assert.assertEquals("2022-01-01", valueMap.get("dt"));
+        Assert.assertEquals("2022-01-01 10:01:02", valueMap.get("dtime"));
+        Assert.assertEquals("2022-01-01 10:01:03", valueMap.get("ts"));
+        Assert.assertEquals("0", valueMap.get("__DORIS_DELETE_SIGN__"));
+        Assert.assertEquals(6, valueMap.size());
+
+    }
+
+    @Test
+    public void testSerializeUpdate() throws IOException {
+        //update t1 set name='doris-update' WHERE id =1;
+        byte[] serializedValue = serializer.serialize("{\"before\":{\"id\":1,\"name\":\"doris\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924082000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":n [...]
+        Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){});
+        Assert.assertEquals("1", valueMap.get("id"));
+        Assert.assertEquals("doris-update", valueMap.get("name"));
+        Assert.assertEquals("2022-01-01", valueMap.get("dt"));
+        Assert.assertEquals("2022-01-01 10:01:02", valueMap.get("dtime"));
+        Assert.assertEquals("2022-01-01 10:01:03", valueMap.get("ts"));
+        Assert.assertEquals("0", valueMap.get("__DORIS_DELETE_SIGN__"));
+        Assert.assertEquals(6, valueMap.size());
+    }
+
+    @Test
+    public void testSerializeDelete() throws IOException {
+        byte[] serializedValue = serializer.serialize("{\"before\":{\"id\":1,\"name\":\"doris-update\",\"dt\":\"2022-01-01\",\"dtime\":\"2022-01-01 10:01:02\",\"ts\":\"2022-01-01 10:01:03\"},\"after\":null,\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924328000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":12500,\"row\":0,\"thread\" [...]
+        Map<String, String> valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>(){});
+        Assert.assertEquals("1", valueMap.get("id"));
+        Assert.assertEquals("doris-update", valueMap.get("name"));
+        Assert.assertEquals("2022-01-01", valueMap.get("dt"));
+        Assert.assertEquals("2022-01-01 10:01:02", valueMap.get("dtime"));
+        Assert.assertEquals("2022-01-01 10:01:03", valueMap.get("ts"));
+        Assert.assertEquals("1", valueMap.get("__DORIS_DELETE_SIGN__"));
+        Assert.assertEquals(6, valueMap.size());
+    }
+
+    @Test
+    public void testExtractDDL() throws IOException {
+        String srcDDL = "alter table t1 add \n column c_1 varchar(200)";
+        String record = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\ [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        String ddl = serializer.extractDDL(recordRoot);
+        Assert.assertEquals(srcDDL, ddl);
+    }
+
+    @Ignore
+    @Test
+    public void testSerializeAddColumn() throws IOException, DorisException {
+        // alter table t1 add column c_1 varchar(200)
+        String record = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663924503565,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13088,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13088,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\ [...]
+        JsonNode recordRoot = objectMapper.readTree(record);
+        boolean flag = serializer.schemaChange(recordRoot);
+        Assert.assertEquals(true, flag);
+
+        Field targetField = getField("c_1");
+        Assert.assertNotNull(targetField);
+        Assert.assertEquals("c_1", targetField.getName());
+        Assert.assertEquals("VARCHAR", targetField.getType());
+    }
+
+    @Ignore
+    @Test
+    public void testSerializeDropColumn() throws IOException, DorisException {
+        //alter table t1 drop column c_1;
+        String ddl = "{\"source\":{\"version\":\"1.5.4.Final\",\"connector\":\"mysql\",\"name\":\"mysql_binlog_source\",\"ts_ms\":1663925897321,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"t1\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000006\",\"pos\":13298,\"row\":0,\"thread\":null,\"query\":null},\"historyRecord\":\"{\\\"source\\\":{\\\"file\\\":\\\"binlog.000006\\\",\\\"pos\\\":13298,\\\"server_id\\\":1},\\\"position\\\":{\\\"transaction_id\\\":null,\\\" [...]
+        JsonNode recordRoot = objectMapper.readTree(ddl);
+        boolean flag = serializer.schemaChange(recordRoot);
+        Assert.assertEquals(true, flag);
+
+        Field targetField = getField("c_1");
+        Assert.assertNull(targetField);
+    }
+
+    private static Field getField(String column) throws DorisException{
+        //get table schema
+        Schema schema = RestService.getSchema(dorisOptions, DorisReadOptions.builder().build(), LOG);
+        List<Field> properties = schema.getProperties();
+        Field targetField = null;
+        for(Field field : properties){
+            if(column.equals(field.getName())){
+                targetField = field;
+                break;
+            }
+        }
+        return targetField;
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
index d85e70d..c8033d6 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
@@ -40,7 +40,6 @@ public class DorisSourceExampleTest {
                 .setDeserializer(new SimpleListDeserializationSchema())
                 .build();
 
-
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
         env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris Source")
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
index a44b96d..4ab44bf 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/reader/DorisSourceReaderTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.source.reader;
 import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
 import org.apache.doris.flink.sink.OptionUtils;
 import org.apache.doris.flink.source.split.DorisSourceSplit;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -29,6 +30,7 @@ import static org.junit.Assert.assertEquals;
 /**
  * Unit tests for the {@link DorisSourceReader}.
  */
+@Ignore
 public class DorisSourceReaderTest {
 
     private static DorisSourceReader createReader(TestingReaderContext context) {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
new file mode 100644
index 0000000..9d73f53
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/utils/DateToStringConverter.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.utils;
+
+import io.debezium.spi.converter.CustomConverter;
+import io.debezium.spi.converter.RelationalColumn;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.time.DateTimeException;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+public class DateToStringConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
+    private static final Logger log = LoggerFactory.getLogger(DateToStringConverter.class);
+    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
+    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
+    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
+    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
+    private ZoneId timestampZoneId = ZoneId.systemDefault();
+
+    public static Properties DEFAULT_PROPS = new Properties();
+
+    static {
+        DEFAULT_PROPS.setProperty("converters", "date");
+        DEFAULT_PROPS.setProperty("date.type", "org.apache.doris.flink.utils.DateToStringConverter");
+        DEFAULT_PROPS.setProperty("date.format.date", "yyyy-MM-dd");
+        DEFAULT_PROPS.setProperty("date.format.datetime", "yyyy-MM-dd HH:mm:ss");
+        DEFAULT_PROPS.setProperty("date.format.timestamp", "yyyy-MM-dd HH:mm:ss");
+        DEFAULT_PROPS.setProperty("date.format.timestamp.zone", "UTC");
+    }
+
+    @Override
+    public void configure(Properties props) {
+        readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p));
+        readProps(props, "format.time", p -> timeFormatter = DateTimeFormatter.ofPattern(p));
+        readProps(props, "format.datetime", p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));
+        readProps(props, "format.timestamp", p -> timestampFormatter = DateTimeFormatter.ofPattern(p));
+        readProps(props, "format.timestamp.zone", z -> timestampZoneId = ZoneId.of(z));
+    }
+
+    private void readProps(Properties properties, String settingKey, Consumer<String> callback) {
+        String settingValue = (String) properties.get(settingKey);
+        if (settingValue == null || settingValue.length() == 0) {
+            return;
+        }
+        try {
+            callback.accept(settingValue.trim());
+        } catch (IllegalArgumentException | DateTimeException e) {
+            log.error("setting {} is illegal:{}", settingKey, settingValue);
+            throw e;
+        }
+    }
+
+    @Override
+    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
+        String sqlType = column.typeName().toUpperCase();
+        SchemaBuilder schemaBuilder = null;
+        Converter converter = null;
+        if ("DATE".equals(sqlType)) {
+            schemaBuilder = SchemaBuilder.string().optional();
+            converter = this::convertDate;
+        }
+        if ("TIME".equals(sqlType)) {
+            schemaBuilder = SchemaBuilder.string().optional();
+            converter = this::convertTime;
+        }
+        if ("DATETIME".equals(sqlType)) {
+            schemaBuilder = SchemaBuilder.string().optional();
+            converter = this::convertDateTime;
+        }
+        if ("TIMESTAMP".equals(sqlType)) {
+            schemaBuilder = SchemaBuilder.string().optional();
+            converter = this::convertTimestamp;
+        }
+        if (schemaBuilder != null) {
+            registration.register(schemaBuilder, converter);
+        }
+    }
+
+    private String convertDate(Object input) {
+        if (input instanceof LocalDate) {
+            return dateFormatter.format((LocalDate) input);
+        } else if (input instanceof Integer) {
+            LocalDate date = LocalDate.ofEpochDay((Integer) input);
+            return dateFormatter.format(date);
+        }
+        return null;
+    }
+
+    private String convertTime(Object input) {
+        if (input instanceof Duration) {
+            Duration duration = (Duration) input;
+            long seconds = duration.getSeconds();
+            int nano = duration.getNano();
+            LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
+            return timeFormatter.format(time);
+        }
+        return null;
+    }
+
+    private String convertDateTime(Object input) {
+        if (input instanceof LocalDateTime) {
+            return datetimeFormatter.format((LocalDateTime) input);
+        } else if (input instanceof Timestamp) {
+            return datetimeFormatter.format(((Timestamp) input).toLocalDateTime());
+        }
+        return null;
+    }
+
+    private String convertTimestamp(Object input) {
+        if (input instanceof ZonedDateTime) {
+            // mysql timestamp will be converted to UTC storage, and the zonedDatetime here is UTC time
+            ZonedDateTime zonedDateTime = (ZonedDateTime) input;
+            LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
+            return timestampFormatter.format(localDateTime);
+        } else if (input instanceof Timestamp) {
+            return timestampFormatter.format(((Timestamp) input).toLocalDateTime());
+        }
+        return null;
+    }
+
+}
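Editor's note: everything below is an illustration added for readers of this commit, not part of the change itself. The schema-change path above hinges on addDropDDLRegex: extractDDL() only returns DDL that the pattern fully matches, and buildRequestParam() derives isDropColumn/columnName from its two capture groups. A minimal, self-contained sketch of how that default pattern classifies statements follows; the regex string is copied verbatim from JsonDebeziumSchemaSerializer, while the demo class name and the sample statements are hypothetical.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AddDropDDLPatternDemo {
    // Copied verbatim from JsonDebeziumSchemaSerializer.addDropDDLRegex.
    private static final String ADD_DROP_DDL_REGEX =
            "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+COLUMN\\s+([^\\s]+).*";

    public static void main(String[] args) {
        // The serializer compiles the pattern case-insensitively when no custom
        // pattern is passed to its builder.
        Pattern pattern = Pattern.compile(ADD_DROP_DDL_REGEX, Pattern.CASE_INSENSITIVE);
        String[] samples = {
                "alter table t1 add column c_1 varchar(200)", // accepted: ADD COLUMN
                "ALTER TABLE t1 DROP COLUMN c_1",             // accepted: DROP COLUMN
                "ALTER TABLE t1 MODIFY COLUMN c_1 INT"        // rejected: neither ADD nor DROP
        };
        for (String ddl : samples) {
            Matcher matcher = pattern.matcher(ddl);
            if (matcher.matches()) { // extractDDL() applies matches() the same way
                // group(1) -> ADD|DROP, group(2) -> column name: exactly the two
                // values buildRequestParam() puts into the check request body.
                System.out.printf("op=%s column=%s%n", matcher.group(1), matcher.group(2));
            } else {
                System.out.println("ignored: " + ddl);
            }
        }
    }
}

Statements the pattern rejects (for example MODIFY COLUMN or RENAME COLUMN) make extractDDL() return null, so buildRequestParam() yields fewer than the two expected entries, checkSchemaChange() returns false, and the DDL event is skipped rather than forwarded to Doris.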