线程池
1.自定义线程池
- 自定义拒绝策略
@FunctionalInterface // 拒绝策略 interface RejectPolicy<T> { void reject(BlockingQueue<T> queue, T task); } - 自定义任务队列
class BlockingQueue<T>{ //1.队列 private Deque<T> queue = new ArrayDeque<T>(); //2.锁 ReentrantLock lock = new ReentrantLock(); //3.生产者条件变量 private Condition fullWaitSet = lock.newCondition(); //4.消费者条件变量 private Condition emptyWaitSet = lock.newCondition(); //5.容量 private int capacity; public BlockingQueue(int capacity) { this.capacity = capacity; } //阻塞获取 public T take() { //获取过程加锁 lock.lock(); try{ //如果队列为空,消费者就等待 while(queue.isEmpty()){ try { emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //然后队列不为空时,就从队列头中获取元素,然后唤醒生产者,可以进行生产 T t = queue.removeFirst(); fullWaitSet.signal(); return t; } //保证无论在任何情况下都能够释放锁 finally { lock.unlock(); } } //带超时阻塞获取,就是在等待的时候加上时间 public T poll(long timeout, TimeUnit timeUnit) { lock.lock(); long nanos = timeUnit.toNanos(timeout); try { while (queue.isEmpty()) { if (nanos <= 0) return null; try { nanos = emptyWaitSet.awaitNanos(timeout); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } //阻塞添加 public void put(T task){ lock.lock(); try{ while(queue.size()==capacity){ try { fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //队列不满的时候添加任务,并且唤醒消费者可以进行消费 queue.addLast(task); emptyWaitSet.signal(); } finally { lock.unlock(); } } //带超时时间阻塞添加 public boolean offer(T task,long timeout,TimeUnit timeUnit) throws InterruptedException { lock.lock(); try{ long nanos = timeUnit.toNanos(timeout); while (queue.size()==capacity){ if(nanos<=0) return false; nanos = fullWaitSet.awaitNanos(timeout); } queue.addLast(task); emptyWaitSet.signal(); return true; } finally { lock.unlock(); } } public int size(){ lock.lock(); try{ return queue.size(); } finally { lock.unlock(); } } // public void tryPut(RejectPolicy<T> rejectPolicy,T task){ lock.lock(); try{ if(queue.size()==capacity) rejectPolicy.reject(this,task); else{ queue.addLast(task); emptyWaitSet.signal(); } } finally { lock.unlock(); } } } - 自定义线程池
class ThreadPool { //阻塞队列 private BlockingQueue<Runnable> taskQueue; //线程集合 private HashSet<Worker> workers = new HashSet<Worker>(); //核心线程数 private int coreSize; //获取任务的超时时间 private long timeout; private TimeUnit timeUnit; private RejectPolicy<Runnable> rejectPolicy; //执行任务 public void execute(Runnable task){ synchronized (workers){ if(workers.size()<coreSize){ Worker worker = new Worker(task); workers.add(worker); worker.start(); } else{ taskQueue.tryPut(rejectPolicy,task); } } } public ThreadPool(BlockingQueue<Runnable> taskQueue, int coreSize, long timeout, TimeUnit timeUnit, RejectPolicy<Runnable> rejectPolicy) { this.taskQueue = taskQueue; this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.rejectPolicy = rejectPolicy; } class Worker extends Thread{ private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { while(task!=null||(task = taskQueue.poll(timeout,timeUnit))!=null){ try { task.run(); } catch (Exception e){ e.printStackTrace(); } finally { task = null; } } synchronized (workers){ workers.remove(this); } } } }2. ThreadPoolExecutor
2.1 线程状态
|状态名|高三位|接收新任务|处理阻塞任务队列|说明| |—-|—-|—-|—-|—-| |Running|111|Y|Y|| |Shutdown|000|N|Y|不会接受新任务,但会处理阻塞队列剩余任务| |Stop|001|N|N| 会中断正在执行的任务,并抛弃阻塞队列任务| |Tidying|010|-|-| 任务全执行完毕,活动线程为0即将进入终结| |Terminated|011|-|-|终结状态| 这些信息储存在一个原子变量ctl中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作 进行赋值
//c 为旧值, ctlOf 返回结果为新值 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))); // rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们 private static int ctlOf(int rs, int wc) { return rs | wc; }2.2 构造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }coreSize: 核心线程数 maximumPoolSize:最大线程数 keepAliveTime: 生存时间 - 针对救急线程 unit: 时间单位 - 针对救急线程 workQueue: 阻塞队列 threadFactory: 线程工厂 - 可以为线程创建时起个好名字 handler:拒绝策略 工作方式:
- 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务
- 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排 队,直到有空闲的线程。
- 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线 程来救急
- 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它 著名框架也提供了实现