网络传输层协议有两种,一种是TCP,另外一种是UDP。
TCP是一种面向连接的协议,提供可靠的数据传输。TCP通过三次握手建立连接,并通过确认和重传机制,保证数据的完整性和可靠性。TCP适用于对数据准确性要求较高、对实时性要求较低的应用场景,如网页浏览、文件传输等。
UDP是一种无连接的协议,不保证数据的可靠性传输。UDP通过尽力交付数据包的方式进行传输,不对数据包的传输状态进行确认和重传,因此速度较快。UDP适用于对实时性要求较高、对数据准确性要求较低的应用场景,如视频传输、语音通信等。
那么,这两种有哪些区别呢?请看下面:
总结:我们可以根据具体的应用场景和需求选择使用UDP或TCP进行数据传输。如果对数据的实时性要求较高,且对数据准确性要求较低,可以选择使用UDP。如果对数据的准确性要求较高,可以选择使用TCP。
游戏由于对数据的准确性(不允许丢包,乱序)非常高,一般都是选择基于TCP的socket通信。但UDP的低时延,快速传输对实时性要求非常高的游戏类型也是非常大的吸引力。据说,魔兽世界以及Dota2使用UDP开发。当然,使用udp通信的游戏肯定在通信层做了适配,保证关键数据不丢包,不乱序。
近年来,基于UDP协议的KCP(Kuai Control Protocol,快速可靠传输协议),也得到了快速的发展。据说,原神是使用KCP通信的。
netty使用udp协议,网上的例子都是非常简单的。都是两个类搞定。没有解决以下几个问题:
本文主要就这两个问题进行案例说明。
udp是无连接的,这意味着通信双方无需像TCP那般“三次握手四次释放”。只要知道对方的socket地址(Ip+Port),即可发送数据,发完即终止。在Netty里,udp的通信载体叫做DatagramPacket,负责将数据(ByteBuf)从源头发送到目的地。
public class DatagramPacket extends DefaultAddressedEnvelope implements ByteBufHolder { public DatagramPacket(ByteBuf data, InetSocketAddress recipient) { super(data, recipient); } public DatagramPacket(ByteBuf data, InetSocketAddress recipient, InetSocketAddress sender) { super(data, recipient, sender); } }
而我们的网络底层通信是基于javabean的,因此定义我们的消息基类如下:
public class UdpMessage implements Message { private String senderIp; private int senderPort; private String receiverIp; private int receiverPort; }
我们将私有协议栈定义为 包头(消息类型id),包体(具体消息经编码后的字节数组)。将自定义消息UdpMessage转为DatagramPacket。
public class UdpProtocolEncoder extends MessageToMessageEncoder { private static final Logger logger = LoggerFactory.getLogger("socketserver"); private final MessageFactory messageFactory; private final MessageCodec messageCodec; public UdpProtocolEncoder(MessageFactory messageFactory, MessageCodec messageCodec) { this.messageFactory = messageFactory; this.messageCodec = messageCodec; } @Override protected void encode(ChannelHandlerContext ctx, UdpMessage message, List
这里消息pojo编码,使用了jforgame的组件,根据javabean的字段元信息,自动编码为byte数组。
依赖申明如下:
io.github.jforgame jforgame-codec-struct 1.1.0
私有协议栈解码负责将数据包DatagramPacket转为UdpMessage。将底层数据流转为ByteBuf之后,还需要将字节数据进行解码,才可以转换为应用程序认识的消息。
public class UdpProtocolDecoder extends MessageToMessageDecoder { private final MessageFactory messageFactory; private final MessageCodec messageCodec; public UdpProtocolDecoder(MessageFactory messageFactory, MessageCodec messageCodec) { this.messageFactory = messageFactory; this.messageCodec = messageCodec; } @Override protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List
服务端会话管理(建立及摧毁),以及消息接受(下文的channelRead方法)。
@ChannelHandler.Sharable public class UdpChannelIoHandler extends ChannelInboundHandlerAdapter { private final static Logger logger = LoggerFactory.getLogger("socketserver"); /** 消息分发器 */ private final SocketIoDispatcher messageDispatcher; public UdpChannelIoHandler(SocketIoDispatcher messageDispatcher) { super(); this.messageDispatcher = messageDispatcher; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); ChannelUtils.duplicateBindingSession(ctx.channel(), new NSession(channel)); SessionManager.getInstance().buildSession(ChannelUtils.getSessionBy(channel)); System.out.println("socket register " + channel); } @Override public void channelRead(ChannelHandlerContext context, Object packet) throws Exception { logger.debug("receive pact, content is {}", packet.getClass().getSimpleName()); final Channel channel = context.channel(); IdSession session = ChannelUtils.getSessionBy(channel); messageDispatcher.dispatch(session, packet); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println("socket inactive " + channel); IdSession userSession = ChannelUtils.getSessionBy(channel); messageDispatcher.onSessionClosed(userSession); } }
需要注意的是,UDP Socket Server的链接建立,只在启动的时候触发一次。之后,无论有多少个客户端,上文的channelActive都不会再次触发。也就是说,客户端的链接不是一对一的,全局只有一个服务端链接。这点与tcp不同。那么,怎么区分不同的客户端呢?后文例子揭晓。
具体的消息处理(通过消息路由及消息处理器注解)。网关接受到消息之后,自动把消息分发到对应的处理器。类似与springmvc的Controller以及RequestMapper功能。
public class MessageIoDispatcher extends ChainedMessageDispatcher { private MessageHandlerRegister handlerRegister; MessageFactory messageFactory = GameMessageFactory.getInstance(); private MessageParameterConverter msgParameterConverter= new DefaultMessageParameterConverter(messageFactory); public MessageIoDispatcher() { LoginRouter router = new LoginRouter(); this.handlerRegister = new CommonMessageHandlerRegister(Collections.singletonList(router), messageFactory); MessageHandler messageHandler = (session, message) -> { int cmd = GameMessageFactory.getInstance().getMessageId(message.getClass()); MessageExecutor cmdExecutor = handlerRegister.getMessageExecutor(cmd); if (cmdExecutor == null) { logger.error("message executor missed, cmd={}", cmd); return true; } Object[] params = msgParameterConverter.convertToMethodParams(session, cmdExecutor.getParams(), message); Object controller = cmdExecutor.getHandler(); MessageTask task = MessageTask.valueOf(session, session.hashCode(), controller, cmdExecutor.getMethod(), params); task.setRequest(message); // 丢到任务消息队列,不在io线程进行业务处理 GameServer.getMonitorGameExecutor().accept(task); return true; }; addMessageHandler(messageHandler); } @Override public void onSessionCreated(IdSession session) { } @Override public void onSessionClosed(IdSession session) { } }
public class UdpSocketServer implements ServerNode { private static final Logger logger = LoggerFactory.getLogger("socketserver"); private EventLoopGroup group = new NioEventLoopGroup(); protected HostAndPort nodesConfig = HostAndPort.valueOf(8088); public SocketIoDispatcher socketIoDispatcher; public MessageFactory messageFactory; public MessageCodec messageCodec; @Override public void start() throws Exception { try { SessionManager.getInstance().schedule(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioDatagramChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) .handler(new ChannelInitializer() { @Override public void initChannel(DatagramChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("protocolDecoder", new UdpProtocolDecoder(messageFactory, messageCodec)); pipeline.addLast("protocolEncoder", new UdpProtocolEncoder(messageFactory, messageCodec)); pipeline.addLast(new UdpChannelIoHandler(socketIoDispatcher)); } }); logger.info("socket server is listening at " + nodesConfig.getPort() + "......"); bootstrap.bind(nodesConfig.getPort()).sync().channel().closeFuture().sync(); } catch (Exception e) { logger.error("", e); group.shutdownGracefully(); } } @Override public void shutdown() throws Exception { group.shutdownGracefully(); } public static void main(String[] args) throws Exception { UdpSocketServer udpSocketServer = new UdpSocketServer(); udpSocketServer.messageFactory = GameMessageFactory.getInstance(); udpSocketServer.messageCodec = new StructMessageCodec(); udpSocketServer.socketIoDispatcher = new MessageIoDispatcher(); udpSocketServer.start(); } }
public class UdpSocketClient extends AbstractSocketClient { private final EventLoopGroup group = new NioEventLoopGroup(1); private HostAndPort nativeHostPort; public UdpSocketClient(SocketIoDispatcher messageDispatcher, MessageFactory messageFactory, MessageCodec messageCodec, HostAndPort hostPort) { this.ioDispatcher = messageDispatcher; this.messageFactory = messageFactory; this.messageCodec = messageCodec; this.targetAddress = hostPort; } @Override public IdSession openSession() throws IOException { try { final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioDatagramChannel.class); bootstrap.group(nioEventLoopGroup); bootstrap.handler(new LoggingHandler(LogLevel.INFO)); bootstrap.handler(new UdpProtoBufClientChannelInitializer()); ChannelFuture f = bootstrap.connect(new InetSocketAddress(targetAddress.getHost(), targetAddress.getPort()), new InetSocketAddress(nativeHostPort.getHost(), nativeHostPort.getPort())).sync(); IdSession session = new NSession(f.channel()); this.session = session; return session; } catch (Exception e) { group.shutdownGracefully(); throw new IOException(e); } } @Override public void close() throws IOException { this.session.close(); } public void send(UdpMessage message) { message.setSenderIp(nativeHostPort.getHost()); message.setSenderPort(nativeHostPort.getPort()); message.setReceiverIp(targetAddress.getHost()); message.setReceiverPort(targetAddress.getPort()); session.send(message); } class UdpProtoBufClientChannelInitializer extends ChannelInitializer { @Override protected void initChannel(NioDatagramChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("protocolDecoder", new UdpProtocolDecoder(messageFactory, messageCodec)); pipeline.addLast("protocolEncoder", new UdpProtocolEncoder(messageFactory, messageCodec)); pipeline.addLast(new UdpChannelIoHandler(ioDispatcher)); } } }
这里需要注意下:
客户端需要指定端口与服务器通信,这样才能方便消息包携带自己的地址信息。
客户端测试代码如下:
模拟10个玩家登录。读者可自行修改程序。udp是不可靠链接,不保证交付。如果在外网跑,是会出现消息丢包,或者乱序。在本地跑,是不太可能出现这种情况。读者可自行测试,同一个角色发送一大堆消息(附加上次序字段),看服务器收到的消息序号是否完整有序。
private static AtomicLong idFactory = new AtomicLong(1000); public static void main(String[] args) throws Exception { MessageCodec messageCodec = new StructMessageCodec(); GameMessageFactory.getInstance().registeredClassTypes().forEach(Codec::getSerializer); for (int i = 0; i < 10; i++) { System.out.println("----------i=" + i); UdpSocketClient socketClient = new UdpSocketClient(new SocketIoDispatcherAdapter() { @Override public void dispatch(IdSession session, Object message) { System.out.println("receive package ---------" + JsonUtil.object2String(message)); } }, GameMessageFactory.getInstance(), messageCodec, HostAndPort.valueOf(8088)); socketClient.nativeHostPort = HostAndPort.valueOf(8099 + i); socketClient.openSession(); for (int j = 0; j < 1; j++) { ReqLogin req = new ReqLogin(); req.setPlayerId(idFactory.getAndIncrement()); socketClient.send(req); } } }
我们以上面的代码,实现一个简单的游戏逻辑。
@MessageMeta(cmd = 55555) public class ReqLogin extends UdpMessage { private long playerId; }
@MessageMeta(cmd = 55556) public class ResPlayerLogin extends UdpMessage { private long playerId; }
@MessageRoute public class LoginRouter { @RequestHandler public void reqTime(IdSession session, ReqLogin req) { long playerId = req.getPlayerId(); System.out.println("player login" + playerId); Player player = new Player(); player.setId(playerId); player.setRemoteAddr(HostAndPort.valueOf(req.getSenderIp(), req.getSenderPort())); SessionManager.getInstance().register(playerId, player); ResPlayerLogin resp = new ResPlayerLogin(); resp.setPlayerId(playerId); player.receive(session, resp); } }
其中SessionManager 类缓存服务器的全局session,以及各个客户端环境的通信地址。以及,每隔一段时间主动向客户端推送消息。
public class SessionManager { private static SessionManager inst = new SessionManager(); private ConcurrentMap id2Players = new ConcurrentHashMap<>(); private IdSession serverSession; public static SessionManager getInstance() { return inst; } public void register(long playerId, Player player) { id2Players.put(playerId, player); } public void buildSession(IdSession session) { serverSession = session; } public void schedule() { SchedulerManager.getInstance().scheduleAtFixedRate(()->{ id2Players.forEach((key, value) -> { ResWelcome push = new ResWelcome(); push.setTime(System.currentTimeMillis()); value.receive(serverSession, push); }); }, TimeUtil.MILLIS_PER_MINUTE, 10*TimeUtil.MILLIS_PER_SECOND); } }
需要注意的是,客户端只有在登录成功之后,服务器才能绑定玩家与对应的客户端地址,才能主动推送消息。也就是说,在登录之前,服务器也是无法主动推送消息的,在业务上来说,也是没有意义的。
udp是一种无连接的协议,t提供不可靠性传输。UDP通过尽力交付数据包的方式进行传输,不对数据包的传输状态进行确认和重传,因此速度较快。UDP适用于对实时性要求较高、对数据准确性要求较低的应用场景。例如,如果语音视频服务。如果游戏服务器确实需要使用udp协议的话,需要在应用层解决丢包乱序问题。
对于乱序,接受方可以先把数据都缓存起来,等一段窗口期的数据接受完毕后再分发给业务层。对于丢包,无论是发送方还是接受方,都需要缓存最近发送的消息。以便用于丢包重传。当然,这只是基本的思路,实际处理起来可能会非常复杂。如果考虑到UDP协议的话,可能KCP会是更好的选择。