Go goroutine 协程

发布时间: 更新时间: 总字数:1723 阅读时间:4m 作者: IP上海 分享 网址

goroutineGo编程语言中的一个轻量级执行线程(golang中称为协程),是一个与程序其他部分同时执行的函数。

基本概念

什么是并发

程序中并发是指将一个过程按照并行算法拆分为多个可以独立执行的代码块,从而充分利用多核处理器提高系统吞吐率

并发 & 并行 & 顺序

  • 顺序:指程序发起任务上一个完成,才触发下一个开始
  • 并发:同时处理多个任务
  • 并行:多个任务执行单元可以一起执行

goroutine

  • goroutine 是一个由 Go 运行时 管理的轻量级线程,go 中称为 协程
    • 操作系统不管理 goroutinegoroutine 的操作、切换属于 用户态
    • goroutine 开销很小,初始化一个需要 2~4k 的内存(栈空间),非常轻量
  • goroutine 是Golang的并发执行单元,使用 go 关键字后接函数调用来创建一个 goroutine(也叫例程)
package main

import (
	"fmt"
	"runtime"
	"time"
)

func PrintNum(name string) {
	for i := 0; i < 10; i++ {
		fmt.Println(name, ":", i)
		runtime.Gosched() // yields the processor, allowing other goroutines to run
	}
}

func main() {
	go PrintNum("no1")
	go PrintNum("no2")
	// 防止主进程先于 goroutine 结束
	time.Sleep(1 * time.Second)
}

time.Sleep 的原因:

  1. 防止 main 进程是非阻塞的,会比 PrintNum 先结束导致其他线程没有执行
  2. runtime.Gosched() 执行到时会让出 CPU,time.Sleep 也会让出 CPU 占用

go 中 main 函数也是协程来启动的,称为主进程,也叫工作进程。主进程结束工作后,工作协程也会立即销毁。可以使用如下方式来使主进程等待其他协程执行状态:

  • waitGroup
  • channel

waitGroup 示例

  • 命名函数调用示例
package main

import (
	"fmt"
	"runtime"
	"sync"
)

func PrintNum(name string, group *sync.WaitGroup) {
	for i := 0; i < 10; i++ {
		fmt.Println(name, ":", i)
		runtime.Gosched() // yields the processor, allowing other goroutines to run
	}
	group.Done()
}

func main() {
	var group sync.WaitGroup
	// 声明计数器个数
	group.Add(2)

	go PrintNum("no1", &group)
	go PrintNum("no2", &group)

	group.Wait()
}
  • 匿名函数调用示例
package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	var group sync.WaitGroup
	m := 2
	// 声明计数器个数
	group.Add(m)

	for i := 0; i < m; i++ {
		go func(name string, group *sync.WaitGroup) {
			for i := 0; i < 3; i++ {
				fmt.Println(name, ":", i)
				runtime.Gosched() // yields the processor, allowing other goroutines to run
			}
			group.Done()
		}(fmt.Sprintf("no%d", i), &group)
	}
	group.Wait()
}

/*
Output:
no1 : 0
no0 : 0
no1 : 1
no0 : 1
no1 : 2
no0 : 2
*/

闭包陷阱:

  • 在协程中,若在闭包中引用闭包外的变量,可能会因外包变量变化导致闭包内部使用不准确。此时,可以采用向闭包内传递参数来规避该问题

channel 示例

Go channel 信号量

package main

import (
	"fmt"
	"runtime"
)

func PrintNum(name string, intChan chan int) {
	for i := 0; i < 3; i++ {
		fmt.Println(name, ":", i)
		runtime.Gosched() // yields the processor, allowing other goroutines to run
	}
	intChan <- 1
}

func main() {
	intChan := make(chan int, 1)

	// 单示例
	go PrintNum("no1", intChan)
	go PrintNum("no2", intChan)

	<-intChan
	<-intChan

	// 批量示例
	for i := 0; i < 5; i++ {
		go PrintNum(fmt.Sprintf("no%d", i), intChan)
	}
	for i := 0; i < 5; i++ {
		<-intChan
	}
}

共享数据

多个例程对同一个内存资源进行修改时,并未对资源进行同步限制,会导致修改数据混乱问题

  • 问题示例
package main

import (
	"fmt"
	"sync"
)

func main() {
	// 并发对同一个内存空间 sum 进行修改,多次运行结果值是随机的,期待为 0
	var sum int
	group := &sync.WaitGroup{}

	add := func(group *sync.WaitGroup) {
		defer group.Done()
		for i := 0; i < 100; i++ {
			sum += i
		}
	}
	diff := func(group *sync.WaitGroup) {
		defer group.Done()
		for i := 0; i < 100; i++ {
			sum -= i
		}
	}

	for i := 0; i < 5; i++ {
		group.Add(2)
		go add(group)
		go diff(group)
	}

	group.Wait()
	fmt.Println(sum)
}
  • 解决方法

在对资源操作时,先获取锁,处理完毕后在是否锁。golang 中使用 sync.Mutex 定义互斥锁。对同一个变量要使用同一把锁。

package main

import (
	"fmt"
	"sync"
)

func main() {
	var sum int
	group := &sync.WaitGroup{}
	lock := &sync.Mutex{}

	add := func(group *sync.WaitGroup) {
		defer group.Done()
		for i := 0; i < 100; i++ {
			lock.Lock() // 加锁
			sum += i
			lock.Unlock() // 是否锁
		}
	}
	diff := func(group *sync.WaitGroup) {
		defer group.Done()
		for i := 0; i < 100; i++ {
			lock.Lock() // 加锁
			sum -= i
			lock.Unlock() // 是否锁
		}
	}

	for i := 0; i < 5; i++ {
		group.Add(2)
		go add(group)
		go diff(group)
	}

	group.Wait()
	fmt.Println(sum)
}

也可以使用 atomic

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var sum int32
	group := &sync.WaitGroup{}

	add := func(group *sync.WaitGroup) {
		defer group.Done()
		for i := 0; i < 100; i++ {
			atomic.AddInt32(&sum, int32(i))
		}
	}
	diff := func(group *sync.WaitGroup) {
		defer group.Done()
		for i := 0; i < 100; i++ {
			atomic.AddInt32(&sum, int32(-i))
		}
	}

	for i := 0; i < 5; i++ {
		group.Add(2)
		go add(group)
		go diff(group)
	}

	group.Wait()
	fmt.Println(sum)
}

GPM 模型

// G - goroutine.
// M - worker thread, or machine.
// P - processor, a resource that is required to execute Go code.
//     M must have an associated P to execute Go code, however it can be
//     blocked or in a syscall w/o an associated P.
  • 结构图

logo
参考

  • go scheduler 的主要功能是对处理器(CPU)上运行的 OS 线程分发可以运行的 goroutine,它由三部分组成:
    • G(goroutine)go func 生成一个 G
      • 限制:初始化一个需要 2~4k 的内存(栈空间),受限于宿主机内存
        • 4k * 1,000,000 = 4,000,000k 理论上 4G 内存可生成 100 万 G
    • P(processor) 处理器,一般 P 的数量为系统 CPU 的核数
      • 限制:P 的数量受环境变量 GOMAXPROCS 控制
      • P本地队列 不超过 256 个,新建 G 是默认放在 本地队列,若本地队列满了,P本地队列 的一半的 G 移动到 全局队列
      • P 检查到 本地队列 为空时,会随机的从其他 P本地队列 中尝试窃取(steal)一半可运行的 G
    • M(machine) 系统线程

说明:

  • Go1.14 实现了基于信号的抢占式调度,通过 runtime.sysmon 实现检测抢占(forcegcnetpollretake 等发放),常见的场景
    • 抢占阻塞在系统调用上的 P
    • 抢占运行时间过长(1ms)的 G
Home Archives Categories Tags Statistics
本文总阅读量 次 本站总访问量 次 本站总访客数