Alberne opened a new issue, #9474: URL: https://github.com/apache/seatunnel/issues/9474
### Search before asking - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues. ### What happened the Http source plugin currently supports Page Number Pagination& Cursor-Based Pagination. In the following code, an NPE (NullPointerException) will occur when the pageField is null, The specific code snippet is: map.containsKey(pageField). ``` private void processPageMap( Map<String, String> map, String pageField, String pageValue, boolean usePlaceholderReplacement) { if (usePlaceholderReplacement) { // Placeholder replacement Map<String, String> updatedMap = new HashMap<>(); for (Map.Entry<String, String> entry : map.entrySet()) { String key = entry.getKey(); String value = entry.getValue(); String replacedValue = replacePlaceholder(value, pageField, pageValue); if (replacedValue != null) { updatedMap.put(key, replacedValue); } } map.putAll(updatedMap); // NPE (NullPointerException) will occur when the pageField is null, } else if (map.containsKey(pageField)) { // Key-based replacement map.put(pageField, pageValue); } } ``` the processPageMap function is called in the following scenarios, only in updateRequestParam function : ``` // Process headers if (MapUtils.isNotEmpty(this.httpParameter.getHeaders())) { processPageMap( this.httpParameter.getHeaders(), pageField, pageValue.toString(), usePlaceholderReplacement); processPageMap( this.httpParameter.getHeaders(), pageInfo.getPageCursorFieldName(), pageInfo.getCursor(), usePlaceholderReplacement); } // if not set keepPageParamAsHttpParam, but page field is in params, then set page index as if (MapUtils.isNotEmpty(this.httpParameter.getParams())) { processPageMap( this.httpParameter.getParams(), pageField, pageValue.toString(), usePlaceholderReplacement); processPageMap( this.httpParameter.getParams(), pageInfo.getPageCursorFieldName(), pageInfo.getCursor(), usePlaceholderReplacement); } ```  ### SeaTunnel Version 2.3.11 ### SeaTunnel Config ```conf env { parallelism = 1 job.mode = "BATCH" } source { Http { plugin_output = "http" url = "http://127.0.0.1:5000/api/cursor_data" method = "GET" format = "json" content_field = "$.data.*" keep_page_param_as_http_param = true pageing = { page_type="Cursor" cursor_field ="cursor" cursor_response_field="$.paging.cursors.next" } schema = { fields { content=string id=int name=string } } json_field = { content = "$.data[*].content" id = "$.data[*].id" name = "$.data[*].name" } } } # Console printing of the read Http data sink { Console { parallelism = 1 } } ``` ### Running Command ```shell Assuming config file is `http_paging.config` ${SEATUNNEL_HOME}/bin/seatunnel.sh --deploy-mode local -c http_paging.config ``` ### Error Exception ```log The updateRequestParam function, when called within a try...finally block’s exception handling logic, does not throw explicit exceptions. @Override public void internalPollNext(Collector<SeaTunnelRow> output) throws Exception { try { if (pageInfoOptional.isPresent()) { noMoreElementFlag = false; PageInfo info = pageInfoOptional.get(); // cursor pagination if (HttpPaginationType.CURSOR.getCode().equals(info.getPageType())) { while (!noMoreElementFlag) { updateRequestParam(info, info.isUsePlaceholderReplacement()); pollAndCollectData(output); Thread.sleep(10); } } else { // default page number pagination Long pageIndex = info.getPageIndex(); while (!noMoreElementFlag) { // increment page info.setPageIndex(pageIndex); // set request param updateRequestParam(info, info.isUsePlaceholderReplacement()); pollAndCollectData(output); pageIndex += 1; Thread.sleep(10); } } } else { pollAndCollectData(output); } } finally { if (Boundedness.BOUNDED.equals(context.getBoundedness()) && noMoreElementFlag) { // signal to the source that we have reached the end of the data. log.info("Closed the bounded http source"); context.signalNoMoreElement(); } else { if (httpParameter.getPollIntervalMillis() > 0) { Thread.sleep(httpParameter.getPollIntervalMillis()); } } } } ``` ### Zeta or Flink or Spark Version _No response_ ### Java or Scala Version _No response_ ### Screenshots _No response_ ### Are you willing to submit PR? - [x] Yes I am willing to submit a PR! ### Code of Conduct - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
