0%

Golang 使用 Zookeeper 实现分布式锁

什么是分布式锁?

分布式锁是一种在分布式系统中用于控制并发访问的机制。在分布式系统中,多个客户端可能会同时对同一个资源进行访问,这可能导致数据不一致的问题。分布式锁的作用是确保同一时刻只有一个客户端能够对某个资源进行访问,从而避免数据不一致的问题。

分布式锁的实现通常依赖于一些具有分布式特性的技术,如 ZooKeeperRedis、数据库等。这些技术提供了在分布式环境中实现互斥访问的机制,使得多个客户端在竞争同一个资源时能够有序地进行访问。

通过使用分布式锁,可以确保分布式系统中的数据一致性和并发访问的有序性,从而提高系统的可靠性和稳定性。

Zookeeper 与 Redis 的分布式锁对比

ZooKeeperRedis 都是常用的实现分布式锁的工具,但它们在实现方式、特性、适用场景等方面有一些区别。以下是 ZooKeeper 分布式锁与 Redis 分布式锁的比较:

实现方式

  • ZooKeeper 分布式锁主要依赖于其临时节点和顺序节点的特性。客户端在 ZooKeeper 中创建临时顺序节点,并通过监听机制来实现锁的获取和释放。
  • Redis 分布式锁通常使用 SETNX(set if not exists) 命令来尝试设置一个 key,如果设置成功则获取到锁。也可以通过设置过期时间和轮询机制来防止死锁和提高锁的可靠性。

特性

  • ZooKeeper 分布式锁具有严格的顺序性和公平性,保证了锁的获取顺序与请求顺序一致,避免了饥饿问题。
  • Redis 分布式锁的性能通常更高,因为它是一个内存数据库,读写速度非常快。然而,它可能存在不公平性和死锁的风险,需要额外的机制来避免这些问题。

适用场景

  • ZooKeeper 分布式锁适用于对顺序性和公平性要求较高的场景,如分布式调度系统、分布式事务等。
  • Redis 分布式锁适用于对性能要求较高的场景,如缓存系统、高并发访问的系统等。Redis 的高性能使得它在处理大量并发请求时具有优势。

可靠性

  • ZooKeeper 分布式锁具有较高的可靠性,因为它依赖于 ZooKeeper 的高可用性和强一致性保证。即使部分节点宕机,ZooKeeper 也能保证锁的正确性和一致性。
  • Redis 分布式锁的可靠性取决于其实现方式和配置。在某些情况下,如 Redis 节点宕机或网络故障,可能会导致锁失效或死锁。因此,需要合理配置 Redis 和采取额外的措施来提高锁的可靠性。

综上所述,ZooKeeper 分布式锁和 Redis 分布式锁各有优缺点,具体选择哪种方式取决于实际业务场景和需求。在需要保证顺序性和公平性的场景下,ZooKeeper 分布式锁可能更适合;而在需要高性能和快速响应的场景下,Redis 分布式锁可能更合适。

为什么 Zookeeper 可以实现分布式锁

ZooKeeper 可以实现分布式锁,主要得益于其以下几个特性:

  1. 临时节点:ZooKeeper 支持创建临时节点,这些节点在创建它们的客户端会话结束时会被自动删除。这种特性使得 ZooKeeper 的节点具有生命周期,可以随着客户端的存活而存在,客户端断开连接后自动消失,非常适合作为锁的标识。
  2. 顺序节点:ZooKeeper 的另一个重要特性是支持创建顺序节点。在创建节点时,ZooKeeper 会在节点名称后自动添加一个自增的数字,确保节点在 ZNode 中的顺序性。这个特性使得 ZooKeeper 可以实现分布式锁中的公平锁,按照请求的顺序分配锁。
  3. Watcher 机制:ZooKeeper 还提供了 Watcher 机制,允许客户端在指定的节点上注册监听事件。当这些事件触发时,ZooKeeper 服务端会将事件通知到感兴趣的客户端,从而允许客户端做出相应的措施。这种机制使得 ZooKeeper 的分布式锁可以实现阻塞锁,即当客户端尝试获取已经被其他客户端持有的锁时,它可以等待锁被释放。

基于以上特性,ZooKeeper 可以实现分布式锁。具体实现流程如下:

  1. 客户端需要获取锁时,在 ZooKeeper 中创建一个临时顺序节点作为锁标识。
  2. 客户端判断自己创建的节点是否是所有临时顺序节点中序号最小的。如果是,则客户端获得锁;如果不是,则客户端监听序号比它小的那个节点。
  3. 当被监听的节点被删除时(即持有锁的客户端释放锁),监听者会收到通知,然后重新判断自己是否获得锁。
  4. 当客户端释放锁时,只需要将会话关闭,临时节点就会被自动删除,从而释放了锁。

因此,ZooKeeper 通过其临时节点、顺序节点和 Watcher 机制等特性,实现了分布式锁的功能。

使用 Golang 实现 Zookeeper 分布式锁

下面我们通过一个简单的例子来演示如何使用 Golang 实现 ZooKeeper 分布式锁。

创建 zookeeper 客户端连接

1
2
3
4
5
6
7
8
9
10
import "github.com/go-zookeeper/zk"

func client() *zk.Conn {
// 默认端口 2181
c, _, err := zk.Connect([]string{"192.168.2.168"}, time.Second)
if err != nil {
panic(err)
}
return c
}

创建父节点 - /lock

我们可以在获取锁之前,先创建一个父节点,用于存放锁节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type Lock struct {
c *zk.Conn
}

// 父节点 /lock 不存在的时候进行创建
func NewLock() *Lock {
c := client()
e, _, err := c.Exists("/lock")
if err != nil {
panic(err)
}
if !e {
_, err := c.Create("/lock", []byte(""), 0, zk.WorldACL(zk.PermAll))
if err != nil {
panic(err)
}
}

return &Lock{c: c}
}

获取锁

在 Zookeeper 分布式锁实现中,获取锁的过程实际上就是创建一个临时顺序节点,并判断自己是否是所有临时顺序节点中序号最小的。

获取锁的关键是:

  1. 创建的需要是临时节点
  2. 创建的需要是顺序节点

具体创建代码如下:

1
p, err := l.c.Create("/lock/lock", []byte(""), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))

其中 zk.FlagEphemeral 表示创建的是临时节点,zk.FlagSequence 表示创建的是顺序节点。

判断当前创建的节点是否是最小节点

具体步骤如下:

  1. 通过 l.c.Children("/lock") 获取 /lock 下的所有子节点
  2. 对所有子节点进行排序
  3. 判断当前创建的节点是否是最小节点
  4. 如果是最小节点,则获取到锁,函数调用返回;如果不是,则监听前一个节点(这会导致函数调用阻塞)
1
2
3
4
5
6
7
8
9
10
11
12
13
childs, _, err := l.c.Children("/lock")
if err != nil {
return "", err
}

// childs 是无序的,所以需要排序,以便找到当前节点的前一个节点,然后监听前一个节点
sort.Strings(childs)

// 成功获取到锁
p1 := strings.Replace(p, "/lock/", "", 1)
if childs[0] == p1 {
return p, nil
}

不是最小节点,监听前一个节点

具体步骤如下:

  1. 通过 sort.SearchStrings 找到当前节点在所有子节点中的位置
  2. 调用 l.c.ExistsW 判断前一个节点是否依然存在(锁有可能在调用 ExistsW 之前已经被释放了),如果不存在则获取到锁
  3. 如果前一个节点依然存在,则阻塞等待前一个节点被删除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 监听锁,等待锁释放
// 也就是说,如果当前节点不是最小的节点,那么就监听前一个节点
// 一旦前一个节点被删除,那么就可以获取到锁
index := sort.SearchStrings(childs, p1)
b, _, ev, err := l.c.ExistsW("/lock/" + childs[index-1])
if err != nil {
return "", err
}

// 在调用 ExistsW 之后,前一个节点已经被删除
if !b {
return p, nil
}

// 等待前一个节点被删除
<-ev

return p, nil

在调用 ExistsW 的时候,如果前一个节点已经被删除,那么 ExistsW 会立即返回 false,否则我们可以通过 ExistsW 返回的第三个参数 ev 来等待前一个节点被删除。

<-ev 处,我们通过 <-ev 来等待前一个节点被删除,一旦前一个节点被删除,ev 会收到一个事件,这个时候我们就可以获取到锁了。

释放锁

如果调用 Lock 可以成功获取到锁,我们会返回当前创建的节点的路径,我们可以通过这个路径来释放锁。

1
2
3
func (l *Lock) Unlock(p string) error {
return l.c.Delete(p, -1)
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package main

import (
"github.com/go-zookeeper/zk"
"sort"
"strings"
"time"
)

func client() *zk.Conn {
c, _, err := zk.Connect([]string{"192.168.2.168"}, time.Second) //*10)
if err != nil {
panic(err)
}
return c
}

type Lock struct {
c *zk.Conn
}

func NewLock() *Lock {
c := client()
e, _, err := c.Exists("/lock")
if err != nil {
panic(err)
}
if !e {
_, err := c.Create("/lock", []byte(""), 0, zk.WorldACL(zk.PermAll))
if err != nil {
panic(err)
}
}

return &Lock{c: c}
}

func (l *Lock) Lock() (string, error) {
p, err := l.c.Create("/lock/lock", []byte(""), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
if err != nil {
return "", err
}
childs, _, err := l.c.Children("/lock")
if err != nil {
return "", err
}

// childs 是无序的,所以需要排序,以便找到当前节点的前一个节点,然后监听前一个节点
sort.Strings(childs)

// 成功获取到锁
p1 := strings.Replace(p, "/lock/", "", 1)
if childs[0] == p1 {
return p, nil
}

// 监听锁,等待锁释放
// 也就是说,如果当前节点不是最小的节点,那么就监听前一个节点
// 一旦前一个节点被删除,那么就可以获取到锁
index := sort.SearchStrings(childs, p1)
b, _, ev, err := l.c.ExistsW("/lock/" + childs[index-1])
if err != nil {
return "", err
}

// 在调用 ExistsW 之后,前一个节点已经被删除
if !b {
return p, nil
}

// 等待前一个节点被删除
<-ev

return p, nil
}

func (l *Lock) Unlock(p string) error {
return l.c.Delete(p, -1)
}

测试代码

下面这个例子模拟了分布式的 counter 操作,我们通过 ZooKeeper 分布式锁来保证 counter 的原子性。

当然这个例子只是为了说明 ZooKeeper 分布式锁的使用,实际上下面的功能通过 redis 自身提供的 incr 就可以实现,不需要这么复杂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package main

import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"sync"
)

func main() {
var count = 1000
var wg sync.WaitGroup
wg.Add(count)

l := NewLock()
// 创建 redis 客户端连接
redisClient = redis.NewClient(&redis.Options{
Addr: "192.168.2.168:6379",
Password: "", // no password set
DB: 0, // use default DB
})

for i := 0; i < count; i++ {
go func(i1 int) {
defer wg.Done()

// 获取 Zookeeper 分布式锁
p, err := l.Lock()
if err != nil {
return
}
// 成功获取到了分布式锁:
// 1. 从 redis 获取 zk_counter 的值
// 2. 然后对 zk_counter 进行 +1 操作
// 3. 最后将 zk_counter 的值写回 redis
cmd := redisClient.Get(context.Background(), "zk_counter")
i2, _ := cmd.Int()
i2++
redisClient.Set(context.Background(), "zk_counter", i2, 0)
// 释放分布式锁
err = l.Unlock(p)
if err != nil {
println(fmt.Errorf("unlock error: %v", err))
return
}
}(i)
}

wg.Wait()

l.c.Close()
}

我们需要将测试程序放到不同的机器上运行,这样才能模拟分布式环境。

总结

最后,再来回顾一下本文内容:

  1. sync.Mutex 这种锁只能保证单进程内的并发安全,无法保证分布式环境下的并发安全。
  2. 使用 ZookeeperRedis 都能实现分布式锁,但是 Zookeeper 可以保证顺序性和公平性,而 Redis 可以保证高性能。
  3. Zookeeper 通过其临时节点、顺序节点和 Watcher 机制等特性,实现了分布式锁的功能。