golang随波逐流之cache2go源码解读

想学习golang了,工作中又接触不到,基础语法一看全明白,真到用的时候却是瞪眼瞎,怎么办?
可以找简单的开源项目,看下源码,看看这些语法的常用方式,并且也可以自己尝试着去自己实现下。
cache2go就是一个比较简单的项目,也是初学者必撸的项目了吧。
本篇是我梳理cache2go的学习笔记。

cache2go概览

cache2go是用Go实现的并发安全的缓存库,主要功能有:

  • 并发安全
  • 缓存命中次数
  • 缓存过期

项目目录结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
xx/cache2go
├── LICENSE.txt
├── README.md
├── benchmark_test.go
├── cache.go
├── cache_test.go
├── cacheitem.go
├── cachetable.go
├── errors.go
└── examples
├── callbacks
│   └── callbacks.go
├── dataloader
│   └── dataloader.go
└── mycachedapp
└── mycachedapp.go

是不是很简单,功能代码一共也就四个文件,其中一个还是个error文件,里面只定义了两个error类型,其核心文件也就三个。
核心代码虽然不多,但是仔细读这三个文件,还是可以从中学习很多golang语法和知识点的,比如lock、goroutine、匿名函数等等。

强撸灰飞烟灭

对于一个程序员来说,阅读代码是一项必备的技能,因为在工作中肯定会遇到开发维护遗留项目的情况。
那么如何快速高效的阅读代码呢?俗话说强撸灰飞烟灭,所以肯定不能上来就一顿操作猛如虎。
针对不同开发人员开发的项目以及不同类型的项目需要有不同的方法,这里谈下我个人的几点看法:
1、公司内部项目可结合口头流传或者wiki以及项目本身的技术框架,从某个功能点进行切入
2、开源项目普遍有较为完善的文档和单元测试或者examples,可以从文档入手了解功能及使用方式,然后以单元测试或者examples作为代码的切入点。

cache2go是一个开源项目,里面有测试相关的代码,也有examples,所以先从examples代码入手。
examples中有三个文件,分别为mycachedapp.go、dataloader.go和callbacks.go,看名字应该先看mycachedapp.go这个文件,它应该是缓存的使用方式。

mycachedapp.go文件中介绍了cache的读写和删等常用操作,解析下关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 实力化一个缓存cache,名字为myCahe
cache := cache2go.Cache("myCache")
// 向缓存中加入元素,kv格式,第二个参数时key的过期时间,0代表用不过期
cache.Add("someKey", 5*time.Second, &val)
// 缓存中读取key对应的值
res, err := cache.Value("someKey")
// 设置删除某个key时的回调函数
cache.SetAboutToDeleteItemCallback(func(e *cache2go.CacheItem) {
fmt.Println("Deleting:", e.Key(), e.Data().(*myStruct).text, e.CreatedOn())
})
// 从缓存中删除某个key
cache.Delete("someKey")
// 清空缓存
cache.Flush()

通读mycachedapp.go之后,了解了cache的基本用法,下面我们更深一步,去探寻他具体是如何实现的。

Cache

mycachedapp.go中调用cache2go.Cache()初始化了一个缓存实例,接着跟进这个方法看下具体的细节。此方法位于cache.go文件中,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
var (
cache = make(map[string]*CacheTable)
mutex sync.RWMutex
)

func Cache(table string) *CacheTable {
// RLock读锁,读锁之间可以共享,Lock需要等待Rlock释放才能得到锁
mutex.RLock()
// 从map cache中获取table对应的值
t, ok := cache[table]
mutex.RUnlock()
// ok为true代表有值,false代表无值
if !ok {
// 互斥锁
mutex.Lock()
t, ok = cache[table]
// Double check whether the table exists or not.
if !ok {
// 创建一个CacheTable对象
t = &CacheTable{
name: table,
items: make(map[interface{}]*CacheItem),
}
cache[table] = t
}
mutex.Unlock()
}

return t
}

cache.go代码比较简单,先声明了两个全局变量,接着就是Cache()方法。该方法也比较简单,根据传入的参数table从map类型的cache中取出对应的值,有值则直接返回,无值则创建一个CacheTable类型的对象。这里使用了Double check机制来保证线程安全。

CacheTable

关键点就在于CacheTable,接下来看下CacheTable的具体实现。

CacheTable位于cachetable.go文件中,是一个结构体,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type CacheTable struct {
// 匿名属性
sync.RWMutex

// The table's name.
name string
// All cached items.
items map[interface{}]*CacheItem

// Timer responsible for triggering cleanup.
cleanupTimer *time.Timer
// 当前计时器持续时间
// Current timer duration.
cleanupInterval time.Duration

// The logger used for this table.
logger *log.Logger

// Callback method triggered when trying to load a non-existing key.
loadData func(key interface{}, args ...interface{}) *CacheItem
// Callback method triggered when adding a new item to the cache.
// 触发回调函数,回调函数可以不止一个
addedItem []func(item *CacheItem)
// Callback method triggered before deleting an item from the cache.
aboutToDeleteItem []func(item *CacheItem)
}

CacheTable的成员变量重点介绍下itemscleanupTimer和一些用于回调函数的函数变量,其中items是一个map,以key/value的形式存储具体的缓存数据;cleanupTimer是一个定时器,用于检查缓存对象的过期时间;剩下的loadData、addedItem和aboutToDeleteItem用于某些操作的回调函数。

接下来看下mycachedapp.go文件中对缓存使用相关的方法,增加缓存的方法Add,读取缓存中数据的方法Value和删除缓存中数据的方法Delete,这几个方法的首字母都是大写,代表方法是public。

Add

先看下Add方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
// key是缓存对象的key,data是缓存对象的值,lifeSpan是对象的过期时间
func (table *CacheTable) Add(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
// golang中的“构造函数”
item := NewCacheItem(key, lifeSpan, data)
// 将item加入CacheTable中,这里涉及到多线程,所以需要加锁
// 锁的释放在addInternal方法中,是对锁粒度进行的优化
table.Lock()
table.addInternal(item)

return item
}

Add方法会根据传入的参数调用NewCacheItem方法构建一个CacheItem对象,由于golang中的结构体并没有构造函数的概念,对于需要构造函数的场景,一般都会使用NewStructName()的方式声明一个方法,作为结构体的构造函数使用。NewCacheItem位于cacheitem.go中,会在下一节介绍。

item创建成功之后,需要将其加入到map类型的items中,以便于后续数据的读取。由于此操作涉及到读写操作,在多线程下需要保证线程安全,所以在调用addInternal方法之前先获取锁,但是释放锁并没有在addInternal方法执行结束再释放,而是在addInternal代码中对相应资源读写结束之后释放,这样做减少了持有锁的时间,提高了并行度。addInternal代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (table *CacheTable) addInternal(item *CacheItem) {
// Careful: do not run this method unless the table-mutex is locked!
// It will unlock it for the caller before running the callbacks and checks
table.log("Adding item with key", item.key, "and lifespan of", item.lifeSpan, "to table", table.name)
// 将数据加入缓存中
table.items[item.key] = item

// 将值进行提前缓存,及时释放锁
expDur := table.cleanupInterval
// 得到Add的回调函数列表
addedItem := table.addedItem
// 未持有锁,而直接进行释放时,会报异常
table.Unlock()

// Trigger callback after adding an item to cache.
if addedItem != nil {
for _, callback := range addedItem {
callback(item)
}
}

// If we haven't set up any expiration check timer or found a more imminent item.
// 如果item的过期时间小于下次检查的时间间隔,则触发过期检查,重置过期时间间隔
if item.lifeSpan > 0 && (expDur == 0 || item.lifeSpan < expDur) {
table.expirationCheck()
}
}

addInternal方法才会真正的将数据添加到缓存中,并且调用相应的回调函数,并且这里还会触发过期检查。
数据的过期检查使用了Timer的定时功能,但是细心的童鞋会发现CacheTable在创建的时候并没有给Timer进行赋值,Timer是通过第一个加入到缓存中的数据激活的。
这样好处是缓存初始化之后,在添加数据之前不用让Timer去定期执行一些无意义的操作,而且在每加入一个数据就触发过期检查更能更及时的调整进行过期检查的时间点,因为缓存中数据的过期时间不定,无法设置一个合理的定期时间去周期性的执行,而且遍历一遍所有数据找出最先过期的数据,将其剩余时间作为下次检查的时间间隔
expirationCheck方法代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func (table *CacheTable) expirationCheck() {
table.Lock()
// Timer定时功能的一种用法,结合下面对cleanupTimer的赋值方式即可理解
if table.cleanupTimer != nil {
table.cleanupTimer.Stop()
}
if table.cleanupInterval > 0 {
table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name)
} else {
// cleanupInterval 为0时,代表没有定时去执行过期检查任务
table.log("Expiration check installed for table", table.name)
}

// To be more accurate with timers, we would need to update 'now' on every
// loop iteration. Not sure it's really efficient though.
now := time.Now()
smallestDuration := 0 * time.Second
// 遍历缓存中的数据,找出最先过期的时间差
for key, item := range table.items {
// Cache values so we don't keep blocking the mutex.
item.RLock()
lifeSpan := item.lifeSpan
accessedOn := item.accessedOn
item.RUnlock()
// lifeSpan为0,代表用不过期
if lifeSpan == 0 {
continue
}
if now.Sub(accessedOn) >= lifeSpan {
// 过期的则删掉
table.deleteInternal(key)
} else {
// Find the item chronologically closest to its end-of-lifespan.
// 这个分支中 lifeSpan-now.Sub(accessedOn) 一定大于 0,如果小于0的话,会被if拦截
if smallestDuration == 0 || lifeSpan-now.Sub(accessedOn) < smallestDuration {
smallestDuration = lifeSpan - now.Sub(accessedOn)
}
}
}

// Setup the interval for the next cleanup run.
table.cleanupInterval = smallestDuration
// smallestDuration <= 0时,不再执行定时检查任务
// smallestDuration 只能等于或者大于0
if smallestDuration > 0 {
// smallestDuration大于0,则设置定时执行任务
table.cleanupTimer = time.AfterFunc(smallestDuration, func() {
// 协程
go table.expirationCheck()
})
}
table.Unlock()
}

Add相关的代码就介绍到这里,下面看下Value

Value

Value方法代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (table *CacheTable) Value(key interface{}, args ...interface{}) (*CacheItem, error) {
table.RLock()
r, ok := table.items[key]
// 缓存中没有命中时到回调函数
loadData := table.loadData
table.RUnlock()

if ok {
// 缓存中有数据,调用CacheItem的KeepAlive更新访问时间和访问次数
r.KeepAlive()
return r, nil
}

// 回调函数不为null时,执行具体的回调函数,
// args参数是传递给回调函数的
if loadData != nil {
item := loadData(key, args...)
if item != nil {
table.Add(key, item.lifeSpan, item.data)
return item, nil
}
// 自定义错误类型
return nil, ErrKeyNotFoundOrLoadable
}
// 自定义错误类型
return nil, ErrKeyNotFound
}

在mycachedapp.go中调用的Value方法是cache.Value("someKey"),而这里的Value函数却是传入两个参数,这里的第二个参数是可变长度的参数,长度可以为0,所以可以传如一个参数。

Delete

Delete方法代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func (table *CacheTable) Delete(key interface{}) (*CacheItem, error) {
table.Lock()
// 在方法执行完之后,执行Unlock
defer table.Unlock()

return table.deleteInternal(key)
}

func (table *CacheTable) deleteInternal(key interface{}) (*CacheItem, error) {
// golang从map中取值避免null值的语法
r, ok := table.items[key]
if !ok {
return nil, ErrKeyNotFound
}

// Cache value so we don't keep blocking the mutex.
// 删除数据时的回调函数
aboutToDeleteItem := table.aboutToDeleteItem
table.Unlock()

// Trigger callbacks before deleting an item from cache.
if aboutToDeleteItem != nil {
for _, callback := range aboutToDeleteItem {
callback(r)
}
}

r.RLock()
// 这里的defer是在函数的最后执行RUnlock吧
defer r.RUnlock()
// 为什么两个回调函数
if r.aboutToExpire != nil {
for _, callback := range r.aboutToExpire {
callback(key)
}
}

table.Lock()
table.log("Deleting item with key", key, "created on", r.createdOn, "and hit", r.accessCount, "times from table", table.name)
// map的系统函数进行数据删除
delete(table.items, key)

return r, nil
}

Delete方法于Add类似,都是在其内部调用了另一个内部函数,具体的逻辑在其内部函数中,这里是deleteInternal方法。代码和语法都比较简单,就不过多展开了。
接下来看下如何清空缓存,方法是Flush

Flush

Flush方法的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
func (table *CacheTable) Flush() {
table.Lock()
defer table.Unlock()

table.log("Flushing table", table.name)
// 直接将一个新的map赋值给items,老map等着自动回收
table.items = make(map[interface{}]*CacheItem)
table.cleanupInterval = 0
if table.cleanupTimer != nil {
table.cleanupTimer.Stop()
}
}

介绍到这里,cachetable.go中的代码就几乎撸完了,不过还有个功能需要提下,就是统计缓存中访问次数最多的数据,方法名为MostAccessed,代码也比较简单,重点介绍下这个排序功能。
排序是先将缓存中的数据放入切片中,然后调用sort.Sort对切片进行排序,调用sort.Sort时,需要传入的集合是可比类型的,而且还可以重写比较器,类似java中的使用方式。
代码中先声明了一个CacheItemPair结构体,用来作为比较对象,只存储Key和访问次数关键信息,减少内存消耗,其次声明一个切片类型的CacheItemPairList,并实现了可比的相关函数,代码如下:

1
2
3
4
5
6
7
8
9
10
type CacheItemPair struct {
Key interface{}
AccessCount int64
}

type CacheItemPairList []CacheItemPair

func (p CacheItemPairList) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p CacheItemPairList) Len() int { return len(p) }
func (p CacheItemPairList) Less(i, j int) bool { return p[i].AccessCount > p[j].AccessCount }

至此,cachetable.go的相关代码就算介绍完了,剩下的就是些回调函数的设置以及一些常规用法,比较简单。
了解了cachetable.go相关代码之后,缓存的内存结构其实也就清楚了,也就对整个缓存的架构有了初步的了解,就感觉其实很简单,其内存结构图如下:
cache2go内存结构

CacheItem

CacheTable了解之后,再来看下CacheItem,在读CacheTable相关代码的时候其实已经涉及到一些CacheItem相关的代码,从中可以看出CacheItem代码并不复杂,比较简单易懂。

CacheItem相关的属性如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type CacheItem struct {
// 匿名变量
sync.RWMutex

// The item's key.
key interface{}
// The item's data.
data interface{}
// How long will the item live in the cache when not being accessed/kept alive.
lifeSpan time.Duration

// Creation timestamp.
createdOn time.Time
// Last access timestamp.
accessedOn time.Time
// How often the item was accessed.
accessCount int64

// Callback method triggered right before removing the item from the cache
aboutToExpire []func(key interface{})
}

在golang中,针对结构体并没有对行的构造函数,但是对于习惯使用构造函数的人来说该怎么办呢?于是有了一种约定俗成的语法,那就是将函数命名为NewXXX形式,以此表示该结构体的构造函数,cacheitem.go中就用到了这种命名方式,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
func NewCacheItem(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
t := time.Now()
return &CacheItem{
key: key,
lifeSpan: lifeSpan,
createdOn: t,
accessedOn: t,
accessCount: 0,
aboutToExpire: nil,
data: data,
}
}

代码看到这里应该对cache2go整个源码都已了解了,而且对golang也有了进一步的理解吧,只是其中还有很多语法以及用法不太熟悉,随后多看多写就OK了,下一篇我会根据这个项目中涉及到的知识点进行梳理扩展。

您的肯定,是我装逼的最大的动力!