This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 9831982  [Improvement] Move DorisSourceBuilder into DorisSource as a 
static inner builder class (#189)
9831982 is described below

commit 983198218adfcd9ec11e1b389f106dafbfc51c8a
Author: thehuldra <samm...@outlook.com>
AuthorDate: Mon Sep 4 17:37:33 2023 +0800

    [Improvement] Move DorisSourceBuilder into DorisSource as a static inner 
builder class (#189)
---
 .../org/apache/doris/flink/source/DorisSource.java | 52 ++++++++++++++++
 .../doris/flink/source/DorisSourceBuilder.java     | 71 ----------------------
 .../doris/flink/table/DorisDynamicTableSource.java |  3 +-
 .../doris/flink/source/DorisSourceExampleTest.java |  2 +-
 4 files changed, 54 insertions(+), 74 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
index e2b6d41..8edf10b 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
@@ -120,4 +120,56 @@ public class DorisSource<OUT> implements Source<OUT, 
DorisSourceSplit, PendingSp
     public TypeInformation<OUT> getProducedType() {
         return deserializer.getProducedType();
     }
+
+    public static <OUT> DorisSourceBuilder<OUT> builder() {
+        return new DorisSourceBuilder();
+    }
+
+    /**
+     * build for DorisSource.
+     * @param <OUT> record type.
+     */
+
+    public static class DorisSourceBuilder<OUT> {
+
+        private DorisOptions options;
+        private DorisReadOptions readOptions;
+
+        // Boundedness
+        private Boundedness boundedness;
+        private DorisDeserializationSchema<OUT> deserializer;
+
+        DorisSourceBuilder() {
+            boundedness = Boundedness.BOUNDED;
+        }
+
+
+        public DorisSourceBuilder<OUT> setDorisOptions(DorisOptions options) {
+            this.options = options;
+            return this;
+        }
+
+        public DorisSourceBuilder<OUT> setDorisReadOptions(DorisReadOptions 
readOptions) {
+            this.readOptions = readOptions;
+            return this;
+        }
+
+        public DorisSourceBuilder<OUT> setBoundedness(Boundedness boundedness) 
{
+            this.boundedness = boundedness;
+            return this;
+        }
+
+        public DorisSourceBuilder<OUT> 
setDeserializer(DorisDeserializationSchema<OUT> deserializer) {
+            this.deserializer = deserializer;
+            return this;
+        }
+
+        public DorisSource<OUT> build() {
+            if(readOptions == null){
+                readOptions = DorisReadOptions.builder().build();
+            }
+            return new DorisSource<>(options, readOptions, boundedness, 
deserializer);
+        }
+    }
+
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java
deleted file mode 100644
index 94febb8..0000000
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSourceBuilder.java
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.apache.doris.flink.source;
-
-import org.apache.doris.flink.cfg.DorisOptions;
-import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
-import org.apache.flink.api.connector.source.Boundedness;
-
-/**
- * The builder class for {@link DorisSource} to make it easier for the users 
to construct a {@link
- * DorisSource}.
- **/
-public class DorisSourceBuilder<OUT> {
-
-    private DorisOptions options;
-    private DorisReadOptions readOptions;
-
-    // Boundedness
-    private Boundedness boundedness;
-    private DorisDeserializationSchema<OUT> deserializer;
-
-    DorisSourceBuilder() {
-        boundedness = Boundedness.BOUNDED;
-    }
-
-    public static <OUT> DorisSourceBuilder<OUT> builder() {
-        return new DorisSourceBuilder();
-    }
-
-    public DorisSourceBuilder<OUT> setDorisOptions(DorisOptions options) {
-        this.options = options;
-        return this;
-    }
-
-    public DorisSourceBuilder<OUT> setDorisReadOptions(DorisReadOptions 
readOptions) {
-        this.readOptions = readOptions;
-        return this;
-    }
-
-    public DorisSourceBuilder<OUT> setBoundedness(Boundedness boundedness) {
-        this.boundedness = boundedness;
-        return this;
-    }
-
-    public DorisSourceBuilder<OUT> 
setDeserializer(DorisDeserializationSchema<OUT> deserializer) {
-        this.deserializer = deserializer;
-        return this;
-    }
-
-    public DorisSource<OUT> build() {
-        if(readOptions == null){
-            readOptions = DorisReadOptions.builder().build();
-        }
-        return new DorisSource<>(options, readOptions, boundedness, 
deserializer);
-    }
-}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index 0fb096b..bd04e20 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -25,7 +25,6 @@ import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.rest.PartitionDefinition;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.source.DorisSource;
-import org.apache.doris.flink.source.DorisSourceBuilder;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.table.api.TableSchema;
@@ -114,7 +113,7 @@ public final class DorisDynamicTableSource implements 
ScanTableSource, LookupTab
             return InputFormatProvider.of(builder.build());
         } else {
             //Read data using the interface of the FLIP-27 specification
-            DorisSource<RowData> build = DorisSourceBuilder.<RowData>builder()
+            DorisSource<RowData> build = DorisSource.<RowData>builder()
                     .setDorisReadOptions(readOptions)
                     .setDorisOptions(options)
                     .setDeserializer(new 
RowDataDeserializationSchema((RowType) 
physicalSchema.toRowDataType().getLogicalType()))
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
index d85e70d..392596d 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceExampleTest.java
@@ -34,7 +34,7 @@ public class DorisSourceExampleTest {
 
     @Test
     public void testBoundedDorisSource() throws Exception {
-        DorisSource<List<?>> dorisSource = 
DorisSourceBuilder.<List<?>>builder()
+        DorisSource<List<?>> dorisSource = DorisSource.<List<?>>builder()
                 .setDorisOptions(OptionUtils.buildDorisOptions())
                 .setDorisReadOptions(OptionUtils.buildDorisReadOptions())
                 .setDeserializer(new SimpleListDeserializationSchema())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to