This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new bb180b2988 [Feature][Connector-V2] HTTP supports page increase #5477 
(#5561)
bb180b2988 is described below

commit bb180b298846a0f7214d07ee9bcb66a900e368f5
Author: xiaofan2012 <[email protected]>
AuthorDate: Thu Oct 26 17:47:05 2023 +0800

    [Feature][Connector-V2] HTTP supports page increase #5477 (#5561)
    
    
    ---------
    
    Co-authored-by: xiaofan2022 <[email protected]>
    Co-authored-by: Eric <[email protected]>
---
 docs/en/connector-v2/source/Http.md                |  33 +++++
 .../seatunnel/http/config/HttpConfig.java          |  19 +++
 .../connectors/seatunnel/http/config/PageInfo.java |  35 +++++
 .../seatunnel/http/source/HttpSource.java          |  30 ++++-
 .../seatunnel/http/source/HttpSourceReader.java    | 116 +++++++++++++----
 .../seatunnel/e2e/connector/http/HttpIT.java       |   7 +
 .../resources/http_page_increase_no_page_num.conf  |  85 ++++++++++++
 .../resources/http_page_increase_page_num.conf     |  85 ++++++++++++
 .../src/test/resources/mockserver-config.json      | 144 +++++++++++++++++++++
 9 files changed, 529 insertions(+), 25 deletions(-)

diff --git a/docs/en/connector-v2/source/Http.md 
b/docs/en/connector-v2/source/Http.md
index f3e6a221bb..0c01be813e 100644
--- a/docs/en/connector-v2/source/Http.md
+++ b/docs/en/connector-v2/source/Http.md
@@ -48,6 +48,10 @@ They can be downloaded via install-plugin.sh or from the 
Maven central repositor
 | schema                      | Config  | No       | -       | Http and 
seatunnel data structure mapping                                                
                                            |
 | schema.fields               | Config  | No       | -       | The schema 
fields of upstream data                                                         
                                          |
 | json_field                  | Config  | No       | -       | This parameter 
helps you configure the schema,so this parameter must be used with schema.      
                                      |
+| pageing                     | Config  | No       | -       | This parameter 
is used for paging queries                                                      
                                      |
+| pageing.page_field          | String  | No       | -       | This parameter 
is used to specify the page field name in the request parameter                 
                                      |
+| pageing.total_page_size     | Long    | No       | -       | This parameter 
is used to control the total number of pages                                    
                                      |
+| pageing.batch_size          | Int     | No       | -       | The batch size 
returned per request is used to determine whether to continue when the total 
number of pages is unknown               |
 | content_json                | String  | No       | -       | This parameter 
can get some json data.If you only need the data in the 'book' section, 
configure `content_field = "$.store.book.*"`. |
 | format                      | String  | No       | json    | The format of 
upstream data, now only support `json` `text`, default `json`.                  
                                       |
 | method                      | String  | No       | get     | Http request 
method, only supports GET, POST method.                                         
                                        |
@@ -310,6 +314,35 @@ source {
 - Test data can be found at this link 
[mockserver-config.json](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json)
 - See this link for task configuration 
[http_jsonpath_to_assert.conf](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonpath_to_assert.conf).
 
+### pageing
+
+```hocon
+source {
+    Http {
+      url = "http://localhost:8080/mock/queryData";
+      method = "GET"
+      format = "json"
+      params={
+       page: "${page}"
+      }
+      content_field = "$.data.*"
+      pageing={
+       total_page_size=20
+       page_field=page
+       # If total_page_size is unknown, set batch_size instead: reading stops 
when a page returns fewer than batch_size records, otherwise it continues.
+       #batch_size=10
+      }
+      schema = {
+        fields {
+          name = string
+          age = string
+        }
+      }
+    }
+}
+
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
index d57f7ad8d3..2a68249b86 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
@@ -29,6 +29,25 @@ public class HttpConfig {
     public static final boolean DEFAULT_ENABLE_MULTI_LINES = false;
     public static final Option<String> URL =
             
Options.key("url").stringType().noDefaultValue().withDescription("Http request 
url");
+    public static final Option<Long> TOTAL_PAGE_SIZE =
+            Options.key("total_page_size")
+                    .longType()
+                    .defaultValue(0L)
+                    .withDescription("total page size");
+    public static final Option<Integer> BATCH_SIZE =
+            Options.key("batch_size")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription(
+                            "the batch size returned per request is used to 
determine whether to continue when the total number of pages is unknown");
+    public static final Option<String> PAGE_FIELD =
+            Options.key("page_field")
+                    .stringType()
+                    .defaultValue("page")
+                    .withDescription(
+                            "this parameter is used to specify the page field 
name in the request parameter");
+    public static final Option<Map<String, String>> PAGEING =
+            
Options.key("pageing").mapType().noDefaultValue().withDescription("pageing");
     public static final Option<HttpRequestMethod> METHOD =
             Options.key("method")
                     .enumType(HttpRequestMethod.class)
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
new file mode 100644
index 0000000000..a5c7061347
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.http.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import java.io.Serializable;
+
+@Setter
+@Getter
+@ToString
+public class PageInfo implements Serializable { // paging settings built by HttpSource and passed to HttpSourceReader
+
+    private Long totalPageSize; // number of pages to read; the reader only uses it as a stop condition when it is > 0
+
+    private Integer batchSize; // when totalPageSize is not > 0, a page with fewer elements than this ends the read
+    private String pageField; // name of the request parameter that receives the page index
+    private Long pageIndex; // page currently requested; the reader sets it before each request, starting at 1
+}
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
index 8e8311b650..e0a30ad061 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java
@@ -43,6 +43,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReader
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
 import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
+import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 
@@ -53,6 +54,7 @@ import java.util.Locale;
 @AutoService(SeaTunnelSource.class)
 public class HttpSource extends AbstractSingleSplitSource<SeaTunnelRow> {
     protected final HttpParameter httpParameter = new HttpParameter();
+    protected PageInfo pageInfo;
     protected SeaTunnelRowType rowType;
     protected JsonField jsonField;
     protected String contentField;
@@ -83,6 +85,31 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
         }
         this.httpParameter.buildWithConfig(pluginConfig);
         buildSchemaWithConfig(pluginConfig);
+        buildPagingWithConfig(pluginConfig);
+    }
+
+    /**
+     * Builds the paging settings from the optional {@code pageing} block of the plugin config.
+     *
+     * <p>When the block is absent, {@code pageInfo} is left untouched. Every option missing
+     * from the block falls back to the default value declared in {@link HttpConfig}, so the
+     * resulting {@link PageInfo} never carries a null total page size, batch size or page field.
+     *
+     * @param pluginConfig the Http source plugin configuration
+     */
+    private void buildPagingWithConfig(Config pluginConfig) {
+        if (!pluginConfig.hasPath(HttpConfig.PAGEING.key())) {
+            return;
+        }
+        pageInfo = new PageInfo();
+        Config pageConfig = pluginConfig.getConfig(HttpConfig.PAGEING.key());
+
+        if (pageConfig.hasPath(HttpConfig.TOTAL_PAGE_SIZE.key())) {
+            pageInfo.setTotalPageSize(pageConfig.getLong(HttpConfig.TOTAL_PAGE_SIZE.key()));
+        } else {
+            pageInfo.setTotalPageSize(HttpConfig.TOTAL_PAGE_SIZE.defaultValue());
+        }
+
+        if (pageConfig.hasPath(HttpConfig.BATCH_SIZE.key())) {
+            pageInfo.setBatchSize(pageConfig.getInt(HttpConfig.BATCH_SIZE.key()));
+        } else {
+            pageInfo.setBatchSize(HttpConfig.BATCH_SIZE.defaultValue());
+        }
+
+        // Without this fallback the page field stays null and the reader would put a
+        // null key into the request parameters.
+        if (pageConfig.hasPath(HttpConfig.PAGE_FIELD.key())) {
+            pageInfo.setPageField(pageConfig.getString(HttpConfig.PAGE_FIELD.key()));
+        } else {
+            pageInfo.setPageField(HttpConfig.PAGE_FIELD.defaultValue());
+        }
     }
 
     protected void buildSchemaWithConfig(Config pluginConfig) {
@@ -141,7 +168,8 @@ public class HttpSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
                 readerContext,
                 this.deserializationSchema,
                 jsonField,
-                contentField);
+                contentField,
+                pageInfo);
     }
 
     private JsonField getJsonField(Config jsonFieldConf) {
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index 1b9969a237..3c4669659f 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -28,6 +28,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
 import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
 import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
+import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
 
@@ -36,6 +37,7 @@ import com.jayway.jsonpath.Configuration;
 import com.jayway.jsonpath.JsonPath;
 import com.jayway.jsonpath.Option;
 import com.jayway.jsonpath.ReadContext;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.BufferedReader;
@@ -46,8 +48,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 @Slf4j
+@Setter
 public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
     protected final SingleSplitReaderContext context;
     protected final HttpParameter httpParameter;
@@ -61,6 +65,8 @@ public class HttpSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
     private final String contentJson;
     private final Configuration jsonConfiguration =
             Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);
+    private boolean noMoreElementFlag = true;
+    private Optional<PageInfo> pageInfoOptional = Optional.empty();
 
     public HttpSourceReader(
             HttpParameter httpParameter,
@@ -75,6 +81,21 @@ public class HttpSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
         this.contentJson = contentJson;
     }
 
+    /**
+     * Creates a reader that can additionally carry paging settings.
+     *
+     * @param httpParameter request settings used when executing the http call
+     * @param context reader context used to query boundedness and signal the end of data
+     * @param deserializationSchema schema wrapped by the deserialization collector
+     * @param jsonField JsonPath field mapping applied to the response
+     * @param contentJson JsonPath expression selecting the content of the response
+     * @param pageInfo paging settings; {@code null} is stored as an empty {@link Optional}
+     */
+    public HttpSourceReader(
+            HttpParameter httpParameter,
+            SingleSplitReaderContext context,
+            DeserializationSchema<SeaTunnelRow> deserializationSchema,
+            JsonField jsonField,
+            String contentJson,
+            PageInfo pageInfo) {
+        this.httpParameter = httpParameter;
+        this.context = context;
+        this.jsonField = jsonField;
+        this.contentJson = contentJson;
+        this.pageInfoOptional = Optional.ofNullable(pageInfo);
+        this.deserializationCollector = new DeserializationCollector(deserializationSchema);
+    }
+
     @Override
     public void open() {
         httpClient = new HttpClientProvider(httpParameter);
@@ -87,40 +108,72 @@ public class HttpSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
         }
     }
 
-    @Override
-    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
-        try {
-            HttpResponse response =
-                    httpClient.execute(
-                            this.httpParameter.getUrl(),
-                            this.httpParameter.getMethod().getMethod(),
-                            this.httpParameter.getHeaders(),
-                            this.httpParameter.getParams(),
-                            this.httpParameter.getBody());
-            if (HttpResponse.STATUS_OK == response.getCode()) {
-                String content = response.getContent();
-                if (!Strings.isNullOrEmpty(content)) {
-                    if (this.httpParameter.isEnableMultilines()) {
-                        StringReader stringReader = new StringReader(content);
-                        BufferedReader bufferedReader = new 
BufferedReader(stringReader);
-                        String lineStr;
-                        while ((lineStr = bufferedReader.readLine()) != null) {
-                            collect(output, lineStr);
-                        }
-                    } else {
-                        collect(output, content);
+    public void pollAndCollectData(Collector<SeaTunnelRow> output) throws 
Exception {
+        HttpResponse response =
+                httpClient.execute(
+                        this.httpParameter.getUrl(),
+                        this.httpParameter.getMethod().getMethod(),
+                        this.httpParameter.getHeaders(),
+                        this.httpParameter.getParams(),
+                        this.httpParameter.getBody());
+        if (HttpResponse.STATUS_OK == response.getCode()) {
+            String content = response.getContent();
+            if (!Strings.isNullOrEmpty(content)) {
+                if (this.httpParameter.isEnableMultilines()) {
+                    StringReader stringReader = new StringReader(content);
+                    BufferedReader bufferedReader = new 
BufferedReader(stringReader);
+                    String lineStr;
+                    while ((lineStr = bufferedReader.readLine()) != null) {
+                        collect(output, lineStr);
                     }
+                } else {
+                    collect(output, content);
                 }
-                return;
             }
+            log.info(
+                    "http client execute success request param:[{}], http 
response status code:[{}], content:[{}]",
+                    httpParameter.getParams(),
+                    response.getCode(),
+                    response.getContent());
+        } else {
             log.error(
                     "http client execute exception, http response status 
code:[{}], content:[{}]",
                     response.getCode(),
                     response.getContent());
+        }
+    }
+
+    /**
+     * Writes the current page index into the request parameters under the configured page
+     * field, creating the parameter map first when the request has none yet.
+     *
+     * @param pageInfo paging settings holding the page field name and the page index to send
+     */
+    private void updateRequestParam(PageInfo pageInfo) {
+        if (this.httpParameter.getParams() == null) {
+            this.httpParameter.setParams(new HashMap<>());
+        }
+        String pageField = pageInfo.getPageField();
+        String pageValue = pageInfo.getPageIndex().toString();
+        this.httpParameter.getParams().put(pageField, pageValue);
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        try {
+            if (pageInfoOptional.isPresent()) {
+                noMoreElementFlag = false;
+                Long pageIndex = 1L;
+                while (!noMoreElementFlag) {
+                    PageInfo info = pageInfoOptional.get();
+                    // increment page
+                    info.setPageIndex(pageIndex);
+                    // set request param
+                    updateRequestParam(info);
+                    pollAndCollectData(output);
+                    pageIndex += 1;
+                }
+            } else {
+                pollAndCollectData(output);
+            }
         } catch (Exception e) {
             log.error(e.getMessage(), e);
         } finally {
-            if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            if (Boundedness.BOUNDED.equals(context.getBoundedness()) && 
noMoreElementFlag) {
                 // signal to the source that we have reached the end of the 
data.
                 log.info("Closed the bounded http source");
                 context.signalNoMoreElement();
@@ -140,6 +193,21 @@ public class HttpSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
             this.initJsonPath(jsonField);
             data = JsonUtils.toJsonNode(parseToMap(decodeJSON(data), 
jsonField)).toString();
         }
+        // page increase
+        if (pageInfoOptional.isPresent()) {
+            // Determine whether the task is completed by specifying the 
presence of the 'total
+            // page' field
+            PageInfo pageInfo = pageInfoOptional.get();
+            if (pageInfo.getTotalPageSize() > 0) {
+                noMoreElementFlag = pageInfo.getPageIndex() >= 
pageInfo.getTotalPageSize();
+            } else {
+                // no 'total page' configured
+                int readSize = JsonUtils.stringToJsonNode(data).size();
+                // if read size < BatchSize : read finish
+                // if read size = BatchSize : read next page.
+                noMoreElementFlag = readSize < pageInfo.getBatchSize();
+            }
+        }
         deserializationCollector.collect(data.getBytes(), output);
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index d65617bb55..bd85ed876e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -145,6 +145,13 @@ public class HttpIT extends TestSuiteBase implements 
TestResource {
         Container.ExecResult execResult14 =
                 container.executeJob("/http_jsonrequestbody_to_assert.conf");
         Assertions.assertEquals(0, execResult14.getExitCode());
+
+        Container.ExecResult execResult15 =
+                container.executeJob("/http_page_increase_page_num.conf");
+        Assertions.assertEquals(0, execResult15.getExitCode());
+        Container.ExecResult execResult16 =
+                container.executeJob("/http_page_increase_no_page_num.conf");
+        Assertions.assertEquals(0, execResult16.getExitCode());
     }
 
     public String getMockServerConfig() {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf
new file mode 100644
index 0000000000..387201b379
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Http {
+    result_table_name = "http"
+    url = "http://mockserver:1080/query/pagesNoPageNum";
+    method = "GET"
+    format = "json"
+    json_field = {
+      name = "$.data[*].name"
+      age = "$.data[*].age"
+    }
+    pageing = {
+      batch_size=10
+      page_field = page
+    }
+    schema = {
+      fields {
+        name = string
+        age = int
+      }
+    }
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "http"
+  }
+  Assert {
+    source_table_name = "http"
+    rules {
+      row_rules = [
+        {
+          rule_type = MIN_ROW
+          rule_value = 12
+        },
+        {
+          rule_type = MAX_ROW
+          rule_value = 12
+        }
+      ]
+      field_rules = [
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = age
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf
new file mode 100644
index 0000000000..c3202d6f7b
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Http {
+    result_table_name = "http"
+    url = "http://mockserver:1080/query/pages";
+    method = "GET"
+    format = "json"
+    json_field = {
+      name = "$.data[*].name"
+      age = "$.data[*].age"
+    }
+    pageing = {
+      total_page_size = 2
+      page_field = page
+    }
+    schema = {
+      fields {
+        name = string
+        age = int
+      }
+    }
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "http"
+  }
+  Assert {
+    source_table_name = "http"
+    rules {
+      row_rules = [
+        {
+          rule_type = MIN_ROW
+          rule_value = 4
+        },
+        {
+          rule_type = MAX_ROW
+          rule_value = 4
+        }
+      ]
+      field_rules = [
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = age
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
index 2c419277e0..9cb561225d 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
@@ -4526,5 +4526,149 @@
         "Content-Type": "application/json"
       }
     }
+  },
+  {
+    "httpRequest": {
+      "method" : "GET",
+      "path": "/query/pages",
+      "queryStringParameters": {
+        "page": "1"
+      }
+    },
+    "httpResponse": {
+      "body":
+      {
+        "status": null,
+        "msg": null,
+        "data": [
+          {
+            "name": "name1",
+            "age": 69
+          },
+          {
+            "name": "name2",
+            "age": 51
+          }
+        ],
+        "currentPageIndex": 1,
+        "totalPage": 2
+      }
+    }
+  },
+  {
+    "httpRequest": {
+      "method" : "GET",
+      "path": "/query/pages",
+      "queryStringParameters": {
+        "page": "2"
+      }
+    },
+    "httpResponse": {
+      "body":
+      {
+        "status": null,
+        "msg": null,
+        "data": [
+          {
+            "name": "name1",
+            "age": 69
+          },
+          {
+            "name": "name2",
+            "age": 51
+          }
+        ],
+        "currentPageIndex": 2,
+        "totalPage": 2
+      }
+    }
+  },
+  {
+    "httpRequest": {
+      "method" : "GET",
+      "path": "/query/pagesNoPageNum",
+      "queryStringParameters": {
+        "page": "1"
+      }
+    },
+    "httpResponse": {
+      "body":
+      {
+        "status": null,
+        "msg": null,
+        "data": [
+          {
+            "name": "name1",
+            "age": 69
+          },
+          {
+            "name": "name2",
+            "age": 51
+          },
+          {
+            "name": "name3",
+            "age": 36
+          },
+          {
+            "name": "name4",
+            "age": 51
+          },
+          {
+            "name": "name5",
+            "age": 74
+          },
+          {
+            "name": "name6",
+            "age": 51
+          },
+          {
+            "name": "name7",
+            "age": 67
+          },
+          {
+            "name": "name8",
+            "age": 12
+          },
+          {
+            "name": "name9",
+            "age": 45
+          },
+          {
+            "name": "name10",
+            "age": 23
+          }
+        ],
+        "currentPageIndex": 1,
+        "hasNext": true
+      }
+    }
+  },
+  {
+    "httpRequest": {
+      "method" : "GET",
+      "path": "/query/pagesNoPageNum",
+      "queryStringParameters": {
+        "page": "2"
+      }
+    },
+    "httpResponse": {
+      "body":
+      {
+        "status": null,
+        "msg": null,
+        "data": [
+          {
+            "name": "name11",
+            "age": 69
+          },
+          {
+            "name": "name22",
+            "age": 51
+          }
+        ],
+        "currentPageIndex": 2,
+        "hasNext": false
+      }
+    }
   }
 ]

Reply via email to