Go channel 信号量

发布时间: 更新时间: 总字数:2107 阅读时间:5m 作者: IP上海 分享 网址

本文介绍 golang channel 常见的使用方式。go 关键字用来开启一个 goroutine(协程)进行任务处理,多个任务之间如果需要通信,就需要用到 channel

介绍

channel 可以传输基本类型的数据(如 int、string),同时也可以传输 struct 数据

声明

// Declared without make: both channel variables hold the zero value, nil.
var intChan chan int
var strChan chan string
  • 默认值为 nil
package main

import "fmt"

// main declares two channel variables without initializing them and prints
// the type and Go-syntax value of each, showing that an uninitialized
// channel is nil.
func main() {
	var (
		intChan chan int
		strChan chan string
	)

	fmt.Printf("%T, %#v\n", intChan, intChan)
	fmt.Printf("%T, %#v\n", strChan, strChan)
	// Output:
	//chan int, (chan int)(nil)
	//chan string, (chan string)(nil)
}

初始化

初始化 channel 类型

  • 无缓冲:intChan := make(chan int)
  • 缓冲类型:strChan := make(chan string, 3) // 3为缓冲数
  • 只读管道:var rChan <-chan int = intChan
  • 只写管道:var wChan chan<- int = intChan
package main

import "fmt"

// main creates one unbuffered and one buffered channel with make and prints
// the type and Go-syntax value of each; an initialized channel prints as a
// non-nil address.
func main() {
	var (
		intChan = make(chan int)       // unbuffered
		strChan = make(chan string, 3) // buffer capacity 3
	)

	fmt.Printf("%T, %#v\n", intChan, intChan)
	fmt.Printf("%T, %#v\n", strChan, strChan)
	// Output:
	//chan int, (chan int)(0xc000026120)
	//chan string, (chan string)(0xc000026180)
}

字面量

说明:

  • 无缓冲的 chan 的发送会一直阻塞到有接收方取值为止,因此赋值(发送)必须放到另一个 goroutine 中,否则报错:fatal error: all goroutines are asleep - deadlock!
  • 带缓冲的 chan 在缓冲区未满时可以直接在 main 函数中赋值
  • 取 chan 变量中的值时,若 chan 中无值时,会阻塞直到有值
    • value, ok := <-intChan
    • value := <-intChan
  • Chan 可以关闭,关闭后只能读,不能写,否则报错:panic: send on closed channel
package main

import (
	"fmt"
	"time"
)

// main walks through the basic channel operations: len on empty channels,
// sending on an unbuffered channel from another goroutine, sending on a
// buffered channel from main, closing, receiving with the comma-ok form and
// ranging over a closed channel.
func main() {
	intChan := make(chan int)
	strChan := make(chan string, 3)

	fmt.Println(len(intChan))
	fmt.Println(len(strChan))

	// A send on an unbuffered channel blocks until a receiver is ready, so
	// it has to run in its own goroutine.
	sender := func() {
		intChan <- 8
		time.Sleep(2 * time.Second)
	}
	go sender()

	// Receive the value.
	fmt.Println(len(intChan))
	fmt.Println(<-intChan)

	strChan <- "hello chan"
	fmt.Println(len(strChan))
	strChan <- "hello chan"
	// After close the channel can still be read from but no longer
	// written to.
	close(strChan)
	//strChan <- "hello chan" // panic: send on closed channel

	fmt.Println(<-strChan)
	v, ok := <-strChan
	fmt.Printf("%T %#v, %T %#v\n", v, v, ok, ok)
	fmt.Println(len(strChan))

	// Ranging over a channel terminates only once the channel has been
	// closed; without close(strChan) this loop would deadlock.
	for rest := range strChan {
		fmt.Println(rest)
	}

	// Output:
	//0
	//0
	//0
	//8
	//1
	//hello chan
	//string "hello chan", bool true
	//0
}

示例

无缓存 Chan

package main

import (
	"fmt"
	"testing"
	"time"
)

// TestChannel demonstrates a hand-off over an unbuffered channel: the
// receive in the test goroutine blocks until the sender goroutine, after a
// three second pause, delivers its value.
func TestChannel(t *testing.T) {
	ch := make(chan int)

	sender := func() {
		fmt.Println("begin run goroutine...")
		time.Sleep(3 * time.Second)
		ch <- 1
		fmt.Println("finish run goroutine.")
	}
	go sender()

	// The second result reports whether the value came from a real send
	// (true) or from a closed, drained channel (false).
	received, delivered := <-ch
	if !delivered {
		fmt.Println("get intChan err!", delivered)
	}
	fmt.Println("value:", received)

	// Output:
	//begin run goroutine...
	//value: 1
	//finish run goroutine.
}

有缓存 Chan

package main

import (
	"fmt"
	"testing"
)

// TestCacheChannel demonstrates a buffered channel: a producer goroutine
// sends twenty integers through a channel of capacity three while the test
// goroutine receives and prints them.
func TestCacheChannel(t *testing.T) {
	const total = 20
	buffered := make(chan int, 3)

	produce := func() {
		fmt.Println("begin run goroutine...")
		for n := 0; n < total; n++ {
			fmt.Println("set value:", n)
			buffered <- n
		}
		fmt.Println("finish run goroutine.")
	}
	go produce()

	for received := 0; received < total; received++ {
		fmt.Println("get value:", <-buffered)
	}
}

结构体 Chan

package main

import (
	"fmt"
	"testing"
)

// User is the sample payload sent over a channel by
// TestTranslateStructChannel. It holds a nested Class by value.
type User struct {
	Name  string
	Age   int
	Class Class
}

// Class is the nested struct stored by value in the User.Class field.
type Class struct {
	Name string
}

// TestTranslateStructChannel shows that a struct sent over a channel is
// transferred by value: changing the sender's variable after the send does
// not affect the copy the receiver gets.
func TestTranslateStructChannel(t *testing.T) {
	const layout = "src user: %+v\n"
	userChan := make(chan User, 1)

	sender := func() {
		original := User{
			Name:  "xianbin",
			Age:   18,
			Class: Class{Name: "c1"},
		}
		fmt.Printf(layout, original)
		userChan <- original

		// Only the sender's own variable changes here; the value already
		// in the channel is a separate copy.
		original.Age = 20
		fmt.Printf(layout, original)
	}
	go sender()

	fmt.Printf(layout, <-userChan)

	// Output:
	//src user: {Name:xianbin Age:18 Class:{Name:c1}}
	//src user: {Name:xianbin Age:20 Class:{Name:c1}}
	//src user: {Name:xianbin Age:18 Class:{Name:c1}}
}

Close Chan

package main

import (
	"fmt"
	"testing"
	"time"
)

// TestCloseChannel demonstrates closing a channel and detecting the close on
// the receiving side. A producer sends five values (one per second) and then
// closes intChan; a slower consumer keeps receiving with the comma-ok form
// until ok is false. Each goroutine reports completion on signCh, and the
// test waits for both reports before printing "end".
func TestCloseChannel(t *testing.T) {
	intChan := make(chan int, 5)
	// Capacity 2 lets both goroutines report completion without blocking.
	signCh := make(chan int, 2)

	// Producer: the buffer holds all five values, so sends never block here.
	go func() {
		for i := 0; i < 5; i++ {
			intChan <- i
			fmt.Println("set intCh value:", i)
			time.Sleep(time.Second)
		}

		// Values still buffered remain readable after the close.
		close(intChan)
		fmt.Println("the intCh channel is closed")

		signCh <- 0
	}()

	// Consumer: ok turns false only once the channel is closed and drained.
	go func() {
		for {
			i, ok := <-intChan
			fmt.Printf("get int channel %d, %v \n", i, ok)
			if !ok {
				fmt.Println("monitor intCh channel is closed")
				break
			}

			// Throttle the loop to avoid exhausting resources.
			time.Sleep(time.Second * 4)
		}

		signCh <- 1
	}()

	// Wait for one completion signal from each goroutine.
	<-signCh
	<-signCh
	fmt.Println("end")

	// Output:
	//set intCh value: 0
	//get int channel 0, true
	//set intCh value: 1
	//set intCh value: 2
	//set intCh value: 3
	//get int channel 1, true
	//set intCh value: 4
	//the intCh channel is closed
	//get int channel 2, true
	//get int channel 3, true
	//get int channel 4, true
	//get int channel 0, false
	//monitor intCh channel is closed
	//end
}

Signal Chan

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// main installs two signal handlers and then blocks until a stop signal
// arrives:
//
//   - SIGHUP prints "reload" every time it is received;
//   - SIGTERM or SIGINT prints "stop" and exits with status -1.
//
// SIGKILL is deliberately not registered: the operating system never
// delivers it to the process, so a handler for it could not run.
func main() {
	// Signal delivery never blocks, so each channel needs room for at least
	// one pending signal.
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, syscall.SIGTERM, syscall.SIGINT)

	reload := make(chan os.Signal, 1)
	signal.Notify(reload, syscall.SIGHUP)

	go func() {
		for range reload {
			fmt.Println("reload")
		}
	}()

	// stop has exactly one receiver. With a second receiver in another
	// goroutine, whichever one won the race would decide whether "stop" is
	// printed and which exit status is used.
	<-stop
	fmt.Println("stop")
	os.Exit(-1)
}

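// kill -HUP PID
// kill -INT PID
// Note: kill -KILL PID terminates the process immediately; SIGKILL cannot be
// caught, so no handler runs for it.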

Sequence Chan

package main

import (
	"fmt"
	"testing"
	"time"
)

// TestSequnseChannel uses an unbuffered channel to order two goroutines: the
// send in the test goroutine cannot complete until the receiver goroutine,
// which first sleeps for a second, takes the value, so "2" is printed only
// after the hand-off.
func TestSequnseChannel(t *testing.T) {
	ch := make(chan int)

	receiver := func() {
		time.Sleep(time.Second)
		fmt.Println(<-ch)
	}
	go receiver()

	ch <- 1
	fmt.Println(2)
}

Merge Chan

package main

import (
	"fmt"
	"testing"
	"time"
)

// TestMergeChannel demonstrates fan-in: values from two input channels are
// forwarded onto a single output channel, which a printer goroutine drains.
// The test lets the goroutines run for five seconds and then returns.
func TestMergeChannel(t *testing.T) {
	input1 := make(chan int)
	input2 := make(chan int)
	output := make(chan int)

	// merge forwards whatever arrives on either input to out.
	merge := func(in1, in2 <-chan int, out chan<- int) {
		for {
			var v int
			select {
			case v = <-in1:
			case v = <-in2:
			}
			out <- v
		}
	}
	go merge(input1, input2, output)

	// feed sends first..limit-1 to dst, pausing 100ms after each value.
	feed := func(dst chan<- int, first, limit int) {
		for n := first; n < limit; n++ {
			dst <- n
			fmt.Println("set value:", n)
			time.Sleep(time.Millisecond * 100)
		}
	}
	go feed(input1, 0, 10)
	go feed(input2, 20, 30)

	go func() {
		for {
			fmt.Println("get value:", <-output)
		}
	}()

	time.Sleep(time.Second * 5)
	fmt.Println("main process exist.")
}

Quit Chan

package main

import (
	"fmt"
	"testing"
	"time"
)

// TestQuitChannel demonstrates stopping a worker goroutine through a
// dedicated quit channel: the worker prints every value it receives until a
// message on quit tells it to return.
func TestQuitChannel(t *testing.T) {
	values := make(chan int)
	quit := make(chan bool)

	worker := func() {
		for {
			select {
			case <-quit:
				fmt.Println("goroutine exit.")
				return
			case n := <-values:
				fmt.Println("get value:", n)
			}
		}
	}
	go worker()

	for _, n := range []int{0, 1, 2} {
		values <- n
	}
	quit <- true
	// Give the worker time to print its exit message.
	time.Sleep(time.Second)
	fmt.Println("main process exist.")

	// Output:
	//get value: 0
	//get value: 1
	//get value: 2
	//goroutine exit.
	//main process exist.
}

生产/消费 Chan

package main

import (
	"fmt"
	"testing"
	"time"
)

// TestProductCustomerChannel demonstrates a producer/consumer pair wired
// together with three unbuffered channels: intChan carries the data,
// prodChan tells the consumer that the producer is done, and custChan tells
// the test that the consumer is done.
func TestProductCustomerChannel(t *testing.T) {
	intChan := make(chan int)
	prodChan := make(chan bool)
	custChan := make(chan bool)

	// Producer: sends three values one second apart, then reports completion.
	go func() {
		for i := 0; i < 3; i++ {
			intChan <- i
			fmt.Println("Product set value:", i)
			time.Sleep(time.Second)
		}
		prodChan <- true
	}()

	// Consumer: prints values until the producer's completion signal arrives,
	// then notifies the test and returns.
	go func() {
		for {
			select {
			case v := <-intChan:
				fmt.Println("Customer get value:", v)
			case <-prodChan:
				custChan <- false
				return
			}
		}
	}()

	// Block until the consumer has finished.
	<-custChan
	fmt.Println("main process exist.")

	// Output:
	//Product set value: 0
	//Customer get value: 0
	//Product set value: 1
	//Customer get value: 1
	//Customer get value: 2
	//Product set value: 2
	//main process exist.
}

Timeout Chan

package main

import (
	"fmt"
	"math/rand"
	"testing"
	"time"
)

// TestTime contrasts the two timer channels of the time package: time.After
// delivers a single value (delayed execution), while a ticker delivers a
// value on every interval (periodic execution).
func TestTime(t *testing.T) {
	fmt.Println(time.Now())

	// The channel returned by time.After yields exactly one value; a second
	// receive would block forever.
	afterChan := time.After(1 * time.Second)
	fmt.Println(<-afterChan)
	//fmt.Println(<-afterChan) // fatal error: all goroutines are asleep - deadlock!

	// time.NewTicker is used instead of time.Tick so the ticker can be
	// stopped once it is no longer needed; a ticker created by time.Tick
	// cannot be stopped.
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for i := 0; i < 3; i++ {
		fmt.Println(<-ticker.C) // one value per interval, for periodic tasks
	}
}

// TestTimeoutChannel demonstrates an idle timeout: the worker goroutine
// prints every value it receives and gives up once no value has arrived for
// three seconds, signalling the test through quitChan before it returns.
func TestTimeoutChannel(t *testing.T) {
	intChan := make(chan int)
	quitChan := make(chan bool)

	go func() {
		for {
			select {
			case v := <-intChan:
				fmt.Println("get value:", v)
			// A fresh timer is created on every loop iteration, so the three
			// seconds are counted from the most recent pass through the loop.
			case <-time.After(time.Second * 3):
				quitChan <- true
				fmt.Println("goroutine timeout exit")
				return
			}
		}
	}()

	for i := 0; i < 3; i++ {
		intChan <- i
	}

	// Block until the worker reports that it timed out.
	<-quitChan
	fmt.Println("goroutine timeout, main process exit")

	// Output:
	//get value: 0
	//get value: 1
	//get value: 2
	//goroutine timeout, main process exit
	//goroutine timeout exit
}

// TestTimeoutChannel2 races a task of random duration (0-4 seconds) against
// a three second timeout and prints whichever finishes first: "random N"
// with the task's result, or "timeout".
func TestTimeoutChannel2(t *testing.T) {
	// Capacity 1 lets the worker deliver its result and exit even when the
	// select below has already taken the timeout branch. With an unbuffered
	// channel that send would block forever and leak the goroutine.
	result := make(chan int, 1)

	go func() {
		// A locally seeded generator replaces the deprecated global
		// rand.Seed.
		rng := rand.New(rand.NewSource(time.Now().UnixNano()))
		r := rng.Intn(5)
		time.Sleep(time.Duration(r) * time.Second)
		result <- r
	}()

	select {
	// time.After replaces the original hand-written timeout goroutine, which
	// had the same blocked-send problem when the result arrived first.
	case <-time.After(3 * time.Second):
		fmt.Println("timeout")
	case v := <-result:
		fmt.Println("random", v)
	}
}

读写管道

  • 示例1
package main

import (
	"fmt"
	"time"
)

// main demonstrates directional channel types: a bidirectional channel
// converts implicitly to a receive-only (<-chan) or send-only (chan<-) view,
// both when passed as a function argument and when assigned to a variable.
func main() {
	intChan := make(chan int, 2)

	// The goroutine sees the channel as receive-only.
	go func(src <-chan int) {
		fmt.Println(<-src)
	}(intChan)

	// The goroutine sees the channel as send-only.
	go func(dst chan<- int) {
		dst <- 2
	}(intChan)

	// Sleep to let the two goroutines above finish.
	time.Sleep(1 * time.Second)

	var (
		rChan <-chan int = intChan // receive-only view
		wChan chan<- int = intChan // send-only view
	)

	wChan <- 8
	fmt.Println(<-rChan)

	// Output:
	//2
	//8
}
  • 示例2
package main

import (
	"fmt"
	"testing"
	"time"
)

// TestInAndOutChannel demonstrates directional channel parameters: the
// producer only gets a send-only view of intChan and the consumer only a
// receive-only view.
//
// Shutdown is signalled by closing channels rather than by sending a single
// value on quit. The producer closes the data channel when it is done, the
// consumer drains it, prints "get inChan quit" and closes quit, and the test
// waits for that close. The original sent one value on quit that both the
// consumer and the test tried to receive, so either the test blocked forever
// or the consumer never terminated.
func TestInAndOutChannel(t *testing.T) {
	intChan := make(chan int)
	quit := make(chan bool)

	go func(inChan chan<- int) {
		// Closing is the sender's way of saying "no more values".
		defer close(inChan)
		for i := 0; i < 10; i++ {
			inChan <- i
			fmt.Println("set value:", i)
			time.Sleep(time.Microsecond * 500)
		}
	}(intChan)

	go func(outChan <-chan int) {
		// Closing quit releases every waiter, however many there are.
		defer close(quit)
		for v := range outChan {
			fmt.Println("get value:", v)
		}
		fmt.Println("get inChan quit")
	}(intChan)

	<-quit
	fmt.Println("main process exit")
}

限制数量 chan

package main

import (
	"fmt"
	"testing"
	"time"
)

// TestMaxChannel uses a buffered channel as a counting semaphore so that at
// most maxNum of the 100 workers run at the same time, and returns only
// after every worker has finished.
//
// The original returned as soon as worker 99 finished, without waiting for
// the other workers that could still be running at that moment.
func TestMaxChannel(t *testing.T) {
	maxNum := 3
	limitChan := make(chan bool, maxNum)

	for i := 0; i < 100; i++ {
		fmt.Println("start worker:")
		// Acquire a slot; blocks while maxNum workers are already running.
		limitChan <- true
		go func(i int) {
			// Release the slot only after all of the worker's output.
			defer func() { <-limitChan }()

			fmt.Println("do worker start: ", i)
			time.Sleep(time.Millisecond * 20)
			fmt.Println("do worker finish: ", i)
		}(i)
	}

	// Re-acquire every slot. Once the test goroutine holds all of them, no
	// worker can still be holding one, so every worker has finished.
	for i := 0; i < maxNum; i++ {
		limitChan <- true
	}
	fmt.Println("goroutine quit")
	fmt.Println("main process exit")
}

信号量 chan

package main

import (
	"fmt"
	"os"
	"os/signal"
	"testing"
	"time"
)

// TestSignalChannel runs a background counter that prints every two seconds
// and blocks until the process receives an interrupt signal (Ctrl+C).
func TestSignalChannel(t *testing.T) {
	// The signal package never blocks when delivering to the channel. With
	// an unbuffered channel a signal that arrives while nobody is receiving
	// is dropped, so one slot of buffer is required for one signal value.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt)
	// Stop relaying interrupts to quit once this function is done.
	defer signal.Stop(quit)

	go func() {
		number := 0
		for {
			number++
			fmt.Println("number is", number)
			time.Sleep(time.Second * 2)
		}
	}()

	fmt.Println("Press Ctrl + C exit process.")
	<-quit
	fmt.Println("main process exit")
}

Synchronize Chan

package main

import (
	"fmt"
	"math/rand"
	"testing"
	"time"
)

var (
	// lockChan acts as a mutex: its single buffer slot is filled to acquire
	// the lock and emptied to release it.
	lockChan = make(chan int, 1)

	// gameCurrency is the shared balance guarded by lockChan.
	gameCurrency = 20
)

// testSynchronizeUse spends currency from gameCurrency while holding
// lockChan. If the balance is too low it only reports the shortfall and
// leaves the balance unchanged.
func testSynchronizeUse(currency int) {
	lockChan <- 0                 // acquire
	defer func() { <-lockChan }() // release

	if gameCurrency < currency {
		fmt.Printf("want use %d, just only %d\n", currency, gameCurrency)
		return
	}

	before := gameCurrency
	gameCurrency = before - currency
	fmt.Printf("old %d, use %d,left %d\n", before, currency, gameCurrency)
}

// testSynchronizeGain adds currency to gameCurrency while holding lockChan
// and prints the balance before and after.
func testSynchronizeGain(currency int) {
	lockChan <- 0                 // acquire
	defer func() { <-lockChan }() // release

	before := gameCurrency
	gameCurrency = before + currency
	fmt.Printf("old %d, get %d,left %d\n", before, currency, gameCurrency)
}

// TestSynchronizeChannel runs two driver goroutines concurrently. One
// launches ten spend operations and the other ten gain operations, each
// with a random amount between 1 and 12 and a random pause of up to 500ms
// after every launch. The test returns once both drivers have reported on
// quit.
func TestSynchronizeChannel(t *testing.T) {
	const rounds = 10
	quit := make(chan bool, 2)

	// drive starts op in its own goroutine rounds times, then reports on
	// quit.
	drive := func(op func(int)) {
		for n := 0; n < rounds; n++ {
			go op(rand.Intn(12) + 1)

			time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
		}

		quit <- true
	}

	go drive(testSynchronizeUse)
	go drive(testSynchronizeGain)

	<-quit
	<-quit
	fmt.Println("main process exit")
}
Home Archives Categories Tags Statistics
本文总阅读量 次 本站总访问量 次 本站总访客数