聊聊基于tcp的应用层消息边界如何定义

背景

2018年笔者有幸接触一个项目要用到长连接实现云端到设备端消息推送,所以借机了解过相关的内容,最终是通过rabbitmq+mqtt实现了相关功能,同时在心里也打了一个问号“如果自己实现长连接框架,该怎么定义消息的边界呢?”之后断续续整理了一些,一直不成体系,最近放假了整理出来跟大家交流一番。

为什么需要消息边界

消息边界并非长连接场景才需要,即使是短连接也可能需要,拿我们比较常用的http1.0协议(

http1.1稍微复杂一些,后面会单独说

)来说,它基于tcp这个传输协议来传递消息,而tcp协议又是一个面向流的协议,怎么能识别出已经到了流的末尾呢?我们需要一种规则来定义消息的边界,告诉对方读取已经到了末尾,可以结束了。

举一个生活中的例子来帮助理解,2020年由于疫情的原因,平日里都是在线下会议室开会,特殊时期演变成了线上会议。不知道大家有没有遇到过这种情况,线下开会时通过观察别人的动作、神情很容易知道他说完了,这时候下一个人就可以接着发言了,但是线上开会时这样就行不通了,你如果想发言是不是得先确认下别人有没有说完,如果直接发言可能会打断别人,这样很不礼貌,为什么会出现这种情况呢?因为你不知道他到底有没有结束发言,更专业一点说你不知道是否到达了消息的边界。那怎么改进呢,如果每个人发言完毕都显示地告诉别人“我说完了”,是不是会好一些呢,“我说完了”这四个字就是一种消息的边界,给接收方传达一种消息结束的讯息。

TCP层面的分析

在基于流的传输(例如TCP/IP)中,将接收到的数据存储到套接字接收缓冲区中。不幸的是,基于流的传输的缓冲区不是数据包队列而是字节队列。这意味着,即使您将两个消息作为两个独立的数据包发送,操作系统也不会将它们视为两个消息,而只是一堆字节。因此,不能保证读取的内容与远端写的完全一样。例如,假设操作系统的TCP/IP栈已收到三个数据包:

由于是基于流的协议,因此很有可能在应用程序中读到以下四个分段:

因此,无论是服务器端还是客户端,接收方都应将接收到的数据整理到一个或多个有意义的帧中,以使应用程序逻辑易于理解。在上面的示例中,正确的数据应采用以下格式:

消息边界的种类

前面介绍了消息边界的定义以及作用,这一节我们来看看大概会有哪几种消息边界。

1.特殊字符:比如上面提到的“我说完了”这就是一种特殊字符作为消息边界的例子,以特殊字符为边界的典型产品有我们熟知的redis,客户端和服务器发送的命令或数据一律以\r\n(CRLF)结尾,还有Netty中的DelimiterBasedFrameDecoder。

2.基于消息长度:比如约定了消息长度为4k字节,接收方每次读取4k字节以后就认为已到达消息边界,结束本次读取。当然现实中消息长度一般是变长的,这样就需要设计一个约定好的消息头部,将消息长度作为头部的一部分传输过去,以长度为边界的例子有Dubbo、http

、websocket,Netty中的FixedLengthFrameDecoder、LengthFieldBasedFrameDecoder等。

打开网易新闻 查看更多图片

redis如何解析完整消息

上面说过,redis是通过\r\n来作为消息边界的,下面我将从源码角度分析下redis具体是如何处理的。

1.这里通过telnet来发送内联格式命令请求redis,之所以没有选用redis-cli是想模拟一条指令redis-server分多次收到的情况,在telnet模式下,每输入一个字符,就会发送给redis-server端,而redis-cli不是,它是按下回车时才会发送整体输入的命令,redis-server端是分多次还是一次收到完整的命令,这个取决于底层,如果想模拟分多次收到,这个过程较为复杂。

2.redis-server端每次有输入时会触发readQueryFromClient(networking.c)函数,对redis执行流程感兴趣的可以参考我之前的文章“redis源码学习之工作流程初探”。

3.redis-server将收到的内容暂存到redisClient的querybuf中,如果没有收到\r\n就等待,直到收到\r\n才将querybuf中的内容解析成指令执行。

测试步骤如下:

telnet中输入g

debug查看redisClient中querybuf的值,目前只有g

telnet中输完geta按回车以后,redisClient中querybuf保存了所有的输入geta\r\n

打开网易新闻 查看更多图片

源码分析如下:

readQueryFromClient

voidreadQueryFromClient(aeEventLoop*el,intfd,void*privdata,intmask){redisClient*c=(redisClient*)privdata;intnread,readlen;size_tqblen;REDIS_NOTUSED(el);REDIS_NOTUSED(mask);server.current_client=c;readlen=REDIS_IOBUF_LEN;/*Ifthisisamultibulkrequest,andweareprocessingabulkreply*thatislargeenough,trytomaximizetheprobabilitythatthequery*buffercontainsexactlytheSDSstringrepresentingtheobject,even*attheriskofrequiringmoreread(2)calls.Thiswaythefunction*processMultiBulkBuffer()canavoidcopyingbufferstocreatethe*RedisObjectrepresentingtheargument.*/if(c->reqtype==REDIS_REQ_MULTIBULK&&c->multibulklen&&c->bulklen!=-1&&c->bulklen>=REDIS_MBULK_BIG_ARG){intremaining=(int)((unsigned)(c->bulklen+2)-sdslen(c->querybuf));if(remainingquerybuf);if(c->querybuf_peakquerybuf_peak=qblen;c->querybuf=sdsMakeRoomFor(c->querybuf,readlen);//从fd中读取内容,读取的内容存到redisClient的querybuf中nread=read(fd,c->querybuf+qblen,readlen);if(nread==-1){if(errno==EAGAIN){nread=0;}else{#ifdef_WIN32redisLog(REDIS_VERBOSE,"Readingfromclient:%s",wsa_strerror(errno));#elseredisLog(REDIS_VERBOSE,"Readingfromclient:%s",strerror(errno));#