cache2go源码分析(二):cache2go缓存表和缓存项


cache2go源码结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
├── LICENSE.txt
├── README.md
├── benchmark_test.go // 压力测试程序
├── cache.go // 缓存对象创建
├── cache_test.go // 测试程序
├── cacheitem.go // 缓存项
├── cachetable.go // 缓存表
├── errors.go // 自定义错误
└── examples // 用法案例
├── callbacks
│   └── callbacks.go
├── dataloader
│   └── dataloader.go
└── mycachedapp
└── mycachedapp.go

缓存表结构体

./cache2go/cachetable.go

缓存表是一个map结构,key为表名,value为CacheItem缓存对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 缓存表结构体
type CacheTable struct {
// 读写锁
sync.RWMutex

// 缓存表名称
name string
// 缓存表中的缓存项
items map[interface{}]*CacheItem

// 缓存表清理定时器
cleanupTimer *time.Timer
// 缓存表清理定时器时间间隔
cleanupInterval time.Duration

// 缓存表日志对象
logger *log.Logger

// 回调方法:当获取一个不存在的缓存对象key时触发
loadData func(key interface{}, args ...interface{}) *CacheItem
// 回调方法:当添加一个缓存对象到缓存表中时触发
addedItem func(item *CacheItem)
// 回调方法:当删除一个对象从缓存表中时触发
aboutToDeleteItem func(item *CacheItem)
}

缓存项结构体

./cache2go/cacheitem.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 缓存项结构体
type CacheItem struct {
// 读写锁
sync.RWMutex

// 缓存项key
key interface{}
// 缓存项数据
data interface{}
// 缓存项生命周期,0表示永不失效
lifeSpan time.Duration

// 缓存项创建时间戳
createdOn time.Time
// 缓存项访问的时间戳
accessedOn time.Time
// 缓存访问次数
accessCount int64

// 回调方法:当缓存项被移除时触发
aboutToExpire func(key interface{})
}

创建缓存表对象

./cache2go/cache.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
var (
cache = make(map[string]*CacheTable)
mutex sync.RWMutex
)

// 返回一个缓存表对象
func Cache(table string) *CacheTable {
// 创建一个新的缓存表
mutex.RLock()
t, ok := cache[table]
mutex.RUnlock()

// 如果创建不成功则实例化CacheTable
if !ok {
t = &CacheTable{
name: table,
items: make(map[interface{}]*CacheItem),
}

mutex.Lock()
cache[table] = t
mutex.Unlock()
}

return t
}

添加缓存对象

./cache2go/cachetable.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 添加一个缓存对象到缓存表中
// 参数key:缓存项的key,通过key可以找到对应的数据
// 参数liftSpan:缓存项生命周期
// 参数data:缓存项的value(数据)
// 返回:缓存对象
func (table *CacheTable) Add(key interface{}, lifeSpan time.Duration, data interface{}) *CacheItem {
item := CreateCacheItem(key, lifeSpan, data)

// Add item to cache.
table.Lock()
table.log("Adding item with key", key, "and lifespan of", lifeSpan, "to table", table.name)
table.items[key] = &item

// Cache values so we don't keep blocking the mutex.
expDur := table.cleanupInterval
addedItem := table.addedItem
table.Unlock()

// 触发回调函数
if addedItem != nil {
addedItem(&item)
}

// 缓存项生命周期大于0则执行缓存项生命周期检查定时器
if lifeSpan > 0 && (expDur == 0 || lifeSpan < expDur) {
table.expirationCheck()
}

return &item
}

./cache2go/cacheitem.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 创建一个缓存项对象
// 参数key:缓存项的key,通过key可以找到对应的数据
// 参数liftSpan:缓存项周期
// 参数data:缓存项的value(数据)
func CreateCacheItem(key interface{}, lifeSpan time.Duration, data interface{}) CacheItem {
t := time.Now()
return CacheItem{
key: key,
lifeSpan: lifeSpan,
createdOn: t,
accessedOn: t,
accessCount: 0,
aboutToExpire: nil,
data: data,
}
}

设置删除时回调

./cache2go/cachetable.go

1
2
3
4
5
6
// 删除时回调:当删除一个缓存对象时回调该方法
func (table *CacheTable) SetAboutToDeleteItemCallback(f func(*CacheItem)) {
table.Lock()
defer table.Unlock()
table.aboutToDeleteItem = f
}

删除缓存对象

./cache2go/cachetable.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 删除一个缓存项从缓存表中
func (table *CacheTable) Delete(key interface{}) (*CacheItem, error) {
table.RLock()
r, ok := table.items[key]
if !ok {
table.RUnlock()
return nil, ErrKeyNotFound
}

// Cache value so we don't keep blocking the mutex.
aboutToDeleteItem := table.aboutToDeleteItem
table.RUnlock()

// Trigger callbacks before deleting an item from cache.
// 触发回调函数
if aboutToDeleteItem != nil {
aboutToDeleteItem(r)
}

r.RLock()
defer r.RUnlock()
if r.aboutToExpire != nil {
r.aboutToExpire(key)
}

table.Lock()
defer table.Unlock()
table.log("Deleting item with key", key, "created on", r.createdOn, "and hit", r.accessCount, "times from table", table.name)
delete(table.items, key)

return r, nil
}

清空所有缓存对象

./cache2go/cachetable.go

1
2
3
4
5
6
7
8
9
10
11
12
13
// 删除所有缓存项从缓存表中
func (table *CacheTable) Flush() {
table.Lock()
defer table.Unlock()

table.log("Flushing table", table.name)

table.items = make(map[interface{}]*CacheItem)
table.cleanupInterval = 0
if table.cleanupTimer != nil {
table.cleanupTimer.Stop()
}
}

缓存项失效定时器实现

./cache2go/cachetable.go

每次从缓存表中取值时会执行KeepAlive函数,更新accessedOn和accessCount字段,用于判断当前缓存项是否过期

1
2
3
4
5
if ok {
// Update access counter and timestamp.
r.KeepAlive()
return r, nil
}

./cache2go/cacheitem.go

1
2
3
4
5
6
7
// 更新缓存项访问的时间戳并访问次数+1
func (item *CacheItem) KeepAlive() {
item.Lock()
defer item.Unlock()
item.accessedOn = time.Now()
item.accessCount++
}

./cache2go/cachetable.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 缓存项生命周期检查定时器
func (table *CacheTable) expirationCheck() {
table.Lock()
if table.cleanupTimer != nil {
table.cleanupTimer.Stop()
}
if table.cleanupInterval > 0 {
table.log("Expiration check triggered after", table.cleanupInterval, "for table", table.name)
} else {
table.log("Expiration check installed for table", table.name)
}

// Cache value so we don't keep blocking the mutex.
items := table.items
table.Unlock()

// To be more accurate with timers, we would need to update 'now' on every
// loop iteration. Not sure it's really efficient though.
// 判断缓存项是否过期
// 使用当前时间戳(now) 减去 缓存项最新访问的时间戳(accessedOn)得到的时间如果大于等于缓存项的缓存周期则说明该缓存项已经过期,删除该key。否则获取下一次清理的时间间隔(smallestDuration)
now := time.Now()
smallestDuration := 0 * time.Second
for key, item := range items {
// Cache values so we don't keep blocking the mutex.
item.RLock()
lifeSpan := item.lifeSpan
accessedOn := item.accessedOn
item.RUnlock()

if lifeSpan == 0 {
continue
}
if now.Sub(accessedOn) >= lifeSpan {
// Item has excessed its lifespan.
table.Delete(key)
} else {
// Find the item chronologically closest to its end-of-lifespan.
if smallestDuration == 0 || lifeSpan < smallestDuration {
smallestDuration = lifeSpan - now.Sub(accessedOn)
}
}
}

// 设置下一次清理的时间间隔
table.Lock()
table.cleanupInterval = smallestDuration
if smallestDuration > 0 {
table.cleanupTimer = time.AfterFunc(smallestDuration, func() {
go table.expirationCheck()
})
}
table.Unlock()
}