首先分析内部类:ThreadPoolExecutor$Worker
//Worker对线程和任务做了一个封装,同时它又实现了Runnable接口, //所以Worker类的线程跑的是自身的run方法 private final class Workerextends AbstractQueuedSynchronizer implements Runnable {/** Thread this worker is running in. Null if factory fails. */final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;//创建一个Thread对象,它的Runnable对象是当前Worker对象//创建了线程,但是还没启动,在外部start//Executors.DefaultThreadFactorythis.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}} }final void runWorker(Worker w) {Thread wt = Thread.currentThread();//调用pool.execute()时传入任务时,如果addWorker返回为true,表示创建了worker,则任务也放在worker对象中了。//如果addWorker返回为false,则把任务放入队列Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//第二个task是从队列中取得的while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {//任务执行完后,置空task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);} }private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out? retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}boolean timed; // Are workers subject to culling?for (;;) {int wc = workerCountOf(c);timed = allowCoreThreadTimeOut || wc > corePoolSize;if (wc <= maximumPoolSize && ! (timedOut && timed))break;if (compareAndDecrementWorkerCount(c))return null;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop }try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}} }
接着分析ThreadPoolExecutor
public class ThreadPoolExecutor extends AbstractExecutorService {//状态变量,保存了workerCount和runState的值//线程池的初始状态是RUNNINGprivate final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static int ctlOf(int rs, int wc) { return rs | wc; }//状态值从小到大排列// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;// 默认为 false,当线程池中已经有了 corePoolSize 个线程,即使这些线程不干活,也不会回收。// 但是如果线程池中的线程数量超过了 corePoolSize,则会回收private volatile boolean allowCoreThreadTimeOut;private volatile int corePoolSize;private volatile int maximumPoolSize;public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();//worker数量小于最小线程数,创建一个worker,并启动//如果addWorker返回true,表示创建了一个worker对象,任务也放在worker对象中了。//如果addWorker返回false,则随后把任务放入队列if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//如果线程池处于运行状态,往队列投任务//workQueue.offer(command)if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);} //如果workQueue.offer(command)返回false呢?//当队列中积压的任务太多时,就会返回false//这时传给addWorker的是falseelse if (!addWorker(command, false))reject(command);}//core决定worker数量以corePoolSize和maximumPoolSize中哪一个值为上限private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop }}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {final ReentrantLock mainLock = this.mainLock;w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();int rs = runStateOf(c);if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}}
线程池接收任务的流程图:
关闭线程池有 shutdown 和 shutdownNow 2种方法:
shutdown 不再接收新任务,但会把队列中的任务执行完,shutdownNow 不会执行队列中的任务。
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate(); }public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();// 删除队列中的任务tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks; }