一般我们想要使用tcp通信,最原始的方法就是使用socket套接字,但是每次都要创建新的socket对象,并且在finally中关闭,好浪费资源,很麻烦。所以就是用netty工具,可以轻松地实现tcp通信,当然也可以实现http通信,因为我们需要用tcp通信,所以这里只研究tcp。
最好先了解一下netty是干嘛的,以及NIO是什么。
netty:
https://blog.csdn.net/qq_34730511/article/details/98472924?ops_request_misc=&request_id=&biz_id=102&utm_term=netty&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduweb~default-8-98472924.nonecase&spm=1018.2226.3001.4187
BIO NIO:
https://blog.csdn.net/weixin_34301132/article/details/85786733?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522164499824216780264047131%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=164499824216780264047131&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-1-85786733.first_rank_v2_pc_rank_v29&utm_term=NIO+BIO+AIO&spm=1018.2226.3001.4187
接下来就建项目了。这里我用的是maven项目,jdk1.8
- 引入netty的依赖
<!--netty配置-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.36.Final</version>
</dependency>
创建NettyServer类,在这个类中先创建一个方法,然后在这个方法中创建事件线程组,这个构造器里面可以不传参数。写一个run()方法,里边对需要的参数进行配置,
package com.yy.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
public class NettyServer {
public void run() {
//创建事件线程组,
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
//创建服务启动器
ServerBootstrap serverBootstrap = new ServerBootstrap();
try {
//给服务启动器配置参数
serverBootstrap.group(boosGroup, workerGroup)//绑定线程组
.channel(NioServerSocketChannel.class)//Nio模式
.option(ChannelOption.SO_BACKLOG, 1024)//标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。主要是作用于boss线程,用于处理新连接
.childOption(ChannelOption.SO_KEEPALIVE, true)//启用心跳保活机制,主要作用与worker线程,也就是已创建的channel。
.handler(new LoggingHandler("DEBUG"))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new StringDecoder())//加载编解码器
.addLast(new StringEncoder())
.addLast(new NettyServerHandler());//加载handler处理器
}
});
//绑定端口,这里用到的是ChannelFuture类
ChannelFuture future = serverBootstrap.bind(8080).sync();
System.out.println("服务端启动成功....");
//关闭通道
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//在这里关闭事件线程组,释放资源
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new NettyServer().run();
}
}
再创建另外一个处理类,用于接收客户端发送的消息,并且给客户端发送应答,这个类继承于ChannelInboundHandlerAdapter类,重写其中的方法即可:
package com.yy.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.log4j.Logger;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.InetAddress;
/**
* 接收客户端发送的消息并返回应答
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static Logger logger = Logger.getLogger(NettyServerHandler.class);
/**
* 读取客户端的数据,也可以在这个方法中给ctx赋值,也就是给客户端返回的应答
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// ByteBuf in = (ByteBuf)msg;
// int readableBytes = in.readableBytes();
// byte[] bytes =new byte[readableBytes];
// in.readBytes(bytes);
// System.out.println("服务端接收的倒的消息:"+new String(bytes));
// System.out.println("远程连接到的地址为:"+ctx.channel().remoteAddress());
//System.out.print(in.toString(CharsetUtil.UTF_8));
System.out.println("服务端接收到一条来自客户端的消息:"+msg);
ctx.write("我是服务端,我收到客户端的消息了,我现在给你返回");
// logger.error("服务端接受的消息 : " + msg);
}
/**
* 发生异常时关闭
* @param ctx
* @param cause
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
/**
* 读取数据完毕,就刷新ctx对象,把应答的信息推送给客户端
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
//也可以使用下面的方法,一次性将应答信息写入并且推送出去,但是一般不这么用,一般是在read方法中返回应答信息,处理复杂的业务逻辑,在complete方法中推送
//ctx.writeAndFlush(Unpooled.copiedBuffer("Hello World,I am Server.", CharsetUtil.UTF_8));
}
/**
* 将object类型转为字节数组的方法
* @param object
* @return
* @throws IOException
*/
public static byte[] objectToBytes(final Serializable object) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = null;
try {
oos = new ObjectOutputStream(baos);
oos.writeObject(object);
oos.flush();
return baos.toByteArray();
} finally {
if (oos != null) {
oos.close();
}
if (baos != null) {
baos.close();
}
}
}
}
3)创建NettyClient类
package com.yy.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
public class NettyClient {
private SocketChannel socketChannel;
public void run(Object msg){
//配置线程组
EventLoopGroup group = new NioEventLoopGroup();
//创建服务启动器
Bootstrap bootstrap = new Bootstrap();
//配置参数
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new StringEncoder())
.addLast(new StringDecoder())
.addLast(new NettyClientHandler());
}
})
.remoteAddress("127.0.0.1",8080);
//连接
ChannelFuture future = bootstrap.connect();
System.out.println("客户端正在连接服务端...");
//客户端断线重连逻辑
future.addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess()) {
System.out.println("连接Netty服务端成功");
future.channel().writeAndFlush(msg);
} else {
System.out.println("连接失败,进行断线重连");
future1.channel().eventLoop().schedule(() -> run(msg), 20, TimeUnit.SECONDS);
}
});
socketChannel = (SocketChannel) future.channel();
}
public static void main(String[] args) {
NettyClient nettyClient = new NettyClient();
nettyClient.run("hi");
}
}
创建Client的处理类
package com.yy.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import org.apache.log4j.Logger;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = Logger.getLogger(NettyClientHandler.class);
private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static byte[] responseByte;
/**
* 管道就绪时触发该方法
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("云上建立连接时:" + sdf.format(new Date()));
//ctx.writeAndFlush(Unpooled.copiedBuffer("Hello World,I am Client.", CharsetUtil.UTF_8));
}
/**
* 管道读取事件时会触发该方法,
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("客户端收到服务端返回的消息:"+msg);
byte[] bytes = objectToBytes((Serializable) msg);
//byte []scaleMsg=(byte[]) msg;
StringBuffer originalData=new StringBuffer();
for (byte b : bytes) {
originalData.append(b+" ");
}
System.out.println(originalData.toString());
if(bytes!=null){
responseByte = bytes;
}
}
/**
* 发生异常时触发
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
public static byte[] objectToBytes(final Serializable object) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = null;
try {
oos = new ObjectOutputStream(baos);
oos.writeObject(object);
oos.flush();
return baos.toByteArray();
} finally {
if (oos != null) {
oos.close();
}
if (baos != null) {
baos.close();
}
}
}
}
单机测试是否能连通,先启动服务端代码,再启动客户端代码,就可以直接发数据了,效果如下图所示:
服务端:
客户端:

现在暂时只能发送一次数据,但是服务端始终处于监听的状态,后边再研究吧!
