goroutine
是Go
编程语言中的一个轻量级执行线程(golang中称为协程
),是一个与程序其他部分同时执行的函数。
基本概念
什么是并发
程序中并发是指将一个过程按照并行算法拆分为多个可以独立执行的代码块,从而充分利用多核处理器提高系统吞吐率
并发 & 并行 & 顺序
- 顺序:指程序发起任务上一个完成,才触发下一个开始
- 并发:同时处理多个任务
- 并行:多个任务执行单元可以一起执行
goroutine
goroutine
是一个由 Go 运行时
管理的轻量级线程,go 中称为 协程
- 操作系统不管理
goroutine
,goroutine
的操作、切换属于 用户态
goroutine
开销很小,初始化一个需要 2~4k
的内存(栈空间),非常轻量
goroutine
是Golang的并发执行单元,使用 go
关键字后接函数调用来创建一个 goroutine(也叫例程)
package main
import (
"fmt"
"runtime"
"time"
)
func PrintNum(name string) {
for i := 0; i < 10; i++ {
fmt.Println(name, ":", i)
runtime.Gosched() // yields the processor, allowing other goroutines to run
}
}
func main() {
go PrintNum("no1")
go PrintNum("no2")
// 防止主进程先于 goroutine 结束
time.Sleep(1 * time.Second)
}
加 time.Sleep
的原因:
- 防止 main 进程是非阻塞的,会比
PrintNum
先结束导致其他线程没有执行
runtime.Gosched()
执行到时会让出 CPU,time.Sleep
也会让出 CPU 占用
go 中 main
函数也是协程来启动的,称为主进程,也叫工作进程。主进程结束工作后,工作协程也会立即销毁。可以使用如下方式来使主进程等待其他协程执行状态:
waitGroup 示例
package main
import (
"fmt"
"runtime"
"sync"
)
func PrintNum(name string, group *sync.WaitGroup) {
for i := 0; i < 10; i++ {
fmt.Println(name, ":", i)
runtime.Gosched() // yields the processor, allowing other goroutines to run
}
group.Done()
}
func main() {
var group sync.WaitGroup
// 声明计数器个数
group.Add(2)
go PrintNum("no1", &group)
go PrintNum("no2", &group)
group.Wait()
}
package main
import (
"fmt"
"runtime"
"sync"
)
func main() {
var group sync.WaitGroup
m := 2
// 声明计数器个数
group.Add(m)
for i := 0; i < m; i++ {
go func(name string, group *sync.WaitGroup) {
for i := 0; i < 3; i++ {
fmt.Println(name, ":", i)
runtime.Gosched() // yields the processor, allowing other goroutines to run
}
group.Done()
}(fmt.Sprintf("no%d", i), &group)
}
group.Wait()
}
/*
Output:
no1 : 0
no0 : 0
no1 : 1
no0 : 1
no1 : 2
no0 : 2
*/
闭包陷阱:
- 在协程中,若在闭包中引用闭包外的变量,可能会因外包变量变化导致闭包内部使用不准确。此时,可以采用向闭包内传递参数来规避该问题
channel 示例
Go channel 信号量
package main
import (
"fmt"
"runtime"
)
func PrintNum(name string, intChan chan int) {
for i := 0; i < 3; i++ {
fmt.Println(name, ":", i)
runtime.Gosched() // yields the processor, allowing other goroutines to run
}
intChan <- 1
}
func main() {
intChan := make(chan int, 1)
// 单示例
go PrintNum("no1", intChan)
go PrintNum("no2", intChan)
<-intChan
<-intChan
// 批量示例
for i := 0; i < 5; i++ {
go PrintNum(fmt.Sprintf("no%d", i), intChan)
}
for i := 0; i < 5; i++ {
<-intChan
}
}
共享数据
多个例程对同一个内存资源进行修改时,并未对资源进行同步限制,会导致修改数据混乱问题
package main
import (
"fmt"
"sync"
)
func main() {
// 并发对同一个内存空间 sum 进行修改,多次运行结果值是随机的,期待为 0
var sum int
group := &sync.WaitGroup{}
add := func(group *sync.WaitGroup) {
defer group.Done()
for i := 0; i < 100; i++ {
sum += i
}
}
diff := func(group *sync.WaitGroup) {
defer group.Done()
for i := 0; i < 100; i++ {
sum -= i
}
}
for i := 0; i < 5; i++ {
group.Add(2)
go add(group)
go diff(group)
}
group.Wait()
fmt.Println(sum)
}
在对资源操作时,先获取锁,处理完毕后在是否锁。golang 中使用 sync.Mutex
定义互斥锁。对同一个变量要使用同一把锁。
package main
import (
"fmt"
"sync"
)
func main() {
var sum int
group := &sync.WaitGroup{}
lock := &sync.Mutex{}
add := func(group *sync.WaitGroup) {
defer group.Done()
for i := 0; i < 100; i++ {
lock.Lock() // 加锁
sum += i
lock.Unlock() // 是否锁
}
}
diff := func(group *sync.WaitGroup) {
defer group.Done()
for i := 0; i < 100; i++ {
lock.Lock() // 加锁
sum -= i
lock.Unlock() // 是否锁
}
}
for i := 0; i < 5; i++ {
group.Add(2)
go add(group)
go diff(group)
}
group.Wait()
fmt.Println(sum)
}
也可以使用 atomic
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var sum int32
group := &sync.WaitGroup{}
add := func(group *sync.WaitGroup) {
defer group.Done()
for i := 0; i < 100; i++ {
atomic.AddInt32(&sum, int32(i))
}
}
diff := func(group *sync.WaitGroup) {
defer group.Done()
for i := 0; i < 100; i++ {
atomic.AddInt32(&sum, int32(-i))
}
}
for i := 0; i < 5; i++ {
group.Add(2)
go add(group)
go diff(group)
}
group.Wait()
fmt.Println(sum)
}
GPM 模型
// G - goroutine.
// M - worker thread, or machine.
// P - processor, a resource that is required to execute Go code.
// M must have an associated P to execute Go code, however it can be
// blocked or in a syscall w/o an associated P.
参考
- go scheduler 的主要功能是对处理器(CPU)上运行的 OS 线程分发可以运行的
goroutine
,它由三部分组成:
G(goroutine)
即 go func
生成一个 G
- 限制:初始化一个需要
2~4k
的内存(栈空间),受限于宿主机内存
4k * 1,000,000 = 4,000,000k
理论上 4G
内存可生成 100 万 G
P(processor)
处理器,一般 P
的数量为系统 CPU 的核数
- 限制:
P
的数量受环境变量 GOMAXPROCS
控制
P
的 本地队列
不超过 256 个,新建 G
是默认放在 本地队列
,若本地队列满了,P
的 本地队列
的一半的 G
移动到 全局队列
- 当
P
检查到 本地队列
为空时,会随机的从其他 P
的 本地队列
中尝试窃取(steal)一半可运行的 G
M(machine)
系统线程
说明:
Go1.14
实现了基于信号的抢占式调度,通过 runtime.sysmon
实现检测抢占(forcegc
、netpoll
、retake
等发放),常见的场景
- 抢占阻塞在系统调用上的
P
- 抢占运行时间过长(1ms)的
G