Netty 网络框架
(一) 基础篇
1、I/O 基础
输入流:InputStream 和 Reader 输出流:OutputStream 和 Writer
字节流 字符流
计算机最小的二进制单位 bit 比特 代表 0 和 1 字节 1 byte = 8bit 计算机处理的最小单位 字符 1 char = 2byte = 16bit 人处理的最小单位
所以,字节流处理文件、图片、视频等二进制数据,而字符流处理文本数据。
2、Socket
原意是“插座”,在计算机领域中,翻译为“套接字”。 本质上,是计算机之间进行通信的一种方式。
Linux,“一切皆文件”,给每个文件映射一个 ID,叫做"文件描述符"。 当处理网络连接时,也会看成一个文件,read/write 变成和远程计算机的交互。
OSI 七层模型 = Open System Interconnection 开放式系统互联 从下到上分别为:物理层、数据链路层、网络层、传输层、会话层、表示层和应用层。
实际应用的是优化后的 TCP/IP 模型(四层) 网络接口层/链路层、网络层、传输层、应用层
应用层协议:HTTP、FTP、SMTP(邮件协议) 传输层协议:TCP、UDP
Socket 其实是应用层与传输层之间的抽象层,是一组接口。 在设计模式中,是门面模式。
3、NIO
BIO - BlockingIO 同步阻塞 NIO - New IO / Non-Blocking IO 同步非阻塞 AIO - Asynchronous IO 异步非阻塞
同步和异步,关注的是消息通知的机制 阻塞和非阻塞,关注的是等待消息过程中的状态
多路复用的模型
三大元素:Channel 、Buffer、Selector
1) Channel
FileChannel 文件管道的数据 Pipe.SinkChannel Pipe.SourceChannel 线程间通信的管道 ServerSocketChannel SocketChannel 用于 TCP 网络通信的管道 DatagramChannel 用于 UDP 网络通信的管道
2) Buffer
capacity 总体容量大小 limit 存储容量的大小,是可读写和不可读写的界线 position 已读容量的大小,已读和未读区域的界线
【使用原理】 a) 初始化,给定总容量,position=0, limit=capacity b) 当使用 put 方法存入数据是,通过 position 来记录存储的容量变化,position 不断后移,直到存储结束(写完成) c)写完成需要调用 flip 方法刷新,limit=position,position=0 保障 limit 记录的是可读写区域的大小,position 已读部分重置为空 d) 读数据直到读完成,需要调用 clear 方法,position=0, limit=capacity
3) Selector
三个元素: Selector 选择器、SelectableChannel 可选择的通道、SelectionKey 选择键
本质上,Selector 是监听器,监听的是通道是否有我们关心的操作产生,操作对应的是事件(连接、接收、读/写),使用 SelectionKey 代表具体的事件,在确保通道是可选择的情况下,将通道注册进选择器中,此时 Selector 维护的是,通道和事件之间的关联关系。
Selector,管理被注册的通道集合,以及他们的状态 SelectableChannel,是一个抽象类,提供了通道可被选择需要实现的 api。 FileChannel 就不是可选择的,Socket 相关的通道都是可选择的 一个通道可以被注册到多个选择器上吗? 可以的 多个通道可以注册到一个选择器上,但一个通道只能在一个选择器中注册一次
SelectionKey,封装了要监听的事件,连接、接收、读、写。 一方面,Selector 关心通道要处理哪些事件 另一方面,当事件触发时,通道要处理哪些事件
【使用方式】
a、首先通过 open 方法,获取通道,将通道设置为非阻塞的 b、通过 open 方法,获取选择器,将通道注册进选择器中,伴随设置通道要处理的事件(OP_ACCEPT) c、轮询选择器,当前是否有要处理的操作 select() > 0? 如果有,要获取,待处理操作的集合 Set<SelectionKey> , 进行遍历 遍历到 SelectionKey 时,判断对应哪种操作,不同的操作设置不同的处理方式 如 OP_ACCEPT,接收客户端通道并进行注册,监听后续处理的事件,如 OP_WRITE 如 OP_WRITE,通过 key 的方法获取通道本身,读取数据并继续监听事件,如 OP_READ
4、零拷贝
需求:将磁盘中的文件读取出来,通过 socket 发送出去
传统的拷贝方式(4 次) Socket 网络缓冲区,也属于操作系统的内核缓冲区。
在操作系统中进行的拷贝(如第二次和第三次),叫做 CPU 拷贝。 连接磁盘或网卡等硬件的拷贝(如第一次和第四次),叫做 DMA 拷贝。
零拷贝的概念:减少 CPU 拷贝的次数。
零拷贝是基于操作系统层面的优化方式(以下基于 Linux 系统)
1) mmap = memory mapping 内存映射
2)sendfile (linux2.1 内核支持)
- sendfile with scatter/gather copy(批量 sendfile) 从单个文件的处理,上升到多个物理地址的处理,提高处理速度
4)splice (拼接,在 linux2.6 内核支持)
在操作系统内核缓冲区和 Socket 网络缓冲区之间建立管道,来减少拷贝次数。
线程模型
1) 单线程 Reactor 模型
顾名思义 就是使用一个线程来处理问题 线程中
- selector
- 事件处理 : 连接事件
- 处理事件:handler
单线程服务器
public class ReactorServer {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public ReactorServer() {
try {
// 初始化监听器 与 channel 通道
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
// 配置为非阻塞的
serverSocketChannel.configureBlocking(false);
// 配置通道连接地址 开放 9090 端口
SocketAddress address = new InetSocketAddress(9090);
serverSocketChannel.socket().bind(address);
//将channel 注册到 selector监听通道事件 达到多路复用
//首个注册事件一般都是 accept 连接事件
SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 创建处理连接事件的 acceptor
// 同时 创建处理器 接受后序的IO读写事件,不断的遍历 是否有事情发生
Acceptor acceptor = new Acceptor(selector, serverSocketChannel);
//附加一个对象 用来处理事件
key.attach(acceptor);
while (true) {
//返回事件的个数 处理事件
int num = selector.select();
if (num == 0) {
continue;
}
//没有跳过就代表有事件需要处理,拿到事件集合
Set<SelectionKey> SKeyset = selector.selectedKeys();
Iterator<SelectionKey> iterator = SKeyset.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//拿到事件的第一事情 移出事件 避免重复处理
iterator.remove();
//根据事件类型 分发 给监听器处理
//需要处理事情的时候 取出存储的对象
//如有接收的时Accpet 事件 获取的就是Acceptor 事件
//如果接受的时读写事件 获取的就是 Handler 事件
Runnable runnable = (Runnable) key.attachment();
runnable.run();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
}
}
Accpetor 连接事件
public class Acceptor implements Runnable {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
}
@Override
public void run() {
try {
//接受客户端传入的连接时 Socket Channel
SocketChannel socketChannel = serverSocketChannel.accept();
//设置异步
socketChannel.configureBlocking(false);
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
// 创造处理器 处理连接
//单线程
//Handler handler = new Handler(key);
//多线程
MultHandler handler = new MultHandler(key);
handler.run();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Handler 单线程处理
public class Handler implements Runnable {
private SelectionKey key;
private State state;
public Handler(SelectionKey key) {
this.key = key;
this.state = State.READ;
}
@Override
public void run() {
//处理 读写操作,判断读写
switch (state) {
case READ:
read();
break;
case WRITE:
write();
break;
default:
break;
}
}
/*轮流处理末尾添加事件达到循环处理*/
//处理 读方法
private void read() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 通过通道获取KEY
SocketChannel channel = (SocketChannel) key.channel();
try {
//将传入的数据写入到buffer中
int num = channel.read(buffer);
// 转化成String
String msg = new String(buffer.array());
// 增加业务处理
// 继续处理注册写事件
key.interestOps(SelectionKey.OP_WRITE);
this.state = State.WRITE;
} catch (Exception e) {
e.printStackTrace();
}
}
//处理 写方法
private void write() {
ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
try {
// 通过通道获取KEY
SocketChannel channel = (SocketChannel) key.channel();
channel.write(buffer);
// 继续处理注册写事件
key.interestOps(SelectionKey.OP_READ);
this.state = State.READ;
} catch (Exception e) {
e.printStackTrace();
}
}
//记录状态 非读即写
private enum State {
//读写事件
READ, WRITE
}
}
2)多线程 Reactor 模型
提高 handler 的处理效率,首先 handler 不再负责具体的业务逻辑,当读取出数据后,分发给子线程处理,子线程处理完成后再将结果返回给 handler,handler 再将结果返回给客户端。
多线程处理 (handler 使用线程池)
public class MultHandler implements Runnable {
private SelectionKey key;
private State state;
private ExecutorService pool;
public MultHandler(SelectionKey key) {
this.key = key;
this.state = State.READ;
}
@Override
public void run() {
//处理 读写操作,判断读写
switch (state) {
case READ:
//将最耗时的操作 放入线程池执行
pool.execute(new Runnable() {
@Override
public void run() {
read();
}
});
break;
case WRITE:
write();
break;
default:
break;
}
}
/*轮流处理末尾添加事件达到循环处理*/
//处理 读方法
private void read() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 通过通道获取KEY
SocketChannel channel = (SocketChannel) key.channel();
try {
//将传入的数据写入到buffer中
int num = channel.read(buffer);
// 转化成String
String msg = new String(buffer.array());
// 增加业务处理
// 继续处理注册写事件
key.interestOps(SelectionKey.OP_WRITE);
this.state = State.WRITE;
} catch (Exception e) {
e.printStackTrace();
}
}
//处理 写方法
private void write() {
ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());
try {
// 通过通道获取KEY
SocketChannel channel = (SocketChannel) key.channel();
channel.write(buffer);
// 继续处理注册写事件
key.interestOps(SelectionKey.OP_READ);
this.state = State.READ;
} catch (Exception e) {
e.printStackTrace();
}
}
//记录状态 非读即写
private enum State {
//读写事件
READ, WRITE
}
}
3)主从 Reactor 模型
mainReactor 用来接收连接事件,然后分发给 acceptor,acceptor 在处理过程中,直接将后续的读写事件,注册到 slaveReactor 之中,以此来达到分流。
主从监听器
//主从模型
public class MultReactorServer {
private Selector mainselector;
private Selector slaveselector;
private ServerSocketChannel serverSocketChannel;
public MultReactorServer() {
try {
// 主 reactor 处理连接事件
mainselector = Selector.open();
//从reactor 处理读写事件
slaveselector = Selector.open();
// 配置为非阻塞的
serverSocketChannel.configureBlocking(false);
// 配置通道连接地址 开放 9090 端口
SocketAddress address = new InetSocketAddress(9090);
serverSocketChannel.socket().bind(address);
//将channel 注册到 selector监听通道事件 达到多路复用
//首个注册事件一般都是 accept 连接事件 (参数变化)
SelectionKey key = serverSocketChannel.register(mainselector, SelectionKey.OP_ACCEPT);
// 创建处理连接事件的 acceptor
// 同时 创建处理器 接受后序的IO读写事件,不断的遍历 是否有事情发生 (参数变化)
Acceptor acceptor = new Acceptor(slaveselector, serverSocketChannel);
//附加一个对象 用来处理事件
key.attach(acceptor);
//主从监听逻辑分离
new HandlerLoop(slaveselector).run();
while (true) {
//返回事件的个数 处理事件
int num = mainselector.select();
if (num == 0) {
continue;
}
//没有跳过就代表有事件需要处理,拿到事件集合
Set<SelectionKey> SKeyset = mainselector.selectedKeys();
Iterator<SelectionKey> iterator = SKeyset.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//拿到事件的第一事情 移出事件 避免重复处理
iterator.remove();
//根据事件类型 分发 给监听器处理
//需要处理事情的时候 取出存储的对象
//只处理主Reactor 只处理连接事件
Runnable runnable = (Runnable) key.attachment();
runnable.run();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
主从事件处理读写分离
//用于处理从reactor事件监听
public class HandlerLoop implements Runnable {
private Selector selector;
public HandlerLoop(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
try {
while (true) {
//返回事件的个数 处理事件
int num = selector.select();
if (num == 0) {
continue;
}
//没有跳过就代表有事件需要处理,拿到事件集合
Set<SelectionKey> SKeyset = selector.selectedKeys();
Iterator<SelectionKey> iterator = SKeyset.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//拿到事件的第一事情 移出事件 避免重复处理
iterator.remove();
//根据事件类型 分发 给监听器处理
//需要处理事情的时候 取出存储的对象
//只处理从reactor 所以 接受的一定是读写事件
Runnable runnable = (Runnable) selectionKey.attachment();
runnable.run();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
(二) 应用篇
1、HTTP
1) 0.9 版本
GET /index.html
服务端只能返回 html 格式,传输过程只能处理文字。
2) 1.0 版本
支持任何格式的内容,包括图像、视频、二进制等等
引入了 POST 命令、HEAD 命令
增加了请求头、状态码,以及权限、缓存等
GET / HTTP/1.0
User-Agent:Mozilla/1.0
Accept: */*
HTTP/1.0 200 OK
Content-Type: text/plain
Content-Encoding: gzip
<html>
<body> hello world </body>
</html>
a、 Content-Type
服务端通知客户端,当前数据的格式
示例: text/html 、 image/png 、 application/pdf 、 video/mp4
前面是一级类型,后面是二级类型,用斜杠分隔; 还可以增加其他参数,如编码格式。
Content-Type: text/plain; charset=utf-8
b、Content-Encoding
表示数据压缩的方式,gzip、compress、deflate
对应客户端的字段为 Accept-Encoding,代表接收哪些压缩方式
c、缺点和问题
每个 TCP 连接只能发送一个请求,发送完毕连接关闭,使用成本很高,性能较差。
Connection: keep-alive - 非标准字段
3) 1.1 版本
GET / HTTP/1.1
User-Agent: PostmanRuntime/7.24.1
Accept: */*
Cache-Control: no-cache
Postman-Token: 636ce8a6-7eab-451a-8638-4534a3578095
Host: cn.bing.com
Accept-Encoding: gzip, deflate, br
Connection: keep-alive
HTTP/1.1 200 OK
Cache-Control: private, max-age=0
Content-Length: 45786
Content-Type: text/html; charset=utf-8
Content-Encoding: br
Vary: Accept-Encoding
P3P: CP="NON UNI COM NAV STA LOC CURa DEVa PSAa PSDa OUR IND"
Set-Cookie: SRCHD=AF=NOFORM; domain=.bing.com; expires=Wed, 22-Jun-2022 07:03:23 GMT; path=/
Set-Cookie: SRCHUID=V=2&GUID=5C28FF778A2C4A00B32F5408147038BF&dmnchg=1; domain=.bing.com; expires=Wed, 22-Jun-2022 07:03:23 GMT; path=/
Set-Cookie: SRCHUSR=DOB=20200622; domain=.bing.com; expires=Wed, 22-Jun-2022 07:03:23 GMT; path=/
Set-Cookie: _SS=SID=23AE726877396796143D7C99761766C7; domain=.bing.com; path=/
Set-Cookie: _EDGE_S=F=1&SID=23AE726877396796143D7C99761766C7; path=/; httponly; domain=bing.com
Set-Cookie: _EDGE_V=1; path=/; httponly; expires=Sat, 17-Jul-2021 07:03:23 GMT; domain=bing.com
Set-Cookie: MUID=1CB3B15BCBD2637E2C01BFAACAFC6214; samesite=none; path=/; secure; expires=Sat, 17-Jul-2021 07:03:23 GMT; domain=bing.com
Set-Cookie: MUIDB=1CB3B15BCBD2637E2C01BFAACAFC6214; path=/; httponly; expires=Sat, 17-Jul-2021 07:03:23 GMT
X-MSEdge-Ref: Ref A: 71D6F4B3BA9C448EB453341AC182C7BC Ref B: BJ1EDGE0311 Ref C: 2020-06-22T07:03:23Z
Date: Mon, 22 Jun 2020 07:03:23 GMT
a、持久连接,含义为默认不关闭 tcp 连接,可以被多个请求复用。大多时候,浏览器对同一个域名,允许同时建立 6 个连接。
b、管道机制,支持客户端发送多个请求,管理请求的顺序的。服务器还是按照接受请求的顺序,返回对应的响应结果。
c、Content-Length, 用来区分数据包的重要字段
d、支持 PUT、DELETE、PATCH 等命令
缺点和问题
当部分请求耗时较长时,仍会阻塞后续请求的处理速度,这种现象叫做“队头阻塞”/"线头阻塞"。
4) 2.0 版本
解决队头阻塞的问题,使用的是多路复用的方式。
说了这么多 上代码来操作一下吧!!
我们的编写思路是这样的
- 编写初始化服务端
- 编写自定义初始化器 和 自定义处理器
- 启动 postman 查看我们设置的 http 的响应结果
我们这里有三个类
- HttpServer 初始化服务端
- MyHttpHandler 自定义处理器
- MyHttpInitializer 自定义初始化
首先是 server
我们需要在初始化服务端的时候 设置主从线程模型(Netty 中常用) 设置 启动参数 和阻塞队列的长度等设置 设置 初始化
public class HttpServer {
public static void main(String[] args) {
//可以自定义线程的数量
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 默认创建的线程数量 = CPU 处理器数量 *2
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
//当前连接被阻塞的时候,BACKLOG代表的事 阻塞队列的长度
.option(ChannelOption.SO_BACKLOG, 128)
//设置连接保持为活动状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new MyHttpInitializer());
try {
//设置为异步启动 异步 关闭
ChannelFuture future = serverBootstrap.bind(9988).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//netty的优雅关闭 指 等一切执行完毕之后 慢慢的关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
编写 初始化
继承 ChannelInitializer 泛型为 Channel,用来进行设置出站解码器和入站编码器 使用 codec netty 封装好的解码器,这样我们就不用每次定义 解码和编码
public class MyHttpInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
//先对应请求解码,后对响应解码
//pipeline.addLast("decoder", new HttpRequestDecoder());
//pipeline.addLast("encoder", new HttpRequestEncoder());
//当然每次我们都要解码编码很麻烦,netty也有为我们提供对应的解决方案,建议直接使用这个 不会出错
pipeline.addLast("codec", new HttpServerCodec());
//压缩数据
pipeline.addLast("compressor", new HttpContentCompressor());
//聚合完整的信息 参数代表可以处理的最大值 此时的是 512 kb
pipeline.addLast("aggregator", new HttpObjectAggregator(512 * 1024));
pipeline.addLast(new MyHttpHandler());
}
}
有了初始化,我们还需要一个做事的 那就是 处理器 Handler
netty 帮我们封装了返回完整 http 响应的类 DefaultFullHttpResponse 我们只需要在读取的时候 设置协议,状态码和响应信息, 配置响应头的类型和长度 就可以完成对请求的响应
/*
* 泛型需要设置为 FullHttpRequest
* 筛选 message 为此类型的消息才处理
* */
public class MyHttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) throws Exception {
//DefaultFullHttpResponse 是一个默认的完整的http响应
DefaultFullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
Unpooled.wrappedBuffer("hello http netty demo".getBytes())
);
// 我们还需要设置响应头
// 设置请求 响应头字段 可以使用 HttpHeaderNmaes
// 设置字段时 可以使用
HttpHeaders headers = response.headers();
headers.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN + ";charset=UTF-8");
headers.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
ctx.write(response);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
启动结果
postman 差看到的请求响应 参数
可以看到我们设置的 http1.1 协议
类型 text/plain 长度 47
响应给客户端的 内容
hello http netty demo
小结
- 了解 http 各个版本的解决了什么问题,优缺点,优劣性
- 手动编写一个服务端的响应,用 postman 查看响应头我们设置的内容
- 体验到 netty 强大的封装带给我们的便利性
2、WebSocket
websocket 是由浏览器发起的
协议标识符 http://127.0.0.1:8080 ws://127.0.0.1:7777
GET ws://127.0.0.1:7777 HTTP/1.1
Host: 127.0.0.1
Upgrade: websocket # 升级为ws
Connection: Upgrade # 此链接需要升级
Sec-WebSocket-key: client-random-string ... # 标识加密相关信息
HTTP/1.1 101
Upgrade: websocket
Connection: Upgrade
响应码 101 代表本次协议需要更改为 websocket
连接建立后,支持文本信息及二进制信息。
Websocket 实现的原理: 通过 http 协议进行连接的建立(握手和回答),建立连接后不再使用 http,而 tcp 自身是支持双向通信的,所以能达到“全双工”的效果。
通信使用的单位叫帧 frame 客户端:发送时将消息切割成多个帧 服务端:接收时,将关联的帧重新组装
【客户端】
var ws = new WebSocket("ws://127.0.0.1:7777/hello");
ws.onopen = function(ev){
ws.send("hello"); //建立连接后发送数据
}
设计一个样式 左右两个各有一个文本框,中间放一个发送按钮。 左侧文本框用来发送数据,右侧文本框用来显示数据。
Websocket 应用 demo
服务端代码
public class WebSocketServer {
public static void main(String[] args) {
//可以自定义线程的数量
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 默认创建的线程数量 = CPU 处理器数量 *2
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
//当前连接被阻塞的时候,BACKLOG代表的事 阻塞队列的长度
.option(ChannelOption.SO_BACKLOG, 128)
//设置连接保持为活动状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new WebSocketInitialzer());
try {
ChannelFuture future = serverBootstrap.bind(7777).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端初始化器
public class WebSocketInitialzer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//增加编解码器 的另一种方式
pipeline.addLast(new HttpServerCodec());
// 块方式写的处理器 适合处理大数据
pipeline.addLast(new ChunkedWriteHandler());
//聚合
pipeline.addLast(new HttpObjectAggregator(512 * 1024));
/*
* 这个时候 我们需要声明我们使用的是 websocket 协议
* netty为websocket也准备了对应处理器 设置的是访问路径
* 这个时候我们只需要访问 ws://127.0.0.1:7777/hello 就可以了
* 这个handler是将http协议升级为websocket 并且使用 101 作为响应码
* */
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
pipeline.addLast(new WebSocketHandler());
}
}
服务端处理器
通信使用的单位叫帧 frame 客户端:发送时将消息切割成多个帧 服务端:接收时,将关联的帧重新组装
/*
* 泛型 代表的是处理数据的单位
* TextWebSocketFrame : 文本信息帧
* */
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
//可以直接调用text 拿到文本信息帧中的信息
System.out.println("msg:" + textWebSocketFrame.text());
Channel channel = ctx.channel();
//我们可以使用新建一个对象 将服务端需要返回的信息放入其中 返回即可
TextWebSocketFrame resp = new TextWebSocketFrame("hello client from websocket server");
channel.writeAndFlush(resp);
}
}
websocket 前端编写
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<title>Title</title>
</head>
<body>
<script>
var socket;
// 判断当前浏览器是否支持websocket
if (!window.WebSocket) {
alert("不支持websocket");
} else {
<!-- 创建websocket 连接对象-->
socket = new WebSocket("ws://127.0.0.1:7777/hello");
//设置开始连接的方法
socket.onopen = function (ev) {
var tmp = document.getElementById("respText");
tmp.value = "连接已经开启";
};
//设置关闭连接的方法
socket.onclose = function (ev) {
var tmp = document.getElementById("respText");
tmp.value = tmp.value + "\n" + "连接已经关闭";
};
//设置接收数据的方法
socket.onmessage = function (ev) {
var tmp = document.getElementById("respText");
tmp.value = tmp.value + "\n" + ev.data;
};
}
function send(message) {
if (!window.socket) {
return;
}
/*
* 判断socket的状态
* connecting 正在连接 closing 正在关闭
* closed 已经关闭或打开连接失败
* open 连接可以 已经正常通信
* */
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
} else {
alert("连接未开启");
}
}
</script>
<!--防止表单自动提交-->
<form onsubmit="return false">
<textarea name="message" style="height: 400px;width: 400px"></textarea>
<input
type="button"
value="发送"
onclick="send(this.form.message.value)"
/>
<textarea id="respText" style="height: 400px;width: 400px"></textarea>
</form>
</body>
</html>
【客户端】
var ws = new WebSocket("ws://127.0.0.1:7777/hello");
ws.onopen = function(ev){
ws.send("hello"); //建立连接后发送数据
}
设计一个样式 左右两个各有一个文本框,中间放一个发送按钮。 左侧文本框用来发送数据,右侧文本框用来显示数据。
演示效果
启动服务发送消息
小结
- websocket 一般用于做可复用连接,http 一般做短链接
- websocket 解决了 http 连接只能客户端发起的
(三)原理篇
1、ByteBuf
NIO 中 ByteBuffer 的缺点:
A 长度固定,无法动态的扩容和缩容,缺乏灵活性 B 使用一个 position 记录读写的索引位置,在读写模式切换时需手动调用 flip 方法,增加了使用的复杂度。 C 功能有限,使用过程中往往需要自行封装
1)分类
按照内存的位置,分为堆内存缓冲区 heap buffer、直接内存缓冲区 direct buffer、复合内存缓冲区 composite buffer。
A heap buffer
将数据存储到 JVM 的堆空间中,实际使用字节数组 byte[]来存放。 优点:数据可以快速的创建和释放,并且能够直接访问内部数组 缺点:在读写数据时,需要将数据复制到直接缓冲区 再进行网络传输。
B direct buffer
不在堆中,而是使用了操作系统的本地内存。 优点:在使用 Socket 进行数据传输过程中,减少一次拷贝,性能更高。 缺点:释放和分配的空间更昂贵,使用时需要更谨慎。
C composite buffer
将两个或多个不同内存的缓冲区合并 优点:可以统一进行操作
应用场景:在通信线程使用缓冲区时,往往使用 direct buffer,而业务消息使用缓冲区时,往往使用 heap buffer,在解决 http 包,请求头+请求体特性不同而选择不同位置存储时,可以将两者拼接使用
D 池化的概念
对于内存空间分配和释放的复杂度和效率,netty 通过内存池的方式来解决。 内存池,可以循环利用 ByteBuf,提高使用率。但是管理和维护较复杂。
Unpooled 正是非池化缓冲区的工具类。
主要区别在于,池化的内存由 netty 管理,非池化的内存由 GC 回收。
E 回收方式
回收方式为引用计数,具体规则为,通过记录被引用的次数,判断当前对象是否还会被使用。 当对象被调用时,引用计为+1,当对象被释放时,引用计为-1,当引用次数为 0 时,对象可以回收。
弊端:可能引发内存泄漏。 当对象不可达,JVM 会通过 GC 回收掉,但此时引用计数可能不为 0,对象无法归还内存池,会导致内存泄漏。netty 只能通过对内存缓冲区进行采样,来检查。
2)工作原理
和 ByteBuffer 不同在于,增加了一个指针,通过两个指针记录读模式和写模式时的索引位置,读指针叫做 readerIndex,写指针叫做 writerIndex。
A 读写分离
当执行 clear()方法时,索引位置清空回初始位置,但数据保持不变。
mark 和 reset 方法在 ByteBuf 中同样适用,如 markReaderIndex 和 resetReaderIndex。
B 深浅拷贝
浅拷贝,拷贝的是对对象的引用,并没有创建新对象,新对象和原对象之间互相影响。 深拷贝,拷贝的是整个对象,和原对象之间完全独立。
duplicate 和 slice 方法,达成全部浅拷贝和部分浅拷贝。 copy,部分深拷贝,部分代表的是可读空间。
3)扩容机制
A ByteBuffer 的存储
ByteBuffer 在 put 数据时,会校验剩余空间是否不足,如果不足,会抛出异常。
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.put("yu".getBytes());
----------------------------------------------------
public final ByteBuffer put(byte[] src) {
return put(src, 0, src.length);
}
// 额外接收偏移量(存储数据的起始位置) 和数据长度
public ByteBuffer put(byte[] src, int offset, int length) {
// 校验参数的有效性
checkBounds(offset, length, src.length);
// 如果要存储数据的长度 > 剩余可用空间 抛出buffer越界的异常
if (length > remaining())
throw new BufferOverflowException();
// 如果剩余空间足够 计算存储的结束位置 = 偏移量 + 数据长度
int end = offset + length;
for (int i = offset; i < end; i++)
this.put(src[i]);
return this;
}
如果要手动对 ByteBuffer 扩容,可以在 put 之前,先校验剩余空间是否足够,如果不足够,创建一个新的 ByteBuffer,新的容量确保足够,旧的 buffer 数据拷贝到新的 buffer 中,然后继续存储数据。
B ByteBuf 的存储和扩容
当写数据时,先判断是否需要扩容,如果当前空间较小(<4M),以 64 作为基数倍增(10 -> 64 -> 128 -> 256), 如果当前空间较大(>4M), 每次扩容都增加 4M,这种方式叫做"步进式"。
查看源码,以AbstractByteBuf子类为依据查看,最重要的子类之一,ByteBuf的公共属性和功能都在此中实现。
ByteBuf buf = Unpooled.buffer(10);
System.out.println("capacity: " + buf.capacity());
for (int i = 0; i < 11; i++) {
buf.writeByte(i);
}
----------------------------------------------------
[ByteBuf类]
public abstract ByteBuf writeByte(int value);
按住Ctrl+Alt快捷键
[AbstractByteBuf子类]
----------------------------------------------------
@Override
public ByteBuf writeByte(int value) {
// 确保可写空间足够
ensureWritable0(1);
// 写入数据
_setByte(writerIndex++, value);
return this;
}
// 参数为 最小写入数据的大小
final void ensureWritable0(int minWritableBytes) {
final int writerIndex = writerIndex();
// 目标容量 = 当前写操作索引 + 最小写入数据大小
final int targetCapacity = writerIndex + minWritableBytes;
// 容量足够 不需扩容
if (targetCapacity <= capacity()) {
ensureAccessible();
return;
}
// 容量不足时 如果目标容量 超出最大容量 抛出异常
if (checkBounds && targetCapacity > maxCapacity) {
ensureAccessible();
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// 扩容逻辑
// 获取可写空间大小
final int fastWritable = maxFastWritableBytes();
// 如果 可写空间 >= 所需空间 新的容量=写操作索引+可写空间大小
// 如果 可写空间 < 所需空间 计算要扩容的新容量大小 calculateNewCapacity方法
int newCapacity = fastWritable >= minWritableBytes ? writerIndex + fastWritable
: alloc().calculateNewCapacity(targetCapacity, maxCapacity);
// Adjust to the new capacity.
// 计算完成后 生成新的ByteBuffer
capacity(newCapacity);
}
// 获取可写空间大小
public int maxFastWritableBytes() {
return writableBytes();
}
[AbstractByteBufAllocator子类]
----------------------------------------------------
// 计算要扩容的新容量大小
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
// 校验参数有效性
checkPositiveOrZero(minNewCapacity, "minNewCapacity");
if (minNewCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
minNewCapacity, maxCapacity));
}
// 扩容方式的分界点 以4M大小为界
final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// If over threshold, do not double but just increase by threshold.、
// 如果所需容量大于4M 按照步进的方式扩容
// 举例: 比如 minNewCapacity = 5M
if (minNewCapacity > threshold) {
// newCapacity = 5 / 4 * 4 = 4M 确保是4的倍数
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
// newCapacity = 4 + 4 = 8M;
newCapacity += threshold;
}
return newCapacity;
}
// Not over threshold. Double up to 4 MiB, starting from 64.
// 如果所需容量大于4M 按照64的倍数扩容 找到最接近所需容量的64的倍数
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
// 保障在最大可接受容量范围内
return Math.min(newCapacity, maxCapacity);
}
4)优势
A 池化的方式提高内存使用率
B 提出了复合型缓冲区的整合方案
C 增加了索引,使读写分离,使用更便捷
D 解决了 ByteBuffer 长度固定的问题,增加了扩容机制
E 用引用计数的方式进行对象回收
2、Channel
1)Channel
channel 是通讯的载体,对应通讯的一端,在 BIO 中对应 Socket,NIO 中对应 SocketChannel,Netty 中对应 NioSocketChannel,ServerSocket 同理。 channelhandler 是通道的处理器,一个 channel 往往有多个 handler channelpipeline 是 handler 的容器,装载并管理 handler 的顺序(本质是双向链表)
如图,channel 创建时,会对应创建一个 channelpipeline,pipeline 首先会记录一个头部的处理器 handler,当 pipeline 进行分发时,先分发给头部,然后依次执行,执行 handler 全部执行完成。
同时,channel 创建后,会注册进 EventLoop 之中,EventLoop 会监听事件的发生。不同的事件调用 handler 不同的处理方法,让流程运转起来。
channel 生命周期,对应四种状态,分别为: A) ChannelUnregistered 已创建但还未被注册到监听器中 B) ChannelRegistered 已注册到监听器 EventLoop 中 C) ChannelActive 连接完成处于活跃状态,此时可以接收和发送数据 D) ChannelInactive 非活跃状态,代表连接未建立或者已断开
channelhandler 生命周期,对应三种状态,分别为: A) handlerAdded 把 handler 添加到 pipeline 之中 B) handlerRemoved 从 pipeline 中移除 C) exceptionCaught 在处理过程中有错误产生
创建 channel 源码分析
以服务端启动为例
ChannelFuture future = serverBootstrap.bind(8888).sync();
参数设置
serverBootstrap.channel(NioServerSocketChannel.class)
【AbstractBootstrap】 启动对象的父类
------------------------------------------------------------------
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
.......
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
}
.......
}
【ReflectiveChannelFactory】 工厂实现类
------------------------------------------------------------------
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
在启动对象调用 bind()或 connect()方法时,会创建 channel 本质上通过反射,使用工厂的反射实现类创建对应的实例,此时实例对象的类型是通过 channel 参数来设置的
2)ChannelHandler
类层次关系图
入站和出站: 从服务端的角度,数据从客户端发送到服务端,称之为入站,当数据处理完成返回给客户端,称之为出站。是相对的概念。
从客户端的角度,数据从服务端发送给客户端,称之为入站,当数据返回给服务端,称之为出站。
不论是入站还是出站,handler 从一端开始,到另一端结束,以责任链的模式依次执行。
责任链模式——"击鼓传花",当请求被不同的接收者处理时,每个接收者都包含对下一个接收者的引用,一个接收者处理完成后,将依次向下传递。
适配器模式——出国时要使用的电源转换器(美国/日本 110V 中国 220V 电压),作为两个不兼容的接口之间的桥梁,将类的接口转换为需要的另外一种接口。
ChannelDuplexHandler 是除了入站和出站 handler 之外的,另一个常用子类。 它同时实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口,如果需要既处理入站事件又处理出站事件,可以继承此类。
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO))
------------------------------------------------------------------
public class LoggingHandler extends ChannelDuplexHandler{}
------------------------------------------------------------------
public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {}
------------------------------------------------------------------
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {}
ChannelHandlerAdapter 提供了额外的 isSharable()方法,用来判断 handler 是否可以被共享到多个 pipeline 之中。默认情况不共享,如果需要共享,在继承了适配器的 handler 上,增加注解@Sharable
@Sharable
public class LoggingHandler extends ChannelDuplexHandler {}
ChannelInboundHandler 最重要的方法是 channelRead(),在使用时,需要显式的释放 ByteBuf 相关的内存。使用 ReferenceCountUtil 是引用计数的工具类。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// netty中的缓冲区 叫做ByteBuf -- 对ByteBuffer的封装
ByteBuf buf = (ByteBuf) msg;
// 释放ByteBuf内存
ReferenceCountUtil.release(msg);
}
为了减少对资源内存的管理,使用 SimpleChannelInboundHandler,使用其 channelRead0()方法,可以自动释放资源,使用更便利。
SimpleChannelInboundHandler源码
------------------------------------------------
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
3)ChannelPipeline
pipeline 中维护入站和出站链路,两条链路的执行顺序。
handler 只负责处理自身的业务逻辑,对通道而言,它是无状态的。通道的信息会保存到 handlerContext 处理器上下文中,它是连接 pipeline 和 handler 之间的中间角色。
pipeline 管理的是由 handlerContext 包裹的 handler,也就是说,当添加 handler 时,先将其转为 handlerContext,然后添加到 pipeline 的双向链表中。头结点叫做 HeadContext,尾节点叫做 TailContext。
ch.pipeline().addLast(new NettyServerHandler());
[DefaultChannelPipeline]
----------------------------------------------------------------
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
ObjectUtil.checkNotNull(handlers, "handlers");
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
// 关键逻辑
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 检查当前handler是否支持共享,如果不支持,又被添加到其他pipeline中,会报错
checkMultiplicity(handler);
// 将handler封装为context
newCtx = newContext(group, filterName(name, handler), handler);
// 将context添加到链表尾部
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
// 判断当前通道的注册状态,如果是未注册,执行此逻辑
if (!registered) {
// 添加一个任务,当通道被注册后,能够回调handlerAdded方法
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
// 如果已被注册 执行调用handlerAdded方法
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
// 尾节点会提前声明并创建
final AbstractChannelHandlerContext tail;
// prev -> tail 在其中插入newctx
// prev -> newctx -> tail 放到倒数第二个位置中 tail节点是保持不变的
// 依次更改 新节点的前后指针 以及prev节点的后置指针和tail节点的前置指针
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
// 构造器中已经提前创建了头尾节点
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
使用 pipeline 模式的优点: A) 解耦,让处理器逻辑独立,可以被多个 channel 共享 B) channel 相关信息,交给 context 维护 C) 具有极大的灵活性,使用处理器可以方便的添加或删除,或者更改它们的顺序
3、EventLoop
EventLoop 事件循环,监听 IO 事件,内部封装了线程
EventLoopGroup 事件循环组,是对 EventLoop 的管理,封装了线程池。
当新建 channel 时,group 会为其分配一个 EventLoop,封装了 nio 中的 Selector,监听通道中的所有事件,一个通道的生命周期内,所有操作都由相同的 EventLoop 所封装的线程处理。
同时,多个通道可以由一个 EventLoop 处理,是多对一的关系
1)EventLoopGroup
类层级结构 (通过选中 NioEventLoopGroup 源码 - 右键 - 选中 Diagrams - 选中 show diagram 展示出来)
A 初始化流程
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
[NioEventLoopGroup]
-------------------------------------------------------------------------
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
// 逐步增加参数
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
// 调用父类的构造器
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
[MultithreadEventLoopGroup]
-------------------------------------------------------------------------
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
// 初始化线程数量的逻辑 线程数 = cpu核数 * 2
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
[MultithreadEventExecutorGroup]
-------------------------------------------------------------------------
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
// 核心逻辑
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 根据线程数量 创建EventLoop的逻辑
// newChild()的具体实现在NioEventLoopGroup类中
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
[NioEventLoopGroup]
-------------------------------------------------------------------------
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
2)EventLoop
重要属性为 Selector 及其父类的父类中的 Thread Selector 用于在 channel 创建之后,注册其中监听后续的 I/O 事件 Thread 用于进行轮询,在 channel 注册之后启动线程
A 注册 channel
ChannelFuture future = serverBootstrap.bind(8888).sync();
[AbstractBootstrap]
--------------------------------------------------------------------------
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
.....
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
注册的源码调用链路
ChannelFuture regFuture = config().group().register(channel);
[MultithreadEventLoopGroup]
------------------------------------------------------------------------------
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
[SingleThreadEventLoop]
------------------------------------------------------------------------------
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
[AbstractChannel]
------------------------------------------------------------------------------
// 核心逻辑是 调用了 register0()方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// channel中存在EventLoop类型的属性
// 通道初始化时,会将指定的EventLoop与channel进行关联
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
// 核心逻辑是调用了doRegister()方法
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
[AbstractNioChannel]
----------------------------------------------------------------------------
// 核心注册逻辑
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 将通道注册进Selector中 此时感兴趣的事件是0
// 并且将当前对象作为附加对象存入其中 等价selectionKey.attach()方法
// 使用对象时 再通过selectionkey.attachment()方法取出对象
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
B 轮询事件的状态
读取源码的思路:找到入口,找到核心逻辑。
AbstractChannel 中 register()方法,对 eventLoop.execute()的调用,就是启动线程进行轮询的入口。
[SingleThreadEventExecutor]
-----------------------------------------------------------------------------
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
private final Queue<Runnable> taskQueue;
// 当前类维护了一个任务队列
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
// 启动线程
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 真正的调用逻辑
SingleThreadEventExecutor.this.run();
success = true;
}
......
}
}
}
[NioEventLoop]
-----------------------------------------------------------------------------
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
// 判断队列中是否存在任务
if (!hasTasks()) {
// 如果不存在 调用select()进行获取
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
// 处理任务的核心逻辑
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
processSelectedKeys()是处理任务的核心逻辑,来自于 NioEventLoop 的 run()方法调用
// 处理事件集合
private void processSelectedKeys() {
// 如果selectedKeys(事件集合)没有值,重新获取,如果有值,直接处理
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
// 这是注册时,存储的附加对象,即为通道对象channel
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
// 处理具体事件
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
// 处理具体事件
// 判断事件类型,调用对应的逻辑处理
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
其中读事件的处理
unsafe.read();
[AbstractNioByteChannel]
----------------------------------------------------------------------------
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
4、Bootstrap
引导,对应用程序进行配置,并让他运行起来的过程。
1)配置
必选参数:group 、 channel、 handler(服务端 -- childHandler)
group(): 指定一个到两个 reactor
channel():指定 channel 工厂,反射的方式创建 channel 使用
handler():指定 reactor 的处理器,其中 childHandler 指定的是,服务端所接收到的客户端 channel 使用的处理器,而服务端的主 reactor(bossGroup),已经默认添加了 acceptor 处理器,所以可以不指定。
option():指定 TCP 相关的参数,以及 netty 自定义的参数
配置参数的过程,称之为初始化。
2)运行
// 启动并设置端口号 但需要使用sync异步启动
try {
ChannelFuture future = serverBootstrap.bind(8888).sync();
// 将关闭通道的方式 也设置为异步的
// 阻塞finally中的代码执行
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 优雅关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
bind(),将服务端的 channel 绑定到端口号,然后接收客户端的连接,让整个 netty 运行起来。
sync(),因为绑定事件是异步的,所以使用 sync 同步等待结果,换句话说,bind 只触发了绑定端口的事件,需要使用 sync 等待事件执行的结果。
future.channel().closeFuture().sync(),含义为,当通道被关闭时,才执行后续的操作,sync 使当前线程执行到此处阻塞,以确保不执行后续的 shutdown 方法。
3)源码解析
A 类声明
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
嵌套的泛型使用,可以达到,子类中返回子类本身的效果,具体如下:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
B init 方法
工作内容
a、设置 channel 相关的选项参数 b、设置 channel 的属性键值对 c、添加对 channel 的 IO 事件处理器 (Acceptor 角色)
void init(Channel channel) {
// 设置channel相关的选项参数
setChannelOptions(channel, newOptionsArray(), logger);
// 设置channel的属性键值对
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 添加对channel的IO事件处理器 (Acceptor角色)
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
Acceptor 分析
功能:将主 reactor 接收到的客户端通道,传递给从 reactor
// ServerBootstrapAcceptor是ServerBootstrap的静态内部类
// netty将acceptor看作一个处理器,并且是入站处理器
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
// 具体的逻辑封装到channelRead()方法中
// 对客户端通道进行配置 , 然后注册到从Reactor中
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 此时msg对应 服务端接收到的客户端通道
final Channel child = (Channel) msg;
// 设置处理链路
child.pipeline().addLast(childHandler);
// 设置通道的配置项和参数
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
// childGroup 是从reactor的资源池 调用注册方法 注册客户端通道child
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 增加监听 获取注册的异步结果
// 如果注册失败 或者抛出异常 都会关闭channel
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
// 如果处理客户端连接失败了,暂停一秒,然后继续接受
// 为保障服务端能够尽可能多的处理客户端的连接 不受某一次处理失败的结果影响
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
4) Future 和 Promise
Future 代表的是,一个还未完成的异步任务的执行结果。可以通过 addListener 方法,监听执行结果后进行相应的处理,此时它的状态可以分为未完成、已完成(成功、失败、主动取消)等。
对 Future 而言,状态只能读取,无法更改,又出现了 Promise,但是 Promise 只能更改一次。
参照生活中的定额发票(Future)和机打发票(Promise)。
(四)拓展篇
1、心跳检测
检测逻辑: 1) 服务端启动,客户端建立连接,连接的目的是互相发送消息。 2) 如果客户端在工作,服务端一定能收到数据,如果客户端空闲,服务端会出现资源浪费。 3) 服务端需要一种检测机制,验证客户端的活跃状态,不活跃则关闭。
需求设计: 1) 客户端向服务端发送 “I am alive” , sleep 一个随机时间,模拟空闲状态 2) 服务端收到消息后,返回“over”, 客户端有空闲,记录空闲次数 3) 设定阈值,达到阈值时主动关闭连接
IdleStateHandler , 是 netty 提供的处理器 1)超过多长时间没有读 readerIdleTime
- 超过多长时间没有写 writerIdleTime
- 超过多长时间没有读和写 allIdleTime
底层实现检测的是 IdleStateEvent 事件,通过管道传递给下一个 handler 处理,处理方法是 userEventTriggered。
其中 IdleStateEvent 事件,分为 READER_IDLE、WRITER_IDLE、ALL_IDLE 三大类
2、TCP 粘包拆包
TCP 是基于流的。
产生粘包和拆包问题的主要原因是,操作系统在发送 TCP 数据的时候,底层会有一个缓冲区,例如 1024 个字节大小,如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP 则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题;如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP 就会将其拆分为多次发送,这就是拆包,也就是将一个大的包拆分为多个小包进行发送。
UDP 会发生拆包和粘包吗? 不会,UDP 是基于报文的,在 UDP 的首部使用 16bit 存储报文的长度,因此不会发生。
TCP 发生粘包和拆包的本质原因:
要发送的数据先经过 TCP 的缓冲区,还限制了最大报文长度。
A 如果要发送的数据 > TCP 剩余的缓冲区大小,发生拆包
B 如果要发送的数据 > 最大报文长度,发生拆包
C 如果要发送的数据 << TCP 剩余的缓冲区大小,发生粘包
D 接收数据的应用层,没有及时读取缓冲区数据,也会发生粘包
解决办法: A 设置出消息的长度 B 设置出消息的边界——分隔符
Netty 提供的解码器,两类
A 基于长度的解码器,在包头部设置出数据的长度。(类似于 UDP 的头部处理) LengthFieldBasedFrameDecoder 自定义长度的处理方式 FixedLengthFrameDecoder 固定长度的处理方式
B 基于分隔符的解码器 DelimiterBasedFrameDecoder 自定义分隔符的处理方式 LineBasedFrameDecoder 行尾("\n"或"\r\n")分隔符的处理方式
【Demo 逻辑】 需求:客户端循环 100 次向服务端请求时间
1)第一种方式,传输的过程数据单位是字节流 ByteBuf,需要自行处理分隔符以及数据的长度,此时会出现粘包和拆包的问题
2)第二种方式,使用 LineBasedFrameDecoder,配合 StringDecoder 使用,传输的数据单位变成字符串,可以直接处理,保证业务逻辑上的包和真正传输的包是一致的
3、序列化框架——protobuf
protobuf = protocol buffers 类似于 xml 的生成和解析,但效率更高,生成的是字节码,可读性稍差。
【Demo 逻辑】
1)安装 idea 插件,protobuf support
如果安装之后,创建*.proto 文件没有使用插件,手动设置关联关系
settings -> file types -> 找到 protobuf -> 增加正则表达式
2)引入 maven 依赖和插件
<properties>
<os.detected.classifier>windows-x86_64</os.detected.classifier>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<protocArtifact>
com.google.protobuf:protoc:3.1.0:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
3)在右侧 maven project 中可以找到相应的插件 (没有的话刷新)
4)在和 java 平级的目录下,创建 proto 文件夹,然后创建 person.proto 文件 5)person.proto
// 声明包名称的空间
syntax="proto3";
// 具体的类生成目录
option java_package="com.duing";
// 具体的类名
option java_outer_classname="PersonModel";
// 类结构
message Person{
int32 id = 1; // 此处的1代表顺序
string name = 2;
}
- 使用插件进行编译,将编译生成的代码拷贝到需要的目录下 7)编写测例进行序列化和反序列化操作