This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new c66b512 [fix](connector) fix npe when empty partition commit txn
(#286)
c66b512 is described below
commit c66b5122089073d19e4fcd4c14851ef9398bf7ab
Author: gnehil <[email protected]>
AuthorDate: Tue Mar 18 17:34:27 2025 +0800
[fix](connector) fix npe when empty partition commit txn (#286)
---
.../client/write/AbstractStreamLoadProcessor.java | 43 ++++++++++++----------
1 file changed, 23 insertions(+), 20 deletions(-)
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index 9222d81..9bc3037 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -155,27 +155,30 @@ public abstract class AbstractStreamLoadProcessor<R>
implements DorisWriter<R>,
@Override
public String stop() throws Exception {
- // arrow format need to send all buffer data before stop
- if (!recordBuffer.isEmpty() && "arrow".equalsIgnoreCase(format)) {
- List<R> rs = new LinkedList<>(recordBuffer);
- recordBuffer.clear();
- output.write(toArrowFormat(rs));
- }
- output.close();
- CloseableHttpResponse res = requestFuture.get();
- if (res.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
- throw new StreamLoadException("stream load execute failed, status:
" + res.getStatusLine().getStatusCode()
- + ", msg: " + res.getStatusLine().getReasonPhrase());
- }
- String resEntity = EntityUtils.toString(new
BufferedHttpEntity(res.getEntity()));
- logger.info("stream load response: {}", resEntity);
- StreamLoadResponse response = MAPPER.readValue(resEntity,
StreamLoadResponse.class);
- if (response != null && response.isSuccess()) {
- createNewBatch = true;
- return isTwoPhaseCommitEnabled ?
String.valueOf(response.getTxnId()) : null;
- } else {
- throw new StreamLoadException("stream load execute failed,
response: " + resEntity);
+ if (requestFuture != null) {
+ // arrow format need to send all buffer data before stop
+ if (!recordBuffer.isEmpty() && "arrow".equalsIgnoreCase(format)) {
+ List<R> rs = new LinkedList<>(recordBuffer);
+ recordBuffer.clear();
+ output.write(toArrowFormat(rs));
+ }
+ output.close();
+ CloseableHttpResponse res = requestFuture.get();
+ if (res.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+ throw new StreamLoadException("stream load execute failed,
status: " + res.getStatusLine().getStatusCode()
+ + ", msg: " + res.getStatusLine().getReasonPhrase());
+ }
+ String resEntity = EntityUtils.toString(new
BufferedHttpEntity(res.getEntity()));
+ logger.info("stream load response: {}", resEntity);
+ StreamLoadResponse response = MAPPER.readValue(resEntity,
StreamLoadResponse.class);
+ if (response != null && response.isSuccess()) {
+ createNewBatch = true;
+ return isTwoPhaseCommitEnabled ?
String.valueOf(response.getTxnId()) : null;
+ } else {
+ throw new StreamLoadException("stream load execute failed,
response: " + resEntity);
+ }
}
+ return null;
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]