This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c1b2675ab0 [Fix][connector-http] fix when post have param (#8434)
c1b2675ab0 is described below

commit c1b2675ab0826bc90a16766b1811f9f3dc4c2c0c
Author: CosmosNi <nijiahui_y...@cmss.chinamobile.com>
AuthorDate: Mon Feb 24 10:15:59 2025 +0800

    [Fix][connector-http] fix when post have param (#8434)
---
 docs/en/connector-v2/source/Http.md                | 104 +++++++++++++++------
 .../seatunnel/http/client/HttpClientProvider.java  |  66 +++++++++++--
 .../seatunnel/http/config/HttpConfig.java          |  11 +++
 .../seatunnel/http/config/HttpParameter.java       |  23 ++++-
 .../seatunnel/http/source/HttpSourceReader.java    |  30 ++++--
 .../seatunnel/myhours/source/MyHoursSource.java    |   6 +-
 .../source/config/MyHoursSourceParameter.java      |   6 +-
 .../prometheus/source/PrometheusSourceReader.java  |   3 +-
 .../seatunnel/e2e/connector/http/HttpIT.java       |   9 ++
 .../resources/http_formrequestbody_to_assert.conf  |   1 +
 ...t.conf => http_formrequestbody_to_assert2.conf} |   5 +-
 .../resources/http_page_increase_no_page_num.conf  |   1 +
 .../resources/http_page_increase_page_num.conf     |   1 +
 .../resources/http_page_increase_start_num.conf    |   1 +
 ...rt.conf => http_post_param_json_to_assert.conf} |   9 +-
 .../src/test/resources/mockserver-config.json      |  31 +++++-
 16 files changed, 253 insertions(+), 54 deletions(-)

diff --git a/docs/en/connector-v2/source/Http.md 
b/docs/en/connector-v2/source/Http.md
index 511ba04132..88b6a67504 100644
--- a/docs/en/connector-v2/source/Http.md
+++ b/docs/en/connector-v2/source/Http.md
@@ -42,31 +42,33 @@ They can be downloaded via install-plugin.sh or from the 
Maven central repositor
 
 ## Source Options
 
-|            Name             |  Type   | Required | Default |                 
                                            Description                         
                                     |
-|-----------------------------|---------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------|
-| url                         | String  | Yes      | -       | Http request 
url.                                                                            
                                        |
-| schema                      | Config  | No       | -       | Http and 
seatunnel data structure mapping                                                
                                            |
-| schema.fields               | Config  | No       | -       | The schema 
fields of upstream data                                                         
                                          |
-| json_field                  | Config  | No       | -       | This parameter 
helps you configure the schema,so this parameter must be used with schema.      
                                      |
-| pageing                     | Config  | No       | -       | This parameter 
is used for paging queries                                                      
                                      |
-| pageing.page_field          | String  | No       | -       | This parameter 
is used to specify the page field name in the request parameter                 
                                      |
-| pageing.total_page_size     | Int     | No       | -       | This parameter 
is used to control the total number of pages                                    
                                      |
-| pageing.batch_size          | Int     | No       | -       | The batch size 
returned per request is used to determine whether to continue when the total 
number of pages is unknown               |
-| pageing.start_page_number   | Int     | No       | 1       | Specify the 
page number from which synchronization starts                                   
                                         |
-| content_json                | String  | No       | -       | This parameter 
can get some json data.If you only need the data in the 'book' section, 
configure `content_field = "$.store.book.*"`. |
-| format                      | String  | No       | text    | The format of 
upstream data, now only support `json` `text`, default `text`.                  
                                       |
-| method                      | String  | No       | get     | Http request 
method, only supports GET, POST method.                                         
                                        |
-| headers                     | Map     | No       | -       | Http headers.   
                                                                                
                                     |
-| params                      | Map     | No       | -       | Http params,the 
program will automatically add http header application/x-www-form-urlencoded.   
                                     |
-| body                        | String  | No       | -       | Http body,the 
program will automatically add http header application/json,body is jsonbody.   
                                       |
-| poll_interval_millis        | Int     | No       | -       | Request http 
api interval(millis) in stream mode.                                            
                                        |
-| retry                       | Int     | No       | -       | The max retry 
times if request http return to `IOException`.                                  
                                       |
-| retry_backoff_multiplier_ms | Int     | No       | 100     | The 
retry-backoff times(millis) multiplier if request http failed.                  
                                                 |
-| retry_backoff_max_ms        | Int     | No       | 10000   | The maximum 
retry-backoff times(millis) if request http failed                              
                                         |
-| enable_multi_lines          | Boolean | No       | false   |                 
                                                                                
                                     |
-| connect_timeout_ms          | Int     | No       | 12000   | Connection 
timeout setting, default 12s.                                                   
                                          |
-| socket_timeout_ms           | Int     | No       | 60000   | Socket timeout 
setting, default 60s.                                                           
                                      |
-| common-options              |         | No       | -       | Source plugin 
common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details                    |
+|            Name             |  Type   | Required | Default | Description     
                                                                                
                                                                                
  |
+|-----------------------------|---------|----------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| url                         | String  | Yes      | -       | Http request 
url.                                                                            
                                                                                
     |
+| schema                      | Config  | No       | -       | Http and 
seatunnel data structure mapping                                                
                                                                                
         |
+| schema.fields               | Config  | No       | -       | The schema 
fields of upstream data                                                         
                                                                                
       |
+| json_field                  | Config  | No       | -       | This parameter 
helps you configure the schema,so this parameter must be used with schema.      
                                                                                
   |
+| pageing                     | Config  | No       | -       | This parameter 
is used for paging queries                                                      
                                                                                
   |
+| pageing.page_field          | String  | No       | -       | This parameter 
is used to specify the page field name in the request parameter                 
                                                                                
   |
+| pageing.total_page_size     | Int     | No       | -       | This parameter 
is used to control the total number of pages                                    
                                                                                
   |
+| pageing.batch_size          | Int     | No       | -       | The batch size 
returned per request is used to determine whether to continue when the total 
number of pages is unknown                                                      
      |
+| pageing.start_page_number   | Int     | No       | 1       | Specify the 
page number from which synchronization starts                                   
                                                                                
      |
+| content_json                | String  | No       | -       | This parameter 
can get some json data.If you only need the data in the 'book' section, 
configure `content_field = "$.store.book.*"`.                                   
           |
+| format                      | String  | No       | text    | The format of 
upstream data, now only support `json` `text`, default `text`.                  
                                                                                
    |
+| method                      | String  | No       | get     | Http request 
method, only supports GET, POST method.                                         
                                                                                
     |
+| headers                     | Map     | No       | -       | Http headers.   
                                                                                
                                                                                
  |
+| params                      | Map     | No       | -       | Http params.    
                                                                                
                                                                                
  |
+| body                        | String  | No       | -       | Http body,the 
program will automatically add http header application/json,body is jsonbody.   
                                                                                
    |
+| poll_interval_millis        | Int     | No       | -       | Request http 
api interval(millis) in stream mode.                                            
                                                                                
     |
+| retry                       | Int     | No       | -       | The max retry 
times if request http return to `IOException`.                                  
                                                                                
    |
+| retry_backoff_multiplier_ms | Int     | No       | 100     | The 
retry-backoff times(millis) multiplier if request http failed.                  
                                                                                
              |
+| retry_backoff_max_ms        | Int     | No       | 10000   | The maximum 
retry-backoff times(millis) if request http failed                              
                                                                                
      |
+| enable_multi_lines          | Boolean | No       | false   |                 
                                                                                
                                                                                
  |
+| connect_timeout_ms          | Int     | No       | 12000   | Connection 
timeout setting, default 12s.                                                   
                                                                                
       |
+| socket_timeout_ms           | Int     | No       | 60000   | Socket timeout 
setting, default 60s.                                                           
                                                                                
   |
+| common-options              |         | No       | -       | Source plugin 
common parameters, please refer to [Source Common 
Options](../source-common-options.md) for details                               
                                  |
+| keep_params_as_form         |    Boolean     | No       | false       | 
Whether the params are submitted according to the form, used for compatibility 
with legacy behaviors. When true, the value of the params parameter is 
submitted through the form. |
+| keep_page_param_as_http_param         |    Boolean     | No       | false    
   | Whether to set the paging parameters to params. For compatibility with 
legacy behaviors.                                                               
                                                                   |
 
 ## How to Create a Http Data Synchronization Jobs
 
@@ -181,6 +183,46 @@ connector will generate data as the following:
 |----------------------------------------------------------|
 | {"code":  200, "data":  "get success", "success":  true} |
 
+### keep_params_as_form
+For compatibility with old versions of the HTTP connector.
+When set to true, `<params>` and `<pageing>` will be submitted as form data.
+When set to false, `<params>` will be added to the URL path, and `<pageing>` 
will not be added to the body or form. Instead, the page value replaces the 
placeholders in params and body.
+
+### keep_page_param_as_http_param
+Whether to set the paging parameters to params.
+When set to true, `<pageing>` is added to `<params>`.
+When set to false, the page value is replaced only when the page field exists 
in `<body>` or `<params>`.
+
+When set to false, config example:
+```hocon
+body="""{"id":1,"page":"${page}"}"""
+```
+
+```hocon
+params={
+ page: "${page}"
+}
+```
+
+### params
+By default, the parameters will be added to the url path.
+If you need to keep the old version behavior, please check keep_params_as_form.
+
+### body
+The HTTP body is used to carry the actual data in requests or responses, 
including JSON and form submissions.
+
+The reference format is as follows:
+```hocon
+body="""{"id":1,"name":"seatunnel"}"""
+```
+
+For form submissions, please set the Content-Type as follows.
+```hocon
+headers {
+    Content-Type = "application/x-www-form-urlencoded"
+}
+```
+
 ### content_json
 
 This parameter can get some json data.If you only need the data in the 'book' 
section, configure `content_field = "$.store.book.*"`.
@@ -318,17 +360,21 @@ source {
 - See this link for task configuration 
[http_jsonpath_to_assert.conf](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonpath_to_assert.conf).
 
 ### pageing
+When you need to append the page parameter to the URL, add it to `params`.
+
+When you need to put the page parameter in the body, add the page field key to `body`.
 
 ```hocon
 source {
     Http {
       url = "http://localhost:8080/mock/queryData";
-      method = "GET"
+      method = "POST"
       format = "json"
+      body="""{"id":1,"page":"${page}"}"""
+      content_field = "$.data.*"
       params={
        page: "${page}"
       }
-      content_field = "$.data.*"
       pageing={
        total_page_size=20
        page_field=page
@@ -344,6 +390,7 @@ source {
     }
 }
 
+
 ```
 
 ## Changelog
@@ -354,5 +401,4 @@ source {
 
 ### new version
 
-- [Feature][Connector-V2][HTTP] Use json-path parsing 
([3510](https://github.com/apache/seatunnel/pull/3510))
-
+- [Feature][Connector-V2][HTTP] Use json-path parsing 
([3510](https://github.com/apache/seatunnel/pull/3510))
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
index cbea79a15a..1fb4a54cad 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java
@@ -17,8 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.http.client;
 
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
 
+import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.http.HttpStatus;
@@ -67,6 +69,7 @@ import java.util.concurrent.TimeUnit;
 public class HttpClientProvider implements AutoCloseable {
     private static final String ENCODING = "UTF-8";
     private static final String APPLICATION_JSON = "application/json";
+    private static final String APPLICATION_FORM = 
"application/x-www-form-urlencoded";
     private static final int INITIAL_CAPACITY = 16;
     private RequestConfig requestConfig;
     private final CloseableHttpClient httpClient;
@@ -115,11 +118,26 @@ public class HttpClientProvider implements AutoCloseable {
             String method,
             Map<String, String> headers,
             Map<String, String> params,
-            String body)
+            Map<String, Object> body,
+            boolean keepParamsAsForm)
             throws Exception {
         // convert method option to uppercase
         method = method.toUpperCase(Locale.ROOT);
+        // Keep the original post  logic
+        if (HttpPost.METHOD_NAME.equals(method) && keepParamsAsForm) {
+            // Compatible with old versions
+            if (MapUtils.isNotEmpty(params)) {
+                headers = MapUtils.isEmpty(headers) ? new HashMap<>() : 
headers;
+                headers.putIfAbsent(HTTP.CONTENT_TYPE, APPLICATION_FORM);
+            }
+            if (MapUtils.isEmpty(body)) {
+                body = new HashMap<>();
+            }
+            body.putAll(params);
+            return doPost(url, headers, Collections.emptyMap(), body);
+        }
         if (HttpPost.METHOD_NAME.equals(method)) {
+            // Create access address
             return doPost(url, headers, params, body);
         }
         if (HttpGet.METHOD_NAME.equals(method)) {
@@ -292,7 +310,6 @@ public class HttpClientProvider implements AutoCloseable {
     /**
      * Send a post request with request headers , request parameters and 
request body
      *
-     * @param url request address
      * @param headers request header map
      * @param params request parameter map
      * @param body request body
@@ -300,16 +317,19 @@ public class HttpClientProvider implements AutoCloseable {
      * @throws Exception information
      */
     public HttpResponse doPost(
-            String url, Map<String, String> headers, Map<String, String> 
params, String body)
+            String url,
+            Map<String, String> headers,
+            Map<String, String> params,
+            Map<String, Object> body)
             throws Exception {
-        // create a new http get
-        HttpPost httpPost = new HttpPost(url);
+        URIBuilder uriBuilder = new URIBuilder(url);
+        // add parameter to uri
+        addParameters(uriBuilder, params);
+        HttpPost httpPost = new HttpPost(uriBuilder.build());
         // set default request config
         httpPost.setConfig(requestConfig);
         // set request header
         addHeaders(httpPost, headers);
-        // set request params
-        addParameters(httpPost, params);
         // add body in request
         addBody(httpPost, body);
         // return http response
@@ -429,6 +449,38 @@ public class HttpClientProvider implements AutoCloseable {
         headers.forEach(request::addHeader);
     }
 
+    private void addBody(HttpEntityEnclosingRequestBase request, Map<String, 
Object> body)
+            throws UnsupportedEncodingException {
+        if (MapUtils.isEmpty(body)) {
+            body = new HashMap<>();
+        }
+        boolean isFormSubmit =
+                request.getHeaders(HTTP.CONTENT_TYPE) != null
+                        && request.getHeaders(HTTP.CONTENT_TYPE).length > 0
+                        && APPLICATION_FORM.equalsIgnoreCase(
+                                
request.getHeaders(HTTP.CONTENT_TYPE)[0].getValue());
+        if (isFormSubmit) {
+            if (MapUtils.isNotEmpty(body)) {
+                List<NameValuePair> parameters = new ArrayList<>();
+                Set<Map.Entry<String, Object>> entrySet = body.entrySet();
+                for (Map.Entry<String, Object> e : entrySet) {
+                    String name = e.getKey();
+                    String value = e.getValue().toString();
+                    NameValuePair pair = new BasicNameValuePair(name, value);
+                    parameters.add(pair);
+                }
+                // Set to the request's http object
+                request.setEntity(new UrlEncodedFormEntity(parameters, 
ENCODING));
+            }
+        } else {
+            request.addHeader(HTTP.CONTENT_TYPE, APPLICATION_JSON);
+            StringEntity entity =
+                    new StringEntity(JsonUtils.toJsonString(body), 
ContentType.APPLICATION_JSON);
+            entity.setContentEncoding(new BasicHeader(HTTP.CONTENT_TYPE, 
APPLICATION_JSON));
+            request.setEntity(entity);
+        }
+    }
+
     private boolean checkAlreadyHaveContentType(HttpEntityEnclosingRequestBase 
request) {
         if (request.getEntity() != null && 
request.getEntity().getContentType() != null) {
             return 
HTTP.CONTENT_TYPE.equals(request.getEntity().getContentType().getName());
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
index 489b8d124b..745534483f 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java
@@ -33,6 +33,17 @@ public class HttpConfig {
     public static final int DEFAULT_SOCKET_TIMEOUT_MS = 6000 * 10;
     public static final Option<String> URL =
             
Options.key("url").stringType().noDefaultValue().withDescription("Http request 
url");
+    public static final Option<Boolean> KEEP_PARAMS_AS_FORM =
+            Options.key("keep_params_as_form")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Keep param as form");
+    public static final Option<Boolean> KEEP_PAGE_PARAM_AS_HTTP_PARAM =
+            Options.key("keep_page_param_as_http_param")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("keep page param as http param");
+
     public static final Option<Long> TOTAL_PAGE_SIZE =
             Options.key("total_page_size")
                     .longType()
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
index e4dffb539a..ca999ad8c7 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.http.config;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import lombok.Data;
 
@@ -32,7 +33,10 @@ public class HttpParameter implements Serializable {
     protected HttpRequestMethod method;
     protected Map<String, String> headers;
     protected Map<String, String> params;
-    protected String body;
+    protected Map<String, Object> pageParams;
+    protected boolean keepParamsAsForm = false;
+    protected boolean keepPageParamAsHttpParam = false;
+    protected Map<String, Object> body;
     protected int pollIntervalMillis;
     protected int retry;
     protected int retryBackoffMultiplierMillis = 
HttpConfig.DEFAULT_RETRY_BACKOFF_MULTIPLIER_MS;
@@ -44,6 +48,13 @@ public class HttpParameter implements Serializable {
     public void buildWithConfig(Config pluginConfig) {
         // set url
         this.setUrl(pluginConfig.getString(HttpConfig.URL.key()));
+        if (pluginConfig.hasPath(HttpConfig.KEEP_PARAMS_AS_FORM.key())) {
+            
this.setKeepParamsAsForm(pluginConfig.getBoolean(HttpConfig.KEEP_PARAMS_AS_FORM.key()));
+        }
+        if 
(pluginConfig.hasPath(HttpConfig.KEEP_PAGE_PARAM_AS_HTTP_PARAM.key())) {
+            this.setKeepPageParamAsHttpParam(
+                    
pluginConfig.getBoolean(HttpConfig.KEEP_PAGE_PARAM_AS_HTTP_PARAM.key()));
+        }
         // set method
         if (pluginConfig.hasPath(HttpConfig.METHOD.key())) {
             HttpRequestMethod httpRequestMethod =
@@ -75,7 +86,15 @@ public class HttpParameter implements Serializable {
         }
         // set body
         if (pluginConfig.hasPath(HttpConfig.BODY.key())) {
-            this.setBody(pluginConfig.getString(HttpConfig.BODY.key()));
+
+            this.setBody(
+                    
ConfigFactory.parseString(pluginConfig.getString(HttpConfig.BODY.key()))
+                            .entrySet().stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            Map.Entry::getKey,
+                                            entry -> 
entry.getValue().unwrapped(),
+                                            (v1, v2) -> v2)));
         }
         if (pluginConfig.hasPath(HttpConfig.POLL_INTERVAL_MILLS.key())) {
             
this.setPollIntervalMillis(pluginConfig.getInt(HttpConfig.POLL_INTERVAL_MILLS.key()));
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index 7345690569..e6b8499292 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -34,6 +34,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
 
+import org.apache.commons.collections4.MapUtils;
+
 import com.jayway.jsonpath.Configuration;
 import com.jayway.jsonpath.JsonPath;
 import com.jayway.jsonpath.Option;
@@ -116,7 +118,8 @@ public class HttpSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
                         this.httpParameter.getMethod().getMethod(),
                         this.httpParameter.getHeaders(),
                         this.httpParameter.getParams(),
-                        this.httpParameter.getBody());
+                        this.httpParameter.getBody(),
+                        this.httpParameter.isKeepParamsAsForm());
         if (response.getCode() >= 200 && response.getCode() <= 207) {
             String content = response.getContent();
             if (!Strings.isNullOrEmpty(content)) {
@@ -146,12 +149,27 @@ public class HttpSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
     }
 
     private void updateRequestParam(PageInfo pageInfo) {
-        if (this.httpParameter.getParams() == null) {
-            httpParameter.setParams(new HashMap<>());
+        // keep page param as http param
+        if (this.httpParameter.isKeepPageParamAsHttpParam()) {
+            if (this.httpParameter.getParams() == null) {
+                httpParameter.setParams(new HashMap<>());
+            }
+            this.httpParameter
+                    .getParams()
+                    .put(pageInfo.getPageField(), 
pageInfo.getPageIndex().toString());
+            return;
+        }
+
+        if (MapUtils.isNotEmpty(this.httpParameter.getParams())
+                && 
this.httpParameter.getParams().containsKey(pageInfo.getPageField())) {
+            this.httpParameter
+                    .getParams()
+                    .put(pageInfo.getPageField(), 
pageInfo.getPageIndex().toString());
+        }
+        if (MapUtils.isNotEmpty(this.httpParameter.getBody())
+                && 
this.httpParameter.getBody().containsKey(pageInfo.getPageField())) {
+            this.httpParameter.getBody().put(pageInfo.getPageField(), 
pageInfo.getPageIndex());
         }
-        this.httpParameter
-                .getParams()
-                .put(pageInfo.getPageField(), 
pageInfo.getPageIndex().toString());
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
index 6ac7e88258..6841d15943 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java
@@ -40,6 +40,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.myhours.source.exception.MyHour
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 
 @Slf4j
@@ -89,7 +90,10 @@ public class MyHoursSource extends HttpSource {
         try {
             HttpResponse response =
                     loginHttpClient.doPost(
-                            myHoursLoginParameter.getUrl(), 
myHoursLoginParameter.getBody());
+                            myHoursLoginParameter.getUrl(),
+                            Collections.emptyMap(),
+                            Collections.emptyMap(),
+                            myHoursLoginParameter.getBody());
             if (HttpResponse.STATUS_OK == response.getCode()) {
                 String content = response.getContent();
                 if (!Strings.isNullOrEmpty(content)) {
diff --git 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java
 
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java
index 08a802db41..2171c70a29 100644
--- 
a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java
+++ 
b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java
@@ -19,7 +19,6 @@ package 
org.apache.seatunnel.connectors.seatunnel.myhours.source.config;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpRequestMethod;
 
@@ -43,15 +42,14 @@ public class MyHoursSourceParameter extends HttpParameter {
         // set method
         this.setMethod(HttpRequestMethod.valueOf(MyHoursSourceConfig.POST));
         // set body
-        Map<String, String> bodyParams = new HashMap<>();
+        Map<String, Object> bodyParams = new HashMap<>();
         String email = pluginConfig.getString(MyHoursSourceConfig.EMAIL.key());
         String password = 
pluginConfig.getString(MyHoursSourceConfig.PASSWORD.key());
         bodyParams.put(MyHoursSourceConfig.GRANT_TYPE, 
MyHoursSourceConfig.PASSWORD.key());
         bodyParams.put(MyHoursSourceConfig.EMAIL.key(), email);
         bodyParams.put(MyHoursSourceConfig.PASSWORD.key(), password);
         bodyParams.put(MyHoursSourceConfig.CLIENT_ID, MyHoursSourceConfig.API);
-        String body = JsonUtils.toJsonString(bodyParams);
-        this.setBody(body);
+        this.setBody(bodyParams);
         this.setRetryParameters(pluginConfig);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/source/PrometheusSourceReader.java
 
b/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/source/PrometheusSourceReader.java
index 3734ebed49..c89cf72427 100644
--- 
a/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/source/PrometheusSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/source/PrometheusSourceReader.java
@@ -195,7 +195,8 @@ public class PrometheusSourceReader extends 
AbstractSingleSplitReader<SeaTunnelR
                         this.httpParameter.getMethod().getMethod(),
                         this.httpParameter.getHeaders(),
                         this.httpParameter.getParams(),
-                        this.httpParameter.getBody());
+                        this.httpParameter.getBody(),
+                        this.httpParameter.isKeepParamsAsForm());
         if (response.getCode() >= 200 && response.getCode() <= 207) {
             String content = response.getContent();
             if (!Strings.isNullOrEmpty(content)) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index 5e017f3636..6533e2bad8 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -271,6 +271,11 @@ public class HttpIT extends TestSuiteBase implements 
TestResource {
     @TestTemplate
     public void testSourceToAssertSink(TestContainer container)
             throws IOException, InterruptedException {
+        // dynamic param for body
+        Container.ExecResult execResult0 =
+                container.executeJob("/http_post_param_json_to_assert.conf");
+        Assertions.assertEquals(0, execResult0.getExitCode());
+
         // normal http
         Container.ExecResult execResult1 = 
container.executeJob("/http_json_to_assert.conf");
         Assertions.assertEquals(0, execResult1.getExitCode());
@@ -325,6 +330,10 @@ public class HttpIT extends TestSuiteBase implements 
TestResource {
                 container.executeJob("/http_formrequestbody_to_assert.conf");
         Assertions.assertEquals(0, execResult13.getExitCode());
 
+        Container.ExecResult execResult20 =
+                container.executeJob("/http_formrequestbody_to_assert2.conf");
+        Assertions.assertEquals(0, execResult20.getExitCode());
+
         // http httpJsonRequestBody
         Container.ExecResult execResult14 =
                 container.executeJob("/http_jsonrequestbody_to_assert.conf");
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
index a8793f73b7..f10cffa8f0 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
@@ -25,6 +25,7 @@ source {
     plugin_output = "http"
     url = "http://mockserver:1080/example/formBody";
     method = "POST"
+    keep_params_as_form = true
     params ={id = 1}
     format = "json"
     schema = {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert2.conf
similarity index 95%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert2.conf
index a8793f73b7..218dcf711c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert2.conf
@@ -25,7 +25,10 @@ source {
     plugin_output = "http"
     url = "http://mockserver:1080/example/formBody";
     method = "POST"
-    params ={id = 1}
+    headers {
+        Content-Type = "application/x-www-form-urlencoded"
+    }
+    body="{"id":1}"
     format = "json"
     schema = {
       fields {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf
index 0ed8f4d8e6..db6ea2edd2 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf
@@ -30,6 +30,7 @@ source {
       name = "$.data[*].name"
       age = "$.data[*].age"
     }
+    keep_page_param_as_http_param = true
     pageing = {
       batch_size=10
       page_field = page
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf
index c0812b1789..a70dba8779 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf
@@ -30,6 +30,7 @@ source {
       name = "$.data[*].name"
       age = "$.data[*].age"
     }
+    keep_page_param_as_http_param = true
     pageing = {
       total_page_size = 2
       page_field = page
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_start_num.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_start_num.conf
index 06282c675e..731cb3ab0a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_start_num.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_start_num.conf
@@ -30,6 +30,7 @@ source {
       name = "$.data[*].name"
       age = "$.data[*].age"
     }
+    keep_page_param_as_http_param = true
     pageing = {
       total_page_size = 2
       page_field = page
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_post_param_json_to_assert.conf
similarity index 89%
copy from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
copy to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_post_param_json_to_assert.conf
index a8793f73b7..85b254d98f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_post_param_json_to_assert.conf
@@ -23,10 +23,15 @@ env {
 source {
   Http {
     plugin_output = "http"
-    url = "http://mockserver:1080/example/formBody";
+    url = "http://mockserver:1080/example/jsonBody/dynamic/param";
     method = "POST"
-    params ={id = 1}
+    body="""{"id":1,"pageIndex":"${pageIndex}"}"""
     format = "json"
+    pageing={
+       page_field = pageIndex
+       start_page_number = 2
+       batch_size = 10
+    }
     schema = {
       fields {
         name = string
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
index 42d000f713..2fc4a26961 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json
@@ -4476,6 +4476,35 @@
       }
     }
   },
+  {
+    "httpRequest": {
+      "method" : "POST",
+      "path": "/example/jsonBody/dynamic/param",
+      "body": {
+        "type": "JSON",
+        "json": {
+          "id": 1,
+          "pageIndex": 2
+        },
+        "matchType": "STRICT"
+      }
+    },
+    "httpResponse": {
+      "body": [
+        {
+          "name": "lzl",
+          "age": 18
+        },
+        {
+          "name": "pizz",
+          "age": 19
+        }
+      ],
+      "headers": {
+        "Content-Type": "application/json"
+      }
+    }
+  },
   {
     "httpRequest": {
       "path": "/example/formBody",
@@ -4745,4 +4774,4 @@
       }
     }
   }
-]
+]
\ No newline at end of file


Reply via email to