基于自定义协议的Netty开发

在某些情况下,我们需要基于TCP/IP协议栈开发自己的应用层协议,以满足特定的业务需求。在这篇博客中,我们将使用Netty开发一个基于自定义协议的客户端和服务器。

1. 环境准备

首先,我们需要在项目中引入Netty的依赖。在Maven项目中,添加以下依赖:

1
2
3
4
5
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.65.Final</version>
</dependency>

2. 自定义协议设计

在这个示例中,我们自定义一个简单的协议,协议格式如下:

| 魔数(4字节) | 版本号(1字节) | 序列号(1字节) | 命令类型(1字节) | 数据长度(4字节) | 数据内容(可变长度) |

其中,魔数用于标识协议,版本号用于标识协议版本,序列号用于标识消息的序列,命令类型用于标识消息类型,数据长度用于标识数据内容的长度,数据内容用于存储实际的消息数据。

3. 自定义协议编码解码器开发

接下来,我们需要开发一个自定义协议的编码解码器,用于将消息对象转换为字节流,以及将字节流转换为消息对象。

3.1 创建Message对象

首先,我们创建一个Message对象,用于存储消息数据:

1
2
3
4
5
6
7
8
9
public class Message {

    private byte version;
    private byte sequence;
    private byte commandType;
    private byte[] data;

    // getter和setter方法
}

3.2 创建MessageEncoder

接下来,我们创建一个MessageEncoder,用于将Message对象编码为字节流:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public class MessageEncoder extends MessageToByteEncoder<Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // 写入魔数
        out.writeBytes(new byte[]{(byte) 0xAB, (byte) 0xCD, (byte) 0xEF, (byte) 0x12});
        // 写入版本号
        out.writeByte(msg.getVersion());
        // 写入序列号
        out.writeByte(msg.getSequence());
        // 写入命令类型
        out.writeByte(msg.getCommandType());
        // 写入数据长度
        out.writeInt(msg.getData().length);
        // 写入数据内容
        out.writeBytes(msg.getData());
    }
}

3.3 创建MessageDecoder

接下来,我们创建一个MessageDecoder,用于将字节流解码为Message对象:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class MessageDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 检查可读字节数
        if (in.readableBytes() < 11) {
            return;
        }

        // 读取魔数
        byte[] magic = new byte[4];
        in.readBytes(magic);
        // 读取版本号
        byte version = in.readByte();
        // 读取序列号
        byte sequence = in.readByte();
        // 读取命令类型
        byte commandType = in.readByte();
        // 读取数据长度
        int dataLength = in.readInt();
        // 检查可读字节数
        if (in.readableBytes() < dataLength) {
            return;
        }

        // 读取数据内容
        byte[] data = new byte[dataLength];
        in.readBytes(data);

        // 创建Message对象
        Message msg = new Message();
        msg.setVersion(version);
        msg.setSequence(sequence);
        msg.setCommandType(commandType);
        msg.setData(data);

        // 添加到出站消息列表
        out.add(msg);
    }
}

4. 自定义协议服务器开发

接下来,我们创建一个基于自定义协议的服务器,用于接收和处理客户端的消息。

4.1 创建ServerHandler

首先,我们创建一个ServerHandler,用于处理客户端的消息:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ServerHandler extends SimpleChannelInboundHandler<Message> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        // 打印消息数据
        System.out.println("收到客户端消息:" + new String(msg.getData(), "UTF-8"));

        // 回复客户端
        byte[] responseData = "Hello, Netty!".getBytes("UTF-8");
        Message responseMsg = new Message();
        responseMsg.setVersion(msg.getVersion());
        responseMsg.setSequence(msg.getSequence());
        responseMsg.setCommandType((byte) (msg.getCommandType() + 1));
        responseMsg.setData(responseData);
        ctx.writeAndFlush(responseMsg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

4.2 创建Server

接下来,我们创建一个Server,用于绑定端口并启动服务器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Server {

    public static void main(String[] args) throws Exception {
        // 创建EventLoopGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // 创建ServerBootstrap
            ServerBootstrap bootstrap = new ServerBootstrap();

            // 设置参数
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 添加自定义协议编码解码器
                            ch.pipeline().addLast(new MessageDecoder());
                            ch.pipeline().addLast(new MessageEncoder());
                            // 添加ServerHandler
                            ch.pipeline().addLast(new ServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 绑定端口并启动
            ChannelFuture future = bootstrap.bind(8080).sync();

            // 等待服务器关闭
            future.channel().closeFuture().sync();
        } finally {
            // 释放资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

5. 自定义协议客户端开发

接下来,我们创建一个基于自定义协议的客户端,用于连接服务器并发送消息。

5.1 创建ClientHandler

首先,我们创建一个ClientHandler,用于处理服务器的消息:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
public class ClientHandler extends SimpleChannelInboundHandler<Message> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
        // 打印服务器消息
        System.out.println("收到服务器消息:" + new String(msg.getData(), "UTF-8"));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

5.2 创建Client

接下来,我们创建一个Client,用于连接服务器并发送消息:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class Client {

    public static void main(String[] args) throws Exception {
        // 创建EventLoopGroup
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            // 创建Bootstrap
            Bootstrap bootstrap = new Bootstrap();

            // 设置参数
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 添加自定义协议编码解码器
                            ch.pipeline().addLast(new MessageDecoder());
                            ch.pipeline().addLast(new MessageEncoder());
                            // 添加ClientHandler
                            ch.pipeline().addLast(new ClientHandler());
                        }
                    })
                    .option(ChannelOption.SO_KEEPALIVE, true);

            // 连接服务器
            ChannelFuture future = bootstrap.connect("localhost", 8080).sync();

            // 发送消息
            byte[] data = "Hello, Server!".getBytes("UTF-8");
            Message msg = new Message();
            msg.setVersion((byte) 1);
            msg.setSequence((byte) 1);
            msg.setCommandType((byte) 1);
            msg.setData(data);
            future.channel().writeAndFlush(msg);

            // 等待连接关闭
            future.channel().closeFuture().sync();
        } finally {
            // 释放资源
            group.shutdownGracefully();
        }
    }
}

6. 测试

现在,我们可以使用我们的自定义协议客户端连接我们的自定义协议服务器,并进行消息通信了。

7. 总结

在这篇博客中,我们使用Netty开发了一个基于自定义协议的客户端和服务器。通过创建Message对象存储消息数据,并创建MessageEncoder和MessageDecoder实现自定义协议的编码解码,我们实现了一个可以进行消息通信的自定义协议系统。当然,这只是一个简单的示例,实际开发中可能需要处理更复杂的情景,但这个示例已经展示了基于自定义协议的Netty开发的基本思路。