自定义线程池(二)

上一篇已经讲过一些线程池的概念,现在我们来自定义一个线程池,实现的不是一般,就是一个个人理解的线程池实现,仅供参考。

  1. 创建变量

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 线程状态,空闲,运行中,阻塞,结束
    private enum WorkerState {FREE, RUNNABLE, BLOCKED,TERMINATED;}
    // 线程池是否被销毁
    private volatile boolean destroy = false;
    // 核心线程数
    private final static int CORE_THREAD_SIZE = 2;
    // 最大线程数
    private final static int MAX_THREAD_SIZE = 10;
    // 任务队列容器,也可以用Queue<Runnable> 遵循 FIFO 规则
    private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();
    // 设置它的边界值为11
    private final static int TASK_QUEUE_MAX_SIZE = 11;
    // 线程容器
    private final static List<Worker> WORKERS = new ArrayList<>();
    1. WorkerState:线程状态,一个线程新创建的时候为FREE,工作的时候为RUNNABLE,处于阻塞状态的时候为BLOCK(意味着当前任务队列没有任务了),而TERMINATED表示当前线程结束
    2. destroy:线程池状态,表示当前线程是否已经销毁,只有执行了shutdown方法才会销毁线程池,并发情况下要保证可见性
    3. TASK_QUEUE_MAX_SIZE:任务阻塞队列的边界值
  2. 编写submit方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    void submit(Runnable runnable) {
    if (destroy) {
    System.out.println("线程池已经销毁了,抛出异常!这里用日志代替");
    return;
    }
    synchronized (TASK_QUEUE) {
    if (WORKERS.size() < CORE_THREAD_SIZE) {
    System.out.println("核心线程数还没有满,直接创建线程");
    createWorkerTask();
    } else if (TASK_QUEUE.size() < TASK_QUEUE_MAX_SIZE) {
    System.out.println("核心线程数已满,但是队列没有满,把任务加到队列");
    } else if (WORKERS.size() < MAX_THREAD_SIZE) {
    System.out.println("核心线程数,队列满了,但是没有达到最大线程数,我们接着创建线程");
    createWorkerTask();
    } else if (WORKERS.size() >= MAX_THREAD_SIZE) {
    System.out.println("达到了最大线程数,应该执行拒绝策略,这里我们就直接返回了");
    return;
    } else {
    System.out.println("情况未知");
    }
    //加入任务队列
    TASK_QUEUE.addLast(runnable);
    // 唤醒所有线程
    TASK_QUEUE.notifyAll();
    }
    }

    没有一个任务,我们都是通过submit交给线程池去处理的,先判断是否达到核心线程数,没有的话直接创建,如果核心线程数满了,就往队列里加,队列也满了就新建线程,如果达到最大线程数,就拒绝,这里有一点改变就是不管哪一种,我们都把人物放到任务队列中,让线程去任务队列里面取。

  3. 创建线程方法

    1
    2
    3
    4
    5
    6
    private void createWorkerTask() {
    Worker worker = new Worker();
    worker.workerState = WorkerState.FREE;
    WORKERS.add(worker);
    worker.start();
    }

    线程创建之初,为空闲状态,并添加进线程集合中

  4. 创建线程类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    public static class Worker extends Thread {
    private WorkerState workerState;
    // 线程编号
    private static int threadInitNumber;
    // 生成线程名
    private static synchronized String nextThreadName() {
    return "Thread-" + (++threadInitNumber);
    }
    Worker() {
    nextThreadName();
    }

    @Override
    public void run() {
    System.out.println("开启了一个新的线程------>" + threadInitNumber);
    Runnable targer;
    OUTER:
    while (this.workerState != WorkerState.TERMINATED) {
    synchronized (TASK_QUEUE) {
    while (this.workerState == WorkerState.FREE && TASK_QUEUE.isEmpty()) {
    try {
    this.workerState = WorkerState.BLOCKED;
    TASK_QUEUE.wait();
    } catch (InterruptedException e) {
    break OUTER;
    }
    }
    targer = TASK_QUEUE.removeFirst();
    System.out.println("处理任务");
    }
    if (targer != null) {
    this.workerState = WorkerState.RUNNABLE;
    targer.run();
    this.workerState = WorkerState.FREE;
    }
    }
    }

    void close() {
    this.workerState = WorkerState.TERMINATED;
    this.interrupt();
    }

    }

    为了方便观看,我们给线程指定编号,如果当前线程没有关闭的话,先去判断队列是否为空且本身的状态是空闲的话,我们就让他wait,等新来的任务再去唤醒,如果获取到了我们就处理这个任务,并把他从队列的头部拿出来

  5. 关闭线程池方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    void shutdown() {
    int size = WORKERS.size();
    while (size > 0) {
    for (Worker worker : WORKERS) {
    if (worker.workerState == WorkerState.BLOCKED) {
    worker.close();
    size--;
    }
    }
    }
    this.destroy = true;
    TASK_QUEUE.clear();
    WORKERS.clear();
    System.out.println("线程池已经关闭!");
    }

    等所有线程的任务都完成了以后才关闭线程,同时把线程池状态标记为销毁,并清空任务队列和线程集合。

  6. 使用run()来控制释放线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    @Override
    public void run() {
    while (!destroy) {
    try {
    Thread.sleep(3000);
    synchronized (WORKERS) {
    Iterator<Worker> iterator = WORKERS.iterator();
    while (iterator.hasNext()) {
    Worker worker = iterator.next();
    if (WORKERS.size() > CORE_THREAD_SIZE &&
    TASK_QUEUE.size() < TASK_QUEUE_MAX_SIZE) {
    if (worker.workerState != WorkerState.RUNNABLE && worker.workerState != WorkerState.TERMINATED) {
    worker.close();
    iterator.remove();
    System.out.println("[回收了一个线程]");
    }
    }
    }
    }
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

    每隔3秒去遍历一下所有的线程,如果线程数大于核心线程数,且任务队列不满的话,就删除线程,知道和核心线程数一样

  7. 测试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    public static void main(String[] args) throws InterruptedException {
    DemoThreadPool demoThreadPool = new DemoThreadPool();
    IntStream.range(0, 5).forEach(i ->
    demoThreadPool.submit(() -> {
    System.out.printf("[线程] - [%s] 开始工作...\n",
    Thread.currentThread().getName());
    try {
    Thread.sleep(2_000L);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.printf("[线程] - [%s] 工作完毕...\n",
    Thread.currentThread().getName());
    })
    );

    Thread.sleep(3000);

    IntStream.range(0, 30).forEach(i ->
    demoThreadPool.submit(() -> {
    System.out.printf("[线程] - [%s] 开始工作...\n",
    Thread.currentThread().getName());
    try {
    Thread.sleep(2_000L);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.printf("[线程] - [%s] 工作完毕...\n",
    Thread.currentThread().getName());
    })
    );
    // demoThreadPool.shutdown();
    }

    为了观察状态,我们先一下子创建5个任务,这个时候他会先创建2个线程,然后把3个放到队列里,因为没有满,所以这两个线程就可以应付,但是当一下30个的时候,会发生拒绝,然后过了峰值以后,多余的线程就会销毁。

    实现的可能不好,但是我理解的就是这样,源码在https://github.com/colin-xun/threadpool上,欢迎给我点:star: