1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
package logic
import (
"context"
"errors"
"math/rand"
"strconv"
"time"
"github.com/go-redis/redis/v8"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/syncx"
"reatang_demo_rpc/demo_rpc"
"reatang_demo_rpc/internal/svc"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// 全局初始化一个SingleFlight组件
var redisCacheSingleFlight syncx.SingleFlight
func init() {
redisCacheSingleFlight = syncx.NewSingleFlight()
}
type RedisCacheLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewRedisCacheLogic(ctx context.Context, svcCtx *svc.ServiceContext) *RedisCacheLogic {
return &RedisCacheLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
// RedisCache 第一种:完全使用 SingleFlight 承压
func (l *RedisCacheLogic) RedisCache(in *demo_rpc.Request) (*demo_rpc.Response, error) {
cacheKey := "cache:" + in.GetPing()
// 直接使用 SingleFlight 减少 redis和db的压力
result, err := redisCacheSingleFlight.Do(cacheKey, func() (any, error) {
var data string
// 1、读取 cache
data, err := l.svcCtx.CacheRedis.Get(l.ctx, cacheKey).Result()
if err != nil && !errors.Is(err, redis.Nil) {
return "", err
}
// 数据不存在或者过期
if errors.Is(err, redis.Nil) {
// 2、查询数据库
data = strconv.Itoa(rand.Int()) // 模拟查询
// 3、缓存数据
l.svcCtx.CacheRedis.SetEX(l.ctx, cacheKey, data, time.Second*5)
// (可选)标记读取了源数据
header := metadata.Pairs("reload-cache", "true")
_ = grpc.SendHeader(l.ctx, header)
}
// 4、返回数据
return data, nil
})
if err != nil {
return nil, err
} else {
return &demo_rpc.Response{
Pong: result.(string),
}, nil
}
}
// RedisCache2 第二种:redis承压,SingleFlight保护DB
func (l *RedisCacheLogic) RedisCache2(in *demo_rpc.Request) (*demo_rpc.Response, error) {
cacheKey := "cache2:" + in.GetKey()
var data string
// 读取 cache
cacheResult, err := l.svcCtx.CacheRedis.Get(l.ctx, cacheKey).Result()
if err != nil && !errors.Is(err, redis.Nil) {
return nil, err
}
if errors.Is(err, redis.Nil) {
// 没有读取到缓存,执行单飞让并发请求中的一个请求去读取,其他请求等待
result, _ := redisCacheSingleFlight.Do(cacheKey, func() (any, error) {
// 1、查询数据库
dbData := strconv.Itoa(rand.Int()) // 模拟查询
// 2、缓存数据
l.svcCtx.CacheRedis.SetEX(l.ctx, cacheKey, dbData, time.Second*5)
// (可选)标记读取了源数据
header := metadata.Pairs("reload-cache", "true")
_ = grpc.SendHeader(l.ctx, header)
// (可选)暂停10ms,防止有请求瞬间未读取到redis数据,
// 也错过SingleFlight数据,导致SingleFlight又被唤醒
time.Sleep(10 * time.Millisecond)
// 3、返回数据
return dbData, nil
})
// 此处所有在等待的请求都会得到同样的数据
data = result.(string)
} else {
// 读取到了缓存
data = cacheResult
}
return &demo_rpc.Response{
Pong: data,
}, nil
}
|