Golang-redigo源码分析之连接池

连接池一直是系统设计中很重要的一个话题,其主要的作用是复用系统中已经创建好的连接,避免重复创建连接加重系统负荷,下面看一下golang中redigo中连接池的使用和原理。

使用示例

首先看下redigo中连接池的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
"time"
)

import (
"github.com/gomodule/redigo/redis"
)

func main() {
pool := &redis.Pool{
MaxIdle: 4,
MaxActive: 4,
Dial: func() (redis.Conn, error) {
rc, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
return nil, err
}
return rc, nil
},
IdleTimeout: time.Second,
Wait: true,
}
con := pool.Get()
str, err := redis.String(con.Do("get", "aaa"))
con.Close()
fmt.Println("value: ", str, " err:", err)
}

我们可以看到Redigo使用连接池还是很简单的步骤:

  1. 创建连接池
  2. 简单设置连接池的最大连接数等参数
  3. 注入拨号函数(设置redis地址 端口号等)
  4. 调用pool.Get() 获取连接
  5. 使用连接Do函数请求redis
  6. 关闭连接

源码分析

首先看下连接池对象Pool的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
type Pool struct {
// Dial is an application supplied function for creating and configuring a
// connection.
//
// The connection returned from Dial must not be in a special state
// (subscribed to pubsub channel, transaction started, ...).
// 拨号函数,从外部注入
Dial func() (Conn, error)

// DialContext is an application supplied function for creating and configuring a
// connection with the given context.
//
// The connection returned from Dial must not be in a special state
// (subscribed to pubsub channel, transaction started, ...).
DialContext func(ctx context.Context) (Conn, error)

// TestOnBorrow is an optional application supplied function for checking
// the health of an idle connection before the connection is used again by
// the application. Argument t is the time that the connection was returned
// to the pool. If the function returns an error, then the connection is
// closed.
// 检测连接的可用性,从外部注入。如果返回error,则直接关闭连接。
TestOnBorrow func(c Conn, t time.Time) error

// Maximum number of idle connections in the pool.
// 最大闲置连接数量
MaxIdle int

// Maximum number of connections allocated by the pool at a given time.
// When zero, there is no limit on the number of connections in the pool.
// 最大活动连接数
MaxActive int

// Close connections after remaining idle for this duration. If the value
// is zero, then idle connections are not closed. Applications should set
// the timeout to a value less than the server's timeout.
// 闲置过期时间,在get函数中会有逻辑,删除过期的连接
IdleTimeout time.Duration

// If Wait is true and the pool is at the MaxActive limit, then Get() waits
// for a connection to be returned to the pool before returning.
// 设置如果活动连接达到上限 再获取时候是等待还是返回错误
// 如果是false 系统会返回redigo: connection pool exhausted
// 如果是true 会利用p 的ch 属性让线程等待,直到有连接释放出来
Wait bool

// Close connections older than this duration. If the value is zero, then
// the pool does not close connections based on age.
// 连接最长生存时间 如果超过时间会被从链表中删除
MaxConnLifetime time.Duration

// 判断ch 是否被初始化了
chInitialized uint32 // set to 1 when field ch is initialized

// 锁,这块也给出了以后使用锁的时候一些经验,锁尽量要在某个对象的内部,并且指明哪些变量会用到该锁。
mu sync.Mutex // mu protects the following fields
closed bool // set to true when the pool is closed.
active int // the number of open connections in the pool
// 当p.Wait为true的时候,利用此channel实现阻塞
ch chan struct{} // limits open connections when p.Wait is true
// 存放闲置连接的链表
idle idleList // idle connections

// 等待获取连接的数量
waitCount int64 // total number of connections waited for.
waitDuration time.Duration // total time waited for new connections.
}

我们可以看到,其中有几个关键性的字段比如最大活动连接数、最大闲置连接数、闲置链接过期时间、连接生存时间等。

我们知道 连接池最重要的就是两个方法,一个是获取连接,一个是关闭连接。我们来看一下代码:

Get源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// get prunes stale connections and returns a connection from the idle list or
// creates a new connection.
func (p *Pool) get(ctx context.Context) (*poolConn, error) {

// Handle limit for p.Wait == true.
// 处理是否需要等待, pool Wait如果是true, 则等待连接释放
var waited time.Duration
if p.Wait && p.MaxActive > 0 {
// 这里用到了懒加载的方式,也是单例模式中常用的一种模式,初始化pool中的ch channel
p.lazyInit()

// wait indicates if we believe it will block so its not 100% accurate
// however for stats it should be good enough.
// 如果len(p.ch) == 0,则意味着所有的连接都正在被使用中,则需要等待别的client使用完之后,释放此连接
wait := len(p.ch) == 0
var start time.Time
if wait {
start = time.Now()
}
if ctx == nil {
<-p.ch // 阻塞在此处,一旦有别的连接被close,则就相当于获取到了一个连接。
} else {
select {
case <-p.ch:
case <-ctx.Done():
return nil, ctx.Err()
}
}
if wait {
// 计算等待的时间,time.Since是time.Now().Sub(t)的封装
waited = time.Since(start)
}
}

p.mu.Lock()

if waited > 0 {
p.waitCount++
p.waitDuration += waited
}

// Prune stale connections at the back of the idle list.
// 所有的连接通过链表链接在一起,链表头部是最新的连接,尾部是最旧的连接,因为close方法会把释放的连接放到链表的头部。
// 删除链表尾部的陈旧连接,删除超时的连接
// 连接close之后,连接会回到pool的idle(闲置)链表中
if p.IdleTimeout > 0 {
n := p.idle.count
for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
pc := p.idle.back
p.idle.popBack()
p.mu.Unlock()
// 疑问?,为什么要在conn close的时候,要先释放锁呢?
pc.c.Close()
p.mu.Lock()
p.active--
}
}

// Get idle connection from the front of idle list.
// 从链表的头部获取空闲连接
for p.idle.front != nil {
pc := p.idle.front
p.idle.popFront()
p.mu.Unlock()
// 调用验证函数如果返回错误不为nil 关闭连接拿下一个
// 判断连接生存时间 大于生存时间则关闭拿下一个
if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
(p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {
// 如果验证通过,则直接返回此
return pc, nil
}
pc.c.Close()
p.mu.Lock()
p.active--
}

// Check for pool closed before dialing a new connection.
// 判断连接池是否被关闭 如果关闭则解锁报错
if p.closed {
p.mu.Unlock()
return nil, errors.New("redigo: get on closed pool")
}

// Handle limit for p.Wait == false.
// 如果活动连接大于最大连接数,则返回错误
if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {
p.mu.Unlock()
return nil, ErrPoolExhausted
}
// 如果在链表中没有获取到可用的连接 并添加active数量添加
p.active++
p.mu.Unlock()
c, err := p.dial(ctx)
// 如果调用失败 则减少active数量
if err != nil {
c = nil
p.mu.Lock()
p.active--
if p.ch != nil && !p.closed {
// 这里主要是处理Wait=true的情况,因为在Wait=true的时候,由于 <-p.ch 代码已经浪费一个channel,所以此处需要补充一个。
p.ch <- struct{}{}
}
p.mu.Unlock()
}
return &poolConn{c: c, created: nowFunc()}, err
}

下面是lazyInit的源码,跟单例模式中的懒加载模式是一样的。这里叫Fast Path 和 Slow Path。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (p *Pool) lazyInit() {
// Fast path.
if atomic.LoadUint32(&p.chInitialized) == 1 {
return
}
// Slow path.
p.mu.Lock()
if p.chInitialized == 0 {
p.ch = make(chan struct{}, p.MaxActive)
if p.closed {
close(p.ch)
} else {
// 有多少个最大连接数,则初始化多少个channel
for i := 0; i < p.MaxActive; i++ {
p.ch <- struct{}{}
}
}
// 这里用到了atomic包中的原子操作
atomic.StoreUint32(&p.chInitialized, 1)
}
p.mu.Unlock()
}

Close方法和put源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
func (ac *activeConn) Close() error {
pc := ac.pc
if pc == nil {
return nil
}
ac.pc = nil
// 判断连接的状态 发送取消事务 取消watch
if ac.state&connectionMultiState != 0 {
pc.c.Send("DISCARD")
ac.state &^= (connectionMultiState | connectionWatchState)
} else if ac.state&connectionWatchState != 0 {
pc.c.Send("UNWATCH")
ac.state &^= connectionWatchState
}
if ac.state&connectionSubscribeState != 0 {
pc.c.Send("UNSUBSCRIBE")
pc.c.Send("PUNSUBSCRIBE")
// To detect the end of the message stream, ask the server to echo
// a sentinel value and read until we see that value.
sentinelOnce.Do(initSentinel)
pc.c.Send("ECHO", sentinel)
pc.c.Flush()
for {
p, err := pc.c.Receive()
if err != nil {
break
}
if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {
ac.state &^= connectionSubscribeState
break
}
}
}
pc.c.Do("")
// 把连接放入链表
ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil)
return nil
}


// 将连接 重新放入闲置链表
func (p *Pool) put(pc *poolConn, forceClose bool) error {
p.mu.Lock()
if !p.closed && !forceClose {
pc.t = nowFunc()
// 放入链表头部,保证了头部的连接都是最新的。
p.idle.pushFront(pc)
if p.idle.count > p.MaxIdle {
pc = p.idle.back
// 释放连接的时候,优先从尾部释放
p.idle.popBack()
} else {
pc = nil
}
}

if pc != nil {
p.mu.Unlock()
pc.c.Close()
p.mu.Lock()
p.active--
}

// 如果连接的ch 不为空 并且连接池没有关闭 则给channel中输入一个struct{}{}
// 如果在连接达到最大活动数量之后 再获取连接并且pool的Wait为ture 会阻塞线程等待连接被释放
if p.ch != nil && !p.closed {
p.ch <- struct{}{}
}
p.mu.Unlock()
return nil
}

总结

整个Pool整体流程,我大概画了一个图。
从初始化 =》获取 -》创建连接 =》返回连接 =》关闭连接 =》
其中还有一条线是Pool.Wait = true 会一直阻塞 一直到有连接Close 释放活动连接数 线程被唤醒返回闲置的连接
其实大部分的连接池都是类似的流程,比如goroutine,redis。
流程图

参考链接

Go Redigo 源码分析(二) 连接池
Redigo源码分析