上一章讲了EventExecutorGroup的整体结构和原理,这一章我们来探究一下它的具体实现。
EventExecutorGroup和EventExecutor接口
io.netty.util.concurrent.EventExecutorGroup
java.util.concurrent.ScheduledExecutorService
EventExecutorGroup继承了ScheduledExecutorService接口,它自己定义了如下的新方法
方法 | 说明 |
EventExecutor next() | 取出一个EventExecutor, 这个方法要实现派发任务的策略。 |
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit); | 优雅地关闭这个executor, 一旦这个方法被调用,isShuttingDown()方法总是总是返回true。和 shutdown方法不同,这个方法需要确保在关闭的平静期(由quietPeriod参数决定)没有新的任务被提交,如果平静期有新任务提交,它会接受这个任务,同时中止关闭动作,等任务执行完毕后从新开始关闭流程。 |
Future<?> shutdownGracefully() | shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)快捷调用方式。 |
boolean isShuttingDown() | 检查是否已经调用了shutdownGracefully或shutdown方法。 |
io.netty.util.concurrent.EventExecutor implement EventExecutorGroup
EventExecutor定义的接口如下
方法 | 说明 |
boolean inEventLoop() | 如果当前线程是这个Executor返回true |
boolean inEventLoop(Thread thread) | 如果thread是这个Executor的线程返回true |
EventExecutorGroup parent() | 返回持有这个Executor的EventExecutorGroup |
<V> Promise<V> newPromise() | 创建一个新的Promise实例 |
<V> ProgressivePromise<V> newProgressivePromise() | 创建一个新的ProgressivePromise实例 |
<V> Future<V> newSucceededFuture(V result); | 创建一个标记为success的Future实例,Future#isSuccess()返回true |
<V> Future<V> newFailedFuture(Throwable cause) | 创建一个标记为failed的Future实例,Future#isSuccess()返回false |
抽象实现AbstractEventExecutorGroup和AbstractEventExecutor
io.netty.util.concurrent.AbstractEventExecutorGroup implement EventExecutorGroup
AbstractEventExecutorGroup实现了EventExecutorGroup接口,它实现方法的形式为:
XXX(){
next().XXX()
}
如:execute方法的实现为
public void execute(Runnable command) {
next().execute(command);
}
这里实现了EventExecutorGroup派发任务的方式,使用next方法取出一EventExecutor, 然后把任务提交给这个executor。其他提交认任务的方法实submit, schedule, scheduleAtFixedRate, scheduleWithFixedDelay, invokeAll, invokeAny都和这个类似。
io.netty.util.concurrent.AbstractEventExecutor extends AbstractExecutorService implements EventExecutor
形如newXXX的方法,直接new一个JDK提供的类型的实例返回, 如:
public <V> Promise<V> newPromise() {
return new DefaultPromise<V>(this);
}
sumbit方法是调用AbstractExecutorService的实现。
不支持schedule, scheduleAtFixedRate, scheduleWithFixedDelay方法,这几个方法都会抛出UnsupportedOperationException异常。
多线程实现MultithreadEventExecutorGroup和SingleThreadEventExecutor
io.netty.util.concurrent.MultithreadEventExecutorGroup extends AbstractEventExecutorGroup
MultithreadEventExecutorGroup 主要实现了一下两个方面的功能:
- EventExecutor管理: 创建, 结束SingleThreadEventExecutor,EventExecutor的数据是固定的,由传入的参数决定。
- 任务派发策略: 实现了EventExecutor选择器,next方法使选择器选中一个Executor。
这个类的核心功能都在它的构造方法中实现, 构造方法有三个参数:
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args).
nThreads: 线程数,即SingleThreadEventExecutor的数量,
threadFactory: 线程工程,传递给SingleThreadEventExecutor实例,SingleThreadEventExecutor使用它创建一个工作线程。
args: 传递给SingleThreadEventExecutor工作线程的参数。
构造方法主要干了两件事:
1. 创建SingleThreadEventExecutor
children = new SingleThreadEventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
children[i] = newChild(threadFactory, args);
}
它把创建的SingleThreadEventExecutor实例放在children属性中维护。 newChild是个抽象方法,需要子类实现。
2. 创建选择器
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
MultithreadEventExecutorGroup内部实现两种类型的选择器,PowerOfTwoEventExecutorChooser--chooserA, GenericEventExecutorChooser--chooserB, 当线程数是2^n时使用chooserA, 否则使用chooserB。选择器的实现使用了一点小技巧,从本质上讲,这两种选择器都是使用取模轮询的方式选择下一个executor, 不同的是当线程数(children的长度)为2^n时可以把取模运算优化成位运算,性能比位运算要好一些。下面是两个选择器的算法:
chooserA: children[childIndex.getAndIncrement() & children.length - 1], 当children.length == 2^n时,它等价于 children[Math.abs(childIndex.getAndIncrement() % children.length)
chooserB: children[Math.abs(childIndex.getAndIncrement() % children.length)
这里我们可以得出结论, nThreads尽量设置成2^n(2, 4, 8, 16, 32 ....), 这样性能会好一些。
io.netty.util.concurrent.SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor
派生关系
SingleThreadEventExecutor
AbstractScheduledEventExecutor
AbstractEventExecutor
SingleThreadEventExecutor实现了一个单线程的Executor, 它使用外部传进来的ThreadFactory实例创建一个唯一的线程,executor方法把任务放进taskQueue中,线程消费taskQueue中排队的任务。这个executor不仅要执行由executor提交的任务,还要执行由schedule方法提交定时任务和由invokeAll, invokeAny提交的批量任务。
除了任务呢排队,这类还实现了一个重要的功能--gracefulShutdown, 优雅地关闭。
下面来详细分析这些功能的实现。
状态:
ST_NOT_STARTED = 1: 初始状态,SingleThreadEventExecutor 实例被创建时处于这个状态,这个时候只是创建了一个线程,这个线程还没有运行。
ST_STARTED = 2: 运行状态,ST_NOT_STARTED时,提交的第一个任务会把它变成这个状态,线程已经开发运行。
ST_SHUTTING_DOWN = 3: 正在执行关闭操作。线程主循环run方法返回或抛出异常,或调用shutdownGracefully 都会变成这个状态。
ST_SHUTDOWN = 4: 已经关闭。调用shutdown会变成这个状态。
ST_TERMINATED = 5: 已经结束。这个是最终状态,ST_SHUTTING_DOWN和ST_SHUTDOWN 状态的过程执行完毕后会变成这个状态。
状态判定方法
是否处于SHUTTING_DOWN状态
public boolean isShuttingDown() {return state >= ST_SHUTTING_DOWN;}
是否处于SHUTDOWN状态
public boolean isShutDown() {return state >= ST_SHUT_DOWN;}
实时任务排队:
public方法execute, 是提供给用户提交实时任务的方法,它的调用栈如下:
execute
addTask
offerTask
taskQueue.offer
execute最终会调用taskQueue的offer方法把任务放到队列中排队,在此之前,如果检测到处于SHUTDOWN状态,就拒绝这个任务,或offer失败也会拒绝任务。
定时任务排队:
用户调用schedule把定时任务到scheduledTaskQueue队列中,这个队列是PriorityQueue类型的实例,他是一个优先级队列。在线程的主循环run中,会调用takeTask,taskTask会优先调用peekScheduledTask,看一看scheduledTaskQueue有没有定时任务,如果有就尝试把所有已经到时间的定时任务放到taskQueue中排队。
批量任务排队:
批量任务排队比较简单,只是简单地对invokeAll或invokeAny的tasks参数中的所有任务调用一次execute。
取出任务:
takeTask的主要功能是从taskQueue中取出任务,同时它还确保到期的定时任务能够及时地进入taskQueue中排队。这是一个比较重要的方法,我们来详细分析它的实现:
BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
for (;;) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask(); // 先看看优先级队列中是否存在定时任务
if (scheduledTask == null) {
// 如果没有定时任务,直接从taskQueue中取出一个任务返回
Runnable task = null;
try {
task = taskQueue.take();
if (task == WAKEUP_TASK) {
task = null;
}
} catch (InterruptedException e) {
// Ignore
}
return task;
} else {
// 运行到这里表示有定时任务
long delayNanos = scheduledTask.delayNanos();
Runnable task = null;
if (delayNanos > 0) {
// 没有到期的定时任务,
try {
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
return null;
}
}
if (task == null) {
// 有到期的定时任务,把所有优先级队列中到期的定时任务放入taskQueue中排队
fetchFromScheduledTaskQueue();
task = taskQueue.poll();
}
if (task != null) { // 在有定时任务但taskQueue为空的时候,for循环会一直空转,直到有定时任务到期才会跳出
return task;
}
}
}
优雅地关闭:
优雅地关闭是这个类的重要的功能,所谓优雅是指在正在关闭之前要确保已经在taskQueue中排队的任务都能被执行,在关闭过程中,如果用户提交了一个任务,是否提交成功要有明确的反馈,如果一个任务被成功提交,就要确保他最终一定会被执行。
线程的主循环run方法返回的时候,就会触发优雅关闭的过程。run方法返回肯由多种原因引起:用户主动调用了shutdown或shutdownGracefully,run方法抛出异常。执行优雅关闭的过程在confirmShutdown方法中实现,执行这个过程的前提是:
确保当前处于SHUTTINGDOWN状态即状态值>=ST_SHUTTING_DOWN
if (!isShuttingDown()) {
return false;
}
确保这个方法在eventLoop线程中执行
if (!inEventLoop()) {
throw new IllegalStateException("must be invoked from an event loop");
}
然后才是优雅关闭的过程:
清除掉定时任务
cancelScheduledTasks();
如果是第一次尝试关闭,设置gracefulShutdownStartTime我当前时间
if (gracefulShutdownStartTime == 0) {
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
}
把已在队列中排队的任务都执行掉。
if (runAllTasks() || runShutdownHooks()) {
检查当前状态,如果是关闭状态:>= ST_SHUTDOWN,已经关闭完成。
if (isShutdown()) {
return true;
}
如果gracefulShutdownQuietPeriod==0表示, 关闭过程没有安静期,现在可以立即结束。
if (gracefulShutdownQuietPeriod == 0) {
return true;
}
执行到这里,表示关闭过程还没结束,如果当前状态是SHUTTINGDOWN向taskQueue中添加一个WAKEUP_TASK, 唤醒在taskQueue阻塞的线程。
wakeup(true);
return false;
}
执行到这里表示,taskQueue已经是空的了,同时执行完了所有的的shutdown hook回调。如果现在已经是SHUTDOWN状态,或者这个关闭过程使用的时间已经超时,表示关闭过程已经完成了。
final long nanoTime = ScheduledFutureTask.nanoTime();
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
}
如果这个方法本次执行的时间没有超过安静时间(gracefulShutdownQuietPeriod, 它的值是在调用shutdownGracefully时设置), 100ms之后从新执行关闭过程。
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
wakeup(true);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
}
return false;
}
return true;
业务线程的默认实现
public class DefaultEventExecutorGroup extends MultithreadEventExecutorGroup
final class DefaultEventExecutor extends SingleThreadEventExecutor
DefaultEventExecutorGroup没有对MultithreadEventExecutorGroup做任何扩展。
DefaultEventExecutor只是实现了run方法
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
}
}
这个方法的实现表明,run方法只有在以下3中情况下跳出:
- 用户主动调用shutdown。
- 用户主动调用shutdownGracefully。
- 抛出异常。