前言 并发场景下通常会使用 AtomicLong
原子类进行计数等操作,在 JDK1.8 中提供了 LongAdder
来实现类似功能。相对于 AtomicLong
,LongAdder
有着更高的性能,可以完全替代 AtomicLong
的计数操作。
下面我们先对 AtomicLong
简单介绍,然后重点分析 LongAdder
的实现。
AtomicLong AtomicLong 相关的类图如下:
AtomicLong 关键的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class AtomicLong extends Number implements java .io .Serializable { private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicLong.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile long value; public final long incrementAndGet () { return unsafe.getAndAddLong(this , valueOffset, 1L ) + 1L ; } }
AtomicLong 底层是使用 Unsafe
的 CAS 来实现的,当操作失败时会以自旋的方式重试,直到成功才会退出自旋。因此在高并发场景下,可能会有很多线程处于重试状态,导致性能下降。具体表现在源码中的部分如下:
1 2 3 4 5 6 7 8 9 +--- Unsafe public final long getAndAddLong (Object var1, long var2, long var4) { long var6; do { var6 = this .getLongVolatile(var1, var2); } while (!this .compareAndSwapLong(var1, var2, var6, var6 + var4)); return var6; }
可以看到,使用 AtomicLong
进行计数操作时,多个线程竞争修改共享资源 value
值时是通过自旋来完成的。在高并发环境下,同一时刻只有一个线程可以 CAS 操作成功,其它线程都会 CAS 失败,从而会不断重试直到成功才会退出自旋,这就是对性能造成影响的原因。
LongAdder LongAdder 相关类图如下:
LongAdder 本质上是使用空间换时间的思想 来进行设计的,不过消耗的内存空间可以忽略不计。它维护了一个基础变量 base
作为基础计数器,在竞争比较小的情况下 CAS 操作该数据即可;为了应对竞争激烈的场景,它里面还维护了一组按需分配的计数单元数组 Cell[]
,处理计数请求时,不同线程可以映射到不同的计算单元上进行计数,采用分而治之 的思想减小了线程竞争,将操作单个共享资源的压力分摊到多个计算单元上,提高了并发度。
通过将请求计数压力分摊,LongAdder 可以提供比 AtomicLong 更好的性能。获取计数值时,只要将 base
与 Cell[]
数组中的计数单元累加即可。当然,在并发很低的情况,使用 AtomicLong 更简单直接一些,并且效率稍微高一些。
源码分析 LongAdder 源码整体实现如下图所示:
属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 abstract class Striped64 extends Number { static final int NCPU = Runtime.getRuntime().availableProcessors(); transient volatile Cell[] cells; transient volatile long base; transient volatile int cellsBusy; }
可以看到,LongAdder 本身中没有相关属性,它是通过直接继承 Striped64 类中属性。以上四个属性已经详细注释,在对应的场景下再详细介绍。
计数单元 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 +--- Striped64 @sun .misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas (long cmp, long val) { return UNSAFE.compareAndSwapLong(this , valueOffset, cmp, val); } private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class ; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value" )); } catch (Exception e) { throw new Error(e); } } }
计数单元 Cell
是 Striped64 的内部类,用于某个/某些线程处理计数请求的,也就是一个 Cell
可以对应多个线程。多个线程操作 Cell
数组的原理如下:
通过定义一个 volatile
类型的 value
属性作为计数值,使用 Unsafe 的 CAS 来修改它的值。LongAdder 计数器的值就是所有 Cell 的值加上 base 的值。
注意: 该类使用了 @sun.misc.Contended
注解,这个注解的作用是用来解决伪共享问题的,关于伪共享可参考 「伪共享」 。
计数方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class LongAdder extends Striped64 implements Serializable { private static final long serialVersionUID = 7249069246863182397L ; public LongAdder () { } public void increment () { add(1L ); } public void decrement () { add(-1L ); } }
可以看到,计数时无论是递增还是递减,都是调用了 add()
方法,它是计数的核心。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 +--- LongAdder public void add (long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null , uncontended); } }
上述方法的分支逻辑有点多,不过总体上是根据是否出现过竞争 来处理的。下面我们进行详细分析:
没有出现竞争的情况: 没有出现竞争就意味着 cells 数组为空,此时线程处理计数请求只需要 CAS 操作 base 值即可,操作成功直接结束;操作失败说明存在竞争,此时需要通过计数单元分摊计数请求,此时 as == null
成立,会进入到 longAccumulate
方法;
出现过竞争的情况: 已经出现过竞争就意味着 cells 数组已经存在,此时线程处理计算请求需要映射寻找对应的计数单元完成计数,也就不会操作 base 值了。此时会通过 a = as[getProbe() & m])
来映射对应的计数单元,如果当前线程没有对应的计数单元,或者 CAS 操作计算单元失败「有冲突」,都会进入到 longAccumulate
方法。
需要说明的是,线程在映射计数单元时,和 HashMap 中定位哈希桶一样的方式,只是这个 hash
值是线程相关的值。
关于以上两种情况的详细说明,已经在代码中详细注释,就不再展开说明。
longAccumulate 通过前文我们知道,进入 longAccumulate
方法的情况总得来说有两种,CAS 操作 base 值失败,需要初始化 Cell
计数单元数组;线程没有映射到计数单元或 CAS 操作计数单元失败。也就是该方法集计数单元数组初始化、创建新的计数单元、竞争更新、扩容计数数组 操作于一体,总体比较复杂。下面我们将该方法分成已初始化计数单元数组 和未初始化计数单元数组 进行分析。
已初始化计数单元数组
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 +--- Striped64 final void longAccumulate (long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0 ) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true ; } boolean collide = false ; for (; ; ) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0 ) { if ((a = as[(n - 1 ) & h]) == null ) { if (cellsBusy == 0 ) { Cell r = new Cell(x); if (cellsBusy == 0 && casCellsBusy()) { boolean created = false ; try { Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1 ) & h] == null ) { rs[j] = r; created = true ; } } finally { cellsBusy = 0 ; } if (created) break ; continue ; } } collide = false ; } else if (!wasUncontended) wasUncontended = true ; else if (a.cas(v = a.value, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; else if (n >= NCPU || cells != as) collide = false ; else if (!collide) collide = true ; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { Cell[] rs = new Cell[n << 1 ]; for (int i = 0 ; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0 ; } collide = false ; continue ; } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { } } }
上述方法总体上还是比较复杂的,下面对主要的分支逻辑进行说明:
当前线程映射的位置没有对应的计数单元 Cell,则创建 Cell 并加入到对应位置上。加入的过程使用 spin lock;
当前线程调用 add() 方法 CAS 操作映射的 Cell 失败,那么通过重置当前线程 probe 然后自旋重试。注意: 此时不着急扩容 Cell 数组,这种情况下继续重试;
使用 CAS 尝试更新当前线程映射的 Cell 的值,执行到这里,至少已经经历过了第 2 步。更新成功则结束,更新失败则继续下一步判断;
CAS 尝试更新当前线程映射的 Cell 的值失败,说明多个线程映射到了同一个 Cell 导致冲突,按理说此时最好扩容,但实际上并没有扩容,而是判断 cells 数组的长度是否达到了 CPU 核心数,或者 cells 数组是否已经扩容了。针对这种情况,还是不着急扩容,继续自旋重试。这种情况是合理的,下文会重点分析下;
如果是发生碰撞竞争失败,即多个线程映射到了同一个 Cell 导致更新失败,那么解除碰撞标志,这种情况再给个机会重试下;
最多重试三次后仍然失败,那么就需要扩容 Cell 数组了。扩容的时候使用 CAS 抢占扩容资格 cellsBusy,抢占成功进行扩容;注意: 扩容并没有完成计数操作,扩容完成后需要继续自旋完成计数,由于扩容了,自旋一次就能成功的可性能很大;
特别说明: 如果 cells 数组的长度达到了 CPU 核心数就不会扩容了,即使竞争激烈。我么知道 cells 数组的大小是 2^n ,这就意味着 cells 数组最大只能达到 >=
NCPU 的最小2次方;比如服务器是 8 核的,那么 cells 数组的最大只会到 8 ,达到 8 就不会扩容了。因为同一个 CPU 核心同时只能运行一个线程,并发执行的时候,服务器同一时刻能运行的线程数是固定的。
未初始化计数单元数组
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 final void longAccumulate (long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0 ) { ThreadLocalRandom.current(); h = getProbe(); wasUncontended = true ; } boolean collide = false ; for (; ; ) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0 ) { } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false ; try { if (cells == as) { Cell[] rs = new Cell[2 ]; rs[h & 1 ] = new Cell(x); cells = rs; init = true ; } } finally { cellsBusy = 0 ; } if (init) break ; } else if (casBase(v = base, ((fn == null ) ? v + x : fn.applyAsLong(v, x)))) break ; } }
这种情况比较简单,具体流程如上,代码中已经详细注释。需要说明的是,如果初始化的时候也有竞争,那么竞争失败的线程会尝试更新基础计数器 base,成功直接返回,失败了才会继续重试 。
sum() 方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 +--- LongAdder public long sum () { Cell[] as = cells; Cell a; long sum = base; if (as != null ) { for (int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; }
LongAdder 加法器的值包含两部分,将 base
和所有 Cell
的值相加就是结果。
这里需要注意,如果前面已经累加过的 Cell
的 value
有修改,那么就无法计算到了。由此看出,LongAdder 不是强一致性的。
LongAdder 小结 JDK 不仅提供了支持 long 类型计数器,也提供了 double 类型的 DoubleAdder 计数器,它们都继承了 Striped64
,原理是类似的。
小结 LongAdder 通过 base
和 Cell
数组来存储值。当没有竞争的情况下直接 CAS 操作 base 即可;当存在竞争时,采用分而治之 思想将不同线程处理计数请求映射到不同的 Cell 上以增加并发度。
在并发较小时,使用 AtomicLong 更简单高效;在并发较大时,LongAdder 性能更高,随着扩容 Cell 数组,最终可以达到一种无竞争的状态。