前言
tigase是一个高性能的服务器,其实个人认为作为即时通讯的服务器,高性能主要体现在他对IO复用,和多线程的使用上,今天来学习一下他的IO的线程处理模型的源码,并记录一下他优秀的设计。
概述
tigase是使用的NIO作为自己的IO模型。IOService是实现了Callable接口的并持有SocketChannel对象的一个抽象的IO封装对象。
tigase的IO处理线程模型的核心类是 SocketThread,此类提供了两个核心的静态方法:addSocketService(IOService<?> s) 和 removeSocketService(IOService<Object> s) 分别是表示把IOService对象添加进线程模型中和把IOService对象移除线程模型中。其他IO监听器类监听到IO的xmppStreamOpened()发生之后,调用addSocketService(IOService<?> s)方法将数据添加到SocketThread之后,就开始了整个多线程处理整个IO复用的流程。
IO处理流程设计
tiagase IO处理流程图
ConnectionOpenThread作为整个tigase的IO监听连接的线程,tigase服务启动之后就会启动ConnectionOpenThread。启动完ConnectionOpenThread流程如下:
- ConnectionOpenThread首先会初始化内部的Selector用于对客户端的连接的监听。(当然ConnectionOpenThread也会做一些限流的控制,超过连接数就拒绝新连接等)对应图中1步
- 接着ConnectionOpenThread就会循环监听就绪的SelectKey,拿到SocketChannel传递给ConnectionOpenListener。对应图中2、3、4步
- ConnectionOpenListener监听器就会创建IOService对象,并设置一些参数,包括SSL容器,和其他的一些监听器。对应图中5、6步
- ConnectionManager(连接管理器)启动IOService对象,添加连接超时需要执行的Task,并将IOService对象通过SocketThread.addSocketService(IOService<?> s)添加到IO复用的线程模型中,自此进入了IO处理的线程的逻辑。对应图中7、8、9步
- SocketThread.addSocketService(IOService<?> s)通过哈希和取模的算法,将数据负载均衡到不同的SocketThread的waitting的跳表集合中进行数据的缓冲。SocketThread循环从waitting跳表集合中拿到IOService并根据读写事件类型,将IOService注册到SocketThread自身的Selector对象上。对应图中10步
- SocketThread循序监听Selector就绪的SelectKey,并拿到IOService对象,并将IOService添加到forCompletion跳表集合中进行缓冲。对应图中11步
- SocketThread循环从forCompletion跳表集合中拿到IOService对象,丢到completionService线程池进行执行IOService的call()方法。对应图中12步
- ResultsListener 结果监听器线程循环监听completionService执行结束的IOService对象的状态,如果IOService状态是isConnected就继续调用SocketThread.addSocketService(IOService<?> s)添加到SocketThread进行重复调用每个IOService自身的call()方法。自此整个IO的循环处理流程基本结束。对应图中13步
SocketThread的线程模型
tigase的线程模型和Netty的主从React的模式有点像,ConnectionOpenThread主要用来处理客户端的连接建立,建立之后创建将SocketChannel封装到IOService抽象对象中,通过IOService.hashCode() % SocketThread.length 算法负载均衡到不同的ScoketThread中(因为是跟IOService计算的hash,保证相同的IO对象会添加到相同的线程中去),通过SocketChannel的一列缓冲操作,将就绪的IOServices丢到CompletionService线程池中去,线程池来执行IOService.call()方法对数据进行读写的各种操作。call()中加了读写锁,这里效率略低于Netty的设计,Netty是一个线程基于责任链的形式一个线程执行到底的无锁设计,这里ResultsListener可以将IOService再次丢入线程池中供不同的线程继续调用。
SocketThread类数据结构设计源码分析
数据结构
class SocketThread implements Runnable{//线程池private static CompletionService<IOService<?>> completionService = null;//读线程数组private static SocketThread[] socketReadThread = null;//写线程数组private static SocketThread[] socketWriteThread = null;
}
SocketThread线程通过持有两个static类型的读写线程组和CompletionService的线程池组成他的独特的线程模型,通过读写线程分组提高代码的效率,并更好地体现了设计的单一职责。CompletionService的线程池主要的作用就是解耦执行和结果的获取。采用static的类变量,就只有初始化一次。在类加载完成之后,线程处理模型就初始化好了。
初始化相关代码
class SocketThread implements Runnable{ //默认每个核心的线程数public static final int DEF_MAX_THREADS_PER_CPU = 8;//获取系统的核心数private static int cpus = Runtime.getRuntime().availableProcessors();static {if (socketReadThread == null) {//根据CPU核数计算默认线程数int nThreads = (cpus * DEF_MAX_THREADS_PER_CPU) / 2 + 1;executor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());completionService = new ExecutorCompletionService<IOService<?>>(executor);socketReadThread = new SocketThread[nThreads];socketWriteThread = new SocketThread[nThreads];//读线程初始化for (int i = 0; i < socketReadThread.length; i++) {socketReadThread[i] = new SocketThread("socketReadThread-" + i);socketReadThread[i].reading = true;Thread thrd = new Thread(socketReadThread[i]);thrd.setName("socketReadThread-" + i);thrd.start();}...//写线程初始化for (int i = 0; i < socketWriteThread.length; i++) {socketWriteThread[i] = new SocketThread("socketWriteThread-" + i);socketWriteThread[i].writing = true;Thread thrd = new Thread(socketWriteThread[i]);thrd.setName("socketWriteThread-" + i);thrd.start();}...} }
}
线程模型初始化很简单,就是通过一个static代码块,初始化相关的读写线程组和处理IOService的call()方法调用的线程池。这里稍微核心一点的我想说可能就是计算线程池的公式,这里对tiagse的计算公式进行化简可得 nThread = CPU 核数 * 4 + 1,而对比《java并发编程实战》一书的第八章的线程池的最优计算公式:线程数 = CPU 核数 * CPU利用率 * (1 + 等待时间/计算时间), 朝着tigase公式的形式化简一下可得:线程数 = CPU 核数 * CPU利用率 + CPU 核数 *(CPU利用率*等待时间/计算时间)),由于tigase是IO密集型系统,所以我大胆假设 CPU 核数 * CPU利用率 ≈1,所以以上两个公式可得出:CPU利用率*等待时间/计算时间 = 4.当然这这只是个人的一些假设,用于复习一下线程池计算公式。实际数据肯定是tigase经过大量测试优化之后的最优数据。
SocketThread成员变量数据结构设计源码
class SocketThread implements Runnable{//selector对象private Selector clientsSel = null;// 控制selector为空的一个计数器private int empty_selections = 0;//“待完成”的IOService的缓冲跳表集合private ConcurrentSkipListSet<IOService<?>> forCompletion = new ConcurrentSkipListSet<IOService<?>>(new IOServiceComparator());//读状态private boolean reading = false;//写状态private boolean writing = false;//停止状态private boolean stopping = false;//等待的IOService的缓冲跳表集合private ConcurrentSkipListSet<IOService<?>> waiting = new ConcurrentSkipListSet<IOService<?>>(new IOServiceComparator());
}
- clientsSel对象: 用于监听对象的就绪状态。
- empty_selections:作为一个计数器,作用是统计 “Selector中就绪的Key为0 且 waiting跳表中的IOService为0” 的个数,当个数大于默认的10次,就重新创建Selector,代码的注释写的是解决两个java的BUG,这里就不做展开。
- forCompletion:待完成的IOService的跳表缓冲集合
- reading: 标记SocketThread是否是处理IO读的线程
- writing::标记SocketThread是否是处理IO写的线程
- stopping: 标记SocketThread线程是否是已经停止
- waiting:等待的IOService的跳表缓冲集合
核心的addSocketService、 removeSocketService方法源码
/*** 核心方法添加IOService* @param s*/public static void addSocketService(IOService<?> s) {//设置IOService已就绪s.setSocketServiceReady(true);// Due to a delayed SelectionKey cancelling deregistering// nature this distribution doesn't work well, it leads to// dead-lock. Let's make sure the service is always processed// by the same thread thus the same Selector.// socketReadThread[incrementAndGet()].addSocketServicePriv(s);//如果IOService是等待读的状态,那个根据 % 算法均衡到不同的线程中去if (s.waitingToRead()) {socketReadThread[s.hashCode() % socketReadThread.length].addSocketServicePriv(s);}//如果是待发送的IOService则根据 % 算法负载到写线程中去if (s.waitingToSend()) {socketWriteThread[s.hashCode() % socketWriteThread.length].addSocketServicePriv(s);}}/*** 核心移除IOService方法* @param s*/public static void removeSocketService(IOService<Object> s) {//设置就绪状态falses.setSocketServiceReady(false);//读写线程中都移除这个IOService对象socketReadThread[s.hashCode() % socketReadThread.length].removeSocketServicePriv(s);socketWriteThread[s.hashCode() % socketWriteThread.length].removeSocketServicePriv(s);}
两个都是静态方法,其核心原理就是用过hash 和取模算法分配IOService到SocketThread数组中的不同SocketThread线程中去。
SocketThread的run方法分析
public void run() {while (!stopping) {try {clientsSel.select();Set<SelectionKey> selected = clientsSel.selectedKeys();int selectedKeys = selected.size();//就绪的Key为0 且 等待的IOService为0if ((selectedKeys == 0) && (waiting.size() == 0)) {//重新创建Selector条件if ((++empty_selections) > MAX_EMPTY_SELECTIONS) {recreateSelector();}} else {empty_selections = 0;if (selectedKeys > 0) {//遍历就绪的Keyfor (SelectionKey sk : selected) {//从就绪的Key中拿到IOService对象IOService s = (IOService) sk.attachment();try {...sk.cancel();//添加到“待完成” 跳表中forCompletion.add(s);} catch (CancelledKeyException e) {//异常就强制停止IOServices.forceStop();}}}clientsSel.selectNow();}//将waiting跳表中的IOService分类注册到当前SocketThread的Selector上addAllWaiting();IOService serv = null;//先是从waiting 跳表注册到到当前forCompletion,然后再从//forCompletion跳表拿到最小的一个不为空,//就丢到completionService中执行,执行完只需要take()就能拿到执行完的对象while ((serv = forCompletion.pollFirst()) != null) {completionService.submit(serv);}} catch (Exception e) {recreateSelector();}}}
SocketThread本身是一个线程,其核心的方法和流程就在run()方法里面,以上代码是我精简之后留下的核心代码,具体的逻辑就是循环将waiting跳表集合中的IOServie注册到Selector上,并监听就绪的IOService,将其添加到forCompletion跳表集合中,然后从forCompletion中挨个取出,提交到completionService线程池中。供线程池调用IOService核心的call()方法进行数据的读写。
小结
tiagse的IO线程处理模型充分利用多线程和单一职责的设计,ConnectionOpenThread负载客户端连接的建立、SocketThread线程组负责监听就绪的IO对象,以及读写IO的分类、缓冲等,CompletionService线程池负责执行IOService的call方法(主要是处理IO的读写操作),以及解耦执行和执行结果的获取操作。ResultsListener线程负责循环利用没有关闭的IOService对象。合理的线程模型设计,更有利于提高系统的效率。