Netty入门之编写EchoServer
Netty是知名的Java高性能网络编程框架,通过学习和使用netty,我们可以编写出支持超过10万并发量的Java网络服务器
Netty核心组件
- Channel
- Callback
- Future
- Event和ChannelHandler
Channel
Channel 是 NIO 基本的结构。它代表了一个用于连接到实体如硬件设备、文件、网络套接字或程序组件,能够执行一个或多个不同的 I/O 操作(例如读或写)的开放连接。
现在,把 Channel 想象成一个可以“打开”或“关闭”,“连接”或“断开”和作为传入和传出数据的运输工具。
Callback
callback (回调)是一个简单的方法,提供给另一种方法作为引用,这样后者就可以在某个合适的时间调用前者。这种技术被广泛使用在各种编程的情况下,最常见的方法之一通知给其他人操作已完成。
Netty 内部使用回调处理事件时。一旦这样的回调被触发,事件可以由接口 ChannelHandler 的实现来处理。如下面的代码,一旦一个新的连接建立了,调用 channelActive(),并将打印一条消息。
public class ConnectHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client " + ctx.channel().remoteAddress() + " connected");
}
}
Future
Future 提供了另外一种通知应用操作已经完成的方式。这个对象作为一个异步操作结果的占位符,它将在将来的某个时候完成并提供结果。
JDK 附带接口 java.util.concurrent.Future ,但所提供的实现只允许您手动检查操作是否完成或阻塞了。这是很麻烦的,所以 Netty 提供自己了的实现,ChannelFuture,用于在执行异步操作时使用。
ChannelFuture 提供多个附件方法来允许一个或者多个 ChannelFutureListener 实例。这个回调方法 operationComplete() 会在操作完成时调用。事件监听者能够确认这个操作是否成功或者是错误。如果是后者,我们可以检索到产生的 Throwable。简而言之, ChannelFutureListener 提供的通知机制不需要手动检查操作是否完成的。
每个 Netty 的 outbound I/O 操作都会返回一个 ChannelFuture;这样就不会阻塞。这就是 Netty 所谓的“自底向上的异步和事件驱动”。
下面例子简单的演示了作为 I/O 操作的一部分 ChannelFuture 的返回。当调用 connect() 将会直接是非阻塞的,并且调用在背后完成。由于线程是非阻塞的,所以无需等待操作完成,而可以去干其他事,因此这令资源利用更高效。
Channel channel = ...;
//不会阻塞
ChannelFuture future = channel.connect(
new InetSocketAddress("192.168.0.1", 25));
下面代码描述了如何利用 ChannelFutureListener 。首先,连接到远程地址。接着,通过 ChannelFuture 调用 connect() 来 注册一个新ChannelFutureListener。当监听器被通知连接完成,我们检查状态。如果是成功,就写数据到 Channel,否则我们检索 ChannelFuture 中的Throwable。
注意,错误的处理取决于你的项目。当然,特定的错误是需要加以约束 的。例如,在连接失败的情况下你可以尝试连接到另一个。
Channel channel = ...;
//不会阻塞
ChannelFuture future = channel.connect( //1
new InetSocketAddress("192.168.0.1", 25));
future.addListener(new ChannelFutureListener() { //2
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) { //3
ByteBuf buffer = Unpooled.copiedBuffer(
"Hello", Charset.defaultCharset()); //4
ChannelFuture wf = future.channel().writeAndFlush(buffer); //5
// ...
} else {
Throwable cause = future.cause(); //6
cause.printStackTrace();
}
}
});
Event和Handler
事件和ChannelHandler。Netty使用不同的事件来通知我们状态的改变或者是操作的状态。Netty提供了大量的预定义的可以开箱即用的ChannelHandler实现,包括用于各种协议的ChannelHandler。
小结
Netty 的异步编程模型是建立在 future 和 callback 的概念上的。所有这些元素的协同为自己的设计提供了强大的力量。
拦截操作和转换入站或出站数据只需要您提供回调或利用 future 操作返回的。这使得链操作简单、高效,促进编写可重用的、通用的代码。一个 Netty 的设计的主要目标是促进“关注点分离”:你的业务逻辑从网络基础设施应用程序中分离。
Netty 通过触发事件从应用程序中抽象出 Selector,从而避免手写调度代码。EventLoop 分配给每个 Channel 来处理所有的事件,包括
- 注册有趣的事件
- 调度事件到 ChannelHandler
- 安排进一步行动
该 EventLoop 本身是由只有一个线程驱动,它给一个 Channel 处理所有的 I/O 事件,并且在 EventLoop 的生命周期内不会改变。这个简单而强大的线程模型消除你可能对你的 ChannelHandler 同步的任何关注,这样你就可以专注于提供正确的回调逻辑来执行。该 API 是简单和紧凑。
编写Echo服务器
所有的Netty服务器都需要至少一个ChannelHandler以及引导,ChannelHandler实现了服务器对客户端接收的数据的处理,引导是服务器的启动代码。
针对不同类型的事件来调用ChannelHandler;应用程序通过实现或者扩展ChannelHandler来挂钩到事件的生命周期,并且提供自定义的应用程序逻辑;在架构上,ChannelHandler有助于保持业务逻辑与网络处理代码的分离。这简化了开发过程,因为代码必须不断地演化以响应不断变化的需求。
引导服务器需要绑定服务器将在其上监听并接受传入连接请求的端口;配置Channel,以将有关的入站消息通知给EchoServerHandler实例。
ChannelHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
//ChannelInboundHandlerAdapter提供了ChannelInboundHandler默认实现
//Sharable表示可以被多个Channel共享
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
//消息记录到控制台
System.out.println("Server received: " + in.toString(Charset.defaultCharset()));
//消息写给发送者
ctx.write(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//未决消息冲刷到远程节点,且关闭该channel
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
//关闭channel
ctx.close();
}
}
EchoServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
//一个简单的Netty服务器,将客户端发送的消息全部返还给客户端
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.out.println("Usage: " + EchoServer.class.getSimpleName() + " <port>");
}
//设置端口值,如果端口置格式不正确,则抛出NumberFormatException
int port = Integer.parseInt(args[0]);
new EchoServer(port).start();
}
private void start() throws Exception {
final EchoServerHandler serverHandler = new EchoServerHandler();
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(serverHandler);
}
});
//异步绑定服务器,调用sync方法阻塞等待直到绑定完成
ChannelFuture f = b.bind().sync();
//获取Channel的CloseFuture,并且阻塞当前线程直到它完成
f.channel().closeFuture().sync();
} finally {
//关闭EventLoopGroup,释放所有资源
group.shutdownGracefully().sync();
}
}
}
编写客户端
客户端需要实现以下功能:
- 连接到服务器
- 发送一个或者多个消息
- 对于每个连接,等待并接收从服务器发回的相同的消息
- 关闭连接
客户端也需要通过ChannelHandler实现客户端逻辑
ChannelHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.StandardCharsets;
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) {
//当被通知Channel是活跃的时候,发送一条消息
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", StandardCharsets.UTF_8));
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
//记录已接收消息的转储
System.out.println("Client received: " + in.toString(StandardCharsets.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
EchoClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.out.println("Usage: " + EchoClient.class.getSimpleName() + " <host> <port>");
return;
}
String host = args[0];
int port = Integer.parseInt(args[1]);
new EchoClient(host, port).start();
}
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}