本文共 6360 字,大约阅读时间需要 21 分钟。
上下文context是对计时器(timer)+通道(channel)+同步锁(sync.Mutex)的封装,主要用于多个协程间的统一控制,如取消和定时。理论上,能用上下文的地方都可以用计时器+通道+同步锁的方式来改写,那为什么还要用上下文呢。本文就从一个例子开始推导一下这个问题。仍然还是用里的例子,采用context的实现方式请见前文,这里不再累述。
假设有这样一个应用场景,一个公司(main)有一名经理(manager)和两名工人(worker),公司下班(main exit)有两种可能:一:工人(worker)的工作时间已经达到合同约定的最大时长;二:经理(manager)提前叫停收工。两种可能满足其中一个即可下班。
1)首先我们先简化问题,仅以timer实现上述场景的第二种下班条件:
package mainimport ( "fmt" "time")//worker工作的最大时长,超过这个时长worker自行收工无需等待manager叫停const MAX_WORKING_DURATION = 5 * time.Second//达到实际工作时长后,manager可以提前叫停const ACTUAL_WORKING_DURATION = 2 * time.Secondfunc main() { ch := make(chan struct{}) go worker(ch, "[1]") go worker(ch, "[2]") go manager(ch) <-ch //暂停1秒便于协程的打印输出 time.Sleep(1 * time.Second) fmt.Println("company closed")}func manager(ch chan struct{}) { time.Sleep(ACTUAL_WORKING_DURATION) fmt.Println("manager called cancel()") close(ch)}func worker(ch chan struct{}, name string) { for { select { case <-ch: fmt.Println(name, "return for ctxWithCancel.Done()") return default: fmt.Println(name, "working") } time.Sleep(1 * time.Second) }}
输出:
[2] working[1] working[1] working[2] workingmanager called cancel()[1] return for ctxWithCancel.Done()[2] return for ctxWithCancel.Done()company closed
从输出来看,manger协程在延时2秒后关闭了通道,worker检测到通道关闭后退出,main退出。整个过程符合预期。
2)我们再加入下班的第一种条件:
在main函数里加入一个timer,当达到worker的最大工作时长时,关闭通道ch。其余代码不变。
func main() { ch := make(chan struct{}) go worker(ch, "[1]") go worker(ch, "[2]") go manager(ch) timer := time.NewTimer(MAX_WORKING_DURATION) select{ case <- timer.C: close(ch) } <-ch //暂停1秒便于协程的打印输出 time.Sleep(1 * time.Second) fmt.Println("company closed")}
输出:
[1] working[2] working[2] working[1] workingmanager called cancel()[1] return for ctxWithCancel.Done()[2] return for ctxWithCancel.Done()panic: close of closed channelgoroutine 1 [running]:main.main() /home/go-test/src/tutorial/trycontext_timer.go:24 +0x10eexit status 2
最后一行报错了,试图关闭一个已经关闭的通道。因为manager已经在2秒的时候把通道ch关闭了,所以定时器5秒的时候再关通道ch就报错了。这是一个协程同步的问题,要修正这个问题就需要加入同步锁和标识通道是否关闭的标志变量,在关闭通道时使用同步锁锁定再检测标志变量的状态,完成后释放同步锁。
修正之后:
package mainimport ( "fmt" "sync" "time")//worker工作的最大时长,超过这个时长worker自行收工无需等待manager叫停const MAX_WORKING_DURATION = 5 * time.Second//达到实际工作时长后,manager可以提前叫停const ACTUAL_WORKING_DURATION = 2 * time.Secondtype ctx struct { mu sync.Mutex closed bool}func main() { ch := make(chan struct{}) go worker(ch, "[1]") go worker(ch, "[2]") var c ctx go manager(ch, &c) timer := time.NewTimer(MAX_WORKING_DURATION) select { case <-timer.C: c.mu.Lock() if c.closed == true { c.mu.Unlock() return } close(ch) c.closed = true c.mu.Unlock() } <-ch //暂停1秒便于协程的打印输出 time.Sleep(1 * time.Second) fmt.Println("company closed")}func manager(ch chan struct{}, c *ctx) { time.Sleep(ACTUAL_WORKING_DURATION) fmt.Println("manager called cancel()") c.mu.Lock() if c.closed == true { c.mu.Unlock() return } close(ch) c.closed = true c.mu.Unlock()}func worker(ch chan struct{}, name string) { for { select { case <-ch: fmt.Println(name, "return for ctxWithCancel.Done()") return default: fmt.Println(name, "working") } time.Sleep(1 * time.Second) }}
这里加入了一个ctx结构体以指针的方式在协程之间共享,同时把timer对通道的关闭以及manager对通道的关闭都加上了同步锁和标志变量检测。这样一来,整个运行就正常了:
[1] working[2] working[2] working[1] workingmanager called cancel()[2] return for ctxWithCancel.Done()[1] return for ctxWithCancel.Done()
修改ACTUAL_WORKING_DURATION = 10 * time.Second,让超时先发生,输出也符合预期:
[1] working[2] working[2] working[1] working[2] working[1] working[2] working[1] working[2] working[1] working[2] return for ctxWithCancel.Done()[1] return for ctxWithCancel.Done()company closed
就功能而言,这段代码已经完全可用了,既实现了超时控制又实现了取消控制。但是,这段代码存在冗余,又不方便重复使用,因此还需要重构:
首先把通道以及对通道的取消操作封装到ctx结构体:
package mainimport ( "fmt" "sync" "time")//worker工作的最大时长,超过这个时长worker自行收工无需等待manager叫停const MAX_WORKING_DURATION = 5 * time.Second//达到实际工作时长后,manager可以提前叫停const ACTUAL_WORKING_DURATION = 10 * time.Secondtype ctx struct{ mu sync.Mutex closed bool done chan struct{} }func New() (*ctx, func()){ c := ctx{} c.done = make(chan struct{}) return &c, func(){c.cancel()}}func (c *ctx)Done() chan struct{}{ return c.done}func (c *ctx)cancel(){ c.mu.Lock() if c.closed == true{ c.mu.Unlock() return } close(c.done) c.closed = true c.mu.Unlock()}func main() { c, cancelFunc := New() go worker(c, "[1]") go worker(c, "[2]") go manager(c, cancelFunc) timer := time.NewTimer(MAX_WORKING_DURATION) select { case <-timer.C: cancelFunc() } <-c.Done() //暂停1秒便于协程的打印输出 time.Sleep(1 * time.Second) fmt.Println("company closed")}func manager(c *ctx, cancelFunc func()) { time.Sleep(ACTUAL_WORKING_DURATION) fmt.Println("manager called cancel()") cancelFunc()}func worker(c *ctx, name string) { for { select { case <-c.Done(): fmt.Println(name, "return for ctxWithCancel.Done()") return default: fmt.Println(name, "working") } time.Sleep(1 * time.Second) }}
这样一来,主体程序就简洁多了,对通道的操作都被封装在ctx结构体里。到这一步,其实已经可以看出ctx结构体就是带有cancel功能的上下文的雏形。
我们再把上面定时器超时部分的代码也封装到ctx结构体
最后的代码:
package mainimport ( "fmt" "sync" "time")//worker工作的最大时长,超过这个时长worker自行收工无需等待manager叫停const MAX_WORKING_DURATION = 5 * time.Second//达到实际工作时长后,manager可以提前叫停const ACTUAL_WORKING_DURATION = 10 * time.Secondtype ctx struct{ mu sync.Mutex closed bool done chan struct{} }func New() (*ctx, func()){ c := ctx{} c.done = make(chan struct{}) return &c, func(){c.cancel()}}func NewWithTimeout(dur time.Duration) (*ctx, func()){ c := ctx{} c.done = make(chan struct{}) timer := time.NewTimer(dur) go func() { select { case <-timer.C: c.cancel() } }() return &c, func(){c.cancel()}}func (c *ctx)Done() chan struct{}{ return c.done}func (c *ctx)cancel(){ c.mu.Lock() if c.closed == true{ c.mu.Unlock() return } close(c.done) c.closed = true c.mu.Unlock()}func main() { c, cancelFunc := NewWithTimeout(MAX_WORKING_DURATION) go worker(c, "[1]") go worker(c, "[2]") go manager(c, cancelFunc) <-c.Done() //暂停1秒便于协程的打印输出 time.Sleep(1 * time.Second) fmt.Println("company closed")}func manager(c *ctx, cancelFunc func()) { time.Sleep(ACTUAL_WORKING_DURATION) fmt.Println("manager called cancel()") cancelFunc()}func worker(c *ctx, name string) { for { select { case <-c.Done(): fmt.Println(name, "return for ctxWithCancel.Done()") return default: fmt.Println(name, "working") } time.Sleep(1 * time.Second) }}
总结:
本文为了实现多协程同步的问题,从定时器+通道的方式逐步推导,最终得到了一个可重复使用的结构体ctx,而这个结构体实际就是context的雏形,整个过程是一个“造轮子”的过程,golang的context已经把这个轮子造好了。所以在多协程同步的场景下,都应该使用context。
转载地址:http://anmwn.baihongyu.com/