首頁(yè)技術(shù)文章正文

Java培訓(xùn):java高性能并發(fā)計(jì)數(shù)器之巔峰對(duì)決

更新時(shí)間:2022-08-25 來源:黑馬程序員 瀏覽量:

  并發(fā)計(jì)數(shù)器各個(gè)方案介紹

  方案概述

  1. jdk5提供的原子更新長(zhǎng)整型類 AtomicLong

  2. synchronized

  3. jdk8提供的 LongAdder 【單機(jī)推薦】

  4. Redisson分布式累加器【分布式推薦】

  方案介紹

  jdk5提供的原子更新長(zhǎng)整型類 AtomicLong

  在JDK1.5開始就新增了并發(fā)的Integer/Long的操作工具類AtomicInteger和AtomicLong。

  AtomicLong 利用底層操作系統(tǒng)的CAS來保證原子性,在一個(gè)死循環(huán)內(nèi)不斷執(zhí)行CAS操作,直到操作成功。不過,CAS操作的一個(gè)問題是在并發(fā)量比較大的時(shí)候,可能很多次的執(zhí)行CAS操作都不成功,這樣性能就受到較大影響。

  示例代碼

AtomicLong value = new AtomicLong(0);  //定義
incrementAndGet(); //遞增1
```

  synchronized

  synchronized是一個(gè)重量級(jí)鎖,主要是因?yàn)榫€程競(jìng)爭(zhēng)鎖會(huì)引起操作系統(tǒng)用戶態(tài)和內(nèi)核態(tài)切換,浪費(fèi)資源效率不高,在jdk1.5之前,synchronized沒有做任何優(yōu)化,但在jdk1.6做了性能優(yōu)化,它會(huì)經(jīng)歷偏向鎖,輕量級(jí)鎖,最后才到重量級(jí)鎖這個(gè)過程,在性能方面有了很大的提升,在jdk1.7的ConcurrentHashMap是基于ReentrantLock的實(shí)現(xiàn)了鎖,但在jdk1.8之后又替換成了synchronized,就從這一點(diǎn)可以看出JVM團(tuán)隊(duì)對(duì)synchronized的性能還是挺有信心的。下面我們分別來介紹下無鎖,偏向鎖,輕量級(jí)鎖,重量級(jí)鎖。

  jdk8提供的 LongAdder 【單機(jī)推薦】

  在JDK8中又新增了LongAdder,這是一個(gè)針對(duì)Long類型的數(shù)據(jù)的操作工具類。

  那我們知道,在ConcurrentHashMap中,對(duì)Map分割成多個(gè)segment,這樣多個(gè)Segment的操作就可以并行執(zhí)行,從而可以提高性能。在JDK8中,LongAdder與ConcurrentHashMap類似,將內(nèi)部操作數(shù)據(jù)value分離成一個(gè)Cell數(shù)組,每個(gè)線程訪問時(shí),通過Hash等算法映射到其中一個(gè)Cell上。

  計(jì)算最終的數(shù)據(jù)結(jié)果,則是各個(gè)Cell數(shù)組的累計(jì)求和。

  LongAddr常用api方法

add():  //增加指定的數(shù)值;
increament(): //增加1;
decrement(): //減少1;
intValue();  //intValue();/floatValue()/doubleValue():得到最終計(jì)數(shù)后的結(jié)果
sum()://求和,得到最終計(jì)數(shù)結(jié)果
sumThenReset()://求和得到最終計(jì)數(shù)結(jié)果,并重置value。
```

  Redisson分布式累加器【分布式推薦】

  基于Redis的Redisson分布式整長(zhǎng)型累加器(LongAdder)采用了與java.util.concurrent.atomic.LongAdder類似的接口。通過利用客戶端內(nèi)置的LongAdder對(duì)象,為分布式環(huán)境下遞增和遞減操作提供了很高得性能。據(jù)統(tǒng)計(jì)其性能最高比分布式AtomicLong對(duì)象快 10000 倍以上。

RLongAddr itheimaLongAddr = redission.getLongAddr("itheimaLongAddr");
itheimaLongAddr.add(100);  //添加指定數(shù)量
itheimaLongAddr.increment(); //遞增1
itheimaLongAddr.increment(); //遞減1
itheimaLongAddr.sum(); //聚合求和
```

  基于Redis的Redisson分布式雙精度浮點(diǎn)累加器(DoubleAdder)采用了與java.util.concurrent.atomic.DoubleAdder類似的接口。通過利用客戶端內(nèi)置的DoubleAdder對(duì)象,為分布式環(huán)境下遞增和遞減操作提供了很高得性能。據(jù)統(tǒng)計(jì)其性能最高比分布式AtomicDouble對(duì)象快 12000 倍。

  示例代碼

RLongDouble itheimaDouble = redission.getLongDouble("itheimaLongDouble");
itheimaDouble.add(100);  //添加指定數(shù)量
itheimaDouble.increment(); //遞增1
itheimaDouble.increment(); //遞減1
itheimaDouble.sum(); //聚合求和
```

  以上【整長(zhǎng)型累加器】和【雙精度浮點(diǎn)累加器】完美適用于分布式統(tǒng)計(jì)計(jì)量場(chǎng)景。

  各個(gè)方案性能測(cè)試

  測(cè)試代碼

```
package com.itheima._01性能比較;

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

/**
 * @author 黑馬程序員
 */
public class CountTest {

    private int count = 0;

    @Test
    public void startCompare() {
        compareDetail(1, 100 * 10000);
        compareDetail(20, 100 * 10000);
        compareDetail(30, 100 * 10000);
        compareDetail(40, 100 * 10000);
        compareDetail(100, 100 * 10000);
    }

    /**
     * @param threadCount 線程數(shù)
     * @param times 每個(gè)線程增加的次數(shù)
     */
    public void compareDetail(int threadCount, int times) {
        try {
            System.out.println(String.format("threadCount: %s, times: %s", threadCount, times));
            long start = System.currentTimeMillis();
            testSynchronized(threadCount, times);
            System.out.println("testSynchronized cost: " + (System.currentTimeMillis() - start));

            start = System.currentTimeMillis();
            testAtomicLong(threadCount, times);
            System.out.println("testAtomicLong cost: " + (System.currentTimeMillis() - start));

            start = System.currentTimeMillis();
            testLongAdder(threadCount, times);
            System.out.println("testLongAdder cost: " + (System.currentTimeMillis() - start));
            System.out.println();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void testSynchronized(int threadCount, int times) throws InterruptedException {
        List<Thread> threadList = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            threadList.add(new Thread(()-> {
                for (int j = 0; j < times; j++) {
                    add();
                }
            }));
        }
        for (Thread thread : threadList) {
            thread.start();
        }
        for (Thread thread : threadList) {
            thread.join();
        }
    }

    public synchronized void add() {
        count++;
    }

    public void testAtomicLong(int threadCount, int times) throws InterruptedException {
        AtomicLong count = new AtomicLong();
        List<Thread> threadList = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            threadList.add(new Thread(()-> {
                for (int j = 0; j < times; j++) {
                    count.incrementAndGet();
                }
            }));
        }
        for (Thread thread : threadList) {
            thread.start();
        }
        for (Thread thread : threadList) {
            thread.join();
        }
    }

    public void testLongAdder(int threadCount, int times) throws InterruptedException {
        LongAdder count = new LongAdder();
        List<Thread> threadList = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            threadList.add(new Thread(()-> {
                for (int j = 0; j < times; j++) {
                    count.increment();
                }
            }));
        }
        for (Thread thread : threadList) {
            thread.start();
        }
        for (Thread thread : threadList) {
            thread.join();
        }
    }
}
```

  運(yùn)行結(jié)果

threadCount: 1, times: 1000000
testSynchronized cost: 69
testAtomicLong cost: 16
testLongAdder cost: 15

threadCount: 20, times: 1000000
testSynchronized cost: 639
testAtomicLong cost: 457
testLongAdder cost: 59

threadCount: 30, times: 1000000
testSynchronized cost: 273
testAtomicLong cost: 538
testLongAdder cost: 70

threadCount: 40, times: 1000000
testSynchronized cost: 312
testAtomicLong cost: 717
testLongAdder cost: 81

threadCount: 100, times: 1000000
testSynchronized cost: 719
testAtomicLong cost: 2098
testLongAdder cost: 225
```

  結(jié)論

  

1661393077095_1.png

  并發(fā)量比較低的時(shí)候AtomicLong優(yōu)勢(shì)比較明顯,因?yàn)锳tomicLong底層是一個(gè)樂觀鎖,不用阻塞線程,不斷cas即可。但是在并發(fā)比較高的時(shí)候用synchronized比較有優(yōu)勢(shì),因?yàn)榇罅烤€程不斷cas,會(huì)導(dǎo)致cpu持續(xù)飆高,反而會(huì)降低效率

  LongAdder無論并發(fā)量高低,優(yōu)勢(shì)都比較明顯。且并發(fā)量越高,優(yōu)勢(shì)越明顯

  原理分析

  AtomicLong 實(shí)現(xiàn)原子操作原理

  非原子操作示例代碼

package com.itheima._02Unsafe測(cè)試;

import java.util.ArrayList;
import java.util.List;

/**
 * @author 黑馬程序員
 */
public class Test1 {

    private int value = 0;

    public static void main(String[] args) throws InterruptedException {
        Test1 test1 = new Test1();
        test1.increment();
        System.out.println("期待值:" + 100 * 100 + ",最終結(jié)果值:" + test1.value);
        //結(jié)果,期待值:10000,最終結(jié)果值:xxxx
    }

    private void increment() throws InterruptedException {
        List<Thread> list = new ArrayList<>();
        //啟動(dòng)100個(gè)線程,每個(gè)線程對(duì)value進(jìn)行累加100次
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    value++;
                }
            });
            list.add(t);
            t.start();
        }

        //保證所有線程運(yùn)行完成
        for (Thread thread : list) {
            thread.join();
        }
    }

}
```

  運(yùn)行效果

1661393298046_2.jpg

  結(jié)論

  > 可以發(fā)現(xiàn)輸出的結(jié)果值錯(cuò)誤,這是因?yàn)?`value++` 不是一個(gè)原子操作,它將 `value++` 拆分成了 3 個(gè)步驟 `load、add、store`,多線程并發(fā)有可能上一個(gè)線程 add 過后還沒有 store 下一個(gè)線程又執(zhí)行了 load 了這種重復(fù)造成得到的結(jié)果可能比最終值要小。

  AtomicLong是JDK1.5提供的原子操作示例代碼

package com.itheima._03AtomicLong的CAS原子操作示例;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author 黑馬程序員
 */
public class Test2 {

    private AtomicLong value = new AtomicLong(0);

    public static void main(String[] args) throws InterruptedException {
        Test2 test1 = new Test2();
        test1.increment();
        System.out.println("期待值:" + 100 * 100 + ",最終結(jié)果值:" + test1.value);
        //結(jié)果,期待值:10000,最終結(jié)果值:10000
    }

    private void increment() throws InterruptedException {
        List<Thread> list = new ArrayList<>();
        //啟動(dòng)100個(gè)線程,每個(gè)線程對(duì)value進(jìn)行累加100次
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    value.incrementAndGet();
                }
            });
            list.add(t);
            t.start();
        }

        //保證所有線程運(yùn)行完成
        for (Thread thread : list) {
            thread.join();
        }
    }

}
```

  運(yùn)行效果

1661393347277_3.jpg

  AtomicLong CAS原理介紹

  1.使用volatile保證內(nèi)存可見性,獲取主存中最新的操作數(shù)據(jù)

  2.使用CAS(Compare-And-Swap)操作保證數(shù)據(jù)原子性

  CAS算法是jdk對(duì)并發(fā)操作共享數(shù)據(jù)的支持,包含了3個(gè)操作數(shù)

  第一個(gè)操作數(shù):內(nèi)存值value(V)

  第二個(gè)操作數(shù):預(yù)估值expect(O)

  第三個(gè)操作數(shù):更新值new(N)

  含義:CAS比較交換的過程可以通俗的理解為CAS(V,O,N),包含三個(gè)值分別為:V 內(nèi)存地址(主存)存放的實(shí)際值;O 預(yù)期的值(舊值);N 更新的新值。當(dāng)V和O相同時(shí),也就是說舊值和內(nèi)存中實(shí)際的值相同表明該值沒有被其他線程更改過,即該舊值O就是目前來說最新的值了,自然而然可以將新值N賦值給V;當(dāng)V和O不相同時(shí),會(huì)一致循環(huán)下去直至修改成功。

  AtomicLong底層CAS實(shí)現(xiàn)原子操作原理

  查看incrementAndGet()方法源碼

public final long incrementAndGet() {
    return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
```

getAndAddLong方法源碼

```java
public final long getAndAddLong(Object var1, long var2, long var4) {
    long var6;
    do {
        var6 = this.getLongVolatile(var1, var2);
    } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));

    return var6;
}
```

  > 這里是一個(gè)循環(huán)CAS操作

  compareAndSwapLong方法源碼

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
```

  我們發(fā)現(xiàn)調(diào)用的是 native 的 `unsafe.compareAndSwapLong(Object obj, long valueOffset, Long expect, Long update)`,我們翻看 Hotspot 源碼發(fā)現(xiàn)在 unsafe.cpp 中定義了這樣一段代碼

  > Unsafe中基本都是調(diào)用native方法,那么就需要去JVM里面找對(duì)應(yīng)的實(shí)現(xiàn)。

  >

  > 到`http://hg.openjdk.java.net/` 進(jìn)行一步步選擇下載對(duì)應(yīng)的hotspot版本,我這里下載的是`http://hg.openjdk.java.net/jdk8u/jdk8u60/hotspot/archive/tip.tar.gz`,

  >

  > 然后解hotspot目錄,發(fā)現(xiàn) `\src\share\vm\prims\unsafe.cpp`,這個(gè)就是對(duì)應(yīng)jvm相關(guān)的c++實(shí)現(xiàn)類了。

  >

  > 比如我們對(duì)CAS部分的實(shí)現(xiàn)很感興趣,就可以在該文件中搜索compareAndSwapInt,此時(shí)可以看到對(duì)應(yīng)的JNI方法為`Unsafe_CompareAndSwapInt`

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapLong(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jlong e, jlong x))
  UnsafeWrapper("Unsafe_CompareAndSwapLong");
  Handle p (THREAD, JNIHandles::resolve(obj));
  jlong* addr = (jlong*)(index_oop_from_field_offset_long(p(), offset));
#ifdef SUPPORTS_NATIVE_CX8
  return (jlong)(Atomic::cmpxchg(x, addr, e)) == e;
#else
  if (VM_Version::supports_cx8())
    return (jlong)(Atomic::cmpxchg(x, addr, e)) == e;
  else {
    jboolean success = false;
    MutexLockerEx mu(UnsafeJlong_lock, Mutex::_no_safepoint_check_flag);
    jlong val = Atomic::load(addr);
    if (val == e) { Atomic::store(x, addr); success = true; }
    return success;
  }
#endif
UNSAFE_END
```

  Atomic::cmpxchg c++源碼

  可以看到調(diào)用了“Atomic::cmpxchg”方法,“Atomic::cmpxchg”方法在linux_x86和windows_x86的實(shí)現(xiàn)如下。

  linux_x86的實(shí)現(xiàn):

1661393503544_4.jpg

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}
```

  windows_x86的實(shí)現(xiàn)(c++源文件):

1661393567504_5.jpg

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  // alternative for InterlockedCompareExchange
  int mp = os::is_MP();
  __asm {
    mov edx, dest
    mov ecx, exchange_value
    mov eax, compare_value
    LOCK_IF_MP(mp)
    cmpxchg dword ptr [edx], ecx
  }
}
```

  Atomic::cmpxchg方法解析:

  mp是“os::is_MP()”的返回結(jié)果,“os::is_MP()”是一個(gè)內(nèi)聯(lián)函數(shù),用來判斷當(dāng)前系統(tǒng)是否為多處理器。

  如果當(dāng)前系統(tǒng)是多處理器,該函數(shù)返回1。

  否則,返回0。

  LOCK_IF_MP(mp)會(huì)根據(jù)mp的值來決定是否為cmpxchg指令添加lock前綴。

  如果通過mp判斷當(dāng)前系統(tǒng)是多處理器(即mp值為1),則為cmpxchg指令添加lock前綴。

  否則,不加lock前綴。

  這是一種優(yōu)化手段,認(rèn)為單處理器的環(huán)境沒有必要添加lock前綴,只有在多核情況下才會(huì)添加lock前綴,因?yàn)閘ock會(huì)導(dǎo)致性能下降。cmpxchg是匯編指令,作用是比較并交換操作數(shù)。

  > 底層會(huì)調(diào)用cmpxchg匯編指令,如果是多核處理器會(huì)加鎖實(shí)現(xiàn)原子操作

  反匯編指令查詢

  查看java程序運(yùn)行的匯編指令資料

1661393650563_6.jpg

  將上圖2個(gè)文件拷貝到j(luò)re\bin目錄下,如下圖

1661393663582_7.jpg

  配置運(yùn)行參數(shù)

  ```

  -server -Xcomp -XX:+UnlockDiagnosticVMOptions -XX:+PrintAssembly -XX:CompileCommand=compileonly,*

  ```

1661393691738_8.jpg

1661393707679_9.jpg

1661393719174_10.jpg

  運(yùn)行Test2效果

1661393737130_11.jpg

  synchronized 實(shí)現(xiàn)同步操作原理

  鎖對(duì)象

  java中任何一個(gè)對(duì)象都可以稱為鎖對(duì)象,原因在于java對(duì)象在內(nèi)存中存儲(chǔ)結(jié)構(gòu),如下圖所示:

1661393750947_12.jpg

  在對(duì)象頭中主要存儲(chǔ)的主要是一些運(yùn)行時(shí)的數(shù)據(jù),如下所示:

1661393763314_13.jpg

  其中 在Mark Work中存儲(chǔ)著該對(duì)象作為鎖時(shí)的一些信息,如下所示是Mark Work中在64位系統(tǒng)中詳細(xì)信息:

1661393777046_14.jpg

  偏向鎖

  在無競(jìng)爭(zhēng)環(huán)境中(沒有并發(fā))使用一種鎖

  > 偏向鎖的作用是當(dāng)有線程訪問同步代碼或方法時(shí),線程只需要判斷對(duì)象頭的Mark Word中判斷一下是否有偏向鎖指向線程ID.

  >

  > 偏向鎖記錄過程

  >

  > - 線程搶到了對(duì)象的同步鎖(鎖標(biāo)志為01參考上圖即無其他線程占用)

  > - 對(duì)象Mark World 將是否偏向標(biāo)志位設(shè)置為1

  > - 記錄搶到鎖的線程ID

  > - 進(jìn)入偏向狀態(tài)

  輕量級(jí)鎖

  當(dāng)有另外一個(gè)線程競(jìng)爭(zhēng)獲取這個(gè)鎖時(shí),由于該鎖已經(jīng)是偏向鎖,當(dāng)發(fā)現(xiàn)對(duì)象頭 Mark Word 中的線程 ID 不是自己的線程 ID,就會(huì)進(jìn)行 CAS 操作獲取鎖,**如果獲取成功**,直接替換 Mark Word 中的線程 ID 為自己的 ID,該鎖會(huì)保持偏向鎖狀態(tài);**如果獲取鎖失敗**,代表當(dāng)前鎖有一定的競(jìng)爭(zhēng),偏向鎖將升級(jí)為輕量級(jí)鎖。

  - 舉個(gè)例子來說明一下什么時(shí)候需要升級(jí)偏向鎖

  假設(shè)A線程 持有鎖 X(此時(shí)X是偏向鎖) 這是有個(gè)B線程也同樣用到了鎖X,而B線程在檢查鎖對(duì)象的Mark World時(shí)發(fā)現(xiàn)偏向鎖的線程ID已經(jīng)指向了線程A。這時(shí)候就需要升級(jí)鎖X為輕量級(jí)鎖。輕量級(jí)鎖意味著標(biāo)示該資源現(xiàn)在處于競(jìng)爭(zhēng)狀態(tài)。

  當(dāng)有其他線程想訪問加了輕量級(jí)鎖的資源時(shí),會(huì)使用自旋鎖優(yōu)化,來進(jìn)行資源訪問。

  > 自旋策略

  >

  > JVM 提供了一種自旋鎖,可以通過自旋方式不斷嘗試獲取鎖,從而避免線程被掛起阻塞。這是基于大多數(shù)情況下,線程持有鎖的時(shí)間都不會(huì)太長(zhǎng),畢竟線程被掛起阻塞可能會(huì)得不償失。

  >

  > 從 JDK1.7 開始,自旋鎖默認(rèn)啟用,自旋次數(shù)由 JVM 設(shè)置決定,這里我不建議設(shè)置的重試次數(shù)過多,因?yàn)?CAS 重試操作意味著長(zhǎng)時(shí)間地占用 CPU。自旋鎖重試之后如果搶鎖依然失敗,同步鎖就會(huì)升級(jí)至重量級(jí)鎖,鎖標(biāo)志位改為 10。在這個(gè)狀態(tài)下,未搶到鎖的線程都會(huì)進(jìn)入 Monitor,之后會(huì)被阻塞在 _WaitSet 隊(duì)列中。

  重量級(jí)鎖

  自旋失敗,很大概率 再一次自選也是失敗,因此直接升級(jí)成重量級(jí)鎖,進(jìn)行線程阻塞,減少cpu消耗。

  當(dāng)鎖升級(jí)為重量級(jí)鎖后,未搶到鎖的線程都會(huì)被阻塞,進(jìn)入阻塞隊(duì)列。

  重量級(jí)鎖在高并發(fā)下性能就會(huì)變慢,因?yàn)樗袥]有獲取鎖的線程會(huì)進(jìn)行阻塞等待,到獲取鎖的時(shí)候被喚醒,這些操作都是消耗很多資源。

  輕量級(jí)鎖膨脹流程圖

1661393802532_15.jpg

  LongAdder 實(shí)現(xiàn)原子操作原理

  LongAdder實(shí)現(xiàn)高并發(fā)計(jì)數(shù)實(shí)現(xiàn)思路

  LongAdder實(shí)現(xiàn)高并發(fā)的秘密就是用空間換時(shí)間,對(duì)一個(gè)值的cas操作,變成對(duì)多個(gè)值的cas操作,當(dāng)獲取數(shù)量的時(shí)候,對(duì)這多個(gè)值加和即可。

1661393816329_16.jpg

  測(cè)試代碼

```
package com.itheima._04LongAddr使用測(cè)試;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongBinaryOperator;

/**
 * @author 黑馬程序員
 */
public class Test3 {

    private LongAdder value = new LongAdder(); //默認(rèn)初始值0

    public static void main(String[] args) throws InterruptedException {
        Test3 test1 = new Test3();
        test1.increment();
        System.out.println("期待值:" + 100 * 100 + ",最終結(jié)果值:" + test1.value.sum());
        //結(jié)果,期待值:10000,最終結(jié)果值:10000
    }

    private void increment() throws InterruptedException {
        List<Thread> list = new ArrayList<>();
        //啟動(dòng)100個(gè)線程,每個(gè)線程對(duì)value進(jìn)行累加100次
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    value.increment();
                }
            });
            list.add(t);
            t.start();
        }

        //保證所有線程運(yùn)行完成
        for (Thread thread : list) {
            thread.join();
        }

    }

}
```

  源碼分析

  1. 先對(duì)base變量進(jìn)行cas操作,cas成功后返回

  2. 對(duì)線程獲取一個(gè)hash值(調(diào)用getProbe),hash值對(duì)數(shù)組長(zhǎng)度取模,定位到cell數(shù)組中的元素,對(duì)數(shù)組中的元素進(jìn)行cas

  增加數(shù)量源碼

public void increment() {
    add(1L);
}
```

```java
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}
```

  當(dāng)數(shù)組不為空,并且根據(jù)線程hash值定位到數(shù)組某個(gè)下標(biāo)中的元素不為空,對(duì)這個(gè)元素cas成功則直接返回,否則進(jìn)入longAccumulate方法

1661393945975_17.jpg

  1. cell數(shù)組已經(jīng)初始化完成,主要是在cell數(shù)組中放元素,對(duì)cell數(shù)組進(jìn)行擴(kuò)容等操作

  2. cell數(shù)組沒有初始化,則對(duì)數(shù)組進(jìn)行初始化

  3. cell數(shù)組正在初始化,這時(shí)其他線程利用cas對(duì)baseCount進(jìn)行累加操作

  完整代碼

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}
```

  獲取計(jì)算數(shù)量源碼

public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
```

  需要注意的是,調(diào)用sum()返回的數(shù)量有可能并不是當(dāng)前的數(shù)量,因?yàn)樵谡{(diào)用sum()方法的過程中,可能有其他數(shù)組對(duì)base變量或者cell數(shù)組進(jìn)行了改動(dòng),所以需要確保所有線程運(yùn)行完再獲取就是準(zhǔn)確值

  LongAdder 的前世今生

  其實(shí)在 Jdk1.7 時(shí)代,LongAdder 還未誕生時(shí),就有一些人想著自己去實(shí)現(xiàn)一個(gè)高性能的計(jì)數(shù)器了,比如一款 Java 性能監(jiān)控框架 dropwizard/metrics 就做了這樣事,在早期版本中,其優(yōu)化手段并沒有 Jdk1.8 的 LongAdder 豐富,而在 metrics 的最新版本中,其已經(jīng)使用 Jdk1.8 的 LongAdder 替換掉了自己的輪子。在最后的測(cè)評(píng)中,我們將 metrics 版本的 LongAdder 也作為一個(gè)參考對(duì)象。

  應(yīng)用場(chǎng)景

  AtomicLong等原子類的使用

  并發(fā)少競(jìng)爭(zhēng)少(讀多寫少)的計(jì)數(shù)原子操作

  LongAdder 的使用

  高性能計(jì)數(shù)器的首選方案, 單體項(xiàng)目建議使用LongAddr,分布式環(huán)境建議使用Redisson分布式累加器

  應(yīng)用場(chǎng)景功能:獲取全局自增id值

  Synchronized與Lock的使用比較

  Synchronized 適合少量的同步并發(fā)競(jìng)爭(zhēng)

  Lock 適合大量的同步并發(fā)競(jìng)爭(zhēng)

1661394039753_18.jpg

  總結(jié)

  并發(fā)情況優(yōu)化鎖思路:

  互斥鎖 -> 樂觀鎖 -> 鎖的粒度控制

  在Java中對(duì)應(yīng)的實(shí)現(xiàn)方式:

  ReentrantLock或者Syschronized -> CAS + Volatile -> 拆分競(jìng)爭(zhēng)點(diǎn)(longAddr,分布式累加器,ConcurrentHashMap等)

  ReentrantLock或者Syschronized 在高并發(fā)時(shí)都存在獲取鎖等待、阻塞、喚醒等操作,所以在使用的使用注意拆分競(jìng)爭(zhēng)點(diǎn)。

  AtomicLong

  1. 并發(fā)量非常高,可能導(dǎo)致都在不停的爭(zhēng)搶該值,可能導(dǎo)致很多線程一致處于循環(huán)狀態(tài)而無法更新數(shù)據(jù),從而導(dǎo)致 CPU 資源的消耗過高。解決這個(gè)問題需要使用LongAdder

  2. ABA 問題,比如說上一個(gè)線程增加了某個(gè)值,又改變了某個(gè)值,然后后面的線程以為數(shù)據(jù)沒有發(fā)生過變化,其實(shí)已經(jīng)被改動(dòng)了。解決這個(gè)問題請(qǐng)參考《擴(kuò)展:原子更新字段類-ABA問題解決》

  synchronized

  synchronized鎖升級(jí)實(shí)際上是把本來的悲觀鎖變成了 在一定條件下 使用無所(同樣線程獲取相同資源的偏向鎖),以及使用樂觀(自旋鎖 cas)和一定條件下悲觀(重量級(jí)鎖)的形式。

  偏向鎖:適用于單線程適用鎖的情況

  輕量級(jí)鎖:適用于競(jìng)爭(zhēng)較不激烈的情況(這和樂觀鎖的使用范圍類似)

  重量級(jí)鎖:適用于競(jìng)爭(zhēng)激烈的情況

  LongAdder

  - AtomicLong :并發(fā)場(chǎng)景下讀性能優(yōu)秀,寫性能急劇下降,不適合作為高性能的計(jì)數(shù)器方案。內(nèi)需求量少。

  - LongAdder :并發(fā)場(chǎng)景下寫性能優(yōu)秀,讀性能由于組合求值的原因,不如直接讀值的方案,但由于計(jì)數(shù)器場(chǎng)景寫多讀少的緣故,整體性能在幾個(gè)方案中最優(yōu),是高性能計(jì)數(shù)器的首選方案。由于 Cells 數(shù)組以及緩存行填充的緣故,占用內(nèi)存較大。

  最佳方案

  高性能計(jì)數(shù)器的首選方案, 單體項(xiàng)目建議使用LongAddr,分布式環(huán)境建議使用Redisson分布式累加器

  應(yīng)用場(chǎng)景功能:獲取全局自增id值

分享到:
在線咨詢 我要報(bào)名
和我們?cè)诰€交談!