本文介绍 golang 中 channel 常见的使用方式。go 关键字用来开启一个 goroutine(协程)进行任务处理,多个任务之间如果需要通信,就需要用到 channel。
介绍
channel 可以传输基本类型的数据(如 int、string),同时也可以传输 struct 数据。
声明
var intChan chan int
var strChan chan string
package main
import "fmt"
// main prints the type and Go-syntax value of two channel variables that
// were declared but never initialized with make.
func main() {
	var (
		intChan chan int
		strChan chan string
	)
	// Both variables still hold the zero value, which prints as nil.
	fmt.Printf("%T, %#v\n%T, %#v\n", intChan, intChan, strChan, strChan)
	// Output:
	// chan int, (chan int)(nil)
	// chan string, (chan string)(nil)
}
初始化
初始化 channel 类型
- 无缓冲:
intChan := make(chan int)
- 缓冲类型:
strChan := make(chan string, 3)
// 3为缓冲数
- 只读管道:
var rChan <-chan int = intChan
- 只写管道:
var wChan chan<- int = intChan
package main
import "fmt"
// main creates one unbuffered and one buffered channel with make and prints
// each channel's type together with its Go-syntax representation.
func main() {
	var (
		intChan = make(chan int)       // unbuffered
		strChan = make(chan string, 3) // buffer capacity 3
	)
	fmt.Printf("%T, %#v\n%T, %#v\n", intChan, intChan, strChan, strChan)
	// Output (the addresses differ from run to run):
	// chan int, (chan int)(0xc000026120)
	// chan string, (chan string)(0xc000026180)
}
字面量
说明:
- 无缓冲的 chan 的发送(赋值)会一直阻塞到有接收方取值,因此发送和接收必须位于不同的 goroutine 中,否则报错:fatal error: all goroutines are asleep - deadlock!
- 带缓冲的 chan 在缓冲区未满时,可以直接在 main 函数中赋值
- 取 chan 变量中的值时,若 chan 中无值,会阻塞直到有值
value, ok := <-intChan
value := <-intChan
- Chan 可以关闭,关闭后只能读,不能写,否则报错:
panic: send on closed channel
package main
import (
"fmt"
"time"
)
// main walks through the basic channel operations: len, send, receive,
// close, the two-value receive form, and for-range over a closed channel.
func main() {
	numbers := make(chan int)
	words := make(chan string, 3)
	fmt.Println(len(numbers))
	fmt.Println(len(words))

	// The send on the unbuffered channel runs in its own goroutine and
	// blocks there until main receives the value below.
	go func(out chan int, _ chan string) {
		out <- 8
		time.Sleep(2 * time.Second)
	}(numbers, words)

	// Receive from the unbuffered channel.
	fmt.Println(len(numbers))
	first := <-numbers
	fmt.Println(first)

	// The buffered channel accepts sends from main while it has free slots.
	words <- "hello chan"
	fmt.Println(len(words))
	words <- "hello chan"

	// After close, buffered values can still be received, but another send
	// would panic with "send on closed channel".
	close(words)
	fmt.Println(<-words)
	v, ok := <-words
	fmt.Printf("%T %#v, %T %#v\n", v, v, ok, ok)
	fmt.Println(len(words))

	// Ranging terminates only because the channel has been closed; ranging
	// over a channel that is never closed would deadlock here.
	for s := range words {
		fmt.Println(s)
	}
	// Output:
	// 0
	// 0
	// 0
	// 8
	// 1
	// hello chan
	// string "hello chan", bool true
	// 0
}
示例
无缓冲 Chan
package main
import (
"fmt"
"testing"
"time"
)
// TestChannel demonstrates a hand-off over an unbuffered channel: the test
// goroutine blocks on the receive until the worker sends after three seconds.
func TestChannel(t *testing.T) {
	intChan := make(chan int)

	worker := func() {
		fmt.Println("begin run goroutine...")
		time.Sleep(3 * time.Second)
		intChan <- 1
		fmt.Println("finish run goroutine.")
	}
	go worker()

	// ok is false only when the channel is closed and drained.
	value, ok := <-intChan
	if !ok {
		fmt.Println("get intChan err!", ok)
	}
	fmt.Println("value:", value)
	// Output:
	// begin run goroutine...
	// value: 1
	// finish run goroutine.
}
有缓冲 Chan
package main
import (
"fmt"
"testing"
)
// TestCacheChannel streams twenty integers through a channel with a buffer
// of three: a goroutine produces them while the test goroutine consumes them.
func TestCacheChannel(t *testing.T) {
	const total = 20
	intChan := make(chan int, 3)

	go func() {
		fmt.Println("begin run goroutine...")
		for n := 0; n < total; n++ {
			fmt.Println("set value:", n)
			intChan <- n
		}
		fmt.Println("finish run goroutine.")
	}()

	// Receive exactly as many values as the producer sends.
	for received := 0; received < total; received++ {
		fmt.Println("get value:", <-intChan)
	}
}
结构体 Chan
package main
import (
"fmt"
"testing"
)
type (
	// User is the sample payload sent over a channel in the struct example.
	User struct {
		Name  string
		Age   int
		Class Class
	}

	// Class is the nested value carried inside a User.
	Class struct {
		Name string
	}
)
// TestTranslateStructChannel shows that a struct sent over a channel is
// transferred by value: the sender changes Age after the send, and the
// received copy still carries the value it had when it was sent.
func TestTranslateStructChannel(t *testing.T) {
	userChan := make(chan User, 1)

	sender := func() {
		u := User{Name: "xianbin", Age: 18, Class: Class{Name: "c1"}}
		fmt.Printf("src user: %+v\n", u)
		userChan <- u
		// Mutate the local value only after the copy has been sent.
		u.Age = 20
		fmt.Printf("src user: %+v\n", u)
	}
	go sender()

	received := <-userChan
	fmt.Printf("src user: %+v\n", received)
	// Output:
	// src user: {Name:xianbin Age:18 Class:{Name:c1}}
	// src user: {Name:xianbin Age:20 Class:{Name:c1}}
	// src user: {Name:xianbin Age:18 Class:{Name:c1}}
}
Close Chan
package main
import (
"fmt"
"testing"
"time"
)
// TestCloseChannel shows how a receiver detects that a channel was closed.
// A producer sends five values one second apart and then closes the channel;
// a slower consumer drains it and stops once the receive reports ok == false.
func TestCloseChannel(t *testing.T) {
	intChan := make(chan int, 5)
	signCh := make(chan int, 2) // one completion token per goroutine

	producer := func() {
		for n := 0; n < 5; n++ {
			intChan <- n
			fmt.Println("set intCh value:", n)
			time.Sleep(time.Second)
		}
		close(intChan)
		fmt.Println("the intCh channel is closed")
		signCh <- 0
	}

	consumer := func() {
		for {
			got, open := <-intChan
			fmt.Printf("get int channel %d, %v \n", got, open)
			if !open {
				fmt.Println("monitor intCh channel is closed")
				break
			}
			// Pause between reads so the loop does not burn resources.
			time.Sleep(time.Second * 4)
		}
		signCh <- 1
	}

	go producer()
	go consumer()

	// Wait for both goroutines to report completion.
	<-signCh
	<-signCh
	fmt.Println("end")
	// Output:
	// set intCh value: 0
	// get int channel 0, true
	// set intCh value: 1
	// set intCh value: 2
	// set intCh value: 3
	// get int channel 1, true
	// set intCh value: 4
	// the intCh channel is closed
	// get int channel 2, true
	// get int channel 3, true
	// get int channel 4, true
	// get int channel 0, false
	// monitor intCh channel is closed
	// end
}
Signal Chan
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
)
// main prints "reload" each time the process receives SIGHUP and prints
// "stop" and exits when it receives SIGTERM or SIGINT.
func main() {
	// SIGKILL can never be delivered to a handler, so the catchable SIGTERM
	// (the default signal of kill) is used together with SIGINT (Ctrl+C).
	// The buffer of one keeps a signal that arrives before main is receiving.
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, syscall.SIGTERM, syscall.SIGINT)

	reload := make(chan os.Signal, 1)
	signal.Notify(reload, syscall.SIGHUP)

	// Handle reload requests in the background for the life of the process.
	go func() {
		for range reload {
			fmt.Println("reload")
		}
	}()

	// main is the only receiver on stop, so the message below is always
	// printed before the process exits.
	<-stop
	fmt.Println("stop")
	os.Exit(-1)
}

// kill -HUP PID   -> prints "reload"
// kill -TERM PID  -> prints "stop" and exits (kill -KILL cannot be caught)
Sequence Chan
package main
import (
"fmt"
"testing"
"time"
)
// TestSequnseChannel uses an unbuffered channel to order two goroutines: the
// send in the test goroutine cannot complete until the receiver, which first
// sleeps for one second, takes the value.
func TestSequnseChannel(t *testing.T) {
	intChan := make(chan int)

	receiver := func() {
		time.Sleep(time.Second)
		fmt.Println(<-intChan)
	}
	go receiver()

	// Blocks until the receiver goroutine is ready to receive.
	intChan <- 1
	fmt.Println(2)
}
Merge Chan
package main
import (
"fmt"
"testing"
"time"
)
// TestMergeChannel fans two input channels into a single output channel.
// Two producers each send ten values 100ms apart, a merger forwards whatever
// arrives on either input, and a consumer prints the merged stream. The test
// simply sleeps for five seconds and then returns.
func TestMergeChannel(t *testing.T) {
	input1 := make(chan int)
	input2 := make(chan int)
	output := make(chan int)

	// Merger: forward values from either input to out.
	go func(in1, in2 <-chan int, out chan<- int) {
		for {
			select {
			case v := <-in2:
				out <- v
			case v := <-in1:
				out <- v
			}
		}
	}(input1, input2, output)

	// produce sends the integers in [from, to) on dst, pausing after each.
	produce := func(dst chan<- int, from, to int) {
		for n := from; n < to; n++ {
			dst <- n
			fmt.Println("set value:", n)
			time.Sleep(time.Millisecond * 100)
		}
	}
	go produce(input1, 0, 10)
	go produce(input2, 20, 30)

	// Consumer: output is never closed, so this loop runs until the
	// process ends.
	go func() {
		for value := range output {
			fmt.Println("get value:", value)
		}
	}()

	time.Sleep(time.Second * 5)
	fmt.Println("main process exist.")
}
Quit Chan
package main
import (
"fmt"
"testing"
"time"
)
// TestQuitChannel stops a worker goroutine through a dedicated quit channel.
// The worker prints every value it receives until a quit message arrives.
func TestQuitChannel(t *testing.T) {
	intChan := make(chan int)
	quit := make(chan bool)

	worker := func() {
		for {
			select {
			case <-quit:
				fmt.Println("goroutine exit.")
				return
			case v := <-intChan:
				fmt.Println("get value:", v)
			}
		}
	}
	go worker()

	for _, v := range []int{0, 1, 2} {
		intChan <- v
	}
	quit <- true

	// Give the worker time to print its exit message.
	time.Sleep(time.Second)
	fmt.Println("main process exist.")
	// Output:
	// get value: 0
	// get value: 1
	// get value: 2
	// goroutine exit.
	// main process exist.
}
生产/消费 Chan
package main
import (
"fmt"
"testing"
"time"
)
// TestProductCustomerChannel is a producer/consumer example. The producer
// sends three values one second apart and then reports on prodChan; the
// consumer prints values until it sees that report and then releases the
// test goroutine through custChan.
func TestProductCustomerChannel(t *testing.T) {
	intChan := make(chan int)
	prodChan := make(chan bool)
	custChan := make(chan bool)

	producer := func() {
		for n := 0; n < 3; n++ {
			intChan <- n
			fmt.Println("Product set value:", n)
			time.Sleep(time.Second)
		}
		prodChan <- true
	}

	consumer := func() {
		for {
			select {
			case <-prodChan:
				// The producer is finished: tell the test and stop.
				custChan <- false
				return
			case v := <-intChan:
				fmt.Println("Customer get value:", v)
			}
		}
	}

	go producer()
	go consumer()

	<-custChan
	fmt.Println("main process exist.")
	// Output:
	// Product set value: 0
	// Customer get value: 0
	// Product set value: 1
	// Customer get value: 1
	// Customer get value: 2
	// Product set value: 2
	// main process exist.
}
Timeout Chan
package main
import (
"fmt"
"math/rand"
"testing"
"time"
)
// TestTime contrasts the two timer channels of package time: time.After
// delivers a single value (delayed work), while a Ticker delivers a value
// every interval (periodic work).
func TestTime(t *testing.T) {
	fmt.Println(time.Now())

	// The channel from time.After yields exactly one value; a second
	// receive here would block forever and end in
	// "fatal error: all goroutines are asleep - deadlock!".
	afterChan := time.After(1 * time.Second)
	fmt.Println(<-afterChan)

	// Use NewTicker rather than time.Tick so the ticker can be stopped when
	// the function returns.
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for i := 0; i < 3; i++ {
		fmt.Println(<-ticker.C)
	}
}
// TestTimeoutChannel lets a worker goroutine stop itself after it has been
// idle for three seconds. Every pass through the loop creates a fresh timer,
// so the timeout counts from the most recent received value.
func TestTimeoutChannel(t *testing.T) {
	const idle = time.Second * 3

	intChan := make(chan int)
	quitChan := make(chan bool)

	worker := func() {
		for {
			select {
			case <-time.After(idle):
				quitChan <- true
				fmt.Println("goroutine timeout exit")
				return
			case v := <-intChan:
				fmt.Println("get value:", v)
			}
		}
	}
	go worker()

	for _, v := range []int{0, 1, 2} {
		intChan <- v
	}

	// Blocks until the worker reports its timeout.
	<-quitChan
	fmt.Println("goroutine timeout, main process exit")
	// Output:
	// get value: 0
	// get value: 1
	// get value: 2
	// goroutine timeout, main process exit
	// goroutine timeout exit
}
// TestTimeoutChannel2 races a job that takes a random 0-4 seconds against a
// three-second timeout and prints whichever finishes first.
func TestTimeoutChannel2(t *testing.T) {
	// The buffer of one lets the worker deliver its result and exit even
	// when the timeout wins and nobody receives from the channel any more.
	result := make(chan int, 1)

	go func() {
		rand.Seed(time.Now().Unix())
		r := rand.Intn(5)
		time.Sleep(time.Duration(r) * time.Second)
		result <- r
	}()

	// time.After supplies the timeout without a second goroutine that could
	// be left blocked on a send.
	select {
	case <-time.After(3 * time.Second):
		fmt.Println("timeout")
	case v := <-result:
		fmt.Println("random", v)
	}
}
读写管道
package main
import (
"fmt"
"time"
)
// main demonstrates directional channel types: a bidirectional channel is
// passed or assigned as receive-only (<-chan) and as send-only (chan<-).
func main() {
	intChan := make(chan int, 2)

	// Receive-only view inside the goroutine.
	go func(src <-chan int) {
		fmt.Println(<-src)
	}(intChan)

	// Send-only view inside the goroutine.
	go func(dst chan<- int) {
		dst <- 2
	}(intChan)

	// Sleep to let the two goroutines above finish.
	time.Sleep(1 * time.Second)

	// The same conversions also work through plain assignment.
	var (
		rChan <-chan int = intChan // receive-only
		wChan chan<- int = intChan // send-only
	)
	wChan <- 8
	fmt.Println(<-rChan)
	// Output:
	// 2
	// 8
}
package main
import (
"fmt"
"testing"
"time"
)
// TestInAndOutChannel passes one channel to a producer as send-only and to a
// consumer as receive-only. The producer closes the channel when it is done,
// the consumer drains it and then signals quit, and the test goroutine waits
// for that signal.
func TestInAndOutChannel(t *testing.T) {
	intChan := make(chan int)
	quit := make(chan bool)

	go func(inChan chan<- int) {
		// Closing the data channel is the end-of-stream signal. A single
		// quit message must not be shared between the consumer and the test
		// goroutine: only one of them would receive it and the other could
		// block forever.
		defer close(inChan)
		for i := 0; i < 10; i++ {
			inChan <- i
			fmt.Println("set value:", i)
			time.Sleep(time.Microsecond * 500)
		}
	}(intChan)

	go func(outChan <-chan int) {
		// The range loop ends once the producer has closed the channel and
		// every value has been received.
		for v := range outChan {
			fmt.Println("get value:", v)
		}
		fmt.Println("get inChan quit")
		quit <- true
	}(intChan)

	// quit has exactly one sender and one receiver.
	<-quit
	fmt.Println("main process exit")
}
限制数量 chan
package main
import (
"fmt"
"testing"
"time"
)
// TestMaxChannel uses a buffered channel as a counting semaphore so that at
// most maxNum of the 100 workers run at the same time, and returns only
// after every worker has finished.
func TestMaxChannel(t *testing.T) {
	maxNum := 3
	limitChan := make(chan bool, maxNum)

	for i := 0; i < 100; i++ {
		fmt.Println("start worker:", i)
		// Acquire a slot; blocks while maxNum workers are already running.
		limitChan <- true
		go func(i int) {
			// Release the slot when the worker is done.
			defer func() { <-limitChan }()
			fmt.Println("do worker start: ", i)
			time.Sleep(time.Millisecond * 20)
			fmt.Println("do worker finish: ", i)
		}(i)
	}

	// Wait for all workers by taking every slot: the semaphore can only be
	// filled completely once no worker holds a slot. Waiting for the last
	// started worker alone is not enough, because earlier workers may still
	// be running when it finishes.
	for i := 0; i < maxNum; i++ {
		limitChan <- true
	}
	fmt.Println("main process exit")
}
信号 chan
package main
import (
"fmt"
"os"
"os/signal"
"testing"
"time"
)
// TestSignalChannel runs a background counter until the process receives an
// interrupt signal (Ctrl+C).
func TestSignalChannel(t *testing.T) {
	// signal.Notify never blocks when delivering a signal, so the channel
	// needs a buffer; with an unbuffered channel an interrupt that arrives
	// while nobody is receiving is dropped.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt)
	// Stop relaying interrupts to quit once the test returns.
	defer signal.Stop(quit)

	go func() {
		number := 0
		for {
			number++
			fmt.Println("number is", number)
			time.Sleep(time.Second * 2)
		}
	}()

	fmt.Println("Press Ctrl + C exit process.")
	<-quit
	fmt.Println("main process exit")
}
Synchronize Chan
package main
import (
"fmt"
"math/rand"
"testing"
"time"
)
// Shared state for the synchronization example.
var (
	// lockChan has capacity one and serves as a lock: sending a value
	// acquires it and receiving the value back releases it.
	lockChan = make(chan int, 1)

	// gameCurrency is the shared balance the example spends from and adds
	// to; it starts at 20.
	gameCurrency = 20
)
// testSynchronizeUse spends currency from the shared gameCurrency balance
// while holding lockChan. When the balance is too small nothing is deducted
// and a message is printed instead.
func testSynchronizeUse(currency int) {
	lockChan <- 0 // acquire
	if balance := gameCurrency; balance < currency {
		fmt.Printf("want use %d, just only %d\n", currency, balance)
	} else {
		gameCurrency = balance - currency
		fmt.Printf("old %d, use %d,left %d\n", balance, currency, gameCurrency)
	}
	<-lockChan // release
}
// testSynchronizeGain adds currency to the shared gameCurrency balance while
// holding lockChan and prints the balance before and after.
func testSynchronizeGain(currency int) {
	lockChan <- 0 // acquire
	before := gameCurrency
	gameCurrency = before + currency
	fmt.Printf("old %d, get %d,left %d\n", before, currency, gameCurrency)
	<-lockChan // release
}
// TestSynchronizeChannel runs ten spend and ten gain operations concurrently
// against the shared balance and returns only after all of them have
// completed.
func TestSynchronizeChannel(t *testing.T) {
	const rounds = 10

	// Every spawned operation reports on done. The buffer holds one token
	// per operation so a finished worker never blocks on the send.
	done := make(chan bool, 2*rounds)

	// spawn starts op in its own goroutine rounds times, with a random
	// amount between 1 and 12 and a random pause of up to 500ms between
	// starts.
	spawn := func(op func(int)) {
		for i := 0; i < rounds; i++ {
			currency := rand.Intn(12) + 1
			go func() {
				op(currency)
				done <- true
			}()
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
		}
	}
	go spawn(testSynchronizeUse)
	go spawn(testSynchronizeGain)

	// Wait for the operations themselves, not only for the loops that
	// start them; otherwise the last ones could be cut off when the test
	// returns.
	for i := 0; i < 2*rounds; i++ {
		<-done
	}
	fmt.Println("main process exit")
}