0%

Java 并发 - AQS:LockSupport阻塞唤醒线程

1.简介

LockSupport在AQS中经常被调用,可以经常看到LockSupport的出现,看了很久也不清楚大概是个什么作用,总结了一些,它大概就是AQS用来实现线程的阻塞和唤醒的,分别通过LockSupport .park()LockSupport .unpark()进行实现的。

使用LockSupport的线程会与一个许可关联,其实就像是一个二元信号量(意思就是只有一个许可证可以使用),如果这个许可没有被占用,那么当前线程可以获得许可并继续执行,如果许可以已经被占用,则当前线程就会被阻塞,然后等待许可的获取。注意:许可默认是被占用的!

可以看看如下的一个小的测试:

1
2
3
4
5
6
7
public static void main(String[] args)
{
Thread thread = Thread.currentThread();
LockSupport.unpark(thread);//释放许可
LockSupport.park();// 获取许可
System.out.println("b");
}

这样一来先释放许可,再获取许可,主线程是能够正常打印出字符的,但是如果没有先释放许可的话,就会被阻塞了:

1
2
3
4
5
public static void main(String[] args)
{
LockSupport.park();
System.out.println("block.");
}

运行该代码,可以发现主线程一直处于阻塞状态。因为 许可默认是被占用的 ,调用park()时获取不到许可,所以进入阻塞状态。

还需要注意的是LockSupport是不可重入 的,如果一个线程连续2次调用 LockSupport .park(),那么该线程一定会一直阻塞下去。

LockSupport定义了一系列以park开头的方法来阻塞当前线程,unpark(Thread thread)方法来唤醒一个被阻塞的线程。如下:

LockSupport的几个方法


2.源码

2.1 park

park(Object blocker)方法的blocker参数,主要是用来标识当前线程在等待的对象,该对象主要用于问题排查和系统监控。

1
2
3
4
5
6
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, 0L);
setBlocker(t, null);
}

setBlocker主要完成的也就是设置屏障,其源码如下:

1
2
3
4
private static void setBlocker(Thread t, Object arg) {
// Even though volatile, hotspot doesn't need a write barrier here.
U.putReference(t, PARKBLOCKER, arg);
}

需要注意的是:park方法和unpark(Thread thread)都是成对出现的,同时unpark必须要在park执行之后执行,当然并不是说没有不调用unpark线程就会一直阻塞,park有一个方法,它带了时间戳(parkNanos(long nanos):为了线程调度禁用当前线程,最多等待指定的等待时间,除非许可可用)。


2.2 unpark

unpark(Thread thread)方法源码如下:

1
2
3
4
public static void unpark(Thread thread) {
if (thread != null)
U.unpark(thread);
}

U是什么?查看源码可以发现一系列UNSAFE相关的方法:

1
2
3
4
5
6
7
8
// Hotspot implementation via intrinsics API
private static final Unsafe U = Unsafe.getUnsafe();
private static final long PARKBLOCKER = U.objectFieldOffset
(Thread.class, "parkBlocker");
private static final long SECONDARY = U.objectFieldOffset
(Thread.class, "threadLocalRandomSecondarySeed");
private static final long TID = U.objectFieldOffset
(Thread.class, "tid");

可以发现park和unpark其内部都是通过UNSAFE(sun.misc.Unsafe UNSAFE)来实现的,其定义如下:

1
2
public native void park(boolean var1, long var2);
public native void unpark(Object var1);

关于Unusafe,我想后面还是要去了解一下,Unusafe是一个比较危险的类,主要是用于执行低级别、不安全的方法集合。


3.官方案例

JDK源码给了一个案例,理解一下其具体操作,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class FIFOMutex {
private final AtomicBoolean locked = new AtomicBoolean(false);
private final Queue<Thread> waiters = new ConcurrentLinkedQueue<>();

// 上锁操作
public void lock() {
boolean wasInterrupted = false;
// publish current thread for unparkers
waiters.add(Thread.currentThread());

// Block while not first in queue or cannot acquire lock
while (waiters.peek() != Thread.currentThread() ||
!locked.compareAndSet(false, true)) {
//
LockSupport.park(this);
// ignore interrupts while waiting
if (Thread.interrupted())
wasInterrupted = true;
}

waiters.remove();
// ensure correct interrupt status on return
if (wasInterrupted)
Thread.currentThread().interrupt();
}

public void unlock() {
locked.set(false);
//
LockSupport.unpark(waiters.peek());
}

static {
// Reduce the risk of "lost unpark" due to classloading
Class<?> ensureLoaded = LockSupport.class;
}
}

参考文章:

  1. 【死磕Java并发】—–J.U.C之AQS:阻塞和唤醒线程
  2. LockSupport的park和unpark的基本使用,以及对线程中断的响应性