solitaryclown

原子类和原子数组

2021-12-23
solitaryclown

AtomicInteger

package com.huangbei.cas;


import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;

public class TestAccount {

    public static void main(String[] args) {
        Account accountUnsafe = new AccountUnsafe(10000);
        Account accountSafe = new AccountSafe(10000);

        Account.test(accountUnsafe);
        Account.test(accountSafe);

    }
}

class AccountSafe implements Account{
    private AtomicInteger balance;

    public AccountSafe(int balance) {
        this.balance=new AtomicInteger(balance);
    }


    @Override
    public int getBalance() {
        return balance.get();
    }

    @Override
    public void withDraw(int amount) {
        while (true){
            int prev=balance.get();
            int next = prev - amount;
            boolean success = balance.compareAndSet(prev, next);
            if(success){
                break;
            }
        }
    }
}
class AccountUnsafe implements Account {
    private Integer balance;

    public AccountUnsafe(int balance) {
        this.balance = balance;
    }

    @Override
    public int getBalance() {
        return this.balance;
    }

    @Override
    public void withDraw(int amount) {
        this.balance -= amount;
    }
}

interface Account {
    int getBalance();

    void withDraw(int amount);

    static void test(Account account) {
        ArrayList<Thread> threads = new ArrayList<>();
        //创建一千个线程,每个调用withDraw(10)

        long start = System.currentTimeMillis();

        int n = 1000;
        for (int i = 0; i < n; i++) {
            threads.add(new Thread(() -> {
                account.withDraw(10);
            }));

        }
        threads.forEach(Thread::start);
        threads.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        long end = System.currentTimeMillis();
        System.out.println("balance:" + account.getBalance());
        System.out.println("time:" + (end - start) + " ms");
        System.out.println("--------------------------");

    }
}

AtomicReference

package com.huangbei.cas;


import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReference;

public class TestAccount2 {

    public static void main(String[] args) throws InterruptedException {

        BigDecimalAccount account = new BigDecimalAccountImpl(new BigDecimal("10000"));

        ArrayList<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 999; i++) {

            threads.add(new Thread(()->{
                account.withDraw("10");
            }));

        }
        threads.forEach(Thread::start);
        for (Thread t:threads){
            t.join();
        }
        System.out.println(account.getBalance());

    }
}

class BigDecimalAccountImpl implements BigDecimalAccount{
    private AtomicReference<BigDecimal> balance;

    public BigDecimalAccountImpl(BigDecimal initialBalance) {
        this.balance=new AtomicReference<>(initialBalance);
    }

    @Override
    public BigDecimal getBalance() {
        return balance.get();
    }

    @Override
    public void withDraw(String amount) {
        BigDecimal prev,next;
        do{
            prev=balance.get();
            next=prev.subtract(new BigDecimal(amount));
        }while (!balance.compareAndSet(prev,next));
    }
}

interface BigDecimalAccount{
    void withDraw(String amount);
    BigDecimal getBalance();
}

注意

需要说明的是AtomicReference<V>在compare and set时,比较的是预期值和实际值的引用地址是否相等。

AtomicStampedReference

AtomicReference<V>使用时,只会比较expectedReference和内存中ref实际的值是否相等,如果执行CAS操作之前其他线程对reference进行了形如”A–>B–>A”形式的修改,导致修改后ref没有变,CAS操作会成功。

如果这种变化在实际程序中不可接受,那么就要使用AtomicStampedReference<V>。 它内置了一个int类型的变量stamp,用来标记每一次CAS操作,只要每次都给stamp一个不同的值,当在线程进行CAS操作之前,其他线程做了”ABA“形式的修改,这个线程的CAS操作就会失败,等到下一次获取最新的stamp值,在其基础上进行CAS就会成功。 由于stamp是一个int类型的变量,根据stamp还可以得到共享的reference被修改过多少次。

package com.huangbei.cas;

import com.huangbei.util.Sleeper;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicStampedReference;

/*
    AtomicStampedReference<V>
 */
@Slf4j(topic = "c.Test-AtomicStampedReference")
public class Test3 {

    public static void main(String[] args) {
        AtomicStampedReference<String> ref=new AtomicStampedReference<>("A",0);
        //在主线程对ref进行更新之前,有其他的线程对ref进行了"A->B->A"的修改

        String expRef = ref.getReference();
        int stamp = ref.getStamp();

        updateByNewThread(ref);
        Sleeper.sleep(1);


        boolean success = ref.compareAndSet(expRef, "C", stamp, stamp + 1);
        log.debug("A->C:{}",success);

    }
    public static void updateByNewThread(AtomicStampedReference<String> reference){

        new Thread(()->{
            //对共享对象进行修改
            String  ref = reference.getReference();
            int stamp = reference.getStamp();
            log.debug("A->B:{}",reference.compareAndSet(ref,"B",stamp,stamp+1));
            //B->A
            ref=reference.getReference();
            stamp = reference.getStamp();
            log.debug("B->A:{}",reference.compareAndSet(ref,"A",stamp,stamp+1));

        }).start();
    }
}

原子数组AtomicIntegerArray

package com.huangbei.cas;

/*
原子数组
 */

import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class Test5 {
    public static void main(String[] args) {
        //线程不安全
        demo(
                () -> new int[10],
                arr -> arr.length,
                (arr, index) -> arr[index]++,
                arr -> System.out.println(Arrays.toString(arr))
        );

        //线程安全
        demo(
                () -> new AtomicIntegerArray(10),
                arr -> arr.length(),
                (arr, index) -> arr.incrementAndGet(index),
                arr -> System.out.println(arr)
        );
    }

    public static <T> void demo(
            Supplier<T> supplier,
            Function<T, Integer> lengthFunction,
            BiConsumer<T, Integer> putValueConsumer,
            Consumer<T> printConsumer
    ) {
        //supplier
        T array = supplier.get();
        //function
        Integer length = lengthFunction.apply(array);

        List<Thread> threads = new ArrayList<>();
        //putValueConsumer
        //创建10个线程,每个线程对T做10000次操作
        for (int i = 0; i < length; i++) {
            threads.add(new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    putValueConsumer.accept(array, j % length);
                }
            }));
        }
        //线程开始
        threads.forEach(t -> t.start());
        //等待所有线程结束
        threads.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        //printConsumer
        printConsumer.accept(array);

    }
}

原子字段更新器AtomicReferenceFieldUpdater<T,V>

AtomicReferenceFieldUpdater<T,V>可以对类型T的对象的类型为V的字段进行CAS操作。 注意:字段必须用关键字volatile修饰,因为要保证变量在线程间的可见性。

package com.huangbei.cas;

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class Test6 {
    public static void main(String[] args) {
        Student stu = new Student();
        AtomicReferenceFieldUpdater<Student, String> nameUpdater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
        System.out.println(nameUpdater.compareAndSet(stu, null, "张三"));

        System.out.println(stu);

    }
}
class Student{
    volatile String name;

    @Override
    public String toString() {
        return "Student{" +
                "name='" + name + '\'' +
                '}';
    }
}

原子累加器(LongAdder)

原子累加器提升性能的原理:设置了多个累加单元,最后将所有累加单元的结果汇总。当出现竞争时对其他的累加单元进行操作,减少了CAS失败的次数,提高了性能。累加单元的数量跟CPU核心数有关(不会超过核心数)。

LongAdder使用了分段锁的思想。


上一篇 Cas

下一篇 Unsafe类

Comments

Content