
【后端-java】各种锁的详解
java锁详解
Synchronized
synchronzied原理
对象头
Monitor
waitset(休息室):当前获得锁的线程(Owner)是屋子的主人,但是该线程可能因为条件不足了(可能等待I/O获取数据)无法继续往下运行了, 这个歌时候就会调用wait(),进入waitset等待条件好了才能继续运行;这个歌时候它就会让出锁,给entrylist排队等待的线程来竞争锁,谁抢到就是谁的;当条件满足了,当前获得锁的线程就会调用notify()唤醒waitset休息室的线程,它就会去重新跟别人一起排队,去竞争锁.
synchronized字节码分析
轻量级锁
1.栈帧的锁记录使用CAS尝试替换对象头的MarkWord
2.当前线程内继续加锁,(锁重入)计数器+1(相应地,解锁-1)
3.别的线程来加锁,发现已经存在轻量级锁了,就会转成重量级锁(锁膨胀)

上图描述了轻量级锁的加锁过程
(1)在代码进入同步块的时候,如果同步对象锁状态为无锁状态(锁标志位为“01”状态,是否为偏向锁为“0”),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的Mark Word的拷贝,官方称之为 Displaced Mark Word。这时候线程堆栈与对象头的状态如图2.1所示。
(2)拷贝对象头中的Mark Word复制到锁记录中。
(3)拷贝成功后,虚拟机将使用CAS操作尝试将对象的Mark Word更新为指向Lock Record的指针,并将Lock record里的owner指针指向object mark word。如果更新成功,则执行步骤(4),否则执行步骤(5)。
(4)如果这个更新动作成功了,那么这个线程就拥有了该对象的锁,并且对象Mark Word的锁标志位设置为“00”,即表示此对象处于轻量级锁定状态,这时候线程堆栈与对象头的状态如图2.2所示。
(5)如果这个更新操作失败了,虚拟机首先会检查对象的Mark Word是否指向当前线程的栈帧,如果是就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行。否则说明多个线程竞争锁,轻量级锁就要膨胀为重量级锁,锁标志的状态值变为“10”,Mark Word中存储的就是指向重量级锁(互斥量)的指针,后面等待锁的线程也要进入阻塞状态。 而当前线程便尝试使用自旋来获取锁,自旋就是为了不让线程阻塞,而采用循环去获取锁的过程。
锁膨胀
自旋
偏向锁
偏向锁失效
偏向锁批量重偏向
刚开始偏向锁是加在thread1上的,这个时候30个线程thread2来了,20次以后 ,偏向锁就重定向指向thread2了
偏向锁批量撤销
锁消除
使用tryLock()解决哲学家就餐问题
synchronized,ReentrantLock都是非公平锁,可以抢锁,进行竞争
也可以设置为公平(true)的,但是这样会降低并发度。
volatile
指令重排
volatile原理
应用:单例模式双端检锁机制
无锁-乐观锁(非阻塞)
CAS(compare and set)
看一个转账的例子
分析:
原子类
取款案例:
AQS
UML图
同步队列(AQS)
同步队列,全称为:AbstractQueuedSynchronizer,又可以简称为AQS。在Lock中,这是一个非常重要的核心组件,J.U.C工具包中很多地方都使用到了AQS,所以,如果理解了AQS,那么再去理解ReentrantLock,Condition,CountDownLatch等工具的实现原理,就会非常轻松。
AQS的两种功能
从使用层面来说,AQS 的功能分为两种:独占和共享。
- 独占锁:每次只有一个线程持有锁,如:ReentrantLock 就是以独占方式实现的互斥锁
- 共享锁:允许多个线程同时获取锁 ,并发访问共享资源 , 如:ReentrantReadWriteLock
AQS 的内部实现
AQS依赖内部的一个FIFO双向队列来完成同步状态的管理,当前线程获取锁失败时,AQS会将当前线程以及等待状态等信息构造成为一个节点(Node对象)并将其加入AQS中,同时会阻塞当前线程,当锁被释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。AQS中有一个头(head)节点和一个尾(tail)节点,中间每个节点(Node)都有一个prev和next指针指向前一个节点和后一个节点,如下图:
Node对象组成
AQS中每一个节点就时一个Node对象,并且通过节点中的状态等信息来控制队列,Node对象是AbstractQueuedSynchronizer对象中的一个静态内部类,下面就是Node对象的源码:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;//表示当前线程状态是取消的
static final int SIGNAL = -1;//表示当前线程正在等待锁
static final int CONDITION = -2;//Condition队列有使用到,暂时用不到
static final int PROPAGATE = -3;//CountDownLatch等工具中使用到,暂时用不到
volatile int waitStatus;//节点中线程的状态,默认为0
volatile Node prev;//当前节点的前一个节点
volatile Node next;//当前节点的后一个节点
volatile Thread thread;//当前节点封装的线程信息
Node nextWaiter;//Condition队列中的关系,暂时用不到
final boolean isShared() {//暂时用不到
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {//获取当前节点的上一个节点
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {//构造一个节点:addWaiter方法中会使用,此时waitStatus默认等于0
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { //构造一个节点:Condition中会使用
this.waitStatus = waitStatus;
this.thread = thread;
}
}
上面代码中加上了注释,总来来说应该还是比较好理解,注意,Node对象并不是AQS才会使用,Condition队列以及其他工具中也会使用,所以有些状态和方法在这里是暂时用不上的本文就不会过多关注。
自定义锁
自定义同步器,很容易复用 AQS ,实现一个功能完备的自定义锁
package cn.itcast.n8;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import static cn.itcast.n2.util.Sleeper.sleep;
@Slf4j(topic = "c.TestAqs")
public class TestAqs {
public static void main(String[] args) {
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
sleep(1);
} finally {
log.debug("unlocking...");
lock.unlock();
}
},"t1").start();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
} finally {
log.debug("unlocking...");
lock.unlock();
}
},"t2").start();
}
}
// 自定义锁(不可重入锁)
class MyLock implements Lock {
// 独占锁 同步器类
class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0, 1)) {
// 加上了锁,并设置 owner 为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override // 是否持有独占锁
protected boolean isHeldExclusively() {
return getState() == 1;
}
public Condition newCondition() {
return new ConditionObject();
}
}
private MySync sync = new MySync();
@Override // 加锁(不成功会进入等待队列)
public void lock() {
sync.acquire(1);
}
@Override // 加锁,可打断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override // 尝试加锁(一次)
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override // 尝试加锁,带超时
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override // 解锁
public void unlock() {
sync.release(1);
}
@Override // 创建条件变量
public Condition newCondition() {
return sync.newCondition();
}
}
ReentrantLock原理
synchronized 和 ReentrantLock 区别
-
synchronized 是和 if、else、for、while 一样的关键字*ReentrantLock 是类**,这是二者的本质区别。
-
既然 ReentrantLock 是类,那么它就提供了比synchronized 更多更灵活的特性,可以被继承、可以有方法、可以有各种各样的类变量synchronized 早期的实现比较低效,对比 ReentrantLock,大多数场景性能都相差较大,但是在 Java 6 中对 synchronized 进行了非常多的改进。
-
ReentrantLock 使用起来比较灵活,但是必须有释放锁的配合动作;
-
synchronized 不需要手动获取锁和释放锁,使用简单,发生异常会自动释放锁,不会造成死锁;而 lock 需要自己加锁和释放锁,如果使用不当没有 unLock()去释放锁就会造成死锁。
-
ReentrantLock 只适用于代码块锁,而 synchronized 可以修饰类、方法、代码块等。
-
二者的锁机制其实也是不一样的。ReentrantLock 底层调用的是 Unsafe 的park 方法加锁,synchronized 操作的应该是对象头中 mark word
-
通过 Lock 可以知道有没有成功获取锁,而 synchronized 却无法办到。
Java中每一个对象都可以作为锁,这是synchronized实现同步的基础:
- 普通同步方法,锁是当前实例对象
- 静态同步方法,锁是当前类的class对象
- 同步方法块,锁是括号里面的对象
公平锁与非公平锁
在Java并发编程中,公平锁与非公平锁是很常见的概念,ReentrantLock、ReadWriteLock默认都是非公平模式,非公平锁的效率为何高于公平锁呢?究竟公平与非公平有何区别呢?
首先先简单从名字上来理解,公平锁就是保障了多线程下各线程获取锁的顺序,先到的线程优先获取锁,而非公平锁则无法提供这个保障。看到网上很多说法说非公平锁获取锁时各线程的的概率是随机的,这也是一种很不确切的说法。非公平锁并非真正随机,其获取锁还是有一定顺序的,但其顺序究竟是怎样呢?先看图:
非公平锁实现原理
非公平锁加锁源码
// Sync 继承自 AQS
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 加锁实现
final void lock() {
// 首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 如果尝试失败,进入 ㈠
acquire(1);
}
// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
public final void acquire(int arg) {
// ㈡ tryAcquire
if (
!tryAcquire(arg) &&
// 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}
// ㈡ 进入 ㈢
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// ㈢ Sync 继承过来的方法, 方便阅读, 放在此处
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果还没有获得锁
if (c == 0) {
// 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 获取失败, 回到调用处
return false;
}
// ㈣ AQS 继承过来的方法, 方便阅读, 放在此处
private Node addWaiter(Node mode) {
// 将当前线程关联到一个 Node 对象上, 模式为独占模式
Node node = new Node(Thread.currentThread(), mode);
// 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 双向链表
pred.next = node;
return node;
}
}
// 尝试将 Node 加入 AQS, 进入 ㈥
enq(node);
return node;
}
// ㈥ AQS 继承过来的方法, 方便阅读, 放在此处
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
if (compareAndSetHead(new Node())) {
tail = head;
}
} else {
// cas 尝试将 Node 对象加入 AQS 队列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// ㈤ AQS 继承过来的方法, 方便阅读, 放在此处
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
if (p == head && tryAcquire(arg)) {
// 获取成功, 设置自己(当前线程对应的 node)为 head
setHead(node);
// 上一个节点 help GC
p.next = null;
failed = false;
// 返回中断标记 false
return interrupted;
}
if (
// 判断是否应当 park, 进入 ㈦
shouldParkAfterFailedAcquire(p, node) &&
// park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧
parkAndCheckInterrupt()
) {
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// ㈦ AQS 继承过来的方法, 方便阅读, 放在此处
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取上一个节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) {
// 上一个节点都在阻塞, 那么自己也阻塞好了
return true;
}
// > 0 表示取消状态
if (ws > 0) {
// 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 这次还没有阻塞
// 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// ㈧ 阻塞当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
}
非公平锁解锁源码
// Sync 继承自 AQS
static final class NonfairSync extends Sync {
// 解锁实现
public void unlock() {
sync.release(1);
}
// AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean release(int arg) {
// 尝试释放锁, 进入 ㈠
if (tryRelease(arg)) {
// 队列头节点 unpark
Node h = head;
if (
// 队列不为 null
h != null &&
// waitStatus == Node.SIGNAL 才需要 unpark
h.waitStatus != 0
) {
// unpark AQS 中等待的线程, 进入 ㈡
unparkSuccessor(h);
}
return true;
}
return false;
}
// ㈠ Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// ㈡ AQS 继承过来的方法, 方便阅读, 放在此处
private void unparkSuccessor(Node node) {
// 如果状态为 Node.SIGNAL 尝试重置状态为 0
// 不成功也可以
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
// 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
Node s = node.next;
// 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
}
公平锁实现源码
公平锁就是保障了多线程下各线程获取锁的顺序,先到的线程优先获取锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
// AQS 继承过来的方法, 方便阅读, 放在此处
public final void acquire(int arg) {
if (
!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
selfInterrupt();
}
}
// 与非公平锁主要区别在于 tryAcquire 方法的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// ㈠ AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// h != t 时表示队列中有 Node
return h != t &&
(
// (s = h.next) == null 表示队列中还有没有老二
(s = h.next) == null ||
// 或者队列中老二线程不是此线程
s.thread != Thread.currentThread()
);
}
}
可重入锁(ReentrantLock)
ReentrantLock重入锁,是实现Lock接口的一个类,也是在实际编程中使用频率很高的一个锁,支持重入性,表示能够对共享资源能够重复加锁,即当前线程获取该锁再次获取不会被阻塞。在java关键字synchronized隐式支持重入性,synchronized通过获取自增,释放自减的方式实现重入。要想支持重入性,就要解决两个问题:
1. 在线程获取锁的时候,如果已经获取锁的线程是当前线程的话则直接再次获取成功;
2. 由于锁会被获取n次,那么只有锁在被释放同样的n次之后,该锁才算是完全释放成功。
可重入现象
可重入原理
static final class NonfairSync extends Sync {
// ...
// Sync 继承过来的方法, 方便阅读, 放在此处
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
可打断与不可打断锁
- 打断
- 超时失效
可打断模式源码
static final class NonfairSync extends Sync {
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 如果没有获得到锁, 进入 ㈠
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// ㈠ 可打断的获取锁流程
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) {
// 在 park 过程中如果被 interrupt 会进入此
// 这时候抛出异常, 而不会再次进入 for (;;)
throw new InterruptedException();
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
不可打断模式
在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了
// Sync 继承自 AQS
static final class NonfairSync extends Sync {
// ...
private final boolean parkAndCheckInterrupt() {
// 如果打断标记已经是 true, 则 park 会失效
LockSupport.park(this);
// interrupted 会清除打断标记
return Thread.interrupted();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
// 还是需要获得锁后, 才能返回打断状态
return interrupted;
}
if (
shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()
) {
// 如果是因为 interrupt 被唤醒, 返回打断状态为 true
interrupted = true;
}
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public final void acquire(int arg) {
if (
!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
) {
// 如果打断状态为 true
selfInterrupt();
}
}
static void selfInterrupt() {
// 重新产生一次中断
Thread.currentThread().interrupt();
}
}
ReentrantReadWriteLock分析
简介
它把对共享资源的访问者划分成读者和写者,读者只对共享资源进行读访问,写者则需要对共享资源进行写操作。这种锁相对于自旋锁而言,能提高并发性,因为在多处理器系统中,它允许同时有多个读者来访问共享资源,最大可能的读者数为实际的逻辑CPU数。写者是排他性的,一个读写锁同时只能有一个写者或多个读者(与CPU数相关)。
并发线程下,所有线程都执行读的操作 | 不会有问题 |
---|---|
并发线程下,所有线程都执行写会不会有问题 | 会发生写冲突 |
并发线程下,部分读部分写会不会有问题 | 会发生写冲突 |
总结:
读锁是共享的
写锁是排他的
示例:
public class ReadWriteLock {
//创建一个集合
static Map<String,String> map=new HashMap<String,String>();
//创建一个读写锁
static ReentrantReadWriteLock lock=new ReentrantReadWriteLock();
//获取读锁
static Lock readLock=lock.readLock();
//获取写锁
static Lock writeLock=lock.writeLock();
//写操作
public Object put(String key,String value){
writeLock.lock();
try {
System.out.println("Write正在执行写操作~");
Thread.sleep(100);
String put = map.put(key, value);
System.out.println("Write写操作执行完毕~");
return put;
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
writeLock.unlock();
}
return null;
}
//写操作
public Object get(String key){
readLock.lock();
try {
System.out.println("Read正在执行读操作~");
Thread.sleep(100);
String value = map.get(key);
System.out.println("Read读操作执行完毕~");
return value;
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
readLock.unlock();
}
return null;
}
public static void main(String[] args) {
ReadWriteLock lock=new ReadWriteLock();
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(()->{
try {
//写操作
lock.put(finalI +"","value"+finalI);
//读操作
System.out.println(lock.get(finalI+""));
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
互斥原则:
- 读-读能共存,
- 读-写不能共存,
- 写-写不能共存。
特性
ReentrantReadWriteLock有如下特性:
-
获取顺序
- 非公平模式(默认) 当以非公平初始化时,读锁和写锁的获取的顺序是不确定的。非公平锁主张竞争获取,可能会延缓一个或多个读或写线程,但是会比公平锁有更高的吞吐量。
- 公平模式 当以公平模式初始化时,线程将会以队列的顺序获取锁。当当前线程释放锁后,等待时间最长的写锁线程就会被分配写锁;或者有一组读线程组等待时间比写线程长,那么这组读线程组将会被分配读锁。 当有写线程持有写锁或者有等待的写线程时,一个尝试获取公平的读锁(非重入)的线程就会阻塞。这个线程直到等待时间最长的写锁获得锁后并释放掉锁后才能获取到读锁。
-
可重入 允许读锁可写锁可重入。写锁可以获得读锁,读锁不能获得写锁。
-
锁降级 允许写锁降低为读锁
-
中断锁的获取 在读锁和写锁的获取过程中支持中断
-
支持Condition 写锁提供Condition实现
-
监控 提供确定锁是否被持有等辅助方法
使用
下面一段代码展示了锁降低的操作:
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
ReentrantReadWriteLock可以用来提高某些集合的并发性能。当集合比较大,并且读比写频繁时,可以使用该类。下面是TreeMap使用ReentrantReadWriteLock进行封装成并发性能提高的一个例子:
class RWDictionary {
private final Map<String, Data> m = new TreeMap<String, Data>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
public Data get(String key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
public String[] allKeys() {
r.lock();
try { return m.keySet().toArray(); }
finally { r.unlock(); }
}
public Data put(String key, Data value) {
w.lock();
try { return m.put(key, value); }
finally { w.unlock(); }
}
public void clear() {
w.lock();
try { m.clear(); }
finally { w.unlock(); }
}
}
源码分析
构造方法
ReentrantReadWriteLock有两个构造方法,如下:
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
可以看到,默认的构造方法使用的是非公平模式,创建的Sync是NonfairSync对象,然后初始化读锁和写锁。一旦初始化后,ReadWriteLock接口中的两个方法就有返回值了,如下:
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
从上面可以看到,构造方法决定了Sync是FairSync还是NonfairSync。Sync继承了AbstractQueuedSynchronizer,而Sync是一个抽象类,NonfairSync和FairSync继承了Sync,并重写了其中的抽象方法。
Sync分析
Sync中提供了很多方法,但是有两个方法是抽象的,子类必须实现。下面以FairSync为例,分析一下这两个抽象方法:
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
writerShouldBlock和readerShouldBlock方法都表示当有别的线程也在尝试获取锁时,是否应该阻塞。 对于公平模式,hasQueuedPredecessors()方法表示前面是否有等待线程。一旦前面有等待线程,那么为了遵循公平,当前线程也就应该被挂起。 下面再来看NonfairSync的实现:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}
从上面可以看到,非公平模式下,writerShouldBlock直接返回false,说明不需要阻塞;而readShouldBlock调用了apparentFirstQueuedIsExcluisve()方法。该方法在当前线程是写锁占用的线程时,返回true;否则返回false。也就说明,如果当前有一个写线程正在写,那么该读线程应该阻塞。
继承AQS的类都需要使用state变量代表某种资源,ReentrantReadWriteLock中的state代表了读锁的数量和写锁的持有与否,整个结构如下:
可以看到state的高16位代表读锁的个数;低16位代表写锁的状态。
获取锁
读锁的获取
当需要使用读锁时,首先调用lock方法,如下:
public void lock() {
sync.acquireShared(1);
}
从代码可以看到,读锁使用的是AQS的共享模式,AQS的acquireShared方法如下:
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
当tryAcquireShared()方法小于0时,那么会执行doAcquireShared方法将该线程加入到等待队列中。 Sync实现了tryAcquireShared方法,如下:
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
//如果当前有写线程并且本线程不是写线程,不符合重入,失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//得到读锁的个数
int r = sharedCount(c);
//如果读不应该阻塞并且读锁的个数小于最大值65535,并且可以成功更新状态值,成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//如果当前读锁为0
if (r == 0) {
//第一个读线程就是当前线程
firstReader = current;
firstReaderHoldCount = 1;
}
//如果当前线程重入了,记录firstReaderHoldCount
else if (firstReader == current) {
firstReaderHoldCount++;
}
//当前读线程和第一个读线程不同,记录每一个线程读的次数
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//否则,循环尝试
return fullTryAcquireShared(current);
}
从上面的代码以及注释可以看到,分为三步:
- 如果当前有写线程并且本线程不是写线程,那么失败,返回-1
- 否则,说明当前没有写线程或者本线程就是写线程(可重入),接下来判断是否应该读线程阻塞并且读锁的个数是否小于最小值,并且CAS成功使读锁+1,成功,返回1。其余的操作主要是用于计数的
- 如果2中失败了,失败的原因有三,第一是应该读线程应该阻塞;第二是因为读锁达到了上线;第三是因为CAS失败,有其他线程在并发更新state,那么会调动fullTryAcquireShared方法。
fullTryAcquiredShared方法如下:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
//一旦有别的线程获得了写锁,返回-1,失败
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
}
//如果读线程需要阻塞
else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
}
//说明有别的读线程占有了锁
else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
//如果读锁达到了最大值,抛出异常
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//如果成功更改状态,成功返回
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
从上面可以看到fullTryAcquireShared与tryAcquireShared有很多类似的地方。 在上面可以看到多次调用了readerShouldBlock方法,对于公平锁,只要队列中有线程在等待,那么将会返回true,也就意味着读线程需要阻塞;对于非公平锁,如果当前有线程获取了写锁,则返回true。一旦不阻塞,那么读线程将会有机会获得读锁。
写锁的获取
写锁的lock方法如下:
public void lock() {
sync.acquire(1);
}
AQS的acquire方法如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
从上面可以看到,写锁使用的是AQS的独占模式。首先尝试获取锁,如果获取失败,那么将会把该线程加入到等待队列中。 Sync实现了tryAcquire方法用于尝试获取一把锁,如下:
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
//得到调用lock方法的当前线程
Thread current = Thread.currentThread();
int c = getState();
//得到写锁的个数
int w = exclusiveCount(c);
//如果当前有写锁或者读锁
if (c != 0) {
// 如果写锁为0或者当前线程不是独占线程(不符合重入),返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//如果写锁的个数超过了最大值,抛出异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 写锁重入,返回true
setState(c + acquires);
return true;
}
//如果当前没有写锁或者读锁,如果写线程应该阻塞或者CAS失败,返回false
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//否则将当前线程置为获得写锁的线程,返回true
setExclusiveOwnerThread(current);
return true;
}
从代码和注释可以看到,获取写锁时有三步:
- 如果当前有写锁或者读锁。如果只有读锁,返回false,因为这时如果可以写,那么读线程得到的数据就有可能错误;如果有写锁,但是线程不同,即不符合写锁重入规则,返回false
- 如果写锁的数量将会超过最大值65535,抛出异常;否则,写锁重入
- 如果没有读锁或写锁的话,如果需要阻塞或者CAS失败,返回false;否则将当前线程置为获得写锁的线程
从上面可以看到调用了writerShouldBlock方法,FairSync的实现是如果等待队列中有等待线程,则返回false,说明公平模式下,只要队列中有线程在等待,那么后来的这个线程也是需要记入队列等待的;NonfairSync中的直接返回的直接是false,说明不需要阻塞。从上面的代码可以得出,当没有锁时,如果使用的非公平模式下的写锁的话,那么返回false,直接通过CAS就可以获得写锁。
总结
从上面分析可以得出结论:
- 如果当前没有写锁或读锁时,第一个获取锁的线程都会成功,无论该锁是写锁还是读锁。
- 如果当前已经有了读锁,那么这时获取写锁将失败,获取读锁有可能成功也有可能失败
- 如果当前已经有了写锁,那么这时获取读锁或写锁,如果线程相同(可重入),那么成功;否则失败
释放锁
获取锁要做的是更改AQS的状态值以及将需要等待的线程放入到队列中;释放锁要做的就是更改AQS的状态值以及唤醒队列中的等待线程来继续获取锁。
读锁的释放
ReadLock的unlock方法如下:
public void unlock() {
sync.releaseShared(1);
}
调用了Sync的releaseShared方法,该方法在AQS中提供,如下:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
调用tryReleaseShared方法尝试释放锁,如果释放成功,调用doReleaseShared尝试唤醒下一个节点。 AQS的子类需要实现tryReleaseShared方法,Sync中的实现如下:
protected final boolean tryReleaseShared(int unused) {
//得到调用unlock的线程
Thread current = Thread.currentThread();
//如果是第一个获得读锁的线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
}
//否则,是HoldCounter中计数-1
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//死循环
for (;;) {
int c = getState();
//释放一把读锁
int nextc = c - SHARED_UNIT;
//如果CAS更新状态成功,返回读锁是否等于0;失败的话,则重试
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
从上面可以看到,释放锁的第一步是更新firstReader或HoldCounter的计数,接下来进入死循环,尝试更新AQS的状态,一旦更新成功,则返回;否则,则重试。 释放读锁对读线程没有影响,但是可能会使等待的写线程解除挂起开始运行。所以,一旦没有锁了,就返回true,否则false;返回true后,那么则需要释放等待队列中的线程,这时读线程和写线程都有可能再获得锁。
写锁的释放
WriteLock的unlock方法如下:
public void unlock() {
sync.release(1);
}
Sync的release方法使用的AQS中的,如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
调用tryRelease尝试释放锁,一旦释放成功了,那么如果等待队列中有线程再等待,那么调用unparkSuccessor将下一个线程解除挂起。 Sync需要实现tryRelease方法,如下:
protected final boolean tryRelease(int releases) {
//如果没有线程持有写锁,但是仍要释放,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
//如果没有写锁了,那么将AQS的线程置为null
if (free)
setExclusiveOwnerThread(null);
//更新状态
setState(nextc);
return free;
}
从上面可以看到,写锁的释放主要有三步:
- 如果当前没有线程持有写锁,但是还要释放写锁,抛出异常
- 得到解除一把写锁后的状态,如果没有写锁了,那么将AQS的线程置为null
- 不管第二步中是否需要将AQS的线程置为null,AQS的状态总是要更新的
从上面可以看到,返回true当且只当没有写锁的情况下,还有写锁则返回false。
其他方法
看完了ReentrantReadWriteLock中的读锁的获取和释放,写锁的获取和释放,再来看一下其余的一些辅助方法来加深我们对读写锁的理解。
getOwner()
getOwner方法用于返回当前获得写锁的线程,如果没有线程占有写锁,那么返回null。实现如下:
protected Thread getOwner() {
return sync.getOwner();
}
可以看到直接调用了Sync的getOwner方法,下面是Sync的getOwner方法:
final Thread getOwner() {
// Must read state before owner to ensure memory consistency
return ((exclusiveCount(getState()) == 0) ?
null :
getExclusiveOwnerThread());
}
如果独占锁的个数为0,说明没有线程占有写锁,那么返回null;否则返回占有写锁的线程。
getReadLockCount()
getReadLockCount()方法用于返回读锁的个数,实现如下:
public int getReadLockCount() {
return sync.getReadLockCount();
}
Sync的实现如下:
final int getReadLockCount() {
return sharedCount(getState());
}
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
从上面代码可以看出,要想得到读锁的个数,就是看AQS的state的高16位。这和前面讲过的一样,高16位表示读锁的个数,低16位表示写锁的个数。
getReadHoldCount()
getReadHoldCount()方法用于返回当前线程所持有的读锁的个数,如果当前线程没有持有读锁,则返回0。直接看Sync的实现即可:
final int getReadHoldCount() {
//如果没有读锁,自然每个线程都是返回0
if (getReadLockCount() == 0)
return 0;
//得到当前线程
Thread current = Thread.currentThread();
//如果当前线程是第一个读线程,返回firstReaderHoldCount参数
if (firstReader == current)
return firstReaderHoldCount;
//如果当前线程不是第一个读线程,得到HoldCounter,返回其中的count
HoldCounter rh = cachedHoldCounter;
//如果缓存的HoldCounter不为null并且是当前线程的HoldCounter,直接返回count
if (rh != null && rh.tid == getThreadId(current))
return rh.count;
//如果缓存的HoldCounter不是当前线程的HoldCounter,那么从ThreadLocal中得到本线程的HoldCounter,返回计数
int count = readHolds.get().count;
//如果本线程持有的读锁为0,从ThreadLocal中移除
if (count == 0) readHolds.remove();
return count;
}
从上面的代码中,可以看到两个熟悉的变量,firstReader和HoldCounter类型。这两个变量在读锁的获取中接触过,前面没有细说,这里细说一下。HoldCounter类的实现如下:
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
readHolds是ThreadLocalHoldCounter类,定义如下:
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
可以看到,readHolds存储了每一个线程的HoldCounter,而HoldCounter中的count变量就是用来记录线程获得的写锁的个数。所以可以得出结论:Sync维持总的读锁的个数,在state的高16位;由于读线程可以同时存在,所以每个线程还保存了获得的读锁的个数,这个是通过HoldCounter来保存的。 除此之外,对于第一个读线程有特殊的处理,Sync中有如下两个变量:
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
firstReader表示第一个得到读锁的线程,firstReaderHoldCount表示这个线程获得的写锁。所以可以得出结论:第一个获取到读锁的信息保存在firstReader中;其余获取到读锁的线程的信息保存在HoldCounter中。 看完了HoldCounter和firstReader,再来看一下getReadLockCount的实现,主要有三步:
- 当前没有读锁,那么自然每一个线程获得的读锁都是0;
- 如果当前线程是第一个获取到读锁的线程,那么返回firstReadHoldCount;
- 如果当前线程不是第一个获取到读锁的线程,得到该线程的HoldCounter,然后返回其count字段。如果count字段为0,说明该线程没有占有读锁,那么从readHolds中移除。获取HoldCounter分为两步,第一步是与cachedHoldCounter比较,如果不是,则从readHolds中获取。
getWriteLockCount()
getWriteLockCount()方法返回写锁的个数,Sync的实现如下:
final int getWriteHoldCount() {
return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}
可以看到如果没有线程持有写锁,那么返回0;否则返回AQS的state的低16位。
总结
从上面的分析可以得出:
- 如果当前是写锁被占有了,只有当写锁的数据降为0时才认为释放成功;否则失败。因为只要有写锁,那么除了占有写锁的那个线程,其他线程即不可以获得读锁,也不能获得写锁
- 如果当前是读锁被占有了,那么只有在写锁的个数为0时才认为释放成功。因为一旦有写锁,别的任何线程都不应该再获得读锁了,除了获得写锁的那个线程。