`
MoonMonster
  • 浏览: 35826 次
  • 性别: Icon_minigender_1
  • 来自: 长沙
社区版块
存档分类
最新评论

netty处理tcp粘包/拆包问题

阅读更多

 所谓的粘包/拆包,用一个例子来说明就是:

加入客户端向服务端发送1000条数据,如果不加以处理的话,那么服务端接收的数据可能就是如图所示了:



 数据要么几段粘在了一起,要么一段数据被拆成了几段,这肯定会造成很大的影响。

 

而解决后的所接收的正确数据该如下所示:



 

 

简单讲了一下粘包/拆包是什么样的问题,详细解释可见csdn博客

http://blog.csdn.net/binghuazh/article/details/4222516

 

 

 

客户端代码:

package com.netty.dealpacket;

import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;

/**
 * @author Chalmers 2016年2月24日 下午2:35:39
 */
public class Client {

	public static void main(String[] args) throws UnknownHostException,
			IOException {
		Socket socket = new Socket("127.0.0.1", 9090);

		String message = "hello";

		byte[] bytes = message.getBytes();

		// 设置空间大小为一个存储了长度的int型数据(长度)加上转换后的byte数组
		ByteBuffer buffer = ByteBuffer.allocate(4 + bytes.length);
		// 将长度存入
		buffer.putInt(bytes.length);
		// 将数据存入
		buffer.put(bytes);

		// 转换成字节数组
		byte[] array = buffer.array();

		// 向服务端发送1000次
		for (int i = 0; i < 1000; i++) {
			socket.getOutputStream().write(array);
		}

		// 关闭
		socket.close();
	}
}

 

 

处理问题代码:

package com.netty.dealpacket;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;

/**
 * @author Chalmers 2016年2月24日 下午2:23:49
 */
public class MyDecoder extends FrameDecoder {

	@Override
	protected Object decode(ChannelHandlerContext chc, Channel channel,
			ChannelBuffer buffer) throws Exception {

		// 如果buffer中的可读字节大于4个(即除了长度以外还有数据,因为长度可能是为0的)
		if (buffer.readableBytes() > 4) {

			// 标记,指向当前指针位置,读取数据时使用
			buffer.markReaderIndex();
			// 取得长度
			int len = buffer.readInt();

			// 如果剩余可读字节小于长度的话,则表明发生了拆包现象,那么不对它进行处理
			if (buffer.readableBytes() < len) {
				// 重置标记
				buffer.resetReaderIndex();

				// 返回null,表示等待
				return null;
			}

			// 对数据进行处理
			byte[] bytes = new byte[len];
			buffer.readBytes(bytes);
			// 将数据返回到ServerHandler中进行处理
			return new String(bytes);
		}

		return null;
	}

}

 

 

package com.netty.dealpacket;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

/**
 * @author Chalmers 2016年2月24日 下午2:22:41
 */
public class ServerHandler extends SimpleChannelHandler {

	int count = 1;

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
			throws Exception {
		// 对从MyDecoder中传递过来的数据进行处理
		System.out.println((String) e.getMessage() + "  " + count);
		count++;
	}
}

 

 

服务端代码:

package com.netty.dealpacket;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringEncoder;

/**
 * @author Chalmers 2016年2月24日 下午2:21:33
 */
public class Server {

	public static void main(String[] args) {
		ServerBootstrap serverBootstrap = new ServerBootstrap();

		ExecutorService boss = Executors.newCachedThreadPool();
		ExecutorService worker = Executors.newCachedThreadPool();

		serverBootstrap.setFactory(new NioServerSocketChannelFactory(boss,
				worker));
		serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {

			@Override
			public ChannelPipeline getPipeline() throws Exception {
				ChannelPipeline pipeline = Channels.pipeline();
				pipeline.addLast("decoder", new MyDecoder());
				pipeline.addLast("encoder", new StringEncoder());
				pipeline.addLast("handler", new ServerHandler());

				return pipeline;
			}
		});

		serverBootstrap.bind(new InetSocketAddress(9090));
		System.out.println("start...");
	}
}

 

 

  • 大小: 17.4 KB
  • 大小: 16.6 KB
1
2
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics