开发者

生产环境go-redsync使用示例

开发者 https://www.devze.com 2025-10-03 10:26 出处:网络 作者: qinyuan15
目录一、问题和意义二、官方入门示例存在的问题三、生产环境可用的版本四、 对go-redsync做封装一、问题和意义
目录
  • 一、问题和意义
  • 二、官方入门示例存在的问题
  • 三、生产环境可用的版本
  • 四、 对go-redsync做封装

一、问题和意义

go-redsync是go语言实现分布式锁的常用工具,但官方文档是的入门示例并不是一个可以直接用于生产环境的版本。很多人将官方文档中的入门示例使用到实际项目中导致了生产事故。故文本提供一个可以用于生产环境的使用示例。

二、官方入门示例存在的问题

官方示例代码为:

package main

import (
	goRedislib "github.com/redis/go-redis/v9"
	"github.com/go-redsync/redsync/v4"
	"github.com/go-redsync/redsync/v4/redis/goredis/v9"
)

func main() {
	// Create a pool with go-redis (or redigo) which is the pool redisync will
	// use while communicating with Redis. This can also be any pool that
	// implements the `redis.Pool` interface.
	client := goredislib.NewClient(&goredislib.Options{
		Addr: "localhost:6379",
	})
	pool := goredis.Newpool(client) // or, pool := redigo.NewPool(...)

	// Create an instance of redisync to be used to obtain a mutual exclusion
	// lock.
	rs := redsync.New(pool)

	// Obtain a new mutex by using the same name for all instances wanting thpythone
	// same lock.
	mutexname := "my-global-mutex"
	mutex := rs.NewMutex(mutexname)

	// Obtain a lock for our given mutex. After this is successful, no one else
	// can obtain the same lock (the same mutex name) until we unlock it.
	if err := mutex.Lock(); err != nil {
		panic(err)
	}

	// Do your work that requires the lock.

	// Release the lock so other processes or threads can obtain a lock.
	if ok, err := mutex.Unlock(); !ok || err != nil {
		panic("unlock failed")
	}
}

接下来,我们开两个协程实测一下。

// 这里省去创建redis连接的操作

mutexname := "mabMwzohFSUy-global-mutex"
var wg sync.WaitGroup // 用于实现主函数等待所有子协程执行完毕之后再退出
wg.Add(2)
go func() {
   defer wg.Done()
   mutex := rs.NewMutex(mutexname)
   // 开始尝试获得分布式锁
   if err := mutex.Lock(); err != nil {
      log.Errorf("failed to acquire lock in task1: %v", err)
      return
   }

   // 执行一些任务
   log.Info("task1 start at ", time.Now().Format("15:04:05.000"))
   time.Sleep(time.Second * 10) // 模拟一个耗时的任务
   log.Info("task1 end at ", time.Now().Format("15:04:05.000"))

   // 执行完任务,释放锁
   if _, err := mutex.Unlock(); err != nil {
      log.Errorf("failed to release lock in task1: %v", err)
   }
}()
go func() {
   defer wg.Done()
   mutex := rs.NewMutex(mutexname)
   // 开始尝试获得分布式锁
   if err := mutex.Lock(); err != nil {
      log.Errorf("failed to acquire lock in task2: %v", err)
      return
   }

   // 执行一些任务
   log.Info("task2 start at ", time.Now().Format("15:04:05.000"))
   time.Sleep(time.Second * 10) // 模拟一个耗时的任务
   log.Info("task2 end at ", time.Now().Format("15:04:05.000"))

   // 执行完任务,释放锁
   if _, err := mutex.Unlock(); err != nil {
      log.Errorf("failed to release lock in task2: %v", err)
   }
}()
wg.Wait()

程序执行结果如下:

INFO msg=task2 start at 02:22:00.330

INFO msg=task1 start at 02:22:08.508

INFO msg=task2 end at 02:22:10.330

ERROR msg=failed to release lock in task2: lock already taken, locked nodes: [0]

INFO msg=task1 end at 02:22:18.508

ERROR msg=failed to release lock in task1: lock already taken, locked nodes: [0]

可以看出,分布式锁并没有起作用,任务2还没执行完,任务1就已经获得锁并开始。原因是go-redsync默认的加锁时间只有8秒钟,如果一个任务执行时间超过8秒,则分布式锁会在任务执行结束前释放。

三、生产环境可用的版本

生产环境中,任务没结束时需要调用mutex.Extend()方法延长锁的时间

mutexname := "my-global-mutex"
var wg sync.WaitGroup // 用于实现主函数等待所有子协程执行完毕之后再退出
wg.Add(2)
go func() {
   defer wg.Done()
   mutex := client.NewMutex(mutexname)
   // 开始尝试获得分布式锁
   if err := mutex.Lock(); err != nil {
      log.Errorf("failed to acquire lock in task1: %v", err)
      return
   }
   var lockReleased atomic.Bool
   lockReleased.Store(false)

   go func() { // 只要当前任务还在执行,每过1秒就延长锁的过期时间
      for {
         time.Sleep(time.Second)
         if lockReleased.Load() {
            return
         }
         _, err := mutex.Extend()
         if err != nil {
            log.Errorf("extend lock in task1 fail: %v", err)
         }
      }
   }()

   log.Info("task1 start at ", time.Now().Format("15:04:05.000"))
   time.Sleep(time.Second * 10) // 模拟一个耗时的任务
   lpythonog.Info("task1 end at ", time.Now().Format("15:04:05.000"))

   // 执行完任务,释放锁
   if _, err := mutex.Unlock(); err != nil {
      log.Errorf("failed to release lock in task1: %v", err)
   }
   lockReleased.Store(true)
}()
go func() {
   defer wg.Done()
   mutex := client.NewMutex(mutexname)
   // 开始尝试获得分布式锁
   if err := mutex.Lock(); err != nil {
      log.Errorf("failed to acquire lock in task2: %v", err)
      return
   }
   var lockReleased atomic.Bool
   lockReleased.Store(false)

   go func() { // 只要当前任务还在执行,每过1秒就延长锁的过期时间
      for {
         time.Sleep(time.Second)
         if lockReleased.Load() {
            return
         }
         _, err := mutex.Extend()
         if err != nil {
            log.Errorf("extend lock in task2 fail: %v", err)
         }
      }
   }()

   log.Info("task2 start at ", time.Now().Format("15:04:05.000"))
   time.Sleep(time.Second * 10) // 模拟一个耗时的任务
   log.Info("tas编程客栈k2 end at ", time.Now().Format("15:04:05.000"))

   // 执行完任务,释放锁
   if _, err := mutex.Unlock(); err != nil {
      log.Errorf("failed to release lock in task2: %v", err)
   }
   lockReleased.Store(true)
}()
wg.Wait()

执行结果如下:

INFO msg=task2 start at 02:31:06.973

INFO msg=task2 end at 02:31:16.974

INFO msg=task1 start at 02:31:17.471

INFO msg=task1 end at 02:31:27.471

这一执行结果符合预期,任务1会在任务2执行完之后才能获得锁。

四、 对go-redsync做封装

前面的代码示例可用于生产,但代码过于冗长,每次使用分布式锁时都写那么多代码也太麻烦。我们可以将其封装为了个TryLock方法:

type LockHolder interface {
    ReleaseLock()
}

type lockHolder struct {
    mutex       *redsync.Mutex
    lockReleased atomic.Bool
}

func (h *lockHolder) ReleaseLock() {
    _, err := h.mutex.Unlock()
    if err != nil {
        log.Errorf("failed to release lock: %v", err)
    }
    h.lockReleased.Store(true)
}

func TryLock(mutexName string) (LockHolder, error) {
    rs := getRedsync()
    mutex := rs.NewMutex(mutexName)
    if err := mutex.Lock(); err != nil {
        log.Errorf("failed to acquire lock: %v", err)
        return nil, err
    }

    holder := &lockHolder{
        mutex:       mutex,
        lockReleased: atomic.Bool{},
    }
    holder.lockReleased.Store(false)

    go func() {
        for {
            time.Sleep(time.Second)
            if holder.lockReleased.Load() {
                return
            }
            _, err := mutex.Extend()
            if err != nil {
                log.Errorf("extend lock fail: %v", err)
            }
        }
    }()
    return holder, nil
}

接下来使用分布式锁的代码可以简化为:

mutexname := "my-global-mutex"
var wg sync.WaitGroup
wg.Add(2)
go func() {
   defer wg.Done()
   lock, err := TryLock(mutexname)
   if err != nil {
      log.Errorf("failed to acquire lock in task1: %v", err)
      return
   }
   deferphp lock.ReleaseLock()
   log.Info("task1 start at ", time.Now().Format("15:04:05.000"))
   time.Sleep(time.Second * 10)
   log.Info("task1 end at ", time.Now().Format("15:04:05.000"))
}()
go func() {
   defer wg.Done()
   lock, err := TryLock(mutexname)
   if err != nil {
      log.Errorf("failed to acquire lock in task2: %v", err)
      return
   }
   defer lock.ReleaseLock()
   log.Info("task2 start at ", time.Now().Format("15:04:05.000"))
   time.Sleep(time.Second * 10)
   log.Info("task2 end at ", time.Now().Format("15:04:05.000"))
}()
wg.Wait()

到此这篇关于生产环境go-redsync使用示例的文章就介绍到这了,更多相关生产环境go-redsync使用内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

0

精彩评论

暂无评论...
验证码 换一张
取 消

关注公众号