分布式锁
想要知道分布式锁我们就要先了解两个概念——锁、分布式。
问题一:什么是锁?
在单进程的系统中,当存在多个线程可以同时改变某个变量(可变共享变量)时,就需要对变量或代码块做同步,使其在修改这种变量时能够线性执行消除并发修改变量。
而同步的本质是通过锁来实现的。为了实现多个线程在一个时刻同一个代码块只能有一个线程可执行,那么需要在某个地方做个标记,这个标记必须每个线程都能看到,当标记不存在时可以设置该标记,其余后续线程发现已经有标记了则等待拥有标记的线程结束同步代码块取消标记后再去尝试设置标记。这个标记可以理解为锁。
不同地方实现锁的方式也不一样,只要能满足所有线程都能看得到标记即可。如 Java 中 synchronize 是在对象头设置标记,Lock 接口的实现类基本上都只是某一个 volitile 修饰的 int 型变量其保证每个线程都能拥有对该 int 的可见性和原子修改,linux 内核中也是利用互斥量或信号量等内存数据做标记。
除了利用内存数据做锁其实任何互斥的都能做锁(只考虑互斥情况),如流水表中流水号与时间结合做幂等校验可以看作是一个不会释放的锁,或者使用某个文件是否存在作为锁等。只需要满足在对标记进行修改能保证原子性和内存可见性即可。
由此可见,锁的存在是为了使某一时刻,只有一个线程可以对数据进行修改操作,以此保证数据一致性问题。
问题二:什么是分布式?
分布式的 CAP 理论告诉我们:
任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。
目前很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。基于 CAP理论,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证最终一致性。
分布式场景
此处主要指集群模式下,多个相同服务同时开启.
在许多的场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。很多时候我们需要保证一个方法在同一时间内只能被同一个线程执行。在单机环境中,通过 Java 提供的并发 API 我们可以解决,但是在分布式环境下,就没有那么简单啦。
分布式与单机情况下最大的不同在于其不是多线程而是
多进程。多线程由于可以共享堆内存,因此可以简单的采取内存作为标记存储位置。而进程之间甚至可能都不在同一台物理机上,因此需要将标记存储在一个所有进程都能看到的地方。
了解完这两个基础问题之后就该本文的重点戏了!
什么是分布式锁?我们需要怎样的分布式锁?
关于这个问题,其实我们可以简化为锁的问题,只不过是前提条件发生了变化,那么接下来我们来详细看看吧。
什么是分布式锁
当在分布式模型下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数。
与单机模式下的锁不仅需要保证进程可见,还需要考虑进程与锁之间的网络问题。
分布式锁还是可以将标记存在内存,只是该内存不是某个进程分配的内存而是公共内存如 Redis、Memcache。至于利用数据库、文件等做锁与单机的实现是一样的,只要保证标记能互斥就行。
我们需要怎样的分布式锁?
可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器-上的一个线程执行。
这把锁要是一把可重入锁(避免死锁)
这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条)
这把锁最好是一把公平锁(根据业务需求考虑要不要这条)
有高可用的获取锁和释放锁功能
获取锁和释放锁的性能要好
OK,了解完问题后就该想解决办法了,这里主要分享两种办法:基于数据库做分布式锁和基于Redis做分布式锁。
问题解决
1. 基于数据库做分布式锁
基于乐观锁
基于表主键唯一做分布式锁
思路:利用主键唯一的特性,如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,当方法执行完毕之后,想要释放锁的话,删除这条数据库记录即可。
上面这种简单的实现有以下几个问题:
这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
这把锁只能是非阻塞的,因为数据的 insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。
这把锁是非公平锁,所有等待锁的线程凭运气去争夺锁。
在 MySQL 数据库中采用主键冲突防重,在大并发情况下有可能会造成锁表现象。
当然,我们也可以有其他方式解决上面的问题。
数据库是单点?搞两个数据库,数据之前双向同步,一旦挂掉快速切换到备库上。
没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
非阻塞的?搞一个 while 循环,直到 insert 成功再返回成功。
非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。
非公平的?再建一张中间表,将等待锁的线程全记录下来,并根据创建时间排序,只有最先创建的允许获取锁。
比较好的办法是在程序中生产主键进行防重。
基于表字段版本号做分布式锁
这个策略源于 mysql 的 mvcc 机制,使用这个策略其实本身没有什么问题,唯一的问题就是对数据表侵入较大,我们要为每个表设计一个版本号字段,然后写一条判断 sql 每次进行判断,增加了数据库操作的次数,在高并发的要求下,对数据库连接的开销也是无法忍受的。
基于悲观锁
基于数据库排他锁做分布式锁
在查询语句后面增加for update,数据库会在查询过程中给数据库表增加排他锁 (注意: InnoDB 引擎在加锁的时候,只有通过索引进行检索的时候才会使用行级锁,否则会使用表级锁。这里我们希望使用行级锁,就要给要执行的方法字段名添加索引,值得注意的是,这个索引一定要创建成唯一索引,否则会出现多个重载方法之间无法同时被访问的问题。重载方法的话建议把参数类型也加上。)。当某条记录被加上排他锁之后,其他线程无法再在该行记录上增加排他锁。
我们可以认为获得排他锁的线程即可获得分布式锁,当获取到锁之后,可以执行方法的业务逻辑,执行完方法之后,通过connection.commit()操作来释放锁。
这种方法可以有效的解决上面提到的无法释放锁和阻塞锁的问题。
阻塞锁?
for update语句会在执行成功后立即返回,在执行失败时一直处于阻塞状态,直到成功。锁定之后服务宕机,无法释放?使用这种方式,服务宕机之后数据库会自己把锁释放掉。
但是还是无法直接解决数据库单点和可重入问题。
这里还可能存在另外一个问题,虽然我们对方法字段名使用了唯一索引,并且显示使用 for update 来使用行级锁。但是,MySQL 会对查询进行优化,即便在条件中使用了索引字段,但是否使用索引来检索数据是由 MySQL 通过判断不同执行计划的代价来决定的,如果 MySQL 认为全表扫效率更高,比如对一些很小的表,它就不会使用索引,这种情况下 InnoDB 将使用表锁,而不是行锁。如果发生这种情况就悲剧了。
还有一个问题,就是我们要使用排他锁来进行分布式锁的 lock,那么一个排他锁长时间不提交,就会占用数据库连接。一旦类似的连接变得多了,就可能把数据库连接池撑爆。
优缺点
优点:简单,易于理解
缺点:会有各种各样的问题(操作数据库需要一定的开销,使用数据库的行级锁并不一定靠谱,性能不靠谱)
2. 基于 Redisson 做分布式锁
Maven引入依赖包
<!-- 引入Redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.17.5</version> </dependency>创建Redisson配置类
import lombok.Data; import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @ConfigurationProperties(prefix = "spring.redis") @Data public class RedissonConfig { //ip地址 private String host; //端口号 private String port; @Bean public RedissonClient redissonClient(){ // 1. 创建配置 Config config = new Config(); String redisAddress = String.format("redis://%s:%s",host,port); config.useSingleServer().setAddress(redisAddress).setDatabase(3).setPassword("xxx"); // 2. 创建实例 RedissonClient redisson = Redisson.create(config); return redisson; } }在application.yml中配置redis相关依赖
spring: redis: host: localhost port: 6379 password: "xxx"编写定时任务
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ValueOperations; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; @Component @Slf4j public class PreCacheJob { @Resource private UserService userService; @Resource private RedisTemplate<String,Object> redisTemplate; @Resource private RedissonClient redissonClient; //重点用户 private List<Long> mainUserList = Arrays.asList(1L); //每天执行,预热推荐用户 @Scheduled(cron = "0 4 0 * * *") public void doCacheRecommendUSer(){ RLock lock = redissonClient.getLock("partner:precachejob:docache:lock"); try { //只有一个线程能获取到锁 if (lock.tryLock(0, -1, TimeUnit.MILLISECONDS)){ //参数: 等待时间、释放时间(-1自动续期)、时间单位 for (Long userId : mainUserList) { QueryWrapper<User> queryWrapper = new QueryWrapper<>(); Page<User> userPage = userService.page(new Page<>(1,20), queryWrapper); String redisKey = String.format("partner:user:recommend:%s",userId); ValueOperations<String,Object> valueOperations = redisTemplate.opsForValue(); //将数据写入缓存 try { valueOperations.set(redisKey,userPage,30000, TimeUnit.MILLISECONDS); } catch (Exception e) { log.error("redis set key error",e); } } } } catch (InterruptedException e) { log.error("doCacheRecommendUSer error:",e); } finally { //只能释放自己的锁 if (lock.isHeldByCurrentThread()){ lock.unlock(); } } } }我这里的仅供大家参考,实际可以根据自己需求去写代码。