This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
commit 001b52c082944da2870c83bc61fa8c45c325cf0c Author: xhmz <yao...@163.com> AuthorDate: Fri Sep 24 21:38:35 2021 +0800 [Flink] Fix bug of flink doris connector (#6655) Flink-Doris-Connector does not support flink 1.13; refactor the doris sink format to use RowData::FieldGetter instead of GenericRowData. --- .../flink/table/DorisDynamicOutputFormat.java | 45 ++++-- .../flink/table/DorisDynamicTableFactory.java | 165 ++++++++++----------- .../doris/flink/table/DorisDynamicTableSink.java | 28 ++-- 3 files changed, 127 insertions(+), 111 deletions(-) diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java index 77b53ba..6ee8834 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java +++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java @@ -25,22 +25,24 @@ import org.apache.doris.flink.rest.RestService; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.util.ExecutorThreadFactory; -import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; -import java.util.Properties; import java.util.StringJoiner; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import static org.apache.flink.table.data.RowData.createFieldGetter; + /** * DorisDynamicOutputFormat @@ -69,12 +71,18 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { private transient
ScheduledFuture<?> scheduledFuture; private transient volatile Exception flushException; - public DorisDynamicOutputFormat(DorisOptions option, DorisReadOptions readOptions, DorisExecutionOptions executionOptions) { + private final RowData.FieldGetter[] fieldGetters; + + public DorisDynamicOutputFormat(DorisOptions option, DorisReadOptions readOptions, DorisExecutionOptions executionOptions, LogicalType[] logicalTypes) { this.options = option; this.readOptions = readOptions; this.executionOptions = executionOptions; this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT); this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT); + this.fieldGetters = new RowData.FieldGetter[logicalTypes.length]; + for (int i = 0; i < logicalTypes.length; i++) { + fieldGetters[i] = createFieldGetter(logicalTypes[i], i); + } } @Override @@ -84,12 +92,12 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { @Override public void open(int taskNumber, int numTasks) throws IOException { dorisStreamLoad = new DorisStreamLoad( - getBackend(), - options.getTableIdentifier().split("\\.")[0], - options.getTableIdentifier().split("\\.")[1], - options.getUsername(), - options.getPassword(), - executionOptions.getStreamLoadProp()); + getBackend(), + options.getTableIdentifier().split("\\.")[0], + options.getTableIdentifier().split("\\.")[1], + options.getUsername(), + options.getPassword(), + executionOptions.getStreamLoadProp()); LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr()); if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) { @@ -126,9 +134,8 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { private void addBatch(RowData row) { StringJoiner value = new StringJoiner(this.fieldDelimiter); - GenericRowData rowData = (GenericRowData) row; - for (int i = 0; i < row.getArity(); 
++i) { - Object field = rowData.getField(i); + for (int i = 0; i < row.getArity() && i < fieldGetters.length; ++i) { + Object field = fieldGetters[i].getFieldOrNull(row); if (field != null) { value.add(field.toString()); } else { @@ -213,6 +220,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { private DorisOptions.Builder optionsBuilder; private DorisReadOptions readOptions; private DorisExecutionOptions executionOptions; + private DataType[] fieldDataTypes; public Builder() { this.optionsBuilder = DorisOptions.builder(); @@ -248,9 +256,18 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { return this; } + public Builder setFieldDataTypes(DataType[] fieldDataTypes) { + this.fieldDataTypes = fieldDataTypes; + return this; + } + public DorisDynamicOutputFormat build() { + final LogicalType[] logicalTypes = + Arrays.stream(fieldDataTypes) + .map(DataType::getLogicalType) + .toArray(LogicalType[]::new); return new DorisDynamicOutputFormat( - optionsBuilder.build(), readOptions, executionOptions + optionsBuilder.build(), readOptions, executionOptions, logicalTypes ); } } diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index 92d69e6..c0d9934 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -38,15 +38,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; -import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT; -import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT; -import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT; -import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT; -import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT; -import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT; -import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT; -import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT; -import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.*; /** * The {@link DorisDynamicTableFactory} translates the catalog table to a table source. @@ -63,92 +55,92 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory // doris options private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions - .key("doris.read.field") - .stringType() - .noDefaultValue() - .withDescription("List of column names in the Doris table, separated by commas"); + .key("doris.read.field") + .stringType() + .noDefaultValue() + .withDescription("List of column names in the Doris table, separated by commas"); private static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions - .key("doris.filter.query") - .stringType() - .noDefaultValue() - .withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering"); + .key("doris.filter.query") + .stringType() + .noDefaultValue() + .withDescription("Filter expression of the query, which is transparently transmitted to Doris. 
Doris uses this expression to complete source-side data filtering"); private static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions - .key("doris.request.tablet.size") - .intType() - .defaultValue(DORIS_TABLET_SIZE_DEFAULT) - .withDescription(""); + .key("doris.request.tablet.size") + .intType() + .defaultValue(DORIS_TABLET_SIZE_DEFAULT) + .withDescription(""); private static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions - .key("doris.request.connect.timeout.ms") - .intType() - .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT) - .withDescription(""); + .key("doris.request.connect.timeout.ms") + .intType() + .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT) + .withDescription(""); private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions - .key("doris.request.read.timeout.ms") - .intType() - .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT) - .withDescription(""); + .key("doris.request.read.timeout.ms") + .intType() + .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT) + .withDescription(""); private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions - .key("doris.request.query.timeout.s") - .intType() - .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT) - .withDescription(""); + .key("doris.request.query.timeout.s") + .intType() + .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT) + .withDescription(""); private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions - .key("doris.request.retries") - .intType() - .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT) - .withDescription(""); + .key("doris.request.retries") + .intType() + .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT) + .withDescription(""); private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions - .key("doris.deserialize.arrow.async") - .booleanType() - .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT) - .withDescription(""); + 
.key("doris.deserialize.arrow.async") + .booleanType() + .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT) + .withDescription(""); private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions - .key("doris.request.retriesdoris.deserialize.queue.size") - .intType() - .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT) - .withDescription(""); + .key("doris.request.retriesdoris.deserialize.queue.size") + .intType() + .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT) + .withDescription(""); private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions - .key("doris.batch.size") - .intType() - .defaultValue(DORIS_BATCH_SIZE_DEFAULT) - .withDescription(""); + .key("doris.batch.size") + .intType() + .defaultValue(DORIS_BATCH_SIZE_DEFAULT) + .withDescription(""); private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions - .key("doris.exec.mem.limit") - .longType() - .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT) - .withDescription(""); + .key("doris.exec.mem.limit") + .longType() + .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT) + .withDescription(""); // flink write config options private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions - .key("sink.batch.size") - .intType() - .defaultValue(100) - .withDescription("the flush max size (includes all append, upsert and delete records), over this number" + - " of records, will flush data. The default value is 100."); + .key("sink.batch.size") + .intType() + .defaultValue(100) + .withDescription("the flush max size (includes all append, upsert and delete records), over this number" + + " of records, will flush data. 
The default value is 100."); private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions - .key("sink.max-retries") - .intType() - .defaultValue(3) - .withDescription("the max retry times if writing records to database failed."); + .key("sink.max-retries") + .intType() + .defaultValue(3) + .withDescription("the max retry times if writing records to database failed."); private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions - .key("sink.batch.interval") - .durationType() - .defaultValue(Duration.ofSeconds(1)) - .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " + - "default value is 1s."); + .key("sink.batch.interval") + .durationType() + .defaultValue(Duration.ofSeconds(1)) + .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " + + "default value is 1s."); // Prefix for Doris StreamLoad specific properties. @@ -207,16 +199,16 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); // create and return dynamic table source return new DorisDynamicTableSource( - getDorisOptions(helper.getOptions()), - getDorisReadOptions(helper.getOptions()), - physicalSchema); + getDorisOptions(helper.getOptions()), + getDorisReadOptions(helper.getOptions()), + physicalSchema); } private DorisOptions getDorisOptions(ReadableConfig readableConfig) { final String fenodes = readableConfig.get(FENODES); final DorisOptions.Builder builder = DorisOptions.builder() - .setFenodes(fenodes) - .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER)); + .setFenodes(fenodes) + .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER)); readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername); readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword); @@ -226,16 +218,16 @@ public final class 
DorisDynamicTableFactory implements DynamicTableSourceFactory private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) { final DorisReadOptions.Builder builder = DorisReadOptions.builder(); builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC)) - .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE)) - .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT)) - .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY)) - .setReadFields(readableConfig.get(DORIS_READ_FIELD)) - .setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S)) - .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE)) - .setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS)) - .setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS)) - .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES)) - .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE)); + .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE)) + .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT)) + .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY)) + .setReadFields(readableConfig.get(DORIS_READ_FIELD)) + .setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S)) + .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE)) + .setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS)) + .setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS)) + .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES)) + .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE)); return builder.build(); } @@ -267,11 +259,14 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory helper.validateExcept(STREAM_LOAD_PROP_PREFIX); Properties streamLoadProp = getStreamLoadProp(context.getCatalogTable().getOptions()); + TableSchema physicalSchema = + 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema()); // create and return dynamic table source return new DorisDynamicTableSink( - getDorisOptions(helper.getOptions()), - getDorisReadOptions(helper.getOptions()), - getDorisExecutionOptions(helper.getOptions(), streamLoadProp) + getDorisOptions(helper.getOptions()), + getDorisReadOptions(helper.getOptions()), + getDorisExecutionOptions(helper.getOptions(), streamLoadProp), + physicalSchema ); } } diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java index dc710d7..0b69ea7 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java +++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java @@ -19,6 +19,7 @@ package org.apache.doris.flink.table; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.OutputFormatProvider; @@ -32,38 +33,41 @@ public class DorisDynamicTableSink implements DynamicTableSink { private final DorisOptions options; private final DorisReadOptions readOptions; private final DorisExecutionOptions executionOptions; + private final TableSchema tableSchema; - public DorisDynamicTableSink(DorisOptions options, DorisReadOptions readOptions, DorisExecutionOptions executionOptions) { + public DorisDynamicTableSink(DorisOptions options, DorisReadOptions readOptions, DorisExecutionOptions executionOptions, TableSchema tableSchema) { this.options = options; this.readOptions = readOptions; this.executionOptions = executionOptions; + this.tableSchema = tableSchema; } @Override public ChangelogMode getChangelogMode(ChangelogMode 
changelogMode) { return ChangelogMode.newBuilder() - .addContainedKind(RowKind.INSERT) - .addContainedKind(RowKind.DELETE) - .addContainedKind(RowKind.UPDATE_AFTER) - .build(); + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.DELETE) + .addContainedKind(RowKind.UPDATE_AFTER) + .build(); } @Override public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { DorisDynamicOutputFormat.Builder builder = DorisDynamicOutputFormat.builder() - .setFenodes(options.getFenodes()) - .setUsername(options.getUsername()) - .setPassword(options.getPassword()) - .setTableIdentifier(options.getTableIdentifier()) - .setReadOptions(readOptions) - .setExecutionOptions(executionOptions); + .setFenodes(options.getFenodes()) + .setUsername(options.getUsername()) + .setPassword(options.getPassword()) + .setTableIdentifier(options.getTableIdentifier()) + .setReadOptions(readOptions) + .setExecutionOptions(executionOptions) + .setFieldDataTypes(tableSchema.getFieldDataTypes());; return OutputFormatProvider.of(builder.build()); } @Override public DynamicTableSink copy() { - return new DorisDynamicTableSink(options, readOptions, executionOptions); + return new DorisDynamicTableSink(options, readOptions, executionOptions, tableSchema); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org