chncaesar opened a new issue, #9442: URL: https://github.com/apache/seatunnel/issues/9442
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

## Description

I was using `PlaceholderReplacement` paging to fetch data from an HTTP API:

```text
pageing={
  page_type="PageNumber"
  use_placeholder_replacement = true
  page_field = "pageNo"
  batch_size = 100
}
body: """
{ "pageNo": "${pageNo}", "pageSize": "100" }
"""
```

However, the SeaTunnel task ran indefinitely and kept requesting the same page. Specifically, SeaTunnel sent this request body on every iteration:

```text
body: """ { "pageNo": "1", "pageSize": "100" }
```

Here's the log from the API server:

```text
paging: pageNo=1 pageSize=100
INFO: 127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
paging: pageNo=1 pageSize=100
INFO: 127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
paging: pageNo=1 pageSize=100
INFO: 127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
paging: pageNo=1 pageSize=100
INFO: 127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
paging: pageNo=1 pageSize=100
INFO: 127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
paging: pageNo=1 pageSize=100
INFO: 127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
paging: pageNo=1 pageSize=100
INFO: 127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
paging: pageNo=1 pageSize=100
INFO: 127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
paging: pageNo=1 pageSize=100
INFO: 127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
paging: pageNo=1 pageSize=100
INFO: 127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
```

`pageNo` was never incremented, and I had to kill the task manually.

## Expected result

SeaTunnel fetches all pages and finishes successfully.
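
As a minimal sketch of that expected behaviour (the class and method names are hypothetical, and this is not the connector's actual code), each request body can be rendered from the original, unmodified template, so the `${pageNo}` placeholder is still present on every iteration:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not taken from HttpSourceReader. */
public class PlaceholderPagingSketch {

    /** Returns a new body with ${pageField} substituted; the template string is left as-is. */
    public static String render(String bodyTemplate, String pageField, long pageValue) {
        String placeholder = "${" + pageField + "}";
        return bodyTemplate.replace(placeholder, Long.toString(pageValue));
    }

    /** Renders one body per page, always starting from the same untouched template. */
    public static List<String> renderPages(
            String bodyTemplate, String pageField, long firstPage, long lastPage) {
        List<String> bodies = new ArrayList<>();
        for (long page = firstPage; page <= lastPage; page++) {
            // Never feed a previously rendered body back in: its placeholder is already gone.
            bodies.add(render(bodyTemplate, pageField, page));
        }
        return bodies;
    }

    public static void main(String[] args) {
        String template = "{ \"pageNo\": \"${pageNo}\", \"pageSize\": \"100\" }";
        for (String body : renderPages(template, "pageNo", 1, 3)) {
            System.out.println(body);
        }
    }
}
```

Running `main` prints three bodies whose `pageNo` values are 1, 2 and 3, which is the progression the API server should see in its log.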
## Root cause analysis

The root cause lies in `HttpSourceReader`, in the methods `updateRequestParam` and `processBodyString`.

On each iteration of the while loop, `processBodyString` directly modifies the HTTP request body. The placeholder is replaced with the page value on the first iteration, so on every later iteration the placeholder is no longer present and the replacement does nothing. As a result, `pageNo` never increases during paged queries and the job falls into an infinite loop.

```java
// Excerpt: the paging loop
Long pageIndex = info.getPageIndex();
while (!noMoreElementFlag) {
    // increment page
    info.setPageIndex(pageIndex);
    // set request param
    updateRequestParam(info, info.isUsePlaceholderReplacement());
    pollAndCollectData(output);
    pageIndex += 1;
    Thread.sleep(10);
}

// Excerpt: the body is processed from the current (already modified) request body
String processedBody =
        processBodyString(
                this.httpParameter.getBody(), pageField, pageValue, usePlaceholderReplacement);

// Excerpt: placeholder replacement
private String processBodyString(
        String bodyString,
        String pageField,
        Object pageValue,
        boolean usePlaceholderReplacement) {
    if (pageField == null || pageValue == null || Strings.isNullOrEmpty(bodyString)) {
        return bodyString;
    }
    if (usePlaceholderReplacement) {
        String unquotedPlaceholder = "${" + pageField + "}";
        // After the first iteration the placeholder is gone, so this branch is skipped
        if (bodyString.contains(unquotedPlaceholder)) {
            bodyString = bodyString.replace(unquotedPlaceholder, pageValue.toString());
        }
        return bodyString;
    } else {
        // Key-based replacement
        Map<String, Object> bodyMap =
                JsonUtils.parseObject(bodyString, new TypeReference<Map<String, Object>>() {});
        if (bodyMap != null) {
            processBodyMapRecursively(bodyMap, pageField, pageValue);
            return JsonUtils.toJsonString(bodyMap);
        }
        return bodyString;
    }
}
```

## Reference

The Python HTTP API used to reproduce the issue:

```python
from fastapi import FastAPI, HTTPException, Depends, Header, Body
from pydantic import BaseModel, Field

# Not shown in the original report; added so the @app.post decorator resolves.
app = FastAPI()

# Note: ItemResponse (the response model) is not included in the report.


class PagingRequest(BaseModel):
    pageNo: int = Field(..., description="current page number")
    pageSize: int = Field(..., description="number of records per page")


@app.post("/api/items_page", response_model=ItemResponse)
async def get_items(paging: PagingRequest = Body(...,
    description="paging request")):
    """
    Return the sample data list (paged).
    """
    try:
        # Sample data ("用户" means "user")
        items = [
            {"id": i, "name": f"用户{i}"}
            for i in range(1, 1001)
        ]
        print(f"paging: {paging}")
        # Compute paging info
        total = len(items)
        total_pages = (total + paging.pageSize - 1) // paging.pageSize
        # Validate the page number: past the last page, return an empty data list
        if paging.pageNo > total_pages:
            return {
                "code": "200",
                "msg": "OK",
                "data": [],
                "total": total,
                "pageNo": paging.pageNo,
                "pageSize": 0,
                "totalPages": total_pages
            }
        # Slice out the data for the current page
        start_idx = (paging.pageNo - 1) * paging.pageSize
        end_idx = start_idx + paging.pageSize
        return {
            "code": "200",
            "msg": "OK",
            "data": items[start_idx:end_idx],
            "total": total,
            "pageNo": paging.pageNo,
            "pageSize": paging.pageSize,
            "totalPages": total_pages
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
```

### SeaTunnel Version

SeaTunnel 2.3.11

### SeaTunnel Config

```conf
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Http {
    url = "http://localhost:8000/api/items_page"
    method = "POST"
    format = "json"
    headers {
      "Content-Type" = "application/json"
    }
    body: """
    { "pageNo": "${pageNo}", "pageSize": "100" }
    """
    json_field {
      id = "$.data[*].id"
      name = "$.data[*].name"
    }
    schema {
      fields {
        id = "string"
        name = "string"
      }
    }
    pageing={
      page_type="PageNumber"
      use_placeholder_replacement = true
      page_field = "pageNo"
      batch_size = 100
    }
  }
}

sink {
  Doris {
    fenodes = "192.168.1.236:8030"
    username = hello
    password = "password"
    database = "test"
    table = "test_http"
    sink.label-prefix = "test-doris"
    sink.enable-delete = "false"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
    plugin_input = "fake_table"
  }
}
```

### Running Command

```shell
# Assuming the config file is http_paging.config
${SEATUNNEL_HOME}/bin/seatunnel.sh --deploy-mode local -c http_paging.config
```

### Error Exception

```log
The job runs infinitely.
```

### Zeta or Flink or Spark Version

Zeta engine, running locally. However, the issue should affect the Spark and Flink engines as well.
### Java or Scala Version

OpenJDK 17

```
openjdk version "17.0.14" 2025-01-21
OpenJDK Runtime Environment OpenLogic-OpenJDK (build 17.0.14+7-adhoc.root.jdk17u)
OpenJDK 64-Bit Server VM OpenLogic-OpenJDK (build 17.0.14+7-adhoc.root.jdk17u, mixed mode, sharing)
```

### Screenshots

_No response_

### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
