This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
commit 4ef1b10f8c426883530a6307376ae94c5f6cf3f5 Author: wudi <676366...@qq.com> AuthorDate: Tue Sep 28 04:37:03 2021 -0500 [Fix] Flink connector supports json import and uses httpclient to streamload (#6740) * [Bug]: fix NullPointerException thrown when data is null * [Bug]:Distinguish between null and empty string * [Feature]:flink-connector supports streamload parameters * [Fix]:code style * [Fix]: support json format import and use httpclient to streamload * [Fix]:remove System out * [Fix]:upgrade httpclient version * [Doc]: add json format import doc Co-authored-by: wudi <w...@shuhaisc.com> --- pom.xml | 15 ++ .../flink/table/DorisDynamicOutputFormat.java | 55 ++++++-- .../flink/table/DorisDynamicTableFactory.java | 20 ++- .../doris/flink/table/DorisDynamicTableSink.java | 9 +- .../apache/doris/flink/table/DorisStreamLoad.java | 156 +++++++++------------ .../org/apache/doris/flink/DorisSinkExample.java | 4 +- 6 files changed, 150 insertions(+), 109 deletions(-) diff --git a/pom.xml b/pom.xml index bd43778..a0d10f4 100644 --- a/pom.xml +++ b/pom.xml @@ -118,6 +118,21 @@ <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> <version>${libthrift.version}</version> + <exclusions> + <exclusion> + <artifactId>httpclient</artifactId> + <groupId>org.apache.httpcomponents</groupId> + </exclusion> + <exclusion> + <artifactId>httpcore</artifactId> + <groupId>org.apache.httpcomponents</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.5.13</version> </dependency> <dependency> <groupId>org.apache.arrow</groupId> diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java index 6ee8834..73c68b6 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java +++ 
b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java @@ -16,6 +16,7 @@ // under the License. package org.apache.doris.flink.table; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; @@ -33,8 +34,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Arrays; import java.util.StringJoiner; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -50,35 +53,45 @@ import static org.apache.flink.table.data.RowData.createFieldGetter; public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicOutputFormat.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String FIELD_DELIMITER_KEY = "column_separator"; private static final String FIELD_DELIMITER_DEFAULT = "\t"; private static final String LINE_DELIMITER_KEY = "line_delimiter"; private static final String LINE_DELIMITER_DEFAULT = "\n"; + private static final String FORMAT_KEY = "format"; + private static final String FORMAT_JSON_VALUE = "json"; private static final String NULL_VALUE = "\\N"; + private final String fieldDelimiter; private final String lineDelimiter; - + private final String[] fieldNames; + private final boolean jsonFormat; private DorisOptions options; private DorisReadOptions readOptions; private DorisExecutionOptions executionOptions; private DorisStreamLoad dorisStreamLoad; + private final RowData.FieldGetter[] fieldGetters; - - private final List<String> batch = new ArrayList<>(); + private final List batch = new ArrayList<>(); private transient volatile boolean closed = false; private transient 
ScheduledExecutorService scheduler; private transient ScheduledFuture<?> scheduledFuture; private transient volatile Exception flushException; - private final RowData.FieldGetter[] fieldGetters; - - public DorisDynamicOutputFormat(DorisOptions option, DorisReadOptions readOptions, DorisExecutionOptions executionOptions, LogicalType[] logicalTypes) { + public DorisDynamicOutputFormat(DorisOptions option, + DorisReadOptions readOptions, + DorisExecutionOptions executionOptions, + LogicalType[] logicalTypes, + String[] fieldNames) { this.options = option; this.readOptions = readOptions; this.executionOptions = executionOptions; this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT); this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT); + this.fieldNames = fieldNames; + this.jsonFormat = FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY)); this.fieldGetters = new RowData.FieldGetter[logicalTypes.length]; for (int i = 0; i < logicalTypes.length; i++) { fieldGetters[i] = createFieldGetter(logicalTypes[i], i); @@ -133,16 +146,20 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { } private void addBatch(RowData row) { + Map<String, String> valueMap = new HashMap<>(); StringJoiner value = new StringJoiner(this.fieldDelimiter); for (int i = 0; i < row.getArity() && i < fieldGetters.length; ++i) { Object field = fieldGetters[i].getFieldOrNull(row); - if (field != null) { - value.add(field.toString()); + if (jsonFormat) { + String data = field != null ? field.toString() : null; + valueMap.put(this.fieldNames[i], data); } else { - value.add(NULL_VALUE); + String data = field != null ? field.toString() : NULL_VALUE; + value.add(data); } } - batch.add(value.toString()); + Object data = jsonFormat ? 
valueMap : value.toString(); + batch.add(data); } @Override @@ -170,9 +187,15 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { if (batch.isEmpty()) { return; } + String result; + if (jsonFormat) { + result = OBJECT_MAPPER.writeValueAsString(batch); + } else { + result = String.join(this.lineDelimiter, batch); + } for (int i = 0; i <= executionOptions.getMaxRetries(); i++) { try { - dorisStreamLoad.load(String.join(this.lineDelimiter, batch)); + dorisStreamLoad.load(result); batch.clear(); break; } catch (StreamLoadException e) { @@ -221,6 +244,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { private DorisReadOptions readOptions; private DorisExecutionOptions executionOptions; private DataType[] fieldDataTypes; + private String[] fieldNames; public Builder() { this.optionsBuilder = DorisOptions.builder(); @@ -256,6 +280,11 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { return this; } + public Builder setFieldNames(String[] fieldNames) { + this.fieldNames = fieldNames; + return this; + } + public Builder setFieldDataTypes(DataType[] fieldDataTypes) { this.fieldDataTypes = fieldDataTypes; return this; @@ -267,7 +296,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { .map(DataType::getLogicalType) .toArray(LogicalType[]::new); return new DorisDynamicOutputFormat( - optionsBuilder.build(), readOptions, executionOptions, logicalTypes + optionsBuilder.build(), readOptions, executionOptions, logicalTypes, fieldNames ); } } diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index c0d9934..4654c0d 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -38,7 +38,15 @@ import java.util.Map; import java.util.Properties; import java.util.Set; 
-import static org.apache.doris.flink.cfg.ConfigurationOptions.*; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT; /** * The {@link DorisDynamicTableFactory} translates the catalog table to a table source. 
@@ -260,13 +268,13 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory Properties streamLoadProp = getStreamLoadProp(context.getCatalogTable().getOptions()); TableSchema physicalSchema = - TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); + TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); // create and return dynamic table source return new DorisDynamicTableSink( - getDorisOptions(helper.getOptions()), - getDorisReadOptions(helper.getOptions()), - getDorisExecutionOptions(helper.getOptions(), streamLoadProp), - physicalSchema + getDorisOptions(helper.getOptions()), + getDorisReadOptions(helper.getOptions()), + getDorisExecutionOptions(helper.getOptions(), streamLoadProp), + physicalSchema ); } } diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java index 0b69ea7..3c0c6df 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java +++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java @@ -35,7 +35,10 @@ public class DorisDynamicTableSink implements DynamicTableSink { private final DorisExecutionOptions executionOptions; private final TableSchema tableSchema; - public DorisDynamicTableSink(DorisOptions options, DorisReadOptions readOptions, DorisExecutionOptions executionOptions, TableSchema tableSchema) { + public DorisDynamicTableSink(DorisOptions options, + DorisReadOptions readOptions, + DorisExecutionOptions executionOptions, + TableSchema tableSchema) { this.options = options; this.readOptions = readOptions; this.executionOptions = executionOptions; @@ -60,8 +63,8 @@ public class DorisDynamicTableSink implements DynamicTableSink { .setTableIdentifier(options.getTableIdentifier()) .setReadOptions(readOptions) .setExecutionOptions(executionOptions) - .setFieldDataTypes(tableSchema.getFieldDataTypes());; - + 
.setFieldDataTypes(tableSchema.getFieldDataTypes()) + .setFieldNames(tableSchema.getFieldNames()); return OutputFormatProvider.of(builder.build()); } diff --git a/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java b/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java index b9a7708..c37e640 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java +++ b/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java @@ -16,27 +16,30 @@ // under the License. package org.apache.doris.flink.table; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.exception.StreamLoadException; import org.apache.doris.flink.rest.models.RespContent; +import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultRedirectStrategy; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedOutputStream; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.io.Serializable; -import java.net.HttpURLConnection; -import java.net.URL; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; -import java.util.Base64; -import java.util.Calendar; import java.util.Date; import java.util.List; import java.util.Map; @@ -49,6 +52,7 @@ import java.util.UUID; public class DorisStreamLoad implements Serializable { private static final 
Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final static List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout")); private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?"; @@ -68,7 +72,7 @@ public class DorisStreamLoad implements Serializable { this.user = user; this.passwd = passwd; this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl); - this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); + this.authEncoding = basicAuthHeader(user, passwd); this.streamLoadProp = streamLoadProp; } @@ -76,64 +80,19 @@ public class DorisStreamLoad implements Serializable { return loadUrlStr; } - public String getHostPort() { - return hostPort; - } - public void setHostPort(String hostPort) { this.hostPort = hostPort; this.loadUrlStr = String.format(loadUrlPattern, hostPort, this.db, this.tbl); } - - private HttpURLConnection getConnection(String urlStr, String label) throws IOException { - URL url = new URL(urlStr); - HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - conn.setInstanceFollowRedirects(false); - conn.setRequestMethod("PUT"); - String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); - conn.setRequestProperty("Authorization", "Basic " + authEncoding); - conn.addRequestProperty("Expect", "100-continue"); - conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8"); - conn.addRequestProperty("label", label); - for (Map.Entry<Object, Object> entry : streamLoadProp.entrySet()) { - conn.addRequestProperty(String.valueOf(entry.getKey()), String.valueOf(entry.getValue())); - } - conn.setDoOutput(true); - conn.setDoInput(true); - return conn; - } - - public static class LoadResponse { - public int status; - public String respMsg; - public 
String respContent; - - public LoadResponse(int status, String respMsg, String respContent) { - this.status = status; - this.respMsg = respMsg; - this.respContent = respContent; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("status: ").append(status); - sb.append(", resp msg: ").append(respMsg); - sb.append(", resp content: ").append(respContent); - return sb.toString(); - } - } - public void load(String value) throws StreamLoadException { LoadResponse loadResponse = loadBatch(value); LOG.info("Streamload Response:{}", loadResponse); if (loadResponse.status != 200) { throw new StreamLoadException("stream load error: " + loadResponse.respContent); } else { - ObjectMapper obj = new ObjectMapper(); try { - RespContent respContent = obj.readValue(loadResponse.respContent, RespContent.class); + RespContent respContent = OBJECT_MAPPER.readValue(loadResponse.respContent, RespContent.class); if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { throw new StreamLoadException("stream load error: " + respContent.getMessage()); } @@ -148,44 +107,69 @@ public class DorisStreamLoad implements Serializable { if (StringUtils.isBlank(label)) { SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd_HHmmss"); String formatDate = sdf.format(new Date()); - label = String.format("flink_connector_%s_%s",formatDate, + label = String.format("flink_connector_%s_%s", formatDate, UUID.randomUUID().toString().replaceAll("-", "")); } - HttpURLConnection feConn = null; - HttpURLConnection beConn = null; - try { - // build request and send to new be location - beConn = getConnection(loadUrlStr, label); - // send data to be - BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream()); - bos.write(value.getBytes()); - bos.close(); - - // get respond - int status = beConn.getResponseCode(); - String respMsg = beConn.getResponseMessage(); - InputStream stream = (InputStream) beConn.getContent(); - BufferedReader br = 
new BufferedReader(new InputStreamReader(stream)); - StringBuilder response = new StringBuilder(); - String line; - while ((line = br.readLine()) != null) { - response.append(line); + final HttpClientBuilder httpClientBuilder = HttpClients + .custom() + .setRedirectStrategy(new DefaultRedirectStrategy() { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }); + + try (CloseableHttpClient client = httpClientBuilder.build()) { + HttpPut put = new HttpPut(loadUrlStr); + put.setHeader(HttpHeaders.EXPECT, "100-continue"); + put.setHeader(HttpHeaders.AUTHORIZATION, this.authEncoding); + put.setHeader("label", label); + for (Map.Entry<Object, Object> entry : streamLoadProp.entrySet()) { + put.setHeader(String.valueOf(entry.getKey()), String.valueOf(entry.getValue())); + } + StringEntity entity = new StringEntity(value, "UTF-8"); + put.setEntity(entity); + + try (CloseableHttpResponse response = client.execute(put)) { + final int statusCode = response.getStatusLine().getStatusCode(); + final String reasonPhrase = response.getStatusLine().getReasonPhrase(); + String loadResult = ""; + if (response.getEntity() != null) { + loadResult = EntityUtils.toString(response.getEntity()); + } + return new LoadResponse(statusCode, reasonPhrase, loadResult); } -// log.info("AuditLoader plugin load with label: {}, response code: {}, msg: {}, content: {}",label, status, respMsg, response.toString()); - return new LoadResponse(status, respMsg, response.toString()); - } catch (Exception e) { - e.printStackTrace(); - String err = "failed to load audit via AuditLoader plugin with label: " + label; + String err = "failed to stream load data with label: " + label; LOG.warn(err, e); return new LoadResponse(-1, e.getMessage(), err); - } finally { - if (feConn != null) { - feConn.disconnect(); - } - if (beConn != null) { - beConn.disconnect(); + } + } + + private String basicAuthHeader(String username, String password) { + final String tobeEncode = username + ":" 
+ password; + byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8)); + return "Basic " + new String(encoded); + } + + public static class LoadResponse { + public int status; + public String respMsg; + public String respContent; + + public LoadResponse(int status, String respMsg, String respContent) { + this.status = status; + this.respMsg = respMsg; + this.respContent = respContent; + } + + @Override + public String toString() { + try { + return OBJECT_MAPPER.writeValueAsString(this); + } catch (JsonProcessingException e) { + return ""; } } } diff --git a/src/test/java/org/apache/doris/flink/DorisSinkExample.java b/src/test/java/org/apache/doris/flink/DorisSinkExample.java index dde97de..774d686 100644 --- a/src/test/java/org/apache/doris/flink/DorisSinkExample.java +++ b/src/test/java/org/apache/doris/flink/DorisSinkExample.java @@ -48,7 +48,9 @@ public class DorisSinkExample { " 'fenodes' = 'FE_IP:8030',\n" + " 'table.identifier' = 'db.table',\n" + " 'username' = 'root',\n" + - " 'password' = ''\n" + + " 'password' = '',\n" + + " 'sink.properties.format' = 'json',\n" + + " 'sink.properties.strip_outer_array' = 'true'\n" + ")"); tEnv.executeSql("INSERT INTO doris_test_sink select name,age from doris_test"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org