This is an automated email from the ASF dual-hosted git repository.

miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git


The following commit(s) were added to refs/heads/main by this push:
     new d26a7fa  fix(storage-node): fix register storage unit bug
     new e7da324  Merge pull request #376 from Xu-Wentao/storage-node
d26a7fa is described below

commit d26a7fa19c5ca0068989952861b506ade0c67252
Author: xuwentao <[email protected]>
AuthorDate: Tue May 23 16:54:23 2023 +0800

    fix(storage-node): fix register storage unit bug
---
 .../controllers/storage_ndoe_controller_test.go    | 24 ++++++++--------
 .../pkg/controllers/storage_node_controller.go     |  6 ++--
 .../pkg/reconcile/storagenode/aws/rdsinstance.go   |  2 +-
 .../pkg/shardingsphere/mocks/shardingsphere.go     | 16 +++++------
 .../pkg/shardingsphere/shardingsphere.go           | 32 +++++++++++++++-------
 .../pkg/shardingsphere/shardingsphere_test.go      | 27 ++++++++++++++----
 .../test/e2e/storage_node_controller_test.go       | 16 ++++++-----
 7 files changed, 76 insertions(+), 47 deletions(-)

diff --git 
a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go 
b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
index 04f01f1..87e2fba 100644
--- a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
+++ b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
@@ -245,16 +245,14 @@ var _ = Describe("StorageNode Controller Mock Test", 
func() {
        })
 
        Context("reconcile storage node in Ready status when it's been 
deleted", func() {
-               var (
-                       rdsInstanceAvailable = dbmesh_rds.DescInstance{
-                               DBInstanceIdentifier: 
defaultTestInstanceIdentifier,
-                               DBInstanceStatus:     
dbmesh_rds.DBInstanceStatusAvailable,
-                               Endpoint: dbmesh_rds.Endpoint{
-                                       Address: "127.0.0.1",
-                                       Port:    3306,
-                               },
-                       }
-               )
+               rdsInstanceAvailable := dbmesh_rds.DescInstance{
+                       DBInstanceIdentifier: defaultTestInstanceIdentifier,
+                       DBInstanceStatus:     
dbmesh_rds.DBInstanceStatusAvailable,
+                       Endpoint: dbmesh_rds.Endpoint{
+                               Address: "127.0.0.1",
+                               Port:    3306,
+                       },
+               }
                It("should be successful when instance is in available status", 
func() {
                        deletingStorageNode := "test-deleting-storage-node"
                        req := ctrl.Request{
@@ -478,7 +476,7 @@ var _ = Describe("StorageNode Controller Mock Test", func() 
{
                        // mock shardingsphere create database
                        mockSS.EXPECT().CreateDatabase(gomock.Any()).Return(nil)
                        // mock shardingsphere register storage unit
-                       mockSS.EXPECT().RegisterStorageUnit(gomock.Any(), 
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil)
+                       mockSS.EXPECT().RegisterStorageUnit(gomock.Any(), 
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil)
                        // mock shardingsphere close connection
                        mockSS.EXPECT().Close()
 
@@ -640,7 +638,7 @@ var _ = Describe("StorageNode Controller Mock Test", func() 
{
 
                        mockSS.EXPECT().CreateDatabase(gomock.Any()).Return(nil)
                        mockSS.EXPECT().Close().Return(nil)
-                       mockSS.EXPECT().RegisterStorageUnit(gomock.Any(), 
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil)
+                       mockSS.EXPECT().RegisterStorageUnit(gomock.Any(), 
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), 
gomock.Any()).Return(nil)
 
                        Expect(reconciler.registerStorageUnit(ctx, 
sn)).To(BeNil())
                        Expect(sn.Status.Registered).To(BeTrue())
@@ -716,7 +714,7 @@ var _ = Describe("StorageNode Controller Mock Test", func() 
{
                                }
                                Expect(fakeClient.Create(ctx, 
sn)).Should(Succeed())
 
-                               
mockSS.EXPECT().UnRegisterStorageUnit(gomock.Any()).Return(nil)
+                               
mockSS.EXPECT().UnRegisterStorageUnit(gomock.Any(), gomock.Any()).Return(nil)
                                mockSS.EXPECT().Close().Return(nil)
                                Expect(reconciler.unregisterStorageUnit(ctx, 
sn)).To(BeNil())
                        })
diff --git a/shardingsphere-operator/pkg/controllers/storage_node_controller.go 
b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
index b010b8f..96cf79a 100644
--- a/shardingsphere-operator/pkg/controllers/storage_node_controller.go
+++ b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
@@ -503,7 +503,7 @@ func (r *StorageNodeReconciler) registerStorageUnit(ctx 
context.Context, node *v
        password := 
node.Annotations[dbmeshv1alpha1.AnnotationsMasterUserPassword]
 
        // TODO how to set ds name?
-       if err := ssServer.RegisterStorageUnit("ds_0", host, uint(port), 
dbName, username, password); err != nil {
+       if err := ssServer.RegisterStorageUnit(logicDBName, "ds_0", host, 
uint(port), dbName, username, password); err != nil {
                return fmt.Errorf("register storage node failed: %w", err)
        }
        r.Recorder.Eventf(node, corev1.EventTypeNormal, 
"StorageUnitRegistered", "StorageUnit %s:%d/%s is registered", host, port, 
dbName)
@@ -520,6 +520,8 @@ func (r *StorageNodeReconciler) unregisterStorageUnit(ctx 
context.Context, node
                return err
        }
 
+       logicDBName := node.Annotations[AnnotationKeyLogicDatabaseName]
+
        ssServer, err := r.getShardingsphereServer(ctx, node)
        if err != nil {
                return fmt.Errorf("getShardingsphereServer failed: %w", err)
@@ -528,7 +530,7 @@ func (r *StorageNodeReconciler) unregisterStorageUnit(ctx 
context.Context, node
        defer ssServer.Close()
 
        // TODO how to set ds name?
-       if err := ssServer.UnRegisterStorageUnit("ds_0"); err != nil {
+       if err := ssServer.UnRegisterStorageUnit(logicDBName, "ds_0"); err != 
nil {
                return fmt.Errorf("unregister storage unit failed: %w", err)
        }
 
diff --git 
a/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdsinstance.go 
b/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdsinstance.go
index 0907fd9..e54db6c 100644
--- a/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdsinstance.go
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdsinstance.go
@@ -141,7 +141,7 @@ func (c *RdsClient) CreateInstance(ctx context.Context, 
node *v1alpha1.StorageNo
                SetMasterUserPassword(params["masterUserPassword"]).
                SetAllocatedStorage(int32(storage))
        // set database name if needed.
-       if v, ok := params[node.Annotations[""]]; ok {
+       if v, ok := 
params[node.Annotations[dbmeshv1alpha1.AnnotationsInstanceDBName]]; ok {
                instance.SetDBName(v)
        }
        return instance.Create(ctx)
diff --git a/shardingsphere-operator/pkg/shardingsphere/mocks/shardingsphere.go 
b/shardingsphere-operator/pkg/shardingsphere/mocks/shardingsphere.go
index d2eecb4..8d394b9 100644
--- a/shardingsphere-operator/pkg/shardingsphere/mocks/shardingsphere.go
+++ b/shardingsphere-operator/pkg/shardingsphere/mocks/shardingsphere.go
@@ -79,29 +79,29 @@ func (mr *MockIServerMockRecorder) CreateDatabase(dbName 
interface{}) *gomock.Ca
 }
 
 // RegisterStorageUnit mocks base method.
-func (m *MockIServer) RegisterStorageUnit(dsName, host string, port uint, 
dbName, user, password string) error {
+func (m *MockIServer) RegisterStorageUnit(logicDBName, dsName, dsHost string, 
dsPort uint, dsDBName, dsUser, dsPassword string) error {
        m.ctrl.T.Helper()
-       ret := m.ctrl.Call(m, "RegisterStorageUnit", dsName, host, port, 
dbName, user, password)
+       ret := m.ctrl.Call(m, "RegisterStorageUnit", logicDBName, dsName, 
dsHost, dsPort, dsDBName, dsUser, dsPassword)
        ret0, _ := ret[0].(error)
        return ret0
 }
 
 // RegisterStorageUnit indicates an expected call of RegisterStorageUnit.
-func (mr *MockIServerMockRecorder) RegisterStorageUnit(dsName, host, port, 
dbName, user, password interface{}) *gomock.Call {
+func (mr *MockIServerMockRecorder) RegisterStorageUnit(logicDBName, dsName, 
dsHost, dsPort, dsDBName, dsUser, dsPassword interface{}) *gomock.Call {
        mr.mock.ctrl.T.Helper()
-       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"RegisterStorageUnit", reflect.TypeOf((*MockIServer)(nil).RegisterStorageUnit), 
dsName, host, port, dbName, user, password)
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"RegisterStorageUnit", reflect.TypeOf((*MockIServer)(nil).RegisterStorageUnit), 
logicDBName, dsName, dsHost, dsPort, dsDBName, dsUser, dsPassword)
 }
 
 // UnRegisterStorageUnit mocks base method.
-func (m *MockIServer) UnRegisterStorageUnit(dsName string) error {
+func (m *MockIServer) UnRegisterStorageUnit(logicDBName, dsName string) error {
        m.ctrl.T.Helper()
-       ret := m.ctrl.Call(m, "UnRegisterStorageUnit", dsName)
+       ret := m.ctrl.Call(m, "UnRegisterStorageUnit", logicDBName, dsName)
        ret0, _ := ret[0].(error)
        return ret0
 }
 
 // UnRegisterStorageUnit indicates an expected call of UnRegisterStorageUnit.
-func (mr *MockIServerMockRecorder) UnRegisterStorageUnit(dsName interface{}) 
*gomock.Call {
+func (mr *MockIServerMockRecorder) UnRegisterStorageUnit(logicDBName, dsName 
interface{}) *gomock.Call {
        mr.mock.ctrl.T.Helper()
-       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"UnRegisterStorageUnit", 
reflect.TypeOf((*MockIServer)(nil).UnRegisterStorageUnit), dsName)
+       return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"UnRegisterStorageUnit", 
reflect.TypeOf((*MockIServer)(nil).UnRegisterStorageUnit), logicDBName, dsName)
 }
diff --git a/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go 
b/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
index 9b2a0d9..1bf38f5 100644
--- a/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
+++ b/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
@@ -27,10 +27,12 @@ import (
 const (
        // DistSQLCreateDatabase create database if not exists.
        DistSQLCreateDatabase = `CREATE DATABASE IF NOT EXISTS %s;`
+       // DistSQLUseDatabase use database.
+       DistSQLUseDatabase = `USE %s;`
        // DistSQLRegisterStorageUnit register database to shardingsphere by 
storage unit name and database info.
        DistSQLRegisterStorageUnit = `REGISTER STORAGE UNIT IF NOT EXISTS %s 
(HOST="%s",PORT=%d,DB="%s",USER="%s",PASSWORD="%s");`
        // DistSQLShowRulesUsed show all rules used by storage unit name.
-       DistSQLShowRulesUsed = `SHOW RULES USED STORAGE UNIT %s FROM %s;`
+       DistSQLShowRulesUsed = `SHOW RULES USED STORAGE UNIT %s;`
        // DistSQLUnRegisterStorageUnit unregister database from shardingsphere 
by storage unit name.
        DistSQLUnRegisterStorageUnit = `UNREGISTER STORAGE UNIT %s;`
        // DistSQLDropRule drop rule by rule type and rule name.
@@ -52,8 +54,8 @@ type server struct {
 
 type IServer interface {
        CreateDatabase(dbName string) error
-       RegisterStorageUnit(dsName, host string, port uint, dbName, user, 
password string) error
-       UnRegisterStorageUnit(dsName string) error
+       RegisterStorageUnit(logicDBName, dsName, dsHost string, dsPort uint, 
dsDBName, dsUser, dsPassword string) error
+       UnRegisterStorageUnit(logicDBName, dsName string) error
        Close() error
 }
 
@@ -98,10 +100,15 @@ func (s *server) CreateDatabase(dbName string) error {
        return nil
 }
 
-func (s *server) RegisterStorageUnit(dsName, host string, port uint, dbName, 
user, password string) error {
-       distSQL := fmt.Sprintf(DistSQLRegisterStorageUnit, dsName, host, port, 
dbName, user, password)
+func (s *server) RegisterStorageUnit(logicDBName, dsName, dsHost string, 
dsPort uint, dsDBName, dsUser, dsPassword string) error {
+       _, err := s.db.Exec(fmt.Sprintf(DistSQLUseDatabase, logicDBName))
+       if err != nil {
+               return fmt.Errorf("use database error: %w", err)
+       }
 
-       _, err := s.db.Exec(distSQL)
+       distSQL := fmt.Sprintf(DistSQLRegisterStorageUnit, dsName, dsHost, 
dsPort, dsDBName, dsUser, dsPassword)
+
+       _, err = s.db.Exec(distSQL)
        if err != nil {
                return fmt.Errorf("register database error: %w", err)
        }
@@ -110,9 +117,9 @@ func (s *server) RegisterStorageUnit(dsName, host string, 
port uint, dbName, use
 }
 
 // getRulesUsed returns all rules used by storage unit name.
-func (s *server) getRulesUsed(dsName, dbName string) (rules []*Rule, err 
error) {
+func (s *server) getRulesUsed(dsName string) (rules []*Rule, err error) {
        rules = make([]*Rule, 0)
-       distSQL := fmt.Sprintf(DistSQLShowRulesUsed, dsName, dbName)
+       distSQL := fmt.Sprintf(DistSQLShowRulesUsed, dsName)
 
        rows, err := s.db.Query(distSQL)
        if err != nil {
@@ -134,8 +141,13 @@ func (s *server) getRulesUsed(dsName, dbName string) 
(rules []*Rule, err error)
        return rules, nil
 }
 
-func (s *server) UnRegisterStorageUnit(dsName string) error {
-       rules, err := s.getRulesUsed(dsName, "")
+func (s *server) UnRegisterStorageUnit(logicDBName, dsName string) error {
+       _, err := s.db.Exec(fmt.Sprintf(DistSQLUseDatabase, logicDBName))
+       if err != nil {
+               return fmt.Errorf("use database error: %w", err)
+       }
+
+       rules, err := s.getRulesUsed(dsName)
        if err != nil {
                return fmt.Errorf("get rules used error: %w", err)
        }
diff --git a/shardingsphere-operator/pkg/shardingsphere/shardingsphere_test.go 
b/shardingsphere-operator/pkg/shardingsphere/shardingsphere_test.go
index 15aa828..0c7b0ff 100644
--- a/shardingsphere-operator/pkg/shardingsphere/shardingsphere_test.go
+++ b/shardingsphere-operator/pkg/shardingsphere/shardingsphere_test.go
@@ -66,10 +66,11 @@ var _ = Describe("Test ShardingSphere Server", func() {
        Context("Test register storage unit", func() {
                It("should register success", func() {
                        // mock db and return register storage unit success
+                       
dbmock.ExpectExec(regexp.QuoteMeta("USE")).WillReturnResult(sqlmock.NewResult(1,
 1))
                        dbmock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE 
UNIT")).WillReturnResult(sqlmock.NewResult(1, 1))
 
                        // create server
-                       err = s.RegisterStorageUnit("ds_0", "localhost", 
uint(3307), "sharding_db", "user", "password")
+                       err = s.RegisterStorageUnit("sharding_db", "ds_0", 
"localhost", uint(3307), "sharding_db", "user", "password")
                        Expect(err).ShouldNot(HaveOccurred())
                })
        })
@@ -81,7 +82,7 @@ var _ = Describe("Test ShardingSphere Server", func() {
                        // mock db and return sharding table rule
                        dbmock.ExpectQuery(regexp.QuoteMeta("SHOW RULES USED 
STORAGE UNIT")).WillReturnRows(sqlmock.NewRows([]string{"type", 
"name"}).AddRow("sharding", "t_order"))
 
-                       result, err := s.(*server).getRulesUsed("ds_0", 
"sharding_db")
+                       result, err := s.(*server).getRulesUsed("ds_0")
                        Expect(err).ShouldNot(HaveOccurred())
                        Expect(result).Should(Equal([]*Rule{{Type: "sharding", 
Name: "t_order"}}))
                })
@@ -100,10 +101,11 @@ var _ = Describe("Test ShardingSphere Server", func() {
 
        Context("Test unregister storage node", func() {
                It("should unregister success", func() {
+                       
dbmock.ExpectExec(regexp.QuoteMeta("USE")).WillReturnResult(sqlmock.NewResult(1,
 1))
                        dbmock.ExpectQuery(regexp.QuoteMeta("SHOW RULES USED 
STORAGE UNIT")).WillReturnRows(sqlmock.NewRows([]string{"type", "name"}))
                        dbmock.ExpectExec(regexp.QuoteMeta("UNREGISTER STORAGE 
UNIT")).WillReturnResult(sqlmock.NewResult(1, 1))
 
-                       err = s.UnRegisterStorageUnit("ds_0")
+                       err = s.UnRegisterStorageUnit("sharding_db", "ds_0")
                        Expect(err).ShouldNot(HaveOccurred())
                })
        })
@@ -116,6 +118,13 @@ var _ = Describe("Test ShardingSphere Server Manually", 
func() {
                port   uint
                user   string
                pass   string
+
+               dsName   string
+               dsHost   string
+               dsPort   uint
+               dsDBName string
+               dsUser   string
+               dsPass   string
        )
 
        Context("Test create database", func() {
@@ -123,13 +132,19 @@ var _ = Describe("Test ShardingSphere Server Manually", 
func() {
                        if driver == "" || host == "" || port == 0 || user == 
"" || pass == "" {
                                Skip("skip test")
                        }
-                       dbName := "test_db"
+                       logicDBName := "test_db"
                        s, err := NewServer(driver, host, port, user, pass)
                        Expect(err).ShouldNot(HaveOccurred())
-                       err = s.CreateDatabase(dbName)
+                       err = s.CreateDatabase(logicDBName)
+                       Expect(err).ShouldNot(HaveOccurred())
+
+                       err = s.RegisterStorageUnit(logicDBName, dsName, 
dsHost, dsPort, dsDBName, dsUser, dsPass)
+                       Expect(err).ShouldNot(HaveOccurred())
+
+                       err = s.UnRegisterStorageUnit(logicDBName, dsName)
                        Expect(err).ShouldNot(HaveOccurred())
 
-                       s.(*server).db.Exec(fmt.Sprintf(`DROP DATABASE %s`, 
dbName))
+                       s.(*server).db.Exec(fmt.Sprintf(`DROP DATABASE %s`, 
logicDBName))
                })
        })
 })
diff --git a/shardingsphere-operator/test/e2e/storage_node_controller_test.go 
b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
index 0c1a681..c50ff64 100644
--- a/shardingsphere-operator/test/e2e/storage_node_controller_test.go
+++ b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
@@ -20,27 +20,27 @@ package e2e
 import (
        "context"
        "database/sql"
-       "github.com/DATA-DOG/go-sqlmock"
-       apierrors "k8s.io/apimachinery/pkg/api/errors"
        "reflect"
        "regexp"
        "time"
 
-       
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
-       
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/controllers"
-       
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/storagenode/aws"
-
        "bou.ke/monkey"
+       "github.com/DATA-DOG/go-sqlmock"
        dbmesh_rds "github.com/database-mesh/golang-sdk/aws/client/rds"
        dbmeshv1alpha1 
"github.com/database-mesh/golang-sdk/kubernetes/api/v1alpha1"
        . "github.com/onsi/ginkgo/v2"
        . "github.com/onsi/gomega"
+       apierrors "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "sigs.k8s.io/controller-runtime/pkg/client"
+
+       
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+       
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/controllers"
+       
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/storagenode/aws"
 )
 
 var _ = Describe("StorageNode Controller Suite Test", func() {
-       var databaseClassName = "test-database-class"
+       databaseClassName := "test-database-class"
 
        BeforeEach(func() {
                databaseClass := &dbmeshv1alpha1.DatabaseClass{
@@ -244,6 +244,7 @@ var _ = Describe("StorageNode Controller Suite Test", 
func() {
                        Expect(k8sClient.Create(ctx, node)).Should(Succeed())
 
                        dbmock.ExpectExec(regexp.QuoteMeta("CREATE DATABASE IF 
NOT EXISTS")).WillReturnResult(sqlmock.NewResult(1, 1))
+                       
dbmock.ExpectExec(regexp.QuoteMeta("USE")).WillReturnResult(sqlmock.NewResult(1,
 1))
                        dbmock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE 
UNIT IF NOT EXISTS")).WillReturnResult(sqlmock.NewResult(0, 0))
 
                        Eventually(func() v1alpha1.StorageNodePhaseStatus {
@@ -261,6 +262,7 @@ var _ = Describe("StorageNode Controller Suite Test", 
func() {
                        // delete storage node
                        Expect(k8sClient.Delete(ctx, node)).Should(Succeed())
 
+                       
dbmock.ExpectExec(regexp.QuoteMeta("USE")).WillReturnResult(sqlmock.NewResult(1,
 1))
                        dbmock.ExpectQuery(regexp.QuoteMeta("SHOW RULES USED 
STORAGE UNIT")).WillReturnRows(sqlmock.NewRows([]string{"type", 
"name"}).AddRow("sharding", "t_order"))
                        dbmock.ExpectExec("DROP SHARDING TABLE 
RULE").WillReturnResult(sqlmock.NewResult(1, 1))
                        dbmock.ExpectExec(regexp.QuoteMeta("UNREGISTER STORAGE 
UNIT")).WillReturnResult(sqlmock.NewResult(0, 0))

Reply via email to