我可能会做这样的事情,它让一切都很容易遵循。我定义了一个实现信号量的结构来控制旋转的活动Go例程的数量......并允许我在进入时从通道读取。
package main import ( "fmt" "sync" ) type TestValContainer struct { wg sync.WaitGroup sema chan struct{} data chan int } func doSomething(number int) { fmt.Println(number) } func main() { //semaphore limit 10 routines at time tvc := TestValContainer{ sema: make(chan struct{}, 10), data: make(chan int), } for i := 0; i <= 100; i++ { tvc.wg.Add(1) go func(i int) { tvc.sema <- struct{}{} defer func() { <-tvc.sema tvc.wg.Done() }() tvc.data <- i }(i) } // wait in the background so that waiting and closing the channel dont // block the for loop below go func() { tvc.wg.Wait() close(tvc.data) }() // get channel results for res := range tvc.data { doSomething(res) } }
在您的示例中,您有两个错误:
wg.Done
wg.Add(1)
如果你将逻辑分开,逻辑将更清晰,更容易理解 制片人 一边从 消费者 方面更清楚。为每一方运行一个单独的goroutine。例:
// Producer side (only write and close allowed). go func() { testValContainer <- "string val 1" testValContainer <- "string val 2" testValContainer <- "string val 3" testValContainer <- "string val 4" testValContainer <- "string val 5" testValContainer <- "string val 6" testValContainer <- "string val 7" close(testValContainer) // Signals that production is done. }() // Consumer side (only read allowed). for i:=0; i < 3; i++ { wg.Add(1) go func() { defer wg.Done() v := i fmt.Printf("launching %v", i) for str := range testValContainer { doSomething(str) } fmt.Println(v, "--EXIT --") }() } wg.Wait()
如果这些项目是从其他来源(可能是goroutine的集合)生成的,那么你应该仍然有:1)一个单独的goroutine或逻辑,用于监督生产和调用 close 一旦完成,或2)让你的主线程等待生产方完成(例如用 WaitGroup 等待 制片人 goroutines)并关闭频道 之前 等待消费方面。
close
WaitGroup
如果你考虑一下,无论你如何安排你需要的逻辑 的 一些 强> 在一个单一同步的地方检测到“侧通道”的方式,即不再产生消息。否则你永远不知道什么时候应该关闭频道。
换句话说,您不能等待消费者端的范围循环完成以触发 close ,因为这导致了捕获22。