不会飞的章鱼

熟能生巧,勤能补拙;念念不忘,必有回响。

Go语言精进——了解channel的妙用

什么是channel

channel是Go语言提供的一种重要的并发原语。它在Go语言的CSP模型中扮演者重要的角色:

  • 可以实现goroutine之间的通信;
  • 可以实现goroutine之间的同步。

channel原语的使用:

1
2
3
4
5
6
7
8
9
10
11
12
c := make(chan int)  // 创建一个无缓冲的int类型的channel
c := make(chan int,5) // 创建一个带缓冲的int类型的channel
c <- x // 向channel c 中发送一个值
<- c // 从channel c 中接收一个值
x = <- c // 从channel c 接收一个值并将其存储到变量x中
x,ok = <- c // 从channel c 接收一个值。若channel关闭了,ok将被置为false
for i := range c {...}
close(c) // 关闭channel c

c := make(chan chan int) // 创建一个无缓冲的chan int类型的channel
func stream(ctx context.Context, out chan<- Value) error // 将只发送channel作为函数参数
func spwn(...) <-chan T // 将只接收类型channel作为返回值

当需要同时对多个channel进行操作时,我们将使用另一个CSP模型提供的原语select

1
2
3
4
5
6
select {
case x := <-c1: // 从channel c1接收数据
case y,ok := <-c2: // 从channel c2接收数据,并根据ok值判断c2是否已经关闭
case c3 <- z: // 将z值发送到channel c3中
default: // 当上面的case中的channel通信无法实施时,执行该默认分支
}

无缓冲channel

  • 发送动作一定发生在接收动作完成之前;
  • 接收动作一定发生在发送动作完成之前。

用作信号传递

一对一通知信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type signal struct{}

func worker() {
println("worker is working...")
time.Sleep(1 * time.Second)
}

func spawn(f func()) <-chan signal {
c := make(chan signal)
go func() {
println("worker start to work...")
f()
c <- signal(struct{}{})
}()
return c // 新goroutine退出的通知信号
}

func main() {
println("start a worker...")
c := spawn(worker)
<-c
fmt.Println("worker work done!")
}

一对多通知信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
type signal struct{}

func worker(i int) {
fmt.Printf("worker %d: is working...\n", i)
time.Sleep(1 * time.Second)
fmt.Printf("worker %d: works done\n", i)
}

func spawnGroup(f func(i int), num int, groupSignal <-chan signal) <-chan signal {
c := make(chan signal)
var wg sync.WaitGroup

for i := 0; i < num; i++ {
wg.Add(1)
go func(i int) {
<-groupSignal
fmt.Printf("worker %d: start to work...\n", i)
f(i)
wg.Done()
}(i + 1)
}

go func() {
wg.Wait()
c <- signal(struct{}{})
}()
return c
}

func main() {
fmt.Println("start a group of workers...")
groupSignal := make(chan signal)
c := spawnGroup(worker, 5, groupSignal) // 创建一组5个 work goroutine
time.Sleep(5 * time.Second)
fmt.Println("the group of workers start to work...")
close(groupSignal) // 一起开始工作
<-c
fmt.Println("the group of workers work done!")
}

/*
start a group of workers...
the group of workers start to work...
worker 4: start to work...
worker 4: is working...
worker 1: start to work...
worker 2: start to work...
worker 2: is working...
worker 1: is working...
worker 5: start to work...
worker 5: is working...
worker 3: start to work...
worker 3: is working...
worker 3: works done
worker 4: works done
worker 5: works done
worker 1: works done
worker 2: works done
the group of workers work done!
*/

用于替代锁机制

计数器实现方式

基于共享内存+锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type counter struct {
sync.Mutex
i int
}

var cter counter

func Increase() int {
cter.Lock()
defer cter.Unlock()
cter.i++
return cter.i
}

func main() {
for i := 0; i < 10; i++ {
go func(i int) {
v := Increase()
fmt.Printf("goroutine-%d: current counter value is %d\n", i, v)
}(i)
}

time.Sleep(5 * time.Second)
}

无缓冲channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
type counter struct {
c chan int
i int
}

var cter counter

func InitCounter() {
cter = counter{
c: make(chan int),
}

go func() {
for {
cter.i++
cter.c <- cter.i
}
}()
fmt.Println("counter init ok")
}

func Increase() int {
return <-cter.c
}

func init() {
InitCounter()
}

func main() {
for i := 0; i < 10; i++ {
go func(i int) {
v := Increase()
fmt.Printf("goroutine-%d: current counter value is %d\n", i, v)
}(i)
}

time.Sleep(5 * time.Second)
}

总结

将计数器操作全部交给一个独立的goroutine处理,并通过无缓冲channel的同步阻塞特性实现计数器的控制。

这种设计更符合Go语言的原则——不要通过共享内存来通信,而应该通过通信来共享内存

带缓冲channel

通过带有capacity参数的内置make函数创建:

1
c := make(chan T,capacity)

由于带缓冲channel的运行时层实现带有缓冲区,因此对带缓冲channel的发送操作在缓冲区未满、接收操作在缓冲区非空的情况下是异步的。因此:

  • 在缓冲区无数据或有数据但未满的情况下,对其进行发送操作的goroutine不会阻塞;
  • 在缓冲区已满的情况下,对其进行发送操作的goroutine会阻塞;
  • 在缓冲区为空的情况下,对其进行接收操作的goroutine也会阻塞。

用作消息队列

单收单发性能基准测试

一对一无缓冲

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// for send benchmark test
var c1 chan string

// for recv benchmark test
var c2 chan string

func init() {
c1 = make(chan string)
go func() {
for {
<-c1
}
}()

c2 = make(chan string)
go func() {
for {
c2 <- "hello"
}
}()
}

func send(msg string) {
c1 <- msg
}
func recv() {
<-c2
}

func BenchmarkUnbufferedChan1To1Send(b *testing.B) {
for n := 0; n < b.N; n++ {
send("hello")
}
}
func BenchmarkUnbufferedChan1To1Recv(b *testing.B) {
for n := 0; n < b.N; n++ {
recv()
}
}

/*$ go test -bench . one_to_one_test.go
goos: linux
goarch: amd64
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
BenchmarkUnbufferedChan1To1Send-8 3983892 281.5 ns/op
BenchmarkUnbufferedChan1To1Recv-8 4622538 272.9 ns/op
PASS
*/

一对一有缓冲

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// for send benchmark test
var c1 chan string

// for recv benchmark test
var c2 chan string

func init() {
c1 = make(chan string, 10)
go func() {
for {
<-c1
}
}()

c2 = make(chan string, 10)
go func() {
for {
c2 <- "hello"
}
}()
}

func send(msg string) {
c1 <- msg
}
func recv() {
<-c2
}

func BenchmarkBufferedChan1To1SendCap10(b *testing.B) {
for n := 0; n < b.N; n++ {
send("hello")
}
}
func BenchmarkBufferedChan1To1RecvCap10(b *testing.B) {
for n := 0; n < b.N; n++ {
recv()
}
}

/*
$ go test -bench . one_to_one_cap_10_test.go
goos: linux
goarch: amd64
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
BenchmarkBufferedChan1To1SendCap10-8 11304949 105.1 ns/op
BenchmarkBufferedChan1To1RecvCap10-8 12171568 95.44 ns/op
PASS

$ go test -bench . one_to_one_cap_100_test.go
goos: linux
goarch: amd64
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
BenchmarkBufferedChan1To1SendCap100-8 17385675 67.87 ns/op
BenchmarkBufferedChan1To1RecvCap100-8 18351138 68.77 ns/op
PASS
*/

多对多无缓冲

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// for send benchmark test
var c1 chan string

// for recv benchmark test
var c2 chan string

func init() {
c1 = make(chan string)
for i := 0; i < 10; i++ {
go func() {
for {
<-c1
}
}()
go func() {
for {
c1 <- "hello"
}
}()
}

c2 = make(chan string)
for i := 0; i < 10; i++ {
go func() {
for {
c2 <- "hello"
}
}()
go func() {
for {
<-c2
}
}()
}
}

func send(msg string) {
c1 <- msg
}
func recv() {
<-c2
}

func BenchmarkUnbufferedChanNToNSend(b *testing.B) {
for n := 0; n < b.N; n++ {
send("hello")
}
}
func BenchmarkUnbufferedChanNToNRecv(b *testing.B) {
for n := 0; n < b.N; n++ {
recv()
}
}

/*
$ go test -bench . multi_to_multi_test.go
goos: linux
goarch: amd64
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
BenchmarkUnbufferedChanNToNSend-8 197262 5600 ns/op
BenchmarkUnbufferedChanNToNRecv-8 187816 5794 ns/op
PASS
*/

多对多有缓冲

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// for send benchmark test
var c1 chan string

// for recv benchmark test
var c2 chan string

func init() {
c1 = make(chan string, 10)
for i := 0; i < 10; i++ {
go func() {
for {
<-c1
}
}()
go func() {
for {
c1 <- "hello"
}
}()
}

c2 = make(chan string, 10)
for i := 0; i < 10; i++ {
go func() {
for {
c2 <- "hello"
}
}()
go func() {
for {
<-c2
}
}()
}
}

func send(msg string) {
c1 <- msg
}
func recv() {
<-c2
}

func BenchmarkBufferedChanNToNSendCap10(b *testing.B) {
for n := 0; n < b.N; n++ {
send("hello")
}
}
func BenchmarkBufferedChanNToNRecvCap10(b *testing.B) {
for n := 0; n < b.N; n++ {
recv()
}
}

/*
$ go test -bench . multi_to_multi_cap_10_test.go
goos: linux
goarch: amd64
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
BenchmarkBufferedChanNToNSendCap10-8 638821 1924 ns/op
BenchmarkBufferedChanNToNRecvCap10-8 544150 1925 ns/op
PASS

$ go test -bench . multi_to_multi_cap_100_test.go
goos: linux
goarch: amd64
cpu: 11th Gen Intel(R) Core(TM) i5-1135G7 @ 2.40GHz
BenchmarkBufferedChanNToNSendCap100-8 1109060 1186 ns/op
BenchmarkBufferedChanNToNRecvCap100-8 877389 1269 ns/op
PASS

*/

结论:

  • 无论是单收单发还是多收多发,带缓冲channel的收发性能都要好于无缓冲的channel;
  • 对于带缓冲channel而言,选择适当容量会在一定程度上提升收发性能。

用作计数信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
var active = make(chan struct{}, 3)
var jobs = make(chan int, 10)

func main() {
go func() {
for i := 0; i < 8; i++ {
jobs <- (i + 1)
}
close(jobs)
}()

var wg sync.WaitGroup

for j := range jobs {
wg.Add(1)
go func(j int) {
active <- struct{}{}
log.Printf("handle job: %d\n", j)
time.Sleep(2 * time.Second)
<-active
wg.Done()
}(j)
}
wg.Wait()
}

/*
2022/10/08 15:43:45 handle job: 2
2022/10/08 15:43:45 handle job: 8
2022/10/08 15:43:45 handle job: 6
2022/10/08 15:43:47 handle job: 7
2022/10/08 15:43:47 handle job: 3
2022/10/08 15:43:47 handle job: 1
2022/10/08 15:43:49 handle job: 4
2022/10/08 15:43:49 handle job: 5
*/

len(channel)的应用

  • 当channel为无缓冲时,len(channel)返回的是0;
  • 当channel为有缓冲时,len(channel)返回当前channel中尚未读取的元素个数。

nil channel的用法

对于没有初始化的channel进行读写操作会发生阻塞,比如:

1
2
3
4
5
6
func main() {
var c chan int
<-c

c <- 1
}

与select结合

利用default分支避免阻塞

实现超时机制

1
2
3
4
5
6
7
8
func worker() {
select {
case <- c:
// ...
case <- time.After(30 * time.Second):
return
}
}

实现心跳机制

1
2
3
4
5
6
7
8
9
10
11
12
func worker() {
heartbeat := time.NewTicker(30 * time.Second)
defer heartbeat.Stop()
for {
select {
case <- c:
// ...
case <- heartbeat.C:
// 处理心跳
}
}
}
------ 本文结束------
如果本篇文章对你有帮助,可以给作者加个鸡腿~(*^__^*),感谢鼓励与支持!