作者:lomtom 个人网站:lomtom.cn[1] 你的支持就是我最大的动力。
不要通过共享内存来通信,而应通过通信来共享内存。
Go 协程具有简单的模型:它是与其它Go 协程并发运行在同一地址空间的函数。它是轻量级的, 所有消耗几乎就只有栈空间的分配。而且栈最开始是非常小的,所以它们很廉价, 仅在需要时才会随着堆空间的分配(和释放)而变化。
Go 协程在多线程操作系统上可实现多路复用,因此若一个线程阻塞,比如说等待I/O, 那么其它的线程就会运行。
Go 协程的设计隐藏了线程创建和管理的诸多复杂性。
在函数或方法前添加 go 关键字能够在新的Go 协程中调用它。当调用完成后, 该Go 协程也会安静地退出。(效果有点像Unix Shell中的 & 符号,它能让命令在后台运行。)
go myFunc() // 同时运行 myFunc 不需要等待
匿名函数在协程中调用非常方便:
func TestGo(t *testing.T) {
s := "你好吗"
go func() {
fmt.Println(s)
}()
ss := "小道科不好"
fmt.Println(ss)
}
结果输出:
小道科不好
你好吗
在Go中,匿名函数都是闭包:其实现在保证了函数内引用变量的生命周期与函数的活动时间相同。
所以,值得注意的是,如果主函数执行完,而go后面的方法未执行完,程序同样停止。例如,我们在go后面的方法中,让函数睡眠1秒钟,这样主程序运行完就会退出,而不会输出s
func TestGo(t *testing.T) {
s := "你好吗"
go func() {
time.Sleep(1 * time.Second)
fmt.Println(s)
}()
ss := "小道科不好"
fmt.Println(ss)
}
输出:
小道科不好
那么如何避免这样的情况呢?后续会讲到
这些函数没什么实用性,因为它们没有实现完成时的信号处理。因此,我们需要信道。
为什么需要channel?单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。
Go 语言中的通道(channel)是一种特殊的类型。
管道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个管道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。
管道与映射一样,也需要通过 make 来分配内存。
其结果值充当了对底层数据结构的引用。若提供了一个可选的整数形参,它就会为该信道设置缓冲区大小。
默认值是零,表示不带缓冲的或同步的信道。
c := make(chan int) // 整数无缓冲信道
c := make(chan int, 0) // 整数无缓冲信道
c := make(chan *os.File, 100) // 指向文件的指针的缓冲信道
c <- a
num := <- c
var c chan int
var c chan<- int
var c <-chan int
例子:
func TestGo(t *testing.T) {
// 创建一个无缓冲的类型为整型的 channel
c := make(chan int)
// 执行自定义方法;方法结束时,会在信道上发信号
go func() {
// doSomething
for i := 0; i < 5; i++ {
// 发送一个信号
c <- i
}
// 关闭管道
close(c)
}()
// doSomething
// 等待自定义方法执行完成,然后从 channel 取值
for i := range c{
fmt.Println(i)
}
}
接收者在收到数据前会一直阻塞。
在go
中可以使用chan
来实现通信,同样,Go 也提供了WaitGroup
来实现同步。
上面的例子很好说明了没有使用WaitGroup
存在的同步问题
func TestGo(t *testing.T) {
s := "你好,世界"
go func() {
time.Sleep(1 * time.Second)
fmt.Println(s)
}()
ss := "小道科不好"
fmt.Println(ss)
}
这里重新举一个例子,我在主函数里输出十次,在myFunc
里也输出十次,理论情况,都会输出。
为了模拟在myFunc
未执行完,而主程序执行完的情况下,在myFunc
中加入time.Sleep(time.Second*1)
func TestGo1(t *testing.T) {
go myFunc()
for i := 0; i < 10; i++ {
fmt.Println("main()测试,这是第" + strconv.Itoa(i) + "次")
}
}
func myFunc() {
for i := 0; i < 10; i++ {
fmt.Println("test()测试,这是第" + strconv.Itoa(i) + "次")
time.Sleep(time.Second*1)
}
}
输出如下:
main()测试,这是第0次
main()测试,这是第1次
main()测试,这是第2次
main()测试,这是第3次
main()测试,这是第4次
main()测试,这是第5次
main()测试,这是第6次
main()测试,这是第7次
main()测试,这是第8次
main()测试,这是第9次
test()测试,这是第0次
这显然不是我们所要的结果,那么最简单的就是在主函数的循环中也加入time.Sleep(time.Second*1)
func TestGo1(t *testing.T) {
go myFunc()
for i := 0; i < 10; i++ {
fmt.Println("main()测试,这是第" + strconv.Itoa(i) + "次")
time.Sleep(time.Second*1)
}
}
func myFunc() {
for i := 0; i < 10; i++ {
fmt.Println("test()测试,这是第" + strconv.Itoa(i) + "次")
time.Sleep(time.Second*1)
}
}
输出如下:
main()测试,这是第0次
test()测试,这是第0次
main()测试,这是第1次
test()测试,这是第1次
main()测试,这是第2次
test()测试,这是第2次
test()测试,这是第3次
main()测试,这是第3次
main()测试,这是第4次
test()测试,这是第4次
test()测试,这是第5次
main()测试,这是第5次
main()测试,这是第6次
test()测试,这是第6次
test()测试,这是第7次
main()测试,这是第7次
main()测试,这是第8次
test()测试,这是第8次
test()测试,这是第9次
main()测试,这是第9次
这虽然达到了我们的预想的效果,但在正式情况下,我们并不会知道代码的执行速度与时间,所以这个一秒,理论可行,实际却很拉垮。
那么就可以使用WaitGroup
来控制。
func TestGo1(t *testing.T) {
var wg sync.WaitGroup
go myFunc(&wg)
for i := 0; i < 10; i++ {
fmt.Println("main()测试,这是第" + strconv.Itoa(i) + "次")
//time.Sleep(time.Second*1)
}
wg.Wait()
}
func myFunc(wg *sync.WaitGroup) {
wg.Add(1)
for i := 0; i < 10; i++ {
fmt.Println("test()测试,这是第" + strconv.Itoa(i) + "次")
time.Sleep(time.Second*1)
}
wg.Done()
}
输出:
main()测试,这是第0次
main()测试,这是第1次
main()测试,这是第2次
main()测试,这是第3次
main()测试,这是第4次
main()测试,这是第5次
test()测试,这是第0次
main()测试,这是第6次
main()测试,这是第7次
main()测试,这是第8次
main()测试,这是第9次
test()测试,这是第1次
test()测试,这是第2次
test()测试,这是第3次
test()测试,这是第4次
test()测试,这是第5次
test()测试,这是第6次
test()测试,这是第7次
test()测试,这是第8次
test()测试,这是第9次
在编程的很多场景下我们需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、 只关闭一次通道等。
Go语言中的sync包中提供了一个针对只执行一次场景的解决方案:sync.Once。
sync.Once对外提供的操作只有一个Do方法:
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
在使用上,我们只需要将需要执行的方法传入即可。
var db *gorm.DB
var loadDbConf sync.Once
// GetDb 获取连接
func GetDb() *gorm.DB {
loadDbConf.Do(DbInit)
return db
}
// DbInit 数据库连接池初始化
func DbInit() {
newLogger := logger.New(
log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer
logger.Config{
SlowThreshold: time.Second, // Slow SQL threshold
LogLevel: logger.Info, // Log level
IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger
Colorful: true, // Disable color
},
)
conn, err1 := gorm.Open(mysql.Open(mySQLUri()), &gorm.Config{
Logger: newLogger,
})
if err1 != nil {
log.Printf("mysql connect get failed.%v", err1)
return
}
db = conn
log.Printf("mysql init success")
}
type Once struct {
done uint32
m Mutex
}
sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记 录初始化是否完成。
这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。
Go语言中内置的map不是并发安全的。
var m = make(map[string]int)
func get(key string) int {
return m[key]
}
func set(key string, value int) {
m[key] = value
}
func TestGo5(t *testing.T) {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
set(key, n)
fmt.Printf("k=:%v,v:=%v\n", key, get(key))
wg.Done()
}(i)
}
wg.Wait()
}
上面的代码开启少量几个goroutine的时候可能没什么问题,当并发多了之后执行上面的代码就会报fatal error: concurrent map writes
错误。
k=:0,v:=0
k=:2,v:=2
k=:4,v:=4
k=:5,v:=5
k=:6,v:=6
k=:7,v:=7
k=:1,v:=1
k=:8,v:=8
k=:3,v:=3
k=:11,v:=11
k=:10,v:=10
k=:12,v:=12
fatal error: concurrent map writes
k=:13,v:=13
k=:14,v:=14
goroutine 38 [running]:
runtime.throw(0xe6715c, 0x15)
E:/program/go/src/runtime/panic.go:1117 +0x79 fp=0xc000337ec8 sp=0xc000337e98 pc=0x7ac6f9
runtime.mapassign_faststr(0xd9b800, 0xc00003c2a0, 0xe8046a, 0x2, 0x0)
像这种场景下就需要为map加锁来保证并发的安全性了,Go语言的sync包中提供了一个开箱即用的并发安全版map–sync.Map。
开箱即用表示不用像内置的map一样使用make函数初始化就能直接使用。同时sync.Map内置了诸如Store、Load、LoadOrStore、Delete、Range等操作方法。
var m1 = sync.Map{}
func TestGo6(t *testing.T) {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
k := strconv.Itoa(n)
m1.Store(k,n)
v, _ := m1.Load(k)
fmt.Printf("k=:%v,v:=%v\n", k, v)
wg.Done()
}(i)
}
wg.Wait()
}
这些设计的另一个应用是在多CPU核心上实现并行计算。
如果计算过程能够被分为几块 可独立执行的过程,它就可以在每块计算结束时向信道发送信号,从而实现并行处理。
在下面这个例子,我需要计算1 到 n的和,一般来说简单的直接一个循环搞定。
func TestGo(t *testing.T){
n := 100000
var s int
for i := 0;i < n;i++{
s += i
}
log.Println(s)
}
但是对于数据量比较大的,这样显然不适合出现在我们的代码中,那么就可以采用并行计算来实现。
func TestGo3(t *testing.T){
t1 := time.Now()
n := 100
num := 10
c := make(chan int,num)
for i := 0;i < num;i++ {
go func() {
start := n / num * i
end := n / num * (i + 1)
var s int
for j := start; j < end;j++ {
s += j
}
c <- s
}()
}
var s int
for i := 0;i < num;i++ {
s += <- c
}
t2 := time.Since(t1)
log.Println(t2)
log.Println(s)
}
当然也可以借用WaitGroup
func TestGo3(t *testing.T){
t1 := time.Now()
n := 100
num := 10
c := make(chan int,num)
var wg sync.WaitGroup
for i := 0;i < num;i++ {
wg.Add(1)
go func() {
start := n / num * i
end := n / num * (i + 1)
var s int
for j := start; j < end;j++ {
s += j
}
c <- s
wg.Done()
}()
}
wg.Wait()
close(c)
var s int
for item := range c{
s += item
}
t2 := time.Since(t1)
log.Println(t2)
log.Println(s)
}
我们在循环中启动了独立的处理块,每个CPU将执行一个处理。
它们有可能以乱序的形式完成并结束,但这没有关系;我们只需在所有Go协程开始后接收,并统计信道中的完成信号即可。
除了直接设置 num 常量值以外,我们还可以向 runtime 询问一个合理的值。
函数 runtime.NumCPU[2] 可以返回硬件 CPU 上的核心数量,如此使用:
var num = runtime.NumCPU()
另外一个需要知道的函数是 runtime.GOMAXPROCS,会返回用户设置可用 CPU 数量。默认情况下使用 runtime.NumCPU的值,但是可以被命令行环境变量,或者调用此函数并传参正整数。传参 0 的话会返回值,假如说我们尊重用户对资源的分配,
就应该这么写:
var numCPU = runtime.GOMAXPROCS(0)
注意不要混淆并发(concurrency)和并行(parallelism)的概念:并发是用可独立执行组件构造程序的方法, 而并行则是为了效率在多 CPU 上平行地进行计算。
尽管 Go 的并发特性能够让某些问题更易构造成并行计算, 但 Go 仍然是种并发而非并行的语言,且 Go 的模型并不适合所有的并行问题。
但是,如果你跑过了前面的代码,就会发现一个问题,在后面的计算其实是不准确的。
1 - 100
的值应该为4950
,而他每次输出的值基本都不会相同,更不可能是4950
.
问题出现在Go的 for 循环中,该循环变量在每次迭代时会被重用,因此 i变量会在所有的Go协程间共享,这不是我们想要的。
我们需要确保 i 对于每个Go协程来说都是唯一的。
这里有几种方法来实现:
func TestGo3(t *testing.T){
t1 := time.Now()
n := 100
num := 10
c := make(chan int,num)
var wg sync.WaitGroup
for i := 0;i < num;i++ {
wg.Add(1)
go myFunc1(n,num,i,c,&wg)
}
wg.Wait()
close(c)
var s int
for item := range c{
s += item
}
t2 := time.Since(t1)
log.Println(t2)
log.Println(s)
}
func myFunc1(n,num,i int,c chan int,wg *sync.WaitGroup) {
start := n / num * i
end := n / num * (i + 1)
var s int
for j := start; j < end;j++ {
s += j
}
c <- s
wg.Done()
}
func TestGo3(t *testing.T){
t1 := time.Now()
n := 100
num := 10
c := make(chan int,num)
var wg sync.WaitGroup
for i := 0;i < num;i++ {
wg.Add(1)
// 以i为参数传入
go func(i int) {
start := n / num * i
end := n / num * (i + 1)
var s int
for j := start; j < end;j++ {
s += j
}
c <- s
wg.Done()
}(i)
}
wg.Wait()
close(c)
var s int
for item := range c{
s += item
}
t2 := time.Since(t1)
log.Println(t2)
log.Println(s)
}
func TestGo3(t *testing.T){
t1 := time.Now()
n := 100
num := 10
c := make(chan int,num)
var wg sync.WaitGroup
for i := 0;i < num;i++ {
wg.Add(1)
// 重新申明
i := i
go func() {
start := n / num * i
end := n / num * (i + 1)
var s int
for j := start; j < end;j++ {
s += j
}
c <- s
wg.Done()
}()
}
wg.Wait()
close(c)
var s int
for item := range c{
s += item
}
t2 := time.Since(t1)
log.Println(t2)
log.Println(s)
}
i:= i
的写法看起来有点奇怪,但在 Go 中这样做是合法且常见的。
你用相同的名字获得了该变量的一个新的版本, 以此来局部地刻意屏蔽循环变量,使它对每个 Go 协程保持唯一。
传统方法与并行计算:
这里不考虑越界情况(即不考虑计算正确性),因为他每次结果还是会计算的。
n值 | 传统 | 并行 |
---|---|---|
10000000 | 3.7146ms | 1.058ms |
100000000 | 39.9925ms | 7.1645ms |
1000000000 | 344.0599ms | 49.9023ms |
10000000000 | 3.4797346s | 501.5713ms |
100000000000 | 34.5406926s | 4.650136s |
结果还是挺明显的。
[1]
lomtom.cn: https://lomtom.cn
[2]
runtime.NumCPU: https://golang.org/pkg/runtime#NumCPU