Netty整合STOMP

2022-10-23,,

1.STOMP协议简介

常用的WebSocket协议定义了两种传输信息类型:文本信息和二进制信息。类型虽然被确定,但是他们的传输体是没有规定的,也就是说传输体可以自定义成什么样的数据格式都行,只要客户端和服务端约定好,得到数据后能够按照约定的语义解析数据就好。相较于Http协议约定好了必须有请求头、请求体这类语义,接收到数据后先要解析头信息,WebSocket协议为了节省带宽和减轻服务器的负担,只要建立了ws连接,双方需要采用提前定义好的传输体格式(比如JSON格式)去解析数据,没有了Http协议那些头信息的解析工作和传输,也就提升了通讯效率。

直接使用WebSocket因为没有高层级的协议,所以需要我们定义应用之间所发送消息的语义,还要确保联机的两端都能遵循。因此Stomp作为文本定向消息协议常常作为子协议用于集成WebSocket,用来定义消息的语义。请求和响应数据格式类似,STOMP帧由命令、一个或多个头信息以及负载所组成。

下图是测试使用Stomp协议从服务端主动推送给客户端信息,这整个消息被称为“帧”,其中:

MESSAGE代表命令帧,表明此帧的为那种类型操作。Stomp的命令帧还包括:CONNECT\CONNECTED\SUBSCRIBE\SEND等;
message-id、subscription、content-length、content-type都是作为头信息,用于携带简单的信息内容;
负载则是服务端具体发送给客户端的详细信息,具体格式可以由开发人员随意设置;

<<< MESSAGE  #命令
message-id:6834f845-49dd-480e-8b94-84d31372d744 #头信息
subscription:sub-0 #头信息
content-length:136 #头信息
content-type:application/json #头信息 {"content":"此条内容为发送的消息--你好,世界!"} # 负载

由此可以看出Stomp既没有Http协议那样包含了太多的头信息而占用过多的网络带宽,又能提供比较丰富的格式语义方便双方的通讯。

2.Netty整合STOMP存在的问题

“即便Netty提供了STOMP编辑码器,但STOMP 是为消息中间件而设计的一种消息传递协议,而Netty 提供的是异步的、事件驱动的网络应用程序框架,并非作为消息代理,因此从功能契合的意义上说Netty不支持 STOMP协议。”———— Apache ActiveMQ 开发人员 Justin Bertram

不过实际开发中如果只是单纯的采用Netty+WebSocket会导致通信形式层级过低从而导致通信缺乏可靠性,所以Spring社区就提倡将Stomp作为WebSocket子协议增强通信的消息语义,《Spring实战》也给出了如何通集成Spring、WebSocket、Stomp如何进行集成开发。但是Netty+WebSocket如何将Stomp作为子协议,本人搜索了Google、Stack Overflow后并没有具体的范例,直至后面在Netty的GitHub社区中看到了简单的范例说明,因此特意记录下来希望帮助到有需要的人。

具体代码可参考:https://github.com/1148973713/whales-netty

3.Netty消息通讯的握手原理

在集成STOMP协议过程中,最重要的是官方提供的WebSocketServerProtocolHandler处理器(代码3-1):

    protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(65536))
.addLast(StompWebSocketClientPageHandler.INSTANCE)
.addLast(new WebSocketServerProtocolHandler(chatPath, StompVersion.SUB_PROTOCOLS))
.addLast(stompWebSocketProtocolCodec);
}

让我们进入WebSocketServerProtocolHandler类中发现此处需要传入的两个参数分别是:websocketPath--websocket地址、subprotocols--附加的子协议。

    public WebSocketServerProtocolHandler(String websocketPath, String subprotocols) {
this(websocketPath, subprotocols, 10000L);
}

此方法最终调用的是(代码3-2):

public WebSocketServerProtocolHandler(String websocketPath, String subprotocols, boolean checkStartsWith,
boolean dropPongFrames, long handshakeTimeoutMillis,
WebSocketDecoderConfig decoderConfig) {
this(WebSocketServerProtocolConfig.newBuilder()
.websocketPath(websocketPath)
.subprotocols(subprotocols)
.checkStartsWith(checkStartsWith)
.handshakeTimeoutMillis(handshakeTimeoutMillis)
.dropPongFrames(dropPongFrames)
.decoderConfig(decoderConfig)
.build());
}

其中WebSocketServerProtocolConfig.build()是将前面的各种配置参数传入,最后创建一个WebSocketServerProtocolConfig实体类:

        public WebSocketServerProtocolConfig build() {
return new WebSocketServerProtocolConfig(
websocketPath,
subprotocols,
checkStartsWith,
handshakeTimeoutMillis,
forceCloseTimeoutMillis,
handleCloseFrames,
sendCloseFrame,
dropPongFrames,
decoderConfigBuilder == null ? decoderConfig : decoderConfigBuilder.build()
);
}

此处为了读者能理解后续的代码执行流程先插入一个知识点,ChannelHandler 回调方法的执行顺序:

    连接请求,handlerAdded () -> channelRegistered () -> channelActive () -> channelRead () -> channelReadComplete ();
    数据请求,channelRead () -> channelReadComplete ();
    通道被关闭,channelInactive () -> channelUnregistered () -> handlerRemoved ();

handlerAdded ()、channelRegistered ()、channelActive ()只会在生命周期执行一次,后面便不会再被执行。

具体可以查看链接:https://book.itxueyuan.com/b2A7/ZGGQg

在Netty的ChannelHandler生命周期中handlerAdded()方法的执行是在检测到新请求连接时添加一个handler处理器到链表之中。经过WebSocketServerProtocolConfig.build()后,ChannelHandler会触发handlerAdded()方法[注:以下代码也是存在与WebSocketServerProtocolHandler类中](代码3-3)

     @Override
public void handlerAdded(ChannelHandlerContext ctx) {
ChannelPipeline cp = ctx.pipeline();
if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) {
// Add the WebSocketHandshakeHandler before this one.
cp.addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(),
new WebSocketServerProtocolHandshakeHandler(serverConfig));
}
if (serverConfig.decoderConfig().withUTF8Validator() && cp.get(Utf8FrameValidator.class) == null) {
// Add the UFT8 checking before this one.
cp.addBefore(ctx.name(), Utf8FrameValidator.class.getName(),
new Utf8FrameValidator(serverConfig.decoderConfig().closeOnProtocolViolation()));
}
}

在上述代码中因为handlerAdded中判断第一次连接时,Pipeline管道内不存在WebSocketServerProtocolHandshakeHandler(WebSocket服务器协议握手处理程序),就会通过addBefore()方法将WebSocketServerProtocolHandshakeHandler添加在WebSocketServerProtocolHandler前。

最终形成WebSocketServerProtocolHandshakeHandler------>Utf8FrameValidator(道理和上面一样)----->WebSocketServerProtocolHandler

我们先不管Utf8FrameValidator,着重分析WebSocketServerProtocolHandshakeHandler处理器内部代码(代码3-4):

 	@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
final HttpObject httpObject = (HttpObject) msg; if (httpObject instanceof HttpRequest) {
//获取当前请求信息
final HttpRequest req = (HttpRequest) httpObject;
//进行比对请求路径,验证协议
isWebSocketPath = isWebSocketPath(req);
if (!isWebSocketPath) {
ctx.fireChannelRead(msg);
return;
} try {
... ... //创建握手工厂,从参数可以看出:webSocketURL--webSocket的路径地址、subprotocols--添加的自定义子协议、decoderConfig解码设置
final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
getWebSocketLocation(ctx.pipeline(), req, serverConfig.websocketPath()),
serverConfig.subprotocols(), serverConfig.decoderConfig());
////创建一个握手处理器
final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);
//握手回调
final ChannelPromise localHandshakePromise = handshakePromise;
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
//设置处理器
WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker);
ctx.pipeline().remove(this); //ChannelFuture的作用是用来保存Channel异步操作的结果。在Netty中所有的I/O操作都是异步的。ChannelFuture代表了异步计算的结果,这个接口的主要方法就是检查计算是否已完成,等待计算然后返回计算结果。
final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req);
//进行结果监听
handshakeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
... ...
//如果发送成功情况,保持兼容性触发事件
localHandshakePromise.trySuccess();
//如果发送成功情况,则触发握手成功事件
ctx.fireUserEventTriggered(
new WebSocketServerProtocolHandler.HandshakeComplete(
req.uri(), req.headers(), handshaker.selectedSubprotocol()));
}
});
applyHandshakeTimeout();
}
}
}

以下是笔者先用SpirngBoot+WebSocket整合Stomp协议做了基本模板,我们可以看到在相应头中存在sec-websocket-protocol属性,其意义就是所携带的STOMP协议和其版本信息:

如本文开头所说,STOMP协议意在让WebSocket协议变得可靠一些,通过客户端与服务端达成的语义相互确认后才进行连接通讯。因此如何获取客户端请求中的子协议信息并让服务端匹配确认,这一步骤在握手环节过程中是非常重要的。

所以,在上述代码中wsFactory.newHandshaker(req) handshaker.handshake(ctx.channel(), req)两个方法实现了我们想要的效果。

首先是newHandshaker方法,它是属于WebSocketServerHandshakerFactory类中的方法(代码3-5):

public WebSocketServerHandshaker newHandshaker(HttpRequest req) {

        //获取头信息中sec-websocket-protocol属性的值
CharSequence version = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_VERSION);
if (version != null) {
//寻常的websocket协议版本
... ...
} else {
//因为我们是附加了STOMP子协议,所以返回的是WebSocketServerHandshaker00
return new WebSocketServerHandshaker00(webSocketURL, subprotocols, decoderConfig);
}
}

WebSocketServerHandshaker00()主要是构建一个自定义的握手方式,其中字符串参数subprotocols便是由开发人员自定义的协议,它在先前WebSocketServerHandshakerFactory对象创建之前就被传入(具体可以看代码3-4)。随后subprotocols将会被分割形成保存自定义协议名称的数组。

之后的handshaker.handshake(ctx.channel(), req)方法将会开始处理握手,因为其中传入参数req正式客户端发过来的请求,所以此方法部分作用就是用来比对客户端携带的sec-websocket-protocol属性值是否与服务端自定义属性相对应,如果是则返回握手成功的结果(代码3-6):

public ChannelFuture handshake(Channel channel, FullHttpRequest req) {
return handshake(channel, req, null, channel.newPromise());
} public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
HttpHeaders responseHeaders, final ChannelPromise promise) {
... ...
FullHttpResponse response = newHandshakeResponse(req, responseHeaders);
... ...
return promise;
}

newHandshakeResponse由WebSocketServerHandshaker00类自身实现,它主要的作用就是用来判断匹配并在匹配成功后设置由服务端返回给客户端的响应内容(代码3-7):

    @Override
protected FullHttpResponse newHandshakeResponse(FullHttpRequest req, HttpHeaders headers) { // 服务WebSocket握手请求
if (!req.headers().containsValue(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE, true)
|| !HttpHeaderValues.WEBSOCKET.contentEqualsIgnoreCase(req.headers().get(HttpHeaderNames.UPGRADE))) {
throw new WebSocketServerHandshakeException("not a WebSocket handshake request: missing upgrade", req);
} // Hixie 75不包含这些报头,而Hixie 76包含
//查询头信息中是否包含sec-websocket-key1属性与sec-websocket-key2属性
boolean isHixie76 = req.headers().contains(HttpHeaderNames.SEC_WEBSOCKET_KEY1) &&
req.headers().contains(HttpHeaderNames.SEC_WEBSOCKET_KEY2); //获取头信息Origin(起源)值
String origin = req.headers().get(HttpHeaderNames.ORIGIN);
... ...
if (isHixie76) {
... ...
} else {
//因为我们上面请求头是不包含上面两个属性的,所以直接到这里面了
//添加头属性与对应值
res.headers().add(HttpHeaderNames.WEBSOCKET_ORIGIN, origin);
res.headers().add(HttpHeaderNames.WEBSOCKET_LOCATION, uri());
//获取sec-websocket-protocol属性值
String protocol = req.headers().get(HttpHeaderNames.WEBSOCKET_PROTOCOL);
if (protocol != null) {
//先对比,后设置sec-websocket-protocol
res.headers().set(HttpHeaderNames.WEBSOCKET_PROTOCOL, selectSubprotocol(protocol));
}
}
return res;
}

在代码最后调用了selectSubprotocol(protocol),传入的参数是客户端发来的请求中携带的sec-websocket-protocol属性值(代码3-8):

    protected String selectSubprotocol(String requestedSubprotocols) {
... ...
//将sec-websocket-protocol属性值进行分割
String[] requestedSubprotocolArray = requestedSubprotocols.split(",");
for (String p: requestedSubprotocolArray) {
String requestedSubprotocol = p.trim(); for (String supportedSubprotocol: subprotocols) {
if (SUB_PROTOCOL_WILDCARD.equals(supportedSubprotocol)
|| requestedSubprotocol.equals(supportedSubprotocol)) {
selectedSubprotocol = requestedSubprotocol;
return requestedSubprotocol;
}
}
}
return null;
}

我们可以注意到if中进行了requestedSubprotocol.equals(supportedSubprotocol)判断,其中requestedSubprotocol是请求中所携带的协议信息,而supportedSubprotocol则是自定义的协议信息,在(代码3-5)中已经随着WebSocketServerHandshaker00创建赋予了值。

所以根据上述的流程,我们应当创建一个StompVersion和一个握手事件成功后触发的处理器,分别用于握手前传入WebSocketServerProtocolHandler()方法以及握手成功后返回响应和对应的STOMP帧告诉客户端连接成功可以进行通讯。

4.Netty如何兼容STOMP协议

根据上述代码流程我们可以发现,当确认握手成功建立连接后,就会触发握手成功事件。因此我门只需要编写事件触发后接收事件的处理器,并添加到pipeline(返回看3-1代码)中(代码4-1):

public class StompWebSocketProtocolCodec extends MessageToMessageCodec<WebSocketFrame, StompSubframe> {

    private final StompChatHandler stompChatHandler = new StompChatHandler();
private final StompWebSocketFrameEncoder stompWebSocketFrameEncoder = new StompWebSocketFrameEncoder(); //userEventTriggered:当fireUserEventTriggered()方法触发时候被调用。
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//当事件属于HandshakeComplete(握手成功时),与上面new WebSocketServerProtocolHandler.HandshakeComplete对应
if (evt instanceof HandshakeComplete) {
//查找出STOMP协议版本
StompVersion stompVersion = StompVersion.findBySubProtocol(((HandshakeComplete) evt).selectedSubprotocol());
ctx.channel().attr(StompVersion.CHANNEL_ATTRIBUTE_KEY).set(stompVersion);
ctx.pipeline()
.addLast(new WebSocketFrameAggregator(65536))
.addLast(new StompSubframeDecoder())
.addLast(new StompSubframeAggregator(65536))
.addLast(stompChatHandler)
.remove(StompWebSocketClientPageHandler.INSTANCE);
} else {
super.userEventTriggered(ctx, evt);
}
} @Override
protected void encode(ChannelHandlerContext ctx, StompSubframe stompFrame, List<Object> out) throws Exception {
System.out.println("---------------------->进行编码");
stompWebSocketFrameEncoder.encode(ctx, stompFrame, out);
} @Override
protected void decode(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame, List<Object> out) {
System.out.println("---------------------->进行解码");
if (webSocketFrame instanceof TextWebSocketFrame || webSocketFrame instanceof BinaryWebSocketFrame) {
out.add(webSocketFrame.content().retain());
} else {
ctx.close();
}
}
}

在StompChatHandler处理器中(代码过长,直接下载文章开头的势力代码进行阅读即可),我们根据STOMP官方规定的命令(CONNECT、SUBSCRIBE、SEND、UNSUBSCRIBE、DISCONNECT)进行判断和对消息内容的处理。以下以SEND命令为例(代码4-2):

	private final ConcurrentMap<String, Set<StompSubscription>> chatDestinations =
new ConcurrentHashMap<String, Set<StompSubscription>>(); private void onSend(ChannelHandlerContext ctx, StompFrame inboundFrame) {
//获取目的地名称
String destination = inboundFrame.headers().getAsString(DESTINATION); if (destination == null) {
sendErrorFrame("missed header", "required 'destination' header missed", ctx);
return;
} //根据目的地名称获取相应订阅地址信息
Set<StompSubscription> subscriptions = chatDestinations.get(destination); for (StompSubscription subscription : subscriptions) { subscription.channel().writeAndFlush(transformToMessage(inboundFrame, subscription));
}
} private static StompFrame transformToMessage(StompFrame sendFrame, StompSubscription subscription) { Charset charset = StandardCharsets.UTF_8; //设置消息命令为MESSAGE,消息文本内容副本,并返回DefaultStompFrame类型
StompFrame messageFrame = new DefaultStompFrame(StompCommand.MESSAGE, sendFrame.content().retainedDuplicate());
String id = UUID.randomUUID().toString();
//设置STOMP帧的头信息,具体参照文章开始STOMP帧格式
messageFrame.headers()
.set(MESSAGE_ID, id)
.set(SUBSCRIPTION, subscription.id())
.set(CONTENT_LENGTH, Integer.toString(messageFrame.content().readableBytes())); //获取STOMP内容类型,默认未BYTEBUF
CharSequence contentType = sendFrame.headers().get(CONTENT_TYPE);
if (contentType != null) {
//如果有,则也设置入头中
messageFrame.headers().set(CONTENT_TYPE, contentType);
}
//返回设置好的消息格式
return messageFrame;
}

进入DefaultStompFrame可知,我们需要发送的文本被转化成ByteBuf类型。如果默认采用此类型而不加以设置会导致当内容为中文时产生乱码。

    public DefaultStompFrame(StompCommand command, ByteBuf content) {
this(command, content, null);
}

如上问题,我们需要编写一个编码器StompWebSocketFrameEncoder(返回看代码4-1)对发送出去的消息进行编码,其中比较重要的方法如下(代码4-3):

    @Override
protected WebSocketFrame convertFullFrame(StompFrame original, ByteBuf encoded) {
//如果content-type消息头类型为text或则是application/json,则采用TextWebSocketFrame编码方式
if (isTextFrame(original)) {
return new TextWebSocketFrame(encoded);
}
//否则采用BinaryWebSocketFrame(二进制)编码方式
return new BinaryWebSocketFrame(encoded);
} private static boolean isTextFrame(StompHeadersSubframe headersSubframe) {
//判断STOMP帧是否携带content-type消息头,并且对消息类型进行判断
String contentType = headersSubframe.headers().getAsString(StompHeaders.CONTENT_TYPE);
return contentType != null && (contentType.startsWith("text") || contentType.startsWith("application/json"));
}

为什么发送消息出去要用的编码器而不是解码器,我们以下面这流程图为例:

5.编写StompVersion

即使Spring官方推荐采用SockJS以兼容更多的浏览器,但考虑到目前主流的浏览器都可以支持WebSocket协议,因此笔者便放弃采用SockJS+ Stomp.over()创建客户端,而是选用Stomp.client(url)。前者与后者区别主要在于Stomp.client(url)调用时采用的是浏览器自身所支持的不同的WebSocket协议,而不是教由SockJS统一实现。

根据源码可以看到STOMP当的默认版本是['v10.stomp', 'v11.stomp'],因此要让Netty兼容STOMP则必须让服务端明白当前采用STOMP协议与其对应的版本,可以看到STOMP协议版本:v10.stomp、v11.stomp。

        client: function(url, protocols) {
var klass, ws;
if (protocols == null) {
protocols = ['v10.stomp', 'v11.stomp'];
}
klass = Stomp.WebSocketClass || WebSocket;
ws = new klass(url, protocols);
return new Client(ws);
}

还记得(代码3-1)与(代码4-1)中 我们需要传递附加的子协议吗?这便是为什么需要编写StompVersion(代码5-1):

public enum StompVersion {

    STOMP_V11("1.1", "v11.stomp"),

    STOMP_V12("1.2", "v12.stomp");

    //获取key为stomp_version的常量。如果没有这样的常量,将创建并返回一个新的常量。
public static final AttributeKey<StompVersion> CHANNEL_ATTRIBUTE_KEY = AttributeKey.valueOf("stomp_version");
public static final String SUB_PROTOCOLS; static {
List<String> subProtocols = new ArrayList<String>(values().length);
for (StompVersion stompVersion : values()) {
subProtocols.add(stompVersion.subProtocol);
}
//将枚举类型转化为字符串格式: v11.stomp,v12.stomp
SUB_PROTOCOLS = StringUtil.join(",", subProtocols).toString();
} private final String version;
private final String subProtocol; StompVersion(String version, String subProtocol) {
this.version = version;
this.subProtocol = subProtocol;
} public String version() {
return version;
} public String subProtocol() {
return subProtocol;
} //根据subProtocol返回匹配上的枚举
public static StompVersion findBySubProtocol(String subProtocol) {
if (subProtocol != null) {
for (StompVersion stompVersion : values()) {
if (stompVersion.subProtocol().equals(subProtocol)) {
return stompVersion;
}
}
}
}

在代码(代码4-1)中,即握手事件成功时,我们需要比对并获取当前Stomp协议的版本,并将该版本信息设置进CHANNEL_ATTRIBUTE_KEY中:

 StompVersion stompVersion = StompVersion.findBySubProtocol(((HandshakeComplete) evt).selectedSubprotocol());
ctx.channel().attr(StompVersion.CHANNEL_ATTRIBUTE_KEY).set(stompVersion);

而当客户端真正连接服务端,发送CONNECT命令时,就会触发StompChatHandler代码中onSend方法(代码5-2):

 private static void onConnect(ChannelHandlerContext ctx, StompFrame inboundFrame) {
String acceptVersions = inboundFrame.headers().getAsString(ACCEPT_VERSION);
//获取当前采用的STOMP协议版本
StompVersion handshakeAcceptVersion = ctx.channel().attr(StompVersion.CHANNEL_ATTRIBUTE_KEY).get();
StompFrame connectedFrame = new DefaultStompFrame(StompCommand.CONNECTED);
//设置头信息
connectedFrame.headers()
.set(VERSION, handshakeAcceptVersion.version())
.set(SERVER, "Netty-Server")
.set(HEART_BEAT, "0,0");
ctx.writeAndFlush(connectedFrame);
}

这时候客户端会接受到服务端发送来封装好的帧,而这些帧正如HTTP中的头信息携带语义,可以让客户端得知当前连接成功以及开始后续其他操作:

<<< CONNECTED
version:1.1
server:Netty-Server
heart-beat:0,0

自此,整个Netty整合STOMP协议告一段落,因为除了Netty官网的示例外,暂时没有看到有其他博客对此有说明记录,因此尽兴了比较详细的讲解。内容或许过于分散,希望读者跟着源码进行阅读理解。

Netty整合STOMP的相关教程结束。

《Netty整合STOMP.doc》

下载本文的Word格式文档,以方便收藏与打印。