使用Redis分布式锁

常见锁类执行逻辑:

1、写+写 阻塞排队

2、写+读 等待写结束

3、读+写(先读再写) 读完才能写!! 有读锁写也需要等待

4、多个读:相当于无锁,并发读会同时上锁成功,会记录所有的读

5、fair lock公平锁 并发执行之后有先后顺序,不是抢占,因此第一个执行结束将锁交给下一个人、

6、读写锁 写锁控制读锁,当有进程中在写的时候不能读取,只有当写入结束之后才可以读取 写锁自身也会等待

1、分布式锁与本地锁的区别

image-20220411163401780

2、本地锁

本地锁使用过程中,可以防止在一个微服务内部由于并发问题产生的读写数据不一致问题。

2.1 Synchronized关键字加锁

1、同步方法

//加锁方式1
public synchronized void method()
{
//TODO
}
//加锁方式2(同步方法中使用同步代码块)
public void method()
{
synchronized(this) {
// TODO
}
}

1.1、同步实例方法

public synchronized static void method()
{
//TODO
}

2、同步代码块(锁住当前括号里面 的对象)

2.1、加类锁——锁住当前类

//使用代码块加锁
class ClassName {
public void method() {
synchronized(ClassName.class) {
// todo
}
}
}

2.2、加锁当前对象(当调用是同一个对象的时候可以i锁住,不是同一个对象不能锁住)

public void main(){
synchronized(this){
//TODO
}
}

2.2 JUC锁包加锁

java.util.concurrent处理业务过程中出现的竞争和死锁问题

线程的状态:

>新生
>运行
>阻塞
>等待
>超时等待
>终止

wait与object

wait::来自Object类,会释放锁,必须在同步代码块中,不需要捕获异常

sleep:来自线程类,不会释放锁,任何地方,需要捕获异常

image-20220411170848261

1、Tools 工具类

1)闭锁CountDownLatch

​ 其执行过程中允许线程一直等待,相当于存在一个初始值,当所有的初始值得到释放锁的数量减为0的时候就会被解锁。

2)CyclicBarrier 栅栏

​ 允许线程等待,直到设置的一个公共访问点。

3)Semaphore 信号量

​ 是一个共享的锁,允许执行PV操作,获取释放信号量,有可用的可用使用,信号量减一,没有的时候等待。

2、Executor 执行者

3、Atomic 原子包

4、Locks 锁包

3、分布式锁

在分布式业务环境调用过程中,多个微服务同时访问同一个数据的时候会存在并发读写的情况,因此需要锁来保证每个分布式业务不会出现因为并发读写出现的数据不一致的情况。

3.1、手动在Redis中放置分布式锁

image-20220411162734221

分布式锁的基本使用流程:

  1. 获取分布式锁
  2. 执行自己的业务逻辑代码
  3. 解除锁
  4. 结束自己的逻辑

代码逻辑如下

String token = UUID.randomUUID().toString();
//原子命令占锁 EX NX一起设置 原子性
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", token, 30, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(lock)) {
//获取锁成功
try {
//执行业务逻辑
} finally {
//lua脚本解锁
String lua = """解锁脚本(这里是代码块)""";
Long resultUnlock = stringRedisTemplate.execute(new DefaultRedisScript<Long>("lua脚本", "返回值类型"),"lua脚本的参数.....");

使用分布式锁Set命令可能会出现的问题:

  1. 不设置过期时间:虽然业务在Try-catch语句中执行,最后在Finally中释放锁,但是程序在业务过程中发生闪断,那么就执行不到Finally语句块中导致死锁。

    解决方案:为锁设置过期时间

  2. 不同时获取锁和设置过期时间:当程序刚好执行到设置过期时间的时候发生闪断,那么这个时候🔒就会被永远的占用出现死锁情况

    解决方案:在获取锁的时候同时设置过期时间set Ex Nx

  3. 因锁过期,但业务为执行完成,导致锁误删情况:当程序业务执行时间过长的时候,因为设置了锁的过期时间,这个时候实际上所以已经被释放了,但是业务还会执行释放锁的操作,导致锁被误删除

    解决方案:为锁设置自己唯一的标识符:锁的值为一组UUID

    解锁过程:

    ​ 1、获取Lock锁对应的值,进行比对UUID是否相等

    ​ 2、相等的话进行释放

  4. 释放锁的过程不是原子操作:如果刚好获取到UUID之后比对成功,这个时候出现锁过期,这样在删除就会导致删除别人的锁。

    解决方案:使用Lua脚本进行操作

    if redis.call("get",KEYS[1]) == ARGV[1]
    then
    return redis.call("del",KEYS[1])
    else
    return 0
    end

3.2、使用Redisson设置分布式锁

3.2.1、基本使用方法

  1. 在业务类中注入RedissonClient实例对象
  2. 使用实例对象获取锁(同一个锁名称就是统一把锁)
  3. 执行业务逻辑
  4. 释放锁
public void method() {
RLock categoryLock = redissonClient.getLock("锁的名称");
//加锁
categoryLock.lock();
try {
//执行业务逻辑
} finally {
//解锁
categoryLock.unlock();
}
//方法返回值
}

3.2.2、基本锁类示例

1、ReadWriteLock(读写锁)

/**
* 测试写锁 修改期间,写锁是一个排他锁(互斥锁,独享锁)只能一次操作一个,只能有一个写锁
*
* @return
*/
@ResponseBody
@GetMapping("/writevalue")
public String writeValue() {
//写数据加写锁,读数据加读锁
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("rw-lock");
//获取写锁
RLock rLock = readWriteLock.writeLock();
String uuid = "";
try {
//上写锁
rLock.lock();
uuid = UUID.randomUUID().toString();
Thread.sleep(30000);
stringRedisTemplate.opsForValue().set("uuid", uuid);
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放写锁
rLock.unlock();
}
return uuid;
}

/**
* 测试读取锁 能够保证能够读到最新值,读锁是一个共享的,但是写存在读必须等待
*
* @return
*/
@ResponseBody
@GetMapping("/readvalue")
public String readValue() {
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("rw-lock");
RLock rLock = readWriteLock.readLock();
String uuid = "";
try {
//上读锁
rLock.lock();
uuid = stringRedisTemplate.opsForValue().get("uuid");
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放锁
rLock.unlock();
}
return uuid;
}

2、Semaphore(信号量)

 /**
* 获取信号量 信号量也可以做分布式限流 总量恒定
*
* @return ok
*/
@ResponseBody
@GetMapping("/park")
public String park() {
RSemaphore position = redissonClient.getSemaphore("position");
//申请一个信号量
// 会阻塞! boolean tryBoolean = position.acquire();
//非阻塞!

boolean tryBoolean = position.tryAcquire();
if (tryBoolean) {
//执行业务
System.out.println("执行业务");
} else {
//返回
System.out.println("返回信息");
}
return "ok" + ">" + tryBoolean;
}

/**
* 释放信号量
*
* @return release
*/
@ResponseBody
@GetMapping("/go")
public String go() {
RSemaphore position = redissonClient.getSemaphore("position");
//释放一个信号量
position.release();
return "release";
}

3、CountDownLatch(闭锁)

/**
* 测试闭锁
* 5个班没人了,才能锁大门 减到零就结束锁,闭锁
*
* @return success
*/
@GetMapping("/lockdoor")
@ResponseBody
public String countDownLatchLock() {
//获取闭锁
RCountDownLatch door = redissonClient.getCountDownLatch("door");
door.trySetCount(5);
//等待完成闭锁5个人
try {
door.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

return "锁门了";
}

/**
* 班级走人
*
* @param id
* @return gone
*/
@GetMapping("/leave")
@ResponseBody
public String countDownLatchLeave(@Param("id") Long id) {
//获取闭锁
RCountDownLatch door = redissonClient.getCountDownLatch("door");
//放开闭锁,计数减一
door.countDown();
return id + "走了";
}