共享模型之管程
共享问题
临界区
多个线程访问共享资源,且在多个线程对共享资源读写操作时发生指令交错,就会出现问题
临界区:存在对共享资源的多线程读写操作的代码块
竞态条件
多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件
解决方式
- 非阻塞式解决方案:原子变量
- 阻塞式解决方案:Lock、synchronized
synchronized 初识
synchronized 概述
synchronized 实际是用对象锁保证了临界区内代码的原子性,临界区内的代码对外是不可分割的,不会被线程切换所打断。
synchronized 语法
synchronized(对象) {临界区
}
方法上的 synchronized
对于成员方法,锁当前对象
class Test{public synchronized void test() {}
}// 等价于class Test{public void test() {synchronized(this) {}}
}
对于非成员方法,锁 Class 类对象
class Test{public synchronized static void test() {}
}// 等价于class Test{public static void test() {synchronized(Test.class) {}}
}
线程安全分析
线程安全场景分析
成员变量
多个线程访问一个共享资源会出现线程安全问题。
class ThreadUnsafe {ArrayList<String> list = new ArrayList<>();public void method1(int loopNumber) {for (int i = 0; i < loopNumber; i++) {// { 临界区, 会产生竞态条件method2();method3();// } 临界区}}private void method2() {// 不是原子操作list.add("1");}private void method3() {// 不是原子操作list.remove(0);}
}
局部变量
每个线程都使用各自的资源,不会出现问题。
class ThreadSafe {public final void method1(int loopNumber) {ArrayList<String> list = new ArrayList<>();for (int i = 0; i < loopNumber; i++) {method2(list);method3(list);}}private void method2(ArrayList<String> list) {list.add("1");}private void method3(ArrayList<String> list) {list.remove(0);}
}
常见线程安全类
- String、Integer
- StringBuffer
- Vector、HashTable
- juc 包的类
对于线程安全类,多个线程调用单个实例的某个方法时,是线程安全的。方法是原子操作,但多个方法组合不是原子的。
String、Integer 等不可变类是线程安全的。
无状态(没有成员变量)的对象也是线程安全的。
String 类为什么设计成 final 修饰的?
避免子类继承后重写方法后,破坏线程安全性。
Monitor
Java 对象头
Java 对象的组成部分
Mark Word 结构
锁状态 | 25bit | 4bit | 1bit | 2bit | |
---|---|---|---|---|---|
23bit | 2bit | 是否偏向锁 | 锁标志位 | ||
无锁 | 对象的HashCode | 分代年龄 | 0 | 01 | |
偏向锁 | 线程ID | Epoch | 分代年龄 | 1 | 01 |
轻量级锁 | 指向栈中锁记录的指针 | 00 | |||
重量级锁 | 指向重量级锁的指针 | 10 | |||
GC标记 | 空 | 11 |
Monitor 原理
Monitor 是监视器或管程。
每个 Java 对象都可以关联一个 Monitor 对象,如果使用 synchronized 给对象上锁(重量级)之后,该对象头的 Mark Word 中就被设置指向 Monitor 对象的指针。
- 刚开始 Monitor 中 Owner 为 null
- 当 Thread-2 执行
synchronized(obj)
就会将 Monitor 的所有者 Owner 置为 Thread-2,Monitor中只能有一个 Owner - 在 Thread-2 上锁的过程中,如果 Thread-3,Thread-4,Thread-5 也来执行
synchronized(obj)
,就会进入 EntryList,线程进入 BLOCKED 状态 - Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争是非公平的
- 图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足,进入 WAITING 状态的线程,用于 wait-notify 机制
synchronized 原理
锁升级过程
轻量级锁
无竞争线程时使用轻量级锁,有竞争会膨胀为重量级锁。
1、创建锁记录 Object,每个线程都的栈帧都包含一个锁记录,内部可以存储锁定对象的 Mark Word

2、让锁记录中 Object reference 指向锁对象,并尝试使用 CAS 替换 Object 的 Mark Word,将 Mark Word 的值存入锁记录

3、如果 CAS 替换成功,对象头中存储了锁记录地址和状态 00 ,表示由该线程给对象加锁

4、如果 CAS 失败,有两种情况
- 如果是其它线程已经持有了该对象的轻量级锁。这表明有竞争,进入锁膨胀,升级成重量级锁
- 如果是自己执行了 synchronized 锁重入,那么再添加一条 Lock Record 作为重入的计数

5、当退出 synchronized 代码块时,如果有取值为 null 的锁记录(重入锁),这时重置锁记录,表示重入计数 -1

6、当退出 synchronized 代码块,锁记录的值不为 null,尝试使用 CAS 将 Mark Word 的值恢复给对象 头
- 恢复成功,则解锁成功
- 恢复失败,说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程
锁膨胀
尝试加轻量级锁的过程中,CAS 操作无法成功(有其它线程已经为此对象加上了轻量级锁),会进行锁膨胀,变成重量级锁。
1、当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁
2、这时 Thread-1 加轻量级锁失败,进入锁膨胀流程。
- 为 Object 对象申请 Monitor 锁,让 Object 指向重量级锁地址
- 然后自己进入 Monitor 的 EntryList 中,线程进入 BLOCKED 状态
3、当 Thread-0 退出同步块解锁时,使用 cas 将 Mark Word 的值恢复给对象头,恢复失败。这时会进入重量级解锁流程,按照 Monitor 地址找到 Monitor 对象,设置 Owner 为 null,唤醒 EntryList 中 BLOCKED 的线程
自旋优化
锁竞争的时可以使用自旋来进行优化。如果当前线程自旋成功(即这时候持锁线程已经释放了锁),当前线程就可以避免阻塞。
自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势
线程加入 EntryList 进入 BLOCKED 阻塞状态需要进行上下文切换,消耗较多资源。
偏向锁
偏向状态
问题:轻量级锁在没有竞争时(就当前线程使用),每次重入仍然需要执行 CAS 操作。
偏向锁:第一次使用 CAS 将线程 ID 设置到对象的 Mark Word 头,之后发现线程 ID 是自己的就表示没有竞争,不用重新 CAS。以后只要不发生竞争,这个对象就归该线程所有。
处于偏向锁的对象解锁后,线程 id 仍存储于对象头中
- 如果开启了偏向锁(默认开启),那么对象创建后,markword 值最后 3 位为 101,这时它的 thread、epoch、age 都为 0
- 偏向锁是默认是延迟的,不会在程序启动时立即生效,如果想避免延迟,可以加 VM 参数
- XX:BiasedLockingStartupDelay=0
来禁用延迟 - 如果没有开启偏向锁,那么对象创建后,markword 值最后 3 位为 001,这时它的 hashcode、 age 都为 0。正常状态对象一开始是没有 hashCode 的,第一次调用才生成
撤销偏向锁 - hashCode
调用了对象的 hashCode()
方法,但偏向锁的对象 MarkWord 中存储的是线程 id,如果调用 hashCode()
会导致偏向锁被撤销
轻量级锁会在锁记录中记录 hashCode(CAS 交换到锁对象中)
重量级锁会在 Monitor 中记录 hashCode
撤销偏向锁 - 其它线程使用对象
当有其它线程使用偏向锁对象时(不是竞争锁),会将偏向锁升级为轻量级锁
撤销偏向锁 - 调用 wait/notify
public static void main(String[] args) throws InterruptedException {Dog d = new Dog();new Thread(() -> {log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));synchronized (d) {log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));try {d.wait();} catch (InterruptedException e) {e.printStackTrace();}log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));}}, "t1").start();new Thread(() -> {try {Thread.sleep(6000);} catch (InterruptedException e) {e.printStackTrace();}synchronized (d) {log.debug("notify");d.notify();}}, "t2").start();
}
[t1] - 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000101
[t1] - 00000000 00000000 00000000 00000000 00011111 10110011 11111000 00000101
[t2] - notify
[t1] - 00000000 00000000 00000000 00000000 00011100 11010100 00001101 11001010
批量重偏向 & 批量撤销
当其他线程使用导致撤销偏向锁阈值超过 20 次后,jvm 会在给这些对象加锁时重新偏向至加锁线程
当撤销偏向锁阈值超过 40 次后,整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的
锁消除
JVM 会在运行时检测到某些代码段中的锁实际上并不需要被持有,从而消除这些不必要的锁。
wait & notify
wait & notify 概述
obj.wait()
:让当前线程进入 object 监视器的线程到 waitSet 等待。会释放对象的锁,从而让其他线程就机会获取对象的锁,无限制等待,直到被notify()
为止obj.notify()
:在 object 上正在 WaitSet 等待的线程中挑一个唤醒obj.notifyAll()
:让 object 上正在 WaitSet 等待的线程全部唤醒
sleep(long n)
和wait(long n)
的区别:
- sleep 是 Thread 方法,而 wait 是 Object 的方法
- sleep 不需要强制和 synchronized 配合使用,但 wait 必须和 synchronized 一起用
- sleep 在睡眠时不会释放对象锁的,但 wait 在等待的时会释放对象锁
- 它们的状态都是 TIMED_WAITING
wait & notify 最佳实践
static boolean ok = false;// 线程1
synchronized(lock) {while(!ok) {lock.wait();}// 干活
}// 线程2
synchronized(lock) {ok = true;lock.notifyAll();
}
同步:保护性暂停
保护性暂停:用于在一个线程等待另一个线程的执行结果
如果有结果不断从一个线程到另一个线程那么可以使用消息队列(生产者/消费者模型)
class GuardedObject {private Object res;public synchronized Object get() {while (res == null) {try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}return res;}public synchronized Object get(long timeout) {long begin = System.currentTimeMillis();long passed = 0;while (res == null) {long wait = timeout - passed;if (wait <= 0) {break;}try {this.wait(wait);} catch (InterruptedException e) {e.printStackTrace();}passed = System.currentTimeMillis() - begin;}return res;}public synchronized void set(Object res) {this.res = res;this.notifyAll();}
}
join() 的原理
join()
:调用者轮询检查线程 alive 的状态
public final synchronized void join(final long millis)throws InterruptedException {if (millis > 0) {if (isAlive()) {final long startTime = System.nanoTime();long delay = millis;do {wait(delay);} while (isAlive() && (delay = millis -TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)) > 0);}} else if (millis == 0) {while (isAlive()) {wait(0);}} else {throw new IllegalArgumentException("timeout value is negative");}
}
异步:生产者消费者
- 不需要产生和消费的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
JDK 中各种阻塞队列,采用的就是这种模式
class Message {private int id;private Object message;public Message(int id, Object message) {this.id = id;this.message = message;}public int getId() {return id;}public Object getMessage() {return message;}
}class MessageQueue {private LinkedList<Message> queue;private int capacity;public MessageQueue(int capacity) {this.capacity = capacity;queue = new LinkedList<>();}public Message take() {synchronized (queue) {while (queue.isEmpty()) {try {queue.wait();} catch (InterruptedException e) {e.printStackTrace();}}Message message = queue.removeFirst();queue.notifyAll();return message;}}public void put(Message message) {synchronized (queue) {while (queue.size() == capacity) {try {queue.wait();} catch (InterruptedException e) {e.printStackTrace();}}queue.addLast(message);queue.notifyAll();}}
}
park & unpark
park & unpark 概述
LockSupport 类中的方法,先 park 再 unpark。
// 暂停当前线程
LockSupport.park(); // 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)
特点:
- wait,notify 和 notifyAll 必须配合 Object Monitor(synchronized 代码块) 一起使用,而 park,unpark 不需要
- park & unpark 可以先 unpark,而 wait & notify 不能先 notify
- park & unpark 是以线程为单位来阻塞和唤醒线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,没那么精确
park & unpark 原理
park()
- 当前线程调用
Unsafe.park()
方法 - 检查
_counter
,本情况为 0,获得_mutex
互斥锁 - 线程进入
_cond
条件变量阻塞 - 设置
_counter = 0
park() -> unpark()
- 调用
Unsafe.unpark(Thread_0)
方法,设置_counter
为 1 - 唤醒
_cond
条件变量中的 Thread_0 - Thread_0 恢复运行
- 设置
_counter
为 0
unpark() -> park()
- 调用
Unsafe.unpark(Thread_0)
方法,设置_counter
为 1 - 当前线程调用
Unsafe.park()
方法 - 检查
_counter
,本情况为 1,这时线程无需阻塞,继续运行 - 设置
_counter
为 0
线程状态转换
-
New -> Runnable:调用
t.start()
方法 -
Runnable <–> Waiting:
- 在
synchronized
获取锁之后,调用obj.wait()
方法 - 调用
obj.notify()
、obj.notifyAll()
、t.interrupt()
方法
- 在
-
Runnable <–> Waiting:
- 调用
t.join()
方法,等待指定线程结束 - 线程运行结束,或调用了当前线程的
interrupt()
时
- 调用
-
Runnable <–> Waiting:
- 调用
LockSupport.park()
方法 - 调用
LockSupport.unpark()
方法,或调用了当前线程的interrupt()
时
- 调用
-
Runnable <–> Timed_Waiting:
- 在 synchronized 获取锁之后,调用
obj.wait(long n)
方法 - 到达指定时间后,或调用
obj.notify()
、obj.notifyAll()
、t.interrupt()
方法
- 在 synchronized 获取锁之后,调用
-
Runnable <–> Timed_Waiting:
- 调用
t.join(long n)
方法 - 线程运行结束、到达指定时间后,或调用了当前线程的
interrupt()
时
- 调用
-
Runnable <–> Timed_Waiting:
- 调用
LockSupport.parkNanos(long nanos)
方法 - 到达指定时间后、调用
LockSupport.unpark()
方法,或调用了当前线程的interrupt()
时
- 调用
-
Runnable <–> Timed_Waiting:
- 调用
Thread.sleep(long n)
方法 - 到达指定时间后
- 调用
-
Runnable <–> Blocked:
- 等待
synchronized
代码块并竞争锁 - 竞争成功,获取到锁
- 等待
-
Runnable -> Terminated:线程运行结束
活跃性
死锁
死锁:t1 线程获得A对象锁,接下来想获取B对象的锁;t2 线程获得B对象锁,接下来想获取A对象的锁。
定位死锁:检测死锁可以使用 jconsole工具,或使用 jps 定位进程 id,再用 jstack 定位死锁。
哲学家就餐问题

- 有五位哲学家,围坐在圆桌旁。
- 他们只做两件事,思考和吃饭,思考一会吃口饭,吃完饭后接着思考。
- 吃饭时要用两根筷子吃,桌上共有 5 根筷子,每位哲学家左右手边各有一根筷子。
- 如果筷子被身边的人拿着,自己就得等待。
活锁
活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束。
举例:t1 线程 a++,t2 线程 a-- 且两者频率相同。
饥饿
按顺序获取锁时,已获取到多个锁的线程更容易被调度运行。
ReentrantLock
ReentrantLock 概述
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量
- 可重入
// 获取锁
reentrantLock.lock();
try {// 临界区
} finally {// 释放锁reentrantLock.unlock();
}
可重入
同一个线程如果首次获得了这把锁后,可以再次获取这把锁
static ReentrantLock lock = new ReentrantLock();public static void main(String[] args) {lock.lock();try {log.debug("main 获取锁");func();} finally {lock.unlock();}
}static void func() {lock.lock();try {log.debug("func 获取锁");} finally {lock.unlock();}
}
15:22:43 [main] c.App - main 获取锁
15:22:43 [main] c.App - func 获取锁
可中断
等待锁时可以使用 interrupt()
打断等待。
lock.lockInterruptibly();
static ReentrantLock lock = new ReentrantLock();public static void main(String[] args) throws InterruptedException {Thread t = new Thread(() -> {log.debug("开始执行");try {lock.lockInterruptibly();} catch (InterruptedException e) {log.debug("被打断");return;}try {log.debug("获取到锁");} finally {lock.unlock();}}, "t");lock.lock();try {t.start();Thread.sleep(1000);t.interrupt();} finally {lock.unlock();}
}
15:27:59 [t] c.App - 开始执行
15:28:00 [t] c.App - 被打断
锁超时
// 立即失败
lock.tryLock();// 超时失败
lock.tryLock(1, TimeUnit.SECONDS);
public static void main(String[] args) throws InterruptedException {ReentrantLock lock = new ReentrantLock();Thread t1 = new Thread(() -> {log.debug("启动...");if (!lock.tryLock()) {log.debug("获取立刻失败,返回");return;}try {log.debug("获得了锁");} finally {lock.unlock();}}, "t1");lock.lock();try {log.debug("获得了锁");t1.start();Thread.sleep(2);} finally {lock.unlock();}
}
15:42:23 [main] c.App - 获得了锁
15:42:23 [t1] c.App - 启动...
15:42:23 [t1] c.App - 获取立刻失败,返回
公平锁
ReentrantLock 默认是不公平的
// 创建公平锁
ReentrantLock lock = new ReentrantLock(true);
条件变量
synchronized
中也有条件变量,当条件不满足时(调用wait()
方法)进入 waitSet 等待
ReentrantLock
支持多个条件变量的。
- await 前需要先获得锁
- await 执行后,会释放锁,进入 condition 等待
- await 的线程被唤醒(或打断、或超时)会重新竞争锁
- 竞争 lock 锁成功后,从 await 后继续执行
static ReentrantLock lock = new ReentrantLock();
static Condition waitCigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();
static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;public static void main(String[] args) {new Thread(() -> {try {lock.lock();while (!hasCigrette) {try {waitCigaretteQueue.await();} catch (InterruptedException e) {e.printStackTrace();}}log.debug("等到了它的烟");} finally {lock.unlock();}}).start();new Thread(() -> {try {lock.lock();while (!hasBreakfast) {try {waitbreakfastQueue.await();} catch (InterruptedException e) {e.printStackTrace();}}log.debug("等到了它的早餐");} finally {lock.unlock();}}).start();sleep(1);sendBreakfast();sleep(1);sendCigarette();
}private static void sendCigarette() {lock.lock();try {log.debug("送烟来了");hasCigrette = true;waitCigaretteQueue.signal();} finally {lock.unlock();}
}private static void sendBreakfast() {lock.lock();try {log.debug("送早餐来了");hasBreakfast = true;waitbreakfastQueue.signal();} finally {lock.unlock();}
}
18:52:27.680 [main] c.TestCondition - 送早餐来了
18:52:27.682 [Thread-1] c.TestCondition - 等到了它的早餐
18:52:28.683 [main] c.TestCondition - 送烟来了
18:52:28.683 [Thread-0] c.TestCondition - 等到了它的烟