声明
声明
声明
阅读本编文章需要go语言基础和对资源池有一些了解。
go 版本为1.11,FastHTTP为2018-11-23的最新master版本
前言
在开始前我们先来简单定义一下协程池:能够达到协程资源复用
。在这个定义下协程池的实现可以说是“百花齐放”了,找一下热门的go语言开源项目都会有协程池的不同实现方式。 有基于链表实现的Tidb,有基于环形队列实现的Jaeger,有基于数组栈实现的FastHTTP等,种类繁多任君选择。这么多的协程池实现可以归纳成二种:
这2种实现中,个人比较喜欢第二种按需创建,FastHTTP也是使用第二种方式,所以我们来看看它是如何实现的。
FastHTTP协程池简介
在介绍FastHTTP协程池之前先做一下简单的介绍。workerChan和协程一一对应,相同的生命周期,可以把workerChan看成是协程的门牌,使用凭证,引路子等。 整个协程池的实现主要由workerPool和workerChan组成。
- 请求进来创建协程
- 请求处理完成,把协程的workerChan放入workerPool.ready
- 再有请求进来,从workerPool.ready获取workerChan,处理请求。
- 从第2步开始不断重复
协程池用在哪里
- go官方原生
http.Server
1net/http/server.go #2805
2func (srv *Server) Serve(l net.Listener) error {
3 ......
4 for {
5 rw, e := l.Accept()
6 ......
7 //FastHTTP在这步使用协程池
8 go c.serve(ctx)
9 }
10}
- FastHTTP的
fasthttp.ListenAndServe
1github.com/valyala/fasthttp/server.go 1489
2func (s *Server) Serve(ln net.Listener) error {
3 ......
4 for {
5 if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
6 ......
7 }
8 //对应go原生的 go c.serve(ctx)
9 if !wp.Serve(c) {
10 ......
11 }
12 ......
13 }
14}
在go原生的
http.Server
包中,当接收到新请求就会启动一个协程处理,而FastHTTP则使用协程池处理。
获取workerChan
1github.com/valyala/fasthttp/workerpool.go #156
2func (wp *workerPool) getCh() *workerChan {
3 var ch *workerChan
4 createWorker := false
5
6 wp.lock.Lock()
7 ready := wp.ready
8 n := len(ready) - 1
9 if n < 0 {
10 if wp.workersCount < wp.MaxWorkersCount {
11 createWorker = true
12 wp.workersCount++
13 }
14 } else {
15 //从尾部获取Ch
16 ch = ready[n]
17 ready[n] = nil
18 wp.ready = ready[:n]
19 }
20 wp.lock.Unlock()
21
22 if ch == nil {
23 //如果协程数超过上限,直接抛弃当前请求
24 if !createWorker {
25 return nil
26 }
27 vch := wp.workerChanPool.Get()
28 if vch == nil {
29 vch = &workerChan{
30 ch: make(chan chan struct{}, workerChanCap),
31 }
32 }
33 ch = vch.(*workerChan)
34 //ch和协程绑定
35 go func() {
36 wp.workerFunc(ch)
37 wp.workerChanPool.Put(vch)
38 }()
39 }
40 return ch
41}
在go语言中不同协程之间的通讯使用
channel
,在协程池中也不例外,FastHTTP创建了一个协程,就会和一个workerChan
绑定,使用方根据这个workerChan
就可以使用协程池里的资源。从上面的代码可以看出,使用协程池的资源,都是先从Slice的尾部弹出workerChan
,在把workerChan
交给使用放,如果Slice没有workerChan
就会创建。
把workerChan放入Slice尾部
1github.com/valyala/fasthttp/workerpool.go #194
2func (wp *workerPool) release(ch *workerChan) bool {
3 //用户清理
4 ch.lastUseTime = time.Now()
5 wp.lock.Lock()
6 if wp.mustStop {
7 wp.lock.Unlock()
8 return false
9 }
10 //往尾部追加
11 wp.ready = append(wp.ready, ch)
12 wp.lock.Unlock()
13 return true
14}
当协程完成工作后,就会把
workerChan
放回Slice尾部,以待其他请求使用。
定期清理过期workerChan
1github.com/valyala/fasthttp/workerpool.go #98
2func (wp *workerPool) clean(scratch *[]*workerChan) {
3 ......
4 currentTime := time.Now()
5
6 wp.lock.Lock()
7 ready := wp.ready
8 n := len(ready)
9 i := 0
10 for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration {
11 i++
12 }
13 *scratch = append((*scratch)[:0], ready[:i]...)
14 if i > 0 {
15 m := copy(ready, ready[i:])
16 for i = m; i < n; i++ {
17 ready[i] = nil
18 }
19 wp.ready = ready[:m]
20 }
21 wp.lock.Unlock()
22
23 ......
24 tmp := *scratch
25 for i, ch := range tmp {
26 //让协程停止工作
27 ch.ch <- nil
28 tmp[i] = nil
29 }
30}
定期清理是为了避免在常态下空闲的协程过多,加重了调度层的负担。使用按需创建协程池的方式存在这样一个问题,高峰期的时候创建了很多协程,高峰期过后很多协程处于空闲状态,这就造成了不必要的开销。所以需要一种过期机制。在这里数组栈(FILO)的优点也体现出来了,因为栈的特点不活跃的
workerChan
都放在了数组的头部,所以只需要从数组头部开始轮询,一直到找到未过期的workerChan
,再把这部分清理掉,就达到清理的效果,并且不需要轮询整个数组。
收益有多少
花了点时间对FastHTTP的协程池进行了压测代码。
1apple:gopool apple$ go test -bench=. -test.benchmem
2goos: darwin
3goarch: amd64
4pkg: study_go/gopool
5BenchmarkNotPool-4 10 4937881320 ns/op 107818560 B/op 401680 allocs/op
6BenchmarkFastHttpPool-4 10 380807481 ns/op 13444607 B/op 169946 allocs/op
7BenchmarkAntsPoll-4 10 429482715 ns/op 20756724 B/op 302093 allocs/op
8PASS
9ok study_go/gopool 72.891s
从上面的对比来看使用协程池的收益还不少。
结语
FastHTTP协程池的实现方式是我所了解的几种实现中,性能是比较突出的,当然其他协程池的实现方式也很有学习参考价值,在这个过程中复习了链表,数组栈,环形队列的使用场景。收获颇多。