手撕J.U.C与多线程


发表于 修改于 后端知识整理 9380 字 1 小时

常用类

线程

线程状态

graph LR
NEW[新建<br>NEW] --"Thread.start()"--> RUNNABLE[运行<br>RUNNABLE]
RUNNABLE --"synchronized<br>Lock.lock()"--> BLOCKED[阻塞<br>BLOCKED]--异常--> TERMINATED
RUNNABLE --"Object.wait()<br>Thread.join()<br>LockSupport.park()"--> WAITING[等待<br>WAITING]--异常--> TERMINATED
RUNNABLE --"Thread.sleep(long millis)<br>Object.wait(long timeout)<br>Thread.join(long timeout)<br>LockSupport.parkNanos()<br>LockSupport.parkUntil()"--> TIMED_WAITING[限时等待<br>TIMED_WAITING]--异常-->TERMINATED
RUNNABLE --"运行结束/异常"--> TERMINATED[终止<br>TERMINATED]
public enum State {
/* 【新建】
* 尚未启动的线程处于此状态。
*/
NEW,
/* 【运行】
* 在Java虚拟机中执行的线程处于此状态。
*/
RUNNABLE,
/* 【阻塞】
* 一个阻塞线程在等待获取monitor锁
* tips:当线程调用Object.wait()方法进入一个synchronized块/方法
* 或重进入一个synchronized锁/方法时会等待获取monitor锁。
*/
BLOCKED,
/* 【等待】
* 线程处于WAITING状态的场景。
* 调用Object.wait(),未指定超时值。
* 调用Thread.join(),未指定超时值。
* 调用LockSupport.park()。
*/
WAITING,
/* 【限时等待】
* 调用Thread.sleep方法
* 调用Object.wait(long timeout),指定超时值。
* 调用Thread.join(long timeout),指定超时值。
* 调用LockSupport.parkNanos()。
* 调用LockSupport.parkUntil()。
*/
TIMED_WAITING,
/* 【终止】
* 线程已完成执行。
*/
TERMINATED;
}

Runnable&Callable

//无返回值
Runnable r = () -> System.out.print("Hello world!");
new Thread(r).start();
//有返回值
Callable<String> c = () -> "Hello world!";
FutureTask<String> ft = new FutureTask<>(c);
new Thread(ft).start();
System.out.print(ft.get());

线程方法

方法描述
thread.yield()线程礼让:让其他线程获的CPU使用的优先权,正在运行的线程暂停
thread.join()等待直到thread运行完毕
thread.interrupt()中断线程等待,使正在等待的线程抛出InterruptedException
Thread.sleep()线程休眠,暂停执行指定时间让出CPU给其他线程

wait()和sleep()

对象锁

每个对象都有一个唯一与之对应的内部锁(monitor)。 JVM会为每个对象维护两个等待池,一个叫Entry Set(入口集),另外一个叫Wait Set(等待集)

对于任意的对象object

  • object的Entry Set用于存储 等待获取object的内部锁monitor的所有线程(当锁被释放之后有机会竞争获锁)
  • object的Wait Set 用于存储 执行了object.wait()的线程(调用notify()/notifyAll()唤醒后进入Entry Set池,才可竞争获锁)

区别

-sleep()wait()
所属Thread(属于线程操作)Object(属于锁本身操作)
调用调用sleep()方法使线程暂停执行指定时间让出CPU给其他线程
但他的监控状态依然保持,时间到了又会自动恢复运行状态
此过程不会释放对象锁
调用wait()方法,线程会放弃对象锁,进入此对象的Wait Set
调用notify()/notifyAll()重新唤醒后进入Entry Set,才可竞争获锁

锁是并发情况下保证资源顺序性访问的一种机制。

synchronized与Lock的区别

  1. synchronized是关键字,而Lock是一个接口。
  2. synchronized会自动释放锁,而Lock必须手动释放锁finally{unlock()}
  3. synchronized是不可中断的,Lock可以使用lockInterruptibly()中断(可以用来解决死锁问题)。
  4. 通过Lock可以知道线程有没有拿到锁,而synchronized不能。
  5. synchronized能锁住方法和代码块,而Lock只能锁住代码块。
  6. Lock可以使用读锁提高多线程读效率。
  7. synchronized是非公平锁,ReentrantLock可以控制是否是公平锁。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

synchornized

可重入性

public void run() {
synchronized (this) {
synchronized (this) {
}
}
}

synchornized原理

  • synchronized的锁对象会关联一个monitor,这个monitor不是我们主动创建的,是JVM的线程执行到这个同步代码块发现锁对象没有monitor就会创建monitor,monitor内部有两个重要的成员变量: ·owner:拥有这把锁的线程 ·recursions:会记录线程拥有锁的次数,当一个线程拥有monitor后其他线程只能等待

  • 在使用 synchronized 来同步代码块的时候,经编译后,会在代码块的起始位置插入monitorenter指令,在结束或异常处插入monitorexit指令。 当执行到 monitorenter 指令时,将会尝试获取对象所对应的 monitor的所有权,即尝试获得对象的锁。而 synchronized 用的锁是存放在 Java对象头 中的。

对象头

  • 查看对象信息,可以导入以下依赖
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.9</version>
</dependency>
  • 使用
class User {
int age;
boolean isDead;
}
User user = new User();
System.err.println(user); //16进制hashCode
System.out.println(ClassLayout.parseInstance(user).toPrintable()); //对象信息
  • 查看结果
User@15db9742
User object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) (Mark Word) 01 42 97 db (00000001 01000010 10010111 11011011) (-610844159)
4 4 (object header) (Mark Word) 15 00 00 00 (00010101 00000000 00000000 00000000) (21)
8 4 (object header) (Klass pointer) 43 c0 00 f8 (01000011 11000000 00000000 11111000) (-134168509)
12 4 int User.age (对象属性) 0
16 1 boolean User.isDead (对象属性) false
17 7 (loss due to the next object alignment) 对齐填充
Instance size: 24 bytes
Space losses: 0 bytes internal + 7 bytes external = 7 bytes total
  • 对象头组成
|-------------------------------------------------------------------------------------------------------|
|                                             Object Header (128 bits)                                  |
|-------------------------------------------------------------------------------------------------------|
|                       Mark Word (64 bits)                                    |  Klass Word (64 bits)  |       
|-------------------------------------------------------------------------------------------------------|
| unused:25 | identity_hashcode:31 | unused:1 | age:4 | biased_lock:1 | lock:2 | OOP to metadata object | 无锁
|---------------------------------------------------------------------|--------|------------------------|
| thread:54 |         epoch:2      | unused:1 | age:4 | biased_lock:1 | lock:2 | OOP to metadata object | 偏向锁
|---------------------------------------------------------------------|--------|------------------------|
|                    ptr_to_lock_record:62                            | lock:2 | OOP to metadata object | 轻量锁
|---------------------------------------------------------------------|--------|------------------------|
|                    ptr_to_heavyweight_monitor:62                    | lock:2 | OOP to metadata object | 重量锁
|---------------------------------------------------------------------|--------|------------------------|
|                                                                     | lock:2 | OOP to metadata object | GC标记
|-------------------------------------------------------------------------------------------------------|
组成描述
biased_lock偏向锁标记,为1时表示对象启用偏向锁,为0时表示对象没有偏向锁。
lock锁状态标记位,该标记的值不同,整个mark word表示的含义不同。
ageJava GC标记位对象年龄,4位的表示范围为0-15,因此对象经过了15次垃圾回收后如果还存在,则肯定会移动到老年代中。
identity_hashcode对象标识Hash码,采用延迟加载技术。当对象使用hashCode()计算后,并会将结果写到该对象头中。当对象被锁定时,该值会移动到线程Monitor中。
thread持有偏向锁的线程ID和其他信息。这个线程ID并不是JVM分配的线程ID号,和Java Thread中的ID是两个概念。
epoch偏向时间戳。
ptr_to_lock_record指向栈中锁记录的指针。
ptr_to_heavyweight_monitor指向线程Monitor的指针。

锁升级过程

偏向锁

加锁方式:首次获取到时 设置对象头的thread、biased_lock ->      下次获取锁时 检查thread是否和自身线程一致,一致则认为当前线程已经获取了锁。

偏向锁假定将来只有第一个申请锁的线程会使用锁而不会有任何线程再来申请锁。 也就是不存在多个线程的竞争时,该线程在后续访问时会自动获得锁,略过了轻量级锁和重量级锁的加锁阶段,提高性能。 偏向锁是单线程下的锁优化。

Thread.sleep(5000); //JVM加载超过约4s以后,对代码块加上偏向锁
User user = new User();
System.out.println(ClassLayout.parseInstance(user).toPrintable());
// 偏向锁
// ↓-↓
//05 00 00 00 (00000101 00000000 00000000 00000000) (5)
//00 00 00 00 (00000000 00000000 00000000 00000000) (0)
//43 c0 00 f8 (01000011 11000000 00000000 11111000) (-134168509)

轻量级锁

加锁方式:CAS(设置对象头轻量级锁标志)-> 执行代码块 -> CAS(重置对象头轻量级锁标志)

当有多个 线程竞争同一个临界资源,这个时候偏向锁就会被撤(这个步骤也是十分消耗资源的),然后升级为轻量级锁,这是一个基于CAS的乐观锁。轻量级锁的目标是,减少无实际竞争情况下 使用重量级锁产生的性能消耗

锁自旋

锁自旋就是线程自己做一些空任务,避免线程被挂起阻塞

(阻塞和唤醒是十分消耗性能的行为,涉及到用户态和核心态的操作系统问题,一般我们操作的都是用户态,但是线程的挂起阻塞是需要从用户态切换到核心态,同样,线程唤醒也一样,这个步骤会造成巨大的性能消耗,能避免尽量避免)

重量级锁

加锁方式:monitorenter -> 执行同步代码块 -> monitorexit

重量级锁是再一次的锁升级。这个时候线程就是进行锁自旋也获取不到锁(锁自旋也是需要消耗一定资源的,所以它不可能一直自旋)自旋失败了,那么就进行锁膨胀,升级为重量级锁。 内置锁在Java中被抽象为监视器锁(monitor)。在JDK1.6之前,监视器锁可以认为直接对应底层操作系统中的互斥量(mutex)。这种同步方式成本非常高,包括系统调用引起的内核态与用户态切换、线程阻塞造成的线程切换等。因此称这种锁为重量级锁。

Lock

可重入锁ReentrantLock

Condition

Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。

生产者消费者

import lombok.SneakyThrows;
public class ProduceConsume {
public static void main(String[] args) {
//生产者与消费者问题
Data data = new Data();
new Thread(loop(5, data::increment), "thread-A").start();
new Thread(loop(5, data::decrement), "thread-B").start();
new Thread(loop(5, data::increment), "thread-C").start();
new Thread(loop(5, data::decrement), "thread-D").start();
}
public static Runnable loop(int count, Runnable run) {
return () -> { for (int i = 0; i < count; i++) run.run(); };
}
}
class Data {
int num = 0;
@SneakyThrows
public synchronized void increment() {
while (num != 0) {
wait();
}
num++;
System.out.println(Thread.currentThread().getName() + " => " + num);
notifyAll();
}
@SneakyThrows
public synchronized void decrement() {
while (num == 0) {
wait();
}
num--;
System.out.println(Thread.currentThread().getName() + " <= " + num);
notifyAll();
}
}
class Data {
int num = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
@SneakyThrows
public void increment() {
lock.lock();
while (num != 0) {
condition.await();
}
num++;
System.out.println(Thread.currentThread().getName() + " => " + num);
condition.signalAll();
lock.unlock();
}
@SneakyThrows
public void decrement() {
lock.lock();
while (num == 0) {
condition.await();
}
num--;
System.out.println(Thread.currentThread().getName() + " <= " + num);
condition.signalAll();
lock.unlock();
}
}

读写锁ReadWriteLock

ReentrantLock保证了只有一个线程可以执行临界区代码: 但是有些时候,这种保护有点过头。因为我们发现,任何时刻,只允许一个线程修改,也就是调用increment()/decrement()方法时必须获取锁,但是,get()方法只读取数据,不修改数据,它实际上允许多个线程同时调用。 实际上我们想要的是:允许多个线程同时读,但只要有一个线程在写,其他线程就必须等待

public class Counter {
private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
private final Lock rlock = rwlock.readLock();
private final Lock wlock = rwlock.writeLock();
private int[] counts = new int[10];
public void increment(int index) {
wlock.lock(); //加写锁
try {
counts[index] += 1;
} finally {
wlock.unlock(); //释放写锁
}
}
public int[] get() {
rlock.lock(); //加读锁
try {
return Arrays.copyOf(counts, counts.length);
} finally {
rlock.unlock(); //释放读锁
}
}
}

工具类

CountDownLatch

CountDownLatch(计数器)让某一条线程等待其他线程执行完毕后再执行

final CountDownLatch count = new CountDownLatch(5);
//守护线程,等待计数器到达5完成
new Thread(() -> {
count.await();
System.out.println("all work-thread finished.");
}, "daemon-thread").start();
//开启5个线程,结束后调用count.countDown()计数递增
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " finished.");
count.countDown();
}, "work-thread-" + i).start();
}
//结果
//work-thread-3 finished.
//work-thread-5 finished.
//work-thread-4 finished.
//work-thread-1 finished.
//work-thread-2 finished.
//all work-thread finished.

CyclicBarrier

CyclicBarrier(可循环屏障)让多个线程相互等待,当达到一个共同点时同时执行

final CyclicBarrier barrier = new CyclicBarrier(5);
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 到达栅栏:" + System.currentTimeMillis());
barrier.await();
System.out.println(Thread.currentThread().getName() + " 冲破栅栏:" + System.currentTimeMillis());
}, "work-thread-" + i).start();
}
//结果
//work-thread-1 到达栅栏:1647175690103
//work-thread-3 到达栅栏:1647175690104
//work-thread-2 到达栅栏:1647175690103
//work-thread-5 到达栅栏:1647175690104
//work-thread-4 到达栅栏:1647175690105
//work-thread-4 冲破栅栏:1647175690105
//work-thread-1 冲破栅栏:1647175690105
//work-thread-2 冲破栅栏:1647175690105
//work-thread-3 冲破栅栏:1647175690105
//work-thread-5 冲破栅栏:1647175690105

Semaphore

Semaphore(信号量):是一种计数器,用来保护一个或者多个共享资源的访问。如果线程要访问一个资源就必须先获得信号量。如果信号量内部计数器大于0,信号量减1,然后允许共享这个资源;否则,如果信号量的计数器等于0,信号量将会把线程置入休眠直至计数器大于0.当信号量使用完时,必须释放。

final Semaphore semaphore = new Semaphore(2);
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
//消费一个许可
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 开始,获得许可,当前可用的许可数:" + semaphore.availablePermits());
TimeUnit.SECONDS.sleep(1);
//创建一个许可
semaphore.release();
System.out.println(Thread.currentThread().getName() + " 结束,发布许可,当前可用的许可数:" + semaphore.availablePermits());
}, "work-thread-" + i).start();
}
//结果
//work-thread-1 开始,获得许可,当前可用的许可数:0
//work-thread-2 开始,获得许可,当前可用的许可数:0
//work-thread-2 结束,发布许可,当前可用的许可数:2
//work-thread-1 结束,发布许可,当前可用的许可数:2
//work-thread-4 开始,获得许可,当前可用的许可数:0
//work-thread-3 开始,获得许可,当前可用的许可数:1
//work-thread-3 结束,发布许可,当前可用的许可数:2
//work-thread-5 开始,获得许可,当前可用的许可数:1
//work-thread-4 结束,发布许可,当前可用的许可数:2
//work-thread-5 结束,发布许可,当前可用的许可数:2

线程池ThreadPoolExcutor

队列

阻塞队列BlockingQueue

  • 遵循FIFO原则(先入先出) 写入:如果队列满了,就必须阻塞等待 读取:如果队列为空,必须阻塞等待生产。

  • 四组API |方式|抛出异常|不抛出异常|阻塞等待|超时等待| |-|-|-|-|-| |添加|add|offer|put|offer(,,)| |移除|remove|poll|take|poll(,)| |检测队首元素|element|peek|-|-|

同步队列SynchronousQueue

没有容量,进去一个元素必须等待取出来后才能往里面放一个元素。(put、take)

双端队列Deque

Deque(Double-ended queue)双端队列是一个两端都是结尾的队列。 队列的每一端都能够插入数据项和移除数据项。

ThreadPoolExcutor

Executors

阿里巴巴Java开发手册》中不推荐使用 【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors 返回的线程池对象的弊端如下: 1)FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。 2)CachedThreadPool 和 ScheduledThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

{% tabs ‘executors’ %}

单线程化线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

固定大小线程池,可控制线程最大并发数,超出的线程会在队列中等待。

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

固定大小线程池,支持定时及周期性任务执行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

{% endtabs %}

七大参数

参数作用
corePoolSize核心线程数量(此线程池常驻线程)
maximumPoolSize最大线程数量
keepAliveTime空闲线程存活时间(等待工作的超时时间)
unit空闲线程存活时间单位
workQueue工作队列
ArrayBlockingQueue    有界队列:数组实现的有界阻塞队列,按FIFO排序任务
LinkedBlockingQuene 链表队列,链表实现的阻塞队列,按FIFO排序任务
newFixedThreadPool使用了此队列
DelayQueue 延迟队列:任务定时周期的延迟执行的队列,
根据指定的执行时间从小到大/插入到队列的先后排序。
newScheduledThreadPool使用了此队列
SynchronousQuene 同步队列:一个不存储元素的阻塞队列,
每个插入操作必须等到另一个线程调用移除操作,
否则插入操作一直处于阻塞状态,
吞吐量通常要高于LinkedBlockingQuene
newCachedThreadPool使用了此队列
PriorityBlockingQueue 优先级队列:有优先级的无界阻塞队列
threadFactory线程工厂
handler拒绝策略
AbortPolicy         丢弃任务并抛出RejectedExecutionException异常
DiscardPolicy 丢弃任务,但是不抛出异常。
DiscardOldestPolicy 丢弃队列最前面的任务,然后重新提交被拒绝的任务。
CallerRunsPolicy 由调用线程处理该任务(哪来的回哪去)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

CPU密集型/IO密集型

int N_CPUS = Runtime.getRuntime().availableProcessor();

CPU密集型也叫计算密集型,指的是系统的硬盘/内存性能比CPU更好,此时,系统运作大部分的状况是CPU Loading 100%,但I/O次数不高。 CPU密集型任务,就需要尽量压榨CPU,线程池大小参考值可以设为 Ncpu + 1

  • 这个+1意义何在? 计算密集型的线程恰好在某时因为发生一个页错误或者因其他原因而暂停,刚好有一个“额外”的线程,可以确保在这种情况下CPU周期不会中断工作。

IO密集型指的是系统的CPU性能相对硬盘/内存更好多,此时,系统运作大部分的状况是CPU在等硬盘/内存的I/O操作,但CPU的使用率不高。 IO密集型任务,线程池大小参考值可以设置为 2 * Ncpu

ForkJoin

递归拆分任务,最后合并。(大事化小)

/**
* 这是一个简单的Join/Fork计算过程,将1—1001数字相加
*/
public class TestForkJoinPool {
private static final Integer MAX = 200;
static class MyForkJoinTask extends RecursiveTask<Integer> {
// 子任务开始计算的值
private Integer startValue;
// 子任务结束计算的值
private Integer endValue;
public MyForkJoinTask(Integer startValue , Integer endValue) {
this.startValue = startValue;
this.endValue = endValue;
}
@Override
protected Integer compute() {
// 如果条件成立,说明这个任务所需要计算的数值分为足够小了
// 可以正式进行累加计算了
if(endValue - startValue < MAX) {
System.out.println("开始计算的部分:startValue = " + startValue + ";endValue = " + endValue);
Integer totalValue = 0;
for(int index = this.startValue ; index <= this.endValue ; index++) {
totalValue += index;
}
return totalValue;
}
// 否则再进行任务拆分,拆分成两个任务
else {
MyForkJoinTask subTask1 = new MyForkJoinTask(startValue, (startValue + endValue) / 2);
subTask1.fork();
MyForkJoinTask subTask2 = new MyForkJoinTask((startValue + endValue) / 2 + 1 , endValue);
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
}
public static void main(String[] args) {
// 这是Fork/Join框架的线程池
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Integer> taskFuture = pool.submit(new MyForkJoinTask(1,1001));
try {
Integer result = taskFuture.get();
System.out.println("result = " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace(System.out);
}
}
}

volatile

JMM概述

JMM(Java Memory Model)是Java虚拟机所定义的一种抽象规范,用来屏蔽不同硬件和操作系统的内存访问差异,让java程序在各种平台下都能达到一致的内存访问效果。

  1. 主内存(Main Memory) 主内存可以简单理解为计算机当中的内存,但又不完全等同。主内存被所有的线程所共享,对于一个共享变量(比如静态变量,或是堆内存中的实例)来说,主内存当中存储了它的“本尊”。
  2. 工作内存(Working Memory) 工作内存可以简单理解为计算机当中的CPU高速缓存,但又不完全等同。每一个线程拥有自己的工作内存,对于一个共享变量来说,工作内存当中存储了它的“副本”。 线程对共享变量的所有操作都必须在工作内存进行,不能直接读写主内存中的变量。不同线程之间也无法访问彼此的工作内存,变量值的传递只能通过主内存来进行。

直接操作主内存太慢了,所以jvm利用了性能较高的工作内存

JMM赋值工作流程

假设主内存中有一个变量static int s = 0

  • 对其进行赋值s=3操作,JMM的工作流程是

通过一系列内存读写的操作指令,线程A把静态变量 s=0 从主内存读到工作内存,工作内存进行更新,再把 s=3 的更新结果同步到主内存当中。

  • 但是,如果引入线程B,在线程A启动之后启动,输出s的值System.out.println("s=" + s);,JMM的工作流程是

在线程A尚未完成更新操作时,因为工作内存所更新的变量并不会立即同步到主内存,有较小的几率打印出主内存中还未同步的值 s=0

  • 同步锁synchronized虽然可以保证线程安全,但是对程序性能的影响太大了,有一种轻量级的解决方法,也就是volatile。

volatile可见性

当一个线程修改了变量的值,新的值会立即同步到主内存当中。而其他线程读取这个变量的时候,也会从主内存中拉取最新的变量值。

volatile关键字的这个特性得益于java语言的先行发生原则(happens-before)

在计算机科学中,先行发生原则是两个事件的结果之间的关系,如果一个事件发生在另一个事件之前,结果必须反映,即使这些事件实际上是乱序执行的(通常是优化程序流程)。

先行发生原则作用于很多场景下,包括同步锁、线程启动、线程终止、volatile。我们这里只列举出volatile相关的规则:

  • 对于一个volatile变量的写操作先行发生于后面对这个变量的读操作。

如果在上述操作的静态变量s之前加上volatile修饰符volatile static int s = 0

线程A先执行的时候,把s = 3写入主内存的事件必定会先于读取s的事件。所以线程B的输出一定是s = 3。

但是,volatile并不能保证线程安全,因为volatile只能保证变量的可见性,而不能保证原子性

例如,开启10个线程,每个线程当中让静态变量count自增100次。执行之后会发现,最终count的结果值未必是1000,有可能小于1000。

使用volatile修饰的变量,为什么并发自增的时候会出现这样的问题呢? 这是因为count++这一行代码本身并不是原子性操作,在字节码层面可以拆分成如下指令:

getstatic //读取静态变量(count)
iconst_1 //定义常量1
iadd //count增加1
putstatic //把count结果同步到主内存

虽然每一次执行 getstatic 的时候,获取到的都是主内存的最新变量值,但是进行iadd的时候,由于并不是原子性操作,其他线程在这过程中很可能让count自增了很多次。这样一来本线程所计算更新的是一个陈旧的count值,自然无法做到线程安全。

// 线程A 主内存
getstatic // 0 0
iconst_1 // 0 8(其他线程自增的值已同步到主内存)
iadd // 1 8
putstatic // 1 1(期望结果是9)

什么时候适用volatile

1. 运行结果并不依赖变量的当前值,或者能够确保只有单一的线程修改变量的值。(上面的例子) 2. 变量不需要与其他的状态变量共同参与不变约束。(下面的例子)

volatile static int start = 3;
volatile static int end = 6;
//线程A执行如下代码:
while (start < end){
}
//线程B执行如下代码:
start += 3;
end += 3;

一旦在线程A的循环中执行了线程B,理想情况下应该是个死循环 但是,由于volatile立即更新的特性,有可能先造成start先更新成6,造成了一瞬间 start == end,从而跳出while循环。

指令重排

指令重排是指JVM在编译Java代码的时候,或者CPU在执行JVM字节码的时候,对现有的指令顺序进行重新排序。 指令重排的目的是为了在不改变程序执行结果的前提下,优化程序的运行效率。需要注意的是,这里所说的不改变执行结果,指的是不改变单线程下的程序执行结果。 然而,指令重排是一把双刃剑,虽然优化了程序的执行效率,但是在某些情况下,会影响到多线程的执行结果。我们来看看下面的例子:(这里java代码的重排只是为了简单示意,真正的指令重排是在字节码指令的层面。)

boolean contextReady = false;
//线程A执行如下代码:
context = loadContext();
contextReady = true;
//线程B执行如下代码:
while(!contextReady){
sleep(200);
}
doAfterContextReady(context);

以上程序看似没有问题。线程B循环等待上下文context的加载,一旦context加载完成,contextReady == true的时候,才执行doAfterContextReady方法。 但是,如果线程A执行的代码发生了指令重排,初始化和contextReady的赋值交换了顺序:

boolean contextReady = false;
//线程A的代码发生了指令重排
contextReady = true;
context = loadContext();
//线程B执行如下代码:
while(!contextReady){
sleep(200);
}
doAfterContextReady(context);

这个时候,很可能context对象还没有加载完成,变量contextReady已经为true,线程B直接跳出了循环等待,开始执行doAfterContextReady方法,结果自然会出现错误。 而内存屏障可以用来解决指令重排的问题。

内存屏障

内存屏障也称为内存栅栏或栅栏指令,是一种屏障指令,它使CPU或编译器对屏障指令之前和之后发出的内存操作执行一个排序约束。 这通常意味着在屏障之前发布的操作被保证在屏障之后发布的操作之前执行。

内存屏障共分为四种类型:

  • LoadLoad屏障: 抽象场景:Load1; LoadLoad; Load2 Load1 和 Load2 代表两条读取指令。在Load2要读取的数据被访问前,保证Load1要读取的数据被读取完毕。
  • StoreStore屏障: 抽象场景:Store1; StoreStore; Store2 Store1 和 Store2代表两条写入指令。在Store2写入执行前,保证Store1的写入操作对其它处理器可见
  • LoadStore屏障: 抽象场景:Load1; LoadStore; Store2 在Store2被写入前,保证Load1要读取的数据被读取完毕。
  • StoreLoad屏障: 抽象场景:Store1; StoreLoad; Load2 在Load2读取操作执行前,保证Store1的写入对所有处理器可见。 StoreLoad屏障的开销是四种屏障中最大的。

volatile做了什么?

在一个变量被volatile修饰后,JVM会为我们做两件事:

  1. 在每个volatile写操作前插入StoreStore屏障,在写操作后插入StoreLoad屏障。
  2. 在每个volatile读操作前插入LoadLoad屏障,在读操作后插入LoadStore屏障。
volatile boolean contextReady = false;
//线程A执行如下代码:
context = loadContext();
//StoreStore屏障 contextReady被赋值之前保证context=loadContext()可见,成功阻止了指令重排。
contextReady = true;
//StoreLoad屏障
//线程B执行如下代码:
while(!contextReady){
sleep(200);
}
doAfterContextReady(context);
  • 内存屏障和先行发生原则(happens-before)的关系: happens-before是JSR- 133规范之一,内存屏障是CPU指令。可以简单认为前者是最终目的,后者是实现手段。

总结

  • volatile特性之一:保证变量在线程之间的可见性。可见性的保证是基于CPU的内存屏障指令,被JSR-133抽象为 先行发生原则(happens-before)原则。
  • volatile特性之二:阻止编译时和运行时的指令重排。编译时JVM编译器遵循内存屏障的约束,运行时依靠CPU屏障指令来阻止重排。

补充

  1. 在使用volatile引入内存屏障的时候,普通读、普通写、volatile读、volatile写会排列组合出许多不同的场景。我们这里只简单列出了其中一种。
  2. volatile除了保证可见性和阻止指令重排,还解决了long类型和double类型数据的8字节赋值问题。

单例模式

懒汉模式

public class Singleton {
private Singleton() {} //私有构造函数
private static Singleton instance = null; //单例对象
public static Singleton getInstance() { //静态工厂方法
if (instance == null) {
instance = new Singleton();
}
return instance;
}
}

如果单例初始值是null,还未构建,则构建单例对象并返回。这个写法属于单例模式当中的懒汉模式。 如果单例对象一开始就被new Singleton()主动构建,则不再需要判空操作,则属于饿汉模式。

第一版的代码不是线程安全的。 假设Singleton类刚刚被初始化,instance对象还是空,这时候两个线程同时访问getInstance方法: 因为Instance是空,所以两个线程同时通过了条件判断,开始执行new操作: 这样一来,显然instance被构建了两次。 让我们对代码做一下修改:

public class Singleton {
private Singleton() {}
private static Singleton instance = null;
public static Singleton getInstance() {
if (instance == null) { //双重检测机制
synchronized (Singleton.class){ //同步锁
if (instance == null) { //双重检测机制
instance = new Singleton();
}
}
}
return instance;
}
}

为什么这样写呢?我们来解释几个关键点:

  1. 为了防止new Singleton被执行多次,因此在new操作之前加上Synchronized 同步锁,锁住整个类(注意,这里不能使用对象锁)。
  2. 进入Synchronized 临界区以后,还要再做一次判空。因为当两个线程同时访问的时候,线程A构建完对象,线程B也已经通过了最初的判空验证,不做第二次判空的话,线程B还是会再次构建instance对象。 像这样两次判空的机制叫做双重检测机制

假设这样的场景,当两个线程一先一后访问getInstance方法的时候,当A线程正在构建对象,B线程刚刚进入方法: 这种情况表面看似没什么问题,要么Instance还没被线程A构建,线程B执行 if(instance == null)的时候得到true;要么Instance已经被线程A构建完成,线程B执行 if(instance == null)的时候得到false。 真的如此吗?答案是否定的。这里涉及到了JVM编译器的指令重排。 指令重排是什么意思呢?比如java中简单的一句 instance = new Singleton,会被编译器编译成如下JVM指令:

memory =allocate(); //1:分配对象的内存空间
ctorInstance(memory); //2:初始化对象
instance =memory; //3:设置instance指向刚分配的内存地址

但是这些指令顺序并非一成不变,有可能会经过JVM和CPU的优化,指令重排成下面的顺序:

memory =allocate(); //1:分配对象的内存空间
instance =memory; //3:设置instance指向刚分配的内存地址
ctorInstance(memory); //2:初始化对象

当线程A执行完1,3,时,instance对象还未完成初始化,但已经不再指向null。此时如果线程B抢占到CPU资源,执行 if(instance == null)的结果会是false,从而返回一个没有初始化完成的instance对象。 如何避免这一情况呢?我们需要在instance对象前面增加一个修饰符volatile。

public class Singleton {
private Singleton() {}
private volatile static Singleton instance = null; //volatile单例对象
public static Singleton getInstance() {
if (instance == null) {
synchronized (Singleton.class){
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}

volatile修饰符组织了变量访问前后的指令重排,保证了指令执行顺序。 经过volatile的修饰,当线程A执行instance = new Singleton的时候,JVM执行始终保证是下面的顺序:

memory =allocate(); //1:分配对象的内存空间
ctorInstance(memory); //2:初始化对象
instance =memory; //3:设置instance指向刚分配的内存地址

如此在线程B看来,instance对象的引用要么指向null,要么指向一个初始化完毕的Instance,而不会出现某个中间态,保证了安全。

public class Singleton {
private static class LazyHolder {
private static final Singleton INSTANCE = new Singleton();
}
private Singleton (){}
public static Singleton getInstance() {
return LazyHolder.INSTANCE;
}
}

这里有几个需要注意的点:

  1. 从外部无法访问静态内部类LazyHolder,只有当调用Singleton.getInstance方法的时候,才能得到单例对象INSTANCE。
  2. INSTANCE对象初始化的时机并不是在单例类Singleton被加载的时候,而是在调用getInstance方法,使得静态内部类LazyHolder被加载的时候。因此这种实现方式是利用classloader的加载机制来实现懒加载,并保证构建单例的线程安全。
//获得构造器
Constructor con = Singleton.class.getDeclaredConstructor();
//设置为可访问
con.setAccessible(true);
//构造两个不同的对象
Singleton singleton1 = (Singleton)con.newInstance();
Singleton singleton2 = (Singleton)con.newInstance();
//验证是否是不同对象
System.out.println(singleton1.equals(singleton2));

代码可以简单归纳为三个步骤:

第一步,获得单例类的构造器。 第二步,把构造器设置为可访问。 第三步,使用newInstance方法构造对象。

最后为了确认这两个对象是否真的是不同的对象,我们使用equals方法进行比较。毫无疑问,比较结果是false。

public enum SingletonEnum {
INSTANCE;
}

让我们来做一个实验,仍然执行刚才的反射代码:

//获得构造器
Constructor con = SingletonEnum.class.getDeclaredConstructor();
//设置为可访问
con.setAccessible(true);
//构造两个不同的对象
SingletonEnum singleton1 = (SingletonEnum)con.newInstance();
SingletonEnum singleton2 = (SingletonEnum)con.newInstance();
//验证是否是不同对象
System.out.println(singleton1.equals(singleton2));

执行获得构造器这一步的时候,抛出了如下异常:

Exception in thread "main" java.lang.NoSuchMethodException: com.xiaohui.singleton.test.SingletonEnum.<init>()
at java.lang.Class.getConstructor0(Class.java:2892)
at java.lang.Class.getDeclaredConstructor(Class.java:2058)
at com.xiaohui.singleton.test.SingletonTest.main(SingletonTest.java:22)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

几点补充:

  1. volatile关键字不但可以防止指令重排,也可以保证线程访问的变量值是主内存中的最新值。
  2. 使用枚举实现的单例模式,不但可以防止利用反射强行构建单例对象,而且可以在枚举类对象被反序列化的时候,保证反序列的返回结果是同一对象。
  3. 对于其他方式实现的单例模式,如果既想要做到可序列化,又想要反序列化为同一对象,则必须实现readResolve方法。

CAS

CAS(CompareAndSwap)比较并交换。如果一个值是预期的值,则设置新的值。

final AtomicInteger atomicInteger = new AtomicInteger();
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
atomicInteger.incrementAndGet();
}
});
thread.start();
thread.join();
}
System.out.println(atomicInteger.get());
//结果:5000
/*
* AtomicInteger.incrementAndGet
*/
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
/*
* Unsafe.getAndAddInt
* CAS操作
*/
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
//获取当前值
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
//如果当前值是预期的var5,则设置成 var5(当前值)+ var4(1)
return var5;
}

ABA问题

final AtomicReference<String> str = new AtomicReference<>("A");
Thread t1 = new Thread(() -> {
Thread.sleep(1000);
str.compareAndSet("A", "B"); //预期是A,修改成B
}, "线程1");
Thread t2 = new Thread(() -> {
str.compareAndSet("A", "C");
str.compareAndSet("C", "A"); //把A改为C又改回A
}, "线程2");
t2.start();
t1.start();
t1.join();
t2.join();
System.err.println(str.get()); //结果:B

解决方式:使用AtomicStamped,乐观锁的方式,添加上版本号进行控制

final AtomicStampedReference<String> str = new AtomicStampedReference<>("A", 0);
Thread t1 = new Thread(() -> {
Thread.sleep(1000);
str.compareAndSet("A", "B", 0, 1); //预期值是A,并且stamp是0才修改成B
}, "线程1");
Thread t2 = new Thread(() -> {
str.compareAndSet("A", "C", 0, 1); //stamp=1
str.compareAndSet("C", "A", 1, 2); //把A改为C又改回A stamp=2
}, "线程2");
t2.start();
t1.start();
t1.join();
t2.join();
System.err.println(str.getReference()); //结果:A

自旋锁

//简单的实现自旋锁
public class SpinLock {
final AtomicReference<Object> cas = new AtomicReference<>();
public void lock() {
Thread current = Thread.currentThread();
// 利用CAS
while (!cas.compareAndSet(null, current)) ;
}
public void unlock() {
Thread current = Thread.currentThread();
cas.compareAndSet(current, null);
}
}

评论