0%

Java 并发 - 多线程:线程池的使用

1 如何配置线程池

如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的! CPU 根本没有得到充分利用。

但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。

注:上下文切换的解释

  • 多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。
  • 上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。
  • Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。

需要肯定的一点是:线程池肯定是不是越大越好。

通常我们是需要根据这批任务执行的性质来确定的。

  • CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
  • I/O 密集型任务(2N): 由于IO密集型任务的线程并不是一直在执行任务,则应配置尽可能多的线程,如CPU核数 * 2。

这里还有一种参考的IO 密集型的线程池大小设置方式:IO密集型,即任务需要大量的IO,即大量的阻塞。在单线程上运行IO密集型的任务会导致浪费大量的CPU运算能力浪费在等待。所以在IO密集型任务中使用多线程可以大大的加速程序运行。故需要·多配置线程数:参考公式:

  • CPU核数/(1-阻塞系数 ) 阻塞系数在(0.8-0.9)之间
  • 比如8核CPU:8/(1-0.9) = 80个线程数

举个简单的实例如下:

假如一分钟内要写 1 万个 1M 的文件到磁盘,core 和 maximum 怎么设?

  • 这个是IO密集型的任务,如果是四核,core 和 maximum 就设置为8个;

要对一个数 0 加到一亿,要怎么设 core 和 maxim?

  • 加法操作,CPU利用率高,这是CPU密集型的任务,如果是四核,则core 和 maximum 就设置为5个;
  • 如果按照阻塞系数的方程来解决的话:四核CPU,则:4 / (1 - 0.9) = 40,则core 和 maximum 需要设置为40;

2 如何优雅的关闭线程池

如果线程池需要执行的任务完成了之后,我们也应该着手一下进行关闭线程池,那么有一些什么办法呢?无非就两种方法:shutdown()/shutdownNow()

那么这两者的区别是什么呢?

  • shutdown() :执行后停止接受新任务,会把队列的任务执行完毕。
  • shutdownNow() :也是停止接受新任务,但会中断所有的任务,将线程池状态变为 stop。

两个方法都会中断线程,用户可自行判断是否需要响应中断。

shutdownNow() 要更简单粗暴,可以根据实际场景选择不同的方法。

这里借鉴一个方法,可以采取以下的方法关闭线程池:

1
2
3
4
5
6
7
8
9
10
long start = System.currentTimeMillis();
for (int i = 0; i <= 5; i++) {
pool.execute(new Job());
}
pool.shutdown();
while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
LOGGER.info("线程还在执行。。。");
}
long end = System.currentTimeMillis();
LOGGER.info("一共处理了【{}】", (end - start));

pool.awaitTermination(1, TimeUnit.SECONDS) 会每隔一秒钟检查一次是否执行完毕(状态为 TERMINATED),当从 while 循环退出时就表明线程池已经完全终止了。


3 线程池使用的两个Demo

为了更好的理解线程池的参数的设置与如何使用,这里有两个Demo:

3.1 Runnable+ThreadPoolExecutor

首先创建一个 Runnable 接口的实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import java.util.Date;

/**
* 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。
* @author shuang.kou
*/
public class MyRunnable implements Runnable {

private String command;

// 构造函数
public MyRunnable(String s) {
this.command = s;
}

// 重写run方法
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
processCommand();
System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
}

private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// 重写toString方法
@Override
public String toString() {
return this.command;
}

编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ThreadPoolExecutorDemo {

private static final int CORE_POOL_SIZE = 5; // 核心线程为5
private static final int MAX_POOL_SIZE = 10; // 最大线程数为10
private static final int QUEUE_CAPACITY = 100; // 队列数为100
// 只有当线程池中的线程数大于corePoolSize时,这个参数才会起作用。
// 当线程数大于corePoolSize时,终止前多余的空闲线程等待新任务的最长时间,等待时间为 1L。
private static final Long KEEP_ALIVE_TIME = 1L;

public static void main(String[] args) {

//使用阿里巴巴推荐的创建线程池的方式
//通过ThreadPoolExecutor构造函数自定义参数创建
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY), // 任务队列为 ArrayBlockingQueue,并且容量为 100;
new ThreadPoolExecutor.CallerRunsPolicy()); // 饱和策略为 CallerRunsPolicy

for (int i = 0; i < 10; i++) {
//创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
Runnable worker = new MyRunnable("" + i);
//执行Runnable
executor.execute(worker);
}

//终止线程池
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
}

3.2 Callable+ThreadPoolExecutor

首先创建一个 Callable 接口的实现类

1
2
3
4
5
6
7
8
9
10
import java.util.concurrent.Callable;

public class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(1000);
//返回执行当前 Callable 的线程名字
return Thread.currentThread().getName();
}
}

编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class CallableDemo {

private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;

public static void main(String[] args) {

//使用阿里巴巴推荐的创建线程池的方式
//通过ThreadPoolExecutor构造函数自定义参数创建
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());

// 申请一个List,用来装返回的数据
List<Future<String>> futureList = new ArrayList<>();
// 开始进行实例的创建
Callable<String> callable = new MyCallable();

for (int i = 0; i < 10; i++) {
//提交任务到线程池
Future<String> future = executor.submit(callable);
//将返回值 future 添加到 list,我们可以通过 future 获得 执行 Callable 得到的返回值
futureList.add(future);
}
// 进行返回数据的遍历输出
for (Future<String> fut : futureList) {
try {
// 使用fut.get()得到数据
System.out.println(new Date() + "::" + fut.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
//关闭线程池
executor.shutdown();
}
}