gnehil commented on code in PR #285:
URL:
https://github.com/apache/doris-spark-connector/pull/285#discussion_r2014193959
##########
spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java:
##########
@@ -143,14 +142,26 @@ public AbstractStreamLoadProcessor(DorisConfig config)
throws Exception {
public void load(R row) throws Exception {
if (createNewBatch) {
if (autoRedirect) {
- requestFuture = frontend.requestFrontends((frontEnd,
httpClient) ->
- buildReqAndExec(frontEnd.getHost(),
frontEnd.getHttpPort(), httpClient));
+ requestFuture = frontend.requestFrontends((frontEnd,
httpClient) -> {
+ if (isTwoPhaseCommitEnabled && frontEnd.getHttpPort() <=
0) {
+ throw new IllegalArgumentException("option [" +
DorisOptions.DORIS_FENODES.getName()
+ + "] is not in correct format when ["
+ + DorisOptions.DORIS_SINK_ENABLE_2PC.getName()
+ " = true"
+ + "], for example: host:port[,host2:port]");
+ }
+ return buildReqAndExec(frontEnd.getHost(),
frontEnd.getHttpPort(), httpClient);
+ });
} else {
requestFuture = backendHttpClient.executeReq((backend,
httpClient) ->
buildReqAndExec(backend.getHost(),
backend.getHttpPort(), httpClient));
}
createNewBatch = false;
}
+ if (isFirstRecordOfBatch) {
+ isFirstRecordOfBatch = false;
+ } else {
+ output.write(lineDelimiter);
Review Comment:
Only the CSV and JSON formats require a line delimiter; writing a line
delimiter in the ARROW format will cause parsing exceptions.
##########
spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java:
##########
@@ -183,66 +195,94 @@ public String stop() throws Exception {
return null;
}
+ private void execCommitReq(String host, int port, String msg,
CloseableHttpClient httpClient) {
+ HttpPut httpPut = new HttpPut(URLs.streamLoad2PC(host, port, database,
isHttpsEnabled));
+ try {
+ handleCommitHeaders(httpPut, msg);
+ } catch (OptionRequiredException e) {
+ throw new RuntimeException("stream load handle commit props
failed", e);
+ }
+ try {
+ CloseableHttpResponse response = httpClient.execute(httpPut);
+ if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+ throw new RuntimeException("commit transaction failed,
transaction: " + msg + ", status: "
+ + response.getStatusLine().getStatusCode() + ",
reason: " + response.getStatusLine()
+ .getReasonPhrase());
+ } else {
+ String resEntity = EntityUtils.toString(new
BufferedHttpEntity(response.getEntity()));
+ this.logger.info("commit: {} response: {}", msg, resEntity);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("commit transaction failed,
transaction: " + msg, e);
+ }
+ }
+
@Override
public void commit(String msg) throws Exception {
if (isTwoPhaseCommitEnabled) {
logger.info("begin to commit transaction {}", msg);
- frontend.requestFrontends((frontEnd, httpClient) -> {
- HttpPut httpPut = new
HttpPut(URLs.streamLoad2PC(frontEnd.getHost(), frontEnd.getHttpPort(),
database, isHttpsEnabled));
- try {
- handleCommitHeaders(httpPut, msg);
- } catch (OptionRequiredException e) {
- throw new RuntimeException("stream load handle commit
props failed", e);
- }
- try {
- CloseableHttpResponse response =
httpClient.execute(httpPut);
- if (response.getStatusLine().getStatusCode() !=
HttpStatus.SC_OK) {
- throw new RuntimeException("commit transaction failed,
transaction: " + msg
- + ", status: " +
response.getStatusLine().getStatusCode()
- + ", reason: " +
response.getStatusLine().getReasonPhrase());
- }
- } catch (IOException e) {
- throw new RuntimeException("commit transaction failed,
transaction: " + msg, e);
- }
- return null;
- });
+ if (autoRedirect) {
+ frontend.requestFrontends((frontEnd, httpClient) -> {
+ execCommitReq(frontEnd.getHost(), frontEnd.getHttpPort(),
msg, httpClient);
+ return null;
+ });
+ } else {
+ backendHttpClient.executeReq((backend, httpClient) -> {
+ execCommitReq(backend.getHost(), backend.getHttpPort(),
msg, httpClient);
+ return null;
+ });
+ }
logger.info("success to commit transaction {}", msg);
}
}
+ private void execAbortReq(String host, int port, String msg,
CloseableHttpClient httpClient) {
+ HttpPut httpPut = new HttpPut(URLs.streamLoad2PC(host, port, database,
isHttpsEnabled));
+ try {
+ handleAbortHeaders(httpPut, msg);
+ } catch (OptionRequiredException e) {
+ throw new RuntimeException("stream load handle abort props
failed", e);
+ }
+ try {
+ CloseableHttpResponse response = httpClient.execute(httpPut);
+ if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+ throw new RuntimeException("abort transaction failed,
transaction: " + msg + ", status: "
+ + response.getStatusLine().getStatusCode() + ",
reason: " + response.getStatusLine()
+ .getReasonPhrase());
+ } else {
+ String resEntity = EntityUtils.toString(new
BufferedHttpEntity(response.getEntity()));
+ this.logger.info("abort: {} response: {}", msg, resEntity);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("abort transaction failed, transaction:
" + msg, e);
+ }
+ }
+
@Override
public void abort(String msg) throws Exception {
if (isTwoPhaseCommitEnabled) {
logger.info("begin to abort transaction {}", msg);
- frontend.requestFrontends((frontEnd, httpClient) -> {
- HttpPut httpPut = new
HttpPut(URLs.streamLoad2PC(frontEnd.getHost(), frontEnd.getHttpPort(),
database, isHttpsEnabled));
- try {
- handleAbortHeaders(httpPut, msg);
- } catch (OptionRequiredException e) {
- throw new RuntimeException("stream load handle abort props
failed", e);
- }
- try {
- CloseableHttpResponse response =
httpClient.execute(httpPut);
- if (response.getStatusLine().getStatusCode() !=
HttpStatus.SC_OK) {
- throw new RuntimeException("abort transaction failed,
transaction: " + msg
- + ", status: " +
response.getStatusLine().getStatusCode()
- + ", reason: " +
response.getStatusLine().getReasonPhrase());
- }
- } catch (IOException e) {
- throw new RuntimeException("abort transaction failed,
transaction: " + msg, e);
- }
- return null; // Returning null as the callback does not return
anything
- });
+ if (autoRedirect) {
Review Comment:
Same as my comment on the commit part.
##########
spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java:
##########
@@ -183,66 +195,94 @@ public String stop() throws Exception {
return null;
}
+ private void execCommitReq(String host, int port, String msg,
CloseableHttpClient httpClient) {
+ HttpPut httpPut = new HttpPut(URLs.streamLoad2PC(host, port, database,
isHttpsEnabled));
+ try {
+ handleCommitHeaders(httpPut, msg);
+ } catch (OptionRequiredException e) {
+ throw new RuntimeException("stream load handle commit props
failed", e);
+ }
+ try {
+ CloseableHttpResponse response = httpClient.execute(httpPut);
+ if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+ throw new RuntimeException("commit transaction failed,
transaction: " + msg + ", status: "
+ + response.getStatusLine().getStatusCode() + ",
reason: " + response.getStatusLine()
+ .getReasonPhrase());
+ } else {
+ String resEntity = EntityUtils.toString(new
BufferedHttpEntity(response.getEntity()));
+ this.logger.info("commit: {} response: {}", msg, resEntity);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("commit transaction failed,
transaction: " + msg, e);
+ }
+ }
+
@Override
public void commit(String msg) throws Exception {
if (isTwoPhaseCommitEnabled) {
logger.info("begin to commit transaction {}", msg);
- frontend.requestFrontends((frontEnd, httpClient) -> {
- HttpPut httpPut = new
HttpPut(URLs.streamLoad2PC(frontEnd.getHost(), frontEnd.getHttpPort(),
database, isHttpsEnabled));
- try {
- handleCommitHeaders(httpPut, msg);
- } catch (OptionRequiredException e) {
- throw new RuntimeException("stream load handle commit
props failed", e);
- }
- try {
- CloseableHttpResponse response =
httpClient.execute(httpPut);
- if (response.getStatusLine().getStatusCode() !=
HttpStatus.SC_OK) {
- throw new RuntimeException("commit transaction failed,
transaction: " + msg
- + ", status: " +
response.getStatusLine().getStatusCode()
- + ", reason: " +
response.getStatusLine().getReasonPhrase());
- }
- } catch (IOException e) {
- throw new RuntimeException("commit transaction failed,
transaction: " + msg, e);
- }
- return null;
- });
+ if (autoRedirect) {
Review Comment:
It is unnecessary to start the commit through BE, because the transaction
will be processed through FE in the end.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]