Copilot commented on code in PR #37472:
URL: https://github.com/apache/shardingsphere/pull/37472#discussion_r2649198553
##########
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java:
##########
@@ -55,24 +54,29 @@ public static ProcessRegistry getInstance() {
* @param process process
*/
public void add(final Process process) {
- if (isSameExecutionProcess(process)) {
- merge(processes.get(process.getId()), process);
- } else {
- processes.put(process.getId(), process);
- }
- }
-
- private boolean isSameExecutionProcess(final Process process) {
- return !Strings.isNullOrEmpty(process.getSql()) &&
processes.containsKey(process.getId()) &&
processes.get(process.getId()).getSql().equalsIgnoreCase(process.getSql());
+ processes.merge(
+ process.getId(),
+ process,
+ this::mergeProcess);
}
- private void merge(final Process oldProcess, final Process newProcess) {
- ShardingSpherePreconditions.checkState(!oldProcess.isInterrupted(),
SQLExecutionInterruptedException::new);
-
oldProcess.getTotalUnitCount().addAndGet(newProcess.getTotalUnitCount().get());
-
oldProcess.getCompletedUnitCount().addAndGet(newProcess.getCompletedUnitCount().get());
+ private Process mergeProcess(final Process oldProcess, final Process
newProcess) {
+ ShardingSpherePreconditions.checkState(
+ !oldProcess.isInterrupted(),
+ SQLExecutionInterruptedException::new);
+
+ oldProcess.getTotalUnitCount()
+ .addAndGet(newProcess.getTotalUnitCount().get());
+
+ oldProcess.getCompletedUnitCount()
+ .addAndGet(newProcess.getCompletedUnitCount().get());
+
oldProcess.getIdle().set(newProcess.getIdle().get());
- oldProcess.getInterrupted().compareAndSet(false,
newProcess.getInterrupted().get());
+
+ oldProcess.getInterrupted()
+ .compareAndSet(false, newProcess.getInterrupted().get());
oldProcess.getProcessStatements().putAll(newProcess.getProcessStatements());
Review Comment:
The mergeProcess method in ProcessRegistry now merges the SQL from the old
process, but it never updates it with the new process's SQL. This differs from
the Process.mergeExecutionGroupContext method which explicitly updates the SQL.
This inconsistency could lead to the SQL not being updated during the merge
operation in the registry, causing the process to show stale SQL information.
```suggestion
oldProcess.getProcessStatements().putAll(newProcess.getProcessStatements());
oldProcess.setSql(newProcess.getSql());
```
##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java:
##########
@@ -75,28 +76,55 @@ public void channelRead(final ChannelHandlerContext
context, final Object messag
private boolean authenticate(final ChannelHandlerContext context, final
ByteBuf message) {
try {
- AuthenticationResult authResult =
databaseProtocolFrontendEngine.getAuthenticationEngine().authenticate(context,
-
databaseProtocolFrontendEngine.getCodecEngine().createPacketPayload(message,
context.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get()));
+ AuthenticationResult authResult = databaseProtocolFrontendEngine
+ .getAuthenticationEngine()
+ .authenticate(
+ context,
+ databaseProtocolFrontendEngine.getCodecEngine()
+ .createPacketPayload(
+ message,
+ context.channel()
+
.attr(CommonConstants.CHARSET_ATTRIBUTE_KEY)
+ .get()));
+
if (authResult.isFinished()) {
- connectionSession.setGrantee(new
Grantee(authResult.getUsername(), authResult.getHostname()));
+ connectionSession.setGrantee(
+ new Grantee(authResult.getUsername(),
authResult.getHostname()));
connectionSession.setCurrentDatabaseName(authResult.getDatabase());
-
connectionSession.setProcessId(processEngine.connect(connectionSession.getUsedDatabaseName(),
connectionSession.getConnectionContext().getGrantee()));
+
+ String processId = processEngine.connect(
+ connectionSession.getUsedDatabaseName(),
+ connectionSession.getConnectionContext().getGrantee());
+ connectionSession.setProcessId(processId);
+
+ databaseProtocolFrontendEngine
+ .bindProcessAfterAuthentication(context.channel(),
connectionSession);
}
return authResult.isFinished();
- // CHECKSTYLE:OFF
- } catch (final Exception ex) {
- // CHECKSTYLE:ON
- if (ExpectedExceptions.isExpected(ex.getClass())) {
- log.debug("Exception occur: ", ex);
- } else {
- log.error("Exception occur: ", ex);
- }
-
context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(ex));
- context.close();
} finally {
message.release();
}
- return false;
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext context, final
Throwable cause) {
+ handleAuthenticationException(context, cause);
Review Comment:
The exceptionCaught method now always handles exceptions as authentication
errors, but it will also be called for exceptions that occur after
authentication succeeds. This could lead to incorrect error responses being
sent to clients during command execution phase. The method should check the
authenticated state before treating all exceptions as authentication failures.
```suggestion
if (authenticated.get()) {
final Exception exception = cause instanceof Exception ?
(Exception) cause : new SQLException(cause);
processException(exception);
context.close();
} else {
handleAuthenticationException(context, cause);
}
```
##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java:
##########
@@ -125,20 +124,34 @@ private static synchronized void persist(final String
jobId, final int shardingI
private static void persist0(final String jobId, final int
shardingItem, final PipelineJobProgressPersistContext persistContext) {
long currentUnhandledEventCount =
persistContext.getUnhandledEventCount().get();
- ShardingSpherePreconditions.checkState(currentUnhandledEventCount
>= 0L, () -> new IllegalStateException("Current unhandled event count must be
greater than or equal to 0"));
+
+ // Negative count is an error path (tests rely on this)
+ if (currentUnhandledEventCount < 0L) {
+ throw new IllegalStateException("Current unhandled event count
must be greater than or equal to 0");
+ }
+
if (0L == currentUnhandledEventCount) {
return;
}
+
Optional<PipelineJobItemContext> jobItemContext =
PipelineJobRegistry.getItemContext(jobId, shardingItem);
if (!jobItemContext.isPresent()) {
return;
}
+
long startTimeMillis = System.currentTimeMillis();
- new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
-
PipelineJobIdUtils.parseJobType(jobId).getType()).getOption().getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
-
persistContext.getUnhandledEventCount().addAndGet(-currentUnhandledEventCount);
+ new PipelineJobItemManager<>(
+ TypedSPILoader.getService(
+ PipelineJobType.class,
+
PipelineJobIdUtils.parseJobType(jobId).getType()).getOption().getYamlJobItemProgressSwapper())
+ .updateProgress(jobItemContext.get());
+
+ // Reset ONLY after successful persist
+ persistContext.getUnhandledEventCount().set(0L);
Review Comment:
The change from addAndGet(-currentUnhandledEventCount) to set(0L) introduces
a race condition. If new events are added to unhandledEventCount after
capturing currentUnhandledEventCount but before calling set(0L), those new
events will be lost. The original implementation correctly decremented by the
captured count to preserve any concurrent additions.
```suggestion
// Reset ONLY after successful persist, preserving any
concurrent additions
persistContext.getUnhandledEventCount().addAndGet(-currentUnhandledEventCount);
```
##########
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java:
##########
@@ -160,8 +189,81 @@ public void removeProcessStatement(final ExecutionUnit
executionUnit) {
*/
public void kill() throws SQLException {
setInterrupted(true);
+ SQLException exception = null;
for (Statement each : processStatements.values()) {
- each.cancel();
+ try {
+ each.cancel();
+ } catch (final SQLException ex) {
+ try {
+ each.close();
+ } catch (final SQLException closeEx) {
+ if (null == exception) {
+ exception = closeEx;
+ } else {
+ exception.setNextException(closeEx);
+ }
+ }
+ if (null == exception) {
+ exception = ex;
+ } else {
+ exception.setNextException(ex);
+ }
+ }
+ }
+
+ if (null != exception) {
+ throw exception;
+ }
+ }
+
+ /**
+ * Merge a new execution group context into the current process.
+ * <p>
+ * This method marks the process as active, updates the total execution
unit count,
+ * and registers statements from the given execution group context.
+ * </p>
+ *
+ * @param executionGroupContext execution group context to merge
+ * @param sql SQL to be executed
+ */
+ public void mergeExecutionGroupContext(
+ final ExecutionGroupContext<?
extends SQLExecutionUnit> executionGroupContext,
+ final String sql) {
+ idle.set(false);
+ // ✅ FIX: update SQL on execution
+ if (!Strings.isNullOrEmpty(sql)) {
+ this.sql = sql;
+ }
+ totalUnitCount.addAndGet(getTotalUnitCount(executionGroupContext));
+
processStatements.putAll(createProcessStatements(executionGroupContext));
+ }
+
+ private static final class StatementIdentity {
+
+ private final Statement statement;
+
+ private final int hash;
+
+ StatementIdentity(final Statement statement) {
+ this.statement = statement;
+ hash = System.identityHashCode(statement);
+ }
+
+ @Override
+ public int hashCode() {
+ return hash;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof StatementIdentity)) {
+ return false;
+ }
+ StatementIdentity other = (StatementIdentity) obj;
+ return statement == other.statement;
}
Review Comment:
The StatementIdentity class does not override toString(), which can make
debugging difficult when examining the contents of processStatements map.
Consider adding a toString() method that provides meaningful information about
the statement identity.
```suggestion
}
@Override
public String toString() {
StringBuilder result = new StringBuilder("StatementIdentity{");
result.append("identityHash=").append(hash);
if (statement != null) {
result.append(",
statementClass=").append(statement.getClass().getName());
result.append(", statement=").append(statement.toString());
} else {
result.append(", statement=null");
}
result.append('}');
return result.toString();
}
```
##########
proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java:
##########
@@ -83,6 +83,14 @@ public ConnectionSession(final DatabaseType protocolType,
final AttributeMap att
statementManager = new JDBCBackendStatement();
}
+ /**
+ * Set ProcessId.
+ * @param processId current processId
Review Comment:
The JavaDoc formatting for setProcessId is inconsistent with the rest of the
codebase. The parameter description should be on a separate line with proper
@param annotation, not on the same line.
```suggestion
*
* @param processId current process id
```
##########
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java:
##########
@@ -160,8 +189,81 @@ public void removeProcessStatement(final ExecutionUnit
executionUnit) {
*/
public void kill() throws SQLException {
setInterrupted(true);
+ SQLException exception = null;
for (Statement each : processStatements.values()) {
- each.cancel();
+ try {
+ each.cancel();
+ } catch (final SQLException ex) {
+ try {
+ each.close();
+ } catch (final SQLException closeEx) {
+ if (null == exception) {
+ exception = closeEx;
+ } else {
+ exception.setNextException(closeEx);
+ }
+ }
+ if (null == exception) {
+ exception = ex;
+ } else {
+ exception.setNextException(ex);
+ }
+ }
Review Comment:
In the Process.kill() method, when cancel() fails with a SQLException, the
code attempts to close() the statement and captures both exceptions. However,
the logic for tracking the first exception is flawed. After catching the
SQLException from cancel(), it tries to close() the statement, and if that also
fails, it sets exception to the close exception. Then it checks if exception is
null and sets it to the cancel exception. This means that if close() fails, the
cancel exception is lost. The cancel exception should be captured first before
attempting close().
##########
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java:
##########
@@ -89,6 +88,30 @@ private Process(final String sql, final
ExecutionGroupContext<? extends SQLExecu
interrupted = new AtomicBoolean();
}
+ // Constructor for YAML swapper / deserialization
+ public Process(final String id,
+ final long startMillis,
+ final String sql,
+ final String databaseName,
+ final String username,
+ final String hostname,
+ final AtomicInteger totalUnitCount,
+ final AtomicInteger completedUnitCount,
+ final AtomicBoolean idle,
+ final AtomicBoolean interrupted) {
+
+ this.id = id;
+ this.startMillis = startMillis;
+ this.sql = sql;
+ this.databaseName = databaseName;
+ this.username = username;
+ this.hostname = hostname;
+ this.totalUnitCount = totalUnitCount;
+ this.completedUnitCount = completedUnitCount;
+ this.idle = idle;
+ this.interrupted = interrupted;
+ }
+
Review Comment:
The new constructor added at lines 92-113 is documented as being for YAML
swapper/deserialization, but this constructor is never actually used in the
diff or shown to be wired to any YAML swapper. Adding a constructor without
actual usage can lead to maintenance issues as the unused code path may not be
tested or maintained properly.
```suggestion
```
##########
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java:
##########
@@ -81,8 +86,22 @@ public void disconnect(final String processId) {
* @param executionGroupContext execution group context
* @param queryContext query context
*/
- public void executeSQL(final ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext, final QueryContext queryContext) {
- ProcessRegistry.getInstance().add(new Process(queryContext.getSql(),
executionGroupContext));
+ public void executeSQL(
+ final ExecutionGroupContext<? extends
SQLExecutionUnit> executionGroupContext,
+ final QueryContext queryContext) {
+
+ String processId =
executionGroupContext.getReportContext().getProcessId();
+ if (Strings.isNullOrEmpty(processId)) {
+ return;
+ }
+ ProcessRegistry registry = ProcessRegistry.getInstance();
+ Process process = registry.get(processId);
+ if (null == process) {
+ // Federation execution: processId is externally managed
+ registry.add(new Process(executionGroupContext));
+ process = registry.get(processId);
Review Comment:
The executeSQL method has a comment indicating that federation execution
creates a new process if it doesn't exist. However, after creating the process
at line 101, it immediately retrieves it again from the registry at line 102.
This is redundant since we just created it and can use the Process
constructor's return value directly, or we can use the newly added process
directly without fetching it again.
```suggestion
process = new Process(executionGroupContext);
registry.add(process);
```
##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/connection/ConnectionIdGenerator.java:
##########
@@ -45,9 +54,95 @@ public static ConnectionIdGenerator getInstance() {
* @return next connection ID
*/
public synchronized int nextId() {
+ Integer clusterGeneratedId = tryGenerateClusterConnectionId();
+ if (null != clusterGeneratedId) {
+ return clusterGeneratedId;
+ }
if (currentId == Integer.MAX_VALUE) {
currentId = 0;
}
return ++currentId;
Review Comment:
After MAX_CLUSTER_GENERATE_ATTEMPTS failed attempts to generate a unique
cluster connection ID, the method falls back to using the local sequential ID
generator. However, this fallback ID is not registered in the cluster
repository, which could lead to collisions across different proxy instances in
a cluster. When cluster mode is active but ID generation fails, the method
should either retry with exponential backoff, throw an exception, or register
the fallback ID in the cluster repository.
##########
proxy/frontend/dialect/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java:
##########
@@ -64,12 +66,59 @@ public final class MySQLComQueryPacketExecutor implements
QueryCommandExecutor {
public MySQLComQueryPacketExecutor(final MySQLComQueryPacket packet, final
ConnectionSession connectionSession) throws SQLException {
this.connectionSession = connectionSession;
+
DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, "MySQL");
- SQLStatement sqlStatement =
ProxySQLComQueryParser.parse(packet.getSQL(), databaseType, connectionSession);
+
+ // 1. Parse SQL into AST
+ SQLStatement sqlStatement = ProxySQLComQueryParser.parse(
+ packet.getSQL(), databaseType, connectionSession);
+
+ // 2. Normalize KILL / KILL QUERY
+ sqlStatement = normalizeKillStatement(sqlStatement);
+
+ // 3. Create backend handler
proxyBackendHandler = areMultiStatements(connectionSession,
sqlStatement, packet.getSQL())
- ? new
MySQLMultiStatementsProxyBackendHandler(connectionSession, sqlStatement,
packet.getSQL())
- : ProxyBackendHandlerFactory.newInstance(databaseType,
packet.getSQL(), sqlStatement, connectionSession, packet.getHintValueContext());
- characterSet =
connectionSession.getAttributeMap().attr(MySQLConstants.CHARACTER_SET_ATTRIBUTE_KEY).get().getId();
+ ? new MySQLMultiStatementsProxyBackendHandler(
+ connectionSession, sqlStatement, packet.getSQL())
+ : ProxyBackendHandlerFactory.newInstance(
+ databaseType,
+ packet.getSQL(),
+ sqlStatement,
+ connectionSession,
+ packet.getHintValueContext());
+
+ // 4. Resolve character set
+ characterSet = connectionSession.getAttributeMap()
+ .attr(MySQLConstants.CHARACTER_SET_ATTRIBUTE_KEY)
+ .get()
+ .getId();
+ }
+
+ private SQLStatement normalizeKillStatement(final SQLStatement
sqlStatement) {
+ if (!(sqlStatement instanceof MySQLKillStatement)) {
+ return sqlStatement;
+ }
+ MySQLKillStatement kill = (MySQLKillStatement) sqlStatement;
+ String scope = kill.getScope();
+ // Only normalize KILL QUERY <connectionId>.
+ // KILL <id> without scope is treated as CONNECTION semantics and is
not rewritten.
+ if (null == scope || !"QUERY".equalsIgnoreCase(scope)) {
+ return sqlStatement;
+ }
+ String id = kill.getProcessId();
+ if (null == id || !id.chars().allMatch(Character::isDigit)) {
+ return sqlStatement;
+ }
+ String processId = MySQLConnectionIdRegistry.getInstance()
+ .getProcessId(Long.parseLong(id));
Review Comment:
Potential uncaught 'java.lang.NumberFormatException'.
```suggestion
long numericId;
try {
numericId = Long.parseLong(id);
} catch (NumberFormatException ex) {
// If the ID cannot be parsed as a long (e.g., overflow or empty
string), skip normalization.
return sqlStatement;
}
String processId = MySQLConnectionIdRegistry.getInstance()
.getProcessId(numericId);
```
##########
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java:
##########
@@ -44,13 +43,13 @@
@Getter
public final class Process {
- private final Map<Integer, Statement> processStatements = new
ConcurrentHashMap<>();
+ private final Map<StatementIdentity, Statement> processStatements = new
ConcurrentHashMap<>();
Review Comment:
getProcessStatements exposes the internal representation stored in field
processStatements. The value may be modified [after this call to
getProcessStatements](1).
##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/connection/ConnectionIdGenerator.java:
##########
@@ -45,9 +54,95 @@ public static ConnectionIdGenerator getInstance() {
* @return next connection ID
*/
public synchronized int nextId() {
+ Integer clusterGeneratedId = tryGenerateClusterConnectionId();
+ if (null != clusterGeneratedId) {
+ return clusterGeneratedId;
+ }
if (currentId == Integer.MAX_VALUE) {
currentId = 0;
}
return ++currentId;
}
Review Comment:
The new cluster mode connection ID generation logic in ConnectionIdGenerator
is not covered by tests. The existing tests only verify the fallback local ID
generation, but don't test the tryGenerateClusterConnectionId path, releaseId
functionality, or the cluster repository integration.
##########
proxy/frontend/dialect/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java:
##########
@@ -60,4 +71,22 @@ public void handleException(final ConnectionSession
connectionSession, final Exc
public String getDatabaseType() {
return "MySQL";
}
+
+ /**
+ * Bind MySQL native connection ID with process ID after authentication.
+ *
+ * @param channel Netty channel
+ * @param session connection session
+ */
Review Comment:
This method overrides
[DatabaseProtocolFrontendEngine.bindProcessAfterAuthentication](1); it is
advisable to add an Override annotation.
```suggestion
*/
@Override
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]