This is an automated email from the ASF dual-hosted git repository.
miaoliyao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/shardingsphere-on-cloud.git
The following commit(s) were added to refs/heads/main by this push:
new d26a7fa fix(storage-node): fix register storage unit bug
new e7da324 Merge pull request #376 from Xu-Wentao/storage-node
d26a7fa is described below
commit d26a7fa19c5ca0068989952861b506ade0c67252
Author: xuwentao <[email protected]>
AuthorDate: Tue May 23 16:54:23 2023 +0800
fix(storage-node): fix register storage unit bug
---
.../controllers/storage_ndoe_controller_test.go | 24 ++++++++--------
.../pkg/controllers/storage_node_controller.go | 6 ++--
.../pkg/reconcile/storagenode/aws/rdsinstance.go | 2 +-
.../pkg/shardingsphere/mocks/shardingsphere.go | 16 +++++------
.../pkg/shardingsphere/shardingsphere.go | 32 +++++++++++++++-------
.../pkg/shardingsphere/shardingsphere_test.go | 27 ++++++++++++++----
.../test/e2e/storage_node_controller_test.go | 16 ++++++-----
7 files changed, 76 insertions(+), 47 deletions(-)
diff --git a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
index 04f01f1..87e2fba 100644
--- a/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
+++ b/shardingsphere-operator/pkg/controllers/storage_ndoe_controller_test.go
@@ -245,16 +245,14 @@ var _ = Describe("StorageNode Controller Mock Test",
func() {
})
Context("reconcile storage node in Ready status when it's been
deleted", func() {
- var (
- rdsInstanceAvailable = dbmesh_rds.DescInstance{
- DBInstanceIdentifier:
defaultTestInstanceIdentifier,
- DBInstanceStatus:
dbmesh_rds.DBInstanceStatusAvailable,
- Endpoint: dbmesh_rds.Endpoint{
- Address: "127.0.0.1",
- Port: 3306,
- },
- }
- )
+ rdsInstanceAvailable := dbmesh_rds.DescInstance{
+ DBInstanceIdentifier: defaultTestInstanceIdentifier,
+ DBInstanceStatus:
dbmesh_rds.DBInstanceStatusAvailable,
+ Endpoint: dbmesh_rds.Endpoint{
+ Address: "127.0.0.1",
+ Port: 3306,
+ },
+ }
It("should be successful when instance is in available status",
func() {
deletingStorageNode := "test-deleting-storage-node"
req := ctrl.Request{
@@ -478,7 +476,7 @@ var _ = Describe("StorageNode Controller Mock Test", func()
{
// mock shardingsphere create database
mockSS.EXPECT().CreateDatabase(gomock.Any()).Return(nil)
// mock shardingsphere register storage unit
- mockSS.EXPECT().RegisterStorageUnit(gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).Return(nil)
+ mockSS.EXPECT().RegisterStorageUnit(gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).Return(nil)
// mock shardingsphere close connection
mockSS.EXPECT().Close()
@@ -640,7 +638,7 @@ var _ = Describe("StorageNode Controller Mock Test", func()
{
mockSS.EXPECT().CreateDatabase(gomock.Any()).Return(nil)
mockSS.EXPECT().Close().Return(nil)
- mockSS.EXPECT().RegisterStorageUnit(gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).Return(nil)
+ mockSS.EXPECT().RegisterStorageUnit(gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).Return(nil)
Expect(reconciler.registerStorageUnit(ctx,
sn)).To(BeNil())
Expect(sn.Status.Registered).To(BeTrue())
@@ -716,7 +714,7 @@ var _ = Describe("StorageNode Controller Mock Test", func()
{
}
Expect(fakeClient.Create(ctx,
sn)).Should(Succeed())
-
mockSS.EXPECT().UnRegisterStorageUnit(gomock.Any()).Return(nil)
+
mockSS.EXPECT().UnRegisterStorageUnit(gomock.Any(), gomock.Any()).Return(nil)
mockSS.EXPECT().Close().Return(nil)
Expect(reconciler.unregisterStorageUnit(ctx,
sn)).To(BeNil())
})
diff --git a/shardingsphere-operator/pkg/controllers/storage_node_controller.go b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
index b010b8f..96cf79a 100644
--- a/shardingsphere-operator/pkg/controllers/storage_node_controller.go
+++ b/shardingsphere-operator/pkg/controllers/storage_node_controller.go
@@ -503,7 +503,7 @@ func (r *StorageNodeReconciler) registerStorageUnit(ctx
context.Context, node *v
password :=
node.Annotations[dbmeshv1alpha1.AnnotationsMasterUserPassword]
// TODO how to set ds name?
- if err := ssServer.RegisterStorageUnit("ds_0", host, uint(port),
dbName, username, password); err != nil {
+ if err := ssServer.RegisterStorageUnit(logicDBName, "ds_0", host,
uint(port), dbName, username, password); err != nil {
return fmt.Errorf("register storage node failed: %w", err)
}
r.Recorder.Eventf(node, corev1.EventTypeNormal,
"StorageUnitRegistered", "StorageUnit %s:%d/%s is registered", host, port,
dbName)
@@ -520,6 +520,8 @@ func (r *StorageNodeReconciler) unregisterStorageUnit(ctx
context.Context, node
return err
}
+ logicDBName := node.Annotations[AnnotationKeyLogicDatabaseName]
+
ssServer, err := r.getShardingsphereServer(ctx, node)
if err != nil {
return fmt.Errorf("getShardingsphereServer failed: %w", err)
@@ -528,7 +530,7 @@ func (r *StorageNodeReconciler) unregisterStorageUnit(ctx
context.Context, node
defer ssServer.Close()
// TODO how to set ds name?
- if err := ssServer.UnRegisterStorageUnit("ds_0"); err != nil {
+ if err := ssServer.UnRegisterStorageUnit(logicDBName, "ds_0"); err !=
nil {
return fmt.Errorf("unregister storage unit failed: %w", err)
}
diff --git a/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdsinstance.go b/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdsinstance.go
index 0907fd9..e54db6c 100644
--- a/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdsinstance.go
+++ b/shardingsphere-operator/pkg/reconcile/storagenode/aws/rdsinstance.go
@@ -141,7 +141,7 @@ func (c *RdsClient) CreateInstance(ctx context.Context,
node *v1alpha1.StorageNo
SetMasterUserPassword(params["masterUserPassword"]).
SetAllocatedStorage(int32(storage))
// set database name if needed.
- if v, ok := params[node.Annotations[""]]; ok {
+ if v, ok :=
params[node.Annotations[dbmeshv1alpha1.AnnotationsInstanceDBName]]; ok {
instance.SetDBName(v)
}
return instance.Create(ctx)
diff --git a/shardingsphere-operator/pkg/shardingsphere/mocks/shardingsphere.go b/shardingsphere-operator/pkg/shardingsphere/mocks/shardingsphere.go
index d2eecb4..8d394b9 100644
--- a/shardingsphere-operator/pkg/shardingsphere/mocks/shardingsphere.go
+++ b/shardingsphere-operator/pkg/shardingsphere/mocks/shardingsphere.go
@@ -79,29 +79,29 @@ func (mr *MockIServerMockRecorder) CreateDatabase(dbName
interface{}) *gomock.Ca
}
// RegisterStorageUnit mocks base method.
-func (m *MockIServer) RegisterStorageUnit(dsName, host string, port uint,
dbName, user, password string) error {
+func (m *MockIServer) RegisterStorageUnit(logicDBName, dsName, dsHost string,
dsPort uint, dsDBName, dsUser, dsPassword string) error {
m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "RegisterStorageUnit", dsName, host, port,
dbName, user, password)
+ ret := m.ctrl.Call(m, "RegisterStorageUnit", logicDBName, dsName,
dsHost, dsPort, dsDBName, dsUser, dsPassword)
ret0, _ := ret[0].(error)
return ret0
}
// RegisterStorageUnit indicates an expected call of RegisterStorageUnit.
-func (mr *MockIServerMockRecorder) RegisterStorageUnit(dsName, host, port,
dbName, user, password interface{}) *gomock.Call {
+func (mr *MockIServerMockRecorder) RegisterStorageUnit(logicDBName, dsName,
dsHost, dsPort, dsDBName, dsUser, dsPassword interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"RegisterStorageUnit", reflect.TypeOf((*MockIServer)(nil).RegisterStorageUnit),
dsName, host, port, dbName, user, password)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"RegisterStorageUnit", reflect.TypeOf((*MockIServer)(nil).RegisterStorageUnit),
logicDBName, dsName, dsHost, dsPort, dsDBName, dsUser, dsPassword)
}
// UnRegisterStorageUnit mocks base method.
-func (m *MockIServer) UnRegisterStorageUnit(dsName string) error {
+func (m *MockIServer) UnRegisterStorageUnit(logicDBName, dsName string) error {
m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "UnRegisterStorageUnit", dsName)
+ ret := m.ctrl.Call(m, "UnRegisterStorageUnit", logicDBName, dsName)
ret0, _ := ret[0].(error)
return ret0
}
// UnRegisterStorageUnit indicates an expected call of UnRegisterStorageUnit.
-func (mr *MockIServerMockRecorder) UnRegisterStorageUnit(dsName interface{})
*gomock.Call {
+func (mr *MockIServerMockRecorder) UnRegisterStorageUnit(logicDBName, dsName
interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"UnRegisterStorageUnit",
reflect.TypeOf((*MockIServer)(nil).UnRegisterStorageUnit), dsName)
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock,
"UnRegisterStorageUnit",
reflect.TypeOf((*MockIServer)(nil).UnRegisterStorageUnit), logicDBName, dsName)
}
diff --git a/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go b/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
index 9b2a0d9..1bf38f5 100644
--- a/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
+++ b/shardingsphere-operator/pkg/shardingsphere/shardingsphere.go
@@ -27,10 +27,12 @@ import (
const (
// DistSQLCreateDatabase create database if not exists.
DistSQLCreateDatabase = `CREATE DATABASE IF NOT EXISTS %s;`
+ // DistSQLUseDatabase use database.
+ DistSQLUseDatabase = `USE %s;`
// DistSQLRegisterStorageUnit register database to shardingsphere by
storage unit name and database info.
DistSQLRegisterStorageUnit = `REGISTER STORAGE UNIT IF NOT EXISTS %s
(HOST="%s",PORT=%d,DB="%s",USER="%s",PASSWORD="%s");`
// DistSQLShowRulesUsed show all rules used by storage unit name.
- DistSQLShowRulesUsed = `SHOW RULES USED STORAGE UNIT %s FROM %s;`
+ DistSQLShowRulesUsed = `SHOW RULES USED STORAGE UNIT %s;`
// DistSQLUnRegisterStorageUnit unregister database from shardingsphere
by storage unit name.
DistSQLUnRegisterStorageUnit = `UNREGISTER STORAGE UNIT %s;`
// DistSQLDropRule drop rule by rule type and rule name.
@@ -52,8 +54,8 @@ type server struct {
type IServer interface {
CreateDatabase(dbName string) error
- RegisterStorageUnit(dsName, host string, port uint, dbName, user,
password string) error
- UnRegisterStorageUnit(dsName string) error
+ RegisterStorageUnit(logicDBName, dsName, dsHost string, dsPort uint,
dsDBName, dsUser, dsPassword string) error
+ UnRegisterStorageUnit(logicDBName, dsName string) error
Close() error
}
@@ -98,10 +100,15 @@ func (s *server) CreateDatabase(dbName string) error {
return nil
}
-func (s *server) RegisterStorageUnit(dsName, host string, port uint, dbName,
user, password string) error {
- distSQL := fmt.Sprintf(DistSQLRegisterStorageUnit, dsName, host, port,
dbName, user, password)
+func (s *server) RegisterStorageUnit(logicDBName, dsName, dsHost string,
dsPort uint, dsDBName, dsUser, dsPassword string) error {
+ _, err := s.db.Exec(fmt.Sprintf(DistSQLUseDatabase, logicDBName))
+ if err != nil {
+ return fmt.Errorf("use database error: %w", err)
+ }
- _, err := s.db.Exec(distSQL)
+ distSQL := fmt.Sprintf(DistSQLRegisterStorageUnit, dsName, dsHost,
dsPort, dsDBName, dsUser, dsPassword)
+
+ _, err = s.db.Exec(distSQL)
if err != nil {
return fmt.Errorf("register database error: %w", err)
}
@@ -110,9 +117,9 @@ func (s *server) RegisterStorageUnit(dsName, host string,
port uint, dbName, use
}
// getRulesUsed returns all rules used by storage unit name.
-func (s *server) getRulesUsed(dsName, dbName string) (rules []*Rule, err
error) {
+func (s *server) getRulesUsed(dsName string) (rules []*Rule, err error) {
rules = make([]*Rule, 0)
- distSQL := fmt.Sprintf(DistSQLShowRulesUsed, dsName, dbName)
+ distSQL := fmt.Sprintf(DistSQLShowRulesUsed, dsName)
rows, err := s.db.Query(distSQL)
if err != nil {
@@ -134,8 +141,13 @@ func (s *server) getRulesUsed(dsName, dbName string)
(rules []*Rule, err error)
return rules, nil
}
-func (s *server) UnRegisterStorageUnit(dsName string) error {
- rules, err := s.getRulesUsed(dsName, "")
+func (s *server) UnRegisterStorageUnit(logicDBName, dsName string) error {
+ _, err := s.db.Exec(fmt.Sprintf(DistSQLUseDatabase, logicDBName))
+ if err != nil {
+ return fmt.Errorf("use database error: %w", err)
+ }
+
+ rules, err := s.getRulesUsed(dsName)
if err != nil {
return fmt.Errorf("get rules used error: %w", err)
}
diff --git a/shardingsphere-operator/pkg/shardingsphere/shardingsphere_test.go b/shardingsphere-operator/pkg/shardingsphere/shardingsphere_test.go
index 15aa828..0c7b0ff 100644
--- a/shardingsphere-operator/pkg/shardingsphere/shardingsphere_test.go
+++ b/shardingsphere-operator/pkg/shardingsphere/shardingsphere_test.go
@@ -66,10 +66,11 @@ var _ = Describe("Test ShardingSphere Server", func() {
Context("Test register storage unit", func() {
It("should register success", func() {
// mock db and return register storage unit success
+
dbmock.ExpectExec(regexp.QuoteMeta("USE")).WillReturnResult(sqlmock.NewResult(1,
1))
dbmock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE
UNIT")).WillReturnResult(sqlmock.NewResult(1, 1))
// create server
- err = s.RegisterStorageUnit("ds_0", "localhost",
uint(3307), "sharding_db", "user", "password")
+ err = s.RegisterStorageUnit("sharding_db", "ds_0",
"localhost", uint(3307), "sharding_db", "user", "password")
Expect(err).ShouldNot(HaveOccurred())
})
})
@@ -81,7 +82,7 @@ var _ = Describe("Test ShardingSphere Server", func() {
// mock db and return sharding table rule
dbmock.ExpectQuery(regexp.QuoteMeta("SHOW RULES USED
STORAGE UNIT")).WillReturnRows(sqlmock.NewRows([]string{"type",
"name"}).AddRow("sharding", "t_order"))
- result, err := s.(*server).getRulesUsed("ds_0",
"sharding_db")
+ result, err := s.(*server).getRulesUsed("ds_0")
Expect(err).ShouldNot(HaveOccurred())
Expect(result).Should(Equal([]*Rule{{Type: "sharding",
Name: "t_order"}}))
})
@@ -100,10 +101,11 @@ var _ = Describe("Test ShardingSphere Server", func() {
Context("Test unregister storage node", func() {
It("should unregister success", func() {
+
dbmock.ExpectExec(regexp.QuoteMeta("USE")).WillReturnResult(sqlmock.NewResult(1,
1))
dbmock.ExpectQuery(regexp.QuoteMeta("SHOW RULES USED
STORAGE UNIT")).WillReturnRows(sqlmock.NewRows([]string{"type", "name"}))
dbmock.ExpectExec(regexp.QuoteMeta("UNREGISTER STORAGE
UNIT")).WillReturnResult(sqlmock.NewResult(1, 1))
- err = s.UnRegisterStorageUnit("ds_0")
+ err = s.UnRegisterStorageUnit("sharding_db", "ds_0")
Expect(err).ShouldNot(HaveOccurred())
})
})
@@ -116,6 +118,13 @@ var _ = Describe("Test ShardingSphere Server Manually",
func() {
port uint
user string
pass string
+
+ dsName string
+ dsHost string
+ dsPort uint
+ dsDBName string
+ dsUser string
+ dsPass string
)
Context("Test create database", func() {
@@ -123,13 +132,19 @@ var _ = Describe("Test ShardingSphere Server Manually",
func() {
if driver == "" || host == "" || port == 0 || user ==
"" || pass == "" {
Skip("skip test")
}
- dbName := "test_db"
+ logicDBName := "test_db"
s, err := NewServer(driver, host, port, user, pass)
Expect(err).ShouldNot(HaveOccurred())
- err = s.CreateDatabase(dbName)
+ err = s.CreateDatabase(logicDBName)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ err = s.RegisterStorageUnit(logicDBName, dsName,
dsHost, dsPort, dsDBName, dsUser, dsPass)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ err = s.UnRegisterStorageUnit(logicDBName, dsName)
Expect(err).ShouldNot(HaveOccurred())
- s.(*server).db.Exec(fmt.Sprintf(`DROP DATABASE %s`,
dbName))
+ s.(*server).db.Exec(fmt.Sprintf(`DROP DATABASE %s`,
logicDBName))
})
})
})
diff --git a/shardingsphere-operator/test/e2e/storage_node_controller_test.go b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
index 0c1a681..c50ff64 100644
--- a/shardingsphere-operator/test/e2e/storage_node_controller_test.go
+++ b/shardingsphere-operator/test/e2e/storage_node_controller_test.go
@@ -20,27 +20,27 @@ package e2e
import (
"context"
"database/sql"
- "github.com/DATA-DOG/go-sqlmock"
- apierrors "k8s.io/apimachinery/pkg/api/errors"
"reflect"
"regexp"
"time"
-
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
-
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/controllers"
-
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/storagenode/aws"
-
"bou.ke/monkey"
+ "github.com/DATA-DOG/go-sqlmock"
dbmesh_rds "github.com/database-mesh/golang-sdk/aws/client/rds"
dbmeshv1alpha1
"github.com/database-mesh/golang-sdk/kubernetes/api/v1alpha1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
+
+
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/api/v1alpha1"
+
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/controllers"
+
"github.com/apache/shardingsphere-on-cloud/shardingsphere-operator/pkg/reconcile/storagenode/aws"
)
var _ = Describe("StorageNode Controller Suite Test", func() {
- var databaseClassName = "test-database-class"
+ databaseClassName := "test-database-class"
BeforeEach(func() {
databaseClass := &dbmeshv1alpha1.DatabaseClass{
@@ -244,6 +244,7 @@ var _ = Describe("StorageNode Controller Suite Test",
func() {
Expect(k8sClient.Create(ctx, node)).Should(Succeed())
dbmock.ExpectExec(regexp.QuoteMeta("CREATE DATABASE IF
NOT EXISTS")).WillReturnResult(sqlmock.NewResult(1, 1))
+
dbmock.ExpectExec(regexp.QuoteMeta("USE")).WillReturnResult(sqlmock.NewResult(1,
1))
dbmock.ExpectExec(regexp.QuoteMeta("REGISTER STORAGE
UNIT IF NOT EXISTS")).WillReturnResult(sqlmock.NewResult(0, 0))
Eventually(func() v1alpha1.StorageNodePhaseStatus {
@@ -261,6 +262,7 @@ var _ = Describe("StorageNode Controller Suite Test",
func() {
// delete storage node
Expect(k8sClient.Delete(ctx, node)).Should(Succeed())
+
dbmock.ExpectExec(regexp.QuoteMeta("USE")).WillReturnResult(sqlmock.NewResult(1,
1))
dbmock.ExpectQuery(regexp.QuoteMeta("SHOW RULES USED
STORAGE UNIT")).WillReturnRows(sqlmock.NewRows([]string{"type",
"name"}).AddRow("sharding", "t_order"))
dbmock.ExpectExec("DROP SHARDING TABLE
RULE").WillReturnResult(sqlmock.NewResult(1, 1))
dbmock.ExpectExec(regexp.QuoteMeta("UNREGISTER STORAGE
UNIT")).WillReturnResult(sqlmock.NewResult(0, 0))