chncaesar opened a new issue, #9442:
URL: https://github.com/apache/seatunnel/issues/9442

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   ## Description
   I was using `PlaceholderReplacement` paging to fetch data from an HTTP API:
   ```text
       pageing={
          page_type="PageNumber"
          use_placeholder_replacement = true 
          page_field = "pageNo"
          batch_size = 100
       }
   
       body: """ {
         "pageNo": "${pageNo}",
         "pageSize": "100"
       } """
   ```
   
   However, the SeaTunnel task ran indefinitely and kept requesting the same page.
   Specifically, SeaTunnel sent the following request body on every iteration:
   ```text
   {
         "pageNo": "1",
         "pageSize": "100"
   }
   ```
   Here is the server-side log:
   ```text
   paging: pageNo=1 pageSize=100
   INFO:     127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
   paging: pageNo=1 pageSize=100
   INFO:     127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
   paging: pageNo=1 pageSize=100
   INFO:     127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
   paging: pageNo=1 pageSize=100
   INFO:     127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
   paging: pageNo=1 pageSize=100
   INFO:     127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
   paging: pageNo=1 pageSize=100
   INFO:     127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
   paging: pageNo=1 pageSize=100
   INFO:     127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
   paging: pageNo=1 pageSize=100
   INFO:     127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
   paging: pageNo=1 pageSize=100
   INFO:     127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
   paging: pageNo=1 pageSize=100
   INFO:     127.0.0.1:47260 - "POST /api/items_page HTTP/1.1" 200 OK
   ```
   `pageNo` was never incremented, so I had to kill the task manually.
   
   
   ## Expected result
   SeaTunnel should fetch all pages and finish successfully.
   
   ## Root cause analysis
   The root cause lies in `HttpSourceReader`, in the methods `updateRequestParam` and `processBodyString`.
   Inside the while loop, the placeholder in the request body is replaced with the page value on the first iteration. However, `processBodyString` changes the request body itself, so on every subsequent iteration the placeholder is missing and the substitution silently does nothing: the body keeps `"pageNo": "1"` forever.
   
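   In short: on each loop iteration, `processBodyString` directly modifies the HTTP request body, so the placeholder can be replaced only once and every later replacement fails. As a result, `pageNo` never increments during paginated queries and the job is stuck in an infinite loop.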
   
   ```java
   // HttpSourceReader: paging loop
   Long pageIndex = info.getPageIndex();
   while (!noMoreElementFlag) {
       // set the page index for this request
       info.setPageIndex(pageIndex);
       // set request params (substitutes ${pageNo} into the body)
       updateRequestParam(info, info.isUsePlaceholderReplacement());
       pollAndCollectData(output);
       pageIndex += 1;
       Thread.sleep(10);
   }

   // HttpSourceReader#updateRequestParam: the body is read from this.httpParameter.
   // Per the analysis above, the processed body replaces the original one, so the
   // ${pageNo} placeholder no longer exists from the second iteration onward.
   String processedBody =
           processBodyString(
                   this.httpParameter.getBody(),
                   pageField,
                   pageValue,
                   usePlaceholderReplacement);

   private String processBodyString(
           String bodyString,
           String pageField,
           Object pageValue,
           boolean usePlaceholderReplacement) {
       if (pageField == null || pageValue == null || Strings.isNullOrEmpty(bodyString)) {
           return bodyString;
       }
       if (usePlaceholderReplacement) {
           // "${pageNo}" -> "1"; this is a no-op once the placeholder has been consumed
           String unquotedPlaceholder = "${" + pageField + "}";
           if (bodyString.contains(unquotedPlaceholder)) {
               bodyString = bodyString.replace(unquotedPlaceholder, pageValue.toString());
           }
           return bodyString;
       } else {
           // Key-based replacement
           Map<String, Object> bodyMap =
                   JsonUtils.parseObject(bodyString, new TypeReference<Map<String, Object>>() {});
           if (bodyMap != null) {
               processBodyMapRecursively(bodyMap, pageField, pageValue);
               return JsonUtils.toJsonString(bodyMap);
           }
           return bodyString;
       }
   }
   ```
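   To make the failure mode concrete, here is a minimal, self-contained sketch that mimics the loop above outside SeaTunnel. The class and method names (`BodyTemplateDemo`, `buggyRun`, `fixedRun`) are hypothetical, and the write-back `body = ...` only stands in for the reader overwriting its stored body, per the analysis above; it is not SeaTunnel's actual code. The fix it illustrates, one possible approach among others, is to keep the original body template untouched and substitute the page value into a fresh copy on every iteration.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class BodyTemplateDemo {

       // Mirrors the placeholder branch of processBodyString: a plain string replace.
       static String replacePlaceholder(String body, String pageField, Object pageValue) {
           String placeholder = "${" + pageField + "}";
           return body.contains(placeholder) ? body.replace(placeholder, pageValue.toString()) : body;
       }

       // Buggy pattern (hypothetical): the processed body overwrites the stored body,
       // so the placeholder is consumed on the first iteration.
       static List<String> buggyRun(String template, int pages) {
           String body = template;
           List<String> sent = new ArrayList<>();
           for (long page = 1; page <= pages; page++) {
               body = replacePlaceholder(body, "pageNo", page); // template is lost here
               sent.add(body);
           }
           return sent;
       }

       // Fixed pattern (hypothetical): always substitute into the untouched template.
       static List<String> fixedRun(String template, int pages) {
           List<String> sent = new ArrayList<>();
           for (long page = 1; page <= pages; page++) {
               sent.add(replacePlaceholder(template, "pageNo", page));
           }
           return sent;
       }

       public static void main(String[] args) {
           String template = "{\"pageNo\": \"${pageNo}\", \"pageSize\": \"100\"}";
           System.out.println("buggy: " + buggyRun(template, 3));
           System.out.println("fixed: " + fixedRun(template, 3));
       }
   }
   ```

   Running `main` prints three identical bodies with `"pageNo": "1"` for the buggy variant and bodies for pages 1, 2 and 3 for the fixed one. In the reader itself, the equivalent change would be to remember the configured body once (for example in a final field) and always pass that template, rather than the previously processed body, to `processBodyString`.
   
   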
   
   ## Reference
   The Python HTTP API (FastAPI) used to reproduce the issue:

   ```python
   from typing import Any, Dict, List

   from fastapi import Body, FastAPI, HTTPException
   from pydantic import BaseModel, Field

   app = FastAPI()


   class PagingRequest(BaseModel):
       pageNo: int = Field(..., description="current page number")
       pageSize: int = Field(..., description="number of records per page")


   # ItemResponse was not shown in the original report; this minimal model
   # simply mirrors the fields of the dicts returned below.
   class ItemResponse(BaseModel):
       code: str
       msg: str
       data: List[Dict[str, Any]]
       total: int
       pageNo: int
       pageSize: int
       totalPages: int


   @app.post("/api/items_page", response_model=ItemResponse)
   async def get_items(paging: PagingRequest = Body(..., description="paging request")):
       """
       Return the sample item list (paginated).
       """
       try:
           # Sample data
           items = [
               {"id": i, "name": f"用户{i}"} for i in range(1, 1001)
           ]
           print(f"paging: {paging}")
           # Compute paging info
           total = len(items)
           total_pages = (total + paging.pageSize - 1) // paging.pageSize

           # Validate the page number: past the last page, return an empty page
           if paging.pageNo > total_pages:
               return {
                   "code": "200",
                   "msg": "OK",
                   "data": [],
                   "total": total,
                   "pageNo": paging.pageNo,
                   "pageSize": 0,
                   "totalPages": total_pages
               }

           # Slice the data for the current page
           start_idx = (paging.pageNo - 1) * paging.pageSize
           end_idx = start_idx + paging.pageSize

           return {
               "code": "200",
               "msg": "OK",
               "data": items[start_idx:end_idx],
               "total": total,
               "pageNo": paging.pageNo,
               "pageSize": paging.pageSize,
               "totalPages": total_pages
           }
       except Exception as e:
           raise HTTPException(status_code=500, detail=str(e))
   ```
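   For the reference server above (1000 items, `pageSize` 100), a correctly working reader should request pages 1 through 10, each with data, and then page 11, which returns an empty `data` array. The sketch below computes that sequence using the same arithmetic as the handler. It assumes, without having checked the SeaTunnel source, that the reader stops as soon as a page returns fewer than `batch_size` records; `ExpectedPaging` and its methods are hypothetical names.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class ExpectedPaging {

       // Same arithmetic as the FastAPI handler: ceiling division for total pages.
       static long totalPages(long total, long pageSize) {
           return (total + pageSize - 1) / pageSize;
       }

       // Records returned for a 1-based page, mirroring items[start_idx:end_idx].
       static long recordsOnPage(long total, long pageSize, long pageNo) {
           if (pageNo > totalPages(total, pageSize)) {
               return 0; // the handler returns an empty page here
           }
           long start = (pageNo - 1) * pageSize;
           long end = Math.min(start + pageSize, total);
           return end - start;
       }

       // Pages a correct reader would request, ASSUMING it stops once a page
       // returns fewer than batchSize records (batchSize must be positive).
       static List<Long> expectedPageRequests(long total, long pageSize, long batchSize) {
           List<Long> requested = new ArrayList<>();
           long page = 1;
           while (true) {
               requested.add(page);
               if (recordsOnPage(total, pageSize, page) < batchSize) {
                   break;
               }
               page++;
           }
           return requested;
       }

       public static void main(String[] args) {
           System.out.println(expectedPageRequests(1000, 100, 100));
       }
   }
   ```

   Under that assumption, `main` prints `[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]`: eleven requests in total, after which the job should finish, instead of the endless stream of `pageNo=1` requests shown in the log.
   
   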
   
   ### SeaTunnel Version
   
   SeaTunnel 2.3.11
   
   ### SeaTunnel Config
   
   ```conf
   env {
     parallelism = 1
     job.mode = "BATCH"
   }
   
   source {
     Http {    
       url = "http://localhost:8000/api/items_page";
       method = "POST"
       format = "json"
       headers {
         "Content-Type" = "application/json"
       }
       body: """ {
         "pageNo": "${pageNo}",
         "pageSize": "100"
       } """
       json_field {
         id = "$.data[*].id"
         name = "$.data[*].name"
       }
       schema {
         fields {
           id = "string"
           name = "string"
         }
       }
       pageing={
          page_type="PageNumber"
          use_placeholder_replacement = true 
          page_field = "pageNo"
          batch_size = 100
       }
     }
   }
   
   
   sink {
     Doris {
       fenodes = "192.168.1.236:8030"
       username = hello
       password = "password"
       database = "test"
       table = "test_http"
       sink.label-prefix = "test-doris"
       sink.enable-delete = "false"
       doris.config {
         format = "json"
         read_json_by_line = "true"
       }
       plugin_input = "fake_table"
     }
   }
   ```
   
   ### Running Command
   
   ```shell
   # Assuming the config file is http_paging.config
   
   ${SEATUNNEL_HOME}/bin/seatunnel.sh --deploy-mode local -c http_paging.config
   ```
   
   ### Error Exception
   
   ```log
   The job runs indefinitely.
   ```
   
   ### Zeta or Flink or Spark Version
   
   Zeta engine, running locally. However, the issue should affect the Spark and Flink engines as well, since the bug is in the HTTP connector rather than in the engine.
   
   ### Java or Scala Version
   
   
   OpenJDK 17
   ```
   openjdk version "17.0.14" 2025-01-21
   OpenJDK Runtime Environment OpenLogic-OpenJDK (build 17.0.14+7-adhoc.root.jdk17u)
   OpenJDK 64-Bit Server VM OpenLogic-OpenJDK (build 17.0.14+7-adhoc.root.jdk17u, mixed mode, sharing)
   ```
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

