线程池

1.自定义线程池

  1. 自定义拒绝策略
    @FunctionalInterface // 拒绝策略
    interface RejectPolicy<T> {
     void reject(BlockingQueue<T> queue, T task);
    }
    
  2. 自定义任务队列
    class BlockingQueue<T>{
     //1.队列
     private Deque<T> queue = new ArrayDeque<T>();
     //2.锁
     ReentrantLock lock = new ReentrantLock();
     //3.生产者条件变量
     private Condition fullWaitSet = lock.newCondition();
     //4.消费者条件变量
     private  Condition emptyWaitSet = lock.newCondition();
     //5.容量
     private int capacity;
    
     public BlockingQueue(int capacity) {
         this.capacity = capacity;
     }
     //阻塞获取
     public T take() {
       //获取过程加锁
         lock.lock();
         try{
           //如果队列为空,消费者就等待
             while(queue.isEmpty()){
                 try {
                     emptyWaitSet.await();
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
             }
             //然后队列不为空时,就从队列头中获取元素,然后唤醒生产者,可以进行生产
             T t = queue.removeFirst();
             fullWaitSet.signal();
             return t;
         }
         //保证无论在任何情况下都能够释放锁
         finally {
             lock.unlock();
         }
     }
    
     //带超时阻塞获取,就是在等待的时候加上时间
     public T poll(long timeout, TimeUnit timeUnit)  {
         lock.lock();
         long nanos = timeUnit.toNanos(timeout);
         try {
             while (queue.isEmpty()) {
                 if (nanos <= 0) return null;
                 try {
                     nanos = emptyWaitSet.awaitNanos(timeout);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
             }
             T t = queue.removeFirst();
             fullWaitSet.signal();
             return t;
         } finally {
             lock.unlock();
         }
    
     }
    
    
     //阻塞添加
     public void put(T task){
         lock.lock();
         try{
             while(queue.size()==capacity){
                 try {
                     fullWaitSet.await();
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
             }
             //队列不满的时候添加任务,并且唤醒消费者可以进行消费
             queue.addLast(task);
             emptyWaitSet.signal();
         }
         finally {
             lock.unlock();
         }
     }
     //带超时时间阻塞添加
     public boolean offer(T task,long timeout,TimeUnit timeUnit) throws InterruptedException {
         lock.lock();
         try{
             long nanos = timeUnit.toNanos(timeout);
             while (queue.size()==capacity){
                 if(nanos<=0) return false;
                 nanos = fullWaitSet.awaitNanos(timeout);
             }
             queue.addLast(task);
             emptyWaitSet.signal();
             return true;
         }
         finally {
             lock.unlock();
         }
     }
     public int size(){
         lock.lock();
         try{
             return queue.size();
         }
         finally {
             lock.unlock();
         }
     }
    
     //
     public void tryPut(RejectPolicy<T> rejectPolicy,T task){
         lock.lock();
         try{
             if(queue.size()==capacity) rejectPolicy.reject(this,task);
             else{
                 queue.addLast(task);
                 emptyWaitSet.signal();
             }
         }
         finally {
             lock.unlock();
         }
     }
    }
    
  3. 自定义线程池
    class ThreadPool {
     //阻塞队列
     private BlockingQueue<Runnable> taskQueue;
     //线程集合
     private HashSet<Worker> workers = new HashSet<Worker>();
     //核心线程数
     private int coreSize;
     //获取任务的超时时间
     private long timeout;
     private TimeUnit timeUnit;
     private RejectPolicy<Runnable> rejectPolicy;
    
     //执行任务
     public void execute(Runnable task){
         synchronized (workers){
             if(workers.size()<coreSize){
                 Worker worker = new Worker(task);
                 workers.add(worker);
                 worker.start();
             }
             else{
                 taskQueue.tryPut(rejectPolicy,task);
             }
         }
     }
    
     public ThreadPool(BlockingQueue<Runnable> taskQueue, int coreSize, long timeout,
                       TimeUnit timeUnit, RejectPolicy<Runnable> rejectPolicy) {
         this.taskQueue = taskQueue;
         this.coreSize = coreSize;
         this.timeout = timeout;
         this.timeUnit = timeUnit;
         this.rejectPolicy = rejectPolicy;
     }
    
     class Worker extends Thread{
         private Runnable task;
         public Worker(Runnable task) {
             this.task = task;
         }
    
         @Override
         public void run() {
             while(task!=null||(task = taskQueue.poll(timeout,timeUnit))!=null){
                 try {
                     task.run();
                 }
                 catch (Exception e){
                     e.printStackTrace();
                 }
                 finally {
                     task = null;
                 }
             }
             synchronized (workers){
                 workers.remove(this);
             }
         }
     }
    }
    

    2. ThreadPoolExecutor

    2.1 线程状态

    |状态名|高三位|接收新任务|处理阻塞任务队列|说明| |—-|—-|—-|—-|—-| |Running|111|Y|Y|| |Shutdown|000|N|Y|不会接受新任务,但会处理阻塞队列剩余任务| |Stop|001|N|N| 会中断正在执行的任务,并抛弃阻塞队列任务| |Tidying|010|-|-| 任务全执行完毕,活动线程为0即将进入终结| |Terminated|011|-|-|终结状态| 这些信息储存在一个原子变量ctl中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作 进行赋值

    //c 为旧值, ctlOf 返回结果为新值
    ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
    // rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    

    2.2 构造方法

    public ThreadPoolExecutor(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue) {
       this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), defaultHandler);
      }
    

    coreSize: 核心线程数 maximumPoolSize:最大线程数 keepAliveTime: 生存时间 - 针对救急线程 unit: 时间单位 - 针对救急线程 workQueue: 阻塞队列 threadFactory: 线程工厂 - 可以为线程创建时起个好名字 handler:拒绝策略 工作方式:

  4. 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务
  5. 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排 队,直到有空闲的线程。
  6. 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线 程来救急
  7. 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它 著名框架也提供了实现