Copilot commented on code in PR #37472:
URL: https://github.com/apache/shardingsphere/pull/37472#discussion_r2649198553


##########
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessRegistry.java:
##########
@@ -55,24 +54,29 @@ public static ProcessRegistry getInstance() {
      * @param process process
      */
     public void add(final Process process) {
-        if (isSameExecutionProcess(process)) {
-            merge(processes.get(process.getId()), process);
-        } else {
-            processes.put(process.getId(), process);
-        }
-    }
-    
-    private boolean isSameExecutionProcess(final Process process) {
-        return !Strings.isNullOrEmpty(process.getSql()) && processes.containsKey(process.getId()) && processes.get(process.getId()).getSql().equalsIgnoreCase(process.getSql());
+        processes.merge(
+                process.getId(),
+                process,
+                this::mergeProcess);
     }
     
-    private void merge(final Process oldProcess, final Process newProcess) {
-        ShardingSpherePreconditions.checkState(!oldProcess.isInterrupted(), SQLExecutionInterruptedException::new);
-        oldProcess.getTotalUnitCount().addAndGet(newProcess.getTotalUnitCount().get());
-        oldProcess.getCompletedUnitCount().addAndGet(newProcess.getCompletedUnitCount().get());
+    private Process mergeProcess(final Process oldProcess, final Process newProcess) {
+        ShardingSpherePreconditions.checkState(
+                !oldProcess.isInterrupted(),
+                SQLExecutionInterruptedException::new);
+        
+        oldProcess.getTotalUnitCount()
+                .addAndGet(newProcess.getTotalUnitCount().get());
+        
+        oldProcess.getCompletedUnitCount()
+                .addAndGet(newProcess.getCompletedUnitCount().get());
+        
         oldProcess.getIdle().set(newProcess.getIdle().get());
-        oldProcess.getInterrupted().compareAndSet(false, newProcess.getInterrupted().get());
+        
+        oldProcess.getInterrupted()
+                .compareAndSet(false, newProcess.getInterrupted().get());
         oldProcess.getProcessStatements().putAll(newProcess.getProcessStatements());

Review Comment:
   The mergeProcess method in ProcessRegistry merges the counters, flags, and 
statements of the new process into the old one, but it never updates the old 
process's SQL with the new process's SQL. This differs from 
Process.mergeExecutionGroupContext, which explicitly updates the SQL. As a 
result, the registry can keep showing stale SQL for a process after a merge.
   ```suggestion
           oldProcess.getProcessStatements().putAll(newProcess.getProcessStatements());
           oldProcess.setSql(newProcess.getSql());
   ```
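
   For illustration, a minimal sketch of the merge-and-update-SQL behaviour using
   ConcurrentHashMap.merge. Proc is a hypothetical stand-in for Process, and the
   null-or-empty guard is an assumption borrowed from mergeExecutionGroupContext,
   not something the suggestion above includes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class MergeSqlSketch {

    // Hypothetical stand-in for Process, reduced to the fields used here.
    static final class Proc {

        private final String id;

        private volatile String sql;

        private final AtomicInteger totalUnitCount;

        Proc(final String id, final String sql, final int totalUnitCount) {
            this.id = id;
            this.sql = sql;
            this.totalUnitCount = new AtomicInteger(totalUnitCount);
        }

        String getSql() {
            return sql;
        }

        int getTotalUnitCount() {
            return totalUnitCount.get();
        }
    }

    private final Map<String, Proc> processes = new ConcurrentHashMap<>();

    void add(final Proc process) {
        // merge() stores the process if the id is absent, otherwise calls mergeProc(old, new).
        processes.merge(process.id, process, MergeSqlSketch::mergeProc);
    }

    Proc get(final String id) {
        return processes.get(id);
    }

    private static Proc mergeProc(final Proc oldProc, final Proc newProc) {
        oldProc.totalUnitCount.addAndGet(newProc.totalUnitCount.get());
        // Assumption: skip null or empty SQL so a valid statement is not overwritten.
        if (null != newProc.sql && !newProc.sql.isEmpty()) {
            oldProc.sql = newProc.sql;
        }
        // Returning the old instance keeps it as the mapped value.
        return oldProc;
    }

    public static void main(final String[] args) {
        MergeSqlSketch registry = new MergeSqlSketch();
        registry.add(new Proc("p1", "SELECT 1", 2));
        registry.add(new Proc("p1", "SELECT 2", 3));
        Proc merged = registry.get("p1");
        System.out.println(merged.getSql() + " / " + merged.getTotalUnitCount());
    }
}
```

   Whether an empty SQL should overwrite the old value is a design choice for
   the PR author; the sketch only shows that the update has to happen inside
   the remapping function.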



##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/netty/FrontendChannelInboundHandler.java:
##########
@@ -75,28 +76,55 @@ public void channelRead(final ChannelHandlerContext 
context, final Object messag
     
     private boolean authenticate(final ChannelHandlerContext context, final 
ByteBuf message) {
         try {
-            AuthenticationResult authResult = 
databaseProtocolFrontendEngine.getAuthenticationEngine().authenticate(context,
-                    
databaseProtocolFrontendEngine.getCodecEngine().createPacketPayload(message, 
context.channel().attr(CommonConstants.CHARSET_ATTRIBUTE_KEY).get()));
+            AuthenticationResult authResult = databaseProtocolFrontendEngine
+                    .getAuthenticationEngine()
+                    .authenticate(
+                            context,
+                            databaseProtocolFrontendEngine.getCodecEngine()
+                                    .createPacketPayload(
+                                            message,
+                                            context.channel()
+                                                    
.attr(CommonConstants.CHARSET_ATTRIBUTE_KEY)
+                                                    .get()));
+            
             if (authResult.isFinished()) {
-                connectionSession.setGrantee(new 
Grantee(authResult.getUsername(), authResult.getHostname()));
+                connectionSession.setGrantee(
+                        new Grantee(authResult.getUsername(), 
authResult.getHostname()));
                 
connectionSession.setCurrentDatabaseName(authResult.getDatabase());
-                
connectionSession.setProcessId(processEngine.connect(connectionSession.getUsedDatabaseName(),
 connectionSession.getConnectionContext().getGrantee()));
+                
+                String processId = processEngine.connect(
+                        connectionSession.getUsedDatabaseName(),
+                        connectionSession.getConnectionContext().getGrantee());
+                connectionSession.setProcessId(processId);
+                
+                databaseProtocolFrontendEngine
+                        .bindProcessAfterAuthentication(context.channel(), 
connectionSession);
             }
             return authResult.isFinished();
-            // CHECKSTYLE:OFF
-        } catch (final Exception ex) {
-            // CHECKSTYLE:ON
-            if (ExpectedExceptions.isExpected(ex.getClass())) {
-                log.debug("Exception occur: ", ex);
-            } else {
-                log.error("Exception occur: ", ex);
-            }
-            
context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(ex));
-            context.close();
         } finally {
             message.release();
         }
-        return false;
+    }
+    
+    @Override
+    public void exceptionCaught(final ChannelHandlerContext context, final 
Throwable cause) {
+        handleAuthenticationException(context, cause);

Review Comment:
   The exceptionCaught method now handles every exception as an authentication 
error, but Netty also calls it for exceptions raised after authentication 
succeeds. This could send incorrect error responses to clients during the 
command execution phase. The method should check the authenticated state 
before treating an exception as an authentication failure.
   ```suggestion
           if (authenticated.get()) {
               final Exception exception = cause instanceof Exception ? (Exception) cause : new SQLException(cause);
               processException(exception);
               context.close();
           } else {
               handleAuthenticationException(context, cause);
           }
   ```



##########
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java:
##########
@@ -125,20 +124,34 @@ private static synchronized void persist(final String 
jobId, final int shardingI
         
         private static void persist0(final String jobId, final int 
shardingItem, final PipelineJobProgressPersistContext persistContext) {
             long currentUnhandledEventCount = 
persistContext.getUnhandledEventCount().get();
-            ShardingSpherePreconditions.checkState(currentUnhandledEventCount 
>= 0L, () -> new IllegalStateException("Current unhandled event count must be 
greater than or equal to 0"));
+            
+            // Negative count is an error path (tests rely on this)
+            if (currentUnhandledEventCount < 0L) {
+                throw new IllegalStateException("Current unhandled event count 
must be greater than or equal to 0");
+            }
+            
             if (0L == currentUnhandledEventCount) {
                 return;
             }
+            
             Optional<PipelineJobItemContext> jobItemContext = 
PipelineJobRegistry.getItemContext(jobId, shardingItem);
             if (!jobItemContext.isPresent()) {
                 return;
             }
+            
             long startTimeMillis = System.currentTimeMillis();
-            new 
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
-                    
PipelineJobIdUtils.parseJobType(jobId).getType()).getOption().getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
-            
persistContext.getUnhandledEventCount().addAndGet(-currentUnhandledEventCount);
+            new PipelineJobItemManager<>(
+                    TypedSPILoader.getService(
+                            PipelineJobType.class,
+                            
PipelineJobIdUtils.parseJobType(jobId).getType()).getOption().getYamlJobItemProgressSwapper())
+                    .updateProgress(jobItemContext.get());
+            
+            // Reset ONLY after successful persist
+            persistContext.getUnhandledEventCount().set(0L);

Review Comment:
   The change from addAndGet(-currentUnhandledEventCount) to set(0L) introduces 
a race condition. If new events are added to unhandledEventCount after 
capturing currentUnhandledEventCount but before calling set(0L), those new 
events will be lost. The original implementation correctly decremented by the 
captured count to preserve any concurrent additions.
   ```suggestion
               // Reset ONLY after successful persist, preserving any concurrent additions
               persistContext.getUnhandledEventCount().addAndGet(-currentUnhandledEventCount);
   ```
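
   The lost-update scenario can be reproduced deterministically. The sketch
   below simulates the interleaving in a single thread: events arrive between
   capturing the count and resetting it. The class and method names are
   hypothetical and exist only for this illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

final class UnhandledCountSketch {

    // Mirrors the new code path: capture, persist, then set(0L).
    static long resetWithSet(final AtomicLong counter, final long concurrentAdds) {
        long captured = counter.get();
        // Simulates events that arrive while the captured progress is being persisted.
        counter.addAndGet(concurrentAdds);
        counter.set(0L);
        return counter.get();
    }

    // Mirrors the original code path: capture, persist, then subtract the captured count.
    static long resetWithSubtract(final AtomicLong counter, final long concurrentAdds) {
        long captured = counter.get();
        // Simulates events that arrive while the captured progress is being persisted.
        counter.addAndGet(concurrentAdds);
        counter.addAndGet(-captured);
        return counter.get();
    }

    public static void main(final String[] args) {
        System.out.println("set(0L) leaves:      " + resetWithSet(new AtomicLong(5L), 3L));
        System.out.println("addAndGet(-n) leaves: " + resetWithSubtract(new AtomicLong(5L), 3L));
    }
}
```

   With set(0L) the three late events are dropped from the counter; with the
   subtraction they remain and are picked up by the next persist round.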



##########
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java:
##########
@@ -160,8 +189,81 @@ public void removeProcessStatement(final ExecutionUnit 
executionUnit) {
      */
     public void kill() throws SQLException {
         setInterrupted(true);
+        SQLException exception = null;
         for (Statement each : processStatements.values()) {
-            each.cancel();
+            try {
+                each.cancel();
+            } catch (final SQLException ex) {
+                try {
+                    each.close();
+                } catch (final SQLException closeEx) {
+                    if (null == exception) {
+                        exception = closeEx;
+                    } else {
+                        exception.setNextException(closeEx);
+                    }
+                }
+                if (null == exception) {
+                    exception = ex;
+                } else {
+                    exception.setNextException(ex);
+                }
+            }
+        }
+        
+        if (null != exception) {
+            throw exception;
+        }
+    }
+    
+    /**
+     * Merge a new execution group context into the current process.
+     * <p>
+     * This method marks the process as active, updates the total execution 
unit count,
+     * and registers statements from the given execution group context.
+     * </p>
+     *
+     * @param executionGroupContext execution group context to merge
+     * @param sql SQL to be executed
+     */
+    public void mergeExecutionGroupContext(
+                                           final ExecutionGroupContext<? 
extends SQLExecutionUnit> executionGroupContext,
+                                           final String sql) {
+        idle.set(false);
+        // ✅ FIX: update SQL on execution
+        if (!Strings.isNullOrEmpty(sql)) {
+            this.sql = sql;
+        }
+        totalUnitCount.addAndGet(getTotalUnitCount(executionGroupContext));
+        
processStatements.putAll(createProcessStatements(executionGroupContext));
+    }
+    
+    private static final class StatementIdentity {
+        
+        private final Statement statement;
+        
+        private final int hash;
+        
+        StatementIdentity(final Statement statement) {
+            this.statement = statement;
+            hash = System.identityHashCode(statement);
+        }
+        
+        @Override
+        public int hashCode() {
+            return hash;
+        }
+        
+        @Override
+        public boolean equals(final Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof StatementIdentity)) {
+                return false;
+            }
+            StatementIdentity other = (StatementIdentity) obj;
+            return statement == other.statement;
         }

Review Comment:
   The StatementIdentity class does not override toString(), which can make 
debugging difficult when examining the contents of processStatements map. 
Consider adding a toString() method that provides meaningful information about 
the statement identity.
   ```suggestion
           }
           
           @Override
           public String toString() {
               StringBuilder result = new StringBuilder("StatementIdentity{");
               result.append("identityHash=").append(hash);
               if (statement != null) {
                   result.append(", statementClass=").append(statement.getClass().getName());
                   result.append(", statement=").append(statement.toString());
               } else {
                   result.append(", statement=null");
               }
               result.append('}');
               return result.toString();
           }
   ```



##########
proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java:
##########
@@ -83,6 +83,14 @@ public ConnectionSession(final DatabaseType protocolType, 
final AttributeMap att
         statementManager = new JDBCBackendStatement();
     }
     
+    /**
+     * Set ProcessId.
+     * @param  processId current processId

Review Comment:
   The JavaDoc formatting for setProcessId is inconsistent with the rest of the 
codebase. A blank ` *` line should separate the description from the @param 
tag, and the tag should be followed by a single space.
   ```suggestion
        *
        * @param processId current process id
   ```



##########
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java:
##########
@@ -160,8 +189,81 @@ public void removeProcessStatement(final ExecutionUnit 
executionUnit) {
      */
     public void kill() throws SQLException {
         setInterrupted(true);
+        SQLException exception = null;
         for (Statement each : processStatements.values()) {
-            each.cancel();
+            try {
+                each.cancel();
+            } catch (final SQLException ex) {
+                try {
+                    each.close();
+                } catch (final SQLException closeEx) {
+                    if (null == exception) {
+                        exception = closeEx;
+                    } else {
+                        exception.setNextException(closeEx);
+                    }
+                }
+                if (null == exception) {
+                    exception = ex;
+                } else {
+                    exception.setNextException(ex);
+                }
+            }

Review Comment:
   In the Process.kill() method, when cancel() fails with a SQLException, the 
code attempts to close() the statement and records both exceptions. However, 
the order in which they are recorded is flawed. The close exception is recorded 
first, so if close() also fails while no earlier exception has been recorded, 
the close exception becomes the primary exception that is thrown and the 
cancel exception, which is the root cause, is only chained behind it. The 
cancel exception should be recorded first, before attempting close().
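
   One possible ordering is sketched below: the cancel exception is chained
   before close() is attempted. Killable is a hypothetical two-method stand-in
   for java.sql.Statement, and chain() is an illustrative helper, not part of
   the PR.

```java
import java.sql.SQLException;
import java.util.Collections;

final class KillOrderSketch {

    // Hypothetical stand-in for java.sql.Statement, limited to what kill() uses.
    interface Killable {

        void cancel() throws SQLException;

        void close() throws SQLException;
    }

    static void killAll(final Iterable<Killable> statements) throws SQLException {
        SQLException first = null;
        for (Killable each : statements) {
            try {
                each.cancel();
            } catch (final SQLException cancelEx) {
                // Record the root cause before the fallback close() can fail.
                first = chain(first, cancelEx);
                try {
                    each.close();
                } catch (final SQLException closeEx) {
                    first = chain(first, closeEx);
                }
            }
        }
        if (null != first) {
            throw first;
        }
    }

    // setNextException appends to the end of the existing chain.
    private static SQLException chain(final SQLException head, final SQLException next) {
        if (null == head) {
            return next;
        }
        head.setNextException(next);
        return head;
    }

    public static void main(final String[] args) {
        Killable failing = new Killable() {

            @Override
            public void cancel() throws SQLException {
                throw new SQLException("cancel failed");
            }

            @Override
            public void close() throws SQLException {
                throw new SQLException("close failed");
            }
        };
        try {
            killAll(Collections.singletonList(failing));
        } catch (final SQLException ex) {
            System.out.println(ex.getMessage() + " -> " + ex.getNextException().getMessage());
        }
    }
}
```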



##########
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java:
##########
@@ -89,6 +88,30 @@ private Process(final String sql, final 
ExecutionGroupContext<? extends SQLExecu
         interrupted = new AtomicBoolean();
     }
     
+    // Constructor for YAML swapper / deserialization
+    public Process(final String id,
+                   final long startMillis,
+                   final String sql,
+                   final String databaseName,
+                   final String username,
+                   final String hostname,
+                   final AtomicInteger totalUnitCount,
+                   final AtomicInteger completedUnitCount,
+                   final AtomicBoolean idle,
+                   final AtomicBoolean interrupted) {
+        
+        this.id = id;
+        this.startMillis = startMillis;
+        this.sql = sql;
+        this.databaseName = databaseName;
+        this.username = username;
+        this.hostname = hostname;
+        this.totalUnitCount = totalUnitCount;
+        this.completedUnitCount = completedUnitCount;
+        this.idle = idle;
+        this.interrupted = interrupted;
+    }
+    

Review Comment:
   The new constructor added at lines 92-113 is documented as being for YAML 
swapper/deserialization, but this constructor is never actually used in the 
diff or shown to be wired to any YAML swapper. Adding a constructor without 
actual usage can lead to maintenance issues as the unused code path may not be 
tested or maintained properly.
   ```suggestion
   
   ```



##########
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java:
##########
@@ -81,8 +86,22 @@ public void disconnect(final String processId) {
      * @param executionGroupContext execution group context
      * @param queryContext query context
      */
-    public void executeSQL(final ExecutionGroupContext<? extends 
SQLExecutionUnit> executionGroupContext, final QueryContext queryContext) {
-        ProcessRegistry.getInstance().add(new Process(queryContext.getSql(), 
executionGroupContext));
+    public void executeSQL(
+                           final ExecutionGroupContext<? extends 
SQLExecutionUnit> executionGroupContext,
+                           final QueryContext queryContext) {
+        
+        String processId = 
executionGroupContext.getReportContext().getProcessId();
+        if (Strings.isNullOrEmpty(processId)) {
+            return;
+        }
+        ProcessRegistry registry = ProcessRegistry.getInstance();
+        Process process = registry.get(processId);
+        if (null == process) {
+            // Federation execution: processId is externally managed
+            registry.add(new Process(executionGroupContext));
+            process = registry.get(processId);

Review Comment:
   The executeSQL method has a comment indicating that federation execution 
creates a new process if it doesn't exist. However, after creating the process 
at line 101, it immediately retrieves it again from the registry at line 102. 
This lookup is redundant: the newly constructed Process can be kept in a local 
variable and used directly after it is added to the registry.
   ```suggestion
               process = new Process(executionGroupContext);
               registry.add(process);
   ```



##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/connection/ConnectionIdGenerator.java:
##########
@@ -45,9 +54,95 @@ public static ConnectionIdGenerator getInstance() {
      * @return next connection ID
      */
     public synchronized int nextId() {
+        Integer clusterGeneratedId = tryGenerateClusterConnectionId();
+        if (null != clusterGeneratedId) {
+            return clusterGeneratedId;
+        }
         if (currentId == Integer.MAX_VALUE) {
             currentId = 0;
         }
         return ++currentId;

Review Comment:
   After MAX_CLUSTER_GENERATE_ATTEMPTS failed attempts to generate a unique 
cluster connection ID, the method falls back to using the local sequential ID 
generator. However, this fallback ID is not registered in the cluster 
repository, which could lead to collisions across different proxy instances in 
a cluster. When cluster mode is active but ID generation fails, the method 
should either retry with exponential backoff, throw an exception, or register 
the fallback ID in the cluster repository.
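
   As a rough sketch of the "register the fallback ID, otherwise throw" option:
   the local counter below only hands out an ID once it has been reserved
   cluster-wide. The IntPredicate stands in for a cluster repository call, and
   MAX_FALLBACK_ATTEMPTS and nextFallbackId are hypothetical names, not taken
   from the PR.

```java
import java.util.function.IntPredicate;

final class ClusterIdFallbackSketch {

    // Hypothetical bound on how many sequential IDs are tried before giving up.
    private static final int MAX_FALLBACK_ATTEMPTS = 16;

    // Assumption: returns true only if the ID was newly reserved in the cluster repository.
    private final IntPredicate reserveInCluster;

    private int currentId;

    ClusterIdFallbackSketch(final IntPredicate reserveInCluster) {
        this.reserveInCluster = reserveInCluster;
    }

    synchronized int nextFallbackId() {
        for (int i = 0; i < MAX_FALLBACK_ATTEMPTS; i++) {
            if (currentId == Integer.MAX_VALUE) {
                currentId = 0;
            }
            int candidate = ++currentId;
            if (reserveInCluster.test(candidate)) {
                return candidate;
            }
        }
        // Failing loudly avoids handing out an ID another proxy instance may already use.
        throw new IllegalStateException("Unable to reserve a cluster-unique connection ID");
    }
}
```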



##########
proxy/frontend/dialect/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/command/query/text/query/MySQLComQueryPacketExecutor.java:
##########
@@ -64,12 +66,59 @@ public final class MySQLComQueryPacketExecutor implements 
QueryCommandExecutor {
     
     public MySQLComQueryPacketExecutor(final MySQLComQueryPacket packet, final 
ConnectionSession connectionSession) throws SQLException {
         this.connectionSession = connectionSession;
+        
         DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, "MySQL");
-        SQLStatement sqlStatement = 
ProxySQLComQueryParser.parse(packet.getSQL(), databaseType, connectionSession);
+        
+        // 1. Parse SQL into AST
+        SQLStatement sqlStatement = ProxySQLComQueryParser.parse(
+                packet.getSQL(), databaseType, connectionSession);
+        
+        // 2. Normalize KILL / KILL QUERY
+        sqlStatement = normalizeKillStatement(sqlStatement);
+        
+        // 3. Create backend handler
         proxyBackendHandler = areMultiStatements(connectionSession, 
sqlStatement, packet.getSQL())
-                ? new 
MySQLMultiStatementsProxyBackendHandler(connectionSession, sqlStatement, 
packet.getSQL())
-                : ProxyBackendHandlerFactory.newInstance(databaseType, 
packet.getSQL(), sqlStatement, connectionSession, packet.getHintValueContext());
-        characterSet = 
connectionSession.getAttributeMap().attr(MySQLConstants.CHARACTER_SET_ATTRIBUTE_KEY).get().getId();
+                ? new MySQLMultiStatementsProxyBackendHandler(
+                        connectionSession, sqlStatement, packet.getSQL())
+                : ProxyBackendHandlerFactory.newInstance(
+                        databaseType,
+                        packet.getSQL(),
+                        sqlStatement,
+                        connectionSession,
+                        packet.getHintValueContext());
+        
+        // 4. Resolve character set
+        characterSet = connectionSession.getAttributeMap()
+                .attr(MySQLConstants.CHARACTER_SET_ATTRIBUTE_KEY)
+                .get()
+                .getId();
+    }
+    
+    private SQLStatement normalizeKillStatement(final SQLStatement 
sqlStatement) {
+        if (!(sqlStatement instanceof MySQLKillStatement)) {
+            return sqlStatement;
+        }
+        MySQLKillStatement kill = (MySQLKillStatement) sqlStatement;
+        String scope = kill.getScope();
+        // Only normalize KILL QUERY <connectionId>.
+        // KILL <id> without scope is treated as CONNECTION semantics and is 
not rewritten.
+        if (null == scope || !"QUERY".equalsIgnoreCase(scope)) {
+            return sqlStatement;
+        }
+        String id = kill.getProcessId();
+        if (null == id || !id.chars().allMatch(Character::isDigit)) {
+            return sqlStatement;
+        }
+        String processId = MySQLConnectionIdRegistry.getInstance()
+                .getProcessId(Long.parseLong(id));

Review Comment:
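   Potential uncaught 'java.lang.NumberFormatException': the all-digits check 
above also passes for an empty string and for digit strings that overflow a 
long, and Long.parseLong throws for both.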
   ```suggestion
           long numericId;
           try {
               numericId = Long.parseLong(id);
           } catch (NumberFormatException ex) {
               // If the ID cannot be parsed as a long (e.g., overflow or empty string), skip normalization.
               return sqlStatement;
           }
           String processId = MySQLConnectionIdRegistry.getInstance()
                   .getProcessId(numericId);
   ```
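
   The two inputs that slip past the digit check can be demonstrated in
   isolation. The helper names below are hypothetical; only String.chars,
   Character.isDigit and Long.parseLong come from the code under review.

```java
import java.util.OptionalLong;

final class KillIdParseSketch {

    // Same check as in normalizeKillStatement; allMatch is vacuously true for "".
    static boolean passesDigitCheck(final String id) {
        return null != id && id.chars().allMatch(Character::isDigit);
    }

    // Returns empty instead of throwing when the ID is not a parsable long.
    static OptionalLong parseConnectionId(final String id) {
        if (null == id || id.isEmpty() || !passesDigitCheck(id)) {
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(id));
        } catch (final NumberFormatException ex) {
            // Reached for all-digit strings that exceed Long.MAX_VALUE.
            return OptionalLong.empty();
        }
    }

    public static void main(final String[] args) {
        for (String each : new String[] {"42", "", "99999999999999999999"}) {
            System.out.println("'" + each + "' digitCheck=" + passesDigitCheck(each)
                    + " parsed=" + parseConnectionId(each));
        }
    }
}
```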



##########
infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/Process.java:
##########
@@ -44,13 +43,13 @@
 @Getter
 public final class Process {
     
-    private final Map<Integer, Statement> processStatements = new 
ConcurrentHashMap<>();
+    private final Map<StatementIdentity, Statement> processStatements = new 
ConcurrentHashMap<>();

Review Comment:
   getProcessStatements (generated by the class-level @Getter) exposes the 
internal representation stored in the field processStatements. The value may be 
modified after this call to getProcessStatements.
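
   A common way to address this kind of finding is to return an unmodifiable
   view and route mutations through dedicated methods. The sketch below uses
   hypothetical names and simplified key and value types; note that
   ProcessRegistry.mergeProcess in this PR calls putAll on the returned map, so
   it would need such a mutator method as well.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class StatementHolderSketch {

    // Simplified types: the PR uses Map<StatementIdentity, Statement>.
    private final Map<String, Object> processStatements = new ConcurrentHashMap<>();

    // Read-only live view: callers see updates but cannot modify the backing map.
    Map<String, Object> getProcessStatements() {
        return Collections.unmodifiableMap(processStatements);
    }

    // Hypothetical mutator that replaces direct access to the map.
    void addProcessStatements(final Map<String, Object> statements) {
        processStatements.putAll(statements);
    }
}
```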



##########
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/connection/ConnectionIdGenerator.java:
##########
@@ -45,9 +54,95 @@ public static ConnectionIdGenerator getInstance() {
      * @return next connection ID
      */
     public synchronized int nextId() {
+        Integer clusterGeneratedId = tryGenerateClusterConnectionId();
+        if (null != clusterGeneratedId) {
+            return clusterGeneratedId;
+        }
         if (currentId == Integer.MAX_VALUE) {
             currentId = 0;
         }
         return ++currentId;
     }

Review Comment:
   The new cluster mode connection ID generation logic in ConnectionIdGenerator 
is not covered by tests. The existing tests only verify the fallback local ID 
generation, but don't test the tryGenerateClusterConnectionId path, releaseId 
functionality, or the cluster repository integration.



##########
proxy/frontend/dialect/mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/MySQLFrontendEngine.java:
##########
@@ -60,4 +71,22 @@ public void handleException(final ConnectionSession 
connectionSession, final Exc
     public String getDatabaseType() {
         return "MySQL";
     }
+    
+    /**
+     * Bind MySQL native connection ID with process ID after authentication.
+     *
+     * @param channel Netty channel
+     * @param session connection session
+     */

Review Comment:
   This method overrides 
DatabaseProtocolFrontendEngine.bindProcessAfterAuthentication; it is advisable 
to add an @Override annotation.
   ```suggestion
        */
       @Override
   ```


