groupcache 是一个分布式缓冲库
git clone git@github.com:colinrs/groupcache-db-example.git
cd groupcache-db-example
make run
├── byteview.go # 字节操作
├── byteview_test.go
├── consistenthash #一致性hash实现
├── groupcache.go # cache操作
├── groupcache_test.go
├── groupcachepb # pb文件
├── http.go # http 服务
├── http_test.go
├── lru # LRU 实现
├── peers.go # 操作peer
├── singleflight
├── sinks.go
└── testpb
type Group struct {
name string
getter Getter // 获取数据接口
peersOnce sync.Once // 保证初始化一次peer
peers PeerPicker // peer获取
cacheBytes int64 // 对缓存大小的限制接口
mainCache cache // mainCache 是分布式中本地分配到的cache部分
hotCache cache // hotcache是由于访问频率高而被复制到此节点的缓存,尽管本节点不是它的拥有者。
loadGroup flightGroup // 保证key只会获取一次
_ int32
Stats Stats
}
// InitCache ...
func InitCache(port string) {
// HTTP Server 设置
opt := &groupcache.HTTPPoolOptions{
Replicas: 1, // 缓存副本
BasePath: "/gouache/", // 缓存请求路径
}
// peers地址
cacheGroupHosts := []string{"http://127.0.0.1:8001", "http://127.0.0.1:8002", "http://127.0.0.1:8003"}
// peer 初始化
peers := groupcache.NewHTTPPoolOpts("http://127.0.0.1:" + cachePort, opt)
peerMap := consistenthash.New(opt.Replicas, opt.HashFn)
peerMap.Add(cacheGroupHosts...)
cacheGroup := groupcache.NewGroup("SlowDBCache", 64<<20, groupcache.GetterFunc(
// 源数据获取实现
))
peers.Set(cacheGroupHosts...) //设置peers地址
logger.Info("cachegroup:%s slave starting on:127.0.0.1:%s",cacheGroup.Name(), cachePort)
// 开启HTTP服务
logger.Fatal(http.ListenAndServe(fmt.Sprintf("127.0.0.1:%s",cachePort),http.HandlerFunc(peers.ServeHTTP)))
}
func GetData(c *gin.Context) {
req := new(Req)
err := c.ShouldBind(req)
if err!=nil{
c.String(http.StatusOK, err.Error())
return
}
var b []byte
//Get方法就是groupcache获取数据的方法, b []byte 会存储获取到的值
apiCacheGroup.group.Get(c.Request.Context(), req.Key, groupcache.AllocatingByteSliceSink(&b))
result := map[string]interface{}{
"key": req.Key,
"value": string(b),
}
c.JSON(http.StatusOK, result)
}
// Get ...
func (g *Group) Get(ctx context.Context, key string, dest Sink) error {
g.peersOnce.Do(g.initPeers)
g.Stats.Gets.Add(1)
if dest == nil {
return errors.New("groupcache: nil dest Sink")
}
value, cacheHit := g.lookupCache(key) // 从本地的mainCache 和 hitCache 获取数据
if cacheHit {
g.Stats.CacheHits.Add(1)
return setSinkView(dest, value)
}
// Optimization to avoid double unmarshalling or copying: keep
// track of whether the dest was already populated. One caller
// (if local) will set this; the losers will not. The common
// case will likely be one caller.
destPopulated := false
value, destPopulated, err := g.load(ctx, key, dest) // 从数据源或者peer获取数据
if err != nil {
return err
}
if destPopulated {
return nil
}
return setSinkView(dest, value)
}
// load loads key either by invoking the getter locally or by sending it to another machine.
func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
g.Stats.Loads.Add(1)
viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
// 再一次从本地缓存获取, 因为在并发的情况下,有可能有一个协程已经将值获取到了并设置到本地缓存中
if value, cacheHit := g.lookupCache(key); cacheHit {
g.Stats.CacheHits.Add(1)
return value, nil
}
g.Stats.LoadsDeduped.Add(1)
var value ByteView
var err error
// 获取到peer
if peer, ok := g.peers.PickPeer(key); ok {
// 从peer获取到数据
value, err = g.getFromPeer(ctx, peer, key)
if err == nil {
g.Stats.PeerLoads.Add(1)
return value, nil
}
g.Stats.PeerErrors.Add(1)
// TODO(bradfitz): log the peer's error? keep
// log of the past few for /groupcachez? It's
// probably boring (normal task movement), so not
// worth logging I imagine.
}
// 从数据源获取到值,也就是我们在初始化的注册的 Getter 接口
value, err = g.getLocally(ctx, key, dest)
if err != nil {
g.Stats.LocalLoadErrs.Add(1)
return nil, err
}
g.Stats.LocalLoads.Add(1)
destPopulated = true // only one caller of load gets this return value
g.populateCache(key, value, &g.mainCache) // 从数据源获取到的数据缓存在mainCache中,同时也会根据缓存大小清除hotCache中较少使用的
return value, nil
})
if err == nil {
value = viewi.(ByteView)
}
return
}
func (g *Group) getFromPeer(ctx context.Context, peer ProtoGetter, key string) (ByteView, error) {
req := &pb.GetRequest{
Group: &g.name,
Key: &key,
}
res := &pb.GetResponse{}
err := peer.Get(ctx, req, res) // 从perr获取,这里的peer是 httpGetter 的实例,最终是通过HTTP请求去请求peer
if err != nil {
return ByteView{}, err
}
value := ByteView{b: res.Value}
// TODO(bradfitz): use res.MinuteQps or something smart to
// conditionally populate hotCache. For now just do it some
// percentage of the time.
if rand.Intn(10) == 0 {
g.populateCache(key, value, &g.hotCache) // 从peer获取到的数据是设置到hotCache中
}
return value, nil
}
// 设置peer集群
cacheGroupHosts := []string{"http://127.0.0.1:8001", "http://127.0.0.1:8002", "http://127.0.0.1:8003"}
// 初始化本地peer
peers := groupcache.NewHTTPPoolOpts("http://127.0.0.1:" + cachePort, opt)
// 设置peer集群
peers.Set(cacheGroupHosts...)
// peer 提供HTTP 服务供其他的peer来查询数据
logger.Fatal(http.ListenAndServe(fmt.Sprintf("127.0.0.1:%s",cachePort),http.HandlerFunc(peers.ServeHTTP)))
func (p *HTTPPool) Set(peers ...string) {
p.mu.Lock()
defer p.mu.Unlock()
p.peers = consistenthash.New(p.opts.Replicas, p.opts.HashFn)
p.peers.Add(peers...)
p.httpGetters = make(map[string]*httpGetter, len(peers))
for _, peer := range peers {
p.httpGetters[peer] = &httpGetter{transport: p.Transport, baseURL: peer + p.opts.BasePath}
}
}
// Add adds some keys to the hash.
func (m *Map) Add(keys ...string) {
for _, key := range keys {
for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
m.keys = append(m.keys, hash)
m.hashMap[hash] = key
}
}
sort.Ints(m.keys)
}
// Get gets the closest item in the hash to the provided key.
func (m *Map) Get(key string) string {
if m.IsEmpty() {
return ""
}
hash := int(m.hash([]byte(key)))
// Binary search for appropriate replica.
idx := sort.Search(len(m.keys), func(i int) bool { return m.keys[i] >= hash })
// Means we have cycled back to the first replica.
if idx == len(m.keys) {
idx = 0
}
return m.hashMap[m.keys[idx]]
}
// load loads key either by invoking the getter locally or by sending it to another machine.
func (g *Group) load(ctx context.Context, key string, dest Sink) (value ByteView, destPopulated bool, err error) {
g.Stats.Loads.Add(1)
viewi, err := g.loadGroup.Do(key, func() (interface{}, error) {
// 从数据源或者peer获取数据
})
return
}
// flightGroup的Do方法
// 使用 mux 保证只会有一个协程在设置 g.m
// g.m 用来判断是否有key存在
// 使用 call (实际上就是 WaitGroup 包了一次)保证第二个请求同一个Key时需要等到前一个请求完成,直接使用前一个请求的结果就可以
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok { //如果已经有一个key初始化了,那么只需要等到请求完成就可以了,不需要再请求
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err // 使用前一个请求的结果就可以
}
c := new(call) // 如果没有,则西药初始化call
c.wg.Add(1)
g.m[key] = c // 设置key对应的call
g.mu.Unlock()
c.val, c.err = fn() // 实际的业务函数
c.wg.Done()
g.mu.Lock()
delete(g.m, key) // 删除key对应的call
g.mu.Unlock()
return c.val, c.err
}