本文共 6331 字,大约阅读时间需要 21 分钟。
线程池的源码及原理[JDK1.6实现]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | // 流程就是:没达到corePoolSize,创建worker执行,达到corePoolSize加入workQueue // workQueue满了且在maximumPoolSize下,创建新worker,达到maximumPoolSize,执行reject public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 1:poolSize达到corePoolSize,执行3把任务加入workQueue // 2:poolSize没达到,执行addIfUnderCorePoolSize()在corePoolSize内创建新worker立即执行任务 // 如果达到corePoolSize,则同上执行3 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { // 3:workQueue满了,执行5 if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) { // 4:如果线程池关闭,执行拒绝策略 // 如果poolSize==0,新启动一个线程执行队列内任务 ensureQueuedTaskHandled(command); } // 5:在maximumPoolSize内创建新worker立即执行任务 // 如果达到maximumPoolSize,执行6拒绝策略 } else if (!addIfUnderMaximumPoolSize(command)) // 6:拒绝策略 reject(command); // is shutdown or saturated } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | public void run() { try { Runnable task = firstTask; firstTask = null; // getTask()是从workQueue里面阻塞获取任务,如果getTask()返回null则终结本线程 while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { // 走到这里代表这个worker或者说这个线程由于线程池关闭或超过aliveTime需要关闭了 workerDone( this); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) // 在poolSize大于corePoolSize或允许核心线程超时时 // 阻塞超时获取有可能获取到null,此时worker线程销毁 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null) return r; // 这里是是否运行worker线程销毁的判断 if ( workerCanExit()) { if (runState >= SHUTDOWN) // STOP或TERMINATED状态,终止空闲worker interruptIdleWorkers(); return null; // 这里返回null,代表工作线程worker销毁 } // 其他:retry,继续循环 } catch (InterruptedException ie) { // On interruption, re-check runState } } } |
taskCount:线程池需要执行的任务数量。completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不+ getActiveCount:获取活动的线程数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | // 加入的任务外部封装了ScheduledFutureTask,继承于FutureTask,因此也可以获取任务结果 private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { // 省略部分代码 // 周期性运行,执行完成就把任务加入到delay队列中 private void runPeriodic() { // 这里重置线程池状态 boolean ok = ScheduledFutureTask. super.runAndReset(); boolean down = isShutdown(); // Reschedule if not cancelled and not shutdown or policy allows if (ok && (!down || (getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isTerminating()))) { long p = period; if (p > 0) time += p; else time = now() - p; // 重复把任务加入到线程池delay队列中 ScheduledThreadPoolExecutor. super.getQueue().add( this); } else if (down) interruptIdleWorkers(); } // 线程池调用的run方法 public void run() { if (isPeriodic()) runPeriodic(); else ScheduledFutureTask. super.run(); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | public class ExecutorCompletionService<V> implements CompletionService<V> { // 部分代码省略 // 外部Future的封装类 private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } // 这里把Future加入到 completionQueue protected void done() { completionQueue.add(task); } private final Future<V> task; } public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); // 对f外层又包了一层 QueueingFuture executor.execute( new QueueingFuture(f)); return f; } // 外部则可通过 completionQueue 来获取已完成的任务Future public Future<V> take() throws InterruptedException { return completionQueue.take(); } } |
转载地址:http://txoso.baihongyu.com/