Redis之分布式锁的实现方案 - 如何优雅地实现分布式锁(JAVA)
关键词
分布式锁
: 是控制分布式系统 之间同步访问共享资源 的一种方式。
spring-data-redis
: Spring针对redis的封装, 配置简单, 提供了与Redis存储交互的抽象封装, 十分优雅, 也极具扩展性, 推荐读一读源码
Lua
: Lua 是一种轻量小巧的脚本语言, 可在redis执行.
前言 本文阐述了Redis分布式锁的一种简单JAVA实现及优化进阶, 实现了自动解锁、自定义异常、重试、注解锁等功能, 尝试用更优雅简洁的代码完成分布式锁.
需求
互斥性: 在分布式系统环境下, 一个锁只能被一个线程持有.
高可用: 不会发生死锁、即使客户端崩溃也可超时释放锁.
非阻塞: 获取锁失败即返回.
方案 Redis具有极高的性能, 且其命令对分布式锁支持友好, 借助SET
命令即可实现加锁处理.
SET
EX
seconds – Set the specified expire time, in seconds.
PX
milliseconds – Set the specified expire time, in milliseconds.
NX
– Only set the key if it does not already exist.
XX
– Only set the key if it already exist.
实现
简单实现
做法为set if not exist(如果不存在则赋值), redis命令为原子操作, 所以单独使用set
命令时不用担心并发导致异常.
具体代码实现如下: (spring-data-redis:2.1.6
)
依赖引入 1 2 3 4 5 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-data-redis</artifactId > <version > 2.1.4.RELEASE</version > </dependency >
配置RedisTemplate 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Bean @ConditionalOnMissingBean public StringRedisTemplate stringRedisTemplate (RedisConnectionFactory factory) { StringRedisSerializer keySerializer = new StringRedisSerializer(); RedisSerializer<?> serializer = new StringRedisSerializer(); StringRedisTemplate template = new StringRedisTemplate(); template.setConnectionFactory(factory); template.setKeySerializer(keySerializer); template.setHashKeySerializer(keySerializer); template.setValueSerializer(serializer); template.setHashValueSerializer(serializer); template.afterPropertiesSet(); return template; }
简单的分布式锁实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public Boolean tryLock (String key, String value, long timeout, TimeUnit unit) { return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit); }
以上代码即完成了一个简单的分布式锁功能:
其中redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
即为执行redis命令:
1 2 3 4 redis> set dlock:test -try-lock a EX 10 NX OK redis> set dlock:test -try-lock a EX 10 NX null
早期版本spring-data-redis
分布式锁实现及注意事项
方法Boolean setIfAbsent(K key, V value, long timeout, TimeUnit unit);
是在2.1版本中新增的, 早期版本中setIfAbsent无法同时指定过期时间, 若先使用setIfAbsent
再设置key的过期时间, 会存在产生死锁的风险, 故旧版本中需要使用另外的写法进行实现. 以spring-data-redis:1.8.20
为例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public Boolean tryLock (String key, String value, long timeout, TimeUnit unit) { return redisTemplate.execute(new RedisCallback<Boolean>() { @Override public Boolean doInRedis (RedisConnection connection) throws DataAccessException { JedisCommands commands = (JedisCommands)connection.getNativeConnection(); String result = commands.set(key, value, "NX" , "PX" , unit.toMillis(timeout)); return "OK" .equals(result); } }); }
spring-data-redis:1.8.20
默认redis客户端为jedis
, 可通过getNativeConnection
直接调用jedis方法进行操作. 新旧版本实现方式最终效果相同.
优化进阶
基于AOP实现分布式锁注解工具 - 不仅能用, 而且好用
优化一 (自动解锁及重试)
自动解锁、重试: 上一节针对分布式锁的简单实现可满足基本需求, 但仍有较多可优化改进之处, 本小节将针对分布式锁自动解锁及重试进行优化
分布式锁抽象类
实现AutoCloseable
接口, 可使用try-with-resource
方便地完成自动解锁.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 abstract public class DistributedLock implements AutoCloseable { private final Logger LOGGER = LoggerFactory.getLogger(getClass()); abstract public void release () ; @Override public void close () throws Exception { LOGGER.debug("distributed lock close , {}" , this .toString()); this .unlock(); } }
封装Redis分布式锁
RedisDistributedLock
是Redis分布式锁的抽象, 继承了DistributedLock
并实现了unlock接口.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public class RedisDistributedLock extends DistributedLock { private RedisOperations<String, String> operations; private String key; private String value; private static final String COMPARE_AND_DELETE = "if redis.call('get',KEYS[1]) == ARGV[1]\n" + "then\n" + " return redis.call('del',KEYS[1])\n" + "else\n" + " return 0\n" + "end" ; public RedisDistributedLock (RedisOperations<String, String> operations, String key, String value) { this .operations = operations; this .key = key; this .value = value; } @Override public void release () { List<String> keys = Collections.singletonList(key); operations.execute(new DefaultRedisScript<String>(COMPARE_AND_DELETE), keys, value); } @Override public String toString () { return "RedisDistributedLock [key=" + key + ", value=" + value + "]" ; } }
(一): 通过Lua脚本进行解锁, 使对比锁的值
+删除
成为原子操作, 确保解锁操作的正确性. 简单来说就是防止删了别人的锁
. 例如: 线程A方法未执行完毕时锁超时了, 随后B线程也获取到了该锁(key相同), 但此时如果A线程方法执行完毕尝试解锁, 如果不比对value, 那么A将删掉B的锁, 这时候C线程又能加锁, 业务将产生更严重的混乱.(不要过分依赖分布式锁, 在数据一致性要求较高的情况下, 数据库层面也要进行一定的处理, 例如唯一键约束、事务等来确保数据的正确)
(二): 使用RedisOperations
执行Lua脚本进行解锁操作.
可参阅redis官方文档
加锁方法实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public DistributedLock acquire (String key, long timeout, int retries, long waitingTime) throws InterruptedException { final String value = RandomStringUtils.randomAlphanumeric(4 ) + System.currentTimeMillis(); do { Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS); if (result) { return new RedisDistributedLock(stringRedisTemplate, key, value); } if (retries > NumberUtils.INTEGER_ZERO) { TimeUnit.MILLISECONDS.sleep(waitingTime); } if (Thread.currentThread().isInterrupted()){ break ; } } while (retries-- > NumberUtils.INTEGER_ZERO); return null ; }
(一): 锁值要保证唯一, 使用4位随机字符串+时间戳基本可满足需求 注: UUID.randomUUID()
在高并发情况下性能不佳.
(二): 尝试加锁, 代码中是2.1版本的做法, 早起版本参考上一节的实现.
此代码已经可以满足自动解锁和重试的需求了, 使用方法:
1 2 3 4 try (DistributedLock lock = redisLockService.acquire(key, 10000 , 2 , 500 );){ }
但还可以再优雅一点, 将模板代码封装起来, 可支持Lambda 表达式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @FunctionalInterface public interface LockHandler <T > { T handle () throws Throwable ; }
(一): 定义函数式接口, 将业务逻辑放入Lambda 表达式使代码更加简洁.
(二): 业务中的异常不建议在分布式锁中处理, 直接抛出来更合理.
使用LockHandler
完成加锁的实现:
1 2 3 4 5 6 7 8 9 10 public <T> T tryLock (String key, LockHandler<T> handler, long timeout, boolean autoUnlock, int retries, long waitingTime) throws Throwable { try (DistributedLock lock = this .acquire(key, timeout, retries, waitingTime);) { if (lock != null ) { LOGGER.debug("get lock success, key: {}" , key); return handler.handle(); } LOGGER.debug("get lock fail, key: {}" , key); return null ; } }
此时可以通过比较优雅的方式使用分布式锁来完成编码:
1 2 3 4 5 6 7 8 @Test public void testTryLock () throws Throwable { final String key = "dlock:test-try-lock" ; AnyObject anyObject = redisLockService.tryLock(key, () -> { return new AnyObject(); }, 10000 , true , 0 , 0 ); }
优化二 (自定义异常)
自定义异常: 前文中针对分布式锁的封装可满足多数业务场景, 但是考虑这样一种情况, 如果业务本身会返回NULL
当前的实现方式可能会存在错误的处理, 因为获取锁失败也会返回NULL
. 避免返回NULL
固然是一种解决方式, 但无法满足所有的场景, 此时支持自定义异常或许是个不错的选择.
实现起来很容易, 在原代码的基础之上增加onFailure
参数, 如果锁为空直接抛出异常即可.
加锁方法实现 1 2 3 4 5 6 7 8 9 10 11 12 13 public <T> T tryLock (String key, LockHandler<T> handler, long timeout, boolean autoUnlock, int retries, long waitingTime, Class<? extends RuntimeException> onFailure) throws Throwable { try (DistributedLock lock = this .getLock(key, timeout, retries, waitingTime);) { if (lock != null ) { LOGGER.debug("get lock success, key: {}" , key); return handler.handle(); } LOGGER.debug("get lock fail, key: {}" , key); if (null != onFailure) { throw onFailure.newInstance(); } return null ; } }
(一): Class<? extends RuntimeException>
限定onFailure
必须是RuntimeException
或其子类. 笔者认为使用RuntimeException
在语义上更容易理解. 如有需要使用其他异常也未尝不可(如获取锁失败需要统一处理等情况).
(二): 反射
优化三 (优雅地使用注解)
结合APO优雅地使用注解完成分布式锁:
定义注解
为了减小篇幅折叠部分注释
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 @Documented @Retention (RetentionPolicy.RUNTIME)@Target (ElementType.METHOD)public @interface DistributedLockable { long timeout () default 5L ; TimeUnit unit () default TimeUnit.MILLISECONDS ; int retries () default 0 ; long waitingTime () default 0L ; String prefix () default "" ; String[] argNames() default {}; boolean argsAssociated () default true ; boolean autoUnlock () default true ; Class<? extends RuntimeException> onFailure() default NoException.class ; public static final class NoException extends RuntimeException { private static final long serialVersionUID = -7821936618527445658L ; } }
timeout
: 超时时间
unit
: 时间单位
retries
: 重试次数
waitingTime
: 重试间隔时间
prefix
: key前缀, 默认为包名
+类名
+方法名
argNames
: 组成key的参数
注解可使用在方法上, 需要注意的是, 本文注解通过spring AOP实现, 故对象内部方法间调用将无效.
切面实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 @Aspect @Order (10 ) public class DistributedLockableAspect implements KeyGenerator { private final Logger LOGGER = LoggerFactory.getLogger(getClass()); @Resource private RedisLockClient redisLockClient; @Pointcut (value = "execution(* *(..)) && @annotation(com.github.piaoruiqing.dlock.annotation.DistributedLockable)" ) public void distributedLockable () {} @Around (value = "distributedLockable() && @annotation(lockable)" ) public Object around (ProceedingJoinPoint joinPoint, DistributedLockable lockable) throws Throwable { long start = System.nanoTime(); final String key = this .generate(joinPoint, lockable.prefix(), lockable.argNames(), lockable.argsAssociated()).toString(); Object result = redisLockClient.tryLock( key, () -> { return joinPoint.proceed(); }, lockable.unit().toMillis(lockable.timeout()), lockable.autoUnlock(), lockable.retries(), lockable.unit().toMillis(lockable.waitingTime()), lockable.onFailure() ); long end = System.nanoTime(); LOGGER.debug("distributed lockable cost: {} ns" , end - start); return result; } }
(一): 切面优先级
(二): KeyGenerator
为自定义的key生成策略, 使用 prefix+argName+arg
作为key, 具体实现见源码 .
此时可以通过注解的方式使用分布式锁, 这种方式对代码入侵较小, 且简洁.
1 2 3 4 5 6 7 8 9 10 11 12 13 @DistributedLockable ( argNames = {"anyObject.id" , "anyObject.name" , "param1" }, timeout = 20 , unit = TimeUnit.SECONDS, onFailure = RuntimeException.class ) public Long distributedLockableOnFaiFailure (AnyObject anyObject , String param1 , Object param2 , Long timeout ) { try { TimeUnit.SECONDS.sleep(timeout); LOGGER.info("distributed-lockable: " + System.nanoTime()); } catch (InterruptedException e) { } return System.nanoTime(); }
扩展 分布式锁的实现有多种方式, 可根据实际场景和需求选择不同的介质进行实现:
Redis: 性能高, 对分布式锁支持友好, 实现简单, 多数场景下表现较好.
Zookeeper: 可靠性较高, 对分布式锁支持友好, 实现较复杂但有现成的实现可以使用.
数据库: 实现简单, 可使用乐观锁
/悲观锁
实现, 性能一般, 高并发场景下不推荐
结语 本文阐述了Redis分布式锁的JAVA实现, 完成了自动解锁、自定义异常、重试、注解锁等功能, 源码见地址 .
本实现还有诸多可以优化之处, 如:
重入锁的实现
优化重试策略为订阅Redis事件: 订阅Redis事件可以进一步优化锁的性能, 可通过wait+notifyAll来替代文中的sleep.
篇幅有限, 后续再行阐述.
参考文献
欢迎关注公众号(代码如诗):