在 Go 语言中,协程(goroutine)是轻量级的执行单元,虽然开销小,但无限制地创建协程仍然会消耗大量系统资源,甚至导致程序崩溃。因此,合理控制协程数量是编写高效 Go 程序的关键。本文将介绍几种常用的协程数量控制方法,并结合具体案例说明其用法。
带缓冲的通道可以作为一个简易的信号量(Semaphore),通过控制通道的容量来限制同时运行的协程数量。
基本原理:
案例代码:
package main
import (
	"fmt"
	"time"
)
func worker(id int, sem chan struct{}) {
	defer func() { <-sem }() // 释放令牌
	fmt.Printf("Worker %d 开始工作\n", id)
	time.Sleep(time.Second) // 模拟工作
	fmt.Printf("Worker %d 完成工作\n", id)
}
func main() {
	const maxGoroutines = 3 // 最大协程数量
	sem := make(chan struct{}, maxGoroutines)
	totalTasks := 10 // 总任务数
	for i := 0; i < totalTasks; i++ {
		sem <- struct{}{} // 获取令牌,若满则等待
		go worker(i, sem)
	}
	// 等待所有令牌被释放(所有协程完成)
	for i := 0; i < cap(sem); i++ {
		sem <- struct{}{}
	}
	fmt.Println("所有任务完成")
}sync.WaitGroup 用于等待一组协程完成,结合通道可以更灵活地控制协程数量。
案例代码:
package main
import (
	"fmt"
	"sync"
	"time"
)
func worker(id int, wg *sync.WaitGroup) {
	defer wg.Done()
	fmt.Printf("Worker %d 开始工作\n", id)
	time.Sleep(time.Second)
	fmt.Printf("Worker %d 完成工作\n", id)
}
func main() {
	const maxGoroutines = 3
	sem := make(chan struct{}, maxGoroutines)
	var wg sync.WaitGroup
	totalTasks := 10
	for i := 0; i < totalTasks; i++ {
		sem <- struct{}{}
		wg.Add(1)
		go func(id int) {
			defer func() { <-sem }()
			worker(id, &wg)
		}(i)
	}
	wg.Wait() // 等待所有任务完成
	fmt.Println("所有任务完成")
}工作池模式创建固定数量的工作协程,从任务队列中获取任务执行,适用于任务数量多且可批量处理的场景。
案例代码:
package main
import (
	"fmt"
	"sync"
	"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		fmt.Printf("Worker %d 处理任务 %d\n", id, job)
		time.Sleep(time.Second) // 模拟处理时间
		results <- job * 2      // 模拟处理结果
	}
}
func main() {
	const (
		numWorkers = 3    // 工作协程数量
		numJobs    = 10   // 任务数量
	)
	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)
	var wg sync.WaitGroup
	// 启动工作协程
	wg.Add(numWorkers)
	for w := 1; w <= numWorkers; w++ {
		go worker(w, jobs, results, &wg)
	}
	// 发送任务
	go func() {
		for j := 1; j <= numJobs; j++ {
			jobs <- j
		}
		close(jobs) // 所有任务发送完毕,关闭通道
	}()
	// 等待所有工作协程完成
	go func() {
		wg.Wait()
		close(results) // 所有结果处理完毕,关闭通道
	}()
	// 收集结果
	for result := range results {
		fmt.Printf("收到结果: %d\n", result)
	}
	fmt.Println("所有任务完成")
}对于复杂场景,可以使用成熟的第三方库,如 golang.org/x/sync/errgroup 或 github.com/panjf2000/ants(高性能协程池)。
使用 errgroup 的案例:
package main
import (
	"context"
	"fmt"
	"golang.org/x/sync/errgroup"
	"time"
)
func worker(id int) error {
	fmt.Printf("Worker %d 开始工作\n", id)
	time.Sleep(time.Second)
	fmt.Printf("Worker %d 完成工作\n", id)
	return nil
}
func main() {
	const maxGoroutines = 3
	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(maxGoroutines) // 设置最大并发数
	totalTasks := 10
	for i := 0; i < totalTasks; i++ {
		id := i
		g.Go(func() error {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
				return worker(id)
			}
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Printf("发生错误: %v\n", err)
	} else {
		fmt.Println("所有任务完成")
	}
}