On Wed, Jun 17, 2020 at 4:40 PM envee <neeraj.vaidy...@gmail.com> wrote:
>
> Hi Robert, it is in my first post in this thread. Basically, I want to know 
> why not all of my logical processors are being used in my program. Thanks.

New goroutines are added to the run queue of the P that creates them.
When a P has nothing to do, it will steal goroutines from the run
queues of other P's.  The run queue lengths don't necessarily indicate
whether the P's are busy running goroutines; they just tell you
something about which P's are creating new goroutines.

Ian
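
To make the point above concrete, here is a minimal sketch (not the original
poster's program) in which a single goroutine creates all the workers, as in
the quoted code below. The helper names spin and runWorkers and the iteration
count are made up for illustration. Because one goroutine creates every
worker, they all start out queued on one P, yet they all complete because
idle P's can pick them up.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// spin is a hypothetical stand-in for CPU-bound work such as decoding lines.
func spin(n int) int {
	sum := 0
	for i := 0; i < n; i++ {
		sum += i % 7
	}
	return sum
}

// runWorkers starts the given number of goroutines from the calling
// goroutine and waits for all of them. Every worker is created by the same
// goroutine, so the per-P run queue lengths will look uneven even though
// other P's are free to steal and run the workers.
func runWorkers(workers, iterations int) []int {
	results := make([]int, workers)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			results[w] = spin(iterations) // each worker writes only its own slot
		}(w)
	}
	wg.Wait()
	return results
}

func main() {
	fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0))
	results := runWorkers(8, 50000000)
	fmt.Println("workers finished:", len(results))
}
```

Running a longer version of this with GODEBUG=schedtrace=1000 set in the
environment is one way to compare the bracketed per-P queue lengths with the
idleprocs field.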


> On Thursday, 18 June 2020 07:24:40 UTC+10, Robert Engels wrote:
>>
>> What is the question?
>>
>> On Jun 17, 2020, at 4:06 PM, envee <neeraj....@gmail.com> wrote:
>>
>> Hi, Is anyone able to help me here ?
>> Here is a (simplified) snippet of the code, in case it helps answer my 
>> query. I basically create a goroutine for every input file (assume max 8) 
>> and then wait for processing of all files to finish. Each goroutine 
>> processes every line of its file, and any records which match certain 
>> criteria are appended to a slice. After all lines in a file have been 
>> processed, the slice is sent to a channel. Finally, in the closer 
>> goroutine, I wait for all goroutines to finish and then close the channel:
>>
>> package main
>>
>> import (
>> "bufio"
>> "compress/gzip"
>> "flag"
>> "fmt"
>> "log"
>> "os"
>> "path/filepath"
>> "strings"
>> "sync"
>> "github.com/en-vee/alog"
>> )
>>
>> const (
>> inputFilePrefix = "subscriber_db_"
>> )
>>
>> var (
>> inputDir              string
>> )
>>
>> type QuarantineObject struct {
>> objectType string
>> id         string
>> }
>>
>> func init() {
>> flag.StringVar(&inputDir, "d", "", "Path to the Input folder which is to be analysed")
>> }
>>
>> func main() {
>>
>> var err error
>> alog.SetLogLevel(alog.TRACE)
>> flag.Parse()
>>
>> // Validation of input parameters
>> if inputDir == "" {
>> fmt.Fprintf(os.Stderr, "No Input Directory Specified\n")
>> flag.Usage()
>> os.Exit(1)
>> }
>>
>> // Is the input directory valid ?
>> if _, err := os.Stat(inputDir); os.IsNotExist(err) {
>> fmt.Fprintf(os.Stderr, "Input Directory %s is Invalid\n", inputDir)
>> flag.Usage()
>> os.Exit(1)
>> }
>>
>> // Determine all subscriber files by matching on the subscriber files prefix
>>
>> inputFileNames, err := filepath.Glob(fmt.Sprintf("%s/%s*.log.gz", inputDir, 
>> inputFilePrefix))
>> if err != nil {
>> fmt.Fprintf(os.Stderr, "Error listing files : %v\n", err)
>> os.Exit(1)
>> }
>>
>> // Loop through all subscriber files
>> // Make a goroutine for processing each file
>> // Create a channel to receive the quarantined objects
>> qObjChannel := make(chan []QuarantineObject, len(inputFileNames))
>>
>> //runtime.GOMAXPROCS(len(inputFileNames))
>> var wg sync.WaitGroup
>> for _, inputFileGz := range inputFileNames {
>> wg.Add(1)
>> go func(inputFileGz string) {
>> nRecords := 0
>>
>> qObjList := make([]QuarantineObject, 0, 0)
>> defer wg.Done()
>> defer func() {
>> alog.Trace("Finished Processing File : %s. Total Records Analysed : %d\n", 
>> inputFileGz, nRecords)
>> }()
>> // Open the file as a GZIP stream
>> alog.Trace("==================================================================================================================================")
>> alog.Trace("Processing Input File : %s", inputFileGz)
>> alog.Trace("==================================================================================================================================")
>>
>> f, err := os.Open(inputFileGz)
>> if err != nil {
>> fmt.Fprintf(os.Stderr, "Error opening file : %v\n", err)
>> return
>> }
>> defer f.Close()
>>
>> fgz, err := gzip.NewReader(f)
>> if err != nil {
>> fmt.Fprintf(os.Stderr, "Error creating GZIP reader : %v\n", err)
>> return
>> }
>> defer fgz.Close()
>>
>> scanner := bufio.NewScanner(fgz)
>>
>> // Iterate over all lines of the file and decode
>>
>> for scanner.Scan() {
>> qObject := decodeLine()
>> if qObject.IsQuarantined() {
>> qObjList = append(qObjList, qObject)
>> }
>> }
>> ///////////////////////////////////////////////////////
>> // After all lines have been processed, Send to Channel
>> ///////////////////////////////////////////////////////
>> qObjChannel <- qObjList
>> }(inputFileGz)
>>
>> }
>>
>> fmt.Println("Waiting for processing of all files to finish")
>> ///////////////////////////////////////////////////////
>> // Closer GoRoutine
>> ///////////////////////////////////////////////////////
>> go func() {
>> wg.Wait()
>> close(qObjChannel)
>> fmt.Println("Quarantined Objects List")
>> fmt.Println("------------------------")
>> }()
>>
>> qFound := false
>>
>> for qObjList := range qObjChannel {
>> for _, qObj := range qObjList {
>> fmt.Println(qObj.id, "--->", qObj.objectType)
>> qFound = true
>> }
>> }
>>
>> }
>>
>>
>>
>> On Monday, 15 June 2020 23:29:06 UTC+10, envee wrote:
>>>
>>> I am running a program which reads multiple gzipped input files and 
>>> performs some processing on each line of each file.
>>> It creates 8 goroutines (1 per input file to be processed; the number 
>>> of such files can be assumed to be 8 at most).
>>> Each goroutine sends to a buffered channel after it finishes 
>>> processing its file.
>>> After creating the goroutines, the program waits (using WaitGroup) for 
>>> all goroutines to finish and also drains the channel of all the values 
>>> sent by the goroutines.
>>>
>>> I have a 4-core CPU with 2 threads per core = 8 logical cores.
>>>
>>> But I set GOMAXPROCS=4
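
For reference, a program can report both numbers itself. This is a small
sketch using runtime.NumCPU and runtime.GOMAXPROCS from the standard library;
the helper name parallelism is made up here. With GOMAXPROCS=4 on an
8-logical-core machine, at most 4 threads execute Go code at any one time, so
the other 4 logical cores are not expected to be busy with goroutines.

```go
package main

import (
	"fmt"
	"runtime"
)

// parallelism reports the logical CPU count and the current GOMAXPROCS
// setting. Passing 0 to runtime.GOMAXPROCS only queries the value and does
// not change it.
func parallelism() (logicalCPUs, maxProcs int) {
	return runtime.NumCPU(), runtime.GOMAXPROCS(0)
}

func main() {
	cpus, procs := parallelism()
	fmt.Printf("logical CPUs: %d, GOMAXPROCS: %d\n", cpus, procs)
	if procs < cpus {
		fmt.Println("Go code will run on at most", procs, "threads at a time")
	}
}
```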
>>>
>>> When I run the program with scheduler trace interval set to 1000ms, I can 
>>> see the following :
>>>
>>> SCHED 1001ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=0 runqueue=0 [0 0 0 1]
>>> SCHED 2008ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=1 runqueue=0 [1 0 5 0]
>>> SCHED 3015ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=1 runqueue=1 [0 0 1 0]
>>> SCHED 4022ms: gomaxprocs=4 idleprocs=0 threads=9 spinningthreads=0 idlethreads=2 runqueue=0 [0 0 0 0]
>>> SCHED 5029ms: gomaxprocs=4 idleprocs=0 threads=9 spinningthreads=0 idlethreads=2 runqueue=1 [0 0 0 4]
>>>
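
When reading the trace lines above, it may help to separate the fields: the
runqueue= value is the length of the global run queue, the bracketed numbers
are the local run queue lengths of each P, and idleprocs is the number of P's
with nothing to run. The sketch below pulls those fields out of one line; the
type SchedSample and the function parseSched are hypothetical names, and the
parser assumes the line has been unwrapped onto a single line.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// SchedSample holds the schedtrace fields that matter for this question.
type SchedSample struct {
	GoMaxProcs  int   // gomaxprocs=: number of P's
	IdleProcs   int   // idleprocs=: P's with nothing to run
	GlobalQueue int   // runqueue=: goroutines waiting in the global run queue
	LocalQueues []int // bracketed list: local run queue length of each P
}

// parseSched parses a single, unwrapped schedtrace line.
func parseSched(line string) (SchedSample, error) {
	var s SchedSample
	start := strings.Index(line, "[")
	end := strings.LastIndex(line, "]")
	if start < 0 || end < start {
		return s, fmt.Errorf("no per-P queue list in %q", line)
	}
	for _, f := range strings.Fields(line[start+1 : end]) {
		n, err := strconv.Atoi(f)
		if err != nil {
			return s, err
		}
		s.LocalQueues = append(s.LocalQueues, n)
	}
	for _, f := range strings.Fields(line[:start]) {
		kv := strings.SplitN(f, "=", 2)
		if len(kv) != 2 {
			continue // skips "SCHED" and the timestamp
		}
		n, err := strconv.Atoi(kv[1])
		if err != nil {
			continue
		}
		switch kv[0] {
		case "gomaxprocs":
			s.GoMaxProcs = n
		case "idleprocs":
			s.IdleProcs = n
		case "runqueue":
			s.GlobalQueue = n
		}
	}
	return s, nil
}

func main() {
	line := "SCHED 2008ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=1 runqueue=0 [1 0 5 0]"
	s, err := parseSched(line)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	// prints: busy P's: 4 of 4, global queue: 0, local queues: [1 0 5 0]
	fmt.Printf("busy P's: %d of %d, global queue: %d, local queues: %v\n",
		s.GoMaxProcs-s.IdleProcs, s.GoMaxProcs, s.GlobalQueue, s.LocalQueues)
}
```

Read this way, idleprocs=0 in every sample suggests that all 4 P's had work,
even when most of the bracketed local queue lengths are 0.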
>>>
>>> If I create 8 go routines, shouldn't they all be distributed equally among 
>>> the 4 logical cores ?
>>>
>>> Why do some runqueues of the logical cores show values of 4 or 5 and some 
>>> have values of 0 ?
>>>
>>> I was hoping to see something like the line below, which according to my 
>>> understanding means that each of the 4 processors has 1 goroutine 
>>> waiting in its local runqueue while another goroutine is running on its 
>>> assigned OS thread:
>>>
>>> SCHED 1001ms: gomaxprocs=4 idleprocs=0 threads=8 spinningthreads=0 idlethreads=0 runqueue=0 [1 1 1 1]
>>>
>>> Thanks.
>>
>> --
>> You received this message because you are subscribed to the Google Groups 
>> "golang-nuts" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to golan...@googlegroups.com.
>> To view this discussion on the web visit 
>> https://groups.google.com/d/msgid/golang-nuts/36efa087-d66c-4d7e-b5b2-de1d4d3ea339o%40googlegroups.com.