icey129 opened a new pull request, #820:
URL: https://github.com/apache/rocketmq-client-go/pull/820

   ## What is the purpose of the change
   
   Fix the DATA RACE reported when running an executable built with `go build -race`.
   
   ## Brief changelog
   
   Pass `csListLock *sync.Mutex` to `computeStatsData` so that it locks `csList` with the same mutex the writer uses.
   
   ## Verifying this change
   
   ### How to reproduce
   Run the following test with `go test -race`:
   
   ```
   func TestNewStatsManager(t *testing.T) {
        stats := NewStatsManager()
   
        // Keep the test alive long enough for the background
        // samplingInSeconds and printAtMinutes goroutines to overlap.
        st := time.Now()
        for {
                stats.increasePullTPS("rocketmq", "default", 1)
                time.Sleep(500 * time.Millisecond)
                if time.Since(st) > 5*time.Minute {
                        break
                }
        }
        stats.ShutDownStat()
   }
   ```
   
   The race detector prints a **DATA RACE** warning to the terminal:
   ```
   WARNING: DATA RACE
   Read at 0x00c000538028 by goroutine 230:
     container/list.(*List).Len()
         /usr/local/go1.17.3/src/container/list/list.go:66 +0xc7
     github.com/apache/rocketmq-client-go/v2/consumer.computeStatsData()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:143 +0xd5
     github.com/apache/rocketmq-client-go/v2/consumer.(*statsItem).printAtMinutes()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:426 +0x5e
     github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).printAtMinutes.func1()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:285 +0x47
     sync.(*Map).Range()
         /usr/local/go1.17.3/src/sync/map.go:346 +0x205
     github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).printAtMinutes()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:283 +0x3b
     github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func4()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:224 +0x112
     github.com/apache/rocketmq-client-go/v2/primitive.WithRecover()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/primitive/base.go:100 +0x52
     github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init·dwrap·21()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:215 +0x39
   
   Previous write at 0x00c000538028 by goroutine 227:
     container/list.(*List).insert()
         /usr/local/go1.17.3/src/container/list/list.go:98 +0x518
     container/list.(*List).insertValue()
         /usr/local/go1.17.3/src/container/list/list.go:104 +0x1bf
     container/list.(*List).PushBack()
         /usr/local/go1.17.3/src/container/list/list.go:155 +0x27e
     github.com/apache/rocketmq-client-go/v2/consumer.(*statsItem).samplingInSeconds()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:389 +0x145
     github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).samplingInSeconds.func1()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:261 +0x47
     sync.(*Map).Range()
         /usr/local/go1.17.3/src/sync/map.go:346 +0x205
     github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).samplingInSeconds()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:259 +0x3b
     github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func1()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:184 +0xc6
     github.com/apache/rocketmq-client-go/v2/primitive.WithRecover()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/primitive/base.go:100 +0x52
     github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init·dwrap·18()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:176 +0x39
   
   Goroutine 230 (running) created at:
     github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:215 +0x3d0
     github.com/apache/rocketmq-client-go/v2/consumer.newStatsItemSet()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:171 +0x51c
     github.com/apache/rocketmq-client-go/v2/consumer.NewStatsManager()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:46 +0x51d
     github.com/apache/rocketmq-client-go/v2/consumer.TestNewStatsManager()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics_test.go:217 +0x29
     testing.tRunner()
         /usr/local/go1.17.3/src/testing/testing.go:1259 +0x22f
     testing.(*T).Run·dwrap·21()
         /usr/local/go1.17.3/src/testing/testing.go:1306 +0x47
   
   Goroutine 227 (running) created at:
     github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:176 +0x12b
     github.com/apache/rocketmq-client-go/v2/consumer.newStatsItemSet()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:171 +0x51c
     github.com/apache/rocketmq-client-go/v2/consumer.NewStatsManager()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:46 +0x51d
     github.com/apache/rocketmq-client-go/v2/consumer.TestNewStatsManager()
         /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics_test.go:217 +0x29
     testing.tRunner()
         /usr/local/go1.17.3/src/testing/testing.go:1259 +0x22f
     testing.(*T).Run·dwrap·21()
         /usr/local/go1.17.3/src/testing/testing.go:1306 +0x47
   ```
   
   When a `statsItemSet` is created, `init()` starts several goroutines that run `sis.samplingInSeconds()` and `sis.printAtMinutes()`.
   The `samplingInSeconds()` func writes to `csListMinute`:
   ```
   func (si *statsItem) samplingInSeconds() {
        si.csListMinuteLock.Lock()
        defer si.csListMinuteLock.Unlock()
        si.csListMinute.PushBack(callSnapshot{
                timestamp: time.Now().Unix() * 1000,
                time:      atomic.LoadInt64(&si.times),
                value:     atomic.LoadInt64(&si.value),
        })
        if si.csListMinute.Len() > 7 {
                si.csListMinute.Remove(si.csListMinute.Front())
        }
   }
   ``` 
   Meanwhile, `printAtMinutes()` calls `computeStatsData()`, which reads `csListMinute`:
   ```
   func (si *statsItem) printAtMinutes() {
        ss := computeStatsData(si.csListMinute)
        rlog.Info("Stats In One Minute, SUM: %d TPS:  AVGPT: %.2f", map[string]interface{}{
                "statsName": si.statsName,
                "statsKey":  si.statsKey,
                "SUM":       ss.sum,
                "TPS":       fmt.Sprintf("%.2f", ss.tps),
                "AVGPT":     ss.avgpt,
        })
   }
   
   var csListLock sync.Mutex
   
   func computeStatsData(csList *list.List) statsSnapshot {
        csListLock.Lock()
        defer csListLock.Unlock()
        tps, avgpt, sum := 0.0, 0.0, int64(0)
        if csList.Len() > 0 {
                first := csList.Front().Value.(callSnapshot)
                last := csList.Back().Value.(callSnapshot)
                sum = last.value - first.value
                tps = float64(sum*1000.0) / float64(last.timestamp-first.timestamp)
                timesDiff := last.time - first.time
                if timesDiff > 0 {
                        avgpt = float64(sum*1.0) / float64(timesDiff)
                }
        }
        return statsSnapshot{
                tps:   tps,
                avgpt: avgpt,
                sum:   sum,
        }
   }
   ```
   
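   `samplingInSeconds()` locks `csListMinuteLock`, while `computeStatsData()` locks a separate global mutex (`csListLock`). Because the two functions hold different mutexes, the write and the read of `csListMinute` in different goroutines are not synchronized, which causes the DATA RACE.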
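
   As a minimal illustration of why two different mutexes do not protect the same list, the sketch below checks whether one mutex can be acquired while another is held. The names `canEnterWhileHeld`, `perItemLock`, and `globalLock` are hypothetical and not part of the client code, and `TryLock` requires Go 1.18 or later.

   ```go
   package main

   import (
   	"fmt"
   	"sync"
   )

   // canEnterWhileHeld reports whether the critical section guarded by
   // `reader` can be entered while `writer` is currently held.
   // Hypothetical helper, used only for this illustration.
   func canEnterWhileHeld(writer, reader *sync.Mutex) bool {
   	writer.Lock()
   	defer writer.Unlock()
   	if reader.TryLock() { // TryLock exists since Go 1.18
   		reader.Unlock()
   		return true
   	}
   	return false
   }

   func main() {
   	var perItemLock, globalLock sync.Mutex

   	// Different mutexes: holding one does not keep anyone out of the other.
   	fmt.Println(canEnterWhileHeld(&perItemLock, &globalLock)) // true

   	// Same mutex: the second critical section is excluded.
   	fmt.Println(canEnterWhileHeld(&perItemLock, &perItemLock)) // false
   }
   ```

   The first case corresponds to the situation described above, where the writer and the reader each take a different lock and can therefore touch the list at the same time.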
   
   ### How to fix
   Changing the parameters of `computeStatsData()` as follows fixes the DATA RACE, because the caller now passes in the mutex that guards the list:
   ```
   func computeStatsData(csListLock *sync.Mutex, csList *list.List) statsSnapshot {
        csListLock.Lock()
        defer csListLock.Unlock()
        tps, avgpt, sum := 0.0, 0.0, int64(0)
        if csList.Len() > 0 {
                first := csList.Front().Value.(callSnapshot)
                last := csList.Back().Value.(callSnapshot)
                sum = last.value - first.value
                tps = float64(sum*1000.0) / float64(last.timestamp-first.timestamp)
                timesDiff := last.time - first.time
                if timesDiff > 0 {
                        avgpt = float64(sum*1.0) / float64(timesDiff)
                }
        }
        return statsSnapshot{
                tps:   tps,
                avgpt: avgpt,
                sum:   sum,
        }
   }
   ```
   
   The callers change to:
   ```
   func (si *statsItem) printAtMinutes() {
        ss := computeStatsData(&si.csListMinuteLock, si.csListMinute)
        rlog.Info("Stats In One Minute, SUM: %d TPS:  AVGPT: %.2f", map[string]interface{}{
                "statsName": si.statsName,
                "statsKey":  si.statsKey,
                "SUM":       ss.sum,
                "TPS":       fmt.Sprintf("%.2f", ss.tps),
                "AVGPT":     ss.avgpt,
        })
   }
   ```
   
   With this change, running the previous test no longer reports a DATA RACE.
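
   For a faster check than the five-minute test, the locking pattern can also be exercised in isolation. The following is only a sketch with simplified stand-in types: `item`, `sample`, `diff`, and `hammer` are hypothetical names, not the real `statsItem` API. It runs a writer and a reader concurrently against one list, both using the same mutex, and is meant to be run with the `-race` flag.

   ```go
   package main

   import (
   	"container/list"
   	"fmt"
   	"sync"
   )

   // item is a hypothetical stand-in for statsItem: one list, one mutex.
   type item struct {
   	mu      sync.Mutex
   	samples *list.List
   }

   // sample mirrors the shape of samplingInSeconds: append a value and
   // keep at most 7 entries.
   func (it *item) sample(v int64) {
   	it.mu.Lock()
   	defer it.mu.Unlock()
   	it.samples.PushBack(v)
   	if it.samples.Len() > 7 {
   		it.samples.Remove(it.samples.Front())
   	}
   }

   // diff mirrors the shape of the fixed computeStatsData: the caller
   // passes the mutex that guards the list.
   func diff(mu *sync.Mutex, l *list.List) int64 {
   	mu.Lock()
   	defer mu.Unlock()
   	if l.Len() == 0 {
   		return 0
   	}
   	return l.Back().Value.(int64) - l.Front().Value.(int64)
   }

   // hammer runs one writer and one reader concurrently for n iterations
   // and returns the final last-minus-first difference.
   func hammer(n int) int64 {
   	it := &item{samples: list.New()}
   	var wg sync.WaitGroup
   	wg.Add(2)
   	go func() {
   		defer wg.Done()
   		for i := 0; i < n; i++ {
   			it.sample(int64(i))
   		}
   	}()
   	go func() {
   		defer wg.Done()
   		for i := 0; i < n; i++ {
   			_ = diff(&it.mu, it.samples)
   		}
   	}()
   	wg.Wait()
   	return diff(&it.mu, it.samples)
   }

   func main() {
   	// The list ends up holding 993..999, so the difference is 6.
   	fmt.Println(hammer(1000))
   }
   ```

   If `diff` locked a separate global mutex instead of the one passed in, the race detector would be expected to flag the concurrent `PushBack` and `Len` calls, as in the report above.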
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily. Notice, `it would be helpful if you could finish the following 5 
checklist(the last one is not necessary)before request the community to review 
your PR`.
   
   - [√] Make sure there is a [Github 
issue](https://github.com/apache/rocketmq/issues) filed for the change (usually 
before you start working on it). Trivial changes like typos do not require a 
Github issue. Your pull request should address just this issue, without pulling 
in other changes - one PR resolves one issue. 
   - [√] Format the pull request title like `[ISSUE #123] Fix UnknownException 
when host config not exist`. Each commit in the pull request should have a 
meaningful subject line and body.
   - [√] Write a pull request description that is detailed enough to understand 
what the pull request does, how, and why.
   - [√] Write necessary unit-test(over 80% coverage) to verify your logic 
correction, more mock a little better when a cross-module dependency exists.
   - [ ] If this contribution is large, please file an [Apache Individual 
Contributor License Agreement](http://www.apache.org/licenses/#clas).
   

