什么是分布式锁?

分布式锁是一种用于分布式系统中多台机器或多个服务实例之间同步资源访问的机制。它可以确保同一时刻只有一个客户端或服务实例能够访问某个共享资源,从而避免并发访问引起的数据不一致性或资源冲突。

在分布式系统中,通常多个服务实例运行在不同的物理或虚拟机上,因此不能像单体应用一样使用传统的线程锁机制。在这种情况下,分布式锁成为了保证数据一致性、实现资源保护和避免竞争条件的关键技术。

分布式锁的应用场景

  • 数据库操作:多个服务访问同一个数据库时,避免数据冲突。
  • 队列消费:多个服务实例并发消费同一个队列中的消息,确保每条消息只被消费一次。
  • 限流:防止多个服务实例同时修改同一资源导致错误。
  • 任务调度:确保在集群环境中只有一个实例执行特定的定时任务。

实现分布式锁的三种方式

1. 基于 Redis 的分布式锁

Redis 是一种高效的内存数据库,常用于实现分布式锁。通过 Redis 的 SETNX 命令(SET if Not Exists)或者 Redis 的 RedLock 算法,可以实现高效的分布式锁。

使用 SETNX 实现分布式锁

Redis 的 SETNX 命令用于设置一个键的值,只有当该键不存在时才会设置成功。如果键已经存在,命令执行无效。基于这个特性,可以在 Redis 中设置一个表示锁的键,如果该键不存在,表示当前没有锁,可以设置锁;如果键已存在,则表示锁已经被其他实例占用。

实现步骤

  1. 客户端尝试向 Redis 设置一个锁(键)。如果该键不存在,Redis 会成功设置,并返回 OK,表示获取到锁。
  2. 如果该键已经存在,表示锁被其他客户端持有,客户端需要等待或返回失败。
  3. 客户端在使用完资源后,会删除该锁(键)。
import redis
import time

# 创建 Redis 连接
r = redis.StrictRedis(host='localhost', port=6379, db=0)

def acquire_lock(lock_name, timeout=10):
    lock_key = f"lock:{lock_name}"
    lock_value = "locked"

    # 设置锁,如果锁已经存在,SETNX 会返回 0,表示获取锁失败
    if r.setnx(lock_key, lock_value):
        # 设置锁的过期时间,避免死锁
        r.expire(lock_key, timeout)
        return True
    return False

def release_lock(lock_name):
    lock_key = f"lock:{lock_name}"
    r.delete(lock_key)

# 使用示例
if acquire_lock("resource_lock"):
    print("获得锁,执行任务")
    time.sleep(5)  # 模拟任务执行
    release_lock("resource_lock")
    print("释放锁")
else:
    print("无法获取锁")
RedLock 实现分布式锁

RedLock 是 Redis 官方提出的分布式锁算法,旨在提高分布式环境中 Redis 锁的可靠性。它通过在多个独立的 Redis 实例上进行加锁,确保锁的安全性和健壮性。RedLock 算法在多个 Redis 实例之间进行分布式锁的获取和释放,减少了单点故障带来的风险。


2. 基于数据库的分布式锁

数据库是许多应用的核心存储系统,因此我们也可以利用数据库来实现分布式锁。通常,基于数据库的分布式锁会创建一张专门存储锁信息的表(例如 locks 表),记录锁的状态。

实现步骤
  1. 在数据库中创建一张锁表,表中存储锁的标识和锁的持有者信息。
  2. 使用数据库的 SELECT FOR UPDATE 语句来锁定某行数据,这样可以保证只有一个客户端可以获取锁。
  3. 当客户端完成任务后,删除锁信息,释放锁。
-- 创建锁表
CREATE TABLE locks (
    lock_name VARCHAR(255) PRIMARY KEY,
    locked_by VARCHAR(255),
    locked_at TIMESTAMP
);

Python 示例(使用 MySQL)

import mysql.connector
import time

def acquire_lock(lock_name, timeout=10):
    conn = mysql.connector.connect(user='root', password='password', host='localhost', database='test_db')
    cursor = conn.cursor()

    try:
        # 尝试插入锁信息,如果锁已存在,则返回失败
        cursor.execute(f"SELECT * FROM locks WHERE lock_name = '{lock_name}' FOR UPDATE")
        lock = cursor.fetchone()

        if lock:
            print("Lock already acquired")
            return False

        # 插入锁信息
        cursor.execute(f"INSERT INTO locks (lock_name, locked_by, locked_at) VALUES ('{lock_name}', 'server1', NOW())")
        conn.commit()
        return True
    finally:
        cursor.close()
        conn.close()

def release_lock(lock_name):
    conn = mysql.connector.connect(user='root', password='password', host='localhost', database='test_db')
    cursor = conn.cursor()

    try:
        cursor.execute(f"DELETE FROM locks WHERE lock_name = '{lock_name}'")
        conn.commit()
    finally:
        cursor.close()
        conn.close()

# 使用示例
if acquire_lock("resource_lock"):
    print("获得锁,执行任务")
    time.sleep(5)  # 模拟任务执行
    release_lock("resource_lock")
    print("释放锁")
else:
    print("无法获取锁")
注意事项
  • 性能问题:数据库锁通常会涉及到磁盘 I/O 操作,因此性能不如 Redis。
  • 死锁:如果业务逻辑中没有正确处理锁的释放,可能会导致死锁。

3. 基于 Zookeeper 的分布式锁

Zookeeper 是一个高可用的分布式协调服务,它可以为分布式系统提供同步、命名、配置管理等服务。在 Zookeeper 中,客户端可以通过创建临时节点的方式来实现分布式锁。

实现步骤
  1. 客户端通过 Zookeeper 创建一个临时节点。
  2. 如果其他客户端已经持有锁,则会阻塞直到锁被释放。
  3. 临时节点会在客户端断开连接时自动删除,防止死锁。

Zookeeper 锁的实现原理

  • 使用 Zookeeper 的 临时顺序节点(Ephemeral Sequential Nodes)来实现锁。
  • 客户端首先创建一个顺序节点,Zookeeper 会为每个节点分配一个唯一的顺序编号。
  • 客户端查看自己创建的节点和其他节点的顺序编号,如果当前节点是编号最小的节点,则认为获取锁成功,否则等待编号最小的节点被删除后再次尝试获取锁。
from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError
import time

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()

def acquire_lock(lock_name):
    lock_path = f"/lock/{lock_name}"
    try:
        # 创建临时顺序节点
        lock_node = zk.create(lock_path, ephemeral=True, sequence=True)
        print(f"Lock acquired: {lock_node}")
        return True
    except NodeExistsError:
        print("Lock already exists")
        return False

def release_lock(lock_name):
    lock_path = f"/lock/{lock_name}"
    zk.delete(lock_path)
    print(f"Lock released: {lock_path}")

# 使用示例
if acquire_lock("resource_lock"):
    print("获得锁,执行任务")
    time.sleep(5)  # 模拟任务执行
    release_lock("resource_lock")
    print("释放锁")
else:
    print("无法获取锁")

zk.stop()
Zookeeper 分布式锁优点
  • 高可靠性:Zookeeper 的高可用性保证了锁的安全性。
  • 自动解锁:Zookeeper 的临时节点机制确保了锁的自动释放。
缺点
  • 性能瓶颈:Zookeeper 对比 Redis 的性能要差,适合对一致性要求更高的场景。
  • 集群管理复杂:部署 Zookeeper 集群需要额外的运维工作。

总结

分布式锁是确保多个分布式服务之间对共享资源访问互斥的技术,主要有以下三种常见实现方式:

  1. 基于 Redis 的分布式锁:性能高,适合高并发场景,使用 SETNX 或 RedLock 算法。
  2. 基于数据库的分布式锁:通过数据库行锁或表锁实现,适合不频繁访问的场景,但性能较低。
  3. 基于 Zookeeper 的分布式锁:适用于高一致性要求的分布式系统,使用临时节点实现锁,但性能和复杂度较高。

选择合适的分布式锁方式取决于

你的应用场景、系统架构以及对性能和可靠性的需求。