[Golang]Gozero Singleflight 库使用

SingleFlight: 单飞/单航。主要用于在并发场景下对多请求读取同一个数据进行控制,只允许其中一个请求真实发起读取,其他请求阻塞等待结果。

缓存三大坑:

  1. 穿透:因为错误的请求,缓存没有数据库也没有,导致一直触发DB操作。
    • 应对:
    • ① 为不存在的数据也构建空缓存。
    • ② 使用布隆过滤拦截不存在的数据。
  2. 击穿:缓存不存在或者过期的瞬间大量请求击穿缓存直接查询DB。
    • 应对:
    • ① 热点数据不设置过期时间,定期执行刷新(也要设计并发脏数据问题),或使用Canal之类的中间件做双写控制。
    • ② 使用并发控制,如SingleFlight。但是该工具也只应对单机并发,在数百个replicas下还需进一步优化。
  3. 雪崩:因为key的过期时间设置问题,导致大量缓存在短时间内失效。
    • 应对:
    • ① 设置宽范围的动态缓存时间
    • ② 上线预热数据
    • ③ 多级缓存架构

示例代码:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123

package logic

import (
	"context"
	"errors"
	"math/rand"
	"strconv"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/zeromicro/go-zero/core/logx"
	"github.com/zeromicro/go-zero/core/syncx"
	"reatang_demo_rpc/demo_rpc"
	"reatang_demo_rpc/internal/svc"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

// 全局初始化一个SingleFlight组件
var redisCacheSingleFlight syncx.SingleFlight

func init() {
	redisCacheSingleFlight = syncx.NewSingleFlight()
}

type RedisCacheLogic struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext
	logx.Logger
}

func NewRedisCacheLogic(ctx context.Context, svcCtx *svc.ServiceContext) *RedisCacheLogic {
	return &RedisCacheLogic{
		ctx:    ctx,
		svcCtx: svcCtx,
		Logger: logx.WithContext(ctx),
	}
}

// RedisCache 第一种:完全使用 SingleFlight 承压
func (l *RedisCacheLogic) RedisCache(in *demo_rpc.Request) (*demo_rpc.Response, error) {
	cacheKey := "cache:" + in.GetPing()

	// 直接使用 SingleFlight 减少 redis和db的压力
	result, err := redisCacheSingleFlight.Do(cacheKey, func() (any, error) {
		var data string
		
		// 1、读取 cache
		data, err := l.svcCtx.CacheRedis.Get(l.ctx, cacheKey).Result()
		if err != nil && !errors.Is(err, redis.Nil) {
			return "", err
		}
	
		// 数据不存在或者过期
		if errors.Is(err, redis.Nil) {
			// 2、查询数据库
			data = strconv.Itoa(rand.Int()) // 模拟查询

			// 3、缓存数据
			l.svcCtx.CacheRedis.SetEX(l.ctx, cacheKey, data, time.Second*5)

			// (可选)标记读取了源数据
			header := metadata.Pairs("reload-cache", "true")
			_ = grpc.SendHeader(l.ctx, header)
		}

		// 4、返回数据
		return data, nil
	})

	if err != nil {
		return nil, err
	} else {
		return &demo_rpc.Response{
			Pong: result.(string),
		}, nil
	}
}

// RedisCache2 第二种:redis承压,SingleFlight保护DB
func (l *RedisCacheLogic) RedisCache2(in *demo_rpc.Request) (*demo_rpc.Response, error) {
	cacheKey := "cache2:" + in.GetKey()
	var data string

	// 读取 cache
	cacheResult, err := l.svcCtx.CacheRedis.Get(l.ctx, cacheKey).Result()
	if err != nil && !errors.Is(err, redis.Nil) {
		return nil, err
	}

	if errors.Is(err, redis.Nil) {
		// 没有读取到缓存,执行单飞让并发请求中的一个请求去读取,其他请求等待
		result, _ := redisCacheSingleFlight.Do(cacheKey, func() (any, error) {
			// 1、查询数据库
			dbData := strconv.Itoa(rand.Int()) // 模拟查询

			// 2、缓存数据
			l.svcCtx.CacheRedis.SetEX(l.ctx, cacheKey, dbData, time.Second*5)

			// (可选)标记读取了源数据
			header := metadata.Pairs("reload-cache", "true")
			_ = grpc.SendHeader(l.ctx, header)

			// (可选)暂停10ms,防止有请求瞬间未读取到redis数据,
			// 也错过SingleFlight数据,导致SingleFlight又被唤醒
			time.Sleep(10 * time.Millisecond)

			// 3、返回数据
			return dbData, nil
		})

		// 此处所有在等待的请求都会得到同样的数据
		data = result.(string)
	} else {
		// 读取到了缓存
		data = cacheResult
	}

	return &demo_rpc.Response{
		Pong: data,
	}, nil
}