背景

学过java的都使用过RMI框架(remote method invocation),远程方法调用,比如A,B二个服务器,A调用B服务器上的方法就像调用本地方法一样,但是本质上是跨机器的调用了,A机器将调用的方法名,参数通过字节码的形式传输到B这台机器上,B这台机器将这些字节码转换成对B机器上具体方法的调用,并将相应的返回值序列化成二进制数据传输到A服务器上。

RPC(Remote Procedure Call)其实和rmi及其类似,RPC与RMI框架对比的优势就是好多RPC框架都是跨语言的。

RMI只针对java,A,B服务都使用java编写。几乎所有的RPC框架都存在代码生成,自动代码屏蔽了底层序列化通信等各种细节的处理,使得用户(开发者)可以像调用本地方法一样调用远程的方法。一般这种自动生成的代码在客户端我们称为stub,服务端我们称为skeleton。

序列化与反序列化技术,也称为编码与解码技术,比如我们本篇博客讨论的Google Protobuf,和marshalling等技术。

从广义上来讲,webservice也可以称为RPC框架,但是相比于其他的RPC框架来说,webservice的性能稍微差点,因为决定一个rpc性能的优秀与否在于其底层对象编解码性能。RPC一般都是基于socket协议传输的,而webservice基于http传输的,socket协议的性能也要高于http协议传输数据。所以,一般在公司内部各个微服务之间的服务调用都使用RPC框架多一点,因为在性能上的考虑,而我们总所周知的dubbo虽然也算是RPC框架,但其实并不支持多语言。

什么是protocol buffers?

Protocol buffers是谷歌的语言中立,平台中立的,可扩展机制的序列化数据结构框架-可以看作是xml,但是体积更小,传输速率更快,使用更加简单。一旦你定义了你的数据格式,你可以使用生成源代码去轻松地从各种数据流读和写你的结构化数据并且使用不同的语言。protobuf有2.0版本和3.0版本,3.0版本十grpc框架的基础

Protocol buffers目前支持Java, Python, Objective-C, 和C++生成代码。新的proto3语言版本,你可以使用Go, JavaNano, Ruby, 和 C#。

为什么使用Protocol buffers

使用一个简单的可以从一个文件中去读写人员联系信息”地址簿”程序。每个在地址簿的人有姓名,id,邮箱地址和一个联系人电话号码属性。

你如何序列化和检索这样的结构化数据? 有几种方法来解决这个问题:
使用java原生的序列化。这是一种默认的方式因为是内嵌于java语言的,但是有一大堆众所周知的问题(参考Effective Java这本书),并且你不能将数据分享于C++和Python应用(也就是不能跨语言)。

还可以将数据项编码为单个字符串的ad-hoc方式 - 例如将4个ints编码为“12:3:-23:67”。 这是一个简单而灵活的方法,尽管它需要编写一次性编码和解析代码,并且解析具有很小的运行时成本。 这最适合编码非常简单的数据。

将数据序列化为XML。 这种方法可能非常有吸引力,因为XML是(可能的)人类可读的,并且有很多语言的绑定库。 如果您想与其他应用程序/项目共享数据,这可能是一个很好的选择。 然而,XML浪费性能,编码/解码可能会对应用程序造成巨大的性能损失。 另外,检索XML DOM树比在一般类中简单的字段检索要复杂得多。

Protocol buffers是灵活,高效,自动化的解决方案来解决这个问题。 使用Protocol buffers,您可以编写一个.proto描述您希望存储的数据结构。 Protocol buffers编译器创建一个实现自动编码和解析协议缓冲区数据的类,并使用高效的二进制格式。 生成的类为组成Protocol buffers的字段提供getter和setter。

使用Protobuf编写一个编码解码的最简单程序

  • 在 .proto结尾的文件中定义消息格式。
  • 使用protocol buffers编译器将 .proto结尾的文件生成对应语言的源代码(本demo使用java编译器)。
  • 使用Java protocol buffer API 去读写消息。

定义一个Student.proto文件

//定义使用的proto版本
syntax ="proto2";
//包路径,自定义
package com.sakura.protobuf;

//optimize_for文件选项 SPEED加快解析速度,默认就是SPEED
option optimize_for = SPEED;
//包路径,自定义,比package优先级高
//如果java_package存在则默认使用java_package的配置,如果使用java_package就必须要指定package
//以免在生成java以外的语言时,不存在java_package参数,造成命名秘密空间问题
option java_package = "com.sakura.protobuf";
//生成的外部类的名称,自定义 如果不定义则默认使用文件名的驼峰命名法转换的名称作为外部类名称
option java_outer_classname = "DataInfo";

/*
message 表示消息,如同Java的类

格式:
修饰符 数据类型 属性名 = 唯一标记数;
修饰符:
required 必须的,必须存在,必须赋值
optional 可选的,可以不使用
repeated 重复的,标识一个list/链表
数据类型:
包括 bool,int32,float,double,和string
唯一标记数:
标签编号1-15与较高的编号相比,编码所需的字节减少了一个字节,因此,
为了进行优化,您可以决定将这些标签用于常用或重复的元素
*/
message Student{
required string name = 1;
optional int32 age = 2;
optional string address = 3;
}

在Java项目中,除非你已经明确指定了java_package,否则package 用作Java的包名。即使您提供java_package,您仍然应该定义一个package,以避免在Protocol Buffers名称空间和非Java语言中的名称冲突。

在package的定义之后,我们可以看到两个定义的java选项:java_packagejava_outer_classnamejava_package指定您生成的类应该存放的Java包名称。 如果没有明确指定它,将会使用package定义的name作为包名,但这些名称通常不是适合的Java包名称(因为它们通常不以域名开头)。 java_outer_classname选项定义应该包含此文件中所有类的类名。 如果你不明确地给出一个java_outer_classname,它将通过将文件名转换为驼峰的方式来生成。 例如,默认情况下,“my_proto.proto”将使用“MyProto”作为外部类名称。

每个元素上的“= 1”,“= 2”标记标识字段在二进制编码中使用的唯一“标签”。你可以将经常使用或者重复的字段标注成1-15,因为在进行编码的时候因为少一个字节进行编码,所以效率更高。

required:必须提供该字段的值,否则被认为没有初始化。尝试构建一个未初始化的值被会抛出RuntimeException。解析一个为初始化的消息会抛出IOException。除此之外与optional一样。
optional:可以设置或不设置该字段。 如果未设置可选字段值,则使用默认值。
repeated:字段可能重复任意次数(包括零)。 重复值的顺序将保留在protocol buffer中。 将重复的字段视为动态大小的数组。(本列子中没有字段定义成repeated类型,定义成repeated类型其实就是java中List类型的字段。

慎重使用required类型,将required类型的字段更改为optional会有一些问题,而将optional类型的字段更改为required类型,则没有问题。

编译

使用protocol buffers编译器将对应的.proto文件编译成对应的类
关于编译器的安装,下载地址

请输入图片描述

设置环境变量

sudo vim /etc/profile
//在PATH上追加proto解压目录的bin目录下
export PATH=$PATH:/home/environmentLibrary/protoc-3.11.4-linux-x86_64/bin/
//刷新配置文件
source /etc/profile

进入项目目录,执行编译语句如下:

protoc --java_out=src/main/java  src/protobuf/Student.proto

--java_out后面第一个参数指定代码的路径,具体的包名在.proto文件中的java_package指定了,第二个指定要编译的proto文件。

自动生成的类名是DataInfo(在java_outer_classname中指定了),自动生成的类太长,这边就不列出来了

编写序列化反序列化测试类

//实际使用protobuf序列化框架客户端将对象转译成字节数组,然后通过协议传输到服务器端,服务器端可以是其他的语言框架(比如说python)将
//字节对象反编译成java对象
public class ProtoBufTest {
public static void main(String[] args) throws InvalidProtocolBufferException {
Person person = DataInfo.Person.newBuilder().setName("星空")
.setAge(18).setAddress("地球").build();
//将对象转译成字节数组,序列化
byte[] personByteArray = person.toByteArray();
//将字节数组转译成对象,反序列化
Person parse2 = DataInfo.Person.parseFrom(personByteArray);
System.out.println(parse2.getName());
System.out.println(parse2.getAge());
System.out.println(parse2.getAddress());
}
}

执行测试类,控制台打印:

星空
18
地球

Google Protobuf与netty结合

protobuf做为序列化的一种方式,序列化之后通过什么样的载体在网络中传输?

使用netty使得经过protobuf序列化的对象可以通过网络通信进行客户端和服务器的信息通信。客户端使用protobuf将对象序列化成字节码,而服务器端通过protobuf将对象反序列化成原本对象。

写一个使用Protobuf作为序列化框架,netty作为传输层的最简单的demo,需求描述:

  • 服务器的接收到客户端发送的数据并打印

.proto文件

syntax = "proto2";

package com.sakura.netty.sixtheample;

option optimize_for = SPEED;
option java_package = "com.sakura.netty.sixtheample";
option java_outer_classname = "MyDataInfo";

message Person{
required string name = 1;
optional int32 age = 2;
optional string address = 3;
}

使用编译器编译proto文件,生成MyDataInfo对象

服务端代码

服务端主启动类

public class TestServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new TestServerInitializer());

ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
System.out.println(e.getMessage());
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

初始化器 (Initializer)

public class TestServerInitializer extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

//用于decode前解决半包和粘包问题(利用包头中的包含数组长度来识别半包粘包)
pipeline.addLast(new ProtobufVarint32FrameDecoder());
/*
ProtobufDecoder(MessageLite m) 是解码器,将protobuf构造出来的字节数组,转化成真正的对象
MessageLite(参数) 表示要转换/传输的类的实例

反序列化指定的Probuf字节数组为protobuf类型。
*/
pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance()));
//用于在序列化的字节数组前加上一个简单的包头,只包含序列化的字节长度。
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
//用于对Probuf类型序列化。
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new TestServerHandler());
}
}

自定义处理器 (Handler)

public class TestServerHandler extends SimpleChannelInboundHandler<MyDataInfo.Person> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception {
System.out.println(msg.getName());
System.out.println(msg.getAge());
System.out.println(msg.getAddress());
}
}

客户端代码

客户端主启动类

public class TestClient {
public static void main(String[] args) throws Exception {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
.handler(new TestClientInitializer());
//与对应的url建立连接通道
ChannelFuture channelFuture = bootstrap.connect("localhost",8899).sync();
channelFuture.channel().closeFuture().sync();
}finally {
eventLoopGroup.shutdownGracefully();
}
}
}

初始化器 (Initializer)

public class TestClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new ProtobufVarint32FrameDecoder());
/*
ProtobufDecoder 是解码器,将protobuf构造出来的字节数组,转化成真正的对象
MessageLite(参数)表示要转换的类的实例
*/
pipeline.addLast(new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance()));
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new TestClientHandler());
}
}

自定义处理器 (Handler)

public class TestClientHandler extends SimpleChannelInboundHandler<MyDataInfo.Person> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.Person msg) throws Exception {

}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
MyDataInfo.Person person = MyDataInfo.Person.newBuilder()
.setName("星空").setAge(18).setAddress("宇宙").build();
ctx.channel().writeAndFlush(person);
}
}

测试

运行服务器端和客户端,服务器控制台打印:

五月 08, 2020 3:35:29 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0x3a8afd81] REGISTERED
五月 08, 2020 3:35:29 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0x3a8afd81] BIND: 0.0.0.0/0.0.0.0:8899
五月 08, 2020 3:35:29 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0x3a8afd81, L:/0:0:0:0:0:0:0:0:8899] ACTIVE
五月 08, 2020 3:35:36 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x3a8afd81, L:/0:0:0:0:0:0:0:0:8899] READ: [id: 0xb56057d5, L:/127.0.0.1:8899 - R:/127.0.0.1:47564]
五月 08, 2020 3:35:36 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0x3a8afd81, L:/0:0:0:0:0:0:0:0:8899] READ COMPLETE
星空
18
宇宙

当前程序的问题

ProtobufDecoder是protobuf的解码器,而本程序new ProtobufDecoder(MyDataInfo.Person.getDefaultInstance())就表明了这个解码器只能解析MyDataInfo.Person这一个对象,如果客户端传递的protobuf对象是另一个对象比如MyDataInfo.Person2那么这个程序是无法解析的,此时客户端是无法发送其它类型的对象。

另外服务器端的Handler的泛型也定死了这件事情SimpleChannelInboundHandler<MyDataInfo.Person>这里也明确表明了接收的对象就是MyDataInfo.Person,如果传的是MyDataInfo.Person2则会造成泛型类型不匹配,也限制了多种类型的传递实现