什么是 RPC ?

img

RPC (Remote Procedure Call)即远程过程调用,是分布式系统常见的一种通信方法。它允许程序调用另一个地址空间(通常是共享网络的另一台机器上)的过程或函数,而不用程序员显式编码这个远程调用的细节。

  • RPC就是从一台机器(客户端)上通过参数传递的方式调用另一台机器(服务器)上的一个函数或方法(可以统称为服务)并得到返回的结果。
  • RPC会隐藏底层的通讯细节(不需要直接处理Socket通讯或Http通讯)。
  • 客户端发起请求,服务器返回响应(类似于Http的工作方式)RPC在使用形式上像调用本地函数(或方法)一样去调用远程的函数(或方法)

最终解决的问题:让分布式或者微服务系统中不同服务之间的调用(远程调用)像本地调用一样简单!调用者感知不到远程调用的逻辑。为此rpc需要解决三个问题(实现的关键):

  • Call ID映射。我们怎么告诉远程机器(注册中心)我们要调用哪个函数呢
  • 序列化和反序列化。客户端怎么把参数值传给远程的函数呢?
  • 数据网络传输。远程调用往往是基于网络的,客户端和服务端是通过网络连接的。所有的数据都需要通过网络传输,因此就需要有一个网络传输层。

一个RPC框架要包含

  • 客户端和服务端建立网络连接模块( server模块、client模块 )
  • 服务端处理请求模块
  • 协议模块
  • 序列化反序列模块。
img

一个完整的RPC架构里面包含了四个核心的组件,分别是Client ,Server,Client Stub以及Server Stub,这个Stub可以理解为存根(调用与返回)。分别说说这几个组件:

  • 客户端(Client): 服务的调用方。
  • 服务端(Server):真正的服务提供者。
  • 客户端存根:存放服务端的地址消息,再将客户端的请求参数打包成网络消息,然后通过网络远程发送给服务方。
  • 服务端存根:接收客户端发送过来的消息,将消息解包,并调用本地的方法。
img

动态代理

代理模式是一种设计模式,能够使得在不修改源目标的前提下,额外扩展源目标的功能。即通过访问源目标的代理类,再由代理类去访问源目标。这样一来,要扩展功能,就无需修改源目标的代码了。只需要在代理类上增加就可以了。

其实代理模式的核心思想就是这么简单,在java中,代理又分静态代理和动态代理2种,其中动态代理根据不同实现又区分基于接口的的动态代理和基于子类的动态代理。

静态代理这种模式虽然好理解,但是缺点也很明显:

  • 会存在大量的冗余的代理类,这里演示了2个接口,如果有10个接口,就必须定义10个代理类。
  • 不易维护,一旦接口更改,代理类和目标类都需要更改。

Jdk中的动态代理

JDK中的动态代理是通过反射类Proxy以及InvocationHandler回调接口实现的,但是JDK中所有要进行动态代理的类必须要实现一个接口,也就是说只能对该类所实现接口中定义的方法进行代理,这在实际编程中有一定的局限性,而且使用反射的效率也不高

Cglib实现

使用cglib是实现动态代理,不受代理类必须实现接口的限制,因为cglib底层是用ASM框架,利用ASM框架,对代理对象类生成的class文件加载进来,通过修改其字节码生成子类来处理。比使用Java反射的效率要高,cglib不能对声明final的方法进行代理,因为cglib原理是动态生成被代理类的子类

什么时候用cglib什么时候用jdk动态代理?

1、目标对象生成了接口 默认用JDK动态代理

2、如果目标对象使用了接口,可以强制使用cglib

3、如果目标对象没有实现接口,必须采用cglib库,Spring会自动在JDK动态代理和cglib之间转换

JDK动态代理和cglib字节码生成的区别?

  • JDK动态代理只能对实现了接口的类生成代理,而不能针对类
  • Cglib是针对类实现代理,主要是对指定的类生成一个子类,覆盖其中的方法,并覆盖其中方法的增强,但是因为采用的是继承,所以该类或方法最好不要生成final,对于final类或方法,是无法继承的

Cglib比JDK快?

  • cglib底层是ASM字节码生成框架,但是字节码技术生成代理类,在JDL1.6之前比使用java反射的效率要高
  • 在jdk6之后逐步对JDK动态代理进行了优化,在调用次数比较少时效率高于cglib代理效率
  • 只有在大量调用的时候cglib的效率高,但是在1.8的时候JDK的效率已高于cglib
  • Cglib不能对声明final的方法进行代理,因为cglib是动态生成代理对象,final关键字修饰的类不可变只能被引用不能被修改

Spring如何选择是用JDK还是cglib?

  • 当bean实现接口时,会用JDK代理模式
  • 当bean没有实现接口,用cglib实现
  • 可以强制使用cglib(在spring配置中加入<aop:aspectj-autoproxy proxyt-target-class=”true”/>)

还有: 在jdk6、jdk7、jdk8逐步对JDK动态代理优化之后,在调用次数较少的情况下,JDK代理效率高于CGLIB代理效率,只有当进行大量调用的时候,jdk6和jdk7比CGLIB代理效率低一点,但是到jdk8的时候,jdk代理效率高于CGLIB代理。

动态代理,通俗点说就是:无需声明式的创建java代理类,而是在运行过程中生成”虚拟”的代理类,被ClassLoader加载。从而避免了静态代理那样需要声明大量的代理类。JDK从1.3版本就开始支持动态代理类的创建。主要核心类只有2个:java.lang.reflect.Proxyjava.lang.reflect.InvocationHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/*处理代理实例上的方法并返回实例并返回结果,我们一般在此方法中添加一些额外的逻辑
* @param proxy: the proxy instance that the method was invoked on

* @param method the {@code Method} instance corresponding to
* the interface method invoked on the proxy instance. The declaring
* class of the {@code Method} object will be the interface that
* the method was declared in, which may be a superinterface of the
* proxy interface that the proxy class inherits the method through.

* @param args an array of objects containing the values of the
* arguments passed in the method invocation on the proxy instance,
* or {@code null} if interface method takes no arguments.
* Arguments of primitive types are wrapped in instances of the
* appropriate primitive wrapper class, such as
* {@code java.lang.Integer} or {@code java.lang.Boolean}.
*/

Object invoke(Object proxy, Method method, Object[] args)

/* @param loader: the class loader to define the proxy class
* @param interfaces: the list of interfaces for the proxy class to implement
* @param h : the invocation handler to dispatch method invocations to
*/
Object newProxyInstance(ClassLoader loader,Class<?>[] interfaces,InvocationHandler h)

//一般步骤为先new一个被代理对象,再生成调用处理器,最后得到代理对象的实例
//invoke()这个方法的第一个参数proxy可以用于返回,实现连续调用的效果

JDK的动态代理使用的最多的一种代理方式。也叫做接口代理。

JDK动态代理说白了只是根据接口”凭空“来生成类,至于具体的执行,都被代理到了InvocationHandler 的实现类里。上述例子我是需要继续执行原有bean的逻辑,才将原有的bean构造进来。只要你需要,你可以构造进任何对象到这个代理实现类。也就是说,你可以传入多个对象,或者说你什么类都不代理。只是为某一个接口”凭空“的生成多个代理实例,这多个代理实例最终都会进入InvocationHandler的实现类来执行某一个段共同的代码。

基于接口的代理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public interface Person {
void wakeup();
void sleep();
}
public class JDKProxy implements InvocationHandler {

public JDKProxy(){ }
//处理代理实例上的方法并返回实例并返回结果
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
System.out.println("invoke执行");
String s="test";
return s;
}
}
public class test {
public static void main(String[] args) {
JDKProxy proxy = new JDKProxy();
Person personProxy=(Person)Proxy.newProxyInstance(Person.class.getClassLoader(),
new Class[]{Person.class}, proxy);
personProxy.sleep();
}
}

BIO与NIO

参考: https://developer.aliyun.com/article/769587#slide-12

  • BIO 适用于连接数比较小的业务场景,这样的话不至于系统中没有可用线程去处理请求。这种方式写的程序也比较简单直观,易于理解。
  • NIO 适用于连接数比较多并且请求消耗比较轻的业务场景,比如聊天服务器。这种方式相比 BIO,相对来说编程比较复杂。
  • AIO 适用于连接数比较多而且请求消耗比较重的业务场景,比如涉及 I/O 操作的相册服务器。这种方式相比另外两种,编程难度最大,程序也不易于理解。

BIO

BIO是传统的Java IO编程,其基本的类和接口在java.io包中
BIO(blocking I/O):同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销
BIO方式使用于连接数目比较小且固定的架构,这种服务方式对服务器资源要求比价高,并且局限于应用中,JDK1.4以前的唯一选择,程序简单易理解

img

可以看出BIO编程的两个问题:

  1. 服务器端在监听客户端连接时(serverSocket.accept()),服务器端处于阻塞状态,不能处理其他事务
  2. 服务器端需要为每个客户端建立一个线程,虽然可以用线程池来优化,但在并发较大时,线程开销依旧很大
  3. 当连接的客户端没有发送数据时,服务器端会阻塞在read操作上,等待客户端输入,造成线程资源浪费

NIO

从JDK1.4开始,java提供了一系列改进输入/输出的新特性,统称为NIO,全称n为new I/O,是同步非阻塞的,所以也有人称为non-blocking I/O。NIO的相关类都放在java.nio包或其子包下,并对原先java.io包中许多类进行了改写。

img

NIO的三大核心

  1. 缓冲区(Buffer)

NIO是面向缓冲区, 或者说是面向块编程的。在NIO的IO传输中,数据会先读入到缓冲区,当需要时再从缓冲区写出,这样减少了直接读写磁盘的次数,提高了IO传输的效率。

缓冲区(buffer)本质上是一个可以读写数据的内存块,即在内存空间中预留了一定的存储空间,这些存储空间用来缓冲输入和输出的数据,这部分预留的存储空间就叫缓冲区。在NIO程序中,通道channel虽然负责数据的传输,但是输入和输出的数据都必须经过缓冲区buffer。

Buffer的常用子类(它们之间最大区别在于底层实现数组的数据类型):

  • ByteBuffer:存储字节数据到缓冲区
  • CharBuffer:存储字符数据到缓冲区
  • IntBuffer:存储整型数据到缓冲区
  • ShortBuffer:存储短整型数据到缓冲区
  • LongBuffer:存储长整型数据到缓冲区
  • FloatBuffer:存储浮点型数据到缓冲区
  • DoubleBuffer:存储双精度浮点型数据到缓冲区
  1. 通道(Channel)

在NIO程序中服务器端和客户端之间的数据读写不是通过流,而是通过通道来读写的。

通道类似于流,都是用来读写数据的,但它们之间也是有区别的:

  • 通道是双向的,即可以读也可以写,而流是单向的,只能读或写
  • 通道可以实现异步读写数据
  • 通道可以从缓冲区读数据,也可以把数据写入缓冲区

java中channel的相关类在java.nio.channel包下。Channel是一个接口,其常用的实现类有:

  • FileChannel:用于文件的数据读写,其真正的实现类为FileChannelImpl
  • DatagramChannel:用于UDP的数据读写,其真正的实现类为DatagramChannelImpl
  • ServerSocketChannel:用于监听TCP连接,每当有客户端连接时都会创建一个SocketChannel,功能类似ServerSocket,其真正的实现类为ServerSocketChannelImpl
  • SocketChannel:用于TCP的数据读写,功能类似节点流+Socket,其真正的实现类为SocketChannelImpl
  1. 选择器(Selector)

在NIO程序中,可以用选择器Selector实现一个选择器处理多个通道,即一个线程处理多个连接。只要把通道注册到Selector上,就可以通过Selector来监测通道,如果通道有事件发生,便获取事件然后针对每个事件进行相应的处理。这样,只有在通道(连接)有真正的读/写事件发生时,才会进行读写操作,大大减少了系统开销,并且不必为每个连接创建单独线程,就不用去维护过多的线程。

如果用阻塞I/O,需要多线程(浪费内存),如果用非阻塞I/O,需要不断重试(耗费CPU)。Selector的出现解决了这尴尬的问题,非阻塞模式下,通过Selector,我们的线程只为已就绪的通道工作,不用盲目的重试了。比如,当所有通道都没有数据到达时,也就没有Read事件发生,我们的线程会在select()方法处被挂起,从而让出了CPU资源。

选择器的相关类在java.nio.channels包和其子包下,顶层类是Selector,它是一个抽象类,它的常用方法有:

图片

NIO服务器端如何实现非阻塞?

服务器上所有Channel需要向Selector注册,而Selector则负责监视这些Socket的IO状态(观察者),当其中任意一个或者多个Channel具有可用的IO操作时,该Selector的select()方法将会返回大于0的整数,该整数值就表示该Selector上有多少个Channel具有可用的IO操作,并提供了selectedKeys()方法来返回这些Channel对应的SelectionKey集合(一个SelectionKey对应一个就绪的通道)。正是通过Selector,使得服务器端只需要不断地调用Selector实例的select()方法即可知道当前所有Channel是否有需要处理的IO操作。注:java NIO就是多路复用IO,jdk7之后底层是epoll模型。

NIO与BIO的对比

Linux IO模式 https://segmentfault.com/a/1190000003063859

阻塞式IO

preview

非阻塞IO

preview

异步IO

img

Reactor模型和Proactor模型

如何深刻理解Reactor和Proactor? - 知乎 (zhihu.com)

Reactor

常见的 Reactor 实现方案有三种。

  • 第一种方案单 Reactor 单进程 / 线程,不用考虑进程间通信以及数据同步的问题,因此实现起来比较简单,这种方案的缺陷在于无法充分利用多核 CPU,而且处理业务逻辑的时间不能太长,否则会延迟响应,所以不适用于CPU密集型的场景,适用于业务处理快速的场景,比如 Redis 采用的是单 Reactor 单进程的方案。

  • 第二种方案单 Reactor 多线程,通过多线程的方式解决了方案一的缺陷,但它离高并发还差一点距离,差在只有一个 Reactor 对象来承担所有事件的监听和响应,而且只在主线程中运行,在面对瞬间高并发的场景时,容易成为性能的瓶颈的地方。

  • 第三种方案多 Reactor 多进程 / 线程,通过多个 Reactor 来解决了方案二的缺陷,主 Reactor 只负责监听事件,响应事件的工作交给了从 Reactor,Netty 和 Memcache 都采用了「多 Reactor 多线程」的方案,Nginx 则采用了类似于 「多 Reactor 多进程」的方案。

    img

Proactor

前面提到的 Reactor 是非阻塞同步网络模式,而 Proactor 是异步网络模式。真正的异步 I/O 是「内核数据准备好」和「数据从内核态拷贝到用户态」这两个过程都不用等待

img

Reactor 可以理解为「来了事件操作系统通知应用进程,让应用进程来处理」,而 Proactor 可以理解为「来了事件操作系统来处理,处理完再通知应用进程」。

现在我们再来理解 Reactor 和 Proactor 的区别,

  • Reactor 是非阻塞同步网络模式,感知的是就绪可读写事件。在每次感知到有事件发生(比如可读就绪事件)后,就需要应用进程主动调用 read 方法来完成数据的读取,也就是要应用进程主动将 socket 接收缓存中的数据读到应用进程内存中,这个过程是同步的,读取完数据后应用进程才能处理数据。
  • Proactor 是异步网络模式, 感知的是已完成的读写事件。在发起异步读写请求时,需要传入数据缓冲区的地址(用来存放结果数据)等信息,这样系统内核才可以自动帮我们把数据的读写工作完成,这里的读写工作全程由操作系统来做,并不需要像 Reactor 那样还需要应用进程主动发起 read/write 来读写数据,操作系统完成读写工作后,就会通知应用进程直接处理数据。

不过,无论是 Reactor,还是 Proactor,都是一种基于「事件分发」的网络编程模式,区别在于 Reactor 模式是基于「待完成」的 I/O 事件,而 Proactor 模式则是基于「已完成」的 I/O 事件。

两个项目的难点

RPC项目

最初的实现

定义接口:

1
2
3
4
public interface ServiceRegistry {  
<T> void register(T service);
Object getService(String serviceName);
}

使用DefaultServiceRegistry 来实现这个接口,并提供服务。这里的重点是使用ConcurrentHashMap 来保存服务名与提供服务的对象的对应关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private final Map<String, Object> serviceMap = new ConcurrentHashMap<>();
private final Set<String> registeredService = ConcurrentHashMap.newKeySet();
@Override
public synchronized <T> void register(T service) {
String serviceName = service.getClass().getCanonicalName(); //获取服务的全类名
if(registeredService.contains(serviceName)) return;
registeredService.add(serviceName);
Class<?>[] interfaces = service.getClass().getInterfaces();
if(interfaces.length == 0) {
throw new RpcException(RpcError.SERVICE_NOT_IMPLEMENT_ANY_INTERFACE);
}
for(Class<?> i : interfaces) {
serviceMap.put(i.getCanonicalName(), service);
}
logger.info("向接口: {} 注册服务: {}", interfaces, serviceName);
}

@Override
public synchronized Object getService(String serviceName) {
Object service = serviceMap.get(serviceName);
if(service == null) {
throw new RpcException(RpcError.SERVICE_NOT_FOUND);
}
return service;
}
}

每一个请求处理线程要执行的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void run() {
try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
//获取请求对象
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
//获取需要的服务名
String interfaceName = rpcRequest.getInterfaceName();
//获取服务
Object service = serviceRegistry.getService(interfaceName);
//使用动态代理来执行服务,并获取结果
Object result = requestHandler.handle(rpcRequest, service);

//将结果写回并刷新缓冲区
objectOutputStream.writeObject(RpcResponse.success(result));
objectOutputStream.flush();
} catch (IOException | ClassNotFoundException e) {
logger.error("调用或发送时有错误发生:", e);
}
}

rpc-core

注解类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
1 个 Annotation 和 1~n 个 ElementType 关联。ElementType.TYPE

public enum ElementType {
TYPE, /* 类、接口(包括注释类型)或枚举声明 */

FIELD, /* 字段声明(包括枚举常量) */

METHOD, /* 方法声明 */

PARAMETER, /* 参数声明 */

CONSTRUCTOR, /* 构造方法声明 */

LOCAL_VARIABLE, /* 局部变量声明 */

ANNOTATION_TYPE, /* 注释类型声明 */

PACKAGE /* 包声明 */
}


1 个 Annotation 和 1 个 RetentionPolicy 关联。
public enum RetentionPolicy {
SOURCE, /* Annotation信息仅存在于编译器处理期间,编译器处理完之后就没有该Annotation信息了 */

CLASS, /* 编译器将Annotation存储于类对应的.class文件中。默认行为 */

RUNTIME /* 编译器将Annotation存储于class文件中,并且可由JVM读入 */
}

public interface Annotation {

boolean equals(Object obj);

int hashCode();

String toString();

Class<? extends Annotation> annotationType();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 表示一个服务提供类,用于远程接口的实现类
* @author ziyang
* @Target(ElementType.TYPE) 的意思就是指定该 Annotation 的类型是 ElementType.TYPE。
* 这就意味着,MyAnnotation1 是来修饰"类、接口(包括注释类型)或枚举声明"的注解。
**/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Service {

public String name() default "";

}

这两个注解将在服务器端启动时,将我们用注解标注过的服务自动注册。具体的逻辑就是:

我们使用ReflectUtil.getStackTrace() 这个方法获取主类名以及具体的路径,并获取启动类的类对象。这个类对象中包含了标注的注解信息,其中就有ServiceScan注解,然后通过ServiceScan注解的value属性 获取服务所在的基包。最后我们扫描这个基包拥有的所有类(即服务),并获取它们的类对象以及实例对象,最后将其注册到注册中心去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public void scanServices() {
String mainClassName = ReflectUtil.getStackTrace();
Class<?> startClass;
try {
startClass = Class.forName(mainClassName);
if(!startClass.isAnnotationPresent(ServiceScan.class)) {
logger.error("启动类缺少 @ServiceScan 注解");
throw new RpcException(RpcError.SERVICE_SCAN_PACKAGE_NOT_FOUND);
}
} catch (ClassNotFoundException e) {
logger.error("出现未知错误");
throw new RpcException(RpcError.UNKNOWN_ERROR);
}
String basePackage = startClass.getAnnotation(ServiceScan.class).value();
if("".equals(basePackage)) {
basePackage = mainClassName.substring(0, mainClassName.lastIndexOf("."));
}else{
basePackage = mainClassName.substring(0,mainClassName.lastIndexOf("."))+"."+basePackage;
}
Set<Class<?>> classSet = ReflectUtil.getClasses(basePackage);
for(Class<?> clazz : classSet) {
if(clazz.isAnnotationPresent(Service.class)) {
String serviceName = clazz.getAnnotation(Service.class).name();
Object obj;
try {
obj = clazz.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
logger.error("创建 " + clazz + " 时有错误发生");
continue;
}
if("".equals(serviceName)) {
Class<?>[] interfaces = clazz.getInterfaces();
for (Class<?> oneInterface: interfaces){
publishService(obj, oneInterface.getCanonicalName());
}
} else {
publishService(obj, serviceName);
}
}
}
}

编解码器

在传输过程中,我们可以在发送的数据上加上各种必要的数据,形成自定义的协议,而自动加上这个数据就是编码器的工作,解析数据获得原始数据就是解码器的工作。编解码器都继承了netty的编解码器的类

+—————+—————+—————–+————-+
| Magic Number | Package Type | Serializer Type | Data Length |
| 4 bytes | 4 bytes | 4 bytes | 4 bytes |
+—————+—————+—————–+————-+
| Data Bytes |
| Length: ${Data Length} |
+—————————————————————+

Data Length 就是实际数据的长度,设置这个字段主要防止粘包

编解码器的实现都非常简单,主要就是按照协议的格式来进行编码s和解码,其中会涉及到netty的知识以及序列化和反序列化的知识。

序列化和反序列化的实现方式需要我们自己去实现

一个简单的例子:使用Jkson工具实现Json方式的序列化和反序列化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class JsonSerializer implements CommonSerializer {

private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class);

private ObjectMapper objectMapper = new ObjectMapper();

@Overrid
public byte[] serialize(Object obj) {
try {
return objectMapper.writeValueAsBytes(obj);
} catch (JsonProcessingException e) {
logger.error("序列化时有错误发生: {}", e.getMessage());
e.printStackTrace();
return null;
}
}

@Override
public Object deserialize(byte[] bytes, Class<?> clazz) {
try {
Object obj = objectMapper.readValue(bytes, clazz);
if(obj instanceof RpcRequest) {
obj = handleRequest(obj);
}
return obj;
} catch (IOException e) {
logger.error("反序列化时有错误发生: {}", e.getMessage());
e.printStackTrace();
return null;
}
}

/*
这里由于使用JSON序列化和反序列化Object数组,无法保证反序列化后仍然为原实例类型
需要重新判断处理
*/
private Object handleRequest(Object obj) throws IOException {
RpcRequest rpcRequest = (RpcRequest) obj;
for(int i = 0; i < rpcRequest.getParamTypes().length; i ++) {
Class<?> clazz = rpcRequest.getParamTypes()[i];
if(!clazz.isAssignableFrom(rpcRequest.getParameters()[i].getClass())) {
byte[] bytes = objectMapper.writeValueAsBytes(rpcRequest.getParameters()[i]);
rpcRequest.getParameters()[i] = objectMapper.readValue(bytes, clazz);
}
}
return rpcRequest;
}

@Override
public int getCode() {
return SerializerCode.valueOf("JSON").getCode();
}

}

序列化和反序列化都比较循规蹈矩,把对象翻译成字节数组,和根据字节数组和 Class 反序列化成对象。这里有一个需要注意的点,就是在 RpcRequest 反序列化时,由于其中有一个字段是 Object 数组,在反序列化时序列化器会根据字段类型进行反序列化,而 Object 就是一个十分模糊的类型,会出现反序列化失败的现象,这时就需要 RpcRequest 中的另一个字段 ParamTypes 来获取到 Object 数组中的每个实例的实际类,辅助反序列化,这就是 handleRequest() 方法的作用。

上面提到的这种情况不会在其他序列化方式中出现,因为其他序列化方式是转换成字节数组,会记录对象的信息,而 JSON 方式本质上只是转换成 JSON 字符串,会丢失对象的类型信息

NettyServer的实现:

NettyRpcServerHandler业务处理器
业务处理器处理三类事件:

  • 异常事件:直接关闭连接
  • 心跳事件:客户端会每隔5s发送一次心跳ping,如果服务端持续30s没有收到心跳消息,说明连接可能已失效,则关闭连接
  • 读事件:根据RpcMessage的属性确定接收到的消息类型:
    • 若是心跳ping,则返回心跳pong
    • 若是RpcRequest,则调用服务方法,返回调用结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class NettyServer implements RpcServer {

private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

@Override
public void start(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 256)
.option(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new CommonEncoder(new JsonSerializer()));
pipeline.addLast(new CommonDecoder());
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture future = serverBootstrap.bind(port).sync();
future.channel().closeFuture().sync();

} catch (InterruptedException e) {
logger.error("启动服务器时有错误发生: ", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

NettyClient 的实现

客户端主要实现服务调用请求的发送和请求结果的接收

客户端通过Bootstrap创建,在pipeline中添加IdleStateHandler心跳处理器,编码解码器,NettyRpcClientHandler业务处理器

1)发送服务调用请求
发送服务调用请求的步骤:

  1. 根据服务名获取服务提供方的地址
  2. 根据地址创建与服务端的连接(连接利用一个Map进行存储,如果已经创建过了则复用该连接)
  3. 构建RpcMessage并通过该连接发送

2)请求结果的接收
难点:通过上述方法异步发送RpcRequest后,RpcResponse只能在NettyRpcClientHandler中通过read方法接收,那么该如何获取请求结果呢

解决方案:通过CompletableFuture异步获取请求

步骤:

  1. 为每个请求创建一个CompletableFuture
  2. 用一个ConcurrentHashMap保存已发送且未收到回复的请求(key为requestId,value为该请求的CompletableFuture
  3. 通过上述发送请求时,在该Map中存入该请求的CompletableFuture,且方法返回该Future
    NettyRpcClientHandler收到服务器返回的调用结果后,从Map中移除该CompletableFuture,并为该Future设置好调用结果
  4. 这样调用方就可以通过CompletableFuture.get()获取到调用结果了(如果未设置结果则一直阻塞)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
    @Override
public CompletableFuture<RpcResponse> sendRequest(RpcRequest rpcRequest) {
if (serializer == null) {
logger.error("未设置序列化器");
throw new RpcException(RpcError.SERIALIZER_NOT_FOUND);
}
CompletableFuture<RpcResponse> resultFuture = new CompletableFuture<>();
try {
//利用服务发现,在nacos注册中心找到可用的服务地址,并将请求发送到这个地址去请求服务。
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest.getInterfaceName());
Channel channel = ChannelProvider.get(inetSocketAddress, serializer);
if (!channel.isActive()) {
group.shutdownGracefully();
return null;
}
unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);
channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess()) {
logger.info(String.format("客户端发送消息: %s", rpcRequest.toString()));
} else {
future1.channel().close();
resultFuture.completeExceptionally(future1.cause());
logger.error("发送消息时有错误发生: ", future1.cause());
}
});
} catch (InterruptedException e) {
unprocessedRequests.remove(rpcRequest.getRequestId());
logger.error(e.getMessage(), e);
Thread.currentThread().interrupt();
}
return resultFuture;
}
}

channel 将 RpcRequest 对象写出,并且等待服务端返回的结果。注意这里的发送是非阻塞的,所以发送后会立刻返回,而无法得到结果。这里通过 AttributeKey 的方式阻塞获得返回结果:

1
2
3
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
RpcResponse rpcResponse = channel.attr(key).get();

通过这种方式获得全局可见的返回结果,在获得返回结果 RpcResponse 后,将这个对象以 key 为 rpcResponse 放入 ChannelHandlerContext 中,这里就可以立刻获得结果并返回,我们会在 NettyClientHandler 中看到放入的过程。

NettyServerHandler 和 NettyClientHandler

NettyServerHandler 和 NettyClientHandler 都分别位于服务器端和客户端责任链的尾部,直接和 RpcServer 对象或 RpcClient 对象打交道,而无需关心字节序列的情况。

  • NettyServerhandler 用于接收 RpcRequest,并且执行调用,将调用结果返回封装成 RpcResponse 发送出去。
  • NettyClientHandler用于接受响应包,并将响应包向上传递。

kryo序列化方法

我们将kryo对象放在ThreadLocal对象中,一个线程一个kryo。

序列化时:先创建一个 Output 对象(Kryo 框架的概念),接着使用 writeObject 方法将对象写入 Output 中,最后调用 Output 对象的 toByte() 方法即可获得对象的字节数组

反序列化时:是从 Input 对象中直接 readObject,这里只需要传入对象的类型,而不需要具体传入每一个属性的类型信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class KryoSerializer implements CommonSerializer {

private static final Logger logger = LoggerFactory.getLogger(KryoSerializer.class);

private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
kryo.register(RpcResponse.class);
kryo.register(RpcRequest.class);
kryo.setReferences(true);
kryo.setRegistrationRequired(false);
return kryo;
});

@Override
public byte[] serialize(Object obj) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream)){
Kryo kryo = kryoThreadLocal.get();
kryo.writeObject(output, obj);
kryoThreadLocal.remove();
return output.toBytes();
} catch (Exception e) {
logger.error("序列化时有错误发生:", e);
throw new SerializeException("序列化时有错误发生");
}
}

@Override
public Object deserialize(byte[] bytes, Class<?> clazz) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream)) {
Kryo kryo = kryoThreadLocal.get();
Object o = kryo.readObject(input, clazz);
kryoThreadLocal.remove();
return o;
} catch (Exception e) {
logger.error("反序列化时有错误发生:", e);
throw new SerializeException("反序列化时有错误发生");
}
}

@Override
public int getCode() {
return SerializerCode.valueOf("KRYO").getCode();
}
}

几种序列化方式的优劣

实现了 JSON、Kryo、和 Protobuf 的序列化。

JSON 是一种轻量级的数据交换语言,该语言以易于让人阅读的文字为基础,用来传输由属性值或者序列性的值组成的数据对象,类似 xml,Json 比 xml更小、更快更容易解析。JSON 由于采用字符方式存储,占用相对于字节方式较大,并且序列化后类的信息会丢失,可能导致反序列化失败。

剩下的都是基于字节的序列化。

Kryo 是一个快速高效的 Java 序列化框架,旨在提供快速、高效和易用的 API。无论文件、数据库或网络数据 Kryo 都可以随时完成序列化。 Kryo 还可以执行自动深拷贝、浅拷贝。这是对象到对象的直接拷贝,而不是对象->字节->对象的拷贝。kryo 速度较快,序列化后体积较小,但是跨语言支持较复杂。

protobuf(Protocol Buffers)是由 Google 发布的数据交换格式,提供跨语言、跨平台的序列化和反序列化实现,底层由 C++ 实现,其他平台使用时必须使用 protocol compiler 进行预编译生成 protoc 二进制文件。性能主要消耗在文件的预编译上。序列化反序列化性能较高,平台无关。

jdk 自带对象序列化类

  • 1.无法跨语言。这应该是java序列化最致命的问题了。由于java序列化是java内部私有的协议,其他语言不支持,导致别的语言无法反序列化,这严重阻碍了它的应用。
  • 2.序列后的码流太大,java序列化的大小是二进制编码的5倍多!
  • 3.序列化性能太低。java序列化的性能只有二进制编码的6.17倍,可见java序列化性能实在太差了。

Thrift 方式

优点

  1. 序列化和RPC支持一站式解决,比pb更方便 。
  2. 跨语言,IDL接口定义语言,自动生成多语言文件 。
  3. 省流量,体积较小 。
  4. 包含完整的客户端/服务端堆栈,可快速实现RPC 。
  5. 为服务端提供了多种工作模式,如线程池模型、非阻塞模型。

缺点

  1. 不支持双通道 。
  2. rpc方法非线程安全,服务器容易被挂死,需要串行化。
  3. 默认不具备动态特性(可以通过动态定义生成消息类型或者动态编译支持)
  4. 开发环境、编译较麻烦。

使用Nacos作为服务器注册与发现

几种注册中心的对比

我们之前的注册与发现是用一个ConcurrentHashMap来进行存储的。这存在一个隐患,当我们将服务端地址固化在代码中,对于客户端来说,它只会去寻找一个服务提供者,但如果这个提供者挂掉了或者换了地址,那就没有办法了。

在分布式架构中,有一个重要的组件,就是服务注册中心,它用于保存多个服务提供者的信息,每个服务提供者在启动时都需要向注册中心注册自己所拥有的服务。这样客户端在发起 RPC 时,就可以直接去向注册中心请求服务提供者的信息,如果拿来的这个挂了,还可以重新请求,并且在这种情况下可以很方便地实现负载均衡。

在最初的实现中,我们使用了本地保存服务的类称为 ServiceRegistry,现在更改为 ServiceProvider,而 ServiceRegistry 作为远程注册表(Nacos)使用,对应的类名也有修改。我们定义一个接口:

1
2
3
4
public interface ServiceRegistry {
void register(String serviceName, InetSocketAddress inetSocketAddress);
InetSocketAddress lookupService(String serviceName);
}

两个方法很好理解,register 方法将服务的名称和地址注册进服务注册中心,lookupService 方法则是根据服务名称从注册中心获取到一个服务提供者的地址。

我们让NacosServiceRegistry实现这个接口,以供服务器调用,注册服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class NacosServiceRegistry implements ServiceRegistry {

private static final Logger logger = LoggerFactory.getLogger(NacosServiceRegistry.class);

private static final String SERVER_ADDR = "127.0.0.1:8848";
private static final NamingService namingService;

static {
try {
namingService = NamingFactory.createNamingService(SERVER_ADDR);
} catch (NacosException e) {
logger.error("连接到Nacos时有错误发生: ", e);
throw new RpcException(RpcError.FAILED_TO_CONNECT_TO_SERVICE_REGISTRY);
}
}

@Override
public void register(String serviceName, InetSocketAddress inetSocketAddress) {
try {
namingService.registerInstance(serviceName, inetSocketAddress.getHostName(), inetSocketAddress.getPort());
} catch (NacosException e) {
logger.error("注册服务时有错误发生:", e);
throw new RpcException(RpcError.REGISTER_SERVICE_FAILED);
}
}

@Override
public InetSocketAddress lookupService(String serviceName) {
try {
List<Instance> instances = namingService.getAllInstances(serviceName);
Instance instance = instances.get(0);
return new InetSocketAddress(instance.getIp(), instance.getPort());
} catch (NacosException e) {
logger.error("获取服务时有错误发生:", e);
}
return null;
}
}

自动注销服务和负载均衡策略

自动注销服务

如果你启动完成服务端后把服务端给关闭了,并不会自动地注销 Nacos 中对应的服务信息,这样就导致了当客户端再次向 Nacos 请求服务时,会获取到已经关闭的服务端信息,最终就有可能因为连接不到服务器而调用失败。

那么我们就需要一种办法,在服务端关闭之前自动向 Nacos 注销服务。但是有一个问题,我们不知道什么时候服务器会关闭,也就不知道这个方法调用的时机,就没有办法手工去调用。这时,我们就需要钩子。

钩子是什么呢?是在某些事件发生后自动去调用的方法。那么我们只需要把注销服务的方法写到关闭系统的钩子方法里就行了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//钩子函数    
public static void clearRegistry() {
if(!serviceNames.isEmpty() && address != null) {
String host = address.getHostName();
int port = address.getPort();
Iterator<String> iterator = serviceNames.iterator();
while(iterator.hasNext()) {
String serviceName = iterator.next();
try {
namingService.deregisterInstance(serviceName, host, port);
} catch (NacosException e) {
logger.error("注销服务 {} 失败", serviceName, e);
}
}
}
}

使用了单例模式创建其对象,在 addClearAllHook 中,Runtime 对象是 JVM 虚拟机的运行时环境,调用其addShutdownHook方法增加一个钩子函数,创建一个新线程调用 clearRegistry 方法完成注销工作。这个钩子函数会在 JVM 关闭之前被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ShutdownHook {

private static final Logger logger = LoggerFactory.getLogger(ShutdownHook.class);

private final ExecutorService threadPool = ThreadPoolFactory.createDefaultThreadPool("shutdown-hook");
private static final ShutdownHook shutdownHook = new ShutdownHook();

public static ShutdownHook getShutdownHook() {
return shutdownHook;
}

public void addClearAllHook() {
logger.info("关闭后将自动注销所有服务");
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
NacosUtil.clearRegistry();
threadPool.shutdown();
}));
}

}

负载均衡策略

一个客户端的负载均衡器, 使用自己定义的负载均衡器(随机算法,轮转算法),当我们在客户端需要某个服务的时候,就利用服务发现机制,以及负载均衡算法得到一个适合的服务地址,然后向对应地址的服务器发起请求。

rpc-client,rpc-server

保证通用性,我们定义两个接口:

1
2
3
4
5
6
7
public interface RpcServer {
void start(int port);
}

public interface RpcClient {
Object sendRequest(RpcRequest rpcRequest);
}

一、Netty简介与IO模型 - 知乎 (zhihu.com)

RPC调用过程

  1. 首先服务器端启动,通过我们自定义的注解将服务地址等信息注册到注册中心,客户端要发起请求,它首先会创建一个请求的实例,客户端组装的请求实例中包含这样几个属性,调用接口名称,方法名称,方法参数,参数类型,是否心跳包。

  2. 它首先会来到注册中心,根据请求实例对象,获取所有对应的服务实例信息,然后通过本地的负载均衡算法选择一个合适的服务器来发起请求。

  3. 在发起请求之前,客户端通过netty来与服务器端建立连接(通过异步的方式),然后得到相应的channel。之后这个客户端与服务器的交互都是通过这个channel来实现。

  4. 得到channel之后,客户端在发起请求,这个请求首先经过客户端的netty的出站处理器的处理。在客户端的出栈处理器里包含编码器(编码器中进行序列化),空闲时处理器。经过了出栈处理器的处理后。请求会到达服务器端。

  5. 请求到达服务器端,服务器端其实也有一个被注册到工作线程组中某个事件循环的selector的channel。它用于与客户端的相应的channel通信。channel事件消息在ChannelPipeline中流动和传播,相应的事件能够被ChannelHandler拦截处理、传递、忽略或者终止 。 客户端发起的请求报文会经过该channel入栈处理器的处理。主要是在解码器中进行解码和反序列化。

  6. 解码器将请求报文解析成一个request类,它会被channelPipeline的末端NettyServerHandler处理。在这个处理器中调用请求的方法,我们在服务器端会将所有的服务和它的全类名存入到一个Map的集合中,在处理器中我们通过全类名得到服务实例对象,然后通过实例对象的Class对象获取方法的实例,最后通过调用method.invoke方法来得到最终的结果。

  7. 我们将结果封装为一个response对象,通过ChannelHandlerContext对象将响应包写回发送给客户端,当然这个过程也要经过服务器端的出站处理器的处理,主要是在编码器中进行编码和序列化。

netty

(229条消息) 吃透Netty源码系列一之NioEventLoopGroup_王伟王胖胖的博客-CSDN博客

Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。Netty 基于 NIO 的,封装了 JDK 的 NIO,让我们使用起来更加方法灵活。

特点和优势:

  • 使用简单:封装了 NIO 的很多细节,使用更简单。
  • 功能强大:预置了多种编解码功能,支持多种主流协议。
  • 定制能力强:可以通过 ChannelHandler 对通信框架进行灵活地扩展。
  • 性能高:通过与其他业界主流的 NIO 框架对比,Netty 的综合性能最优。

为什么 Netty 性能高

  • IO 线程模型:同步非阻塞,用最少的资源做更多的事。
  • 内存零拷贝:尽量减少不必要的内存拷贝,实现了更高效率的传输。
  • 内存池设计:申请的内存可以重用,主要指直接内存。内部实现是用一颗二叉查找树管理内存分配情况。
  • 串行化处理读写:避免使用锁带来的性能开销。
  • 高性能序列化协议:支持 protobuf 等高性能序列化协议。

Netty是一个NIO框架,它将IO通道的建立、可读、可写等状态变化,抽象成事件,以责任链的方式进行传递,可以在处理链上插入自定义的Handler,对感兴趣的事件进行监听和处理。Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。

Netty 推荐使用主从Reactor模型,Netty 主线程组BossGroup用于客户端的监听,并将监听到的客户端Channel注册到从线程组WorkGroup上的一个NIOEventLoop的Selector上,这个NIOEventLoop主要用于和客户端数据的 Read / Write,客户端和服务端会在数据传输的管道ChannelPipeline中调用InboundHandler/OutboundHandler对数据进行处理。

在netty中I/O操作都是异步执行,所以任何的I/O调用都将立即返回。netty为我们提供了一个ChannelFuture实例,这个实例将会返回关于I/O操作结果或者状态。我们可以通过 addListener()为ChannelFuture添加监听器,这样当相关的操作执行结束之后就会发送消息给监听器。

  • Netty 是由 JBOSS 提供的一个 Java 开源框架, 现为 Github 上的独立项目。
  • Netty 是一个异步的、 基于事件驱动的网络应用框架, 用以快速开发高性能、 高可靠性的网络 IO 程序。
  • Netty 本质是一个 NIO 框架, 适用于服务器通讯相关的多种应用场景。

image.png

在这里插入图片描述

Netty pipeline 分析:

参考: https://www.jianshu.com/p/6efa9c5fa702

说下 Netty 零拷贝

Netty 的零拷贝主要包含三个方面:

  • Netty 的接收和发送 ByteBuffer 采用 DIRECT BUFFERS,使用堆外直接内存进行 Socket 读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行 Socket 读写,JVM 会将堆内存 Buffer 拷贝一份到直接内存中,然后才写入 Socket 中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
  • Netty 提供了组合 Buffer 对象,可以聚合多个 ByteBuffer 对象,用户可以像操作一个 Buffer 那样方便的对组合 Buffer 进行操作,避免了传统通过内存拷贝的方式将几个小 Buffer 合并成一个大的 Buffer。
  • Netty 的文件传输采用了 transferTo 方法,它可以直接将文件缓冲区的数据发送到目标 Channel,避免了传统通过循环 write 方式导致的内存拷贝问题。

Netty 中责任链

首先说明责任链模式:

适用场景:

  • 对于一个请求来说,如果有个对象都有机会处理它,而且不明确到底是哪个对象会处理请求时,我们可以考虑使用责任链模式实现它,让请求从链的头部往后移动,直到链上的一个节点成功处理了它为止

优点:

  • 发送者不需要知道自己发送的这个请求到底会被哪个对象处理掉,实现了发送者和接受者的解耦
  • 简化了发送者对象的设计
  • 可以动态的添加节点和删除节点

缺点:

  • 所有的请求都从链的头部开始遍历,对性能有损耗
  • 极差的情况,不保证请求一定会被处理

Netty的责任链:

netty 的 pipeline 设计,就采用了责任链设计模式, 底层采用双向链表的数据结构, 将链上的各个处理器串联起来

客户端每一个请求的到来,netty 都认为,pipeline 中的所有的处理器都有机会处理它,因此,对于入栈的请求,全部从头节点开始往后传播,一直传播到尾节点(来到尾节点的 msg 会被释放掉)。

责任终止机制

  • 在pipeline中的任意一个节点,只要我们不手动的往下传播下去,这个事件就会终止传播在当前节点
  • 对于入站数据,默认会传递到尾节点,进行回收,如果我们不进行下一步传播,事件就会终止在当前节点

简单说下 Netty 中的重要组件

  • Channel:Netty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 等。
  • EventLoop:主要是配合 Channel 处理 I/O 操作,用来处理连接的生命周期中所发生的事情。
  • ChannelFuture:Netty 框架中所有的 I/O 操作都为异步的,因此我们需要 ChannelFuture 的 addListener()注册一个 ChannelFutureListener 监听事件,当操作执行成功或者失败时,监听就会自动触发返回结果。
  • ChannelHandler:充当了所有处理入站和出站数据的逻辑容器。ChannelHandler 主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。
  • ChannelPipeline:为 ChannelHandler 链提供了容器,当 channel 创建时,就会被自动分配到它专属的 ChannelPipeline,这个关联是永久性的。

netty粘包和拆包问题

Netty 中的粘包和拆包 - rickiyang - 博客园 (cnblogs.com)

在RPC框架中,粘包和拆包问题是必须解决一个问题,因为RPC框架中,各个微服务相互之间都是维系了一个TCP长连接,比如dubbo就是一个全双工的长连接。由于微服务往对方发送信息的时候,所有的请求都是使用的同一个连接,这样就会产生粘包和拆包的问题

出现粘包、拆包现象的根本原因

1、客户端要发送的数据小于TCP发送缓冲区的大小,TCP为了提升效率,将多个写入缓冲区的数据包一次发送出去,多个数据包粘在一起,造成粘包;
2、服务端的应用层没有及时处理接收缓冲区中的数据,再次进行读取时出现粘包问题;
3、数据发送过快,数据包堆积导致缓冲区积压多个数据后才一次性发送出去;
4、拆包一般由于一次发送的数据包太大,超过MSS的大小,那么这个数据包就会被拆成多个TCP报文分开进行传输。

1. 定长协议

指定一个报文具有固定长度。比如约定一个报文的长度是 5 字节,那么:

报文:1234,只有4字节,但是还差一个怎么办呢,不足部分用空格补齐。就变为:1234 。

如果不补齐空格,那么就会读到下一个报文的字节来填充上一个报文直到补齐为止,这样粘包了。

定长协议的优点是使用简单,缺点很明显:浪费带宽。

Netty 中提供了 FixedLengthFrameDecoder ,支持把固定的长度的字节数当做一个完整的消息进行解码。

2. 特殊字符分割协议

很好理解,在每一个你认为是一个完整的包的尾部添加指定的特殊字符,比如:\n,\r等等。

需要注意的是:约定的特殊字符要保证唯一性,不能出现在报文的正文中,否则就将正文一分为二了。

Netty 中提供了 DelimiterBasedFrameDecoder 根据特殊字符进行解码,LineBasedFrameDecoder默认以换行符作为分隔符。

3. 变长协议

变长协议的核心就是:将消息分为消息头和消息体,消息头中标识当前完整的消息体长度。

  1. 发送方在发送数据之前先获取数据的二进制字节大小,然后在消息体前面添加消息大小;
  2. 接收方在解析消息时先获取消息大小,之后必须读到该大小的字节数才认为是完整的消息。

Netty 中提供了 LengthFieldBasedFrameDecoder ,通过LengthFieldPrepender 来给实际的消息体添加 length 字段。

如何解决 TCP 的粘包拆包问题

TCP 是以流的方式来处理数据,一个完整的包可能会被 TCP 拆分成多个包进行发送,也可能把小的封装成一个大的数据包发送。

TCP 粘包/分包的原因:应用程序写入的字节大小大于套接字发送缓冲区的大小,会发生拆包现象,而应用程序写入数据小于套接字缓冲区大小,网卡将应用多次写入的数据发送到网络上,这将会发生粘包现象;

Netty 自带解决方式:

  • 消息定长:FixedLengthFrameDecoder 类
  • 包尾增加特殊字符分割:
    • 行分隔符类:LineBasedFrameDecoder
    • 自定义分隔符类 :DelimiterBasedFrameDecoder
  • 将消息分为消息头和消息体:LengthFieldBasedFrameDecoder 类。分为有头部的拆包与粘包、长度字段在前且有头部的拆包与粘包、多扩展头部的拆包与粘包。

框架解决方式:

自定义协议,其中有字段标明包长度。

netty心跳机制

在RpcRequest中,我们增加了一个属性heartBeat ,使用这个属性在NettyClientHandler中我们定义了一个触发器,这个触发器会在ChannelProvider类的get方法中,我们绑定了这样一个处理器:addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS))

1
2
3
4
5
6
7
8
9
10
11
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
/*自定义序列化编解码器*/
// RpcResponse -> ByteBuf
ch.pipeline().addLast(new CommonEncoder(serializer))
.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS))
.addLast(new CommonDecoder())
.addLast(new NettyClientHandler());
}
});

IdleStateHandler 是实现心跳的关键, 它会根据不同的 IO idle 类型来产生不同的 IdleStateEvent 事件,而这个事件的捕获, 其实就是在 userEventTriggered 方法中实现的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//当出现空闲时间的触发器
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.WRITER_IDLE) {
logger.info("发送心跳包 [{}]", ctx.channel().remoteAddress());
Channel channel = ChannelProvider.get((InetSocketAddress) ctx.channel().remoteAddress(), CommonSerializer.getByCode(CommonSerializer.DEFAULT_SERIALIZER));
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setHeartBeat(true);
channel.writeAndFlush(rpcRequest).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}

当IdleStateHandler产生这样一个事件时,上述方法会创建一个RpcRequest对象,并将它的HearBeat属性置位true,并使用channel.writeAndFlush()方法发送出去

在服务端,当接收到一个请求时,它会首先判断是否是心跳包,如果是心跳包就直接返回,如果不是,则调用对应的服务

Netty 是如何保持长连接的

首先 TCP 协议的实现中也提供了 keepAlive 报文用来探测对端是否可用。TCP 层将在定时时间到后发送相应的 KeepAlive 探针以确定连接可用性。

ChannelOption.SO_KEEPALIVE, true 表示打开 TCP 的 keepAlive 设置。

TCP 心跳的问题:

考虑一种情况,某台服务器因为某些原因导致负载超高,CPU 100%,无法响应任何业务请求,但是使用 TCP 探针则仍旧能够确定连接状态,这就是典型的连接活着但业务提供方已死的状态,对客户端而言,这时的最好选择就是断线后重新连接其他服务器,而不是一直认为当前服务器是可用状态一直向当前服务器发送些必然会失败的请求。

Netty 中提供了 IdleStateHandler 类专门用于处理心跳。

IdleStateHandler 的构造函数如下:

复制代码

1
2
3
public IdleStateHandler(long readerIdleTime, long writerIdleTime, 
long allIdleTime,TimeUnit unit){
}

第一个参数是隔多久检查一下读事件是否发生,如果 channelRead() 方法超过 readerIdleTime 时间未被调用则会触发超时事件调用 userEventTrigger() 方法;

第二个参数是隔多久检查一下写事件是否发生,writerIdleTime 写空闲超时时间设定,如果 write() 方法超过 writerIdleTime 时间未被调用则会触发超时事件调用 userEventTrigger() 方法;

第三个参数是全能型参数,隔多久检查读写事件;

第四个参数表示当前的时间单位。

所以这里可以分别控制读,写,读写超时的时间,单位为秒,如果是0表示不检测,所以如果全是0,则相当于没添加这个 IdleStateHandler,连接是个普通的短连接。

一致性哈希算法

(229条消息) 一致性哈希算法原理详解_张维鹏的博客-CSDN博客_一致性哈希

(229条消息) 一致性哈希算法详解_邋遢的流浪剑客的博客-CSDN博客_一致性哈希算法

是一种特殊的哈希算法,目的是解决分布式缓存的问题。 在移除或者添加一个服务器时,能够尽可能小地改变已存在的服务请求与处理请求服务器之间的映射关系。一致性哈希解决了简单哈希算法在分布式哈希表( Distributed Hash Table,DHT) 中存在的动态伸缩等问题 。

在分布式系统中应用非常广泛。一致性哈希是一种哈希算法,简单地说在移除或者添加一个服务器时,此算法能够尽可能小地改变已存在的服务请求与处理请求服务器之间的映射关系,尽可能满足单调性的要求。在普通分布式集群中,服务请求与处理请求服务器之间可以一一对应,也就是说固定服务请求与处理服务器之间的映射关系,某个请求由固定的服务器去处理。这种方式无法对整个系统进行负载均衡,可能会造成某些服务器过于繁忙以至于无法处理新来的请求。而另一些服务器则过于空闲,整体系统的资源利用率低,并且当分布式集群中的某个服务器宕机,会直接导致某些服务请求无法处理 。

一致性哈希算法将整个哈希值空间映射成一个虚拟的圆环,整个哈希空间的取值范围为0232-1。整个空间按顺时针方向组织。0232-1在零点中方向重合。接下来使用如下算法对服务请求进行映射,将服务请求使用哈希算法算出对应的hash值,然后根据hash值的位置沿圆环顺时针查找,第一台遇到的服务器就是所对应的处理请求服务器。当增加一台新的服务器,受影响的数据仅仅是新添加的服务器到其环空间中前一台的服务器(也就是顺着逆时针方向遇到的第一台服务器)之间的数据,其他都不会受到影响。综上所述,一致性哈希算法对于节点的增减都只需重定位环空间中的一小部分数据,具有较好的容错性和可扩展性

一致性哈希算法是在哈希算法基础上提出的,在动态变化的分布式环境中,哈希算法应该满足的几个条件:平衡性、单调性和分散性

  1. 平衡性(Balance):平衡性是指哈希的结果能够尽可能分布到所有的缓冲中去,这样可以使得所有的缓冲空间都得到利用。很多哈希算法都能够满足这一条件。

  2. 单调性(Monotonicity):单调性是指如果已经有一些内容通过哈希分派到了相应的缓冲中,又有新的缓冲加入到系统中。哈希的结果应能够保证原有已分配的内容可以被映射到原有的或者新的缓冲中去,而不会被映射到旧的缓冲集合中的其他缓冲区。

  3. 分散性(Spread):在分布式环境中,终端有可能看不到所有的缓冲,而是只能看到其中的一部分。当终端希望通过哈希过程将内容映射到缓冲上时,由于不同终端所见的缓冲范围有可能不同,从而导致哈希的结果不一致,最终的结果是相同的内容被不同的终端映射到不同的缓冲区中。这种情况显然是应该避免的,因为它导致相同内容被存储到不同缓冲中去,降低了系统存储的效率。分散性的定义就是上述情况发生的严重程度。好的哈希算法应能够尽量避免不一致的情况发生,也就是尽量降低分散性。

数据倾斜问题

在一致性Hash算法服务节点太少的情况下,容易因为节点分布不均匀面造成数据倾斜(被缓存的对象大部分缓存在某一台服务器上)问题,如图特例

在这里插入图片描述

这时我们发现有大量数据集中在节点A上,而节点B只有少量数据。在极端情况下,假如A节点出现故障,存储在A上的数据要全部转移到B上,大量的数据导可能会导致节点B的崩溃,之后A和B上所有的数据向节点C迁移,导致节点C也崩溃,由此导致整个集群宕机。这种情况被称为雪崩效应。为了解决数据倾斜问题,一致性Hash算法引入了虚拟节点机制,即对每一个服务器节点计算多个哈希,每个计算结果位置都放置一个此服务节点,称为虚拟节点。具体操作可以为服务器IP或主机名后加入编号来实现数据定位算法不变,只需要增加一步:虚拟节点到实际点的映射。所以加入虚拟节点之后,即使在服务节点很少的情况下,也能做到数据的均匀分布。

服务之间的调用为啥不直接用 HTTP 而用 RPC?

RPC 只是一种概念、一种设计,就是为了解决 不同服务之间的调用问题, 它一般会包含有 传输协议序列化协议 这两个。但是,HTTP 是一种协议,RPC框架可以使用 HTTP协议作为传输协议或者直接使用TCP作为传输协议,使用不同的协议一般也是为了适应不同的场景。

服务发现

首先要向某个服务器发起请求,你得先建立连接,而建立连接的前提是,你得知道 IP 地址和端口 。这个找到服务对应的 IP 端口的过程,其实就是 服务发现

HTTP 中,你知道服务的域名,就可以通过 DNS 服务 去解析得到它背后的 IP 地址,默认 80 端口

RPC 的话,就有些区别,一般会有专门的中间服务去保存服务名和 IP 信息,比如 Consul、Etcd、Nacos、ZooKeeper,甚至是 Redis。想要访问某个服务,就去这些中间服务去获得 IP 和端口信息。由于 DNS 也是服务发现的一种,所以也有基于 DNS 去做服务发现的组件,比如 CoreDNS

可以看出服务发现这一块,两者是有些区别,但不太能分高低。

底层连接形式

以主流的 HTTP1.1 协议为例,其默认在建立底层 TCP 连接之后会一直保持这个连接(keep alive),之后的请求和响应都会复用这条连接。

RPC 协议,也跟 HTTP 类似,也是通过建立 TCP 长链接进行数据交互,但不同的地方在于,RPC 协议一般还会再建个 连接池,在请求量大的时候,建立多条连接放在池内,要发数据的时候就从池里取一条连接出来,用完放回去,下次再复用,可以说非常环保。

connection_pool

由于连接池有利于提升网络请求性能,所以不少编程语言的网络库里都会给 HTTP 加个连接池,比如 Go 就是这么干的。

可以看出这一块两者也没太大区别,所以也不是关键。

传输的内容

基于 TCP 传输的消息,说到底,无非都是 消息头 Header 和消息体 Body。

Header 是用于标记一些特殊信息,其中最重要的是 消息体长度

Body 则是放我们真正需要传输的内容,而这些内容只能是二进制 01 串,毕竟计算机只认识这玩意。所以 TCP 传字符串和数字都问题不大,因为字符串可以转成编码再变成 01 串,而数字本身也能直接转为二进制。但结构体呢,我们得想个办法将它也转为二进制 01 串,这样的方案现在也有很多现成的,比如 JSON,Protocol Buffers (Protobuf)

这个将结构体转为二进制数组的过程就叫 序列化 ,反过来将二进制数组复原成结构体的过程叫 反序列化

序列化和反序列化

对于主流的 HTTP1.1,虽然它现在叫超文本协议,支持音频视频,但 HTTP 设计 初是用于做网页文本展示的,所以它传的内容以字符串为主。Header 和 Body 都是如此。在 Body 这块,它使用 JSON序列化 结构体数据。

我们可以随便截个图直观看下。

HTTP报文

可以看到这里面的内容非常多的冗余,显得非常啰嗦。最明显的,像 Header 里的那些信息,其实如果我们约定好头部的第几位是 Content-Type,就不需要每次都真的把 Content-Type 这个字段都传过来,类似的情况其实在 Body 的 JSON 结构里也特别明显。

而 RPC,因为它定制化程度更高,可以采用体积更小的 Protobuf 或其他序列化协议去保存结构体数据,同时也不需要像 HTTP 那样考虑各种浏览器行为,比如 302 重定向跳转啥的。因此性能也会更好一些,这也是在公司内部微服务中抛弃 HTTP,选择使用 RPC 的最主要原因。

HTTP原理

RPC原理

当然上面说的 HTTP,其实 特指的是现在主流使用的 HTTP1.1HTTP2在前者的基础上做了很多改进,所以 性能可能比很多 RPC 协议还要好,甚至连gRPC底层都直接用的HTTP2

总结

  • HTTP里面的内容非常多的冗余,显得非常啰嗦。最明显的,像 Header 里的那些信息, RPC,因为它定制化程度更高,可以采用体积更小的 Protobuf 或其他序列化协议去保存结构体数据,同时也不需要像 HTTP 那样考虑各种浏览器行为,比如 302 重定向跳转啥的。因此性能也会更好一些,这也是在公司内部微服务中抛弃 HTTP,选择使用 RPC 的最主要原因。
  • 纯裸 TCP 是能收发数据,但它是个无边界的数据流,上层需要定义消息格式用于定义 消息边界 。于是就有了各种协议,HTTP 和各类 RPC 协议就是在 TCP 之上定义的应用层协议。
  • 从发展历史来说,HTTP 主要用于 B/S 架构,而 RPC 更多用于 C/S 架构。但现在其实已经没分那么清了,B/S 和 C/S 在慢慢融合。 很多软件同时支持多端,所以对外一般用 HTTP 协议,而内部集群的微服务之间则采用 RPC 协议进行通讯。
  • RPC 其实比 HTTP 出现的要早,且比目前主流的 HTTP1.1 性能要更好,所以大部分公司内部都还在使用 RPC。
  • HTTP2.0HTTP1.1 的基础上做了优化,性能可能比很多 RPC 协议都要好,但由于是这几年才出来的,所以也不太可能取代掉 RPC。