BigCache整体设计梳理:
- 使用shards方式实现了多个分片,避免全局锁带来的低性能。默认值是:1024
- 运用lifeWindow 实现key值过期删除,触发的时间在于有新key加入的时候,剔除掉过期的KV。
- key的哈希函数可以自定义,只要实现 Hasher 接口即可。
- 可以配置过期项删除时候的回调函数,你可能需要做一些额外的动作。
- bigcache在实例化的时候会开一个额外的协程处理过期项清除任务。
关于并发安全问题
处理并发唯一有效的就是锁,但这个锁的粒度可大可小,如果是全局锁,并发就上不去。如果锁的范围很小,并发能力就更高。这就是出现shards的原因。分片之后,每一个独立的分片有一个独立的锁。
Our service would receive many requests concurrently, so we needed to provide concurrent access to the cache. The easy way to achieve that would be to put sync.RWMutex in front of the cache access function to ensure that only one goroutine could modify it at a time. However other goroutines which would also like to make modifications to it, would be blocked, making it a bottleneck. To eliminate this problem, shards could be applied. The idea behind shards is straightforward. Array of N shards is created, each shard contains its own instance of the cache with a lock. When an item with unique key needs to be cached a shard for it is chosen at first by the function hash(key) % N. After that cache lock is acquired and a write to the cache takes place. Item reads are analogue. When the number of shards is relatively high and the hash function returns properly distributed numbers for unique keys then the locks contention can be minimized almost to zero. This is the reason why we decided to use shards in the cache.
BigCache 是如何减少GC时间的?
bigcache 结构体没多少内存。不存在GC问题。
cacheShard 结构体里的 hashmap[uint64]uint32 都是标量,因此因此可以绕过GC
cacheShard.entries 虽然是 queue.BytesQueue 类型,。 但是 BytesQueue.array 是一个很长的[]byte .包含了N多个元素。 因此即使GC,性能也不会有太大的问题。
bigcache 正是利用了将要存储的值转化为[]byte . 然后追加到 BytesQueue.array ,变成单一的一个大数组来减少GC时间。map如果里面的值是指针,那么gc会增对map里的每一个元素标记判断。因此不能用map。
Eviction
Eviction is performed during writes to the cache since the lock is already acquired.
wrapEntry
为什么 wrapEntry 里需要包含实际的 key 值?因为hash是有碰撞的,后面的设置值会覆盖掉前面的设置值。
wrapEntry == [timestame][hash][keyLength][key][entry]
[]byte [ leftMarginIndex.. ... ...head ... .... ...tail ... ... ... ... ... .. ..... .. .......].capacity
[]byte [ leftMarginIndex... ... ... .... ... ... ... ... ... ... .. .....head...tail.......].capacity
[]byte [ leftMarginIndex... ...tail ... .... ... ... ... ... ... ... .. .....head... .......].capacity
[..head.................[dataLen.inBytes=headerSize+bodySize][...data...]tail.......]
bigCache 引擎结构
type BigCache struct {
shards []*cacheShard // 分片,分片数量必须是 2^n 倍数
lifeWindow uint64 // 时间周期窗口,用秒表示
clock clock
hash Hasher // 哈希函数, default: fnv64a
config Config
shardMask uint64 // 给shard 用的
close chan struct{} // chan 用于通知关闭
}
缓存值删除有3种情况:
- 过期删除
- 没有空间,需要腾出空间,因此删除。
- 主动删除。
bigcache.go:48 newBigCache 用于初始化一个缓存实例
- 初始化 BigCache 结构体,
- 初始化Shard实例,如果分片了,那么会初始化多个。调用initNewShard 做初始化
- 用go起一个额外的协程去处理定时的过期清除动作和接收关闭信号。
cacheShard 结构体分析
type cacheShard struct {
hashmap map[uint64]uint32
entries queue.BytesQueue
lock sync.RWMutex
entryBuffer []byte
onRemove onRemoveCallback
isVerbose bool
statsEnabled bool
logger Logger
clock clock
lifeWindow uint64
hashmapStats map[uint64]uint32
stats Stats
}
// 初始化例子
return &cacheShard{
hashmap: make(map[uint64]uint32, config.initialShardSize()), // 初始大小
hashmapStats: make(map[uint64]uint32, config.initialShardSize()), // 初始大小
entries: *queue.NewBytesQueue(bytesQueueInitialCapacity, maximumShardSizeInBytes, config.Verbose),
entryBuffer: make([]byte, config.MaxEntrySize+headersSizeInBytes),
onRemove: callback,
isVerbose: config.Verbose,
logger: newLogger(config.Logger),
clock: clock,
lifeWindow: uint64(config.LifeWindow.Seconds()),
statsEnabled: config.StatsEnabled,
}
initNewShard 初始逻辑
从 BigCache.Set 最基本的操作分析 bigcache.go:136
// Set saves entry under the key
func (c *BigCache) Set(key string, entry []byte) error {
hashedKey := c.hash.Sum64(key) // 哈希算下,会落在那个shard里。
shard := c.getShard(hashedKey)
return shard.set(key, hashedKey, entry) // 调用shard实例,设置值。注意 entry 是 []byte类型。
}
//cacheShard.set 需要加锁处理。
type cacheShard struct {
hashmap map[uint64]uint32
entries queue.BytesQueue
lock sync.RWMutex
entryBuffer []byte
onRemove onRemoveCallback
isVerbose bool
statsEnabled bool
logger Logger
clock clock
lifeWindow uint64
hashmapStats map[uint64]uint32
stats Stats
}
queue.BytesQueue
// BytesQueue is a non-thread safe queue type of fifo based on bytes array.
// For every push operation index of entry is returned. It can be used to read the entry later
type BytesQueue struct {
full bool
array []byte
capacity int
maxCapacity int
head int
tail int
count int
rightMargin int
headerBuffer []byte
verbose bool
}
package binary
func PutUvarint(buf []byte, x uint64) int {
i := 0
for x >= 0x80 {
buf[i] = byte(x) | 0x80
x >>= 7 //
i++
}
buf[i] = byte(x)
return i + 1
}
过期数据检测的时机
- 单独的协程,每隔Xs执行一次。这个是针对所有分片,并且可以一次性删除多个过期数据。
- 每次写入新数据之前检测下单独的分片是否有过期的数据需要剔除。过期的数据总是在头部位置。
- 这个操作只正对正在操作的分片,并且只删除一个过期的数据。
bigcache 没有做序列化动作,接收的值只能是[]byte , 因此序列化的动作教给调用方完成,可以用json,也可以用protobuf。 bigcache 写入新数据的一个机制是,如果空间不够了,会强制删除头部元素。这个元素可能没有过期。 一直删除直到有足够的空间。
操作模块
- Set
- Get
- Append
- GetWithInfo
- Delete
Set
- 如果key存在,删除旧的。
- 试图删除分片里一个过期的元素
- 包装数据 entry =
[timestamp][hash][keyLength][strKey][data......]
- 包装数据写入ByteQueue
[head.size][body.data....]
- 如果 data.length < 127 , head = 1
- if data.length < 16382 ; header = 2;
- if data.length < 2097149; header = 3;
- neededSize = data.length + header
- 比如 data.length = 126 , 那么 neededSize = 127 , 那么 headerEntrySize = 1, 此时 q.headerBuffer = 127值
[127][126.bytes.data]
所以头部的值,是包含 q.headerBuffer 占用的字节数的。
- shard.count++
binary 包, 用途:数字和字节之间的转化。比如吧数字写入到[]byte , 然后从 []byte 读取数字。
Append
- 如果key不存在,就当做新的key-value插入处理。
- encoding.go:33:appendToWrappedEntry 包装里的时间戳用了新值。
[timestamp][hash][keyLength][strKey][data......][data2....]
为什么这里可以这么追加呢?因为外面默认了追加的数据和原来的一个整体,是不需要划分的。- setWrappedEntryWithoutLock - 猜想,头部大小肯定要变更了。 1. 先把旧的的key给清除掉。 在BytesQueue 尾部写入全新的数据。