跳至主要內容

分布式锁

Mr.Liu大约 10 分钟Python并发MySQLRedis

在并发编程中,经常会遇到多个线程访问同一个共享资源而这时候/我们就需要保证数据的“致性,那么就要用到锁的概念,给资源加上锁,拿到锁所有权的人才能够进行操作共享资源,没有拿到资源的线程需要等待,等其他线程使用完,释放锁。

在项目中,遇到多个用户抢购商品时、商品的数量就是共享资源。因此,在操作商品库存数据时,也需要使用锁保证商品库存的一致。

线程锁

如果在单服务器架构中,就可以使用线程锁保证数据的一致性。但是,此种方案不适合多服务器架构。

假设服务器1上使用线程锁保证同一时刻只有1个用户操作数据,却无法保证其他服务器上同时不能有用户操作,仍然会产生资源竞争。

因此,在多服务器架构中,不能使用线程锁的机制。

分布式锁

在上面的情况中,锁是分布在每个服务器上的。因此,不能保证所有服务器间数据的一致性。

想一想,可以将锁独立于各个服务器之外吗?

答案是肯定的,这就是分布式锁。

当确定了不同节点服务器之间需要分布式锁,那么我们就需要确定分布式锁到底有哪些功能?

互斥性:和本地锁一样,互斥性是最基本的,但是分布式锁需要保证在不同节点不同线程的互斥。

可重入性:同一个节点同一个线程如果获取了锁之后,那么也可以再次获取这个锁。

锁超时:和本地锁一样,支持锁超时,防止死锁。

高效、高可用:加锁和解锁需要高效,同时也需要保证高可用,防止分布式锁失效。

常见的分布式锁实现方式:MySQLRedis

MySQL

通过MySQL实现分布式锁,有两种方式:悲观锁和乐观锁。

悲观锁

概念

悲观锁是基于一种悲观的态度来防止一切数据冲突。它是以一种预防的姿态在修改数据之前把数据锁住,然后再对数据读写,在它释放锁之前,其他任何人不能对数据操作,直到前一个人把锁释放后,下一个人才对数据进行加锁,然后进行数据操作。

一般的关系型数据库管理系统中,锁的机制都是基于悲观锁的机制实现的。

特点:可以完全保证数据的独占性和正确性,因为每次请求都会对数据进行加锁,然后进行数据操作,最后再解锁,而加锁解锁的过程会造成服务器性能消耗。因此,在高并发的情况下,用MySQL实现分布式锁,毫无疑问是不行的!!!

只是了解MySQL实现分布式锁的思想即可。

自定义锁表实现

首先,需要创建一个有关资源的锁记录表:

CREATE TABLE resourceLock(
    id INT UNSIGNED PRIMARY KEY AUTO_NCREMENT,
    resource_name VARCHAR(128) NOT NULL UNIQUE DEFAULT '' COMMENT '资源名字',
    node_info VARCHAR(128) DEFAULT NUUL COMMENT '机器信息',
    count INT UNSIGNED NOT NULL DEFAULT O COMMENT '锁的次数,实现重入性',
    desc_info VARCHAR(128) DEFAULT NULL COMMENT '额外的信息',
    update_time TIMESTAMP DEFAULT NULL COMMENT'更新时间',
    create_time TIMESTAMP DEFAULT NULL COMMENT '创建时间'
)ENGINE=INNODB DEFAULT CHARSET=utf8;

定义锁的工具类

import time
from django.db import connection, transaction


class MysqlLock(object):
    def lock(self, resource, node_info, block=False, timeout=10):
        """
        默认非阻塞实现
        :param resource: 需要加锁的资源名称
        :param node_info: 当前线程的身份信息
        :param block: 是否阻塞等待加锁
        :param timeout: 超时时间
        :return: True 或 False
        1.先开启事务
        2.根据参数block,决定是否阻塞等待锁的获取,即使阻塞等待,超时仍然会退出
        3.查询表中,是否已经存在有关该资源的锁记录
        4.假设已经存在锁,判断是否为当前线程添加: 是,就把锁记录加1,返回Teue;不是,返回False
        5.假设不存在锁,添加锁记录,但是,资源名称唯一,表中进没有强制开锁,因此有失败的可能。所以,成动,返回True,失败,返回False
        """
        pass
        with connection.cursor() as cursor:
            start = time.time()
            n = 1
            while n:
                n = 1 if block else 0
                now = time.time()
                if now-start > timeout:
                    return False
                cursor.execute(
                    'select id,node_info,count from resourceLock where resource_name=%s', [resource])
                row = cursor.fetchone()
                if row:
                    idi, node_infol, count1 = row
                    if node_info == node_infol:
                        cursor.execute(
                            'update resourceLock set count=%s', [count1 + 1])
                        return True
                    elif block is False:
                        return False
                else:
                    try:
                        cursor.execute('insert into resourceLock values(0,%s,%s,%s,1,'', %s,%s)', [
                                       resource, node_info, now, now])
                    except:
                        if block is False:
                            return False
                    return True

    def unlock(self, resource, node_info):
        """
        释放锁
        :param resource:要加锁的资源名称
        :param node_info:当前线程的身份信息
        :return:True或False
        1.先开启事务
        2.查询表中,是否已经存在当前线程有关该资源的锁记录
        3.假设存在锁记录,如果次数大于1,就更新次数减1,否则,删除该条记录,最后返回True
        4.假设不存在所记录,则返回False
        """
        with connection.cursor() as cursor:
            with transaction.atomic():
                cursor.execute('select id,node_info,count from resourceLock where resource_name=%s and node_info=%s',[resource, node_info])
                row = cursor.fetchone()
                if not row:
                    return False
                id1, node_infol, count1 = row
                if count1 > 1:
                    cursor.execute(
                        'update resourceLock set count=%s', [count1-1])
                else:
                    cursor.execute('delete from resourceLock where resource_name=%s and node_info=%s',[resource, node_info])
                return True

每次操作资源之前,进行加锁,加锁成功,继续操作;否则,就不操作;操作成功,释放锁。

上面这种实现有以下几个问题:

  • 这把锁高度依赖数据库的可用性,数据库是个单点,一旦挂掉,会导致业务系统不可用
  • 这把锁没有关闭时间,一旦解锁操作失败,就会导致锁记录一直存在数据库中,其他线程无法再获取锁

针对上面的问题,可以对症下药:

  • 数据库是单点?实现数据库之间的主从同步,一个MySQL服务挂掉,立即切换到从机
  • 没有失效时间?实现一个定时服务,每隔一定时间把数据库中的超时数据清理掉

MySQL行锁实现

开启事务; 开启需要操作的MySQL表记录的行锁,防止其他操作干扰;

操作完成,提交事务;

假设需要更新商品id为3的库存减一

SQL语句实现:

begin; # 开启事务
select count from tb_goods where id=3 for update; # 查询库存为20,并开启行锁
update tb_goods set count=19 where id=3; # 设置库存减一
commit; # 提交事务

Django实现:

with transaction.atomicC():
    try:
        goods = Goods.objects.select_for_update().get(id=3) # 开启行锁
    except:
        transaction.rollback() # 失败,进行回滚
    goods.count -= 1 # 修改库存
    goods.save() # 保存库
    transaction.commit() # 提交事务

乐观锁

所谓的乐观锁;是对数据冲突保持 种乐观态度,假设数据一般情况下,不会产生冲突。所以,在操作数据时不会对操作的数据进行加锁(这使得多个任务可以并行得对数据进行操作),只有在数据提交时,才通过一种机制对数据的冲突与否进行检测。如果,发现数据冲突了;则返回错误信息,让用户决定如何去做。

特点:乐观锁是一种并发型的锁,本身不对数据进行加锁,而是通过业务实现锁的功能。不对数据加锁就意味着允许多个请求同时访问数据,同时也省掉了对数据加锁、解锁过程,所以在一定程度下会提高性能。不过在高并发情况下,会导致大量的请求冲突,冲突会导致大部分操作无功而返,浪费系统资源。所以,在高并发情况下,乐观锁性能反而不如悲观锁。

SQL语句实现:

select id, count, version from tb_goods where id=3; # 在商品表中多一个数据的版本号version,查询库存为20,版本号为27
update tb_goods set count=19,version=28 where id=3 and version=27; # 将库存减1,版本号加1
# 如果更新成功,说明没有其他线程抢占资源,否则,说明资源已经被抢占,需要重新再来。

Django实现:

try:
    goods = Goads.objects.get(id=3) # 开启行锁
except:
    pass 
Goods.objects.filter(id=3, version=goods.version).update(count=goods.count+1)

Redis

Redis实现分布式锁原理是:一个线程在redis中存入一个value,其他线程也要存的时候,发现key已经有value,就只好放弃或稍后再试。

一般使用的命令是:setnxsetnx参数,只允许一个客户端添加数据,先到先得;再调用del删除数据,释放资源。

加锁

加锁的过程实际就是在redis中,给key设置个value。

106.128.28.10:0>keys *  # redis中不存在数据
106.128.28.10:0>setnx 'tb_goods' 1  # 将商品有关的标志存入redis
"1"
106.128.28.10:0>setnx 'tb_goods' 1 # 其他客户端再存,就无法存入
"0"

但是,此时存在一个问题,就是服务器1设置锁之后,突然挂掉,无法删除锁;同时其他服务器一直无法获取,导致死锁,怎么办?

很简单,设置过期时间。

106.128.28.10:0>setnx 'tb_goods' 1  # 先存
"1"
106.128.28.10:0>EXPIRE 'tb goods' 10  # 后设置过期时间
"1"

这种情况下,仍然会出现问题,因为不是一步操作,不具备原子性,可能会出现:第一步执行完,服务器挂掉,并没有执行第二步,此时,有两种操作方案可以解决这个问题:使用lua脚本或使用set指令。

106.128.28.10:0>keys *

106.228.28.10:0>set 'tb_goods' 1 ex 10 nx  # 此时就是原子操作,在设置值同时还设置过期时间
"OK"
106.128.28.10:0>keys *
1) "tb_goods"
106.128.28.10:0>set 'tb_goods' 1 ex 10 nx 
null

解锁

解锁的过程实际就是将key键删除。

106.128.28.10:0>del 'tb_goods'
"1"

注意:删除键的过程中不能乱删,也就是自己只能释放自己的锁,不允许释放别人的锁。

有人会有疑惑:在多线程场景中,线程A获取到锁,但如果A不释放锁,此时,线程B是获取不到的,怎么会释放别人的锁?

假设线程A和B都用同一个键加锁。线程A加锁成功,由于业务时间过长,超过了设置的超时时间。这时,redis会释放锁。此时,线程B就可以加锁成功,然后执行业务操作。恰巧,此时A执行完业务逻辑,删除键释放锁。此时,就会出现线程A删除线程B的锁。

如何解决这个问题呢?在set命令加锁时,value可以使用随机数,删除时,先判断是否相等,再删除。

因为这两步操作在redis中是分开执行的,所以,需要通过lua脚本,实现原子操作。

if redis.call('get',KEYS[1]) == ARGV[1] then 
    return redis.call('del', KEYS[1])
else 
    return 0
end

上面的代码解决了释放其他人的锁的问题。但是还是没有解决 锁超时的问题,怎么办呢?给锁续命

  • 到了超时时间,如果业务代码还没有执行完成,需要给锁自动续期。

  • 获取锁之后,自动开启一个定时任务,每个10s,自动刷新一次过期时间。

当然,还有锁竞争问题、锁重入问题、主从复制问题等,之后再说吧。