|   1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
 | 
package logic
import (
	"context"
	"errors"
	"math/rand"
	"strconv"
	"time"
	"github.com/go-redis/redis/v8"
	"github.com/zeromicro/go-zero/core/logx"
	"github.com/zeromicro/go-zero/core/syncx"
	"reatang_demo_rpc/demo_rpc"
	"reatang_demo_rpc/internal/svc"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)
// 全局初始化一个SingleFlight组件
var redisCacheSingleFlight syncx.SingleFlight
func init() {
	redisCacheSingleFlight = syncx.NewSingleFlight()
}
type RedisCacheLogic struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext
	logx.Logger
}
func NewRedisCacheLogic(ctx context.Context, svcCtx *svc.ServiceContext) *RedisCacheLogic {
	return &RedisCacheLogic{
		ctx:    ctx,
		svcCtx: svcCtx,
		Logger: logx.WithContext(ctx),
	}
}
// RedisCache 第一种:完全使用 SingleFlight 承压
func (l *RedisCacheLogic) RedisCache(in *demo_rpc.Request) (*demo_rpc.Response, error) {
	cacheKey := "cache:" + in.GetPing()
	// 直接使用 SingleFlight 减少 redis和db的压力
	result, err := redisCacheSingleFlight.Do(cacheKey, func() (any, error) {
		var data string
		
		// 1、读取 cache
		data, err := l.svcCtx.CacheRedis.Get(l.ctx, cacheKey).Result()
		if err != nil && !errors.Is(err, redis.Nil) {
			return "", err
		}
	
		// 数据不存在或者过期
		if errors.Is(err, redis.Nil) {
			// 2、查询数据库
			data = strconv.Itoa(rand.Int()) // 模拟查询
			// 3、缓存数据
			l.svcCtx.CacheRedis.SetEX(l.ctx, cacheKey, data, time.Second*5)
			// (可选)标记读取了源数据
			header := metadata.Pairs("reload-cache", "true")
			_ = grpc.SendHeader(l.ctx, header)
		}
		// 4、返回数据
		return data, nil
	})
	if err != nil {
		return nil, err
	} else {
		return &demo_rpc.Response{
			Pong: result.(string),
		}, nil
	}
}
// RedisCache2 第二种:redis承压,SingleFlight保护DB
func (l *RedisCacheLogic) RedisCache2(in *demo_rpc.Request) (*demo_rpc.Response, error) {
	cacheKey := "cache2:" + in.GetKey()
	var data string
	// 读取 cache
	cacheResult, err := l.svcCtx.CacheRedis.Get(l.ctx, cacheKey).Result()
	if err != nil && !errors.Is(err, redis.Nil) {
		return nil, err
	}
	if errors.Is(err, redis.Nil) {
		// 没有读取到缓存,执行单飞让并发请求中的一个请求去读取,其他请求等待
		result, _ := redisCacheSingleFlight.Do(cacheKey, func() (any, error) {
			// 1、查询数据库
			dbData := strconv.Itoa(rand.Int()) // 模拟查询
			// 2、缓存数据
			l.svcCtx.CacheRedis.SetEX(l.ctx, cacheKey, dbData, time.Second*5)
			// (可选)标记读取了源数据
			header := metadata.Pairs("reload-cache", "true")
			_ = grpc.SendHeader(l.ctx, header)
			// (可选)暂停10ms,防止有请求瞬间未读取到redis数据,
			// 也错过SingleFlight数据,导致SingleFlight又被唤醒
			time.Sleep(10 * time.Millisecond)
			// 3、返回数据
			return dbData, nil
		})
		// 此处所有在等待的请求都会得到同样的数据
		data = result.(string)
	} else {
		// 读取到了缓存
		data = cacheResult
	}
	return &demo_rpc.Response{
		Pong: data,
	}, nil
}
 |