更新時(shí)間:2022-11-11 來源:黑馬程序員 瀏覽量:
相信很多同學(xué)都聽說過分布式鎖,但也僅僅停留在概念的理解上,這篇文章會(huì)從分布式鎖的應(yīng)用場景講起,從實(shí)現(xiàn)的角度上深度剖析redis如何實(shí)現(xiàn)分布式鎖。
一、超賣問題
我們先來看超賣的概念:
當(dāng)寶貝庫存接近0時(shí),如果多個(gè)買家同時(shí)付款購買此寶貝,或者店鋪后臺(tái)在架數(shù)量大于倉庫實(shí)際數(shù)量,將會(huì)出現(xiàn)超賣現(xiàn)象。超賣現(xiàn)象本質(zhì)上就是買到了比倉庫中數(shù)量更多的寶貝。
> 本文主要解決超賣問題的第一種,同時(shí)多人購買寶貝時(shí),造成超賣。
測試代碼
那么超賣問題是如何產(chǎn)生的呢?我們準(zhǔn)備一段代碼進(jìn)行測試:
@Autowired private StringRedisTemplate stringRedisTemplate; /** * 第一種實(shí)現(xiàn),進(jìn)程內(nèi)就存在線程安全問題 * 可以只啟動(dòng)一個(gè)進(jìn)程測試 */ @RequestMapping("/deduct_stock1") public void deductStock1(){ String stock = stringRedisTemplate.opsForValue().get("stock"); int stockNum = Integer.parseInt(stock); if(stockNum > 0){ //設(shè)置庫存減1 int realStock = stockNum - 1; stringRedisTemplate.opsForValue().set("stock",realStock + ""); System.out.println("設(shè)置庫存" + realStock); }else{ System.out.println("庫存不足"); } }
這段代碼中,使用redis先獲取庫存數(shù)量(當(dāng)然實(shí)際場景中不會(huì)只保存一個(gè)全局庫存數(shù),應(yīng)該根據(jù)每一個(gè)商品單元(sku)保存一份庫存數(shù))。
String stock = stringRedisTemplate.opsForValue().get("stock"); int stockNum = Integer.parseInt(stock);
接下來,判斷庫存數(shù)是否大于0:
- 如果大于0,將庫存數(shù)減一,通過set命令,寫回redis
>這里沒有使用redis的decrement命令,因?yàn)榇嗣钤趓edis單線程模型下是線程安全的,而為了可以模擬線程不安全的情況將其拆成三步操作。
//設(shè)置庫存減1 int realStock = stockNum - 1; stringRedisTemplate.opsForValue().set("stock",realStock + ""); System.out.println("設(shè)置庫存" + realStock);
- 如果小于等于0,提示庫存不足
JMeter測試
通過JMeter進(jìn)行并發(fā)測試,看下會(huì)不會(huì)出現(xiàn)超賣的問題:
1.啟動(dòng)tomcat
這種情況下,只需要啟動(dòng)一個(gè)tomcat就會(huì)出現(xiàn)超賣。我們先啟動(dòng)一個(gè)tomcat在8080端口上。
2.下載JMeter
Apache JMeter是Apache組織開發(fā)的基于Java的壓力測試工具。
從官網(wǎng)上下載即可:
[https://jmeter.apache.org/download_jmeter.cgi](https://links.jianshu.com/go?to=https%3A%2F%2Fjmeter.apache.org%2Fdownload_jmeter.cgi)
下載完之后解壓,運(yùn)行bin目錄下的jmeter.bat,顯示如下界面:
如果嫌字體太小,可以選擇放大:
3.配置JMeter
在Test Plan上點(diǎn)擊右鍵,創(chuàng)建`線程組(Thread Group)`
配置一下具體參數(shù):
- `Number of Threads` 同時(shí)并發(fā)線程數(shù)
- `Ramp-Up Period(in-seconds)` 代表隔多長時(shí)間執(zhí)行,0代表同時(shí)并發(fā)。假設(shè)線程數(shù)為100, 估計(jì)的點(diǎn)擊率為每秒10次, 那么估計(jì)的理想ramp-up period 就是 100/10 = 10 秒
- `Loop Count` 循環(huán)次數(shù)
> 這里給出500是為了直接測試并發(fā)500搶,看看能不能正好把500個(gè)貨物搶完。
添加Http請求:
添加請求URL:
添加聚合結(jié)果,用來顯示整體的運(yùn)行情況:
到此為止JMeter的配置結(jié)束。
4.設(shè)置庫存量
啟動(dòng)redis-server,使用redis-client連接:
把庫存數(shù)設(shè)置為500。
5.開始測試
點(diǎn)擊運(yùn)行按鈕,啟動(dòng)測試:
首先我們看到聚合報(bào)告里輸出的結(jié)果:
錯(cuò)誤率0%,樣本數(shù)500,證明500個(gè)請求都已經(jīng)執(zhí)行,但是發(fā)現(xiàn)控制臺(tái)輸出如下:
<img src="assets/12.png" alt="img" style="zoom:67%;" />
很顯然,一份商品都被賣了多次,這顯然是不合理的。
原因分析
現(xiàn)在我們只啟動(dòng)了一個(gè)tomcat,在單jvm進(jìn)程的情況下,tomcat會(huì)使用線程池接收請求:
而由于每個(gè)線程可能同時(shí)獲取到庫存量,所以庫存量在兩個(gè)線程中顯示的都是500,然后兩個(gè)線程就繼續(xù)進(jìn)行扣減庫存操作,得出499寫回redis中,在這個(gè)過程中,顯然存在線程安全的問題。同一個(gè)商品被賣出了2份,超賣問題就出現(xiàn)了。
二、加鎖優(yōu)化
synchronized鎖
要保證單jvm中線程安全,最簡單直接的方式就是添加synchronized關(guān)鍵字,那么這樣行不行呢,我們來做一個(gè)測試:
/** * 第二種實(shí)現(xiàn),使用synchronized加鎖 * 可以只啟動(dòng)一個(gè)進(jìn)程測試 */ @RequestMapping("/deduct_stock2") public void deductStock2(){ synchronized (this){ String stock = stringRedisTemplate.opsForValue().get("stock"); int stockNum = Integer.parseInt(stock); if(stockNum > 0){ //設(shè)置庫存減1 int realStock = stockNum - 1; stringRedisTemplate.opsForValue().set("stock",realStock + ""); System.out.println("設(shè)置庫存" + realStock); }else{ System.out.println("庫存不足"); } } }
在進(jìn)行扣減庫存前,先通過synchronized關(guān)鍵字,對資源加鎖,這樣就只有一個(gè)線程能進(jìn)入到扣減庫存的代碼塊中。來測試一下:
重置庫存
set stock 500
修改接口地址
測試
<img src="assets/15.png" alt="img" style="zoom:67%;" />
可以看到,庫存被扣減為0,并且沒有出現(xiàn)超賣的情況(設(shè)置了500庫存,并且500個(gè)人搶,正好搶完)。
但是這種方案顯然是不行的,在生產(chǎn)環(huán)境上如果部署多個(gè)tomcat實(shí)例,那么就會(huì)出現(xiàn)如下情況:
多個(gè)進(jìn)程無法共享jvm內(nèi)存中的鎖,所以會(huì)出現(xiàn)多把鎖,這種情況下也會(huì)出現(xiàn)超賣問題。
三、分布式鎖的實(shí)現(xiàn)
多Tomcat實(shí)例下的超賣演示
接下來我們演示一下如何在多個(gè)Tomcat情況下,演示超賣的問題:
1.啟動(dòng)兩個(gè)tomcat服務(wù)
在IDEA中配置兩個(gè)spring boot的啟動(dòng)項(xiàng),使用vm參數(shù)指定不同的端口號(hào)
```undefined
-Dserver.port=8080
```
2.配置nginx
編寫~/nginx_redis/conf/nginx.conf如下:
user nginx; worker_processes 1; error_log /var/log/nginx/error.log warn; pid /var/run/nginx.pid; events { worker_connections 1024; } http { include /etc/nginx/mime.types; default_type application/octet-stream; log_format main '$remote_addr - $remote_user [$time_local] "$request" ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for"'; upstream redislock{ server 192.168.226.1:8080 weight=1; server 192.168.226.1:8081 weight=1; } server { listen 80; server_name localhost; location /{ root html; proxy_pass http://redislock; } } access_log /var/log/nginx/access.log main; sendfile on; #tcp_nopush on; keepalive_timeout 65; #gzip on; include /etc/nginx/conf.d/*.conf; }
> 192.168.226.1這是我宿主機(jī)的IP
準(zhǔn)備一個(gè)虛擬機(jī)(也可以使用windows下的nginx),使用docker啟動(dòng)nginx:
docker pull nginx docker run -di -p 10085:80 --name nginx-redis-hc -v ~/nginx_redis/html:/usr/share/nginx/html -v ~/nginx_redis/conf/nginx.conf:/etc/nginx/nginx.conf -v ~/nginx_redis/logs:/var/log/nginx nginx
在宿主機(jī)下使用`虛擬機(jī)的IP地址:10085`訪問nginx,如果出現(xiàn)如下頁面就代表成功:
3.測試
修改接口地址為nginx:
運(yùn)行查看兩個(gè)tomcat的控制臺(tái):
- tomcat1
<img src="assets/21.png" alt="img" style="zoom:67%;" />
- tomcat2
<img src="assets/22.png" alt="img" style="zoom:67%;" />
沒有將庫存清空,證明存在超賣問題。
手動(dòng)實(shí)現(xiàn)分布式鎖
使用redis手動(dòng)實(shí)現(xiàn)分布式鎖,需要用到命令`setnx`。先來介紹一下setnx:
SETNX key value[]
> 可用版本: >= 1.0.0
>
> 時(shí)間復(fù)雜度: O(1)
只在鍵 `key` 不存在的情況下, 將鍵 `key` 的值設(shè)置為 `value` 。
若鍵 `key` 已經(jīng)存在, 則 `SETNX` 命令不做任何動(dòng)作。
`SETNX` 是『SET if Not eXists』(如果不存在,則 SET)的簡寫。
返回值
命令在設(shè)置成功時(shí)返回 `1` , 設(shè)置失敗時(shí)返回 `0` 。
代碼示例
redis> EXISTS job # job 不存在 # job 不存在 (integer) 0 redis> SETNX job "programmer" # job 設(shè)置成功 (integer) 1 redis> SETNX job "code-farmer" # 嘗試覆蓋 job ,失敗 (integer) 0 redis> GET job # 沒有被覆蓋
使用redis構(gòu)建分布式鎖流程如下:
image.png
- 線程1申請鎖(`setnx`),拿到了鎖。
- 線程2申請鎖,由于線程1已經(jīng)擁有了鎖,`setnx`返回0失敗,這一步用戶操作會(huì)失敗。
- 線程1執(zhí)行扣減庫存操作并釋放鎖。
- 線程2再次申請鎖,獲取到鎖并執(zhí)行扣減庫存,然后釋放鎖。
> 注意這里線程沒有拿到鎖,如果不嘗試while(true)重新獲取鎖,這個(gè)操作就直接失敗了。
代碼實(shí)現(xiàn)
/** * 第三種實(shí)現(xiàn),使用redis中的setIfAbsent(setnx命令)實(shí)現(xiàn)分布式鎖 */ @RequestMapping("/deduct_stock3") public void deductStock3(){ //在獲取到鎖的時(shí)候,給鎖分配一個(gè)id String opId = UUID.randomUUID().toString(); Boolean stockLock = stringRedisTemplate .opsForValue().setIfAbsent("stockLock", opId, Duration.ofSeconds(30); if(stockLock){ try{ String stock = stringRedisTemplate.opsForValue().get("stock"); int stockNum = Integer.parseInt(stock); if(stockNum > 0){ //設(shè)置庫存減1 int realStock = stockNum - 1; stringRedisTemplate.opsForValue().set("stock",realStock + ""); System.out.println("設(shè)置庫存" + realStock); }else{ System.out.println("庫存不足"); } }catch(Exception e){ e.printStackTrace(); }finally { if(opId.equals(stringRedisTemplate .opsForValue().get("stockLock"))){ stringRedisTemplate.delete("stockLock"); } } } }
測試略過,這里有幾個(gè)知識(shí)點(diǎn)需要說明
setIfAbsent設(shè)置超時(shí)
如果setIfAbsent不設(shè)置超時(shí)時(shí)間,假設(shè)線程執(zhí)行業(yè)務(wù)代碼時(shí)間時(shí)死鎖或者其他原因?qū)е麻L時(shí)間不釋放,那么會(huì)影響其他線程獲取到鎖,這個(gè)時(shí)候整體業(yè)務(wù)就會(huì)出現(xiàn)不可用。
Boolean stockLock = stringRedisTemplate .opsForValue().setIfAbsent("stockLock", opId, Duration.ofSeconds(30);
設(shè)置超時(shí)時(shí)間為30秒,該時(shí)間一般大于業(yè)務(wù)執(zhí)行的最大時(shí)間。
每次獲取到鎖,設(shè)置唯一ID
考慮這樣的場景
- 線程1獲取鎖扣減庫存,但是由于操作不當(dāng),長時(shí)間卡住,這樣會(huì)觸發(fā)超時(shí)時(shí)間鎖被釋放。
- 線程2獲取到鎖,扣減庫存。
- 線程1的代碼拋出異常,執(zhí)行finally釋放鎖,但是釋放的是進(jìn)程B的鎖。
解決方案就是在**加鎖前生成UUID**,釋放的時(shí)候校驗(yàn)UUID是否正確,如果不正確,說明加鎖線程不是當(dāng)前線程。
使用Redisson實(shí)現(xiàn)分布式鎖
setnx雖好,但是實(shí)現(xiàn)起來畢竟太過麻煩,一不小心就可能陷入并發(fā)編程的陷阱中,那么有沒有更加簡單的實(shí)現(xiàn)方式呢?答案就是`redisson`。
> Redisson是架設(shè)在[Redis](https://links.jianshu.com/go?to=http%3A%2F%2Fwww.oschina.net%2Fp%2Fredis)基礎(chǔ)上的一個(gè)Java駐內(nèi)存數(shù)據(jù)網(wǎng)格(In-Memory Data Grid)?!綶Redis官方推薦](https://links.jianshu.com/go?to=http%3A%2F%2Fwww.redis.io%2Fclients)】
> Redisson在基于NIO的[Netty](https://links.jianshu.com/go?to=http%3A%2F%2Fnetty.io%2F)框架上,充分的利用了Redis鍵值數(shù)據(jù)庫提供的一系列優(yōu)勢,在Java實(shí)用工具包中常用接口的基礎(chǔ)上,為使用者提供了一系列具有分布式特性的常用工具類。使得原本作為協(xié)調(diào)單機(jī)多線程并發(fā)程序的工具包獲得了協(xié)調(diào)分布式多機(jī)多線程并發(fā)系統(tǒng)的能力,大大降低了設(shè)計(jì)和研發(fā)大規(guī)模分布式系統(tǒng)的難度。同時(shí)結(jié)合各富特色的分布式服務(wù),更進(jìn)一步簡化了分布式環(huán)境中程序相互之間的協(xié)作。
總而言之,`redisson`提供了一系列較為完善的工具類,其中就包含了分布式鎖。用`redisson`實(shí)現(xiàn)分布式鎖的流程極為簡單。
引入依賴
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.14.0</version> </dependency>
創(chuàng)建Redisson實(shí)例
@Bean public RedissonClient redisson(){ // 1. Create config object Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); // config.useClusterServers() // // use "rediss://" for SSL connection // .addNodeAddress("redis://127.0.0.1:7181"); return Redisson.create(config); }
編寫分布式鎖代碼
@Autowired private RedissonClient redissonClient; /** * 第四種實(shí)現(xiàn),使用redisson實(shí)現(xiàn) */ @RequestMapping("/deduct_stock4") public void deductStock4(){ RLock lock = redissonClient.getLock("redisson:stockLock"); try{ //加鎖 lock.lock(); String stock = stringRedisTemplate.opsForValue().get("stock"); int stockNum = Integer.parseInt(stock); if(stockNum > 0){ //設(shè)置庫存減1 int realStock = stockNum - 1; stringRedisTemplate.opsForValue().set("stock",realStock + ""); System.out.println("設(shè)置庫存" + realStock); }else{ System.out.println("庫存不足"); } }catch(Exception e){ e.printStackTrace(); }finally { lock.unlock(); } }
其中加鎖代碼基本與進(jìn)程內(nèi)加鎖一致,就不再詳細(xì)解讀,讀者自行實(shí)踐即可。
Redisson分布式鎖原理
`Redisson分布式鎖`的主要原理非常簡單,利用了lua腳本的原子性。
在分布式環(huán)境下產(chǎn)生并發(fā)問題的主要原因是三個(gè)操作并不是原子操作:
- 獲取庫存
- 扣減庫存
- 寫入庫存
那么如果我們把三個(gè)操作合并為一個(gè)操作,在默認(rèn)單線程的Redis中運(yùn)行,是不會(huì)產(chǎn)生并發(fā)問題的。源碼如下:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
這一段源碼中,`redisson`利用了lua腳本的原子性,校驗(yàn)key是否存在,如果不存在就創(chuàng)建key并利用incrby加一操作(這步操作主要是為了實(shí)現(xiàn)可重入性)。`redisson`實(shí)現(xiàn)的分布式鎖具備如下特性:
- 鎖失效
- 鎖續(xù)租
> 執(zhí)行時(shí)間長的鎖快要到期時(shí)會(huì)自動(dòng)續(xù)租
- 可重入
- 操作原子性
鎖續(xù)租原理
使用如下代碼進(jìn)行測試鎖續(xù)租的情況
@Test void test() throws InterruptedException { RLock testlock1111 = redissonClient.getLock("testlock"); testlock1111.lock(); try{ Thread thread = new Thread(() -> { while(true){ Long testlock = redisTemplate.getExpire("testlock"); System.out.println(LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME) + " ttl:" + testlock); try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } }); thread.start(); thread.join(); }finally { if(testlock1111.isHeldByCurrentThread()){ testlock1111.unlock(); } } }
我們會(huì)發(fā)現(xiàn),每隔10秒會(huì)自動(dòng)續(xù)租一次,保證鎖不被釋放。
<img src="assets/25.png" alt="image-20220721153251169" style="zoom:67%;" />
那么這種續(xù)租的行為是如何實(shí)現(xiàn)的呢?考慮這種情況:如果線程加鎖之后,進(jìn)程宕機(jī),線程無法執(zhí)行解鎖代碼,那么這個(gè)鎖就無法得到釋放(注意,不是加鎖線程不允許亂解鎖),為了避免這種情況的發(fā)生,鎖都會(huì)設(shè)置一個(gè)過期時(shí)間。比如使用**lock**無參命令會(huì)默認(rèn)設(shè)置30秒的過期時(shí)間。那么30秒之后呢?如果線程還在工作,自動(dòng)釋放依然會(huì)產(chǎn)生線程安全的問題。所以Redisson使用了watch dog看門狗機(jī)制來實(shí)現(xiàn)自動(dòng)續(xù)租。
核心代碼及注釋:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture<Long> ttlRemainingFuture; //lock()無參方法leaseTime為-1,所以進(jìn)else分值 if (leaseTime > 0) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //通過lua腳本加鎖 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> { // 異步方法,等到加鎖成功會(huì)回調(diào),第一次加鎖ttlRemaining為空,leaseTime為-1 if (ttlRemaining == null) { if (leaseTime > 0) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { //設(shè)置延時(shí)任務(wù) scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper<>(f); }
接下來分析scheduleExpirationRenewal的過程:
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } //創(chuàng)建一個(gè)延遲任務(wù) Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } //執(zhí)行l(wèi)ua腳本進(jìn)行續(xù)租 CompletionStage<Boolean> future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { if (e != null) { log.error("Can't update lock " + getRawName() + " expiration", e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } //執(zhí)行l(wèi)ua續(xù)租,鎖還在就續(xù)租,鎖不在返回false就取消續(xù)租的行為 if (res) { // reschedule itself renewExpiration(); } else { cancelExpirationRenewal(null); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); //internalLockLeaseTime默認(rèn)值30,所以每10秒會(huì)續(xù)租一次,續(xù)租到30秒 ee.setTimeout(task); }
其中,renewExpirationAsync執(zhí)行的lua腳本如下:
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId)); }
判斷hash中是否存在鎖,如果存在就設(shè)置過期時(shí)間為30秒,返回1。如果不存在就返回0。
總結(jié)
本文介紹了超賣問題產(chǎn)生的原因:操作不具備原子性,同時(shí)提出了集中解決思路。
- `synchronized鎖`,無法保證多實(shí)例下的線程安全
- `setnx`手動(dòng)實(shí)現(xiàn),坑很多、代碼較為復(fù)雜
- `redisson`實(shí)現(xiàn),能夠保證多實(shí)例下線程安全,代碼簡單可靠