分布式锁
在并发编程中,经常会遇到多个线程访问同一个共享资源而这时候,我们就需要保证数据的"一致性",那么就要用到锁的概念,给资源加上锁,拿到锁所有权的人才能够进行操作共享资源,没有拿到资源的线程需要等待,等其他线程使用完,释放锁。
在项目中,遇到多个用户抢购商品时、商品的数量就是共享资源。因此,在操作商品库存数据时,也需要使用锁保证商品库存的一致。
线程锁
如果在单服务器架构中,就可以使用线程锁保证数据的一致性。但是,此种方案不适合多服务器架构。
假设服务器1上使用线程锁保证同一时刻只有1个用户操作数据,却无法保证其他服务器上同时不能有用户操作,仍然会产生资源竞争。
因此,在多服务器架构中,不能使用线程锁的机制。
分布式锁
在上面的情况中,锁是分布在每个服务器上的。因此,不能保证所有服务器间数据的一致性。
想一想,可以将锁独立于各个服务器之外吗?
答案是肯定的,这就是分布式锁。
当确定了不同节点服务器之间需要分布式锁,那么我们就需要确定分布式锁到底有哪些功能?
互斥性:和本地锁一样,互斥性是最基本的,但是分布式锁需要保证在不同节点不同线程的互斥。
可重入性:同一个节点同一个线程如果获取了锁之后,那么也可以再次获取这个锁。
锁超时:和本地锁一样,支持锁超时,防止死锁。
高效、高可用:加锁和解锁需要高效,同时也需要保证高可用,防止分布式锁失效。
常见的分布式锁实现方式:MySQL
、Redis
等
MySQL
锁
通过MySQL
实现分布式锁,有两种方式:悲观锁和乐观锁。
悲观锁
概念
悲观锁是基于一种悲观的态度来防止一切数据冲突。它是以一种预防的姿态在修改数据之前把数据锁住,然后再对数据读写,在它释放锁之前,其他任何人不能对数据操作,直到前一个人把锁释放后,下一个人才对数据进行加锁,然后进行数据操作。
一般的关系型数据库管理系统中,锁的机制都是基于悲观锁的机制实现的。
特点:可以完全保证数据的独占性和正确性,因为每次请求都会对数据进行加锁,然后进行数据操作,最后再解锁,而加锁解锁的过程会造成服务器性能消耗。因此,在高并发的情况下,用MySQL
实现分布式锁,毫无疑问是不行的!!!
只是了解MySQL
实现分布式锁的思想即可。
MySQL
行锁实现
开启事务;
开启需要操作的MySQL
表记录的行锁,防止其他操作干扰;
操作完成,提交事务;
假设需要更新商品id为3的库存减一
SQL
语句实现:
begin; # 开启事务
select count from tb_goods where id=3 for update; # 查询库存为20,并开启行锁
update tb_goods set count=19 where id=3; # 设置库存减一
commit; # 提交事务
Django
实现:
with transaction.atomicC():
try:
goods = Goods.objects.select_for_update().get(id=3) # 开启行锁
except:
transaction.rollback() # 失败,进行回滚
goods.count -= 1 # 修改库存
goods.save() # 保存库
transaction.commit() # 提交事务
乐观锁
所谓的乐观锁;是对数据冲突保持 种乐观态度,假设数据一般情况下,不会产生冲突。所以,在操作数据时不会对操作的数据进行加锁(这使得多个任务可以并行得对数据进行操作),只有在数据提交时,才通过一种机制对数据的冲突与否进行检测。如果,发现数据冲突了;则返回错误信息,让用户决定如何去做。
特点:乐观锁是一种并发型的锁,本身不对数据进行加锁,而是通过业务实现锁的功能。不对数据加锁就意味着允许多个请求同时访问数据,同时也省掉了对数据加锁、解锁过程,所以在一定程度下会提高性能。不过在高并发情况下,会导致大量的请求冲突,冲突会导致大部分操作无功而返,浪费系统资源。所以,在高并发情况下,乐观锁性能反而不如悲观锁。
SQL
语句实现:
select id, count, version from tb_goods where id=3; # 在商品表中多一个数据的版本号version,查询库存为20,版本号为27
update tb_goods set count=19,version=28 where id=3 and version=27; # 将库存减1,版本号加1
# 如果更新成功,说明没有其他线程抢占资源,否则,说明资源已经被抢占,需要重新再来。
Django
实现:
try:
goods = Goads.objects.get(id=3) # 开启行锁
except:
pass
Goods.objects.filter(id=3, version=goods.version).update(count=goods.count+1)
自定义锁表实现
首先,需要创建一个有关资源的锁记录表:
CREATE TABLE resourceLock(
id INT UNSIGNED PRIMARY KEY AUTO_NCREMENT,
resource_name VARCHAR(128) NOT NULL UNIQUE DEFAULT '' COMMENT '资源名字',
node_info VARCHAR(128) DEFAULT NUUL COMMENT '机器信息',
count INT UNSIGNED NOT NULL DEFAULT O COMMENT '锁的次数,实现重入性',
desc_info VARCHAR(128) DEFAULT NULL COMMENT '额外的信息',
update_time TIMESTAMP DEFAULT NULL COMMENT'更新时间',
create_time TIMESTAMP DEFAULT NULL COMMENT '创建时间'
)ENGINE=INNODB DEFAULT CHARSET=utf8;
定义锁的工具类
import time
from django.db import connection, transaction
class MysqlLock(object):
def lock(self, resource, node_info, block=False, timeout=10):
"""
默认非阻塞实现
:param resource: 需要加锁的资源名称
:param node_info: 当前线程的身份信息
:param block: 是否阻塞等待加锁
:param timeout: 超时时间
:return: True 或 False
1.先开启事务
2.根据参数block,决定是否阻塞等待锁的获取,即使阻塞等待,超时仍然会退出
3.查询表中,是否已经存在有关该资源的锁记录
4.假设已经存在锁,判断是否为当前线程添加: 是,就把锁记录加1,返回Teue;不是,返回False
5.假设不存在锁,添加锁记录,但是,资源名称唯一,表中进没有强制开锁,因此有失败的可能。所以,成动,返回True,失败,返回False
"""
pass
with connection.cursor() as cursor:
start = time.time()
n = 1
while n:
n = 1 if block else 0
now = time.time()
if now-start > timeout:
return False
cursor.execute(
'select id,node_info,count from resourceLock where resource_name=%s', [resource])
row = cursor.fetchone()
if row:
idi, node_infol, count1 = row
if node_info == node_infol:
cursor.execute(
'update resourceLock set count=%s', [count1 + 1])
return True
elif block is False:
return False
else:
try:
cursor.execute('insert into resourceLock values(0,%s,%s,%s,1,'', %s,%s)', [
resource, node_info, now, now])
except:
if block is False:
return False
return True
def unlock(self, resource, node_info):
"""
释放锁
:param resource:要加锁的资源名称
:param node_info:当前线程的身份信息
:return:True或False
1.先开启事务
2.查询表中,是否已经存在当前线程有关该资源的锁记录
3.假设存在锁记录,如果次数大于1,就更新次数减1,否则,删除该条记录,最后返回True
4.假设不存在所记录,则返回False
"""
with connection.cursor() as cursor:
with transaction.atomic():
cursor.execute('select id,node_info,count from resourceLock where resource_name=%s and node_info=%s',[resource, node_info])
row = cursor.fetchone()
if not row:
return False
id1, node_infol, count1 = row
if count1 > 1:
cursor.execute(
'update resourceLock set count=%s', [count1-1])
else:
cursor.execute('delete from resourceLock where resource_name=%s and node_info=%s',[resource, node_info])
return True
每次操作资源之前,进行加锁,加锁成功,继续操作;否则,就不操作;操作成功,释放锁。
上面这种实现有以下几个问题:
- 这把锁高度依赖数据库的可用性,数据库是个单点,一旦挂掉,会导致业务系统不可用
- 这把锁没有关闭时间,一旦解锁操作失败,就会导致锁记录一直存在数据库中,其他线程无法再获取锁
针对上面的问题,可以对症下药:
- 数据库是单点?实现数据库之间的主从同步,一个
MySQL
服务挂掉,立即切换到从机 - 没有失效时间?实现一个定时服务,每隔一定时间把数据库中的超时数据清理掉
Redis
锁
Redis
实现分布式锁原理是:一个线程在redis
中存入一个value,其他线程也要存的时候,发现key已经有value,就只好放弃或稍后再试。
一般使用的命令是:setnx
或set
加nx
参数,只允许一个客户端添加数据,先到先得;再调用del
删除数据,释放资源。
加锁
加锁的过程实际就是在redis中,给key设置个value。
106.128.28.10:0>keys * # redis中不存在数据
106.128.28.10:0>setnx 'tb_goods' 1 # 将商品有关的标志存入redis
"1"
106.128.28.10:0>setnx 'tb_goods' 1 # 其他客户端再存,就无法存入
"0"
但是,此时存在一个问题,就是服务器1设置锁之后,突然挂掉,无法删除锁;同时其他服务器一直无法获取,导致死锁,怎么办?
很简单,设置过期时间。
106.128.28.10:0>setnx 'tb_goods' 1 # 先存
"1"
106.128.28.10:0>EXPIRE 'tb goods' 10 # 后设置过期时间
"1"
这种情况下,仍然会出现问题,因为不是一步操作,不具备原子性,可能会出现:第一步执行完,服务器挂掉,并没有执行第二步,此时,有两种操作方案可以解决这个问题:使用lua脚本或使用set指令。
106.128.28.10:0>keys *
106.228.28.10:0>set 'tb_goods' 1 ex 10 nx # 此时就是原子操作,在设置值同时还设置过期时间
"OK"
106.128.28.10:0>keys *
1) "tb_goods"
106.128.28.10:0>set 'tb_goods' 1 ex 10 nx
null
解锁
解锁的过程实际就是将key键删除。
106.128.28.10:0>del 'tb_goods'
"1"
注意:删除键的过程中不能乱删,也就是自己只能释放自己的锁,不允许释放别人的锁。
有人会有疑惑:在多线程场景中,线程A获取到锁,但如果A不释放锁,此时,线程B是获取不到的,怎么会释放别人的锁?
假设线程A和B都用同一个键加锁。线程A加锁成功,由于业务时间过长,超过了设置的超时时间。这时,redis会释放锁。此时,线程B就可以加锁成功,然后执行业务操作。恰巧,此时A执行完业务逻辑,删除键释放锁。此时,就会出现线程A删除线程B的锁。
如何解决这个问题呢?在set命令加锁时,value可以使用随机数,删除时,先判断是否相等,再删除。
因为这两步操作在redis中是分开执行的,所以,需要通过lua脚本,实现原子操作。
if redis.call('get',KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
上面的代码解决了释放其他人的锁的问题。但是还是没有解决 锁超时的问题,怎么办呢?给锁续命
- 到了超时时间,如果业务代码还没有执行完成,需要给锁自动续期。
- 获取锁之后,自动开启一个定时任务,每个10s,自动刷新一次过期时间。
当然,还有锁竞争问题、锁重入问题、主从复制问题等,之后再说吧。