0%

提到volatile首先想到就是:

  • 保证此变量对所有线程的可见性,这里的 “可见性”是指当一个线程修改了这个变量的值,新值对于其他线程来说是可以立即得知的。
  • 禁止指令重排序优化。

到这里大家感觉自己对volatile理解了吗?

如果理解了,大家考虑这么一个问题:ReentrantLock(或者其它基于AQS实现的锁)是如何保证代码段中变量(变量主要是指共享变量,存在竞争问题的变量)的可见性?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static ReentrantLock reentrantLock = new ReentrantLock();
private static int count = 0;
//...
// 多线程 run 如下代码
reentrantLock.lock();
try
{
count++;
}
finally
{
reentrantLock.unlock();
}
//...

既然提到了可见性,那就先熟悉几个概念:

JMM

JMM:Java Memory Model 即 Java 内存模型

The Java Memory Model describes what behaviors are legal in multithreaded code, and how threads may interact through memory.

It describes the relationship between variables in a program and the low-level details of storing and retrieving them to and from memory or registers in a real computer system.

It does this in a way that can be implemented correctly using a wide variety of hardware and a wide variety of compiler optimizations.

Java内存模型的主要目标是定义程序中各个变量的访问规则,即在虚拟机中将变量存储到内存和从内存中取出变量这样的底层细节。此处的变量主要是指共享变量,存在竞争问题的变量。Java内存模型规定所有的变量都存储在主内存中,而每个线程还有自己的工作内存,线程的工作内存中保存了该线程使用到的变量的主内存副本拷贝,线程对变量的所有操作(读取、赋值等)都必须在工作内存中进行,而不能直接读写主内存中的变量(根据Java虚拟机规范的规定,volatile变量依然有共享内存的拷贝,但是由于它特殊的操作顺序性规定——从工作内存中读写数据前,必须先将主内存中的数据同步到工作内存中,所有看起来如同直接在主内存中读写访问一般,因此这里的描述对于volatile也不例外)。不同线程之间也无法直接访问对方工作内存中的变量,线程间变量值得传递均需要通过主内存来完成。

重排序

在执行程序时,为了提高性能,编译器和处理器常常会对指令做重排序。重排序分3种类型:

  • 编译器优化的重排序。编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。
  • 指令级并行的重排序。现代处理器采用了指令级并行技术(Instruction-Level Parallelism,ILP)来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。
  • 内存系统的重排序。由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。

从Java源代码到最终实际执行的指令序列,会分别经历下面3种重排序:

对于编译器,JMM的编译器重排序规则会禁止特定类型的编译器重排序(不是所有的编译器重排序都要禁止)。对于处理器重排序,JMM的处理器重排序规则会要求Java编译器在生成指令序列时,插入特定类型的内存屏障(Memory Barriers,Intel称之为Memory Fence)指令,通过内存屏障指令来禁止特定类型的处理器重排序。

JMM属于语言级的内存模型,它确保在不同的编译器和不同的处理器平台之上,通过禁止特定类型的编译器重排序和处理器重排序,为程序员提供一致的内存可见性保证。

happens-before

  • 程序顺序规则:一个线程中的每个操作,happens-before于该线程中的任意后续操作。
  • 监视器锁规则:对一个锁的解锁,happens-before于随后对这个锁的加锁。
  • volatile变量规则:对一个volatile域的写,happens-before于任意后续对这个volatile域的读。(对一个volatile变量的读,总是能看到【任意线程】对这个volatile变量最后的写入)
  • 传递性:如果A happens-before B,且B happens-before C,那么A happens-before C。

两个操作之间具有happens-before关系,并不意味着前一个操作必须要在后一个操作之前执行!happens-before仅仅要求前一个操作(执行的结果)对后一个操作可见,且前一个操作按顺序排在第二个操作之前(the first is visible to and ordered before the second)。

内存屏障

  • 硬件层的内存屏障分为两种:Load Barrier 和 Store Barrier即读屏障和写屏障。
  • 对于Load Barrier来说,在指令前插入Load Barrier,可以让高速缓存中的数据失效,强制从新从主内存加载数据;
  • 对于Store Barrier来说,在指令后插入Store Barrier,能让写入缓存中的最新数据更新写入主内存,让其他线程可见。
  • 内存屏障有两个作用:
    • 阻止屏障两侧的指令重排序;
    • 强制把写缓冲区/高速缓存中的数据等写回主内存,让缓存中相应的数据失效。

volatile的内存语义

从JSR-133开始(即从JDK5开始),volatile变量的写-读可以实现线程之间的通信。

从内存语义的角度来说,volatile的写-读与锁的释放-获取有相同的内存效果

  • volatile写和锁的释放有相同的内存语义;
  • volatile读与锁的获取有相同的内存语义。

volatile仅仅保证对单个volatile变量的读/写具有原子性,而锁的互斥执行的特性可以确保对整个临界区代码的执行具有原子性。在功能上,锁比volatile更强大;在可伸缩性和执行性能上,volatile更有优势。

volatile变量自身具有下列特性:

  • 可见性。对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入。
  • 原子性:对任意单个volatile变量的读/写具有原子性,即使是64位的long型和double型变量,只要它是volatile变量,对该变量的读/写就具有原子性。如果是多个volatile操作或类似于volatile++这种复合操作,这些操作整体上不具有原子性。

volatile写和volatile读的内存语义:

  • 线程A写一个volatile变量,实质上是线程A向接下来将要读这个volatile变量的某个线程发出了(其对共享变量所做修改的)消息。
  • 线程B读一个volatile变量,实质上是线程B接收了之前某个线程发出的(在写这个volatile变量之前对共享变量所做修改的)消息。
  • 线程A写一个volatile变量,随后线程B读这个volatile变量,这个过程实质上是线程A通过主内存向线程B发送消息。

JMM针对编译器制定的volatile重排序规则表

  • 当第二个操作是volatile写时,不管第一个操作是什么,都不能重排序。这个规则确保volatile写之前的操作不会被编译器重排序到volatile写之后。
  • 当第一个操作是volatile读时,不管第二个操作是什么,都不能重排序。这个规则确保volatile读之后的操作不会被编译器重排序到volatile读之前。
  • 当第一个操作是volatile写,第二个操作是volatile读时,不能重排序。

为了实现volatile的内存语义,编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。对于编译器来说,发现一个最优布置来最小化插入屏障几乎是不可能的。为此,JMM采取保守策略。下面是基于保守策略的JMM内存屏障插入策略。

  • 在每个volatile写操作的前面插入一个StoreStore屏障。
  • 在每个volatile写操作的后面插入一个StoreLoad屏障。
  • 在每个volatile读操作的后面插入一个LoadLoad屏障。
  • 在每个volatile读操作的后面插入一个LoadStore屏障。

LoadLoad屏障:对于这样的语句Load1; LoadLoad; Load2,在Load2及后续读取操作要读取的数据被访问前,保证Load1要读取的数据被读取完毕。

StoreStore屏障:对于这样的语句Store1; StoreStore; Store2,在Store2及后续写入操作执行前,保证Store1的写入操作对其它处理器可见。 LoadStore屏障:对于这样的语句Load1; LoadStore; Store2,在Store2及后续写入操作被刷出前,保证Load1要读取的数据被读取完毕。

StoreLoad屏障:对于这样的语句Store1; StoreLoad; Load2,在Load2及后续所有读取操作执行前,保证Store1的写入对所有处理器可见。它的开销是四种屏障中最大的。在大多数处理器的实现中,这个屏障是个万能屏障,兼具其它三种内存屏障的功能。
上述内存屏障插入策略非常保守,但它可以保证在任意处理器平台,任意的程序中都能得到正确的volatile内存语义。

下面是保守策略下,volatile写插入内存屏障后生成的指令序列示意图.

图中的StoreStore屏障可以保证在volatile写之前,其前面的所有普通写操作已经对任意处理器可见了。这是因为StoreStore屏障将保障上面所有的普通写在volatile写之前刷新到主内存。

这里比较有意思的是,volatile写后面的StoreLoad屏障。此屏障的作用是避免volatile写与后面可能有的volatile读/写操作重排序。因为编译器常常无法准确判断在一个volatile写的后面是否需要插入一个StoreLoad屏障(比如,一个volatile写之后方法立即return)。为了保证能正确实现volatile的内存语义,JMM在采取了保守策略:在每个volatile写的后面,或者在每个volatile读的前面插入一个StoreLoad屏障。从整体执行效率的角度考虑,JMM最终选择了在每个volatile写的后面插入一个StoreLoad屏障。因为volatile写-读内存语义的常见使用模式是:一个写线程写volatile变量,多个读线程读同一个volatile变量。当读线程的数量大大超过写线程时,选择在volatile写之后插入StoreLoad屏障将带来可观的执行效率的提升。从这里可以看到JMM在实现上的一个特点:首先确保正确性,然后再去追求执行效率。

下面是在保守策略下,volatile读插入内存屏障后生成的指令序列示意图:

图中的LoadLoad屏障用来禁止处理器把上面的volatile读与下面的普通读重排序。LoadStore屏障用来禁止处理器把上面的volatile读与下面的普通写重排序。

上述volatile写和volatile读的内存屏障插入策略非常保守。在实际执行时,只要不改变volatile写-读的内存语义,编译器可以根据具体情况省略不必要的屏障。

AQS

对于AQS需要了解这么几点:

  • 锁的状态通过volatile int state来表示。
  • 获取不到锁的线程会进入AQS的队列等待。
  • 子类需要重写tryAcquire、tryRelease等方法。

AQS 详解参见:面试必备:Java AQS 实现原理(图文)分析

ReentrantLock

以公平锁为例,看看 ReentrantLock 获取锁 & 释放锁的关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
// 释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);// 释放锁的最后,写volatile变量state
return free;
}
// 获取锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();// 获取锁的开始,首先读volatile变量state
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

通过ReentrantLock实例调用lock()、unlock()时,acquires、releases的值都是1。

公平锁在释放锁的最后写volatile变量state,在获取锁时首先读这个volatile变量。根据volatile的happens-before规则,释放锁的线程在写volatile变量之前可见的共享变量,在获取锁的线程读取同一个volatile变量后将立即变得对获取锁的线程可见。从而保证了代码段中变量(变量主要是指共享变量,存在竞争问题的变量)的可见性。

小结

如果我们仔细分析concurrent包的源代码实现,会发现一个通用化的实现模式。

  • 首先,声明共享变量为volatile。
  • 然后,使用CAS的原子条件更新来实现线程之间的同步。
  • 同时,配合以volatile的读/写和CAS所具有的volatile读和写的内存语义来实现线程之间的通信。

前文我们提到过,编译器不会对volatile读与volatile读后面的任意内存操作重排序;编译器不会对volatile写与volatile写前面的任意内存操作重排序。组合这两个条件,意味着为了同时实现volatile读和volatile写的内存语义,编译器不能对CAS与CAS前面和后面的任意内存操作重排序。

推荐阅读:

https://jiankunking.com/java-volatile-keyword.html

本文参考:

1、《Java并发编程的艺术》 方腾飞 魏鹏 程晓明 著

2、Java 可重入锁内存可见性分析

AQS:AbstractQueuedSynchronizer

1、AQS设计简介

  • AQS的实现是基于一个FIFO的等待队列。
  • 使用单个原子变量来表示获取、释放锁状态(final int)改变该int值使用的是CAS。(思考:为什么一个int值可以保证内存可见性?)
  • 子类应该定义一个非公开的内部类继承AQS,并实现其中方法。
  • AQS支持exclusive与shared两种模式。
  • 内部类ConditionObject用于支持子类实现exclusive模式
  • 子类需要重写:
    • tryAcquire
    • tryRelease
    • tryReleaseShared
    • isHeldExclusively等方法,并确保是线程安全的。

贯穿全文的图(核心):

模板方法设计模式:定义一个操作中算法的骨架,而将一些步骤的实现延迟到子类中。

2、类结构

  • ConditionObject类
  • Node类
  • N多方法

3、FIFO队列

等待队列是CLH(Craig, Landin, and Hagersten)锁队列。

通过节点中的“状态”字段来判断一个线程是否应该阻塞。当该节点的前一个节点释放锁的时候,该节点会被唤醒。

1
2
3
4
5
private transient volatile Node head;
private transient volatile Node tail;
//The synchronization state.
//在互斥锁中它表示着线程是否已经获取了锁,0未获取,1已经获取了,大于1表示重入数。
private volatile int state;

AQS维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。

state的访问方式有三种:

  • getState()
  • setState()
  • compareAndSetState()

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。

自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后续动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现:

  • tryAcquire-tryRelease
  • tryAcquireShared-tryReleaseShared

中的一种即可。

当然AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

以下部分来自源码注释:

每次进入CLH队列时,需要对尾节点进入队列过程,是一个原子性操作。在出队列时,我们只需要更新head节点即可。在节点确定它的后继节点时, 需要花一些功夫,用于处理那些,由于等待超时时间结束或中断等原因, 而取消等待锁的线程。

节点的前驱指针,主要用于处理,取消等待锁的线程。如果一个节点取消等待锁,则此节点的前驱节点的后继指针,要指向,此节点后继节点中,非取消等待锁的线程(有效等待锁的线程节点)。

我们用next指针连接实现阻塞机制。每个节点均持有自己线程,节点通过节点的后继连接唤醒其后继节点。

CLH队列需要一个傀儡结点作为开始节点。我们不会再构造函数中创建它,因为如果没有线程竞争锁,那么,努力就白费了。取而代之的方案是,当有第一个竞争者时,我们才构造头指针和尾指针。

线程通过同一节点等待条件,但是用另外一个连接。条件只需要放在一个非并发的连接队列与节点关联,因为只有当线程独占持有锁的时候,才会去访问条件。当一个线程等待条件的时候,节点将会插入到条件队列中。当条件触发时,节点将会转移到主队列中。用一个状态值,描述节点在哪一个队列上。

4、Node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static final class Node {
//该等待节点处于共享模式
static final Node SHARED = new Node();
//该等待节点处于独占模式
static final Node EXCLUSIVE = null;

//表示节点的线程是已被取消的
static final int CANCELLED = 1;
//表示当前节点的后继节点的线程需要被唤醒
static final int SIGNAL = -1;
//表示线程正在等待某个条件
static final int CONDITION = -2;
//表示下一个共享模式的节点应该无条件的传播下去
static final int PROPAGATE = -3;

//状态位 ,分别可以使CANCELLED、SINGNAL、CONDITION、PROPAGATE、0
volatile int waitStatus;

volatile Node prev;//前驱节点
volatile Node next;//后继节点
volatile Thread thread;//等待锁的线程

//ConditionObject链表的后继节点或者代表共享模式的节点。
//因为Condition队列只能在独占模式下被能被访问,我们只需要简单的使用链表队列来链接正在等待条件的节点。
//然后它们会被转移到同步队列(AQS队列)再次重新获取。
//由于条件队列只能在独占模式下使用,所以我们要表示共享模式的节点的话只要使用特殊值SHARED来标明即可。
Node nextWaiter;
//Returns true if node is waiting in shared mode
final boolean isShared() {
return nextWaiter == SHARED;
}
.......
}

waitStatus不同值含义:

  • SIGNAL(-1):当前节点的后继节点已经 (或即将)被阻塞(通过park) , 所以当当前节点释放或则被取消时候,一定要unpark它的后继节点。为了避免竞争,获取方法一定要首先设置node为signal,然后再次重新调用获取方法,如果失败,则阻塞。
  • CANCELLED(1):当前节点由于超时或者被中断而被取消。一旦节点被取消后,那么它的状态值不在会被改变,且当前节点的线程不会再次被阻塞。
  • CONDITION(-2) :该节点的线程处于等待条件状态,不会被当作是同步队列上的节点,直到被唤醒(signal),设置其值为0,重新进入阻塞状态.
  • PROPAGATE(-3:)共享模式下的释放操作应该被传播到其他节点。该状态值在doReleaseShared方法中被设置的。
  • 0:以上都不是

该状态值为了简便使用,所以使用了数值类型。非负数值意味着该节点不需要被唤醒。所以,大多数代码中不需要检查该状态值的确定值。

一个正常的Node,它的waitStatus初始化值是0。如果想要修改这个值,可以使用AQS提供CAS进行修改。

5、独占模式与共享模式

在锁的获取时,并不一定只有一个线程才能持有这个锁(或者称为同步状态),所以此时有了独占模式和共享模式的区别,也就是在Node节点中由nextWaiter来标识。比如ReentrantLock就是一个独占锁,只能有一个线程获得锁,而WriteAndReadLock的读锁则能由多个线程同时获取,但它的写锁则只能由一个线程持有。

5.1、独占模式

5.1.1 独占模式同步状态的获取

1
2
3
4
5
6
7
8
9
//忽略中断的(即不手动抛出InterruptedException异常)独占模式下的获取方法。
//该方法在成功返回前至少会调用一次tryAcquire()方法(该方法是子类重写的方法,如果返回true则代表能成功获取).
//否则当前线程会进入队列排队,重复的阻塞和唤醒等待再次成功获取后返回,
//该方法可以用来实现Lock.lock
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

该方法首先尝试获取锁(tryAcquire(arg)的具体实现定义在了子类中),如果获取到,则执行完毕,否则通过addWaiter(Node.EXCLUSIVE), arg)方法把当前节点添加到等待队列末尾,并设置为独占模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private Node addWaiter(Node mode) {
//把当前线程包装为node,设为独占模式
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速入队,即无竞争条件下肯定成功。如果失败,则进入enq自旋重试入队
Node pred = tail;
if (pred != null) {
node.prev = pred;
//CAS替换当前尾部。成功则返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//插入节点到队列中,如果队列未初始化则初始化,然后再插入。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

如果tail节点为空,执行enq(node);重新尝试,最终把node插入.在把node插入队列末尾后,它并不立即挂起该节点中线程,因为在插入它的过程中,前面的线程可能已经执行完成,所以它会先进行自旋操作acquireQueued(node, arg),尝试让该线程重新获取锁!当条件满足获取到了锁则可以从自旋过程中退出,否则继续。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果它的前继节点为头结点,尝试获取锁,获取成功则返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//判断当前节点的线程是否应该被挂起,如果应该被挂起则挂起。
//等待release唤醒释放
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//在队列中取消当前节点
cancelAcquire(node);
}
}

如果没获取到锁,则判断是否应该挂起,而这个判断则得通过它的前驱节点的waitStatus来确定:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//该节点如果状态如果为SIGNAL。则返回true,然后park挂起线程
if (ws == Node.SIGNAL)
return true;
//表明该节点已经被取消,向前循环重新调整链表节点
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//执行到这里代表节点是0或者PROPAGATE,然后标记他们为SIGNAL,但是
//还不能park挂起线程。需要重试是否能获取,如果不能,则挂起。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

//挂起当前线程,且返回线程的中断状态
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

最后,我们对获取独占式锁过程对做个总结:

AQS的模板方法acquire通过调用子类自定义实现的tryAcquire获取同步状态失败后->将线程构造成Node节点(addWaiter)->将Node节点添加到同步队列对尾(addWaiter)->节点以自旋的方法获取同步状态(acquirQueued)。在节点自旋获取同步状态时,只有其前驱节点是头节点的时候才会尝试获取同步状态,如果该节点的前驱不是头节点或者该节点的前驱节点是头节点单获取同步状态失败,则判断当前线程需要阻塞,如果需要阻塞则需要被唤醒过后才返回。

获取锁的过程:

  • 当线程调用acquire()申请获取锁资源,如果成功,则进入临界区。
  • 当获取锁失败时,则进入一个FIFO等待队列,然后被挂起等待唤醒。
  • 当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则进入临界区,否则继续挂起等待。

5.1.2 独占模式同步状态的释放

既然是释放,那肯定是持有锁的该线程执行释放操作,即head节点中的线程释放锁.

AQS中的release释放同步状态和acquire获取同步状态一样,都是模板方法,tryRelease释放的具体操作都有子类去实现,父类AQS只提供一个算法骨架。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//如果node的后继节点不为空且不是作废状态,则唤醒这个后继节点,
//否则从末尾开始寻找合适的节点,如果找到,则唤醒
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

过程:首先调用子类的tryRelease()方法释放锁,然后唤醒后继节点,在唤醒的过程中,需要判断后继节点是否满足情况,如果后继节点不为空且不是作废状态,则唤醒这个后继节点,否则从tail节点向前寻找合适的节点,如果找到,则唤醒。

释放锁过程:

  • 当线程调用release()进行锁资源释放时,如果没有其他线程在等待锁资源,则释放完成。
  • 如果队列中有其他等待锁资源的线程需要唤醒,则唤醒队列中的第一个等待节点(先入先出)。

5.2、共享模式

5.2.1 共享模式同步状态的获取

  • 当线程调用acquireShared()申请获取锁资源时,如果成功,则进入临界区。
  • 当获取锁失败时,则创建一个共享类型的节点并进入一个FIFO等待队列,然后被挂起等待唤醒。
  • 当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,然后进入临界区,否则继续挂起等待。

5.2.2 共享模式同步状态的释放

  • 当线程调用releaseShared()进行锁资源释放时,如果释放成功,则唤醒队列中等待的节点,如果有的话。

6. AQS小结

java.util.concurrent中的很多可阻塞类(比如ReentrantLock)都是基于AQS来实现的。AQS是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。

JDK中AQS被广泛使用,基于AQS实现的同步器包括:

  • ReentrantLock
  • Semaphore
  • ReentrantReadWriteLock(后续会出文章讲解)
  • CountDownLatch
  • FutureTask

每一个基于AQS实现的同步器都会包含两种类型的操作,如下:

  • 至少一个acquire操作。这个操作阻塞调用线程,除非/直到AQS的状态允许这个线程继续执行。
  • 至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。

基于“复合优先于继承”的原则,基于AQS实现的同步器一般都是:声明一个内部私有的继承于AQS的子类Sync,对同步器所有公有方法的调用都会委托给这个内部子类。

7.后续

后面会推出以下有关AQS的文章,已加深对于AQS的理解

8.思考

多人抢锁

多个线程同时取争取一个锁(在争取之前资源未被锁定),这时候如何保证,只有一个人能获取到?
下面以非公平锁来看一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
    /**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
return STATE.compareAndSet(this, expect, update);
}

/**
* Atomically sets the value of a variable to the {@code newValue} with the
* memory semantics of {@link #setVolatile} if the variable's current value,
* referred to as the <em>witness value</em>, {@code ==} the
* {@code expectedValue}, as accessed with the memory semantics of
* {@link #getVolatile}.
*
* <p>The method signature is of the form {@code (CT1 ct1, ..., CTn ctn, T expectedValue, T newValue)boolean}.
*
* <p>The symbolic type descriptor at the call site of {@code
* compareAndSet} must match the access mode type that is the result of
* calling {@code accessModeType(VarHandle.AccessMode.COMPARE_AND_SET)} on
* this VarHandle.
*
* @param args the signature-polymorphic parameter list of the form
* {@code (CT1 ct1, ..., CTn ctn, T expectedValue, T newValue)}
* , statically represented using varargs.
* @return {@code true} if successful, otherwise {@code false} if the
* witness value was not the same as the {@code expectedValue}.
* @throws UnsupportedOperationException if the access mode is unsupported
* for this VarHandle.
* @throws WrongMethodTypeException if the access mode type does not
* match the caller's symbolic type descriptor.
* @throws ClassCastException if the access mode type matches the caller's
* symbolic type descriptor, but a reference cast fails.
* @see #setVolatile(Object...)
* @see #getVolatile(Object...)
*/
public final native
@MethodHandle.PolymorphicSignature
@HotSpotIntrinsicCandidate
boolean compareAndSet(Object... args);

从代码中可以看出通过compareAndSetState来保证只会有一个线程获取到锁。

LockSupport(park/unpark)

实现

Unsafe.park和Unsafe.unpark的底层实现原理
在Linux系统下,是用的Posix线程库pthread中的mutex(互斥量),condition(条件变量)来实现的。
mutex和condition保护了一个_counter的变量,当park时,这个变量被设置为0,当unpark时,这个变量被设置为1。

Object类的wait/notify和LockSupport(park/unpark)s的区别

park函数是将当前调用Thread阻塞,而unpark函数则是将指定线程Thread唤醒。

与Object类的wait/notify机制相比,park/unpark有两个优点:

  • 以thread为操作对象更符合阻塞线程的直观定义
  • 操作更精准,可以准确地唤醒某一个线程。

区别是:notify随机唤醒一个线程,notifyAll唤醒所有等待的线程,增加了灵活性

synchronized 实现

可以参考下这个:

https://xiaomi-info.github.io/2020/03/24/synchronized/

9.感谢

本文很多内容整理自网络,参考文献:
https://segmentfault.com/a/1190000011376192
https://segmentfault.com/a/1190000011391092
https://zhuanlan.zhihu.com/p/27134110
https://blog.csdn.net/wojiaolinaaa/article/details/50070031
https://www.cnblogs.com/waterystone/p/4920797.html

FIFO队列:https://www.cnblogs.com/waterystone/p/4920797.html

本文整理自:《图解TCP/IP 第5版》
作者:[日] 竹下隆史,[日] 村山公保,[日] 荒井透,[日] 苅田幸雄 著
译者:乌尼日其其格
出版时间:2013-07

阅读全文 »

Java部分

  1. java比较 icompare
  2. tomcat 热部署 加载方式与双亲委派模型?
  3. java io api 过滤器模式?
  4. threadLocal 实现原理?
  5. tcp ip协议
  6. 服务端如何确定seesion是同一个?
  7. 内存屏障(Memory Barriers)
  8. lock synchronized ReentrantLock
  9. jvm JVM的年轻代分为哪几代?年轻代什么时候会进入老年代?
  10. jvm JVM 垃圾回收算法?(注意年轻代与老年代是不一样的)?
  11. jvm内存模型 一个变量初始化 怎么分配内存 分配到什么地方?
  12. 不使用双亲委派模型的缺点?
  13. java 开源序列化框架有哪些?彼此之间有什么区别(优缺点)?
  14. java.util.concurrent hashmap 相关问题
  15. JAVA线程sleep和wait方法区别
    https://jiankunking.blog.csdn.net/article/details/79824353
  16. PriorityQueue(优先级队列) 堆相关问题
  17. 常见的负载均衡算法
  18. java 阻塞队列 相关问题,阻塞具体是如何实现的?
  19. 静态代码块. 构造代码块. 构造函数以及Java类初始化顺序
  20. java 枚举的实现,内部如何进行存储的?
  21. 静态内部类与普通内部类,在用法. 初始化方面的区别?
  22. java 原子性 可见性 顺序性是通过什么来保证的?
  23. java 多线程内共享的模型
  24. 阻塞非阻塞与同步异步
  25. java nio原理
  26. 读写锁 自旋锁 尝试锁(cas) cas如何保证,查询到修改这个过程是原子的?
  27. 一个类中的静态变量是在类加载的哪个步骤加载的?
  28. synchronized与ReentrantLock 实现原理区别?
  29. threadlocal 实现原理?应用场景?
  30. 常见的设计模式
  31. 分布式事务
  32. 线程池工作原理及机制
  33. 线程挂了 保活
  34. keepalive 保活策略?
  35. Protocol Buffers 适用场景?
  36. http tcp 相比多了些什么?有什么不一样的地方?
  37. http与https区别?加密算法是?
  38. wait 是释放锁?为什么释放了锁,线程就挂起了。为什么线程wait了就挂起了?
  39. CMS 垃圾回收
  40. hashmap 线程不安全 什么时候会出现?会出现什么问题?(hashmap为啥线程不安全?)
  41. equals 比较原理?
  42. jvm 内存分布
  43. arraylist linklist
  44. interger 为null 转int 会发生什么?
  45. hashmap与hashset的关系?
  46. 线程与协程的区别?协程的优势?
  47. JDK8 如何实现协程?
  48. java lambda 实现原理
  49. java stream 实现原理
  50. 永久代(permanent generation)与Metaspace
  51. 如何保证GC ROOTS找的全?(比如中G1中)
  52. G1清理老年代. 年轻代是遍历所有吗?
  53. 可重入锁和不可重入锁?不可重入锁有啥缺陷?
  54. CPU密集型 Java线程池大小为何会大多被设置成CPU核心数+1?
  55. 什么情况下会出现ClassNotFoundException?
  56. 线程有几种状态?
  57. 如何动态上报JVM信息,以便后期排查OOM等问题?
  58. ConcurrentHashMap put的时候加锁的是数组上的元素 还是啥?
  59. Concurrenthashmap中用到的优化技巧?
  60. LRU如何实现?
  61. 为什么Concurrenthashmap扩容是安全的?
  62. LinkedHashMap和HashMap 区别?
  63. CompletableFuture get(long timeout, TimeUnit unit) throws TimeoutException, ExecutionException实现
    https://medium.com/@sergeykuptsov/how-it-works-in-java-completablefuture-3031dbbca66d
    64、Java time-based map/cache with expiring keys
    https://stackoverflow.com/questions/3802370/java-time-based-map-cache-with-expiring-keys
    65、jmap 其实是多个线程 他们之间是怎么通信 dump出数据的?(jmap命令的实现原理)
    66、GC的年轻代Survivor区,为什么是2个,而不是1个?
    https://stackoverflow.com/questions/10695298/java-gc-why-two-survivor-regions
    简单来说2个Survivor区,就是整理内存碎片的时候方便。
    67、类加载器及类加载机制

MySQL部分

  1. MySQL 时间 比较无效 原因?
  2. MySQL 数据库 索引 是以什么数据结构形式存储的?
  3. MySQL与sql server 异同点? 原理上?
  4. 索引顺序对于索引效果的影响?
  5. 数据库索引如何优化(从哪几个方面)?
  6. MySQL优化有哪些?
  7. 比如一个表中有100条数据,a字段的值,是从1到100,我要更新其中的数据,where条件时a>10
    MySQL通过innodb引擎的话,是通过表锁还是行锁?
  8. MySQL mvcc多版本并发控制
  9. MySQL为什么选中B+ TREE而不是B TREE?两种数据结构有什么区别?

B+ 树继承于 B 树,都限定了节点中数据数目和子节点的数目。B 树所有节点都可以映射数据,B+ 树只有叶子节点可以映射数据。
单独看这部分设计,看不出 B+ 树的优势。为了只有叶子节点可以映射数据,B+ 树创造了很多冗余的索引(所有非叶子节点都是冗余索引),这些冗余索引让 B+ 树在插入、删除的效率都更高,而且可以自动平衡,因此 B+ 树的所有叶子节点总是在一个层级上。所以 B+ 树可以用一条链表串联所有的叶子节点,也就是索引数据,这让 B+ 树的范围查找和聚合运算更快。

  1. MySQL 范围查询?索引的数据结构是如何处理的?
  2. MySQL事务提交原理?
  3. 聚集索引 非聚集索引 查询效率?
  4. MySQL 乐观锁 悲观锁
  5. 数据库分库分表

基础

  1. 进程间通信方式有哪些?
  2. 有些信号你能捕获,有些信号你是捕获不了的,捕获不了的信号有哪些?
  3. zookeeper 可以通过watch,用来做进程间通信,那么zk底层是使用什么方式来实现进程间通信的?依赖操作系统如何实现的?
  4. socket通信
  5. keepalive时间限制
  6. tcp 如何处理粘包问题
  7. http协议 如何区分header头还有body体
  8. tcp 协议网络段 协议簇
  9. 一次完整的http请求
  10. http code 302 304含义

线上

  1. 如何线上debug?比如线上的cpu爆了,这个步骤是?
  2. 线上fd耗光了,如何排查?
  3. 如何定位OOM 问题?

Kakfa

  1. kafka某个broker上是否可以有无限个topic?或者万级别的topic?
  2. kafka 设计,还有broker上文件存储
  3. kakfa是否支持顺序消费消息?
  4. zk在kafka集群中的作用
  5. kafka 消费时候可以批量拉取?
  6. 消息队列 选型 为什么选择kafka?
  7. kafka增加. 删除节点时如何迁移数据?新的数据如何分配?
  8. kafka写入消息 如何保证回滚或者保证不被消费
  9. kafka 如何确保消息消费且只消费一次?
  10. kafka 大批量写入 是怎么传输的?
    对象缓存池
    https://www.sohu.com/a/346950666_100123073
    https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
  11. Kafka和RocketMQ存储区别
    https://www.cnblogs.com/lewis09/p/11168902.html

ElasticSearch

  1. 在ElasticSearch中,集群(Cluster),节点(Node),分片(Shard),Indices(索引),replicas(备份)之间是什么关系?
  2. elasticsearch整个建索引. 查询的过程
  3. elasticsearch如何选举
  4. ik 是如何进行分词的?
  5. es Scroll 原理? Search After原理?
  6. es 副本作用?
  7. MySQL elasticsearch 查询对比?(比如整个搜索流程)
  8. elasticsearch match filter 差异点?
  9. es 评分机制/原理

OpenTSDB

  1. OpenTSDB与HBase 关系

脑经急转弯

  1. 判断一个整数是2的N次方?
  2. 二叉树拷贝(非递归)
  3. BitMap算法(应用)

其他

  1. 分布式锁有哪些实现方式?
  2. 分布式事务
  3. 异地多活
  4. zookeeper集群 当一个节点挂了一天,当再次启动的时候,如何识别哪个是leader?
  5. 有什么知名的开源apm(Application Performance Management)工具吗?
  6. pinpoint 原理?
  7. consul template作用?如何与prometheus交互的?

金融

  1. 同业拆借
  2. 信用卡消费一笔钱,是如何到收款人的账户的?(整个流转过程)
  3. 复式记账

Spring

  1. spring 注入 接口即如何注入一个接口的多个实现类?
  2. spring 中用到的设计模式?spring中一次完整的http请求链路?
  3. 手写stater
  4. spring 类自动加载机制
  5. spring fegin 接口相互调用异常问题解决,有没有熔断啥的配置?

一、什么是代理?

代理是一种常用的设计模式,其目的就是为其他对象提供一个代理以控制对某个对象的访问。代理类负责为委托类预处理消息,过滤消息并转发消息,以及进行消息被委托类执行后的后续处理。

代理模式UML图:


简单结构示意图:


为了保持行为的一致性,代理类和委托类通常会实现相同的接口,所以在访问者看来两者没有丝毫的区别。通过代理类这中间一层,能有效控制对委托类对象的直接访问,也可以很好地隐藏和保护委托类对象,同时也为实施不同控制策略预留了空间,从而在设计上获得了更大的灵活性。Java 动态代理机制以巧妙的方式近乎完美地实践了代理模式的设计理念。

二、Java 动态代理类

Java动态代理类位于java.lang.reflect包下,一般主要涉及到以下两个类:

(1)Interface InvocationHandler:该接口中仅定义了一个方法

1
public object invoke(Object obj,Method method, Object[] args)

在实际使用时,第一个参数obj一般是指代理类,method是被代理的方法,如上例中的request(),args为该方法的参数数组。这个抽象方法在代理类中动态实现。

(2)Proxy:该类即为动态代理类,其中主要包含以下内容:

protected Proxy(InvocationHandler h):构造函数,用于给内部的h赋值。

static Class getProxyClass(

ClassLoader loader,

Class[] interfaces):获得一个代理类,其中loader是类装载器,interfaces是真实类所拥有的全部接口的数组。

static Object newProxyInstance(ClassLoaderloader, Class[] interfaces,InvocationHandler h):返回代理类的一个实例,返回后的代理类可以当作被代理类使用(可使用被代理类的在Subject接口中声明过的方法)

所谓DynamicProxy是这样一种class:它是在运行时生成的class,在生成它时你必须提供一组interface给它,然后该class就宣称它实现了这些interface。你当然可以把该class的实例当作这些interface中的任何一个来用。当然,这个DynamicProxy其实就是一个Proxy,它不会替你作实质性的工作,在生成它的实例时你必须提供一个handler,由它接管实际的工作。

在使用动态代理类时,我们必须实现InvocationHandler接口

通过这种方式,被代理的对象(RealSubject)可以在运行时动态改变,需要控制的接口(Subject接口)可以在运行时改变,控制的方式(DynamicSubject类)也可以动态改变,从而实现了非常灵活的动态代理关系。

动态代理步骤:

  1. 创建一个实现接口InvocationHandler的类,它必须实现invoke方法

  2. 创建被代理的类以及接口

  3. 通过Proxy的静态方法

newProxyInstance(ClassLoaderloader,Class[]interfaces,InvocationHandler h)创建一个代理

  1. 通过代理调用方法

三、JDK的动态代理怎么使用?

1、需要动态代理的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package jiankunking;

/**
* 需要动态代理的接口
*/
public interface Subject {
/**
* 你好
*
* @param name
* @return
*/
public String SayHello(String name);

/**
* 再见
*
* @return
*/
public String SayGoodBye();
}

2、需要代理的实际对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

package jiankunking;

/**
* 实际对象
*/
public class RealSubject implements Subject {

/**
* 你好
*
* @param name
* @return
*/
@Override
public String SayHello(String name) {
return "hello " + name;
}

/**
* 再见
*
* @return
*/
@Override
public String SayGoodBye() {
return " good bye ";
}
}

3、调用处理器实现类(有木有感觉这里就是传说中的AOP啊)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package jiankunking;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;


/**
* 调用处理器实现类
* 每次生成动态代理类对象时都需要指定一个实现了该接口的调用处理器对象
*/
public class InvocationHandlerImpl implements InvocationHandler {

/**
* 这个就是我们要代理的真实对象
*/
private Object subject;

/**
* 构造方法,给我们要代理的真实对象赋初值
*
* @param subject
*/
public InvocationHandlerImpl(Object subject) {
this.subject = subject;
}

/**
* 该方法负责集中处理动态代理类上的所有方法调用。
* 调用处理器根据这三个参数进行预处理或分派到委托类实例上反射执行
*
* @param proxy 代理类实例
* @param method 被调用的方法对象
* @param args 调用参数
* @return
* @throws Throwable
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//在代理真实对象前我们可以添加一些自己的操作
System.out.println("在调用之前,我要干点啥呢?");
System.out.println("Method:" + method);
//当代理对象调用真实对象的方法时,其会自动的跳转到代理对象关联的handler对象的invoke方法来进行调用
Object returnValue = method.invoke(subject, args);
//在代理真实对象后我们也可以添加一些自己的操作
System.out.println("在调用之后,我要干点啥呢?");
return returnValue;
}
}

4、测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package jiankunking;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/**
* 动态代理演示
*/
public class DynamicProxyDemonstration {
public static void main(String[] args) {
//代理的真实对象
Subject realSubject = new RealSubject();
/**
* InvocationHandlerImpl 实现了 InvocationHandler 接口,并能实现方法调用从代理类到委托类的分派转发
* 其内部通常包含指向委托类实例的引用,用于真正执行分派转发过来的方法调用.
* 即:要代理哪个真实对象,就将该对象传进去,最后是通过该真实对象来调用其方法
*/
InvocationHandler handler = new InvocationHandlerImpl(realSubject);

ClassLoader loader = handler.getClass().getClassLoader();
Class<?>[] interfaces = realSubject.getClass().getInterfaces();
/**
* 该方法用于为指定类装载器、一组接口及调用处理器生成动态代理类实例
*/
Subject subject = (Subject) Proxy.newProxyInstance(loader, interfaces, handler);

System.out.println("动态代理对象的类型:" + subject.getClass().getName());

String hello = subject.SayHello("jiankunking");
System.out.println(hello);
// String goodbye = subject.SayGoodBye();
// System.out.println(goodbye);
}
}

5、输出结果如下:

四、动态代理怎么实现的?

从使用代码中可以看出,关键点在:

1
Subject subject = (Subject) Proxy.newProxyInstance(loader, interfaces, handler);

通过跟踪提示代码可以看出:当代理对象调用真实对象的方法时,其会自动的跳转到代理对象关联的handler对象的invoke方法来进行调用。

也就是说,当代码执行到:subject.SayHello(“jiankunking”)这句话时,会自动调用InvocationHandlerImpl的invoke方法。这是为啥呢?

下面是代码跟分析的过程,不想看的朋友可以直接看结论

以下代码来自:JDK1.8.0_92

既然生成代理对象是用的Proxy类的静态方newProxyInstance,那么我们就去它的源码里看一下它到底都做了些什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* Returns an instance of a proxy class for the specified interfaces
* that dispatches method invocations to the specified invocation
* handler.
*
* <p>{@code Proxy.newProxyInstance} throws
* {@code IllegalArgumentException} for the same reasons that
* {@code Proxy.getProxyClass} does.
*
* @param loader the class loader to define the proxy class
* @param interfaces the list of interfaces for the proxy class
* to implement
* @param h the invocation handler to dispatch method invocations to
* @return a proxy instance with the specified invocation handler of a
* proxy class that is defined by the specified class loader
* and that implements the specified interfaces
* @throws IllegalArgumentException if any of the restrictions on the
* parameters that may be passed to {@code getProxyClass}
* are violated
* @throws SecurityException if a security manager, <em>s</em>, is present
* and any of the following conditions is met:
* <ul>
* <li> the given {@code loader} is {@code null} and
* the caller's class loader is not {@code null} and the
* invocation of {@link SecurityManager#checkPermission
* s.checkPermission} with
* {@code RuntimePermission("getClassLoader")} permission
* denies access;</li>
* <li> for each proxy interface, {@code intf},
* the caller's class loader is not the same as or an
* ancestor of the class loader for {@code intf} and
* invocation of {@link SecurityManager#checkPackageAccess
* s.checkPackageAccess()} denies access to {@code intf};</li>
* <li> any of the given proxy interfaces is non-public and the
* caller class is not in the same {@linkplain Package runtime package}
* as the non-public interface and the invocation of
* {@link SecurityManager#checkPermission s.checkPermission} with
* {@code ReflectPermission("newProxyInPackage.{package name}")}
* permission denies access.</li>
* </ul>
* @throws NullPointerException if the {@code interfaces} array
* argument or any of its elements are {@code null}, or
* if the invocation handler, {@code h}, is
* {@code null}
*/
@CallerSensitive
public static Object newProxyInstance(ClassLoader loader,
Class<?>[] interfaces,
InvocationHandler h) throws IllegalArgumentException {
//检查h 不为空,否则抛异常
Objects.requireNonNull(h);

final Class<?>[] intfs = interfaces.clone();
final SecurityManager sm = System.getSecurityManager();
if (sm != null) {
checkProxyAccess(Reflection.getCallerClass(), loader, intfs);
}

/*
* 获得与指定类装载器和一组接口相关的代理类类型对象
*/
Class<?> cl = getProxyClass0(loader, intfs);

/*
* 通过反射获取构造函数对象并生成代理类实例
*/
try {
if (sm != null) {
checkNewProxyPermission(Reflection.getCallerClass(), cl);
}
//获取代理对象的构造方法(也就是$Proxy0(InvocationHandler h))
final Constructor<?> cons = cl.getConstructor(constructorParams);
final InvocationHandler ih = h;
if (!Modifier.isPublic(cl.getModifiers())) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
cons.setAccessible(true);
return null;
}
});
}
//生成代理类的实例并把InvocationHandlerImpl的实例传给它的构造方法
return cons.newInstance(new Object[]{h});
} catch (IllegalAccessException|InstantiationException e) {
throw new InternalError(e.toString(), e);
} catch (InvocationTargetException e) {
Throwable t = e.getCause();
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new InternalError(t.toString(), t);
}
} catch (NoSuchMethodException e) {
throw new InternalError(e.toString(), e);
}
}

我们再进去getProxyClass0方法看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Generate a proxy class. Must call the checkProxyAccess method
* to perform permission checks before calling this.
*/
private static Class<?> getProxyClass0(ClassLoader loader,
Class<?>... interfaces) {
if (interfaces.length > 65535) {
throw new IllegalArgumentException("interface limit exceeded");
}

// If the proxy class defined by the given loader implementing
// the given interfaces exists, this will simply return the cached copy;
// otherwise, it will create the proxy class via the ProxyClassFactory
return proxyClassCache.get(loader, interfaces);
}

真相还是没有来到,继续,看一下 proxyClassCache

1
2
3
4
 /**
* a cache of proxy classes
*/
private static final WeakCache<ClassLoader, Class<?>[], Class<?>> proxyClassCache = new WeakCache<>(new KeyFactory(), new ProxyClassFactory());

奥,原来用了一下缓存啊

那么它对应的get方法啥样呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/**
* Look-up the value through the cache. This always evaluates the
* {@code subKeyFactory} function and optionally evaluates
* {@code valueFactory} function if there is no entry in the cache for given
* pair of (key, subKey) or the entry has already been cleared.
*
* @param key possibly null key
* @param parameter parameter used together with key to create sub-key and
* value (should not be null)
* @return the cached value (never null)
* @throws NullPointerException if {@code parameter} passed in or
* {@code sub-key} calculated by
* {@code subKeyFactory} or {@code value}
* calculated by {@code valueFactory} is null.
*/
public V get(K key, P parameter) {
Objects.requireNonNull(parameter);
expungeStaleEntries();
Object cacheKey = CacheKey.valueOf(key, refQueue);
// lazily install the 2nd level valuesMap for the particular cacheKey
ConcurrentMap<Object, Supplier<V>> valuesMap = map.get(cacheKey);
if (valuesMap == null) {
//putIfAbsent这个方法在key不存在的时候加入一个值,如果key存在就不放入
ConcurrentMap<Object, Supplier<V>> oldValuesMap = map.putIfAbsent(cacheKey,valuesMap = new ConcurrentHashMap<>());
if (oldValuesMap != null) {
valuesMap = oldValuesMap;
}
}

// create subKey and retrieve the possible Supplier<V> stored by that
// subKey from valuesMap
Object subKey = Objects.requireNonNull(subKeyFactory.apply(key, parameter));
Supplier<V> supplier = valuesMap.get(subKey);
Factory factory = null;

while (true) {
if (supplier != null) {
// supplier might be a Factory or a CacheValue<V> instance
V value = supplier.get();
if (value != null) {
return value;
}
}
// else no supplier in cache
// or a supplier that returned null (could be a cleared CacheValue
// or a Factory that wasn't successful in installing the CacheValue)

// lazily construct a Factory
if (factory == null) {
factory = new Factory(key, parameter, subKey, valuesMap);
}

if (supplier == null) {
supplier = valuesMap.putIfAbsent(subKey, factory);
if (supplier == null) {
// successfully installed Factory
supplier = factory;
}
// else retry with winning supplier
} else {
if (valuesMap.replace(subKey, supplier, factory)) {
// successfully replaced
// cleared CacheEntry / unsuccessful Factory
// with our Factory
supplier = factory;
} else {
// retry with current supplier
supplier = valuesMap.get(subKey);
}
}
}
}

我们可以看到它调用了 supplier.get(); 获取动态代理类,其中supplier是Factory,这个类定义在WeakCach的内部。

来瞅瞅,get里面又做了什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public synchronized V get() { // serialize access
// re-check
Supplier<V> supplier = valuesMap.get(subKey);
if (supplier != this) {
// something changed while we were waiting:
// might be that we were replaced by a CacheValue
// or were removed because of failure ->
// return null to signal WeakCache.get() to retry
// the loop
return null;
}
// else still us (supplier == this)

// create new value
V value = null;
try {
value = Objects.requireNonNull(valueFactory.apply(key, parameter));
} finally {
if (value == null) { // remove us on failure
valuesMap.remove(subKey, this);
}
}
// the only path to reach here is with non-null value
assert value != null;

// wrap value with CacheValue (WeakReference)
CacheValue<V> cacheValue = new CacheValue<>(value);

// try replacing us with CacheValue (this should always succeed)
if (valuesMap.replace(subKey, this, cacheValue)) {
// put also in reverseMap
reverseMap.put(cacheValue, Boolean.TRUE);
} else {
throw new AssertionError("Should not reach here");
}

// successfully replaced us with new CacheValue -> return the value
// wrapped by it
return value;
}
}

发现重点还是木有出现,但我们可以看到它调用了valueFactory.apply(key, parameter)方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
 /**
* A factory function that generates, defines and returns the proxy class given
* the ClassLoader and array of interfaces.
*/
private static final class ProxyClassFactory implements BiFunction<ClassLoader, Class<?>[], Class<?>> {
// prefix for all proxy class names
private static final String proxyClassNamePrefix = "$Proxy";

// next number to use for generation of unique proxy class names
private static final AtomicLong nextUniqueNumber = new AtomicLong();

@Override
public Class<?> apply(ClassLoader loader, Class<?>[] interfaces) {
Map<Class<?>, Boolean> interfaceSet = new IdentityHashMap<>(interfaces.length);
for (Class<?> intf : interfaces) {
/*
* Verify that the class loader resolves the name of this
* interface to the same Class object.
*/
Class<?> interfaceClass = null;
try {
interfaceClass = Class.forName(intf.getName(), false, loader);
} catch (ClassNotFoundException e) {
}
if (interfaceClass != intf) {
throw new IllegalArgumentException(
intf + " is not visible from class loader");
}
/*
* Verify that the Class object actually represents an
* interface.
*/
if (!interfaceClass.isInterface()) {
throw new IllegalArgumentException(
interfaceClass.getName() + " is not an interface");
}
/*
* Verify that this interface is not a duplicate.
*/
if (interfaceSet.put(interfaceClass, Boolean.TRUE) != null) {
throw new IllegalArgumentException(
"repeated interface: " + interfaceClass.getName());
}
}

String proxyPkg = null; // package to define proxy class in
int accessFlags = Modifier.PUBLIC | Modifier.FINAL;

/*
* Record the package of a non-public proxy interface so that the
* proxy class will be defined in the same package. Verify that
* all non-public proxy interfaces are in the same package.
*/
for (Class<?> intf : interfaces) {
int flags = intf.getModifiers();
if (!Modifier.isPublic(flags)) {
accessFlags = Modifier.FINAL;
String name = intf.getName();
int n = name.lastIndexOf('.');
String pkg = ((n == -1) ? "" : name.substring(0, n + 1));
if (proxyPkg == null) {
proxyPkg = pkg;
} else if (!pkg.equals(proxyPkg)) {
throw new IllegalArgumentException(
"non-public interfaces from different packages");
}
}
}

if (proxyPkg == null) {
// if no non-public proxy interfaces, use com.sun.proxy package
proxyPkg = ReflectUtil.PROXY_PACKAGE + ".";
}

/*
* Choose a name for the proxy class to generate.
*/
long num = nextUniqueNumber.getAndIncrement();
String proxyName = proxyPkg + proxyClassNamePrefix + num;

/*
* Generate the specified proxy class.
*/
byte[] proxyClassFile = ProxyGenerator.generateProxyClass(
proxyName, interfaces, accessFlags);
try {
return defineClass0(loader, proxyName, proxyClassFile, 0, proxyClassFile.length);
} catch (ClassFormatError e) {
/*
* A ClassFormatError here means that (barring bugs in the
* proxy class generation code) there was some other
* invalid aspect of the arguments supplied to the proxy
* class creation (such as virtual machine limitations
* exceeded).
*/
throw new IllegalArgumentException(e.toString());
}
}
}

通过看代码终于找到了重点:

1
2
//生成字节码
byte[] proxyClassFile = ProxyGenerator.generateProxyClass(proxyName, interfaces, accessFlags);

那么接下来我们也使用测试一下,使用这个方法生成的字节码是个什么样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package jiankunking;

import sun.misc.ProxyGenerator;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/**
* 动态代理演示
*/
public class DynamicProxyDemonstration {
public static void main(String[] args) {
//代理的真实对象
Subject realSubject = new RealSubject();

/**
* InvocationHandlerImpl 实现了 InvocationHandler 接口,并能实现方法调用从代理类到委托类的分派转发
* 其内部通常包含指向委托类实例的引用,用于真正执行分派转发过来的方法调用.
* 即:要代理哪个真实对象,就将该对象传进去,最后是通过该真实对象来调用其方法
*/
InvocationHandler handler = new InvocationHandlerImpl(realSubject);

ClassLoader loader = handler.getClass().getClassLoader();
Class[] interfaces = realSubject.getClass().getInterfaces();
/**
* 该方法用于为指定类装载器、一组接口及调用处理器生成动态代理类实例
*/
Subject subject = (Subject) Proxy.newProxyInstance(loader, interfaces, handler);
System.out.println("动态代理对象的类型:"+subject.getClass().getName());

String hello = subject.SayHello("jiankunking");
System.out.println(hello);
// 将生成的字节码保存到本地,
createProxyClassFile();
}
private static void createProxyClassFile(){
String name = "ProxySubject";
byte[] data = ProxyGenerator.generateProxyClass(name,new Class[]{Subject.class});
FileOutputStream out =null;
try {
out = new FileOutputStream(name+".class");
System.out.println((new File("hello")).getAbsolutePath());
out.write(data);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally {
if(null!=out) try {
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

可以看一下这里代理对象的类型:


我们用jd-jui 工具将生成的字节码反编译:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import jiankunking.Subject;

public final class ProxySubject extends Proxy implements Subject {
private static Method m1;
private static Method m3;
private static Method m4;
private static Method m2;
private static Method m0;

public ProxySubject(InvocationHandler paramInvocationHandler) {
super(paramInvocationHandler);
}

public final boolean equals(Object paramObject){
try {
return ((Boolean)this.h.invoke(this, m1, new Object[] { paramObject })).booleanValue();
} catch (Error|RuntimeException localError){
throw localError;
} catch (Throwable localThrowable) {
throw new UndeclaredThrowableException(localThrowable);
}
}

public final String SayGoodBye() {
try{
return (String)this.h.invoke(this, m3, null);
} catch (Error|RuntimeException localError) {
throw localError;
} catch (Throwable localThrowable) {
throw new UndeclaredThrowableException(localThrowable);
}
}

public final String SayHello(String paramString) {
try {
return (String)this.h.invoke(this, m4, new Object[] { paramString });
} catch (Error|RuntimeException localError){
throw localError;
} catch (Throwable localThrowable){
throw new UndeclaredThrowableException(localThrowable);
}
}

public final String toString() {
try {
return (String)this.h.invoke(this, m2, null);
} catch (Error|RuntimeException localError) {
throw localError;
} catch (Throwable localThrowable) {
throw new UndeclaredThrowableException(localThrowable);
}
}

public final int hashCode() {
try {
return ((Integer)this.h.invoke(this, m0, null)).intValue();
} catch (Error|RuntimeException localError) {
throw localError;
} catch (Throwable localThrowable) {
throw new UndeclaredThrowableException(localThrowable);
}
}

static
{
try {
m1 = Class.forName("java.lang.Object").getMethod("equals", new Class[] { Class.forName("java.lang.Object") });
m3 = Class.forName("jiankunking.Subject").getMethod("SayGoodBye", new Class[0]);
m4 = Class.forName("jiankunking.Subject").getMethod("SayHello", new Class[] { Class.forName("java.lang.String") });
m2 = Class.forName("java.lang.Object").getMethod("toString", new Class[0]);
m0 = Class.forName("java.lang.Object").getMethod("hashCode", new Class[0]);
return;
} catch (NoSuchMethodException localNoSuchMethodException) {
throw new NoSuchMethodError(localNoSuchMethodException.getMessage());
} catch (ClassNotFoundException localClassNotFoundException) {
throw new NoClassDefFoundError(localClassNotFoundException.getMessage());
}
}
}

这就是最终真正的代理类,它继承自Proxy并实现了我们定义的Subject接口,也就是说:

1
Subject subject = (Subject) Proxy.newProxyInstance(loader, interfaces, handler);

这里的subject实际是这个类的一个实例,那么我们调用它的:

1
public final String SayHello(String paramString)

就是调用我们定义的InvocationHandlerImpl的 invoke方法:

上面是代码跟分析的过程,不想看的朋友可以直接看结论

五、结论

到了这里,终于解答了:

subject.SayHello(“jiankunking”)这句话时,为什么会自动调用InvocationHandlerImpl的invoke方法?

因为JDK生成的最终真正的代理类,它继承自Proxy并实现了我们定义的Subject接口,在实现Subject接口方法的内部,通过反射调用了InvocationHandlerImpl的invoke方法。

通过分析代码可以看出Java 动态代理,具体有如下四步骤:

  1. 通过实现 InvocationHandler 接口创建自己的调用处理器;

  2. 通过为 Proxy 类指定 ClassLoader 对象和一组 interface 来创建动态代理类;

  3. 通过反射机制获得动态代理类的构造函数,其唯一参数类型是调用处理器接口类型;

  4. 通过构造函数创建动态代理类实例,构造时调用处理器对象作为参数被传入。

演示代码下载:
https://github.com/jiankunking/DynamicProxyDemo