- AtomicInteger
- AtomicReference
- AtomicStampedReference
- 原子数组AtomicIntegerArray
- 原子字段更新器AtomicReferenceFieldUpdater<T,V>
- 原子累加器(LongAdder)
AtomicInteger
package com.huangbei.cas;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Driver that runs the same concurrent withdrawal scenario against both account
 * implementations so their printed results can be compared side by side.
 */
public class TestAccount {
    public static void main(String[] args) {
        // Both accounts start with the same balance; the plain-field one is exercised first.
        Account plainFieldAccount = new AccountUnsafe(10000);
        Account casAccount = new AccountSafe(10000);
        for (Account candidate : new Account[] {plainFieldAccount, casAccount}) {
            Account.test(candidate);
        }
    }
}
/**
 * {@link Account} implementation that stores the balance in an {@link AtomicInteger}
 * and applies withdrawals with a compare-and-set retry loop instead of a lock.
 */
class AccountSafe implements Account {
    private AtomicInteger balance;

    public AccountSafe(int balance) {
        this.balance = new AtomicInteger(balance);
    }

    /** Returns the value currently held by the atomic balance. */
    @Override
    public int getBalance() {
        return balance.get();
    }

    /**
     * Subtracts {@code amount} from the balance.
     *
     * @param amount the value to subtract
     */
    @Override
    public void withDraw(int amount) {
        int observed;
        int updated;
        do {
            // Compute the new balance from a snapshot; the CAS only succeeds
            // if nobody changed the balance since the snapshot was taken.
            observed = balance.get();
            updated = observed - amount;
        } while (!balance.compareAndSet(observed, updated));
    }
}
/**
 * {@link Account} implementation that keeps the balance in a plain field.
 * The withdrawal is an unsynchronized read-modify-write, so concurrent
 * callers may lose updates.
 */
class AccountUnsafe implements Account {
    private Integer balance;

    public AccountUnsafe(int balance) {
        this.balance = Integer.valueOf(balance);
    }

    /** Returns the current balance. */
    @Override
    public int getBalance() {
        return balance.intValue();
    }

    /**
     * Subtracts {@code amount} from the balance without any synchronization.
     *
     * @param amount the value to subtract
     */
    @Override
    public void withDraw(int amount) {
        // Read, subtract and write back as separate steps on a non-atomic field.
        balance = balance - amount;
    }
}
/**
 * Minimal account abstraction plus a small concurrent test harness.
 */
interface Account {
    /** Returns the current balance. */
    int getBalance();

    /**
     * Subtracts {@code amount} from the balance.
     *
     * @param amount the value to subtract
     */
    void withDraw(int amount);

    /**
     * Runs 1000 threads that each call {@code withDraw(10)} on the given account,
     * waits for all of them, then prints the final balance and the elapsed
     * wall-clock time in milliseconds.
     *
     * @param account the account to exercise
     */
    static void test(Account account) {
        final int workerCount = 1000;
        // The timer covers thread creation, start and join.
        long startedAt = System.currentTimeMillis();

        // One thread per withdrawal of 10.
        Thread[] workers = new Thread[workerCount];
        for (int k = 0; k < workerCount; k++) {
            workers[k] = new Thread(() -> account.withDraw(10));
        }
        for (Thread worker : workers) {
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long finishedAt = System.currentTimeMillis();
        System.out.println("balance:" + account.getBalance());
        System.out.println("time:" + (finishedAt - startedAt) + " ms");
        System.out.println("--------------------------");
    }
}
AtomicReference
package com.huangbei.cas;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Driver that withdraws "10" from a {@link BigDecimalAccount} on 999 threads
 * and prints the remaining balance once all of them have finished.
 */
public class TestAccount2 {
    public static void main(String[] args) throws InterruptedException {
        BigDecimalAccount account = new BigDecimalAccountImpl(new BigDecimal("10000"));

        Thread[] workers = new Thread[999];
        for (int k = 0; k < workers.length; k++) {
            workers[k] = new Thread(() -> account.withDraw("10"));
        }
        for (Thread worker : workers) {
            worker.start();
        }
        // Wait for every withdrawal before reading the balance.
        for (Thread worker : workers) {
            worker.join();
        }

        System.out.println(account.getBalance());
    }
}
/**
 * {@link BigDecimalAccount} that stores the balance in an {@link AtomicReference}
 * and applies withdrawals with a compare-and-set retry loop.
 */
class BigDecimalAccountImpl implements BigDecimalAccount {
    // The reference holder itself is never replaced, only its content.
    private final AtomicReference<BigDecimal> balance;

    public BigDecimalAccountImpl(BigDecimal initialBalance) {
        this.balance = new AtomicReference<>(initialBalance);
    }

    /** Returns the balance currently held by the atomic reference. */
    @Override
    public BigDecimal getBalance() {
        return balance.get();
    }

    /**
     * Subtracts {@code amount} from the balance.
     *
     * @param amount the value to subtract, in a format accepted by
     *               {@link BigDecimal#BigDecimal(String)}
     * @throws NumberFormatException if {@code amount} is not a valid decimal string
     */
    @Override
    public void withDraw(String amount) {
        // Parse once: the amount does not change between CAS retries.
        final BigDecimal delta = new BigDecimal(amount);
        BigDecimal prev;
        BigDecimal next;
        do {
            prev = balance.get();
            next = prev.subtract(delta);
            // compareAndSet compares references, so it succeeds only if the
            // stored object is still the exact instance read above.
        } while (!balance.compareAndSet(prev, next));
    }
}
/**
 * Account whose balance is a {@link BigDecimal} and whose withdrawal amount
 * is supplied as a string.
 */
interface BigDecimalAccount {
    /** Returns the current balance. */
    BigDecimal getBalance();

    /**
     * Subtracts the given amount from the balance.
     *
     * @param amount the amount to withdraw, given as text
     */
    void withDraw(String amount);
}
注意
需要说明的是AtomicReference<V>
在compare and set时,比较的是预期值和实际值的引用地址是否相等。
AtomicStampedReference
AtomicReference<V>
使用时,只会比较expectedReference和内存中ref实际持有的引用是否相同(==)。如果在执行CAS操作之前,其他线程对reference进行了形如“A->B->A”的修改,修改后ref与最初相同,CAS操作仍然会成功(即ABA问题)。
如果这种变化在实际程序中不可接受,那么就要使用AtomicStampedReference<V>
。
它内置了一个int类型的变量stamp
,作为引用的版本号,CAS时会同时比较引用和stamp。只要每次修改都给stamp一个不同的值(例如加1),那么当其他线程在本线程进行CAS操作之前做了“A->B->A”形式的修改时,stamp已经发生变化,本线程的CAS操作就会失败;重新获取最新的引用和stamp之后,在其基础上再次进行CAS才可能成功。
如果约定每次修改都将stamp
加1,那么根据stamp还可以得到共享的reference被修改过多少次。
package com.huangbei.cas;
import com.huangbei.util.Sleeper;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicStampedReference;
/*
AtomicStampedReference<V>
*/
/**
 * Demonstrates that {@link AtomicStampedReference} lets a compare-and-set detect
 * an intermediate "A->B->A" change through the stamp, even though the reference
 * value ends up equal to the one originally read.
 */
@Slf4j(topic = "c.Test-AtomicStampedReference")
public class Test3 {
    public static void main(String[] args) {
        AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);

        // Snapshot the value and stamp before another thread performs A->B->A.
        String expectedValue = ref.getReference();
        int expectedStamp = ref.getStamp();

        updateByNewThread(ref);
        // Give the helper thread time to finish (sleep unit is defined by Sleeper — presumably seconds).
        Sleeper.sleep(1);

        // The CAS uses the stamp captured before the helper thread ran.
        boolean swapped = ref.compareAndSet(expectedValue, "C", expectedStamp, expectedStamp + 1);
        log.debug("A->C:{}", swapped);
    }

    /**
     * Starts a thread that changes the shared reference to "B" and then back
     * to "A", bumping the stamp by one on each step, and logs both CAS results.
     *
     * @param reference the shared stamped reference to modify
     */
    public static void updateByNewThread(AtomicStampedReference<String> reference) {
        Runnable abaTask = () -> {
            // First step: current value -> "B".
            String firstSeen = reference.getReference();
            int firstStamp = reference.getStamp();
            boolean movedToB = reference.compareAndSet(firstSeen, "B", firstStamp, firstStamp + 1);
            log.debug("A->B:{}", movedToB);

            // Second step: current value -> "A".
            String secondSeen = reference.getReference();
            int secondStamp = reference.getStamp();
            boolean movedBackToA = reference.compareAndSet(secondSeen, "A", secondStamp, secondStamp + 1);
            log.debug("B->A:{}", movedBackToA);
        };
        new Thread(abaTask).start();
    }
}
原子数组AtomicIntegerArray
package com.huangbei.cas;
/*
原子数组
*/
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
 * Compares concurrent increments on a plain {@code int[]} with the same
 * workload on an {@link AtomicIntegerArray}.
 */
public class Test5 {
    public static void main(String[] args) {
        // Plain array: element increments are not atomic.
        demo(
                () -> new int[10],
                cells -> cells.length,
                (cells, slot) -> cells[slot]++,
                cells -> System.out.println(Arrays.toString(cells))
        );
        // Atomic array: each increment is an atomic operation.
        demo(
                () -> new AtomicIntegerArray(10),
                AtomicIntegerArray::length,
                (cells, slot) -> cells.incrementAndGet(slot),
                System.out::println
        );
    }

    /**
     * Creates a container, starts one thread per element, and has every thread
     * perform 10000 updates spread round-robin over all indices; after all
     * threads have finished the container is passed to {@code printConsumer}.
     *
     * @param supplier         creates the container under test
     * @param lengthFunction   returns the number of elements in the container
     * @param putValueConsumer performs one update on the given index of the container
     * @param printConsumer    receives the container once all threads are done
     * @param <T>              the container type
     */
    public static <T> void demo(
            Supplier<T> supplier,
            Function<T, Integer> lengthFunction,
            BiConsumer<T, Integer> putValueConsumer,
            Consumer<T> printConsumer
    ) {
        final T shared = supplier.get();
        final int slots = lengthFunction.apply(shared);

        // One worker per slot; each worker touches every slot in turn.
        List<Thread> workers = new ArrayList<>();
        for (int w = 0; w < slots; w++) {
            Runnable task = () -> {
                for (int op = 0; op < 10000; op++) {
                    putValueConsumer.accept(shared, op % slots);
                }
            };
            workers.add(new Thread(task));
        }

        for (Thread worker : workers) {
            worker.start();
        }
        // Wait for every worker before handing the container to the printer.
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        printConsumer.accept(shared);
    }
}
原子字段更新器AtomicReferenceFieldUpdater<T,V>
AtomicReferenceFieldUpdater<T,V>
可以对类型T的对象的类型为V的字段进行CAS操作。
注意:字段必须用关键字volatile
修饰,因为要保证变量在线程间的可见性,否则newUpdater会抛出IllegalArgumentException;同时该字段必须对调用newUpdater的代码可访问(例如不能是其他类的private字段)。
package com.huangbei.cas;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
 * Demonstrates a compare-and-set on the {@code name} field of a {@link Student}
 * through an {@link AtomicReferenceFieldUpdater}.
 */
public class Test6 {
    public static void main(String[] args) {
        Student student = new Student();
        AtomicReferenceFieldUpdater<Student, String> nameUpdater =
                AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");

        // The field starts out as null, so null is the expected value for the swap.
        boolean updated = nameUpdater.compareAndSet(student, null, "张三");
        System.out.println(updated);
        System.out.println(student);
    }
}
/**
 * Simple holder with a single volatile {@code name} field.
 */
class Student {
    volatile String name;

    /** Renders the student as {@code Student{name='<name>'}}. */
    @Override
    public String toString() {
        return "Student{name='" + name + "'}";
    }
}
原子累加器(LongAdder)
原子累加器提升性能的原理:设置了多个累加单元(Cell),当出现竞争时,不同线程转而对不同的累加单元进行CAS累加,最后在求和时将base和所有累加单元的结果汇总,从而减少了CAS失败重试的次数,提高了性能。累加单元的数量跟CPU核心数有关:数组长度始终是2的幂,扩容到不小于CPU核心数之后就不再增长。
LongAdder借鉴了分段锁“分散竞争”的思想,但对每个累加单元的更新仍然是无锁的CAS操作,并不是真正加锁。