上一篇已经讲过一些线程池的概念,现在我们来自定义一个线程池,实现的不是一般,就是一个个人理解的线程池实现,仅供参考。
创建变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14// 线程状态,空闲,运行中,阻塞,结束
private enum WorkerState {FREE, RUNNABLE, BLOCKED,TERMINATED;}
// 线程池是否被销毁
private volatile boolean destroy = false;
// 核心线程数
private final static int CORE_THREAD_SIZE = 2;
// 最大线程数
private final static int MAX_THREAD_SIZE = 10;
// 任务队列容器,也可以用Queue<Runnable> 遵循 FIFO 规则
private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();
// 设置它的边界值为11
private final static int TASK_QUEUE_MAX_SIZE = 11;
// 线程容器
private final static List<Worker> WORKERS = new ArrayList<>();- WorkerState:线程状态,一个线程新创建的时候为FREE,工作的时候为RUNNABLE,处于阻塞状态的时候为BLOCK(意味着当前任务队列没有任务了),而TERMINATED表示当前线程结束
- destroy:线程池状态,表示当前线程是否已经销毁,只有执行了shutdown方法才会销毁线程池,并发情况下要保证可见性
- TASK_QUEUE_MAX_SIZE:任务阻塞队列的边界值
编写submit方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26void submit(Runnable runnable) {
if (destroy) {
System.out.println("线程池已经销毁了,抛出异常!这里用日志代替");
return;
}
synchronized (TASK_QUEUE) {
if (WORKERS.size() < CORE_THREAD_SIZE) {
System.out.println("核心线程数还没有满,直接创建线程");
createWorkerTask();
} else if (TASK_QUEUE.size() < TASK_QUEUE_MAX_SIZE) {
System.out.println("核心线程数已满,但是队列没有满,把任务加到队列");
} else if (WORKERS.size() < MAX_THREAD_SIZE) {
System.out.println("核心线程数,队列满了,但是没有达到最大线程数,我们接着创建线程");
createWorkerTask();
} else if (WORKERS.size() >= MAX_THREAD_SIZE) {
System.out.println("达到了最大线程数,应该执行拒绝策略,这里我们就直接返回了");
return;
} else {
System.out.println("情况未知");
}
//加入任务队列
TASK_QUEUE.addLast(runnable);
// 唤醒所有线程
TASK_QUEUE.notifyAll();
}
}没有一个任务,我们都是通过submit交给线程池去处理的,先判断是否达到核心线程数,没有的话直接创建,如果核心线程数满了,就往队列里加,队列也满了就新建线程,如果达到最大线程数,就拒绝,这里有一点改变就是不管哪一种,我们都把人物放到任务队列中,让线程去任务队列里面取。
创建线程方法
1
2
3
4
5
6private void createWorkerTask() {
Worker worker = new Worker();
worker.workerState = WorkerState.FREE;
WORKERS.add(worker);
worker.start();
}线程创建之初,为空闲状态,并添加进线程集合中
创建线程类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44public static class Worker extends Thread {
private WorkerState workerState;
// 线程编号
private static int threadInitNumber;
// 生成线程名
private static synchronized String nextThreadName() {
return "Thread-" + (++threadInitNumber);
}
Worker() {
nextThreadName();
}
public void run() {
System.out.println("开启了一个新的线程------>" + threadInitNumber);
Runnable targer;
OUTER:
while (this.workerState != WorkerState.TERMINATED) {
synchronized (TASK_QUEUE) {
while (this.workerState == WorkerState.FREE && TASK_QUEUE.isEmpty()) {
try {
this.workerState = WorkerState.BLOCKED;
TASK_QUEUE.wait();
} catch (InterruptedException e) {
break OUTER;
}
}
targer = TASK_QUEUE.removeFirst();
System.out.println("处理任务");
}
if (targer != null) {
this.workerState = WorkerState.RUNNABLE;
targer.run();
this.workerState = WorkerState.FREE;
}
}
}
void close() {
this.workerState = WorkerState.TERMINATED;
this.interrupt();
}
}为了方便观看,我们给线程指定编号,如果当前线程没有关闭的话,先去判断队列是否为空且本身的状态是空闲的话,我们就让他wait,等新来的任务再去唤醒,如果获取到了我们就处理这个任务,并把他从队列的头部拿出来
关闭线程池方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15void shutdown() {
int size = WORKERS.size();
while (size > 0) {
for (Worker worker : WORKERS) {
if (worker.workerState == WorkerState.BLOCKED) {
worker.close();
size--;
}
}
}
this.destroy = true;
TASK_QUEUE.clear();
WORKERS.clear();
System.out.println("线程池已经关闭!");
}等所有线程的任务都完成了以后才关闭线程,同时把线程池状态标记为销毁,并清空任务队列和线程集合。
使用run()来控制释放线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void run() {
while (!destroy) {
try {
Thread.sleep(3000);
synchronized (WORKERS) {
Iterator<Worker> iterator = WORKERS.iterator();
while (iterator.hasNext()) {
Worker worker = iterator.next();
if (WORKERS.size() > CORE_THREAD_SIZE &&
TASK_QUEUE.size() < TASK_QUEUE_MAX_SIZE) {
if (worker.workerState != WorkerState.RUNNABLE && worker.workerState != WorkerState.TERMINATED) {
worker.close();
iterator.remove();
System.out.println("[回收了一个线程]");
}
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}每隔3秒去遍历一下所有的线程,如果线程数大于核心线程数,且任务队列不满的话,就删除线程,知道和核心线程数一样
测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33public static void main(String[] args) throws InterruptedException {
DemoThreadPool demoThreadPool = new DemoThreadPool();
IntStream.range(0, 5).forEach(i ->
demoThreadPool.submit(() -> {
System.out.printf("[线程] - [%s] 开始工作...\n",
Thread.currentThread().getName());
try {
Thread.sleep(2_000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("[线程] - [%s] 工作完毕...\n",
Thread.currentThread().getName());
})
);
Thread.sleep(3000);
IntStream.range(0, 30).forEach(i ->
demoThreadPool.submit(() -> {
System.out.printf("[线程] - [%s] 开始工作...\n",
Thread.currentThread().getName());
try {
Thread.sleep(2_000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("[线程] - [%s] 工作完毕...\n",
Thread.currentThread().getName());
})
);
// demoThreadPool.shutdown();
}为了观察状态,我们先一下子创建5个任务,这个时候他会先创建2个线程,然后把3个放到队列里,因为没有满,所以这两个线程就可以应付,但是当一下30个的时候,会发生拒绝,然后过了峰值以后,多余的线程就会销毁。
实现的可能不好,但是我理解的就是这样,源码在https://github.com/colin-xun/threadpool上,欢迎给我点:star: