在工作中, 我们经常会碰到重试的需求, 很多人会用定时任务来实现. 如果有更高的要求, 可以尝试下"延迟时间指数递增"的策略, 相比定时任务, 更加节省 CPU 和数据库资源.
为简化逻辑, 代码中没有使用 Redis
或 MySQL
维护失败次数.
package main
import (
"errors"
"fmt"
"github.com/lxzan/memorycache"
"math"
"sync/atomic"
"time"
)
const (
MaxRetryTimes = 17 // 最大重试次数
FirstDelay = 5 * time.Second // 初始延迟
)
func NewRetryer() *Retryer {
return &Retryer{
mc: memorycache.New[string, string](
memorycache.WithBucketNum(16),
memorycache.WithBucketSize(16, math.MaxInt64),
memorycache.WithInterval(time.Second, 5*time.Second),
),
}
}
type Retryer struct {
mc *memorycache.MemoryCache[string, string]
}
func (c *Retryer) Add(jobId string, jobFunc func() error) {
// 检查任务是否已存在
if _, exist := c.mc.GetOrCreate(jobId, jobId, -1); exist {
return
}
var callback = func(element *memorycache.Element[string, string], reason memorycache.Reason) {
// 只处理过期触发的回调
if reason != memorycache.ReasonExpired {
return
}
// 回调函数必须是非阻塞的; 酌情使用任务队列, 控制最大并发.
go func(id string) {
if err := jobFunc(); err == nil {
c.delete(id)
}
}(element.Value)
}
// 注册延迟及回调函数
var failedTimes = 1
var delay, delta = FirstDelay, FirstDelay
for i := 1; i <= MaxRetryTimes; i++ {
if i >= failedTimes {
var key = fmt.Sprintf("%s-%d", jobId, i)
c.mc.SetWithCallback(key, jobId, delay, callback)
}
delta *= 2
delay += delta
}
}
// 任务执行成功, 删除剩余的重试任务
func (c *Retryer) delete(jobId string) {
for i := 1; i <= MaxRetryTimes; i++ {
var key = fmt.Sprintf("%s-%d", jobId, i)
c.mc.Delete(key)
}
c.mc.Delete(jobId)
}
// 模拟一个任务
func newJob() (jobId string, jobFunc func() error) {
var counter = new(atomic.Int64)
return "1", func() error {
defer counter.Add(1)
serial := counter.Load()
if serial < 5 {
fmt.Printf("serial=%d, t=%d, success=false\n", serial, time.Now().Unix())
return errors.New("test")
}
fmt.Printf("serial=%d, t=%d, success=true\n", serial, time.Now().Unix())
return nil
}
}
func main() {
retryer := NewRetryer()
jobId, jobFunc := newJob()
if err := jobFunc(); err != nil {
retryer.Add(jobId, jobFunc)
}
select {}
}
serial=0, t=1705196714, success=false
serial=1, t=1705196719, success=false
serial=2, t=1705196729, success=false
serial=3, t=1705196749, success=false
serial=4, t=1705196789, success=false
serial=5, t=1705196869, success=true
1
0Z03ry75kWg9m0XS 313 天前
指数退避么
|
3
diagnostics 313 天前 2
这有啥好分享的,又不是大学生
|
4
Opportunity 313 天前 1
现在连重试都需要上 redis 了吗。。
|
5
zhangzEric 313 天前 via iPhone
@Opportunity 本地重试不需要,分布式异步重试就得有地方存储了
|
6
Nazz OP @Opportunity 我就用 redis 保存下失败次数,不想用 mysql
|
7
shellcodecow 305 天前
设计上
建议独立一个服务做调度功能 比如延迟调度、循环调度、递增调度 通过 GRPC 进行 timeout 或者 callback |
8
Nazz OP @shellcodecow 分布式是该这样
|