目录
整体结构说明
protobuf2文件
golang客户端
目录结构
生成pb.go文件
main.go
util.go
java服务端
目录结构
pom.xml
application.yml
NettyConfig.java
生成Helloworld.java
SocketServerHandler.java
NettyServerListener.java
Application.java
测试
因为设备的通信协议准备采用
protobuf
,所以准备这篇protobuf
的使用入门,golang
作为客户端,java
作为服务端,这才能真正体现出protobuf
的无关语言特性。本文采用
protobuf2
,注重于如何快速入门使用,并不会涉及到具体的细节知识点。
整体结构说明
golang
作为客户端,java
作为服务端,protobuf2
为两者的通信协议格式。
protobuf2文件
protobuf2简介
详细说明
helloworld.proto
syntax = "proto2";
package proto;
message ProtocolMessage {
message SearchRequest{
required string name = 1;
optional int32 search = 2 ;
}
message ActionRequest{
required string name = 1;
optional int32 action = 2 ;
}
message SearchResponse{
required string name = 1;
optional int32 search = 2 ;
}
message ActionResponse{
required string name = 1;
optional int32 action = 2 ;
}
optional SearchRequest searchRequest = 1;
optional ActionRequest actionRequest = 2;
optional SearchResponse searchResponse = 3;
optional ActionResponse actionResponse = 4;
}
SearchRequest
和SearchResponse
为对应的请求和相应message;
ActionRequest
和ActionResponse
为对应的请求和相应message;
由于服务端使用netty
框架,限制了只能接受一个message进行编码解码,所以把SearchRequest
、SearchResponse
、ActionRequest
和ActionResponse
都内嵌到ProtocolMessage
中,通过对ProtocolMessage
编码解码进行数据交互。
golang客户端
目录结构
client_proto/
├── api
│ ├── proto # 存放proto协议文件以及生产的pd.go文件
│ ├── helloworld.pb.go
│ └── helloworld.proto
├── cmd
│ ├── main.go
│ ├── util
│ └── util.go
采用go mod
进行开发
生成pb.go文件
安装proto
自行百度......
在.proto文件处,输入protoc --go_out=./ helloworld.proto
即可生成helloworld.pb.go
文件
main.go
package main
import (
"github.com/gin-gonic/gin"
proto "grpc/api/grpc_proto"
"grpc/cmd/demo3/util"
"net/http"
"time"
)
func init() {
util.InitTransfer()
}
func main() {
router := gin.Default()
// search 测试
router.GET("/search", func(c *gin.Context) {
name := "search"
search := int32(12)
message := &proto.ProtocolMessage{
SearchRequest:&proto.ProtocolMessage_SearchRequest{
Name:&name,
Search:&search,
},
}
if err := util.G_transfer.SendMsg(message); err != nil {
c.JSON(500, gin.H{
"err": err.Error(),
})
return
}
if err := util.G_transfer.ReadResponse(message); err != nil {
c.JSON(500, gin.H{
"err": err.Error(),
})
return
}
c.JSON(200, gin.H{
"message": message.SearchResponse.Name,
})
})
// action测试
router.GET("/action", func(c *gin.Context) {
name := "action"
action := int32(34)
message := &proto.ProtocolMessage{
ActionRequest: &proto.ProtocolMessage_ActionRequest{
Name: &name,
Action: &action,
},
}
if err := util.G_transfer.SendMsg(message); err != nil {
c.JSON(500, gin.H{
"err": err.Error(),
})
}
if err := util.G_transfer.ReadResponse(message); err != nil {
c.JSON(500, gin.H{
"err": err.Error(),
})
}
c.JSON(200, gin.H{
"message": message.ActionResponse.Name,
})
})
ReadTimeout := time.Duration(60) * time.Second
WriteTimeout := time.Duration(60) * time.Second
s := &http.Server{
Addr: ":8090",
Handler: router,
ReadTimeout: ReadTimeout,
WriteTimeout: WriteTimeout,
MaxHeaderBytes: 1 << 20,
}
s.ListenAndServe()
}
util.go
package util
import (
"encoding/binary"
"errors"
"github.com/gogo/protobuf/proto"
grpc_proto "grpc/api/grpc_proto"
"net"
)
var (
G_transfer *Transfer
)
func InitTransfer() {
var (
pTCPAddr *net.TCPAddr
conn net.Conn
err error
)
if pTCPAddr, err = net.ResolveTCPAddr("tcp", "127.0.0.1:3210"); err != nil {
return
}
if conn, err = net.DialTCP("tcp", nil, pTCPAddr); err != nil {
return
}
// 定义 Transfer 指针变量
G_transfer = &Transfer{
Conn: conn,
}
}
// 声明 Transfer 结构体
type Transfer struct {
Conn net.Conn // 连接
Buf [1024 * 2]byte // 传输时,使用的缓冲
}
// 获取并解析服务器的消息
func (transfer *Transfer) ReadResponse(response *grpc_proto.ProtocolMessage) (err error) {
_, err = transfer.Conn.Read(transfer.Buf[:4])
if err != nil {
return
}
// 根据 buf[:4] 转成一个 uint32 类型
var pkgLen uint32
pkgLen = binary.BigEndian.Uint32(transfer.Buf[:4])
//根据pkglen 读取消息内容
n, err := transfer.Conn.Read(transfer.Buf[:pkgLen])
if n != int(pkgLen) || err != nil {
return
}
if err = proto.Unmarshal(transfer.Buf[:pkgLen], response); err != nil {
return
}
return
}
// 发送消息到服务器
func (transfer *Transfer) SendMsg(action *grpc_proto.ProtocolMessage) (err error) {
var (
sendBytes []byte
readLen int
)
//sendBytes, ints := action.Descriptor()
if sendBytes, err = proto.Marshal(action); err != nil {
return
}
pkgLen := uint32(len(sendBytes))
var buf [4]byte
binary.BigEndian.PutUint32(buf[:4],pkgLen)
if readLen, err = transfer.Conn.Write(buf[:4]); readLen != 4 && err != nil {
if readLen == 0 {
return errors.New("发送数据长度发生异常,长度为0")
}
return
}
// 发送消息
if readLen, err = transfer.Conn.Write(sendBytes); err != nil {
if readLen == 0 {
return errors.New("检查到服务器关闭,客户端也关闭")
}
return
}
return
}
这里发送消息和读取消息都需要先发送/解析数据的长度,然后发送/解析数据本身;
这里与服务端怎么样解析/发送数据有关,这是由于netty
框架中定义的编码解码器决定的。
java服务端
目录结构
server_proto/
├── src
│ ├── main
│ ├── java
│ ├── com
│ ├── dust
│ ├── proto_server
│ ├── config
│ └── NettyConfig.java
│ ├── netty
│ └── NettyServerListener.java
│ └── SocketServerHandler.java
│ ├── proto
│ └── Helloworld.java
│ └── helloworld.proto # proto配置文件
│ └── Application.java # 启动配置类
│ ├── resources
│ └── application.yml #配置文件
│ ├── test
└── pom.xml # maven配置文件
采用springBoot
+netty
+maven
开发
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.dust</groupId>
<artifactId>proto_server</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>proto_server</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- protobuf依赖-->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>com.googlecode.protobuf-java-format</groupId>
<artifactId>protobuf-java-format</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.19.Final</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
注意:protobuf-java
的版本为3.8.0
,必须和安装proto.exe
的版本保持一致。
application.yml
# netty配置
netty:
# 端口号
port: 3210
# 最大线程数
maxThreads: 1024
# 数据包的最大长度
max_frame_length: 65535
NettyConfig.java
package com.dust.proto_server.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "netty")
public class NettyConfig {
private int port;
}
生成Helloworld.java
在.proto文件处,输入protoc --java_out=./ helloworld.proto
即可生成Helloworld.java
文件
SocketServerHandler.java
package com.dust.proto_server.netty;
import com.dust.proto_server.proto.Helloworld;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@Component
@ChannelHandler.Sharable
public class SocketServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(SocketServerHandler.class);
public ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void handlerAdded(ChannelHandlerContext ctx){
Channel channel = ctx.channel();
LOGGER.info(channel.id().toString()+"加入");
CHANNEL_GROUP.add(channel);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx){
Channel channel = ctx.channel();
LOGGER.info(channel.id().toString()+"退出");
CHANNEL_GROUP.remove(channel);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
//
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
LOGGER.info("开始读取客户端发送过来的数据");
Helloworld.ProtocolMessage protocolMessage = (Helloworld.ProtocolMessage) msg;
Helloworld.ProtocolMessage.Builder builder = Helloworld.ProtocolMessage.newBuilder();
if (protocolMessage.getSearchRequest().getSerializedSize() != 0) {
Helloworld.ProtocolMessage.SearchRequest searchRequest = protocolMessage.getSearchRequest();
LOGGER.info("searchRequest--{}",searchRequest);
Helloworld.ProtocolMessage.SearchResponse searchResponse = Helloworld.ProtocolMessage.SearchResponse.newBuilder().setName("i am SearchResponse").setSearch(45).build();
builder.setSearchResponse(searchResponse);
} else if (protocolMessage.getActionRequest().getSerializedSize() != 0) {
Helloworld.ProtocolMessage.ActionRequest actionRequest = protocolMessage.getActionRequest();
LOGGER.info("actionRequest--{}",actionRequest);
Helloworld.ProtocolMessage.ActionResponse actionResponse = Helloworld.ProtocolMessage.ActionResponse.newBuilder().setName("i am ActionResponse").setAction(67).build();
builder.setActionResponse(actionResponse);
}
Helloworld.ProtocolMessage message = builder.build();
// 发送数据长度
ctx.channel().writeAndFlush(message.toByteArray().length);
// 发送数据本身
ctx.channel().writeAndFlush(message);
}
}
NettyServerListener.java
package com.dust.proto_server.netty;
import com.dust.proto_server.config.NettyConfig;
import com.dust.proto_server.proto.Helloworld;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
@Component
public class NettyServerListener {
/**
* NettyServerListener 日志输出器
*
*/
private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerListener.class);
/**
* 创建bootstrap
*/
ServerBootstrap serverBootstrap = new ServerBootstrap();
/**
* BOSS
*/
EventLoopGroup boss = new NioEventLoopGroup();
/**
* Worker
*/
EventLoopGroup work = new NioEventLoopGroup();
@Resource
private SocketServerHandler socketServerHandler;
/**
* NETT服务器配置类
*/
@Resource
private NettyConfig nettyConfig;
/**
* 关闭服务器方法
*/
@PreDestroy
public void close() {
LOGGER.info("关闭服务器....");
//优雅退出
boss.shutdownGracefully();
work.shutdownGracefully();
}
/**
* 开启及服务线程
*/
public void start() {
// 从配置文件中(application.yml)获取服务端监听端口号
int port = nettyConfig.getPort();
serverBootstrap.group(boss, work).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 负责通过4字节Header指定的Body长度将消息切割
pipeline.addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
// 负责将frameDecoder处理后的完整的一条消息的protobuf字节码转成ProtocolMessage对象
pipeline.addLast("protobufDecoder",
new ProtobufDecoder(Helloworld.ProtocolMessage.getDefaultInstance()));
// 负责将写入的字节码加上4字节Header前缀来指定Body长度
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
// 负责将ProtocolMessage对象转成protobuf字节码
pipeline.addLast("protobufEncoder", new ProtobufEncoder());
pipeline.addLast(socketServerHandler);
}
}).option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO));
try {
LOGGER.info("netty服务器在[{}]端口启动监听", port);
ChannelFuture f = serverBootstrap.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
LOGGER.info("[出现异常] 释放资源");
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
}
这个类就定义服务端是怎么样处理接受和发送数据的;
frameDecoder
和protobufDecoder
对应的handler用于解码Protobuf package数据包,他们都是Upstream Handles:先处理长度,然后再处理数据本身;
frameEncoder
和protobufEncoder
对应的handler用于编码Protobuf package数据包,他们都是Downstream Handles;
此外还有一个handler,是一个自定义的Upstream Handles,用于开发者从网络数据中解析得到自己所需的数据socketServerHandler
;
上例Handles的执行顺序为
upstream:frameDecoder,protobufDecoder,handler //解码从Socket收到的数据
downstream:frameEncoder,protobufEncoder //编码要通过Socket发送出去的数据
Application.java
package com.dust.proto_server;
import com.dust.proto_server.netty.NettyServerListener;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import javax.annotation.Resource;
@SpringBootApplication
public class Application implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Resource
private NettyServerListener nettyServerListener;
@Override
public void run(String... args) throws Exception {
nettyServerListener.start();
}
}
测试
先启动服务端,再启动客户端
search测试
action测试