This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 3e096dda914 [Cherry-pick]Support kill query in be (#35794)
3e096dda914 is described below
commit 3e096dda914d93aaee66f22a47b98588b89c9ae7
Author: wangbo <[email protected]>
AuthorDate: Mon Jun 3 15:39:30 2024 +0800
[Cherry-pick]Support kill query in be (#35794)
## Proposed changes
pick #35602
```
mysql [information_schema]>kill query '2047df937c66704d-3ac4cfaf17f65eae';
Query OK, 0 rows affected (0.01 sec)
I20240603 15:21:50.373333 3355508 internal_service.cpp:592] Cancel query 2047df937c66704d-3ac4cfaf17f65eae, reason: USER_CANCEL
```
---
.../org/apache/doris/common/util/DebugUtil.java | 24 ++++++++++++
.../java/org/apache/doris/qe/StmtExecutor.java | 30 ++++++++++++---
.../org/apache/doris/rpc/BackendServiceProxy.java | 18 +++++++++
.../apache/doris/common/util/DebugUtilTest.java | 45 ++++++++++++++++++++++
4 files changed, 112 insertions(+), 5 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java
index 75fb331347e..2a52420a96d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugUtil.java
@@ -21,6 +21,8 @@ import org.apache.doris.common.Pair;
import org.apache.doris.proto.Types;
import org.apache.doris.thrift.TUniqueId;
+import com.google.common.base.Strings;
+
import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.DecimalFormat;
@@ -135,6 +137,28 @@ public class DebugUtil {
return builder.toString();
}
+ // id is a String generated by DebugUtil.printId(TUniqueId)
+ public static TUniqueId parseTUniqueIdFromString(String id) {
+ if (Strings.isNullOrEmpty(id)) {
+ throw new NumberFormatException("invalid query id");
+ }
+
+ String[] parts = id.split("-");
+ if (parts.length != 2) {
+ throw new NumberFormatException("invalid query id");
+ }
+
+ TUniqueId uniqueId = new TUniqueId();
+ try {
+ uniqueId.setHi(Long.parseUnsignedLong(parts[0], 16));
+ uniqueId.setLo(Long.parseUnsignedLong(parts[1], 16));
+ } catch (NumberFormatException e) {
+ throw new NumberFormatException("invalid query id:" + e.getMessage());
+ }
+
+ return uniqueId;
+ }
+
public static String printId(final UUID id) {
TUniqueId tUniqueId = new TUniqueId(id.getMostSignificantBits(),
id.getLeastSignificantBits());
StringBuilder builder = new StringBuilder();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index c90ca0b5ff7..ba6d200847c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -220,6 +220,7 @@ import java.io.IOException;
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -1419,16 +1420,15 @@ public class StmtExecutor {
}
// Handle kill statement.
- private void handleKill() throws DdlException {
+ private void handleKill() throws UserException {
KillStmt killStmt = (KillStmt) parsedStmt;
ConnectContext killCtx = null;
int id = killStmt.getConnectionId();
String queryId = killStmt.getQueryId();
if (id == -1) {
+ // when killCtx == null, this means the query not in FE,
+ // then we just send kill signal to BE
killCtx = context.getConnectScheduler().getContextWithQueryId(queryId);
- if (killCtx == null) {
- ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_QUERY, queryId);
- }
} else {
killCtx = context.getConnectScheduler().getContext(id);
if (killCtx == null) {
@@ -1436,7 +1436,27 @@ public class StmtExecutor {
}
}
- if (context == killCtx) {
+ if (killCtx == null) {
+ TUniqueId tQueryId = null;
+ try {
+ tQueryId = DebugUtil.parseTUniqueIdFromString(queryId);
+ } catch (NumberFormatException e) {
+ throw new UserException(e.getMessage());
+ }
+ LOG.info("kill query {}", queryId);
+ Collection<Backend> nodesToPublish = Env.getCurrentSystemInfo().getIdToBackend().values();
+ for (Backend be : nodesToPublish) {
+ if (be.isAlive()) {
+ try {
+ BackendServiceProxy.getInstance()
+ .cancelPipelineXPlanFragmentAsync(be.getBrpcAddress(), tQueryId,
+ Types.PPlanFragmentCancelReason.USER_CANCEL);
+ } catch (Throwable t) {
+ LOG.info("send kill query {} rpc to be {} failed", queryId, be);
+ }
+ }
+ }
+ } else if (context == killCtx) {
// Suicide
context.setKilled();
} else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 0488fec2062..0fce50c327b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -276,6 +276,24 @@ public class BackendServiceProxy {
}
}
+ public ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelPipelineXPlanFragmentAsync(
+ TNetworkAddress address, TUniqueId queryId,
+ Types.PPlanFragmentCancelReason cancelReason) throws RpcException {
+ final InternalService.PCancelPlanFragmentRequest pRequest = InternalService.PCancelPlanFragmentRequest
+ .newBuilder()
+ .setFinstId(Types.PUniqueId.newBuilder().setHi(0).setLo(0).build())
+ .setCancelReason(cancelReason)
+ .setQueryId(Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build()).build();
+ try {
+ final BackendServiceClient client = getProxy(address);
+ return client.cancelPlanFragmentAsync(pRequest);
+ } catch (Throwable e) {
+ LOG.warn("Cancel plan fragment catch a exception, address={}:{}", address.getHostname(), address.getPort(),
+ e);
+ throw new RpcException(address.hostname, e.getMessage());
+ }
+ }
+
public Future<InternalService.PFetchDataResult> fetchDataAsync(
TNetworkAddress address, InternalService.PFetchDataRequest request) throws RpcException {
try {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugUtilTest.java
index bebe65cd2e0..54a3f4c388b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugUtilTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugUtilTest.java
@@ -19,10 +19,13 @@ package org.apache.doris.common.util;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
+import org.apache.doris.thrift.TUniqueId;
import org.junit.Assert;
import org.junit.Test;
+import java.util.UUID;
+
public class DebugUtilTest {
@Test
public void testGetUint() {
@@ -97,4 +100,46 @@ public class DebugUtilTest {
.contains("org.apache.doris.common.DdlException: errCode = 2, detailMessage = only one exception"));
Assert.assertEquals("unknown", Util.getRootCauseStack(null));
}
+
+ @Test
+ public void testParseIdFromString() {
+ // test null
+ TUniqueId nullTUniqueId = null;
+ try {
+ nullTUniqueId = DebugUtil.parseTUniqueIdFromString(null);
+ } catch (NumberFormatException e) {
+ Assert.assertTrue("invalid query id".equals(e.getMessage()));
+ }
+ Assert.assertTrue(nullTUniqueId == null);
+
+
+ try {
+ nullTUniqueId = DebugUtil.parseTUniqueIdFromString("");
+ } catch (NumberFormatException e) {
+ Assert.assertTrue("invalid query id".equals(e.getMessage()));
+ }
+ Assert.assertTrue(nullTUniqueId == null);
+
+ Assert.assertEquals(new TUniqueId(), DebugUtil.parseTUniqueIdFromString("0-0"));
+
+ try {
+ nullTUniqueId = DebugUtil.parseTUniqueIdFromString("INVALID-STRING");
+ } catch (NumberFormatException e) {
+ Assert.assertTrue(e.getMessage().contains("For input string"));
+ }
+ Assert.assertTrue(nullTUniqueId == null);
+
+ for (int i = 0; i < 100; i++) {
+ UUID uuid = UUID.randomUUID();
+ TUniqueId originTQueryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+ String originStrQueryId = DebugUtil.printId(originTQueryId);
+
+ TUniqueId convertedTQueryId = DebugUtil.parseTUniqueIdFromString(originStrQueryId);
+ String convertedStrQueryId = DebugUtil.printId(convertedTQueryId);
+
+ Assert.assertTrue(originTQueryId.hi == convertedTQueryId.hi);
+ Assert.assertTrue(originTQueryId.lo == convertedTQueryId.lo);
+ Assert.assertTrue(originStrQueryId.equals(convertedStrQueryId));
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]