morningman commented on code in PR #17233:
URL: https://github.com/apache/doris/pull/17233#discussion_r1127254996
##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java:
##########
@@ -91,10 +143,51 @@ public LoadJobRowResult
executeMySqlLoadJobFromStmt(ConnectContext context, Load
loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt());
}
}
+ } catch (Throwable t) {
+ LOG.error("Execute mysql load failed", t);
+ // drain the data from client conn util empty packet received,
otherwise the connection will be reset
+ if (loadContextMap.containsKey(loadId) &&
!loadContextMap.get(loadId).isFinished()) {
+ LOG.warn("not drained yet, try reading left data from client
connection.");
Review Comment:
print load id in log, or this log is meaningless
##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java:
##########
@@ -91,10 +143,51 @@ public LoadJobRowResult
executeMySqlLoadJobFromStmt(ConnectContext context, Load
loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt());
}
}
+ } catch (Throwable t) {
+ LOG.error("Execute mysql load failed", t);
+ // drain the data from client conn util empty packet received,
otherwise the connection will be reset
+ if (loadContextMap.containsKey(loadId) &&
!loadContextMap.get(loadId).isFinished()) {
+ LOG.warn("not drained yet, try reading left data from client
connection.");
+ ByteBuffer buffer = context.getMysqlChannel().fetchOnePacket();
+ // MySql client will send an empty packet when eof
+ while (buffer != null && buffer.limit() != 0) {
+ buffer = context.getMysqlChannel().fetchOnePacket();
+ }
+ LOG.warn("Finished reading the left bytes.");
Review Comment:
should be debug?
##########
fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java:
##########
@@ -1008,6 +1009,9 @@ public void cancel() {
if (coordRef != null) {
coordRef.cancel();
}
+ if (mysqlLoadId != null) {
Review Comment:
Should we reset `mysqlLoadId` after job is cancelled or finished?
##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java:
##########
@@ -91,10 +143,51 @@ public LoadJobRowResult
executeMySqlLoadJobFromStmt(ConnectContext context, Load
loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt());
}
}
+ } catch (Throwable t) {
+ LOG.error("Execute mysql load failed", t);
+ // drain the data from client conn util empty packet received,
otherwise the connection will be reset
Review Comment:
Do we have the same problem in normal mysql connection?
##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java:
##########
@@ -147,7 +244,11 @@ private void fillByteBufferAsync(ConnectContext context,
ByteBufferNetworkInputS
inputStream.fillByteBuffer(buffer);
buffer = context.getMysqlChannel().fetchOnePacket();
}
+ if (loadContextMap.containsKey(loadId)) {
+ loadContextMap.get(loadId).setFinished(true);
+ }
} catch (IOException | InterruptedException e) {
+ LOG.warn("Failed fetch packet from mysql client", e);
Review Comment:
print load id
##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java:
##########
@@ -91,10 +143,51 @@ public LoadJobRowResult
executeMySqlLoadJobFromStmt(ConnectContext context, Load
loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt());
}
}
+ } catch (Throwable t) {
+ LOG.error("Execute mysql load failed", t);
Review Comment:
```suggestion
LOG.warn("Execute mysql load {} failed", loadId, t);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]