Hi Robert,

Actually, the code above is simplified to make it easy to understand.
Thanks for the suggestion on variable naming; I will improve that.

In the real scenario, the producer functions (produceTaskOfType1ToChan() and
produceTaskOfType2ToChan()) each send a list of strings to the channel, like this:

    func produceTaskOfType2ToChan(wg *sync.WaitGroup, autoCancelChan chan string) {
        defer wg.Done()
        autoCancelIds := getAutoCancelIdsFromSource2()
        for _, autoCancelId := range autoCancelIds {
            autoCancelChan <- autoCancelId
        }
    }

Does this code make more sense now?

On Fri, 27 Dec, 2019, 10:10 AM robert engels, <reng...@ix.netcom.com> wrote:

> Yes, the code doesn’t work :) - it will only ever produce 2 items - unless
> that was expected - even so, you want the N workers doing work, and
> probably a constant number sending to Kafka - but a lot depends on your
> “serial needs”. In your case you only have 2 workers producing work, and N
> senders - which is backwards to me.
>
> I would also say that your variable names could be improved - as
> “autoCancelChan” isn’t really meaningful here, it is simply a chan used to
> send items to the Kafka senders (at least I think).
>
> On Dec 26, 2019, at 10:12 PM, Amarjeet Anand <amarjeetanandsi...@gmail.com>
> wrote:
>
> Hi
>
> I have to produce some tasks to Kafka in parallel. So I want to implement a
> simple worker group pattern in Go.
>
> Is the code below decent enough to take to production?
>
> var workerCount = runtime.NumCPU()*7 + 1
>
> func WorkerPattern() {
>     taskWg := &sync.WaitGroup{}
>     taskWg.Add(2)
>
>     // *3, just to make enough room. workers will be slower anyways
>     autoCancelChan := make(chan string, workerCount*3)
>     go produceTaskOfType1ToChan(taskWg, autoCancelChan)
>     go produceTaskOfType2ToChan(taskWg, autoCancelChan)
>
>     // start workers to push autoCancel to kafka
>     workerWg := &sync.WaitGroup{}
>     go kafkaProducerWorkers(autoCancelChan, workerWg)
>
>     // wait to close autoCancelChan channel till all the tasks are written
>     taskWg.Wait()
>     close(autoCancelChan)
>
>     // wait till all workers finish their task
>     workerWg.Wait()
>
>     fmt.Println("Done!!!")
> }
>
> func produceTaskOfType1ToChan(wg *sync.WaitGroup, autoCancelChan chan string) {
>     defer wg.Done()
>     // can produce random number of task on autoCancelChan
>     autoCancelChan <- "task of type of 1"
> }
>
> func produceTaskOfType2ToChan(wg *sync.WaitGroup, autoCancelChan chan string) {
>     defer wg.Done()
>     // can produce random number of task on autoCancelChan
>     autoCancelChan <- "task of type of 2"
> }
>
> func kafkaProducerWorkers(autoCancelChan chan string, workerWg *sync.WaitGroup) {
>     workerWg.Add(workerCount)
>     for i := 0; i < workerCount; i++ {
>         go produceToKafka(autoCancelChan, workerWg)
>     }
> }
>
> func produceToKafka(autoCancelChan chan string, workerWg *sync.WaitGroup) {
>     defer workerWg.Done()
>
>     // for loop will terminate once autoCancelChan is closed
>     for autoCancel := range autoCancelChan {
>         KafkaClient.PublishToKafkaTopic(autoCancel)
>     }
> }
>
> Any improvement you can suggest to this code?
>
> --
> You received this message because you are subscribed to the Google Groups
> "golang-nuts" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to golang-nuts+unsubscr...@googlegroups.com.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/golang-nuts/CANFuhy8qBjooo_tB_gT0f%3DTE4DaOFqWL5SWwNghy%2BL-eV82KdA%40mail.gmail.com