This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 706310e fix hidden column does not take effect (#117)
706310e is described below
commit 706310e73e455f08c9a65b5085e7492c19bd21c8
Author: wudi <[email protected]>
AuthorDate: Fri Mar 10 17:49:47 2023 +0800
fix hidden column does not take effect (#117)
Co-authored-by: wudi <>
---
.../org/apache/doris/flink/sink/DorisSink.java | 1 -
.../org/apache/doris/flink/sink/EscapeHandler.java | 7 +-
.../doris/flink/sink/writer/DorisStreamLoad.java | 3 +-
.../doris/flink/sink/writer/RowDataSerializer.java | 3 +-
.../doris/flink/DorisSinkExampleRowData.java | 101 +++++++++++++++++++++
5 files changed, 108 insertions(+), 7 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index efb530e..d64e488 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -128,7 +128,6 @@ public class DorisSink<IN> implements Sink<IN,
DorisCommittable, DorisWriterStat
Preconditions.checkNotNull(dorisOptions);
Preconditions.checkNotNull(dorisExecutionOptions);
Preconditions.checkNotNull(serializer);
-
EscapeHandler.handleEscape(dorisExecutionOptions.getStreamLoadProp());
if(dorisReadOptions == null) {
dorisReadOptions = DorisReadOptions.builder().build();
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/EscapeHandler.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/EscapeHandler.java
index 1b9a70f..f83cc79 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/EscapeHandler.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/EscapeHandler.java
@@ -33,7 +33,7 @@ public class EscapeHandler {
public static final String ESCAPE_DELIMITERS_FLAGS = "\\x";
public static final Pattern ESCAPE_PATTERN =
Pattern.compile("\\\\x([0-9|a-f|A-F]{2})");
- public String escapeString(String source) {
+ public static String escapeString(String source) {
if (source.contains(ESCAPE_DELIMITERS_FLAGS)) {
Matcher m = ESCAPE_PATTERN.matcher(source);
StringBuffer buf = new StringBuffer();
@@ -46,7 +46,7 @@ public class EscapeHandler {
return source;
}
- public void handle(Properties properties) {
+ public static void handle(Properties properties) {
String fieldDelimiter = properties.getProperty(FIELD_DELIMITER_KEY,
FIELD_DELIMITER_DEFAULT);
if (fieldDelimiter.contains(ESCAPE_DELIMITERS_FLAGS)) {
@@ -60,7 +60,6 @@ public class EscapeHandler {
}
public static void handleEscape(Properties properties) {
- EscapeHandler handler = new EscapeHandler();
- handler.handle(properties);
+ handle(properties);
}
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 80760a2..dc33fce 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -25,6 +25,7 @@ import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.models.RespContent;
+import org.apache.doris.flink.sink.EscapeHandler;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.ResponseUtil;
import org.apache.flink.annotation.VisibleForTesting;
@@ -105,7 +106,7 @@ public class DorisStreamLoad implements Serializable {
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new
ExecutorThreadFactory("stream-load-upload"));
this.recordStream = new RecordStream(executionOptions.getBufferSize(),
executionOptions.getBufferCount());
- lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT).getBytes();
+ lineDelimiter =
EscapeHandler.escapeString(streamLoadProp.getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT)).getBytes();
loadBatchFirstRecord = true;
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
index 5d00301..e1ec13e 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/RowDataSerializer.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.sink.writer;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.deserialization.converter.DorisRowConverter;
+import org.apache.doris.flink.sink.EscapeHandler;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
@@ -141,7 +142,7 @@ public class RowDataSerializer implements
DorisRecordSerializer<RowData> {
}
public Builder setFieldDelimiter(String fieldDelimiter) {
- this.fieldDelimiter = fieldDelimiter;
+ this.fieldDelimiter = EscapeHandler.escapeString(fieldDelimiter);
return this;
}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExampleRowData.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExampleRowData.java
new file mode 100644
index 0000000..79d36c5
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExampleRowData.java
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.sink.DorisSink;
+import org.apache.doris.flink.sink.writer.LoadConstants;
+import org.apache.doris.flink.sink.writer.RowDataSerializer;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Collector;
+
+import java.util.Properties;
+import java.util.UUID;
+
+
+public class DorisSinkExampleRowData {
+
+ public static void main(String[] args) throws Exception{
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ env.enableCheckpointing(10000);
+ env.setParallelism(1);
+
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,
Time.milliseconds(30000)));
+ DorisSink.Builder<RowData> builder = DorisSink.builder();
+
+ Properties properties = new Properties();
+ properties.setProperty("column_separator", ",");
+ properties.setProperty("line_delimiter", "\n");
+// properties.setProperty("read_json_by_line", "true");
+// properties.setProperty("format", "json");
+ DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+ dorisBuilder.setFenodes("127.0.0.1:8030")
+ .setTableIdentifier("db.tbl")
+ .setUsername("root")
+ .setPassword("");
+ DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();
+ executionBuilder.setLabelPrefix(UUID.randomUUID().toString())
+ .setStreamLoadProp(properties);
+
+ //flink rowdata's schema
+ String[] fields = {"name", "age"};
+ DataType[] types = {DataTypes.VARCHAR(256), DataTypes.INT()};
+
+ builder.setDorisExecutionOptions(executionBuilder.build())
+ .setSerializer(RowDataSerializer.builder() //serialize
according to rowdata
+ .setType(LoadConstants.CSV)
//.setType(LoadConstants.CSV)
+ .setFieldDelimiter(",")
+ .setFieldNames(fields) //.setFieldDelimiter(",")
+ .setFieldType(types).build())
+ .setDorisOptions(dorisBuilder.build());
+
+ //mock rowdata source
+ DataStream<RowData> source = env.fromElements("")
+ .flatMap(new FlatMapFunction<String, RowData>() {
+ @Override
+ public void flatMap(String s, Collector<RowData> out)
throws Exception {
+ GenericRowData genericRowData = new GenericRowData(2);
+ genericRowData.setField(0,
StringData.fromString("beijing"));
+ genericRowData.setField(1, 123);
+ out.collect(genericRowData);
+
+ GenericRowData genericRowData2 = new GenericRowData(2);
+ genericRowData2.setField(0,
StringData.fromString("shanghai"));
+ genericRowData2.setField(1, 1234);
+ out.collect(genericRowData2);
+ }
+ });
+
+ source.sinkTo(builder.build());
+ env.execute("doris test");
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]