0%

Java 并发 - 多线程:线程间通信交替打印

1.前言

这一部分,我通过一个经典的问题来引出几个线程间通信的方法,即:三个线程如何实现交替打印ABC?

2.代码实现

2.1 Synchronized

基本思路:使用同步块和wait、notify的方法控制三个线程的执行次序。具体方法如下:从大的方向上来讲,该问题为三线程间的同步唤醒操作,主要的目的就是ThreadA->ThreadB->ThreadC->ThreadA循环执行三个线程。为了控制线程执行的顺序,那么就必须要确定唤醒、等待的顺序,所以每一个线程必须同时持有两个对象锁,才能进行打印操作。一个对象锁是prev,就是前一个线程所对应的对象锁,其主要作用是保证当前线程一定是在前一个线程操作完成后(即前一个线程释放了其对应的对象锁)才开始执行。还有一个锁就是自身对象锁。主要的思想就是,为了控制执行的顺序,必须要先持有prev锁(也就前一个线程要释放其自身对象锁),然后当前线程再申请自己对象锁,两者兼备时打印。之后首先调用self.notify()唤醒下一个等待线程(注意notify不会立即释放对象锁,只有等到同步块代码执行完毕后才会释放),再调用prev.wait()立即释放prev对象锁,当前线程进入休眠,等待其他线程的notify操作再次唤醒。

下面程序可以看到程序一共定义了a,b,c三个对象锁,分别对应A、B、C三个线程。A线程最先运行,A线程按顺序申请c,a对象锁,打印操作后按顺序释放a,c对象锁,并且通过notify操作唤醒线程B。线程B首先等待获取A锁,再申请B锁,后打印B,再释放B,A锁,唤醒C。线程C等待B锁,再申请C锁,后打印C,再释放C,B锁,唤醒A。看起来似乎没什么问题,但如果你仔细想一下,就会发现有问题,就是初始条件,三个线程必须按照A,B,C的顺序来启动,但是这种假设依赖于JVM中线程调度、执行的顺序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class UseSynchronized {
public static class ThreadPrinter implements Runnable {
private String name;
private Object prev;
private Object self;

private ThreadPrinter(String name, Object prev, Object self) {
this.name = name;
this.prev = prev;
this.self = self;
}

@Override
public void run() {
int count = 10;
while (count > 0) {// 多线程并发,不能用if,必须使用while循环
synchronized (prev) { // 先获取 prev 锁
synchronized (self) {// 再获取 self 锁
System.out.print(name);// 打印
count--;
self.notifyAll();// 唤醒其他线程竞争self锁,注意此时self锁并未立即释放。
}
// 此时执行完self的同步块,这时self锁才释放。
try {
if (count == 0) { // 如果count==0,表示这是最后一次打印操作,通过notifyAll操作释放对象锁。
prev.notifyAll();
} else {
prev.wait(); // 立即释放 prev锁,当前线程休眠,等待唤醒
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

public static void main(String[] args) throws Exception {
Object a = new Object();
Object b = new Object();
Object c = new Object();
ThreadPrinter pa = new ThreadPrinter("A", c, a);
ThreadPrinter pb = new ThreadPrinter("B", a, b);
ThreadPrinter pc = new ThreadPrinter("C", b, c);

new Thread(pa).start();
Thread.sleep(10);// 保证初始ABC的启动顺序
new Thread(pb).start();
Thread.sleep(10);
new Thread(pc).start();
Thread.sleep(10);
}
}

输出如下:

1
ABCABCABCABCABCABCABCABCABCABC

从这里,我们也可以得出wait和notify操作的异同:

  1. wait()notify/notifyAll() 是Object类的方法,在执行两个方法时,要先获得锁。
  2. 当线程执行wait()时,会把当前的锁释放,然后让出CPU,进入等待状态。
  3. 当执行notify/notifyAll方法时,会唤醒一个处于等待该 对象锁 的线程,然后继续往下执行,直到执行完退出对象锁锁住的区域(synchronized修饰的代码块)后再释放锁。

从这里还可以看出,notify/notifyAll()执行后,并不立即释放锁,而是要等到执行完临界区中代码后,再释放。所以在实际编程中,我们应该尽量在线程调用notify/notifyAll()后,立即退出临界区。即不要在notify/notifyAll()后面再写一些耗时的代码。


2.2 Lock锁方法

基本思路:通过ReentrantLock我们可以很方便的进行显式的锁操作,即获取锁和释放锁,对于同一个对象锁而言,同一时刻只可能有一个线程拿到了这个锁,此时其他线程通过lock.lock()来获取对象锁时都会被阻塞,直到这个线程通过lock.unlock()操作释放这个锁后,其他线程才能拿到这个锁。

值得注意的是ReentrantLock是可重入锁,它持有一个锁计数器,当已持有锁的线程再次获得该锁时计数器值加1,每调用一次lock.unlock()时所计数器值减一,直到所计数器值为0,此时线程释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class UseLock {

// 通过Lock锁来保证线程的访问的互斥
private static Lock lock = new ReentrantLock();
// 通过state的值来确定是否打印
private static int state = 0;

static class ThreadA extends Thread {
@Override
public void run() {
for (int i = 0; i < 10;) {
try {
lock.lock();
// 多线程并发,不能用if,必须用循环测试等待条件,避免虚假唤醒
while (state % 3 == 0) {
System.out.print("A");
state++;
i++;
}
} finally {
// unlock()操作必须放在finally块中
lock.unlock();
}
}
}
}

static class ThreadB extends Thread {
@Override
public void run() {
for (int i = 0; i < 10;) {
try {
lock.lock();
while (state % 3 == 1) {
System.out.print("B");
state++;
i++;
}
} finally {
lock.unlock();
}
}
}
}

static class ThreadC extends Thread {
@Override
public void run() {
for (int i = 0; i < 10;) {
try {
lock.lock();
while (state % 3 == 2) {
System.out.print("C");
state++;
i++;
}
} finally {
lock.unlock();
}
}
}
}

public static void main(String[] args) {
new ThreadA().start();
new ThreadB().start();
new ThreadC().start();
}
}

输出如下:

1
ABCABCABCABCABCABCABCABCABCABC

2.3 ReentrantLock结合Condition

与ReentrantLock搭配的通行方式是Condition,Condition是被绑定到Lock上的,必须使用lock.newCondition()才能创建一个Condition。从下面的代码可以看出,Synchronized能实现的通信方式,Condition都可以实现,功能类似的代码写在同一行中。这样解题思路就和第一种方法基本一致,只是采用的方法不同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class UseCondition {
// 创建一个可重入锁lock
private static Lock lock = new ReentrantLock();
// 再在上面创建的lock 进行三个条件的绑定
private static Condition A = lock.newCondition();
private static Condition B = lock.newCondition();
private static Condition C = lock.newCondition();

private static int count = 0;

static class ThreadA extends Thread {
@Override
public void run() {
try {
lock.lock();
for (int i = 0; i < 10; i++) {
// 注意这里是不等于0,也就是说在count % 3为0之前,当前线程一直阻塞状态
while (count % 3 != 0)
// ConditionA 进入等待状态
A.await();
System.out.print("A");
count++;
// A执行完唤醒B线程
B.signal();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

static class ThreadB extends Thread {
@Override
public void run() {
try {
lock.lock();
for (int i = 0; i < 10; i++) {
// 条件不满足时,ConditionB进行等待
// 当前面A线程执行后会通过B.signal()唤醒该线程,才会往下走
while (count % 3 != 1)
B.await();
System.out.print("B");
count++;
C.signal(); // B执行完唤醒C线程
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

static class ThreadC extends Thread {
@Override
public void run() {
try {
lock.lock();
for (int i = 0; i < 10; i++) {
// 同上不再解释
while (count % 3 != 2)
C.await();
System.out.print("C");
count++;
// C执行完唤醒A线程
A.signal();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

public static void main(String[] args) throws InterruptedException {
new ThreadA().start();
new ThreadB().start();
new ThreadC().start();
}
}

输出如下:

1
ABCABCABCABCABCABCABCABCABCABC

2.4 Semaphore信号量方式

Semaphore又称信号量,是操作系统中的一个概念,在Java并发编程中,信号量控制的是线程并发的数量。public Semaphore (int permits),其中参数permits就是允许同时运行的线程数目;

Semaphore是用来保护一个或者多个共享资源的访问,Semaphore内部维护了一个计数器,其值为可以访问的共享资源的个数。一个线程要访问共享资源,先获得信号量,如果信号量的计数器值大于1,意味着有共享资源可以访问,则使其计数器值减去1,再访问共享资源。如果计数器值为0,线程进入休眠。当某个线程使用完共享资源后,释放信号量,并将信号量内部的计数器加1,之前进入休眠的线程将被唤醒并再次试图获得信号量。

Semaphore使用时需要先构建一个参数来指定共享资源的数量,Semaphore构造完成后即是获取Semaphore、共享资源使用完毕后释放Semaphore。

1
2
3
4
Semaphore semaphore = new Semaphore(10, true);
semaphore.acquire();
// do something here
semaphore.release();

Demo 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class UseSemaphore {
// 以A开始的信号量,初始信号量数量为1
private static Semaphore A = new Semaphore(1);
// B、C信号量,A完成后开始,初始信号数量为0
private static Semaphore B = new Semaphore(0);
private static Semaphore C = new Semaphore(0);

static class ThreadA extends Thread {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
A.acquire(); // A获取信号执行,A信号量减1,当A为0时将无法继续获得该信号量
System.out.print("A");
B.release(); // B释放信号,B信号量加1(初始为0),此时可以获取B信号量
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

static class ThreadB extends Thread {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
B.acquire();
System.out.print("B");
C.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

static class ThreadC extends Thread {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
C.acquire();
System.out.println("C");
A.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) throws InterruptedException {
// 三个线程开启
new ThreadA().start();
new ThreadB().start();
new ThreadC().start();
}
}

输出如下:

1
2
3
4
5
6
7
8
9
10
ABC
ABC
ABC
ABC
ABC
ABC
ABC
ABC
ABC
ABC

2.5 使用join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

public class UseJoin {
static class WorkersA implements Runnable {
public void run() {
System.out.println("A");
}
}
static class WorkersB implements Runnable {
public void run() {
System.out.println("B");
}
}
static class WorkersC implements Runnable {
public void run() {
System.out.println("C");
}
}

public static void main(String[] args) throws Exception{
for (int i = 0; i < 10; i++) {
Thread thread1 = new Thread(new WorkersA());
thread1.start();
thread1.join();
Thread thread2 = new Thread(new WorkersB());
thread2.start();
thread2.join();
Thread thread3 = new Thread(new WorkersC());
thread3.start();
thread3.join();
}
}
}

输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C
A
B
C