博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Go语言:为什么要使用上下文(context)而不是计时器(timer)加通道(channel)的方式来控制协程
阅读量:3593 次
发布时间:2019-05-20

本文共 6360 字,大约阅读时间需要 21 分钟。

上下文context是对计时器(timer)+通道(channel)+同步锁(sync.Mutex)的封装,主要用于多个协程间的统一控制,如取消和定时。理论上,能用上下文的地方都可以用计时器+通道+同步锁的方式来改写,那为什么还要用上下文呢。本文就从一个例子开始推导一下这个问题。仍然还是用里的例子,采用context的实现方式请见前文,这里不再累述。

假设有这样一个应用场景,一个公司(main)有一名经理(manager)和两名工人(worker),公司下班(main exit)有两种可能:一:工人(worker)的工作时间已经达到合同约定的最大时长;二:经理(manager)提前叫停收工。两种可能满足其中一个即可下班。

1)首先我们先简化问题,仅以timer实现上述场景的第二种下班条件:

package mainimport (	"fmt"	"time")//worker工作的最大时长,超过这个时长worker自行收工无需等待manager叫停const MAX_WORKING_DURATION = 5 * time.Second//达到实际工作时长后,manager可以提前叫停const ACTUAL_WORKING_DURATION = 2 * time.Secondfunc main() {	ch := make(chan struct{})	go worker(ch, "[1]")	go worker(ch, "[2]")	go manager(ch)	<-ch	//暂停1秒便于协程的打印输出	time.Sleep(1 * time.Second)	fmt.Println("company closed")}func manager(ch chan struct{}) {	time.Sleep(ACTUAL_WORKING_DURATION)	fmt.Println("manager called cancel()")	close(ch)}func worker(ch chan struct{}, name string) {	for {		select {		case <-ch:			fmt.Println(name, "return for ctxWithCancel.Done()")			return		default:			fmt.Println(name, "working")		}		time.Sleep(1 * time.Second)	}}

输出:

[2] working[1] working[1] working[2] workingmanager called cancel()[1] return for ctxWithCancel.Done()[2] return for ctxWithCancel.Done()company closed

从输出来看,manger协程在延时2秒后关闭了通道,worker检测到通道关闭后退出,main退出。整个过程符合预期。

2)我们再加入下班的第一种条件:

在main函数里加入一个timer,当达到worker的最大工作时长时,关闭通道ch。其余代码不变。

func main() {	ch := make(chan struct{})	go worker(ch, "[1]")	go worker(ch, "[2]")	go manager(ch)	timer := time.NewTimer(MAX_WORKING_DURATION)	select{		case <- timer.C:			close(ch)	}	<-ch	//暂停1秒便于协程的打印输出	time.Sleep(1 * time.Second)	fmt.Println("company closed")}

输出:

[1] working[2] working[2] working[1] workingmanager called cancel()[1] return for ctxWithCancel.Done()[2] return for ctxWithCancel.Done()panic: close of closed channelgoroutine 1 [running]:main.main()        /home/go-test/src/tutorial/trycontext_timer.go:24 +0x10eexit status 2

最后一行报错了,试图关闭一个已经关闭的通道。因为manager已经在2秒的时候把通道ch关闭了,所以定时器5秒的时候再关通道ch就报错了。这是一个协程同步的问题,要修正这个问题就需要加入同步锁和标识通道是否关闭的标志变量,在关闭通道时使用同步锁锁定再检测标志变量的状态,完成后释放同步锁。

修正之后:

package mainimport (	"fmt"	"sync"	"time")//worker工作的最大时长,超过这个时长worker自行收工无需等待manager叫停const MAX_WORKING_DURATION = 5 * time.Second//达到实际工作时长后,manager可以提前叫停const ACTUAL_WORKING_DURATION = 2 * time.Secondtype ctx struct {	mu     sync.Mutex	closed bool}func main() {	ch := make(chan struct{})	go worker(ch, "[1]")	go worker(ch, "[2]")	var c ctx	go manager(ch, &c)	timer := time.NewTimer(MAX_WORKING_DURATION)	select {	case <-timer.C:		c.mu.Lock()		if c.closed == true {			c.mu.Unlock()			return		}		close(ch)		c.closed = true		c.mu.Unlock()	}	<-ch	//暂停1秒便于协程的打印输出	time.Sleep(1 * time.Second)	fmt.Println("company closed")}func manager(ch chan struct{}, c *ctx) {	time.Sleep(ACTUAL_WORKING_DURATION)	fmt.Println("manager called cancel()")	c.mu.Lock()	if c.closed == true {		c.mu.Unlock()		return	}	close(ch)	c.closed = true	c.mu.Unlock()}func worker(ch chan struct{}, name string) {	for {		select {		case <-ch:			fmt.Println(name, "return for ctxWithCancel.Done()")			return		default:			fmt.Println(name, "working")		}		time.Sleep(1 * time.Second)	}}

这里加入了一个ctx结构体以指针的方式在协程之间共享,同时把timer对通道的关闭以及manager对通道的关闭都加上了同步锁和标志变量检测。这样一来,整个运行就正常了:

[1] working[2] working[2] working[1] workingmanager called cancel()[2] return for ctxWithCancel.Done()[1] return for ctxWithCancel.Done()

修改ACTUAL_WORKING_DURATION = 10 * time.Second,让超时先发生,输出也符合预期:

[1] working[2] working[2] working[1] working[2] working[1] working[2] working[1] working[2] working[1] working[2] return for ctxWithCancel.Done()[1] return for ctxWithCancel.Done()company closed

就功能而言,这段代码已经完全可用了,既实现了超时控制又实现了取消控制。但是,这段代码存在冗余,又不方便重复使用,因此还需要重构:

首先把通道以及对通道的取消操作封装到ctx结构体:

package mainimport (	"fmt"	"sync"	"time")//worker工作的最大时长,超过这个时长worker自行收工无需等待manager叫停const MAX_WORKING_DURATION = 5 * time.Second//达到实际工作时长后,manager可以提前叫停const ACTUAL_WORKING_DURATION = 10 * time.Secondtype ctx struct{	mu sync.Mutex 	closed bool 	done chan struct{} }func New() (*ctx, func()){	c := ctx{}	c.done = make(chan struct{})	return &c, func(){c.cancel()}}func (c *ctx)Done() chan struct{}{	return c.done}func (c *ctx)cancel(){	c.mu.Lock()	if c.closed == true{		c.mu.Unlock()		return	}	close(c.done)	c.closed = true	c.mu.Unlock()}func main() {	c, cancelFunc := New()	go worker(c, "[1]")	go worker(c, "[2]")	go manager(c, cancelFunc)	timer := time.NewTimer(MAX_WORKING_DURATION)	select {	case <-timer.C:		cancelFunc()	}	<-c.Done()	//暂停1秒便于协程的打印输出	time.Sleep(1 * time.Second)	fmt.Println("company closed")}func manager(c *ctx, cancelFunc func()) {	time.Sleep(ACTUAL_WORKING_DURATION)	fmt.Println("manager called cancel()")	cancelFunc()}func worker(c *ctx, name string) {	for {		select {		case <-c.Done():			fmt.Println(name, "return for ctxWithCancel.Done()")			return		default:			fmt.Println(name, "working")		}		time.Sleep(1 * time.Second)	}}

这样一来,主体程序就简洁多了,对通道的操作都被封装在ctx结构体里。到这一步,其实已经可以看出ctx结构体就是带有cancel功能的上下文的雏形。

我们再把上面定时器超时部分的代码也封装到ctx结构体

最后的代码:

package mainimport (	"fmt"	"sync"	"time")//worker工作的最大时长,超过这个时长worker自行收工无需等待manager叫停const MAX_WORKING_DURATION = 5 * time.Second//达到实际工作时长后,manager可以提前叫停const ACTUAL_WORKING_DURATION = 10 * time.Secondtype ctx struct{	mu sync.Mutex 	closed bool 	done chan struct{} }func New() (*ctx, func()){	c := ctx{}	c.done = make(chan struct{})	return &c, func(){c.cancel()}}func NewWithTimeout(dur time.Duration) (*ctx, func()){	c := ctx{}	c.done = make(chan struct{})	timer := time.NewTimer(dur)	go func() {		select {		case <-timer.C:			c.cancel()		}	}()	return &c, func(){c.cancel()}}func (c *ctx)Done() chan struct{}{	return c.done}func (c *ctx)cancel(){	c.mu.Lock()	if c.closed == true{		c.mu.Unlock()		return	}	close(c.done)	c.closed = true	c.mu.Unlock()}func main() {	c, cancelFunc := NewWithTimeout(MAX_WORKING_DURATION)	go worker(c, "[1]")	go worker(c, "[2]")	go manager(c, cancelFunc)	<-c.Done()	//暂停1秒便于协程的打印输出	time.Sleep(1 * time.Second)	fmt.Println("company closed")}func manager(c *ctx, cancelFunc func()) {	time.Sleep(ACTUAL_WORKING_DURATION)	fmt.Println("manager called cancel()")	cancelFunc()}func worker(c *ctx, name string) {	for {		select {		case <-c.Done():			fmt.Println(name, "return for ctxWithCancel.Done()")			return		default:			fmt.Println(name, "working")		}		time.Sleep(1 * time.Second)	}}

总结:

本文为了实现多协程同步的问题,从定时器+通道的方式逐步推导,最终得到了一个可重复使用的结构体ctx,而这个结构体实际就是context的雏形,整个过程是一个“造轮子”的过程,golang的context已经把这个轮子造好了。所以在多协程同步的场景下,都应该使用context。

 

转载地址:http://anmwn.baihongyu.com/

你可能感兴趣的文章
数据结构与算法-python实现无序表(单链表)
查看>>
数据结构与算法-python实现顺序表
查看>>
Django REST framework仅需几步完成搭建api
查看>>
Django REST framework 中的权限认证
查看>>
采用实例演示Vue生命周期
查看>>
idea破解后无法打开的解决办法
查看>>
浅谈js中节点的浅拷贝和深拷贝
查看>>
服务器tomcat成功运行但是无法在外网访问的解决办法
查看>>
排序算法的稳定问题
查看>>
M1安装环境
查看>>
类加载之双亲委派
查看>>
c++ make_pair&pair
查看>>
C++ mutable
查看>>
剑指offer:面试题18. 删除链表的节点
查看>>
剑指offer:面试题19. 正则表达式匹配
查看>>
剑指offer:面试题24. 反转链表
查看>>
剑指offer:面试题25. 合并两个排序的链表
查看>>
剑指offer:面试题26. 树的子结构
查看>>
剑指offer:面试题27. 二叉树的镜像
查看>>
剑指offer:面试题33. 二叉搜索树的后序遍历序列
查看>>