This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 9831982 [Improvement] Move DorisSourceBuilder into DorisSource as a static inner builder class (#189) 9831982 is described below commit 983198218adfcd9ec11e1b389f106dafbfc51c8a Author: thehuldra <samm...@outlook.com> AuthorDate: Mon Sep 4 17:37:33 2023 +0800 [Improvement] Move DorisSourceBuilder into DorisSource as a static inner builder class (#189) --- .../org/apache/doris/flink/source/DorisSource.java | 52 ++++++++++++++++ .../doris/flink/source/DorisSourceBuilder.java | 71 ---------------------- .../doris/flink/table/DorisDynamicTableSource.java | 3 +- .../doris/flink/source/DorisSourceExampleTest.java | 2 +- 4 files changed, 54 insertions(+), 74 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java index e2b6d41..8edf10b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java @@ -120,4 +120,56 @@ public class DorisSource<OUT> implements Source<OUT, DorisSourceSplit, PendingSp public TypeInformation<OUT> getProducedType() { return deserializer.getProducedType(); } + + public static <OUT> DorisSourceBuilder<OUT> builder() { + return new DorisSourceBuilder(); + } + + /** + * build for DorisSource. + * @param <OUT> record type. + */ + + public static class DorisSourceBuilder<OUT> { + + private DorisOptions options; + private DorisReadOptions readOptions; + + // Boundedness + private Boundedness boundedness; + private DorisDeserializationSchema<OUT> deserializer; + + DorisSourceBuilder() { + boundedness = Boundedness.BOUNDED; + } + + + public DorisSourceBuilder<OUT> setDorisOptions(DorisOptions options) { + this.options = options; + return this; + } + + public DorisSourceBuilder<OUT> setDorisReadOptions(DorisReadOptions readOptions) { + this.readOptions = readOptions; + return this; + } + + public DorisSourceBuilder<OUT> setBoundedness(Boundedness boundedness) { + this.boundedness = boundedness; + return this; + } + + public DorisSourceBuilder<OUT> setDeserializer(DorisDeserializationSchema<OUT> deserializer) { + this.deserializer = deserializer; + return this; + } + + public DorisSource<OUT> build() { + if(readOptions == null){ + readOptions = DorisReadOptions.builder().build(); + } + return new DorisSource<>(options, readOptions, boundedness, deserializer); + } + } + } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java deleted file mode 100644 index 94febb8..0000000 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java +++ /dev/null @@ -1,71 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -package org.apache.doris.flink.source; - -import org.apache.doris.flink.cfg.DorisOptions; -import org.apache.doris.flink.cfg.DorisReadOptions; -import org.apache.doris.flink.deserialization.DorisDeserializationSchema; -import org.apache.flink.api.connector.source.Boundedness; - -/** - * The builder class for {@link DorisSource} to make it easier for the users to construct a {@link - * DorisSource}. - **/ -public class DorisSourceBuilder<OUT> { - - private DorisOptions options; - private DorisReadOptions readOptions; - - // Boundedness - private Boundedness boundedness; - private DorisDeserializationSchema<OUT> deserializer; - - DorisSourceBuilder() { - boundedness = Boundedness.BOUNDED; - } - - public static <OUT> DorisSourceBuilder<OUT> builder() { - return new DorisSourceBuilder(); - } - - public DorisSourceBuilder<OUT> setDorisOptions(DorisOptions options) { - this.options = options; - return this; - } - - public DorisSourceBuilder<OUT> setDorisReadOptions(DorisReadOptions readOptions) { - this.readOptions = readOptions; - return this; - } - - public DorisSourceBuilder<OUT> setBoundedness(Boundedness boundedness) { - this.boundedness = boundedness; - return this; - } - - public DorisSourceBuilder<OUT> setDeserializer(DorisDeserializationSchema<OUT> deserializer) { - this.deserializer = deserializer; - return this; - } - - public DorisSource<OUT> build() { - if(readOptions == null){ - readOptions = DorisReadOptions.builder().build(); - } - return new DorisSource<>(options, readOptions, boundedness, deserializer); - } -} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index 0fb096b..bd04e20 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -25,7 +25,6 @@ import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.rest.PartitionDefinition; import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.source.DorisSource; -import org.apache.doris.flink.source.DorisSourceBuilder; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.table.api.TableSchema; @@ -114,7 +113,7 @@ public final class DorisDynamicTableSource implements ScanTableSource, LookupTab return InputFormatProvider.of(builder.build()); } else { //Read data using the interface of the FLIP-27 specification - DorisSource<RowData> build = DorisSourceBuilder.<RowData>builder() + DorisSource<RowData> build = DorisSource.<RowData>builder() .setDorisReadOptions(readOptions) .setDorisOptions(options) .setDeserializer(new RowDataDeserializationSchema((RowType) physicalSchema.toRowDataType().getLogicalType())) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java index d85e70d..392596d 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java @@ -34,7 +34,7 @@ public class DorisSourceExampleTest { @Test public void testBoundedDorisSource() throws Exception { - DorisSource<List<?>> dorisSource = DorisSourceBuilder.<List<?>>builder() + DorisSource<List<?>> dorisSource = DorisSource.<List<?>>builder() .setDorisOptions(OptionUtils.buildDorisOptions()) .setDorisReadOptions(OptionUtils.buildDorisReadOptions()) .setDeserializer(new SimpleListDeserializationSchema()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org