This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git

commit 4ef1b10f8c426883530a6307376ae94c5f6cf3f5
Author: wudi <676366...@qq.com>
AuthorDate: Tue Sep 28 04:37:03 2021 -0500

    [Fix] Flink connector support json import and use httpclient to streamload 
(#6740)
    
    * [Bug]: fix NullPointerException thrown when data is null
    
    * [Bug]:Distinguish between null and empty string
    
    * [Feature]:flink-connector supports streamload parameters
    
    * [Fix]:code style
    
    * [Fix]: support json format import and use httpclient to streamload
    
    * [Fix]:remove System out
    
    * [Fix]:upgrade httpclient version
    
    * [Doc]: add json format import doc
    
    Co-authored-by: wudi <w...@shuhaisc.com>
---
 pom.xml                                            |  15 ++
 .../flink/table/DorisDynamicOutputFormat.java      |  55 ++++++--
 .../flink/table/DorisDynamicTableFactory.java      |  20 ++-
 .../doris/flink/table/DorisDynamicTableSink.java   |   9 +-
 .../apache/doris/flink/table/DorisStreamLoad.java  | 156 +++++++++------------
 .../org/apache/doris/flink/DorisSinkExample.java   |   4 +-
 6 files changed, 150 insertions(+), 109 deletions(-)

diff --git a/pom.xml b/pom.xml
index bd43778..a0d10f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -118,6 +118,21 @@
             <groupId>org.apache.thrift</groupId>
             <artifactId>libthrift</artifactId>
             <version>${libthrift.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>httpclient</artifactId>
+                    <groupId>org.apache.httpcomponents</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>httpcore</artifactId>
+                    <groupId>org.apache.httpcomponents</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.13</version>
         </dependency>
         <dependency>
             <groupId>org.apache.arrow</groupId>
diff --git 
a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java 
b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 6ee8834..73c68b6 100644
--- a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -16,6 +16,7 @@
 // under the License.
 package org.apache.doris.flink.table;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -33,8 +34,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Arrays;
 import java.util.StringJoiner;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -50,35 +53,45 @@ import static 
org.apache.flink.table.data.RowData.createFieldGetter;
 public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisDynamicOutputFormat.class);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     private static final String FIELD_DELIMITER_KEY = "column_separator";
     private static final String FIELD_DELIMITER_DEFAULT = "\t";
     private static final String LINE_DELIMITER_KEY = "line_delimiter";
     private static final String LINE_DELIMITER_DEFAULT = "\n";
+    private static final String FORMAT_KEY = "format";
+    private static final String FORMAT_JSON_VALUE = "json";
     private static final String NULL_VALUE = "\\N";
+
     private final String fieldDelimiter;
     private final String lineDelimiter;
-
+    private final String[] fieldNames;
+    private final boolean jsonFormat;
     private DorisOptions options;
     private DorisReadOptions readOptions;
     private DorisExecutionOptions executionOptions;
     private DorisStreamLoad dorisStreamLoad;
+    private final RowData.FieldGetter[] fieldGetters;
 
-
-    private final List<String> batch = new ArrayList<>();
+    private final List batch = new ArrayList<>();
     private transient volatile boolean closed = false;
 
     private transient ScheduledExecutorService scheduler;
     private transient ScheduledFuture<?> scheduledFuture;
     private transient volatile Exception flushException;
 
-    private final RowData.FieldGetter[] fieldGetters;
-
-    public DorisDynamicOutputFormat(DorisOptions option, DorisReadOptions 
readOptions, DorisExecutionOptions executionOptions, LogicalType[] 
logicalTypes) {
+    public DorisDynamicOutputFormat(DorisOptions option,
+                                    DorisReadOptions readOptions,
+                                    DorisExecutionOptions executionOptions,
+                                    LogicalType[] logicalTypes,
+                                    String[] fieldNames) {
         this.options = option;
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
         this.fieldDelimiter = 
executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY, 
FIELD_DELIMITER_DEFAULT);
         this.lineDelimiter = 
executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY, 
LINE_DELIMITER_DEFAULT);
+        this.fieldNames = fieldNames;
+        this.jsonFormat = 
FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY));
         this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
         for (int i = 0; i < logicalTypes.length; i++) {
             fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
@@ -133,16 +146,20 @@ public class DorisDynamicOutputFormat extends 
RichOutputFormat<RowData> {
     }
 
     private void addBatch(RowData row) {
+        Map<String, String> valueMap = new HashMap<>();
         StringJoiner value = new StringJoiner(this.fieldDelimiter);
         for (int i = 0; i < row.getArity() && i < fieldGetters.length; ++i) {
             Object field = fieldGetters[i].getFieldOrNull(row);
-            if (field != null) {
-                value.add(field.toString());
+            if (jsonFormat) {
+                String data = field != null ? field.toString() : null;
+                valueMap.put(this.fieldNames[i], data);
             } else {
-                value.add(NULL_VALUE);
+                String data = field != null ? field.toString() : NULL_VALUE;
+                value.add(data);
             }
         }
-        batch.add(value.toString());
+        Object data = jsonFormat ? valueMap : value.toString();
+        batch.add(data);
     }
 
     @Override
@@ -170,9 +187,15 @@ public class DorisDynamicOutputFormat extends 
RichOutputFormat<RowData> {
         if (batch.isEmpty()) {
             return;
         }
+        String result;
+        if (jsonFormat) {
+            result = OBJECT_MAPPER.writeValueAsString(batch);
+        } else {
+            result = String.join(this.lineDelimiter, batch);
+        }
         for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
             try {
-                dorisStreamLoad.load(String.join(this.lineDelimiter, batch));
+                dorisStreamLoad.load(result);
                 batch.clear();
                 break;
             } catch (StreamLoadException e) {
@@ -221,6 +244,7 @@ public class DorisDynamicOutputFormat extends 
RichOutputFormat<RowData> {
         private DorisReadOptions readOptions;
         private DorisExecutionOptions executionOptions;
         private DataType[] fieldDataTypes;
+        private String[] fieldNames;
 
         public Builder() {
             this.optionsBuilder = DorisOptions.builder();
@@ -256,6 +280,11 @@ public class DorisDynamicOutputFormat extends 
RichOutputFormat<RowData> {
             return this;
         }
 
+        public Builder setFieldNames(String[] fieldNames) {
+            this.fieldNames = fieldNames;
+            return this;
+        }
+
         public Builder setFieldDataTypes(DataType[] fieldDataTypes) {
             this.fieldDataTypes = fieldDataTypes;
             return this;
@@ -267,7 +296,7 @@ public class DorisDynamicOutputFormat extends 
RichOutputFormat<RowData> {
                     .map(DataType::getLogicalType)
                     .toArray(LogicalType[]::new);
             return new DorisDynamicOutputFormat(
-                optionsBuilder.build(), readOptions, executionOptions, 
logicalTypes
+                optionsBuilder.build(), readOptions, executionOptions, 
logicalTypes, fieldNames
             );
         }
     }
diff --git 
a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java 
b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index c0d9934..4654c0d 100644
--- a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -38,7 +38,15 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
-import static org.apache.doris.flink.cfg.ConfigurationOptions.*;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
+import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
 
 /**
  * The {@link DorisDynamicTableFactory} translates the catalog table to a 
table source.
@@ -260,13 +268,13 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
 
         Properties streamLoadProp = 
getStreamLoadProp(context.getCatalogTable().getOptions());
         TableSchema physicalSchema =
-            
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+                
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
         // create and return dynamic table source
         return new DorisDynamicTableSink(
-            getDorisOptions(helper.getOptions()),
-            getDorisReadOptions(helper.getOptions()),
-            getDorisExecutionOptions(helper.getOptions(), streamLoadProp),
-            physicalSchema
+                getDorisOptions(helper.getOptions()),
+                getDorisReadOptions(helper.getOptions()),
+                getDorisExecutionOptions(helper.getOptions(), streamLoadProp),
+                physicalSchema
         );
     }
 }
diff --git 
a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java 
b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index 0b69ea7..3c0c6df 100644
--- a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -35,7 +35,10 @@ public class DorisDynamicTableSink implements 
DynamicTableSink {
     private final DorisExecutionOptions executionOptions;
     private final TableSchema tableSchema;
 
-    public DorisDynamicTableSink(DorisOptions options, DorisReadOptions 
readOptions, DorisExecutionOptions executionOptions, TableSchema tableSchema) {
+    public DorisDynamicTableSink(DorisOptions options,
+                                 DorisReadOptions readOptions,
+                                 DorisExecutionOptions executionOptions,
+                                 TableSchema tableSchema) {
         this.options = options;
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
@@ -60,8 +63,8 @@ public class DorisDynamicTableSink implements 
DynamicTableSink {
             .setTableIdentifier(options.getTableIdentifier())
             .setReadOptions(readOptions)
             .setExecutionOptions(executionOptions)
-            .setFieldDataTypes(tableSchema.getFieldDataTypes());;
-
+            .setFieldDataTypes(tableSchema.getFieldDataTypes())
+            .setFieldNames(tableSchema.getFieldNames());
         return OutputFormatProvider.of(builder.build());
     }
 
diff --git a/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java 
b/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
index b9a7708..c37e640 100644
--- a/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
+++ b/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
@@ -16,27 +16,30 @@
 // under the License.
 package org.apache.doris.flink.table;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.flink.exception.StreamLoadException;
 import org.apache.doris.flink.rest.models.RespContent;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.Serializable;
-import java.net.HttpURLConnection;
-import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Base64;
-import java.util.Calendar;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -49,6 +52,7 @@ import java.util.UUID;
 public class DorisStreamLoad implements Serializable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisStreamLoad.class);
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     private final static List<String> DORIS_SUCCESS_STATUS = new 
ArrayList<>(Arrays.asList("Success", "Publish Timeout"));
     private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";;
@@ -68,7 +72,7 @@ public class DorisStreamLoad implements Serializable {
         this.user = user;
         this.passwd = passwd;
         this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
-        this.authEncoding = 
Base64.getEncoder().encodeToString(String.format("%s:%s", user, 
passwd).getBytes(StandardCharsets.UTF_8));
+        this.authEncoding = basicAuthHeader(user, passwd);
         this.streamLoadProp = streamLoadProp;
     }
 
@@ -76,64 +80,19 @@ public class DorisStreamLoad implements Serializable {
         return loadUrlStr;
     }
 
-    public String getHostPort() {
-        return hostPort;
-    }
-
     public void setHostPort(String hostPort) {
         this.hostPort = hostPort;
         this.loadUrlStr = String.format(loadUrlPattern, hostPort, this.db, 
this.tbl);
     }
 
-
-    private HttpURLConnection getConnection(String urlStr, String label) 
throws IOException {
-        URL url = new URL(urlStr);
-        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-        conn.setInstanceFollowRedirects(false);
-        conn.setRequestMethod("PUT");
-        String authEncoding = 
Base64.getEncoder().encodeToString(String.format("%s:%s", user, 
passwd).getBytes(StandardCharsets.UTF_8));
-        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
-        conn.addRequestProperty("Expect", "100-continue");
-        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
-        conn.addRequestProperty("label", label);
-        for (Map.Entry<Object, Object> entry : streamLoadProp.entrySet()) {
-            conn.addRequestProperty(String.valueOf(entry.getKey()), 
String.valueOf(entry.getValue()));
-        }
-        conn.setDoOutput(true);
-        conn.setDoInput(true);
-        return conn;
-    }
-
-    public static class LoadResponse {
-        public int status;
-        public String respMsg;
-        public String respContent;
-
-        public LoadResponse(int status, String respMsg, String respContent) {
-            this.status = status;
-            this.respMsg = respMsg;
-            this.respContent = respContent;
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder sb = new StringBuilder();
-            sb.append("status: ").append(status);
-            sb.append(", resp msg: ").append(respMsg);
-            sb.append(", resp content: ").append(respContent);
-            return sb.toString();
-        }
-    }
-
     public void load(String value) throws StreamLoadException {
         LoadResponse loadResponse = loadBatch(value);
         LOG.info("Streamload Response:{}", loadResponse);
         if (loadResponse.status != 200) {
             throw new StreamLoadException("stream load error: " + 
loadResponse.respContent);
         } else {
-            ObjectMapper obj = new ObjectMapper();
             try {
-                RespContent respContent = 
obj.readValue(loadResponse.respContent, RespContent.class);
+                RespContent respContent = 
OBJECT_MAPPER.readValue(loadResponse.respContent, RespContent.class);
                 if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
                     throw new StreamLoadException("stream load error: " + 
respContent.getMessage());
                 }
@@ -148,44 +107,69 @@ public class DorisStreamLoad implements Serializable {
         if (StringUtils.isBlank(label)) {
             SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd_HHmmss");
             String formatDate = sdf.format(new Date());
-            label = String.format("flink_connector_%s_%s",formatDate,
+            label = String.format("flink_connector_%s_%s", formatDate,
                     UUID.randomUUID().toString().replaceAll("-", ""));
         }
 
-        HttpURLConnection feConn = null;
-        HttpURLConnection beConn = null;
-        try {
-            // build request and send to new be location
-            beConn = getConnection(loadUrlStr, label);
-            // send data to be
-            BufferedOutputStream bos = new 
BufferedOutputStream(beConn.getOutputStream());
-            bos.write(value.getBytes());
-            bos.close();
-
-            // get respond
-            int status = beConn.getResponseCode();
-            String respMsg = beConn.getResponseMessage();
-            InputStream stream = (InputStream) beConn.getContent();
-            BufferedReader br = new BufferedReader(new 
InputStreamReader(stream));
-            StringBuilder response = new StringBuilder();
-            String line;
-            while ((line = br.readLine()) != null) {
-                response.append(line);
+        final HttpClientBuilder httpClientBuilder = HttpClients
+                .custom()
+                .setRedirectStrategy(new DefaultRedirectStrategy() {
+                    @Override
+                    protected boolean isRedirectable(String method) {
+                        return true;
+                    }
+                });
+
+        try (CloseableHttpClient client = httpClientBuilder.build()) {
+            HttpPut put = new HttpPut(loadUrlStr);
+            put.setHeader(HttpHeaders.EXPECT, "100-continue");
+            put.setHeader(HttpHeaders.AUTHORIZATION, this.authEncoding);
+            put.setHeader("label", label);
+            for (Map.Entry<Object, Object> entry : streamLoadProp.entrySet()) {
+                put.setHeader(String.valueOf(entry.getKey()), 
String.valueOf(entry.getValue()));
+            }
+            StringEntity entity = new StringEntity(value, "UTF-8");
+            put.setEntity(entity);
+
+            try (CloseableHttpResponse response = client.execute(put)) {
+                final int statusCode = 
response.getStatusLine().getStatusCode();
+                final String reasonPhrase = 
response.getStatusLine().getReasonPhrase();
+                String loadResult = "";
+                if (response.getEntity() != null) {
+                    loadResult = EntityUtils.toString(response.getEntity());
+                }
+                return new LoadResponse(statusCode, reasonPhrase, loadResult);
             }
-//            log.info("AuditLoader plugin load with label: {}, response code: 
{}, msg: {}, content: {}",label, status, respMsg, response.toString());
-            return new LoadResponse(status, respMsg, response.toString());
-
         } catch (Exception e) {
-            e.printStackTrace();
-            String err = "failed to load audit via AuditLoader plugin with 
label: " + label;
+            String err = "failed to stream load data with label: " + label;
             LOG.warn(err, e);
             return new LoadResponse(-1, e.getMessage(), err);
-        } finally {
-            if (feConn != null) {
-                feConn.disconnect();
-            }
-            if (beConn != null) {
-                beConn.disconnect();
+        }
+    }
+
+    private String basicAuthHeader(String username, String password) {
+        final String tobeEncode = username + ":" + password;
+        byte[] encoded = 
Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
+        return "Basic " + new String(encoded);
+    }
+
+    public static class LoadResponse {
+        public int status;
+        public String respMsg;
+        public String respContent;
+
+        public LoadResponse(int status, String respMsg, String respContent) {
+            this.status = status;
+            this.respMsg = respMsg;
+            this.respContent = respContent;
+        }
+
+        @Override
+        public String toString() {
+            try {
+                return OBJECT_MAPPER.writeValueAsString(this);
+            } catch (JsonProcessingException e) {
+                return "";
             }
         }
     }
diff --git a/src/test/java/org/apache/doris/flink/DorisSinkExample.java 
b/src/test/java/org/apache/doris/flink/DorisSinkExample.java
index dde97de..774d686 100644
--- a/src/test/java/org/apache/doris/flink/DorisSinkExample.java
+++ b/src/test/java/org/apache/doris/flink/DorisSinkExample.java
@@ -48,7 +48,9 @@ public class DorisSinkExample {
                         "  'fenodes' = 'FE_IP:8030',\n" +
                         "  'table.identifier' = 'db.table',\n" +
                         "  'username' = 'root',\n" +
-                        "  'password' = ''\n" +
+                        "  'password' = '',\n" +
+                        "  'sink.properties.format' = 'json',\n" +
+                        "  'sink.properties.strip_outer_array' = 'true'\n" +
                         ")");
 
         tEnv.executeSql("INSERT INTO doris_test_sink select name,age from 
doris_test");

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to