This is an automated email from the ASF dual-hosted git repository.

starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a05582f9901 [Enhancement] (nereids)implement showSyncJobCommand in nereids (#44817)
a05582f9901 is described below

commit a05582f990117ec6b19925ce72dd81c8ff2164f4
Author: Sridhar R Manikarnike <sridhar.n...@gmail.com>
AuthorDate: Thu Dec 19 09:06:49 2024 +0530

    [Enhancement] (nereids)implement showSyncJobCommand in nereids (#44817)
    
    issue Number: close #42775
---
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |   2 +-
 .../doris/nereids/parser/LogicalPlanBuilder.java   |  12 +++
 .../apache/doris/nereids/trees/plans/PlanType.java |   1 +
 .../trees/plans/commands/ShowSyncJobCommand.java   | 115 +++++++++++++++++++++
 .../trees/plans/visitor/CommandVisitor.java        |   5 +
 .../show/test_show_sync_job_command.groovy         |  56 ++++++++++
 6 files changed, 190 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index ad884eefee1..6c37a2b276b 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -239,6 +239,7 @@ supportedShowStatement
     | SHOW DELETE ((FROM | IN) database=multipartIdentifier)?                  
     #showDelete
     | SHOW ALL? GRANTS                                                         
     #showGrants
     | SHOW GRANTS FOR userIdentify                                             
     #showGrantsForUser
+    | SHOW SYNC JOB ((FROM | IN) database=multipartIdentifier)?                
     #showSyncJob    
     | SHOW LOAD PROFILE loadIdPath=STRING_LITERAL                              
     #showLoadProfile
     | SHOW CREATE REPOSITORY FOR identifier                                    
     #showCreateRepository
     | SHOW VIEW
@@ -366,7 +367,6 @@ unsupportedShowStatement
         ((FROM | IN) database=multipartIdentifier)?                            
     #showIndex
     | SHOW TRANSACTION ((FROM | IN) database=multipartIdentifier)? wildWhere?  
     #showTransaction
     | SHOW CACHE HOTSPOT tablePath=STRING_LITERAL                              
     #showCacheHotSpot
-    | SHOW SYNC JOB ((FROM | IN) database=multipartIdentifier)?                
     #showSyncJob
     | SHOW CATALOG RECYCLE BIN wildWhere?                                      
     #showCatalogRecycleBin
     | SHOW QUERY STATS ((FOR database=identifier)
             | (FROM tableName=multipartIdentifier (ALL VERBOSE?)?))?           
     #showQueryStats
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 4c5d43cb4db..a9ce9215d4d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -279,6 +279,7 @@ import org.apache.doris.nereids.DorisParser.ShowRolesContext;
 import org.apache.doris.nereids.DorisParser.ShowSmallFilesContext;
 import org.apache.doris.nereids.DorisParser.ShowSqlBlockRuleContext;
 import org.apache.doris.nereids.DorisParser.ShowStorageEnginesContext;
+import org.apache.doris.nereids.DorisParser.ShowSyncJobContext;
 import org.apache.doris.nereids.DorisParser.ShowTableCreationContext;
 import org.apache.doris.nereids.DorisParser.ShowTableIdContext;
 import org.apache.doris.nereids.DorisParser.ShowTabletStorageFormatContext;
@@ -593,6 +594,7 @@ import org.apache.doris.nereids.trees.plans.commands.ShowRolesCommand;
 import org.apache.doris.nereids.trees.plans.commands.ShowSmallFilesCommand;
 import org.apache.doris.nereids.trees.plans.commands.ShowSqlBlockRuleCommand;
 import org.apache.doris.nereids.trees.plans.commands.ShowStorageEnginesCommand;
+import org.apache.doris.nereids.trees.plans.commands.ShowSyncJobCommand;
 import org.apache.doris.nereids.trees.plans.commands.ShowTableCreationCommand;
 import org.apache.doris.nereids.trees.plans.commands.ShowTableIdCommand;
 import org.apache.doris.nereids.trees.plans.commands.ShowTabletStorageFormatCommand;
@@ -4979,6 +4981,16 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
         return new CreateWorkloadGroupCommand(workloadGroupName, ifNotExists, properties);
     }
 
+    @Override
+    public LogicalPlan visitShowSyncJob(ShowSyncJobContext ctx) {
+        String databaseName = null;
+        if (ctx.multipartIdentifier() != null) {
+            List<String> databaseParts = visitMultipartIdentifier(ctx.multipartIdentifier());
+            databaseName = databaseParts.get(0);
+        }
+        return new ShowSyncJobCommand(databaseName);
+    }
+
     @Override
     public LogicalPlan visitDropFile(DropFileContext ctx) {
         String dbName = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index 0cec8c48caf..bfa0163e7d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -237,6 +237,7 @@ public enum PlanType {
     SHOW_ROLE_COMMAND,
     SHOW_SMALL_FILES_COMMAND,
     SHOW_STORAGE_ENGINES_COMMAND,
+    SHOW_SYNC_JOB_COMMAND,
     SHOW_TABLE_ID_COMMAND,
     SHOW_TRASH_COMMAND,
     SHOW_TABLET_STORAGE_FORMAT_COMMAND,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowSyncJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowSyncJobCommand.java
new file mode 100644
index 00000000000..a673164707f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowSyncJobCommand.java
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.analysis.RedirectStatus;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.util.ListComparator;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ShowResultSet;
+import org.apache.doris.qe.ShowResultSetMetaData;
+import org.apache.doris.qe.StmtExecutor;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Represents the command for SHOW SYNC JOB.
+ */
+public class ShowSyncJobCommand extends ShowCommand {
+    public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
+            .add("JobId").add("JobName").add("Type").add("State").add("Channel").add("Status")
+            .add("JobConfig").add("CreateTime").add("LastStartTime").add("LastStopTime").add("FinishTime").add("Msg")
+            .build();
+
+    private String databaseName;
+
+    public ShowSyncJobCommand(String databaseName) {
+        super(PlanType.SHOW_SYNC_JOB_COMMAND);
+        this.databaseName = databaseName;
+    }
+
+    private void validate(ConnectContext ctx) throws AnalysisException {
+        if (Strings.isNullOrEmpty(databaseName)) {
+            databaseName = ctx.getDatabase();
+            if (Strings.isNullOrEmpty(databaseName)) {
+                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
+            }
+        }
+        if (!Env.getCurrentEnv().getAccessManager()
+                .checkDbPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME,
+                    databaseName, PrivPredicate.SHOW)) {
+            ErrorReport.reportAnalysisException(ErrorCode.ERR_DB_ACCESS_DENIED_ERROR,
+                    PrivPredicate.SHOW.getPrivs().toString(), databaseName);
+        }
+    }
+
+    @Override
+    public ShowResultSet doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
+        validate(ctx);
+        Env env = Env.getCurrentEnv();
+        DatabaseIf db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(databaseName);
+
+        List<List<Comparable>> syncInfos = env.getSyncJobManager().getSyncJobsInfoByDbId(db.getId());
+        Collections.sort(syncInfos, new ListComparator<List<Comparable>>(0));
+
+        List<List<String>> rows = Lists.newArrayList();
+        for (List<Comparable> syncInfo : syncInfos) {
+            List<String> row = new ArrayList<String>(syncInfo.size());
+
+            for (Comparable element : syncInfo) {
+                row.add(element.toString());
+            }
+            rows.add(row);
+        }
+        return new ShowResultSet(this.getMetaData(), rows);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitShowSyncJobCommand(this, context);
+    }
+
+    public ShowResultSetMetaData getMetaData() {
+        ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
+        for (String title : TITLE_NAMES) {
+            builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
+        }
+        return builder.build();
+    }
+
+    @Override
+    public RedirectStatus toRedirectStatus() {
+        return RedirectStatus.FORWARD_NO_SYNC;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
index f988055f0f6..6efa58c1b8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java
@@ -123,6 +123,7 @@ import org.apache.doris.nereids.trees.plans.commands.ShowRolesCommand;
 import org.apache.doris.nereids.trees.plans.commands.ShowSmallFilesCommand;
 import org.apache.doris.nereids.trees.plans.commands.ShowSqlBlockRuleCommand;
 import org.apache.doris.nereids.trees.plans.commands.ShowStorageEnginesCommand;
+import org.apache.doris.nereids.trees.plans.commands.ShowSyncJobCommand;
 import org.apache.doris.nereids.trees.plans.commands.ShowTableCreationCommand;
 import org.apache.doris.nereids.trees.plans.commands.ShowTableIdCommand;
 import org.apache.doris.nereids.trees.plans.commands.ShowTabletStorageFormatCommand;
@@ -299,6 +300,10 @@ public interface CommandVisitor<R, C> {
         return visitCommand(callCommand, context);
     }
 
+    default R visitShowSyncJobCommand(ShowSyncJobCommand showSyncJobCommand, C context) {
+        return visitCommand(showSyncJobCommand, context);
+    }
+
     default R visitCreateProcedureCommand(CreateProcedureCommand createProcedureCommand, C context) {
         return visitCommand(createProcedureCommand, context);
     }
diff --git a/regression-test/suites/nereids_p0/show/test_show_sync_job_command.groovy b/regression-test/suites/nereids_p0/show/test_show_sync_job_command.groovy
new file mode 100644
index 00000000000..32b460be490
--- /dev/null
+++ b/regression-test/suites/nereids_p0/show/test_show_sync_job_command.groovy
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_show_sync_job_command", "query,sync_job") {
+    try {
+        sql """CREATE DATABASE IF NOT EXISTS test_db_sync_job;"""
+        
+        sql """CREATE TABLE IF NOT EXISTS test_db_sync_job.test_tbl1_sync_job (
+                  id INT,
+                  name STRING
+              ) UNIQUE KEY(id)
+              DISTRIBUTED BY HASH(id) BUCKETS 1
+              PROPERTIES("replication_num" = "1");"""
+
+        // Create the sync job
+        sql """CREATE SYNC test_db_sync_job.job1
+        (
+            FROM mysql_db1.tbl1 INTO test_tbl1_sync_job
+        )
+        FROM BINLOG
+        (
+            "type" = "canal",
+            "canal.server.ip" = "127.0.0.1",
+            "canal.server.port" = "11111",
+            "canal.destination" = "example",
+            "canal.username" = "",
+            "canal.password" = ""
+        );"""
+
+        checkNereidsExecute("SHOW SYNC JOB FROM test_db_sync_job")
+
+    } catch (Exception e) {
+        // Log any exceptions that occur during testing
+        log.error("Failed to execute CREATE SYNC JOB command", e)
+        throw e
+    } finally {
+        // Cleanup
+        try_sql("STOP SYNC JOB IF EXISTS job1;")
+        try_sql("DROP TABLE IF EXISTS test_db_sync_job.test_tbl1_sync_job;")
+        try_sql("DROP DATABASE IF EXISTS test_db_sync_job;")
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to