多线程学习—4-线程池的使用

前言

​ 写到线程池这块先回忆一段小插曲。13年12月的时候,差不多也是现在这个季节,刚实习一个月似懂非懂拿着活就干的我第一次接触到线程池,当时是做一个上报到的任务,差不多每天需要上传70万数据到一个接口。沈阳的大背景技术不是很强,工作了三四年的一些老程序员也很少接触到多线程。当时没用多线程之前差不多是

700000/(60*60*24*10)接近一天才能上报完,并且是程序不出错的情况下。于是乎白哥(我的第一个项目经理,这里要感谢白哥在我工作之初的时候谆谆教导!和第一份工作远在东北的同事们!)想到了用多线程来处理,也许是没时间研究底层,出来的效果也不尽人意,总是会丢数据,虽然时间加快到两个小时以内,不知天高地厚的我,当然也是不懂把失败策略,改了下在类似以下的代码中把原来的代码(原来代码不记得了)改为:

1
2
3
4
5
6
7
8
//线程池对拒绝任务的处理策略
final RejectedExecutionHandler Errorhandler = new RejectedExecutionHandler() {
public void rejectedExecution( Runnable r, ThreadPoolExecutor executor ) {
if (!executor.isShutdown()) {
r.run();
}
}
};

其实就是把if语句内替换成r.run()这一句,居然奇迹般的不丢数据了,当时我第一次感到程序和多线程的魅力,后来浑浑噩噩的这么久整整三年竟然还是似懂非懂,说来惭愧!这几天继续学习多线程,刚好看到线程池这边,回忆起往事真是感慨颇多,多年前定的计划能完成的真是少之又少,希望以后能分而划之的坚持下去!

线程池的作用

线程池的作用

线程池的作用就2个:

  • 减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
  • 可以根据系统的承受能力,调整线程池中工作线程的数据,防止因为消耗过多的内存导致服务器崩溃

​ 使用线程池,要根据系统的环境情况,手动或自动设置线程数目。少了系统运行效率不高,多了系统拥挤、占用内存多。用线程池控制数量,其他线程排队等候。一个任务执行完毕,再从队列中取最前面的任务开始执行。若任务中没有等待任务,线程池这一资源处于等待。当一个新任务需要运行,如果线程池中有等待的工作线程,就可以开始运行了,否则进入等待队列。

​ 可以适当的修改文末代码中的参数来体验一下速度或者在excute里加上sleep方法,感受下失败策略(即丢数据的情况),减小线程池数量大小可以加快速度,因为减少了线程池创建的时间,但是相应的降低了系统的处理能力。

参数总结

  • 池中线程数小于corePoolSize,新任务都不排队而是直接添加新线程

  • 池中线程数大于等于corePoolSize,workQueue未满,首选将新任务假如workQueue而不是添加新线程

  • 池中线程数大于等于corePoolSize,workQueue已满,但是线程数小于maximumPoolSize,添加新的线程来处理被添加的任务

  • 池中线程数大于大于corePoolSize,workQueue已满,并且线程数大于等于maximumPoolSize,新任务被拒绝,使用handler处理被拒绝的任务

    上面的四个总结是前辈所写,由于这里主要是记录本人学习过程,就不添加链接了。

Executors

JDK推荐的三个Executors:

  • 单线程线程池

    那么线程池中运行的线程数肯定是1。workQueue选择了无界的LinkedBlockingQueue,那么不管来多少任务都排队,前面一个任务执行完毕,再执行队列中的线程。从这个角度讲,第二个参数maximumPoolSize是没有意义的,因为maximumPoolSize描述的是排队的任务多过workQueue的容量,线程池中最多只能容纳maximumPoolSize个任务,现在workQueue是无界的,也就是说排队的任务永远不会多过workQueue的容量,那maximum其实设置多少都无所谓了。

  • newFixedThreadPool(int nThreads) 固定大小线程池

    固定大小的线程池和单线程的线程池异曲同工,无非是让线程池中能运行的线程编程了手动指定的nThreads罢了。同样,由于是选择了LinkedBlockingQueue,因此其实第二个参数maximumPoolSize同样也是无意义的

  • newCachedThreadPool() 无界线程池

    无界线程池,意思是不管多少任务提交进来,都直接运行。无界线程池采用了SynchronousQueue,采用这个线程池就没有workQueue容量一说了,只要添加进去的线程就会被拿去用。既然是无界线程池,那线程数肯定没上限,所以以maximumPoolSize为主了,设置为一个近似的无限大Integer.MAX_VALUE。 另外注意一下,单线程线程池和固定大小线程池线程都不会进行自动回收的,也即是说保证提交进来的任务最终都会被处理,但至于什么时候处理,就要看处理能力了。但是无界线程池是设置了回收时间的,由于corePoolSize为0,所以只要60秒没有被用到的线程都会被直接移除。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package MyThread;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.*;
/**
* Created by yuhang on 2016/12/4.
*/
public class MyThread9 {
//维护线程池最小的数量
private static final int CORE_POOL_SIZE = 100;//改完1速度更快了,因为开启线程数量变少,但是相应的系统处理能力就变弱了
//线程池维护的最大数量
private static final int MAX_POOL_SIZE = 100;
//线程池维护线程所允许的空余时间
private static final int KEEP_ALIVE_TIME = 0;
//缓冲队列的大小
private static final int WORK_QUEUE_SIZE = 2000;
static final List<Integer> l = new Vector<>();
//线程池对拒绝任务的处理策略
static final RejectedExecutionHandler Errorhandler = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
//r.run();
System.out.println("注释掉 r.run() WORK_QUEUE_SIZE 为100时,会出现丢数据的情况!");
}
}
};
//JDK提倡用Executors三个工厂方法
//单线程线程池
// public static ExecutorService newSingleThreadExecutor() {
// return new FinalizableDelegatedExecutorService
// (new ThreadPoolExecutor(1, 1,
// 0L, TimeUnit.MILLISECONDS,
// new LinkedBlockingQueue<>()));
// }
//无界限
public static ExecutorService newCachedThreadPool(int corePoolSize) {
return new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>());
}
//固定大小线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
}
static final ThreadPoolExecutor tp =
new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(WORK_QUEUE_SIZE), Errorhandler);
// (ThreadPoolExecutor) MyThread9.newFixedThreadPool(1);
// (ThreadPoolExecutor) MyThread9.newCachedThreadPool(5);
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
for (int i = 0; i < 2000; i++) {
tp.execute(
new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
l.add(1);
}
}
);
System.out.println(i);
}
tp.shutdown();
try {
tp.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() - startTime);
System.out.println(l.size());
}
}
坚持原创技术分享,您的支持将鼓励我继续创作!

热评文章