基本类型原子类:AtomicInteger、AtomicBoolean、AtomicLong,常见API:
数组类型原子类:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray,用法和基本类型原子类类似
AtomicIntegerArray atomicIntegerArray =new AtomicIntegerArray(new int[]{1,2,3,4});
引用类型原子类
对象的属性修改原子类
原子操作增强类
CountDownLatch使用场景
线程计数器 用于线程执行任务,计数 等待线程结束
例如:
public static void test() throws Exception{
AtomicInteger ato = new AtomicInteger(0);
Integer size = 100;
//100个线程用于计算
CountDownLatch countDownLatch = new CountDownLatch(100);
for (int a = 0; a < size; a++) {
new Thread(()->{
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
for (int b =0;b <100;b++){
//自增
ato.getAndIncrement();
}
//计算完了就减1
countDownLatch.countDown();
}).start();
}
//只有 countDownLatch 中的数量为0 才能继续执行,避免 Thread.sleep()的操作,计算完成能够即时执行
countDownLatch.await();
System.out.println("最终计算结果:"+ato.get());
}
对象的属性修改原子类,可以用线程安全的方式操作非线程安全对象内的某些字段,例如:某个对象有多个属性,但是只有少量字段需要频繁更新,加锁虽然可以保证线程安全,但是有可能会锁住整个对象,所以可以用原子更新代替加锁
AtomicIntegerFieldUpdater,基于反射,原子更新对象中,被volatile修饰的 int类型字段的值
AtomicLongFieldUpdater基于反射,原子更新对象中,被volatile修饰的 long类型字段的值
AtomicReferenceFieldUpdater,基于反射,原子更新引用类型,被volatile修饰的字段的值
使用原子更新的要:
示例:如下出了金额以外的字段是不会经常更新的,针对金额字段加锁,肯定是可以保证线程安全的,但是很重量级,不加锁的话,结果会不确定,线程不安全
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AtomicEntity {
//账号
private String account;
//联系方式
private String phone;
//姓名
private String name;
//金额
private volatile Integer money;
public synchronized void addMoney(Integer add) {
money = money + add;
}
public synchronized void subMoney(Integer sub) {
money = money - sub;
}
}
private static void test3() throws Exception {
AtomicEntity atomic = new AtomicEntity("123456789", "123456", "张三", 0);
CountDownLatch countDownLatch = new CountDownLatch(200);
for (int a = 0; a < 100; a++) {
new Thread(() -> {
for (int b = 0; b < 100; b++) {
atomic.addMoney(1);
}
countDownLatch.countDown();
}, "线程" + a).start();
}
for (int a = 0; a < 100; a++) {
new Thread(() -> {
for (int b = 0; b < 100; b++) {
atomic.subMoney(1);
}
countDownLatch.countDown();
}, "线程" + a).start();
}
countDownLatch.await();
//这里执行结果肯定是正确的,如果不加锁会导致线程错乱
System.out.println("最终结果" + atomic.getMoney());
}
如果不想加锁,使用对象的属性修改原子类,AtomicIntegerFieldUpdater示例
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AtomicEntity {
//账号
private String account;
//联系方式
private String phone;
//姓名
private String name;
//金额 ,修饰符必须是 public volatile
public volatile int money;
public AtomicEntity(String account, String phone, String name, Integer money) {
this.account = account;
this.phone = phone;
this.name = name;
this.money = money;
}
AtomicIntegerFieldUpdater<AtomicEntity> ato =
AtomicIntegerFieldUpdater.newUpdater(AtomicEntity.class, "money");
public void atoAddMoney(AtomicEntity atomic, Integer add) {
ato.getAndAdd(atomic, add);
}
public void atoSubMoney(AtomicEntity atomic, Integer sub) {
ato.getAndAdd(atomic, -sub);
}
}
private static void test4() throws Exception {
AtomicEntity atomic = new AtomicEntity("123456789", "123456", "李四", 0);
CountDownLatch countDownLatch = new CountDownLatch(200);
for (int a = 0; a < 100; a++) {
new Thread(() -> {
for (int b = 0; b < 100; b++) {
atomic.atoAddMoney(atomic,1);
}
countDownLatch.countDown();
}, "线程" + a).start();
}
for (int a = 0; a < 100; a++) {
new Thread(() -> {
for (int b = 0; b < 100; b++) {
atomic.atoSubMoney(atomic,1);
}
countDownLatch.countDown();
}, "线程" + a).start();
}
countDownLatch.await();
//不加锁也可以保证线程安全
System.out.println("最终结果" + atomic.getMoney());
}
LongAdder
LongAccumulator
能够自定义计算规则
@FunctionalInterface
public interface LongBinaryOperator {
//left是初始值
long applyAsLong(long left, long);
}
private static void test1() {
//实现函数式接口 LongBinaryOperator ,自定义计算规则,第二个参数是初始值
LongAccumulator longAccumulator = new LongAccumulator((x, y) -> {
return x * y;
}, 1);
//这里的参数 y:5,和原本的值1,通过自定义的计算规则,得到两者的计算结果
longAccumulator.accumulate(5);
longAccumulator.accumulate(5);
//结果为25
System.out.println(longAccumulator.longValue());
}
原子操作和同步方法的性能比较
/**
* 计数器
*/
@NoArgsConstructor
@AllArgsConstructor
@Data
public class LikeCounter {
public volatile long num;
public LikeCounter(long num) {
this.num = num;
}
/**
* 第一种同步方法
*/
public synchronized void add() {
num++;
}
/**
* atomicLong原子操作
*/
AtomicLong atomicLong = new AtomicLong(num);
public void addAto() {
atomicLong.getAndIncrement();
}
public long getAto() {
return atomicLong.get();
}
/**
* 对象的属性修改原子类 atomicLongFieldUpdater
*/
AtomicLongFieldUpdater<LikeCounter> atomicLongFieldUpdater = AtomicLongFieldUpdater.
newUpdater(LikeCounter.class, "num");
public void addAtoField(LikeCounter likeCounter) {
atomicLongFieldUpdater.getAndIncrement(likeCounter);
}
/**
* 原子扩展类 longAdder
*/
LongAdder longAdder = new LongAdder();
public void addLongAdder() {
longAdder.increment();
}
public long getLongAdder() {
return longAdder.longValue();
}
/**
* 原子扩展类 LongAccumulator
*/
LongAccumulator longAccumulator =new LongAccumulator((x,y)->{
return x+y;
},num);
public void addlongAccumulatorr() {
longAccumulator.accumulate(1);
}
public long getlongAccumulator() {
return longAccumulator.get();
}
}
public static void main(String[] args) {
try {
test1();
test2();
test3();
test4();
test5();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static void test1() throws Exception {
long l1 = System.currentTimeMillis();
LikeCounter like1 = new LikeCounter(0);
CountDownLatch countDownLatch = new CountDownLatch(10000);
for (int a = 0; a < 10000; a++) {
new Thread(() -> {
for (int b = 0; b < 10000; b++) {
like1.add();
}
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
System.out.println("调用synchronized方法,执行结果为:"+like1.getNum()+
",耗时:"+(System.currentTimeMillis()-l1));
}
private static void test2() throws Exception {
long l1 = System.currentTimeMillis();
LikeCounter like1 = new LikeCounter(0);
CountDownLatch countDownLatch = new CountDownLatch(10000);
for (int a = 0; a < 10000; a++) {
new Thread(() -> {
for (int b = 0; b < 10000; b++) {
like1.addAto();
}
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
System.out.println("调用atomicLong方法,执行结果为:"+like1.getAto()+
",耗时:"+(System.currentTimeMillis()-l1));
}
private static void test3() throws Exception {
long l1 = System.currentTimeMillis();
LikeCounter like1 = new LikeCounter(0);
CountDownLatch countDownLatch = new CountDownLatch(10000);
for (int a = 0; a < 10000; a++) {
new Thread(() -> {
for (int b = 0; b < 10000; b++) {
like1.addAtoField(like1);
}
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
System.out.println("调用atomicLongFieldUpdater方法,执行结果为:"+like1.getNum()+
",耗时:"+(System.currentTimeMillis()-l1));
}
private static void test4() throws Exception {
long l1 = System.currentTimeMillis();
LikeCounter like1 = new LikeCounter(0);
CountDownLatch countDownLatch = new CountDownLatch(10000);
for (int a = 0; a < 10000; a++) {
new Thread(() -> {
for (int b = 0; b < 10000; b++) {
like1.addLongAdder();
}
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
System.out.println("调用longAdder方法,执行结果为:"+like1.getLongAdder()+
",耗时:"+(System.currentTimeMillis()-l1));
}
private static void test5() throws Exception {
long l1 = System.currentTimeMillis();
LikeCounter like1 = new LikeCounter(0);
CountDownLatch countDownLatch = new CountDownLatch(10000);
for (int a = 0; a < 10000; a++) {
new Thread(() -> {
for (int b = 0; b < 10000; b++) {
like1.addlongAccumulatorr();
}
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
System.out.println("调用longAccumulator方法,执行结果为:"+like1.getlongAccumulator()+
",耗时:"+(System.currentTimeMillis()-l1));
}
调用结果如下:
调用synchronized方法,执行结果为:100000000,耗时:4621
调用atomicLong方法,执行结果为:100000000,耗时:1522
调用atomicLongFieldUpdater方法,执行结果为:100000000,耗时:2611
调用longAdder方法,执行结果为:100000000,耗时:368
调用longAccumulator方法,执行结果为:100000000,耗时:333
Striped64 内部结构
内部变量 base :并发低的时候,直接累加到base上
另外还有一个内部类cell ,和 cell[],并发高的时候,会把线程分散到自己的槽 cell[i] 中
longAdder 为什么快
longAdder 和 AtomicLong对比
方法入参是要增加的值
刚开始 cell[] 等于null,尝试用cas操作更新base值,cas执行成功,把base值改为了期望值,本次add就结束了
随着并发的升高,cas操作失败,就需要执行 longAccumulate方法,去初始化cell数组分散压力
一旦 cell[] 数组初始化完成,就需要判断当前线程所在的 cell 是否为null
为 null,执行 Striped64 的 longAccumulate方法,来初始化对应位置的cell
不为null,就执行对应 cell 的 cas操作,
public void add(long x) {
//b获取的base值,v表示期望值 ,m为数组长度,a表示当前线程命中的数组单元格
Cell[] as; long b, v; int m; Cell a;
//刚开始cells等于null,执行casBase,结果取反
if ((as = cells) != null || !casBase(b = base, b + x)) {
//这个boolean值代表cell数组有没有冲突
boolean uncontended = true;
//判断数组有没有初始化完成
if (as == null || (m = as.length - 1) < 0 ||
//判断线程有没有冲突,等于null说明没有冲突,getProbe()计算hash值
(a = as[getProbe() & m]) == null ||
//如果线程映射的对应位置不为null,就执行对应 cell 的 cas操作,执行成功返回true,取反得到false表没有冲突,结束本次add操作
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
方法属于 Striped64,方法入参:
getProbe()方法,获取线程的hash值,如果返回0,会重新计算一个hash值,重新计算后,认为线程本次操作没有竞争关系,把竞争标志改为 true ,也就是不存在冲突
advanceProbe(h),重置当前线程的hash值,让线程再次参与竞争
longAccumulate 方法分为三大部分
数组没有初始化
数组正在初始化,也计是其他未拿到锁的线程,作为兜底方案,会再这个分支把值直接累加到base上
数组初始化完成
如果数组初始化完成,但是线程对应位置的cell对象为null,就需要初始化cell对象
如果线程竞争标志为false 存在冲突,就把竞争标识改为 true,然后重置当前线程的hash值,重新计算线程的槽位,让线程重新循环参与竞争
如果通过cas操作重新竞争成功,就跳出循环
如果数组的长度 n大于当前cpu的核数,就不可扩容,然后重置当前线程的hash值,让线程重新循环参与竞争
如果cell[] 需要扩容,同样需要拿到cas锁,新数组的长度是原数组的两倍,把原本的cell拷贝到新数组,数组的引用指向新数组后,释放锁
//x 是需要增加的值,fn 默认是null, wasUncontended 是竞争标志,只有cell[]初始化完成,且cas竞争失败从才会是false
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
//数组初始化完成
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
//重制竞争标志
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//重新竞争cas操作
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//重置hash值
h = advanceProbe(h);
}
//数组没有初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
//如果cell为null,初始化cell数组,默认长度为2
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//数组正在初始化,作为一种兜底,把值直接累加到base上
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
sum方法
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}