CAS 介绍与分析
⭐ CAS 是什么?
CAS是英文单词CompareAndSwap 的缩写,中文意思是:比较并替换。CAS需要有3个操作数:内存地址V,旧的预期值A,即将要更新的目标值B。
CAS指令执行时,当且仅当内存地址V的值与预期值A相等时,将内存地址V的值修改为B,否则就什么都不做。整个比较并替换的操作是一个原子操作。
在Java中有 AtomicInteger
为代表的都是 通过cas 进行控制,这里以 incrementAndGet()
为例,调用的代码如下:
Copy public final int incrementAndGet() {
return U . getAndAddInt ( this , VALUE , 1 ) + 1 ;
}
@ HotSpotIntrinsicCandidate
public final int getAndAddInt( Object o , long offset , int delta) {
int v;
do {
v = getIntVolatile(o , offset) ;
} while ( ! weakCompareAndSetInt(o , offset , v , v + delta) );
return v;
}
@ HotSpotIntrinsicCandidate
public final boolean weakCompareAndSetInt( Object o , long offset ,
int expected ,
int x) {
return compareAndSetInt(o , offset , expected , x) ;
}
@ HotSpotIntrinsicCandidate
public final native boolean compareAndSetInt( Object o , long offset ,
int expected ,
int x) ;
这里我们可以看出,最终调用的是Unsafe
类(如果对Unsafe类不了解,请阅读 Unsafe应用解析 )
compareAndSetInt
需要三个参数,分别是内存位置 offset,旧的预期值 expected和新的值 x。操作时,先从内存位置读取到值,然后和预期值expected比较。如果相等,则将此内存位置的值改为新值x,返回 true。如果不相等,说明和其他线程冲突了,则不做任何改变,返回 false。
这种机制在不阻塞其他线程的情况下避免了并发冲突,比独占锁的性能高很多。 CAS 在 Java 的原子类和并发包中有大量使用。
⭐ 底层实现
CAS 主要分三步,读取-比较-修改。其中比较是在检测是否有冲突,如果检测到没有冲突后,其他线程还能修改这个值,那么 CAS 还是无法保证正确性。所以最关键的是要保证比较-修改这两步操作的原子性。
CAS 底层是靠调用 CPU 指令集的 cmpxchg 完成的,它是 x86 和 Intel 架构中的 compare and exchange 指令。在多核的情况下,这个指令也不能保证原子性,需要在前面加上 lock 指令。lock 指令可以保证一个 CPU 核心在操作期间独占一片内存区域。那么 这又是如何实现的呢?
在处理器中,一般有两种方式来实现上述效果:总线锁和缓存锁。在多核处理器的结构中,CPU 核心并不能直接访问内存,而是统一通过一条总线访问。总线锁就是锁住这条总线,使其他核心无法访问内存。这种方式代价太大了,会导致其他核心停止工作。而缓存锁并不锁定总线,只是锁定某部分内存区域。当一个 CPU 核心将内存区域的数据读取到自己的缓存区后,它会锁定缓存对应的内存区域。锁住期间,其他核心无法操作这块内存区域。
CAS 就是通过这种方式实现比较和交换操作的原子性的。值得注意的是, CAS 只是保证了操作的原子性,并不保证变量的可见性,因此变量需要加上 volatile 关键字。
⭐ ABA 问题
上面提到,CAS 保证了比较和交换的原子性。但是从读取到开始比较这段期间,其他核心仍然是可以修改这个值的。如果核心将 A 修改为 B,CAS 可以判断出来。但是如果核心将 A 修改为 B 再修改回 A。那么 CAS 会认为这个值并没有被改变,从而继续操作。这是和实际情况不符的。解决方案是加一个版本号。
AQS 介绍与分析
AQS 全称 AbstractQueuedSynchronizer。AQS 中有两个重要的成员:
成员变量 state。用于表示锁现在的状态,用 volatile 修饰,保证内存一致性。同时所用对 state 的操作都是使用 CAS 进行的。state 为0表示没有任何线程持有这个锁,线程持有该锁后将 state 加1,释放时减1。多次持有释放则多次加减。
还有一个双向链表,链表除了头结点外,每一个节点都记录了线程的信息,代表一个等待线程。这是一个 FIFO 的链表。
下面以 ReentrantLock 非公平锁的代码看看 AQS 的原理。
Copy public ReentrantLock( boolean fair) {
sync = fair ? new FairSync() : new NonfairSync() ;
}
ReentrantLock
默认是非公平锁, 如果想实现公平锁,在创建时传入true 即可,从源码看,无论是 FairSync
还是NonfairSync
都继承了Sync
,而 Sync
有继承了 AbstractQueuedSynchronizer
即我们所说的AQS
请求锁时有三种可能:
如果没有线程持有锁,则请求成功,当前线程直接获取到锁。
如果当前线程已经持有锁,则使用 CAS 将 state 值加1,表示自己再次申请了锁,释放锁时减1。这就是可重入性的实现。
Copy public void lock() {
sync . lock ();
}
@ ReservedStackAccess
final void lock() {
if ( ! initialTryLock() )
acquire( 1 ) ;
}
// initialTryLock() 方法
final boolean initialTryLock() {
Thread current = Thread . currentThread ();
// 第一次尝试 cas
//如果当前状态值等于预期值,则以原子方式将同步状态设置为给定的更新值。
//此操作具有volatile读写的内存语义
// compareAndSetState(int expect, int update)
// expect - 预期值 update - 新值
if ( compareAndSetState( 0 , 1 ) ) { // first attempt is unguarded
// 这里调用的是AbstractOwnableSynchronizer的方法
// 设置当前拥有独占访问权限的线程。
setExclusiveOwnerThread(current) ;
return true ;
}
// 返回最后由 setExclusiveOwnerThread设置的线程,如果从未设置,则返回 null 。
else if ( getExclusiveOwnerThread() == current) {
// getState() 返回同步状态的当前值。 此操作具有volatile读取的内存语义。
int c = getState() + 1 ;
if (c < 0 ) // overflow
throw new Error( "Maximum lock count exceeded" ) ;
// 设置同步状态的值。 此操作具有volatile写入的内存语义。
setState(c) ;
return true ;
} else
return false ;
}
// acquire(1);
// 以独占模式获取,忽略中断。 通过至少调用一次tryAcquire(int)实现 ,返回成功。
// 否则线程排队,可能反复阻塞和解除阻塞,调用tryAcquire(int)直到成功。
// 该方法可用于实现方法Lock.lock() 。
public final void acquire( int arg) {
if ( ! tryAcquire(arg) )
acquire( null , arg , false , false , false , 0L ) ;
}
// tryAcquire 需要根据 所看的Sync来决定
// 这里的代码与 initialTryLock 的逻辑类似,不做过多的讲解
protected final boolean tryAcquire( int acquires) {
if ( getState() == 0 && compareAndSetState( 0 , acquires) ) {
setExclusiveOwnerThread( Thread . currentThread()) ;
return true ;
}
return false ;
}
// acquire(null, arg, false, false, false, 0L);
final int acquire( Node node , int arg , boolean shared ,
boolean interruptible , boolean timed , long time) {
Thread current = Thread . currentThread ();
byte spins = 0 , postSpins = 0 ; // retries upon unpark of first thread
boolean interrupted = false , first = false ;
Node pred = null ; // predecessor of node when enqueued
/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/
for (;;) {
if ( ! first && (pred = (node == null ) ? null : node . prev ) != null &&
! (first = (head == pred))) {
if ( pred . status < 0 ) {
//可能从tail重复遍历,取消已取消的节点,直到没有找到为止。
//将可能被重新链接成为下一个合格收购者的节点打开。
cleanQueue() ; // predecessor cancelled
continue ;
} else if ( pred . prev == null ) {
Thread . onSpinWait (); // ensure serialization
continue ;
}
}
if (first || pred == null ) {
boolean acquired;
// 尝试获取共享锁
try {
if (shared)
acquired = ( tryAcquireShared(arg) >= 0 );
else
acquired = tryAcquire(arg) ;
} catch ( Throwable ex) {
cancelAcquire(node , interrupted , false ) ;
throw ex;
}
if (acquired) {
if (first) {
// 将node 赋值给 head
node . prev = null ;
head = node;
pred . next = null ;
node . waiter = null ;
if (shared)
// 在共享模式下唤醒给定的节点
signalNextIfShared(node) ;
if (interrupted)
// 调用本线程的interrupt 进行线程中断
current . interrupt ();
}
return 1 ;
}
}
if (node == null ) { // allocate; retry before enqueue
if (shared)
node = new SharedNode() ;
else
node = new ExclusiveNode() ;
} else if (pred == null ) { // try to enqueue
node . waiter = current;
Node t = tail;
node . setPrevRelaxed (t); // avoid unnecessary fence
if (t == null )
tryInitializeHead() ;
else if ( ! casTail(t , node) )
node . setPrevRelaxed ( null ); // back out
else
t . next = node;
} else if (first && spins != 0 ) {
-- spins; // reduce unfairness on rewaits
Thread . onSpinWait ();
} else if ( node . status == 0 ) {
node . status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = ( byte )((postSpins << 1 ) | 1 );
if ( ! timed)
LockSupport . park ( this );
else if ((nanos = time - System . nanoTime ()) > 0L )
LockSupport . parkNanos ( this , nanos);
else
break ;
node . clearStatus ();
if ((interrupted |= Thread . interrupted ()) && interruptible)
break ;
}
}
return cancelAcquire(node , interrupted , interruptible) ;
}
加锁的逻辑基本上我们整体的看了一遍,接下来我们在看一下如何进行解锁。
Copy // 解锁操作
public void unlock() {
sync . release ( 1 );
}
// 解锁操作
public final boolean release( int arg) {
if ( tryRelease(arg) ) {
signalNext(head) ;
return true ;
}
return false ;
}
// 这里与 initialTryLock() 方法相同
@ ReservedStackAccess
protected final boolean tryRelease( int releases) {
int c = getState() - releases;
// 返回最后由setExclusiveOwnerThread设置的线程,如果从未设置,则返回null 。
// 此方法不会强制执行任何同步或volatile字段访问。
if ( getExclusiveOwnerThread() != Thread . currentThread ())
throw new IllegalMonitorStateException() ;
boolean free = (c == 0 );
if (free)
setExclusiveOwnerThread( null ) ;
setState(c) ;
return free;
}
private static void signalNext( Node h) {
Node s;
if (h != null && (s = h . next ) != null && s . status != 0 ) {
s . getAndUnsetStatus (WAITING);
// 如果给定线程尚不可用,则为其提供许可。
LockSupport . unpark ( s . waiter );
}
}