CAS与原子操作类

CAS 与原子操作类

CAS

CAS(Compare And Swap)是一条 CPU 并发原语,它的功能是判断内存某个位置的值是否为预期值,如果是则更改为新的值,这个过程是原子的
是实现并发算法时常用到的一种技术,包含三个操作数(内存位置、预期原值及更新值),执行 CAS 操作的时候,将内存位置的值与预期原值比较,如果相匹配,那么处理器会自动将该位置值更新为新值,如果不匹配,处理器不做任何操作,多个线程同时执行 CAS 操作只有一个会成功

1
2
3
4
5
6
7
8
public class Demo1 {

public static void main(String[] args) throws InterruptedException {
AtomicInteger atomicInteger = new AtomicInteger(5);
System.out.println(atomicInteger.compareAndSet(5, 6) + "-" + atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(5, 7) + "-" + atomicInteger.get());
}
}
原理

CAS 有 3 个操作数,位置内存值 V,旧的预期值 A,要修改的更新值 B,当且仅当旧的预期值 A 和内存值 V 相同时,将内存值 V 修改为 B,否则什么都不做或重来

image.png

硬件级别保证

CAS 是 JDK 提供的非阻塞原子性操作,它通过硬件保证了比较-更新的原子性,它是非阻塞的且自身原子性,也就是说它效率更高且通过硬件保证,说明更可靠

CAS 是一条 CPU 的原子指令(cmpxchg 指令),不会造成所谓的数据不一致问题,Unsafe 提供的 CAS 方法(如 compareAndSwapXXX)底层实现即为 CPU 指令 cmpxchg,执行 cmpxchg 指令的时候,会判断当前系统是否为多核系统,如果是就给总线加锁,只有一个线程会对总线加锁成功,加锁成功之后会执行 CAS 操作,也就是说 CAS 的原子性实际上是 CPU 实现的, 其实在这一点上还是有排他锁的,只是比起用 synchronized,这里的排他时间要短的多,所以在多线程情况下性能会比较好

缺点
  • 自旋带来的 CPU 开销
  • ABA 问题
ABA 问题

CAS 算法实现一个重要前提需要取出内存中某时刻的数据并在当下时刻比较并替换,那么在这个时间差类会导致数据的变化,比如说一个线程 1 从内存位置 V 中取出 A,这时候另一个线程 2 也从内存中取出 A,并且线程 2 进行了一些操作将值变成了 B,然后线程 2 又将 V 位置的数据变成 A,这时候线程 1 进行 CAS 操作发现内存中仍然是 A,然后线程 1 操作成功,尽管线程 1 的 CAS 操作成功,但是不代表这个过程就是没有问题的

如何解决

  • 版本号:AtomicStampedReference
  • 时间戳:AtomicMarkableReference
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Demo1 {

static AtomicStampedReference atomicStampedReference = new AtomicStampedReference(100, 1);

public static void main(String[] args) {
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t 首次版本号:" + stamp); // 1
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(100, 101, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t 2次版本号:" + atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(101, 100, atomicStampedReference.getStamp(), atomicStampedReference.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + "\t 3次版本号:" + atomicStampedReference.getStamp());
}, "t3").start();

new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName() + "\t 首次版本号:" + stamp); // 1
// 暂停一会儿线程,获得初始值 100 和初始版本号 1,故意暂停 3 秒钟让 t3 线程完成一次 ABA 操作产生问题
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
boolean result = atomicStampedReference.compareAndSet(100, 2019, stamp, stamp + 1);
System.out.println(Thread.currentThread().getName() + "\t" + result + "\t" + atomicStampedReference.getReference());
}, "t4").start();
}
}
UnSafe

image.png

valueOffset:表示该变量值在内存中的偏移地址,因为 Unsafe 就是根据内存偏移地址获取数据的
value:用 volatile 修饰,保证了多线程之间的内存可见性
AtomicInteger 类主要利用 CAS + volatile + native 方法来保证原子操作,从而避免 synchronized 的高开销,执行效率大为提升

它是 CAS 的核心类,由于 Java 方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe 相当于一个后门,基于该类可以直接操作特定内存的数据,Unsafe 类存在于 sun.misc 包中,其内部方法操作可以像 C 的指针一样直接操作内存,因为 Java 中 CAS 操作的执行依赖于 Unsafe 类的方法
注意 Unsafe 类中的所有方法都是 native 修饰的,也就是说 Unsafe 类中的方法都直接调用操作系统底层资源执行相应任务

CAS 并发原语体现在 JAVA 语言中就是 sun.misc.Unsafe 类中的各个方法,调用 UnSafe 类中的 CAS 方法,JVM 会帮我们实现出 CAS 汇编指令,这是一种完全依赖于硬件的功能,通过它实现了原子操作。再次强调,由于 CAS 是一种系统原语,原语属于操作系统用语范畴,是由若干条指令组成的,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许被中断,也就是说 CAS 是一条 CPU 的原子指令,不会造成所谓的数据不一致问题

自旋锁

是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,当线程发现锁被占用时,会不断循环判断锁的状态,直到获取,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗 CPU

image.png

Demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class Demo1  {

AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void myLock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "\t come in");
while (!atomicReference.compareAndSet(null, thread)) {
}
}

public void myUnLock() {
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
System.out.println(Thread.currentThread().getName() + "\t myUnLock over");
}

public static void main(String[] args) {
Demo1 demo1 = new Demo1();

new Thread(() -> {
demo1.myLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
demo1.myUnLock();
}, "A").start();

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

new Thread(() -> {
demo1.myLock();
demo1.myUnLock();
}, "B").start();
}
}

原子操作类

基本类型原子类
  • AtomicInteger
  • AtomicBoolean
  • AtomicLong
常用方法
方法 说明
public final int get() 获取当前的值
public final int getAndSet(int newValue) 获取当前的值,并设置新的值
public final int getAndIncrement() 获取当前的值,并自增
public final int getAndDecrement() 获取当前的值,并自减
public final int getAndAdd(int delta) 获取当前的值,并加上预期的值
public final boolean compareAndSet(int expect, int update) 如果输入的数值等于预期值,则以原子方式将该值设置为输入值(update)
Demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Demo1 {

public static void main(String[] args) throws InterruptedException {
MyNumber myNumber = new MyNumber();
CountDownLatch countDownLatch = new CountDownLatch(100);

for (int i = 1; i <= 100; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 5000; j++) {
myNumber.addPlusPlus();
}
} finally {
countDownLatch.countDown();
}
}, String.valueOf(i)).start();
}

countDownLatch.await();
System.out.println(myNumber.getAtomicInteger().get());
}
}

@Getter
class MyNumber {

private AtomicInteger atomicInteger = new AtomicInteger();

public void addPlusPlus() {
atomicInteger.incrementAndGet();
}
}
数据组类原子类
  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray
Demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Demo1 {

public static void main(String[] args) {
// AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[5]);
// AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5);
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(new int[]{1, 2, 3, 4, 5});

for (int i = 0; i < atomicIntegerArray.length(); i++) {
System.out.println(atomicIntegerArray.get(i));
}

int tmpInt = atomicIntegerArray.getAndSet(0, 1122);
System.out.println(tmpInt + "\t" + atomicIntegerArray.get(0));

atomicIntegerArray.getAndIncrement(1);
atomicIntegerArray.getAndIncrement(1);
tmpInt = atomicIntegerArray.getAndIncrement(1);
System.out.println(tmpInt + "\t" + atomicIntegerArray.get(1));
}
}
引用类型原子类
  • AtomicReference
  • AtomicStampedReference
  • AtomicMarkableReference
AtomicMarkableReference 使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Demo1 {

static AtomicMarkableReference<Integer> markableReference = new AtomicMarkableReference<>(100, false);

public static void main(String[] args) {

new Thread(() -> {
boolean marked = markableReference.isMarked();
System.out.println(Thread.currentThread().getName() + "\t 1次版本号" + marked);
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
markableReference.compareAndSet(100, 101, marked, !marked);
System.out.println(Thread.currentThread().getName() + "\t 2次版本号" + markableReference.isMarked());
markableReference.compareAndSet(101, 100, markableReference.isMarked(), !markableReference.isMarked());
System.out.println(Thread.currentThread().getName() + "\t 3次版本号" + markableReference.isMarked());
}, "t1").start();

new Thread(() -> {
boolean marked = markableReference.isMarked();
System.out.println(Thread.currentThread().getName() + "\t 1次版本号" + marked);
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
markableReference.compareAndSet(100, 2020, marked, !marked);
System.out.println(Thread.currentThread().getName() + "\t" + markableReference.getReference() + "\t" + markableReference.isMarked());
}, "t2").start();
}
}
对象的属性修改原子类
  • AtomicIntegerFieldUpdater
  • AtomicLongFieldUpdater
  • AtomicReferenceFieldUpdater
使用目的及要求

目的:

  • 以一种线程安全的方式操作非线程安全对象内的某些字段

要求:

  • 更新的对象属性必须使用 public volatile 修饰符
  • 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法 newUpdater() 创建一个更新器,并且需要设置想要更新的类和属性
AtomicIntegerFieldUpdater 使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 一种线程安全的方式操作非线程安全对象的某些字段
* 需求:1000 个人同时向一个账号转账 1 元钱,那么累计应该增加 1000 元,除了 synchronized 和 CAS,还可以使用 AtomicIntegerFieldUpdater 来实现
*/
public class Demo1 {

public static void main(String[] args) {

BankAccount bankAccount = new BankAccount();

for (int i = 1; i <= 1000; i++) {
new Thread(() -> bankAccount.transferMoney(bankAccount), String.valueOf(i)).start();
}

try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(bankAccount.money);
}
}

class BankAccount {

public volatile int money = 0; // 金额

AtomicIntegerFieldUpdater<BankAccount> accountAtomicIntegerFieldUpdater = AtomicIntegerFieldUpdater.newUpdater(BankAccount.class, "money");

// 不用加锁,性能高
public void transferMoney(BankAccount bankAccount) {
accountAtomicIntegerFieldUpdater.incrementAndGet(bankAccount);
}
}
AtomicReferenceFieldUpdater 使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 多线程并发调用一个类的初始化方法,如果未被初始化过,将执行初始化工作,要求只能初始化一次
*/
public class Demo1 {

public static void main(String[] args) throws InterruptedException {

MyVar myVar = new MyVar();

for (int i = 1; i <= 5; i++) {
new Thread(() -> myVar.init(myVar), String.valueOf(i)).start();
}
}
}

class MyVar {

public volatile Boolean isInit = Boolean.FALSE;
AtomicReferenceFieldUpdater<MyVar, Boolean> atomicReferenceFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(MyVar.class, Boolean.class, "isInit");

public void init(MyVar myVar) {
if (atomicReferenceFieldUpdater.compareAndSet(myVar, Boolean.FALSE, Boolean.TRUE)) {
System.out.println(Thread.currentThread().getName() + "\t" + "线程初始化");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t" + "线程初始化完毕");
} else {
System.out.println(Thread.currentThread().getName() + "\t" + "其它线程正在初始化");
}
}
}
原子操作增强类
  • DoubleAccumulator
  • DoubleAdder
  • LongAccumulator
  • LongAdder
LongAdder 为什么这么快

LongAdder 的基本思路就是分散热点,将 value 值分散到一个 Cell 数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行 CAS 操作,这样热点就被分散了,冲突的概率就小很多,如果要获取真正的 long 值,只要将各个槽中的变量值累加返回

sum() 会将所有 Cell 数组中的 value 和 base 累加作为返回值,核心的思想就是将之前 AtomicLong 一个 value 的更新压力分散到多个 value 中去,从而降级更新热点

Demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/**
* 50个线程,每个线程 100W 次,总点赞数出来
*/
public class Demo2 {

public static void main(String[] args) throws InterruptedException {

CountDownLatch countDownLatch = new CountDownLatch(50);
CountDownLatch countDownLatch2 = new CountDownLatch(50);
CountDownLatch countDownLatch3 = new CountDownLatch(50);
CountDownLatch countDownLatch4 = new CountDownLatch(50);
ClickNumberNet clickNumberNet = new ClickNumberNet();
long startTime;
long endTime;

startTime = System.currentTimeMillis();
for (int i = 1; i <= 50; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * 10000; j++) {
clickNumberNet.clickBySync();
}
} finally {
countDownLatch.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t clickBySync result: " + clickNumberNet.number);

startTime = System.currentTimeMillis();
for (int i = 1; i <= 50; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * 10000; j++) {
clickNumberNet.clickByAtomicLong();
}
} finally {
countDownLatch2.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch2.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t clickByAtomicLong result: " + clickNumberNet.atomicLong);

startTime = System.currentTimeMillis();
for (int i = 1; i <= 50; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * 10000; j++) {
clickNumberNet.clickByLongAdder();
}
} finally {
countDownLatch3.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch3.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t clickByLongAdder result: " + clickNumberNet.longAdder.sum());

startTime = System.currentTimeMillis();
for (int i = 1; i <= 50; i++) {
new Thread(() -> {
try {
for (int j = 1; j <= 100 * 10000; j++) {
clickNumberNet.clickByLongAccumulator();
}
} finally {
countDownLatch4.countDown();
}
}, String.valueOf(i)).start();
}
countDownLatch4.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: " + (endTime - startTime) + " 毫秒" + "\t clickByLongAccumulator result: " + clickNumberNet.longAccumulator.longValue());
}
}

class ClickNumberNet {
int number = 0;

public synchronized void clickBySync() {
number++;
}

AtomicLong atomicLong = new AtomicLong(0);

public void clickByAtomicLong() {
atomicLong.incrementAndGet();
}

LongAdder longAdder = new LongAdder();

public void clickByLongAdder() {
longAdder.increment();
}

LongAccumulator longAccumulator = new LongAccumulator((x, y) -> x + y, 0);

public void clickByLongAccumulator() {
longAccumulator.accumulate(1);
}
}
缺点

sum 求和后还有计算线程修改结果的话,最后结果不够准确

使用总结
  • AtomicLong
    • 线程安全,可允许一些性能损耗,要求高精度时可使用
    • 保证精度,性能代价
    • AtomicLong 是多个线程针对单个热点值 value 进行原子操作
  • LongAdder
    • 当需要在高并发下有较好的性能表现,且对值的精确度要求不高时,可以使用
    • 保证性能,精度代价
    • LongAdder 是每个线程拥有自己的槽,各个线程一般只对自己槽中的那个值进行 CAS 操作