This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new c1b2675ab0 [Fix][connector-http] fix when post have param (#8434) c1b2675ab0 is described below commit c1b2675ab0826bc90a16766b1811f9f3dc4c2c0c Author: CosmosNi <nijiahui_y...@cmss.chinamobile.com> AuthorDate: Mon Feb 24 10:15:59 2025 +0800 [Fix][connector-http] fix when post have param (#8434) --- docs/en/connector-v2/source/Http.md | 104 +++++++++++++++------ .../seatunnel/http/client/HttpClientProvider.java | 66 +++++++++++-- .../seatunnel/http/config/HttpConfig.java | 11 +++ .../seatunnel/http/config/HttpParameter.java | 23 ++++- .../seatunnel/http/source/HttpSourceReader.java | 30 ++++-- .../seatunnel/myhours/source/MyHoursSource.java | 6 +- .../source/config/MyHoursSourceParameter.java | 6 +- .../prometheus/source/PrometheusSourceReader.java | 3 +- .../seatunnel/e2e/connector/http/HttpIT.java | 9 ++ .../resources/http_formrequestbody_to_assert.conf | 1 + ...t.conf => http_formrequestbody_to_assert2.conf} | 5 +- .../resources/http_page_increase_no_page_num.conf | 1 + .../resources/http_page_increase_page_num.conf | 1 + .../resources/http_page_increase_start_num.conf | 1 + ...rt.conf => http_post_param_json_to_assert.conf} | 9 +- .../src/test/resources/mockserver-config.json | 31 +++++- 16 files changed, 253 insertions(+), 54 deletions(-) diff --git a/docs/en/connector-v2/source/Http.md b/docs/en/connector-v2/source/Http.md index 511ba04132..88b6a67504 100644 --- a/docs/en/connector-v2/source/Http.md +++ b/docs/en/connector-v2/source/Http.md @@ -42,31 +42,33 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor ## Source Options -| Name | Type | Required | Default | Description | -|-----------------------------|---------|----------|---------|--------------------------------------------------------------------------------------------------------------------------------------| -| url | String | Yes | - | Http request url. | -| schema | Config | No | - | Http and seatunnel data structure mapping | -| schema.fields | Config | No | - | The schema fields of upstream data | -| json_field | Config | No | - | This parameter helps you configure the schema,so this parameter must be used with schema. | -| pageing | Config | No | - | This parameter is used for paging queries | -| pageing.page_field | String | No | - | This parameter is used to specify the page field name in the request parameter | -| pageing.total_page_size | Int | No | - | This parameter is used to control the total number of pages | -| pageing.batch_size | Int | No | - | The batch size returned per request is used to determine whether to continue when the total number of pages is unknown | -| pageing.start_page_number | Int | No | 1 | Specify the page number from which synchronization starts | -| content_json | String | No | - | This parameter can get some json data.If you only need the data in the 'book' section, configure `content_field = "$.store.book.*"`. | -| format | String | No | text | The format of upstream data, now only support `json` `text`, default `text`. | -| method | String | No | get | Http request method, only supports GET, POST method. | -| headers | Map | No | - | Http headers. | -| params | Map | No | - | Http params,the program will automatically add http header application/x-www-form-urlencoded. | -| body | String | No | - | Http body,the program will automatically add http header application/json,body is jsonbody. | -| poll_interval_millis | Int | No | - | Request http api interval(millis) in stream mode. | -| retry | Int | No | - | The max retry times if request http return to `IOException`. | -| retry_backoff_multiplier_ms | Int | No | 100 | The retry-backoff times(millis) multiplier if request http failed. | -| retry_backoff_max_ms | Int | No | 10000 | The maximum retry-backoff times(millis) if request http failed | -| enable_multi_lines | Boolean | No | false | | -| connect_timeout_ms | Int | No | 12000 | Connection timeout setting, default 12s. | -| socket_timeout_ms | Int | No | 60000 | Socket timeout setting, default 60s. | -| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details | +| Name | Type | Required | Default | Description | +|-----------------------------|---------|----------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | Http request url. | +| schema | Config | No | - | Http and seatunnel data structure mapping | +| schema.fields | Config | No | - | The schema fields of upstream data | +| json_field | Config | No | - | This parameter helps you configure the schema,so this parameter must be used with schema. | +| pageing | Config | No | - | This parameter is used for paging queries | +| pageing.page_field | String | No | - | This parameter is used to specify the page field name in the request parameter | +| pageing.total_page_size | Int | No | - | This parameter is used to control the total number of pages | +| pageing.batch_size | Int | No | - | The batch size returned per request is used to determine whether to continue when the total number of pages is unknown | +| pageing.start_page_number | Int | No | 1 | Specify the page number from which synchronization starts | +| content_json | String | No | - | This parameter can get some json data.If you only need the data in the 'book' section, configure `content_field = "$.store.book.*"`. | +| format | String | No | text | The format of upstream data, now only support `json` `text`, default `text`. | +| method | String | No | get | Http request method, only supports GET, POST method. | +| headers | Map | No | - | Http headers. | +| params | Map | No | - | Http params. | +| body | String | No | - | Http body,the program will automatically add http header application/json,body is jsonbody. | +| poll_interval_millis | Int | No | - | Request http api interval(millis) in stream mode. | +| retry | Int | No | - | The max retry times if request http return to `IOException`. | +| retry_backoff_multiplier_ms | Int | No | 100 | The retry-backoff times(millis) multiplier if request http failed. | +| retry_backoff_max_ms | Int | No | 10000 | The maximum retry-backoff times(millis) if request http failed | +| enable_multi_lines | Boolean | No | false | | +| connect_timeout_ms | Int | No | 12000 | Connection timeout setting, default 12s. | +| socket_timeout_ms | Int | No | 60000 | Socket timeout setting, default 60s. | +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details | +| keep_params_as_form | Boolean | No | false | Whether the params are submitted according to the form, used for compatibility with legacy behaviors. When true, the value of the params parameter is submitted through the form. | +| keep_page_param_as_http_param | Boolean | No | false | Whether to set the paging parameters to params. For compatibility with legacy behaviors. | ## How to Create a Http Data Synchronization Jobs @@ -181,6 +183,46 @@ connector will generate data as the following: |----------------------------------------------------------| | {"code": 200, "data": "get success", "success": true} | +### keep_params_as_form +For compatibility with old versions of http. +When set to true,`<params>` and `<pageing>` will be submitted in the form. +When set to false,`<params>` will be added to the url path,and `<pageing>` will not be added to the body or form. It will replace placeholders in params and body. + +### keep_page_param_as_http_param +Whether to set the paging parameters to params. +When set to true,`<pageing>` is set to `<params>`. +When set to false,When the page field exists in `<body>` or `<params>`, replace value. + +When set to false,config example: +```hocon +body="""{"id":1,"page":"${page}"}""" +``` + +```hocon +params={ + page: "${page}" +} +``` + +### params +By default, the parameters will be added to the url path. +If you need to keep the old version behavior, please check keep_params_as_form. + +### body +The HTTP body is used to carry the actual data in requests or responses, including JSON, form submissions. + +The reference format is as follows: +```hocon +body="{"id":1,"name":"setunnel"}" +``` + +For form submissions,please set the content-type as follows. +```hocon +headers { + Content-Type = "application/x-www-form-urlencoded" +} +``` + ### content_json This parameter can get some json data.If you only need the data in the 'book' section, configure `content_field = "$.store.book.*"`. @@ -318,17 +360,21 @@ source { - See this link for task configuration [http_jsonpath_to_assert.conf](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonpath_to_assert.conf). ### pageing +When you need to concatenate page param in the URL,then add params. + +When you need to set page param to the body,add the key of page param in body. ```hocon source { Http { url = "http://localhost:8080/mock/queryData" - method = "GET" + method = "POST" format = "json" + body="""{"id":1,"page":"${page}"}""" + content_field = "$.data.*" params={ page: "${page}" } - content_field = "$.data.*" pageing={ total_page_size=20 page_field=page @@ -344,6 +390,7 @@ source { } } + ``` ## Changelog @@ -354,5 +401,4 @@ source { ### new version -- [Feature][Connector-V2][HTTP] Use json-path parsing ([3510](https://github.com/apache/seatunnel/pull/3510)) - +- [Feature][Connector-V2][HTTP] Use json-path parsing ([3510](https://github.com/apache/seatunnel/pull/3510)) \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java index cbea79a15a..1fb4a54cad 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java @@ -17,8 +17,10 @@ package org.apache.seatunnel.connectors.seatunnel.http.client; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.http.HttpStatus; @@ -67,6 +69,7 @@ import java.util.concurrent.TimeUnit; public class HttpClientProvider implements AutoCloseable { private static final String ENCODING = "UTF-8"; private static final String APPLICATION_JSON = "application/json"; + private static final String APPLICATION_FORM = "application/x-www-form-urlencoded"; private static final int INITIAL_CAPACITY = 16; private RequestConfig requestConfig; private final CloseableHttpClient httpClient; @@ -115,11 +118,26 @@ public class HttpClientProvider implements AutoCloseable { String method, Map<String, String> headers, Map<String, String> params, - String body) + Map<String, Object> body, + boolean keepParamsAsForm) throws Exception { // convert method option to uppercase method = method.toUpperCase(Locale.ROOT); + // Keep the original post logic + if (HttpPost.METHOD_NAME.equals(method) && keepParamsAsForm) { + // Compatible with old versions + if (MapUtils.isNotEmpty(params)) { + headers = MapUtils.isEmpty(headers) ? new HashMap<>() : headers; + headers.putIfAbsent(HTTP.CONTENT_TYPE, APPLICATION_FORM); + } + if (MapUtils.isEmpty(body)) { + body = new HashMap<>(); + } + body.putAll(params); + return doPost(url, headers, Collections.emptyMap(), body); + } if (HttpPost.METHOD_NAME.equals(method)) { + // Create access address return doPost(url, headers, params, body); } if (HttpGet.METHOD_NAME.equals(method)) { @@ -292,7 +310,6 @@ public class HttpClientProvider implements AutoCloseable { /** * Send a post request with request headers , request parameters and request body * - * @param url request address * @param headers request header map * @param params request parameter map * @param body request body @@ -300,16 +317,19 @@ public class HttpClientProvider implements AutoCloseable { * @throws Exception information */ public HttpResponse doPost( - String url, Map<String, String> headers, Map<String, String> params, String body) + String url, + Map<String, String> headers, + Map<String, String> params, + Map<String, Object> body) throws Exception { - // create a new http get - HttpPost httpPost = new HttpPost(url); + URIBuilder uriBuilder = new URIBuilder(url); + // add parameter to uri + addParameters(uriBuilder, params); + HttpPost httpPost = new HttpPost(uriBuilder.build()); // set default request config httpPost.setConfig(requestConfig); // set request header addHeaders(httpPost, headers); - // set request params - addParameters(httpPost, params); // add body in request addBody(httpPost, body); // return http response @@ -429,6 +449,38 @@ public class HttpClientProvider implements AutoCloseable { headers.forEach(request::addHeader); } + private void addBody(HttpEntityEnclosingRequestBase request, Map<String, Object> body) + throws UnsupportedEncodingException { + if (MapUtils.isEmpty(body)) { + body = new HashMap<>(); + } + boolean isFormSubmit = + request.getHeaders(HTTP.CONTENT_TYPE) != null + && request.getHeaders(HTTP.CONTENT_TYPE).length > 0 + && APPLICATION_FORM.equalsIgnoreCase( + request.getHeaders(HTTP.CONTENT_TYPE)[0].getValue()); + if (isFormSubmit) { + if (MapUtils.isNotEmpty(body)) { + List<NameValuePair> parameters = new ArrayList<>(); + Set<Map.Entry<String, Object>> entrySet = body.entrySet(); + for (Map.Entry<String, Object> e : entrySet) { + String name = e.getKey(); + String value = e.getValue().toString(); + NameValuePair pair = new BasicNameValuePair(name, value); + parameters.add(pair); + } + // Set to the request's http object + request.setEntity(new UrlEncodedFormEntity(parameters, ENCODING)); + } + } else { + request.addHeader(HTTP.CONTENT_TYPE, APPLICATION_JSON); + StringEntity entity = + new StringEntity(JsonUtils.toJsonString(body), ContentType.APPLICATION_JSON); + entity.setContentEncoding(new BasicHeader(HTTP.CONTENT_TYPE, APPLICATION_JSON)); + request.setEntity(entity); + } + } + private boolean checkAlreadyHaveContentType(HttpEntityEnclosingRequestBase request) { if (request.getEntity() != null && request.getEntity().getContentType() != null) { return HTTP.CONTENT_TYPE.equals(request.getEntity().getContentType().getName()); diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java index 489b8d124b..745534483f 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpConfig.java @@ -33,6 +33,17 @@ public class HttpConfig { public static final int DEFAULT_SOCKET_TIMEOUT_MS = 6000 * 10; public static final Option<String> URL = Options.key("url").stringType().noDefaultValue().withDescription("Http request url"); + public static final Option<Boolean> KEEP_PARAMS_AS_FORM = + Options.key("keep_params_as_form") + .booleanType() + .defaultValue(false) + .withDescription("Keep param as form"); + public static final Option<Boolean> KEEP_PAGE_PARAM_AS_HTTP_PARAM = + Options.key("keep_page_param_as_http_param") + .booleanType() + .defaultValue(false) + .withDescription("keep page param as http param"); + public static final Option<Long> TOTAL_PAGE_SIZE = Options.key("total_page_size") .longType() diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java index e4dffb539a..ca999ad8c7 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.http.config; import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; import lombok.Data; @@ -32,7 +33,10 @@ public class HttpParameter implements Serializable { protected HttpRequestMethod method; protected Map<String, String> headers; protected Map<String, String> params; - protected String body; + protected Map<String, Object> pageParams; + protected boolean keepParamsAsForm = false; + protected boolean keepPageParamAsHttpParam = false; + protected Map<String, Object> body; protected int pollIntervalMillis; protected int retry; protected int retryBackoffMultiplierMillis = HttpConfig.DEFAULT_RETRY_BACKOFF_MULTIPLIER_MS; @@ -44,6 +48,13 @@ public class HttpParameter implements Serializable { public void buildWithConfig(Config pluginConfig) { // set url this.setUrl(pluginConfig.getString(HttpConfig.URL.key())); + if (pluginConfig.hasPath(HttpConfig.KEEP_PARAMS_AS_FORM.key())) { + this.setKeepParamsAsForm(pluginConfig.getBoolean(HttpConfig.KEEP_PARAMS_AS_FORM.key())); + } + if (pluginConfig.hasPath(HttpConfig.KEEP_PAGE_PARAM_AS_HTTP_PARAM.key())) { + this.setKeepPageParamAsHttpParam( + pluginConfig.getBoolean(HttpConfig.KEEP_PAGE_PARAM_AS_HTTP_PARAM.key())); + } // set method if (pluginConfig.hasPath(HttpConfig.METHOD.key())) { HttpRequestMethod httpRequestMethod = @@ -75,7 +86,15 @@ public class HttpParameter implements Serializable { } // set body if (pluginConfig.hasPath(HttpConfig.BODY.key())) { - this.setBody(pluginConfig.getString(HttpConfig.BODY.key())); + + this.setBody( + ConfigFactory.parseString(pluginConfig.getString(HttpConfig.BODY.key())) + .entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().unwrapped(), + (v1, v2) -> v2))); } if (pluginConfig.hasPath(HttpConfig.POLL_INTERVAL_MILLS.key())) { this.setPollIntervalMillis(pluginConfig.getInt(HttpConfig.POLL_INTERVAL_MILLS.key())); diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java index 7345690569..e6b8499292 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java @@ -34,6 +34,8 @@ import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo; import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException; +import org.apache.commons.collections4.MapUtils; + import com.jayway.jsonpath.Configuration; import com.jayway.jsonpath.JsonPath; import com.jayway.jsonpath.Option; @@ -116,7 +118,8 @@ public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> { this.httpParameter.getMethod().getMethod(), this.httpParameter.getHeaders(), this.httpParameter.getParams(), - this.httpParameter.getBody()); + this.httpParameter.getBody(), + this.httpParameter.isKeepParamsAsForm()); if (response.getCode() >= 200 && response.getCode() <= 207) { String content = response.getContent(); if (!Strings.isNullOrEmpty(content)) { @@ -146,12 +149,27 @@ public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> { } private void updateRequestParam(PageInfo pageInfo) { - if (this.httpParameter.getParams() == null) { - httpParameter.setParams(new HashMap<>()); + // keep page param as http param + if (this.httpParameter.isKeepPageParamAsHttpParam()) { + if (this.httpParameter.getParams() == null) { + httpParameter.setParams(new HashMap<>()); + } + this.httpParameter + .getParams() + .put(pageInfo.getPageField(), pageInfo.getPageIndex().toString()); + return; + } + + if (MapUtils.isNotEmpty(this.httpParameter.getParams()) + && this.httpParameter.getParams().containsKey(pageInfo.getPageField())) { + this.httpParameter + .getParams() + .put(pageInfo.getPageField(), pageInfo.getPageIndex().toString()); + } + if (MapUtils.isNotEmpty(this.httpParameter.getBody()) + && this.httpParameter.getBody().containsKey(pageInfo.getPageField())) { + this.httpParameter.getBody().put(pageInfo.getPageField(), pageInfo.getPageIndex()); } - this.httpParameter - .getParams() - .put(pageInfo.getPageField(), pageInfo.getPageIndex().toString()); } @Override diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java index 6ac7e88258..6841d15943 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java @@ -40,6 +40,7 @@ import org.apache.seatunnel.connectors.seatunnel.myhours.source.exception.MyHour import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import java.util.Collections; import java.util.Map; @Slf4j @@ -89,7 +90,10 @@ public class MyHoursSource extends HttpSource { try { HttpResponse response = loginHttpClient.doPost( - myHoursLoginParameter.getUrl(), myHoursLoginParameter.getBody()); + myHoursLoginParameter.getUrl(), + Collections.emptyMap(), + Collections.emptyMap(), + myHoursLoginParameter.getBody()); if (HttpResponse.STATUS_OK == response.getCode()) { String content = response.getContent(); if (!Strings.isNullOrEmpty(content)) { diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java index 08a802db41..2171c70a29 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java @@ -19,7 +19,6 @@ package org.apache.seatunnel.connectors.seatunnel.myhours.source.config; import org.apache.seatunnel.shade.com.typesafe.config.Config; -import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter; import org.apache.seatunnel.connectors.seatunnel.http.config.HttpRequestMethod; @@ -43,15 +42,14 @@ public class MyHoursSourceParameter extends HttpParameter { // set method this.setMethod(HttpRequestMethod.valueOf(MyHoursSourceConfig.POST)); // set body - Map<String, String> bodyParams = new HashMap<>(); + Map<String, Object> bodyParams = new HashMap<>(); String email = pluginConfig.getString(MyHoursSourceConfig.EMAIL.key()); String password = pluginConfig.getString(MyHoursSourceConfig.PASSWORD.key()); bodyParams.put(MyHoursSourceConfig.GRANT_TYPE, MyHoursSourceConfig.PASSWORD.key()); bodyParams.put(MyHoursSourceConfig.EMAIL.key(), email); bodyParams.put(MyHoursSourceConfig.PASSWORD.key(), password); bodyParams.put(MyHoursSourceConfig.CLIENT_ID, MyHoursSourceConfig.API); - String body = JsonUtils.toJsonString(bodyParams); - this.setBody(body); + this.setBody(bodyParams); this.setRetryParameters(pluginConfig); } } diff --git a/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/source/PrometheusSourceReader.java b/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/source/PrometheusSourceReader.java index 3734ebed49..c89cf72427 100644 --- a/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/source/PrometheusSourceReader.java +++ b/seatunnel-connectors-v2/connector-prometheus/src/main/java/org/apache/seatunnel/connectors/seatunnel/prometheus/source/PrometheusSourceReader.java @@ -195,7 +195,8 @@ public class PrometheusSourceReader extends AbstractSingleSplitReader<SeaTunnelR this.httpParameter.getMethod().getMethod(), this.httpParameter.getHeaders(), this.httpParameter.getParams(), - this.httpParameter.getBody()); + this.httpParameter.getBody(), + this.httpParameter.isKeepParamsAsForm()); if (response.getCode() >= 200 && response.getCode() <= 207) { String content = response.getContent(); if (!Strings.isNullOrEmpty(content)) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java index 5e017f3636..6533e2bad8 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java @@ -271,6 +271,11 @@ public class HttpIT extends TestSuiteBase implements TestResource { @TestTemplate public void testSourceToAssertSink(TestContainer container) throws IOException, InterruptedException { + // dynamic param for body + Container.ExecResult execResult0 = + container.executeJob("/http_post_param_json_to_assert.conf"); + Assertions.assertEquals(0, execResult0.getExitCode()); + // normal http Container.ExecResult execResult1 = container.executeJob("/http_json_to_assert.conf"); Assertions.assertEquals(0, execResult1.getExitCode()); @@ -325,6 +330,10 @@ public class HttpIT extends TestSuiteBase implements TestResource { container.executeJob("/http_formrequestbody_to_assert.conf"); Assertions.assertEquals(0, execResult13.getExitCode()); + Container.ExecResult execResult20 = + container.executeJob("/http_formrequestbody_to_assert2.conf"); + Assertions.assertEquals(0, execResult20.getExitCode()); + // http httpJsonRequestBody Container.ExecResult execResult14 = container.executeJob("/http_jsonrequestbody_to_assert.conf"); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf index a8793f73b7..f10cffa8f0 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf @@ -25,6 +25,7 @@ source { plugin_output = "http" url = "http://mockserver:1080/example/formBody" method = "POST" + keep_params_as_form = true params ={id = 1} format = "json" schema = { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert2.conf similarity index 95% copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert2.conf index a8793f73b7..218dcf711c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert2.conf @@ -25,7 +25,10 @@ source { plugin_output = "http" url = "http://mockserver:1080/example/formBody" method = "POST" - params ={id = 1} + headers { + Content-Type = "application/x-www-form-urlencoded" + } + body="{"id":1}" format = "json" schema = { fields { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf index 0ed8f4d8e6..db6ea2edd2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf @@ -30,6 +30,7 @@ source { name = "$.data[*].name" age = "$.data[*].age" } + keep_page_param_as_http_param = true pageing = { batch_size=10 page_field = page diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf index c0812b1789..a70dba8779 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf @@ -30,6 +30,7 @@ source { name = "$.data[*].name" age = "$.data[*].age" } + keep_page_param_as_http_param = true pageing = { total_page_size = 2 page_field = page diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_start_num.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_start_num.conf index 06282c675e..731cb3ab0a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_start_num.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_start_num.conf @@ -30,6 +30,7 @@ source { name = "$.data[*].name" age = "$.data[*].age" } + keep_page_param_as_http_param = true pageing = { total_page_size = 2 page_field = page diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_post_param_json_to_assert.conf similarity index 89% copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_post_param_json_to_assert.conf index a8793f73b7..85b254d98f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_formrequestbody_to_assert.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_post_param_json_to_assert.conf @@ -23,10 +23,15 @@ env { source { Http { plugin_output = "http" - url = "http://mockserver:1080/example/formBody" + url = "http://mockserver:1080/example/jsonBody/dynamic/param" method = "POST" - params ={id = 1} + body="""{"id":1,"pageIndex":"${pageIndex}"}""" format = "json" + pageing={ + page_field = pageIndex + start_page_number = 2 + batch_size = 10 + } schema = { fields { name = string diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json index 42d000f713..2fc4a26961 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json @@ -4476,6 +4476,35 @@ } } }, + { + "httpRequest": { + "method" : "POST", + "path": "/example/jsonBody/dynamic/param", + "body": { + "type": "JSON", + "json": { + "id": 1, + "pageIndex": 2 + }, + "matchType": "STRICT" + } + }, + "httpResponse": { + "body": [ + { + "name": "lzl", + "age": 18 + }, + { + "name": "pizz", + "age": 19 + } + ], + "headers": { + "Content-Type": "application/json" + } + } + }, { "httpRequest": { "path": "/example/formBody", @@ -4745,4 +4774,4 @@ } } } -] +] \ No newline at end of file