【Netty源码分析】04 服务端读流程
读流程
客户端接入后,下面一步操作就是读取客户端传输过来的数据,这一节我们就来分析下服务端读取客户端数据流程。从前面分析来看,channel的事件轮询、事件处理是在NioEventLoop的run方法中,从这里我们就很容易找我服务端读流程的入口方法:processSelectedKeys()。
从processSelectedKeys()一直追踪下去,可以看到OP_READ处理逻辑分支:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read();}可能你会比较奇怪:为什么OP_READ和OP_ACCEPT都会走这个分支?
(相关资料图)
OP_ACCEPT是NioServerSocketChannel处理的事件,而OP_READ是NioSocketChannel处理的事件,所以,虽然它们都走这个分支,但是channel类型确是不一样的,即这里的unsafe类型也不一样,一个是:NioMessageUnsafe,另一个是:NioSocketChannelUnsafe。NioServerSocketChannel负责监听客户端连接,当有客户端连接进入时,对它来说就是有个读入消息需要被处理。这里我们是处理client channle的OP_READ,所以,unsafe是NioSocketChannelUnsafe类型实例。
AbstractNioByteChannel.NioByteUnsafe#read方法代码如下:
public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 申请ByteBuf对象 byteBuf = allocHandle.allocate(allocator); //doReadBytes(byteBuf):将数据读取到ByteBuf中 //lastBytesRead()将读取的字节数设置到lastBytesRead allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; //触发pipeline channelRead事件,将读入数据ByteBuf传入到handler中 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading());//判断是否继续读取 allocHandle.readComplete(); //触发pipeline channelReadComplete pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } }}这个方法刨除其它逻辑,关于客户端数据处理逻辑主要包括3个步骤:
allocHandle.lastBytesRead(doReadBytes(byteBuf)):调用java api,从channel中读取字节数据到ByteBuf缓存中;pipeline.fireChannelRead(byteBuf):触发pipeline的channelRead事件,并将带有读入数据的ByteBuf通过参数传入;pipeline.fireChannelReadComplete():触发pipeline的channelReadComplete事件;事件传播
调用pipeline的fireChannelRead()就可触发channelRead事件在handler之间传播,事件传播这块代码比较绕,给人感觉不停的来回调用容易绕晕,下面通过图可以更加直观的看出调用流程,再配合代码就很好理解了。
关键点就在于HandlerContext中提供了一个静态方法:invokeChannelRead(final AbstractChannelHandlerContext next, Object msg),第一个是在哪个handler上触发事件,第二个参数就是数据本身,通过这个方法就可以指定在哪个handler上触发channelRead事件。由于pipeline中的handler是被包装成HandlerContext放入的,所以,可以通过handler()方法找到真正的handler对象进行触发。
比如pipeline的fireChannelRead()就是触发head的channelRead事件,如果处理完成需要把事件继续传播给下一个handler,就需要调用ctx.fireChannelRead(msg)方法即可,该方法中通过next属性获取到下一个节点,然后执行static invokeChannelRead(next, msg)这个方法就可以将事件传播到下一个节点上。
pipeline.fireChannelRead(byteBuf)运行完成后会调用pipeline.fireChannelReadComplete()方法,触发channelReadComplete事件,执行机制和channelRead事件一样,就不再赘述。
搞清楚上面原理,就很容易理解
ctx.fireChannelRead()和ctx.pipeline().fireChannelRead()之间的区别了,避免误用。
Pipeline线程模型
上面分析的都是常规模式,没有给handler指定额外线程情况下channelRead和channelReadComplete传播机制,大致如下图:
先触发channelRead事件,按照pipeline中顺序依次触发,当所有handler都触发完后,再触发channelReadComplete事件,按照pipeline中的顺序依次触发。这些所有流程采用的都是同步方式,在同一个线程中执行,这个线程就是channel注册的NioEventLoop。
我们来看下static void invokeChannelRead()这个方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); }}在执行next.invokeChannelRead(m)方法前有个executor.inEventLoop()判断,判断当前执行线程是不是就是handler执行所需的线程。执行handler方法是不能随便线程都可以去执行的,必须使用handler内部指定的executor线程执行器中执行才行。如下图,也就是说红色框框中的内容必须在executor线程执行器中执行,如果当前线程和handler执行线程不是同一个,就需要进行线程切换:则调用封装成一个任务,提交到executor的任务队列中让其执行。
executor线程执行器是通过next.executor()方法获取到的,从这个方法源码中可以看到获取逻辑:如果HandlerContext中executor有值则直接返回;否则返回channel注册的NioEventLoop作为线程执行器。
在添加handler时可以指定一个EventGroup:pipeline.addLast( bizGroup, "handler2", new OtherTest02());,这样,再把handler包装成HandlerContext过程中会从这个EventGroup根据chooser选取策略获得一个EventLoop赋值给executor。
所以,从上面分析,默认情况下handler都是在channel注册的NioEventLoop线程中执行的,除非在addLast添加handloer时特别指定。
下面我们通过一个案例分析下pipeline线程模型,如下,给handler02添加一个额外的线程池:
EventLoopGroup bizGroup = new NioEventLoopGroup(10, new ThreadFactoryBuilder().setNameFormat("biz-%s").build());protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast( "handler01", new OtherTest01()); pipeline.addLast( bizGroup, "handler02", new OtherTest02()); pipeline.addLast( "handler03", new OtherTest03());}这时,channelRead和channelReadComplete事件触发流程见下图:
channelRead事件执行流程说明:
handler01的channelRead事件,本身当前线程和handler01是同一个线程,所以,直接调用handler#channelRead()方法;handler01#channelRead()方法执行完成后,事件继续向下传播,需要调用handler02#channelRead()方法,但是handler02执行线程并不是默认的channel的注册线程,而是额外设置的biz线程,需要将调用包装成一个任务提交到biz线程的任务队列taskQueue中,然后直接返回;biz线程执行器内部线程会一直循环从taskQueue中获取任务执行,这样就完成了线程切换效果;当handler02#channelRead()方法执行完成后,需要执行handler03#channelRead(),它们又不在同一个线程中执行,这时有需要切换线程,所以会把handler03#channelRead()的调用封装成一个任务提交到register eventLoop的taskQueue中,待其内部线程提取执行;下面再来看下channelReadComplete事件执行流程:
a1将任务提交给taskQueue任务队列后直接返回了,而不是等其执行完成再返回;a1返回后,从源码分析来看,会立即触发channelReadComplete事件,涉及到线程切换,同理b1这里也是将handler02#channelReadComplete()调用封装成任务放入到biz eventLoop的taskQueue中的,然后也直接返回了;这样,biz eventLoop线程执行器taskQueue中就有两个任务,会按照顺序依次执行:先执行channelRead()调用,再执行channelReadComplete()调用;执行a3、b3时同理;总结
从上面可以看出,Pipeline中handler可以在不同线程间切换得到关键是:taskQueue。还要一点非常重要:handler线程池执行器默认使用的channel注册的NioEventLoop这个,NioEventLoop采用的是单线程工作模式,同时还需要处理selector.select()事件轮询,所以,handler里肯定不能有耗时、特别是IO阻塞等操作,不然卡在handler中,selector#select()执行不到,无法及时接收到客户端传送过来的数据。
标签:
- 知识课堂 校园欺凌是什么
- 权威百科知识 双氧水是啥东西
- 生活知识 长颈鹿是什么动物
- 知识导航 涤纶是什么
- 经验分享 嵌入式什么意思
- 百科 梦醒时分粤语版叫什么
- 打破性别“玻璃天花板” 95岁女院士是“她力量”最佳代言
- 中国记者节|今天,一起揭秘他们的独门“武功秘籍”
- 内蒙古通辽遭遇特大暴雪:学校停课 机场关闭
- 河北辛集市暂停举办体育活动 关闭景区文娱场所
- 红色文物·党史故事 “推出胜利”的小推车
- 侵华日军南京大屠杀遇难同胞纪念馆闭馆
- 甘肃:已治愈出院18例 闭环健康管理助回归家庭
- 核酸采样:一位“点长”的50小时冲刺
- 跑道结冰 哈尔滨机场关闭至9日12时
- 辽宁大连迎今冬首场降雪 机场临时关闭跑道地铁3号线停运
- 北京地铁全面开启车内加热装置
- 黑河市多举措保障疫情期间残疾人等特殊群体生活稳定
- 北京丰台海淀两处管控区域解封 社区工作者收到“暖心礼物”
- 百年兰州牛肉面的“隔离与亲近”
- 暴雪侵袭黑龙江 9地市最大雪深17厘米
- 吉林四平一旅游项目违占耕地两千多亩 投资达10亿元
- 湖南双峰27名非法滞留缅北人员被惩戒:小孩回原籍入学
- 江西新增本土“1+6” 上饶增一中风险地区
- 江西上饶一地调整为中风险地区 实行封闭管理措施
- 西宁市主城区首轮全员核酸采集样本144.8万份 结果均为阴性
- 快递旺季遭遇雨雪天气 国家邮政局呼吁理解快递小哥
- 高压、孤独,胆大、心细:手执焊枪的水下“蛙人”
- 掏粪掏了36年,他还在琢磨“新门道”
- 内蒙古:二连浩特市新增1例本土确诊病例 额济纳旗累计治愈出院本土确诊病例76例
- 坚守在海拔4300多米的“天路保健医生”
- 38年后,他终于知道了家在哪儿……
- 受降雪影响 辽宁鞍山一农贸市场发生坍塌
- 中国舞蹈家协会顶尖教师巡回课堂(重庆站)举办
- 边城战“疫”:夜晚七点的暂停键
- 风雪高原战“疫”长卷 寒潮下的西宁疫情防控观察
- 拟音师:“雕刻”声音的人【三百六十行】
- “双减”之后 中小学教师资格考试为何依然火爆
- 大数据助力贫困生成长
- “大漠明珠”驶上发展快车道 塔里木盆地做足生态大文章
- 职校生可报考事业单位 搬走职业教育的一块绊脚石
- 打算“双十一”买买买的姐妹 看完这篇再“剁手”
- 完美“飞天”仰仗全宇宙最酷飞船试驾员
- 冠状病毒中损伤血管的蛋白首次确定
- 新电池结构让飞行汽车成为可能 相关技术将亮相北京冬奥
- H5N8病毒肆虐全球,我国家禽为何“独善其身”
- 重庆奉节一民警因公殉职 年仅28岁
- 哈尔滨市新增本土新冠肺炎确诊病例1例
- 成都本地累计在管密接2757人、次密9097人
- 成都累计报告确诊病例23例 出现1传13特殊案例
-
呼和浩特一学校宿管员扇打学生致双耳鼓膜穿孔 分管校长被免
中新网呼和浩特11月9日电 (记者 张林虎)9日,针对“宿管员扇打学生致其双耳鼓膜穿孔”一事,呼和...
-
郑州通报8例确诊病例和无症状感染者活动轨迹
中新网11月9日电 据郑州市委宣传部官方微信消息,11月8日0至24时,郑州市新增阳性感染者3例,均为...
-
新疆阿克苏果农:我们的生活像苹果一样甜
中新社新疆阿克苏11月9日电 题:新疆阿克苏果农:我们的生活像苹果一样甜 作者 苟继鹏 “我...
-
河北辛集开展大规模消毒消杀工作
今天(9日)上午,河北省辛集市召开疫情防控新闻发布会。会上,辛集市科学技术局局长辛彦卜介绍,新冠...
-
河北辛集新增本土确诊11例 已转运定点医院诊治
今天(9日)上午,河北省辛集市召开疫情防控新闻发布会,辛集市副市长刘士民介绍,2021年11月8日0时至...
-
石家庄深泽县第五轮全员核酸检测结果全部为阴性
11月9日,石家庄市召开第12场新冠肺炎疫情防控工作新闻发布会。发布会上,石家庄市深泽县县长郝英鹏...
-
海口市1例治愈后的境外输入病例复阳 已转至定点医院隔离医学观察
中新网海口11月8日电 (记者 张茜翼)海口市新型冠状病毒感染肺炎疫情防控工作指挥部8日通报称,11...
-
四川新增本土确诊病例4例
中新网11月8日电 据四川省卫健委网站消息,11月7日0-24时,四川新增新型冠状病毒肺炎确诊病例5例(...
-
黑龙江省新增新冠肺炎本土确诊病例6例
中新网哈尔滨11月8日电 (程岩 记者 史轶夫)黑龙江省卫健委8日发布消息,7日0-24时,黑龙江省黑河...
-
河南新增本土确诊病例18例 其中郑州市16例周口市2例
中新网11月8日电 据河南省卫健委官方微博消息,11月7日0—24时,河南省新增本土确诊病例18例(郑州...
-
河北新增确诊病例8例 新增无症状感染者1例
中新网11月8日电 据河北省卫健委网站消息,2021年11月7日0—24时,河北省新增新型冠状病毒肺炎确诊...
-
寒潮持续发威!南方气温纷纷触底 强降雪中心转移至东北
中国天气网讯 今天(11月8日),寒潮继续南下,持续发威,南方大部最高气温将纷纷触底。强降雪中心将...
-
雪后寒!今日北京晴天回归北风劲吹 最高气温5℃上下
中国天气网讯 今天(11月8日)北京晴天回归,但在风寒效应下,“冷”仍然是天气的主题。气温方面,今...
-
黑龙江新增本土确诊病例6例 均在黑河市爱辉区
中新网11月8日电 据黑龙江省卫健委网站消息,2021年11月7日0-24时,黑龙江省新增新冠肺炎本土确诊...
-
寒潮继续影响华东华南等地 东北地区等地有强降雪
中新网11月8日电 据中央气象台网站消息,受寒潮影响,预计11月8日08时至9日08时,黄淮东部、江淮东...
-
辽宁新增本土确诊病例20例 新增本土无症状感染者12例
中新网11月8日电 据辽宁省卫健委网站消息,11月7日0时至24时,辽宁省新增20例本土新冠肺炎确诊病例...
-
寒潮影响“加码”:吉林力保电力供应 停课停运范围加大
中新网长春11月9日电 (记者 郭佳 张瑶)连日来,一轮寒潮引发的强降雪席卷中国北方。位于东北地区...
-
常州连续一周无新增病例 10日全市各类学校将错峰复学
中新网常州11月9日电 (记者 唐娟)11月9日,常州疫情防控指挥部学校防控组对外发布,自11月10起,...
-
哈尔滨机场开放恢复运行 计划航班45架次
中新网哈尔滨11月9日电 (仇建 记者 史轶夫)9日12时22分,随着哈尔滨经阜阳飞往三亚的FU6685航班...
-
山西警方抓获6名“摸金校尉” 缴获“虎枕”等大量文物
中新网长治11月9日电 (记者 李庭耀)记者9日从山西省长治市公安局上党分局获悉,上党警方侦破系列...
-
西藏基层第一书记话产业发展推进乡村振兴
中新网日喀则11月9日电(记者 赵朗)近日,由西藏自治区网信办主办的第一书记话小康活动先后走进山南...
-
内蒙古通辽:强降雪致8个旗县区受灾
中新网通辽11月9日电 (记者 张林虎)9日,记者从内蒙古自治区通辽市应急管理局获悉,自11月5日起,...
-
成都金堂:医护人取消婚礼坚守岗位 手捧花被送到了战“疫”一线
中新网成都11月9日电 (邹立杨)连日来,华西医院金堂县第一人民医院实验医学科的主检验师易维佳都在...
-
江西铅山新一轮核酸检测结果均为阴性
(抗击新冠肺炎)江西铅山新一轮核酸检测结果均为阴性 中新网南昌11月9日电 (记者 吴鹏泉)江西省...
-
辽宁大连幼儿园和中小学学生即日起暂缓入校
中新网11月9日电 据辽宁省大连市人民政府新闻办公室官方微博消息,大连市新冠肺炎疫情防控总指挥部...
-
那年今日 | 一张漫画涨知识之11月8日
-
北京新增1例本土确诊病例
中新网11月8日电 据北京卫健委官方微博消息,11月7日0时至24时,北京新增1例本土确诊病例,无新增...
-
河北石家庄深泽县7日新增1例无症状感染者 为8岁男童
中新网11月8日电 据石家庄卫健委官方微信消息,石家庄深泽县应对新冠肺炎疫情工作领导小组办公室8...
-
高速封闭、机场关闭、学校停课 辽宁多部门发应急预案应对极端天气
中新网沈阳11月8日电 (李晛 王景巍)7日在寒潮影响下,东北地区局地降大雪。辽宁省气象部门当日连...
-
云南新增本土确诊病例3例 新增本土无症状感染者3例
中新网11月8日电 据云南省卫健委网站消息,11月7日0—24时,云南省新增确诊病例9例,其中境外输入...
-
努力让每个人都有出彩机会
努力让每个人都有出彩机会 “孩子明年要参加中考,成绩一直提不上去,送他读职高,也是一种选择...
-
参与和见证中国水电发展
参与和见证中国水电发展 余吉安的童年是在马来西亚加里曼丹岛的沙捞越州古晋市度过的。家门口的...
-
中国航天:为实现中国梦提供战略支撑
中国航天:为实现中国梦提供战略支撑(科技名家笔谈) 今年是中国共产党成立100周年,也是中国航...
-
8日起 江西铅山县开展新一轮全员核酸检测
记者从江西省铅山县疫情防控指挥部了解到,按照疫情防控要求,为了充分保障公众的健康安全,现定于1...
-
适当“早教”可以,“早早教”大可不必
一家之言 适当“早教”可以,“早早教”大可不必 以前国庆节是放假了,可家家都有娃,放假补...
X 关闭
X 关闭






