ea-rpc笔记

简单实例和流程

1.基本概念

什么是RPC?

专业定义:RPC(Remote Procedure Call)即远程过程调用,是一种计算机通信协议,它允许程序在不同的计算机之间进行通信和交互,就像本地调用一样。

为什么需要 RPC?

RPC 允许一个程序(称为服务消费者)像调用自己程序的方法一样,调用另一个程序(称为服务提供者)的接口,而不需要了解数据的传输处理过程、底层网络通信的细节等。这些都会由 RPC 框架帮你完成,使得开发者可以轻松调用远程服务,快速开发分布式系统。

举例:

现在有个项目 A 提供了点餐服务,项目 B 需要调用点餐服务完成下单。

这个时候常规的思路就是发送http请求,B调用A的接口地址,但是这种情况下,如果项目 B 需要调用更多第三方服务,每个服务和方法的调用都编写一个 HTTP 请求,那么会非常麻烦!

2.实现思路

首先我们有两个角色: 服务者和消费者

image-20240318110616032

消费者想要调用提供者,**就需要提供者启动一个 web 服务,**然后通过 请求客户端发送 HTTP 或者其他协议的请求来调用。

比如请求到提供者地址后,提供者会调用 orderService 的 order 方法:

image-20240318111157653

但是在这里思考一下,如果消费者需要调用多个接口,岂不是每一个接口都需要一段http调用的逻辑吗?

答案是: 可以提供一个统一的服务调用接口,通过 请求处理器 根据客户端的请求参数来进行不同的处理、调用不同的服务和方法。

也就是可以在服务提供者程序维护一个 本地服务注册器,记录服务和对应实现类的映射。

什么是本地服务注册器呢?

举个例子,消费者要调用 orderService 服务的 order 方法,可以发送请求,参数为 service=orderService,method=order,然后请求处理器会根据 service 从服务注册器中找到对应的服务实现类,并且通过 Java 的反射机制调用 method 指定的方法

流程:

image-20240318145649832

这里需要注意对对象进行序列化,才能在网络中传输

最后为了简化消费者请求的代码,实现类似本地调用的体验。可以基于代理模式,为消费者要调用的接口生成一个代理对象,由代理对象完成请求和响应的过程。

完整架构图:

虚线处就是RPC框架需要提供的模块和能力

image-20240318150156552

扩展设计:

服务的注册和发现

问题 1:消费者如何知道提供者的调用地址呢?

因此,我们需要一个 注册中心,来保存服务提供者的地址。消费者要调用服务时,只需从注册中心获取对应服务的提供者地址即可。

架构图如下:

image-20240318150510921

一般用现成的第三方注册中心,比如 Redis、Zookeeper 即可。

负载均衡

问题 2:如果有多个服务提供者,消费者应该调用哪个服务提供者呢?

我们可以给服务调用方增加负载均衡能力,通过指定不同的算法来决定调用哪一个服务提供者,比如轮询、随机、根据性能动态调用等。

架构图:

image-20240318150718320

容错机制

问题 3:如果服务调用失败,应该如何处理呢?

为了保证分布式系统的高可用,我们通常会给服务的调用增加一定的容错机制,比如失败重试、降级调用其他接口等等。

image-20240318150838321

其他

除了上面几个经典设计外,如果想要做一个优秀的 RPC 框架,还要考虑很多问题:

  • 服务提供者下线了怎么办?需要一个失效节点剔除机制。
  • 服务消费者每次都从注册中心拉取信息,性能会不会很差?可以使用缓存来优化性能。
  • 如何优化 RPC 框架的传输通讯性能?比如选择合适的网络框架、自定义协议头、节约传输体积等。
  • 如何让整个框架更利于扩展?比如使用 Java 的 SPI 机制、配置化等等。

3.简易版RPC框架实现

3.1 项目初始化

首先建立一个空项目,然后在空项目中新建模块(不是子模块),名称如下:

image-20240318151444788

分别介绍几个模块:

  • example-common:示例代码的公共依赖,包括接口、Model 等
  • example-consumer:示例服务消费者代码
  • example-provider:示例服务提供者代码
  • yu-rpc-easy:简易版 RPC 框架

在示例项目中,我们将以一个最简单的用户服务为例,演示整个服务调用过程。下面我们依次实现上述的几个模块。

3.1.1 公共模块

这里先设定有一个User类和UserService

public class User implements Serializable {

    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

以及一个获取用户的方法:

public interface UserService {

    /**
     * 获取用户
     *
     * @param user
     * @return 
     */
    User getUser(User user);
}

最终的目录结构:

image-20240318154307845

3.1.2 服务提供者

首先引入依赖:

  <dependencies>
        <dependency>
            <groupId>com.siyi</groupId>
            <artifactId>ea-rpc-easy</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.siyi</groupId>
            <artifactId>example-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <!-- https://doc.hutool.cn/ -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.16</version>
        </dependency>
        <!-- https://projectlombok.org/ -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

然后实现UserService,并编写服务类

/**
 * @author Eric
 */
public class UserServiceImpl implements UserService {
    @Override
    public User getUser(User user) {
        System.out.println("用户名:" + user.getName());
        return user;
    }
}
/**
 * @author Eric
 */
public class EasyProviderExample {
    //TODO 提供服务
}

最终的目录结构:

image-20240318154229866

3.1.3服务消费者

首先引入依赖,与provider一致:

  <dependencies>
        <dependency>
            <groupId>com.siyi</groupId>
            <artifactId>ea-rpc-easy</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.siyi</groupId>
            <artifactId>example-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <!-- https://doc.hutool.cn/ -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.16</version>
        </dependency>
        <!-- https://projectlombok.org/ -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

创建启动类EasyConsumerExample:

 //TODO 获取USerService的实现类对象
        UserService userService = null;
        User user = new User();
        user.setName("张三");
        //调用
        User newUser = userService.getUser(user);
        if (newUser != null) {
            System.out.println(newUser.getName());
        } else {
            System.out.println("user == null");
        }

现在是无法获取到 userService 实例的,所以预留为 null。我们之后的目标是,能够通过 RPC 框架,快速得到一个支持远程调用服务提供者的代理对象,像调用本地方法一样调用 UserService 的方法。

最终的目录结构:

image-20240318155611319

3.2 web服务器

为了让服务提供者提供可远程访问的服务。那么,就需要一个 web 服务器,能够接受处理请求、并返回响应。

web 服务器的选择有很多,比如 Spring Boot 内嵌的 Tomcat、NIO 框架 Netty 和 Vert.x 等等。

这里选用Vert.x

Vert.x 官方文档:https://vertx.io/

3.2.1 Vert的学习

首先进入官网,这里我们使用的是vert.core,进入对应的文档:

Vert.x Core | Eclipse Vert.x (vertx.io)

  1. 创建实例:

Vertx vertx = Vertx.vertx();

  1. 接收 HTTP 请求:
server.requestHandler(request -> {
// This handler will be called every time an HTTP request is received at the server
request.response().end("hello world!");
});
  1. 创建http请求

HttpServer server = vertx.createHttpServer();

  1. 配置http服务器

如果您不需要默认值,可以通过在创建服务器时传入实例来配置服务器:HttpServerOptions

HttpServerOptions options = new HttpServerOptions().setMaxWebSocketFrameSize(1000000);

HttpServer server = vertx.createHttpServer(options);
  1. 服务器监听选定主机端口
HttpServer server = vertx.createHttpServer();
server.listen();

如果希望在服务器实际侦听时收到通知,可以为调用提供处理程序。 例如:listen

HttpServer server = vertx.createHttpServer();
server
.listen(8080, "myhost.com")
.onComplete(res -> {
if (res.succeeded()) {
System.out.println("Server is now listening!");
} else {
System.out.println("Failed to bind!");
}
});

要在请求到达时收到通知,您需要设置:requestHandler

HttpServer server = vertx.createHttpServer();
server.requestHandler(request -> {
// Handle the request in here
});
  1. 处理请求
vertx.createHttpServer().requestHandler(request -> {
  request.response().end("Hello world");
}).listen(8080);
3.2.2 编写web服务器接口HttpServer

定义统一的启动服务方法,便于后续拓展,比如实现多种不同web服务器,这里的方法都可以参考官网

public class VertHttpServerExample implements HttpServer {
    @Override
    public void start(int port) {
        // 1. 创建实例:
        Vertx vertx = Vertx.vertx();
        // 2. 创建http请求
        io.vertx.core.http.HttpServer server = vertx.createHttpServer();
        // 3. 监听端口的通知执行任务
        server.requestHandler(request -> {
            System.out.println("receive request:"+ request.method()+"request url:"+request.uri());
        // 4. 发送响应
            request.response().putHeader("content-type","text/plain").end("hello from vert.x HTTP Server");
        });
        // 5. 启动http服务器监听端口
        server
                .listen(8080,res->
                 {
                    if (res.succeeded()) {
                        System.out.println("Server is now listening!");
                    } else {
                        System.out.println("Failed to bind!");
                    }
                });
    }
}
3.2.3 验证web服务器能否接受请求

对提供者:EasyProviderExample 的main方法重写调用服务

public class EasyProviderExample {

    public static void main(String[] args) {
        HttpServer httpServer = new VertHttpServer();
        httpServer.start(8080);
    }
}

浏览器访问Localhost:8080

image-20240318204638372

3.3 本地服务注册器

在RPC模块中创建本地服务注册器,但是因为这里是简易版,直接注册到本地

这里可以使用线程安全的ConCurrentHashMap存储服务注册信息,key为服务名称,value为服务实现类,之后根据服务名称找到服务后,根据反射调用

首先这里我们先思考: 写一个存储注册信息需要注意什么?

  1. 线程安全
  2. 以服务名称存储作为key,服务实现类作为value
  3. 需要用到三个方法: 1. 注册 2. 获取服务 3. 删除服务

这里是实现类具体代码

public class LocalRegistry {
    public static final Map<String,Class<?>>map = new ConcurrentHashMap<>();
    /**
     * 注册服务
    */
    public static void register(String key,Class<?>implClass){
        map.put(key,implClass);
    }
    /**
     * 获取服务
     */
    public static Class<?> get(String key){
        return map.get(key);
    }
    /**
     * 删除服务
     */
    public static void delete(String key){
        map.remove(key);
    }
}

这里注意:

本地服务注册器和注册中心不一样,注册中心偏向于管理注册的服务,提供服务信息给消费者,而本地注册器是根据服务名获取到对应的实现类,是完成调用不可缺少的模块

而服务的提供者启动时,需要将服务注册到服务注册器中,因此这里修改EasyProviderExample

public class EasyProviderExample {

    public static void main(String[] args) {
        LocalRegistry.register(UserService.class.getName(),UserServiceImpl.class);
        HttpServer httpServer = new VertHttpServer();
        httpServer.start(8080);
    }
}

3.4 序列化器

在服务本地注册后,我们就可以根据请求信息取出实现类并调用方法了,但是在这之前,我们需要先解决传输中的序列化问题,因为java的对象是存活在jvm中的,需要在其他地方访问,或者网络中传输,就一定需要序列化和反序列化

这里介绍两个概念:

  • 序列化:将 Java 对象转为可传输的字节数组。
  • 反序列化:将字节数组转换为 Java 对象。

有很多种不同的序列化方式,比如 Java 原生序列化、JSON、Hessian、Kryo、protobuf 等。

这里为了方便选择java原生的序列化器

序列化接口

这是固定代码,可以不用记忆:

public class JdkSerializer implements Serializer{

    /**
     * 序列化
     *
     * @param object
     * @param <T>
     * @return
     * @throws IOException
     */
    @Override
    public <T> byte[] serialize(T object) throws IOException {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
        objectOutputStream.writeObject(object);
        objectOutputStream.close();
        return outputStream.toByteArray();
    }


    /**
     * 反序列化
     *
     * @param bytes
     * @param type
     * @param <T>
     * @return
     * @throws IOException
     */
    @Override
    public <T> T deserialize(byte[] bytes, Class<T> type) throws IOException {
        ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
        ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
        try {
            return (T) objectInputStream.readObject();
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        } finally {
            objectInputStream.close();
        }
    }
}

3.5 提供者处理调用 - 请求处理器(核心)

请求处理器是 RPC 框架的实现关键,它的作用是:处理接收到的请求,并根据请求参数找到对应的服务和方法,通过反射实现调用,最后封装返回结果并响应请求。

3.5.1 编写RPC实体类

因此在RPC模块中编写请求响应封装类

可以参考这里假设这是一个RPC的调用方法:

orderId = orderService.order(参数1, 参数2, 参数3)

请求类 RpcRequest 的作用是封装调用所需的信息,比如:

  • 服务名称
  • 方法名称
  • 调用参数的类型列表
  • 参数列表。

这些都是 Java 反射机制所需的参数。

RpcRequest如下:

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RpcRequest {
    //服务名称
    private String name;
    //方法名称
    private String methodName;
    //调用参数的类型列表
    private Class<?>parameterTypes;
//    参数列表
    private Object args;
}

RpcResponse如下:

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RpcResponse {
    //响应数据
    private Object data;
    // 响应数据类型
    private Class<?>dataType;
    // 响应信息
    private String message;
    // 异常信息
    private Exception exception;
}
3.5.2 编写请求处理器

在RPC中编写请求处理器,HttpServerHandler

image-20240318214201472

业务流程如下:

  1. 当消费者请求时,反序列化请求为对象,从对象中获取参数
  2. 根据服务名称从本地注册器中找到对应的实现类
  3. 通过反射调用方法
  4. 最后对结果进行封装并序列化,写入响应

完整的代码:

public class HttpServerHandler implements Handler<HttpServerRequest> {
    @Override
    public void handle(HttpServerRequest request) {
        // 1. 转换为对象,从对象中获得参数
        //指定序列化器
        final Serializer serializer = new JdkSerializer();
        //记录日志
        System.out.println("receive request:" + request.method() + " " + "request uri:" + request.uri());
        //异步处理请求
        request.bodyHandler(body -> {
            byte[] bytes = body.getBytes();
            RpcRequest rpcRequest = null;
            try {
                rpcRequest = serializer.deserialize(bytes, RpcRequest.class);
            } catch (IOException e) {
                e.printStackTrace();
            }
            //2. 构建响应对象
            RpcResponse rpcResponse = new RpcResponse();
            if (rpcRequest == null) {
                rpcResponse.setMessage("rpcRequest is null");
                doResponse(request, rpcResponse, serializer);
            }

            //3. 从注册器获取实现类,反射调用
            Class<?> implClass = LocalRegistry.get(rpcRequest.getName());
            try {
                Method methods = implClass.getMethod(rpcRequest.getMethodName());
                Object result = methods.invoke(implClass.newInstance(), rpcRequest.getArgs());
                //4. 返回结果
                rpcResponse.setData(result);
                rpcResponse.setDataType(methods.getReturnType());
                rpcResponse.setMessage("ok");
            } catch (Exception e) {
                e.printStackTrace();
                rpcResponse.setException(e);
                rpcResponse.setMessage(e.getMessage());

            }
            doResponse(request, rpcResponse, serializer);

        });

    }

    void doResponse(HttpServerRequest httpServerRequest, RpcResponse response, Serializer serializer) {
        HttpServerResponse httpServerResponse = httpServerRequest
                .response()
                .putHeader("content-type", "application/json");
        try {
            byte[] bytes = serializer.serialize(response);
            httpServerResponse.end(Buffer.buffer(bytes));
        } catch (IOException e) {
            e.printStackTrace();
            httpServerResponse.end(Buffer.buffer());
        }
    }
}

需要注意,不同的 web 服务器对应的请求处理器实现方式也不同,比如 Vert.x 中是通过实现 Handler 接口来自定义请求处理器的。并且可以通过 request.bodyHandler 异步处理请求。

3.5.3 给 HttpServer 绑定请求处理器

修改 VertxHttpServer 的代码,通过 server.requestHandler 绑定请求处理器。
这里

public class VertxHttpServer implements HttpServer {

    /**
     * 启动服务器
     *
     * @param port
     */
    @Override
    public void start(int port) {
        // 创建 Vert.x 实例
        Vertx vertx = Vertx.vertx();

        // 创建 HTTP 服务器
        io.vertx.core.http.HttpServer server = vertx.createHttpServer();

        // 监听端口并处理请求
        server.requestHandler(new HttpServerHandler());

        // 启动 HTTP 服务器并监听指定端口
        server.listen(port, result -> {
            if (result.succeeded()) {
                System.out.println("Server is now listening on port " + port);
            } else {
                System.err.println("Failed to start server: " + result.cause());
            }
        });
    }
}

至此,引入了 RPC 框架的服务提供者模块,已经能够接受请求并完成服务调用了。

3.6 消费者发起调用

首先在这里我们回顾一下遗留的代码:

  public static void main(String[] args) {
        //TODO 获取USerService的实现类对象
        UserService userService = null;
        User user = new User();
        user.setName("张三");
        //调用
        User newUser = userService.getUser(user);
        if (newUser != null) {
            System.out.println(newUser.getName());
        } else {
            System.out.println("user == null");
        }
    }

只要能够获取到 UserService 对象(实现类),就能跑通整个流程。
但 UserService 的实现类从哪来呢?难道从common中复制过来吗?那要RPC框架还有什么用呢?

其实分布式系统中,我们调用其他项目或团队提供的接口时,一般只关注请求参数和响应结果,而不关注具体实现。

我们可以通过生成代理对象来简化消费方的调用。
代理的实现方式大致分为 2 类:静态代理和动态代理,下面依次实现。

静态代理

静态代理是指为每一个特定类型的接口或对象,编写一个代理类。

比如在 example-consumer 模块中,创建一个静态代理 UserServiceProxy,实现 UserService 接口和 getUser 方法。

其实也就是使用hutool发起请求调用,但是这里注意HttpResponse实现了closable接口,这里使用try source

public class UserServiceProxy implements UserService {
    @Override
    public User getUser(User user) {
        //指定序列化器
        Serializer serializer = new JdkSerializer();
        //发起请求
        RpcRequest rpcRequest = RpcRequest.builder()
                .name(UserService.class.getName())
                .methodName("getUser")
                .args(new Object[]{user})
                .parameterTypes(new Class[]{User.class})
                .build();

        try {
            byte[] bytes = serializer.serialize(rpcRequest);
            byte[] result;
            try (HttpResponse httpResponse = HttpRequest.post("http:localhost:8080")
                    .body(bytes)
                    .execute()) {
                result = httpResponse.bodyBytes();
            }

            RpcResponse response = serializer.deserialize(result, RpcResponse.class);
            return (User) response.getData();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
}

然后修改 EasyConsumerExample,new 一个代理对象并赋值给 userService,就能完成调用:

/**
 * 简易服务消费者示例
 */
public class EasyConsumerExample {

    public static void main(String[] args) {
        // 静态代理
        UserService userService = new UserServiceProxy();
        
        ...
    }
}

但是这样缺点很明显,就是每多一个类型则需要多谢一个代理类,因此我们使用动态代理

动态代理

这里首先来复习一下动态代理方法的参数:

  • Object proxy:代理对象本身。
  • Method method:被调用的方法,即代理对象上调用的方法。
  • Object[] args:被调用的方法的参数。

然后知道这些,其实就可以把代码复制过来了

package com.siyi.earpc.proxy;

import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.siyi.earpc.model.RpcRequest;
import com.siyi.earpc.model.RpcResponse;
import com.siyi.earpc.serializer.JdkSerializer;
import com.siyi.earpc.serializer.Serializer;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class ServiceProxy implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //指定序列化器
        Serializer serializer = new JdkSerializer();
        //发送请求
        RpcRequest rpcRequest = RpcRequest.builder()
                .name(method.getDeclaringClass().getName())
                .methodName(method.getName())
                .args(args)
                .parameterTypes(method.getParameterTypes())
                .build();

        try {
            byte[] bytes = serializer.serialize(rpcRequest);
            byte[] result;
            // TODO 这里硬编码可以依靠注册中心硬编码解决
            try (HttpResponse httpResponse = HttpRequest.post("http:localhost:8080")
                    .body(bytes)
                    .execute()) {
                result = httpResponse.bodyBytes();
            }

            RpcResponse response = serializer.deserialize(result, RpcResponse.class);
            return  response.getData();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
}

之后,在RPC中创建动态代理工厂 ServiceProxyFactory,作用是根据指定类创建动态代理对象,目录结构如图:

image-20240319105246123

这里,首先复习一下Proxy的参数:

  • ClassLoader loader:这是用于加载代理类的类加载器。通常,可以使用目标类的类加载器或者直接使用系统类加载器。
  • Class[] interfaces:这是一个接口数组,指定了代理对象需要实现的接口类型。这个参数告诉Proxy类,代理对象应该实现哪些接口,并且会根据这些接口的规范来处理方法调用。需要注意的是,动态代理只能代理接口,不能代理普通的类或者抽象类。
  • InvocationHandler handler:这是一个实现了InvocationHandler接口的实例。当代理对象的方法被调用时,会自动调用handlerinvoke方法。在这个方法中,你可以定义代理对象的行为,例如可以在调用实际方法之前或之后执行一些额外的操作。

这里是使用了 工厂设计模式,来简化对象的创建过程,代码如下:

public class ServiceProxyFactory {

    /**
     * 根据服务类获取代理对象
     *
     * @param serviceClass
     * @param <T>
     * @return
     */
    public static <T> T getProxy(Class<T> serviceClass) {
        return (T) Proxy.newProxyInstance(
                serviceClass.getClassLoader()
                , new Class[]{serviceClass}
                , new ServiceProxy());
    }
}

最后,在EasyConsumerExample 中,就可以通过调用工厂来为 UserService 获取动态代理对象了。

UserService userService = ServiceProxyFactory.getProxy(UserService.class);

4.测试验证

先debug调试提供者,之后调试消费者,可以看到UserService.getUser()实际调用的是代理类,进入代理类看到:

image-20240319113440117

全局配置加载

为什么需要配置加载?

在 RPC 框架运行的过程中,会涉及到很多的配置信息,比如注册中心的地址、序列化方式、网络服务器端口号等等。

  1. 在我们之前的流程当中,我们使用了硬编码
  2. RPC 框架是需要被其他项目作为服务提供者或者服务消费者引入的,我们应当允许引入框架的项目通过编写配置文件来 自定义配置
  3. 服务提供者和服务消费者需要编写相同的 RPC 配置。

常见的RPC配置项有:

  • 注册中心地址:服务提供者和服务消费者都需要指定注册中心的地址,以便进行服务的注册和发现。
  • 服务接口:服务提供者需要指定提供的服务接口,而服务消费者需要指定要调用的服务接口。
  • 序列化方式:服务提供者和服务消费者都需要指定序列化方式,以便在网络中传输数据时进行序列化和反序列化。
  • 网络通信协议:服务提供者和服务消费者都需要选择合适的网络通信协议,比如 TCP、HTTP 等。
  • 超时设置:服务提供者和服务消费者都需要设置超时时间,以便在调用服务时进行超时处理。
  • 负载均衡策略:服务消费者需要指定负载均衡策略,以决定调用哪个服务提供者实例。
  • 服务端线程模型:服务提供者需要指定服务端线程模型,以决定如何处理客户端请求。

可以参考 Dubbo:https://cn.dubbo.apache.org/zh-cn/overview/mannual/java-sdk/reference-manual/config/api/

1.读取配置项

一般情况我们可以使用 Java 的 Properties 类自行编写,但是使用一些第三方工具库,比如 Hutool 的 Setting 模块,可以直接读取指定名称的配置文件中的部分配置信息,并且转换成 Java 对象,非常方便。

参考官方文档:https://doc.hutool.cn/pages/Props

这里说明一下:在Spring中,一般我们读取的配置文件名称为 application.properties,还可以通过指定文件名称后缀的方式来区分多环境,比如 application-prod.properties 表示生产环境、 application-test.properties 表示测试环境。

2.项目初始化

这里新建一个ea-rpc-core模块,可以将ea-rpc-easy复制到这里并修改模块名称从磁盘重新加载,在此基础上修改:

  1. 引入依赖:
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.4.14</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>RELEASE</version>
    <scope>test</scope>
</dependency>
  1. 将消费者和生产者pom中的ea-rpc-easy切换为ea-rpc-core
 <groupId>com.siyi</groupId>
    <artifactId>ea-rpc-core</artifactId>
    <version>1.0-SNAPSHOT</version>

3.在 config 包下新建配置类 RpcConfig,用于保存配置信息。

这里可以根据需求填写默认值

package com.siyi.earpc.config;

import lombok.Data;


/**
 * RPC 框架配置
 * @author Eric
 */
@Data
public class RpcConfig {

    /**
     * 名称
     */
    private String name = "ea-rpc";

    /**
     * 版本号
     */
    private String version = "1.0";

    /**
     * 服务器主机名
     */
    private String serverHost = "localhost";

    /**
     * 服务器端口号
     */
    private Integer serverPort = 8080;

}


  1. utils下创建ConfigUtils,用于读取配置文件并返回配置对象,可以简化调用。
package com.siyi.earpc.utils;

import cn.hutool.core.util.StrUtil;
import cn.hutool.setting.dialect.Props;

/**
 * @author Eric
 */
public class ConfigUtils {
    public static <T> T loadConfig(Class<T> tClass, String prefix) {
        return loadConfig(tClass, prefix, "");
    }

    public static <T> T loadConfig(Class<T> tClass, String prefix, String environment) {
        StringBuilder configBuilder = new StringBuilder("application");
        if (StrUtil.isNotBlank(environment)) {
            configBuilder.append("_").append(environment);
        }
        configBuilder.append(".properties");
        Props props = new Props(configBuilder.toString());
        return props.toBean(tClass, prefix);
    }
}

这里注意,可以根据需要拓展为支持yml配置,这里只实现了properties

  1. 在 constant 包中新建 RpcConstant 接口,用于存储 RPC 框架相关的常量。
package com.siyi.earpc.constant;


/**
 * @author Administrator
 */
public interface RpcConstant {
    /**
     * 默认配置文件加载前缀
     */
    String DEFAULT_CONFIG_PREFIX = "rpc";

}

3.全局配置对象

这里我们需要想:

RPC 框架中需要维护一个全局的配置对象。在引入 RPC 框架的项目启动时,从配置文件中读取配置并创建对象实例,之后就可以集中地从这个对象中获取配置信息,而不用每次加载配置时再重新读取配置、并创建新的对象,减少了性能开销。

没错,我们可以使用单例模式,并且换一个更优雅的命名,使用 RpcApplication 类作为 RPC 项目的启动入口,并且维护项目全局用到的变量。

在这里的代码中我们使用DDL检查锁,以及懒加载

package com.siyi.earpc;

import com.siyi.earpc.config.RpcConfig;
import com.siyi.earpc.constant.RpcConstant;
import com.siyi.earpc.utils.ConfigUtils;
import lombok.extern.slf4j.Slf4j;

/**
 * RPC 框架应用
 * 初始化全局变量,使用DDL的实现单例模式
 */
@Slf4j
public class RpcApplication {
    private static volatile RpcConfig rpcConfig;

    /**
     * 框架初始化,支持传入自定义配置
     *
     * @param newRpcConfig
     */
    public static void init(RpcConfig newRpcConfig) {
        rpcConfig = newRpcConfig;
        log.info("rpc init, config = {}", newRpcConfig.toString());
    }

    /**
     * 初始化
     */
    public static void init() {
        RpcConfig newRpcConfig;
        try {
            newRpcConfig = ConfigUtils.loadConfig(RpcConfig.class, RpcConstant.DEFAULT_CONFIG_PREFIX);
        } catch (Exception e) {
            //配置加载失败,使用默认值
            newRpcConfig = new RpcConfig();
        }
        init(newRpcConfig);


    }

    /**
     * 获取配置
     *
     * @return rpcConfig
     */
    public static RpcConfig getRpcConfig() {
        if (rpcConfig == null) {
            synchronized (RpcApplication.class) {
                if (rpcConfig == null) {
                    init();
                }
            }
        }
        return rpcConfig;
    }
}

到这,就可以去测试了

4.测试

分别在消费者和生产者创建示例:

public class RpcConsumerExample {
    public static void main(String[] args) {
        RpcConfig rpc = ConfigUtils.loadConfig(RpcConfig.class, RpcConstant.DEFAULT_CONFIG_PREFIX);
        System.out.println(rpc);

    }
}
public class RpcProviderExample {

    public static void main(String[] args) {
        RpcApplication.init();
        LocalRegistry.register(UserService.class.getName(),UserServiceImpl.class);
        HttpServer httpServer = new VertxHttpServer();
        httpServer.start(RpcApplication.getRpcConfig().getServerPort());
    }
}

然后在消费者的properties中写配置:

rpc.name=ea-rpc
rpc.version=2.0
rpc.serverPort=8080

image-20240328105930525

分别启动生产者和消费者得到输出:

RpcConfig(name=ea-rpc, version=2.0, serverHost=localhost, serverPort=8080)

接口Mock

1.介绍

什么是Mock?

mock 是指模拟对象,通常用于测试代码中,特别是在单元测试中,便于我们跑通业务流程。

举个例子,用户服务要调用订单服务,伪代码如下:

 
class UserServiceImpl {

    void test() {
        doSomething();
        orderService.order();
        doSomething();
    }
}

如果订单服务还没上线,那么这个流程就跑不通,只能先把调用订单服务的代码注释掉。

但如果给 orderService 设置一个模拟对象,调用它的 order 方法时,随便返回一个值,就能继续执行后续代码,这就是 mock 的作用。

2.为什么要支持Mock

mock 服务并不是 RPC 框架的核心能力,但是它的开发成本并不高。而且给 RPC 框架支持 mock 后,开发者就可以轻松调用服务接口、跑通业务流程,不必依赖真实的远程服务

我们希望能够用最简单的方式 —— 比如一个配置,就让开发者使用 mock 服务。

3.实现

3.1 设计方案

前面也提到了,mock 的本质就是为要调用的服务创建模拟对象。

可是,如何创建模拟对象呢?

答案是:通过动态代理创建一个 调用方法时返回固定值 的对象

3.2 开发实现

  1. 配置Mock关闭开启的配置

我们可以支持开发者通过修改配置文件的方式开启 mock,那么首先给全局配置类 RpcConfig 新增 mock 字段,默认值为 false。

@Data
public class RpcConfig {
    ...
    
    /**
     * 模拟调用
     */
    private boolean mock = false;
}
  1. 代理类

在 Proxy 包下新增 MockServiceProxy 类,用于生成 mock 代理服务。

在这个类中,需要提供一个根据服务接口类型返回固定值的方法。

package com.siyi.earpc.proxy;

import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
 * @author Eric
 */
@Slf4j
public class MockServiceProxy implements InvocationHandler {
    /**
     * 调用代理
     *
     * @return
     * @throws Throwable
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> returnType = method.getReturnType();
        log.info("mock invoke {}", method.getName());
        return getDefaultObject(returnType);
    }

    /**
     * 生成指定类型的默认值对象(可自行完善默认值逻辑)
     *
     * @param type
     * @return
     */
    private Object getDefaultObject(Class<?> type) {
        if (type.isPrimitive()) {
            if (type == boolean.class) {
                return false;
            } else if (type == short.class) {
                return (short) 0;
            } else if (type == int.class) {
                return 0;
            } else if (type == long.class) {
                return 0L;
            }
        }
        return null;
    }
}

getDefaultObject 方法,根据代理接口的 class 返回不同的默认值,比如针对 boolean 类型返回 false、对象类型返回 null 等。

  1. 服务代理工厂

ServiceProxyFactory 服务代理工厂新增获取 mock 代理对象的方法 getMockProxy。可以通过读取已定义的全局配置 mock 来区分创建哪种代理对象。

package com.siyi.earpc.proxy;

import com.siyi.earpc.RpcApplication;

import java.lang.reflect.Proxy;

public class ServiceProxyFactory {

    /**
     * 根据服务类获取代理对象
     *
     * @param serviceClass
     * @param <T>
     * @return
     */
    public static <T> T getProxy(Class<T> serviceClass) {
        if (RpcApplication.getRpcConfig().isMock()){
            return getMockProxy(serviceClass);
        }
        return (T) Proxy.newProxyInstance(
                serviceClass.getClassLoader()
                , new Class[]{serviceClass}
                , new ServiceProxy());
    }

    public static <T> T getMockProxy(Class<T> serviceClass) {
        return (T) Proxy.newProxyInstance(
                serviceClass.getClassLoader()
                ,new Class[]{serviceClass}
                ,new MockServiceProxy()
        );
    }
}

4.测试

可以在 example-common 模块的 UserService 中写个具有默认实现的新方法。等下需要调用该方法来测试 mock 代理服务是否生效,即查看调用的是模拟服务还是真实服务。

这里我们使用RpcProviderExample和EasyConsumerExample进行测试

EasyConsumerExample

加入:

long number = userService.getNumber();
System.out.println(number);

package com.siyi.example.consumer;

import com.siyi.earpc.proxy.ServiceProxyFactory;
import com.siyi.example.common.model.User;
import com.siyi.example.common.service.UserService;

public class EasyConsumerExample {
    public static void main(String[] args) {
        //静态代理
//        UserService userService = new UserServiceProxy();
        // 获取USerService的实现类对象
            UserService userService = ServiceProxyFactory.getProxy(UserService.class);
        User user = new User();
        user.setName("张三");
        //调用
        User newUser = userService.getUser(user);
        if (newUser != null) {
            System.out.println(newUser.getName());
        } else {
            System.out.println("user == null");
        }
        long number = userService.getNumber();
        System.out.println(number);
    }

}

RpcProviderExample

这里不变

UserService

这里方便测试可以随便写一个默认实现:

public interface UserService {

    /**
     * 获取用户
     *
     * @param user
     * @return
     */
    User getUser(User user);

    /**
     * 新方法 - 获取数字
     */
    default short getNumber() {
        return 1;
    }
}

properties

将properties中mock改为true,也就是开启

--其他代码--
rpc.mock=true

运行得到结果:

image-20240328164048487

5.总结

这里对流程可能有点模糊,所以我们总结一下:

  • 在动态代理中,我们发起请求去获取响应,因此,我们如果不需要调用的时候,可以在创建代理对象前,进行判断,如果为true直接不创建代理,直接使用默认值,这步是最核心的一部。

  • 另一个注意的点是,RpcApplication的懒加载,这里的代码可以再看看

序列化器与 SPI 机制

对于序列化器,我们自定义使用了JDK的序列化器,但是问题:

  1. 有没有更好的序列化器实现方式?
  2. 如何让使用框架的开发者指定使用的序列化器?
  3. 如何让使用框架的开发者自己定制序列化器?

首先明确,这里说的更好,指的是为了更快完成RPC的请求响应:更高的性能、或者更小的序列化结果

1.了解常见的序列化器

  • JSON
  • Hessian
  • Kryo
  • protobuf
序列化方式优点缺点
JSON- 易读性好,可读性强,便于人类理解和调试。
- 跨语言支持广泛,几乎所有编程语言都有 JSON 的解析和生成库。
- 序列化后的数据量相对较大。
- 不能很好地处理复杂的数据结构和循环引用,可能导致性能下降或者序列化失败。
- 性能上不是最优的,尤其在大量数据处理时
Hessian- 二进制序列化,序列化后的数据量较小,网络传输效率高。
- 支持跨语言,适用于分布式系统中的服务调用。
- 性能较 JSON 略低。
- 对象必须实现 Serializable 接口,限制了可序列化的对象范围。
- 使用相对较少,社区和文档可能不如其他序列化方式丰富。
Kryo- 高性能,序列化和反序列化速度快
- 支持循环引用和自定义序列化器,适用于复杂的对象结构。
- 无需实现 Serializable 接口,可以序列化任意对象。
- 不跨语言,只适用于 Java
- 对象的序列化格式不够友好,不易读懂和调试。
- 使用需要更多的手动配置,比如注册类。
Protobuf- 高效的二进制序列化,序列化后的数据量极小
- 跨语言支持,并且提供了多种语言的实现库。
- 支持版本化和向前 / 向后兼容性。
- 配置相对复杂,需要先定义数据结构的消息格式。
- 对象的序列化格式不易读懂,不便于调试。
- 学习曲线相对陡峭,对于简单项目可能过于复杂。

2.动态使用序列化器

场景:

Serializer serializer = new JdkSerializer();

问题:想要替换为别的序列化器,就必须修改所有的上述代码,太麻烦

解决:

定义一个key= 序列化器名称, value=序列化器实现类对象,的Map,根据名称从Map中获取即可

3.自定义序列化器设计

如何自己定义一个新的序列化器实现,但修改我们写好的框架代码?

只要我们的 RPC 框架能够读取到用户自定义的类路径,然后加载这个类,作为 Serializer 序列化器接口的实现即可。

这里需要实现需要借助SPI机制

3.1 SPI机制介绍

SPI(Service Provider Interface)服务提供接口是 Java 的机制,主要用于实现模块化开发和插件化扩展。

常见SPI的场景:

Servlet 容器、日志框架、ORM 框架、Spring 框架。这是 Java 开发者必须掌握的一个重要特性!

SPI的实现分为系统实现和自定义实现

3.2 系统实现

Java 内已经提供了 SPI 机制相关的 API 接口,可以直接使用,这种方式最简单。

  1. 首先在 resources 资源目录下创建 META-INF/services 目录,并且创建一个名称为要实现的接口的空文件。

  2. 在文件中填写自己定制的接口实现类的 完整类路径:

  3. 使用系统内置的 ServiceLoader 动态加载指定接口的实现类

// 指定序列化器
Serializer serializer = null;
ServiceLoader<Serializer> serviceLoader = ServiceLoader.load(Serializer.class);
for (Serializer service : serviceLoader) {
    serializer = service;
}

这种实现能够获取到所有文件中编写的实现类对象,选择一个使用即可

需要注意:

如果我们想定制多个不同的接口实现类,就没办法在框架中指定使用哪一个了,也就无法实现我们 “通过配置快速指定序列化器” 的需求。

3.3 自定义SPI的实现

自己定义 SPI 机制的实现,只要能够根据配置加载到类即可。

例如这里读取配置文件,可以得到K-V的映射,之后就可以根据用户配置的序列化器名称动态加载指定实现类对象

jdk=com.siyi.earpc.serializer.JdkSerializer
hessian=com.siyi.earpc.serializer.HessianSerializer
json=com.siyi.earpc.serializer.JsonSerializer
kryo=com.siyi.earpc.serializer.KryoSerializer

4.开发实现

4.1 序列化器

分别实现 JSON、Kryo 和 Hessian 这三种主流的序列化器,引入依赖:

<!-- 序列化 -->
<!-- https://mvnrepository.com/artifact/com.caucho/hessian -->
<dependency>
    <groupId>com.caucho</groupId>
    <artifactId>hessian</artifactId>
    <version>4.0.66</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.esotericsoftware/kryo -->
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>kryo</artifactId>
    <version>5.6.0</version>
</dependency>

之后在在序列化器包 serializer 中分别实现这三种序列化器:

JSON

>JSON 序列化器的实现相对复杂,要考虑一些对象转换的兼容性问题,比如 Object 数组在序列化后会丢失类型。

package com.siyi.earpc.serializer;


import com.fasterxml.jackson.databind.ObjectMapper;
import com.siyi.earpc.model.RpcRequest;
import com.siyi.earpc.model.RpcResponse;
import java.io.IOException;
/**
 * @author Eric
*/
public class JsonSerializer implements Serializer {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Override
    public <T> byte[] serialize(T obj) throws IOException {
        return OBJECT_MAPPER.writeValueAsBytes(obj);
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> classType) throws IOException {
        T obj = OBJECT_MAPPER.readValue(bytes, classType);
        if (obj instanceof RpcRequest) {
            return handleRequest((RpcRequest) obj, classType);
        }
        if (obj instanceof RpcResponse) {
            return handleResponse((RpcResponse) obj, classType);
        }
        return obj;
    }

    /**
     * 由于 Object 的原始对象会被擦除,导致反序列化时会被作为 LinkedHashMap 无法转换成原始对象,因此这里做了特殊处理
     *
     * @param rpcRequest rpc 请求
     * @param type       类型
     * @return {@link T}
     * @throws IOException IO异常
     */
    private <T> T handleRequest(RpcRequest rpcRequest, Class<T> type) throws IOException {
        Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
        Object[] args = rpcRequest.getArgs();

        // 循环处理每个参数的类型
        for (int i = 0; i < parameterTypes.length; i++) {
            Class<?> clazz = parameterTypes[i];
            // 如果类型不同,则重新处理一下类型
            if (!clazz.isAssignableFrom(args[i].getClass())) {
                byte[] argBytes = OBJECT_MAPPER.writeValueAsBytes(args[i]);
                args[i] = OBJECT_MAPPER.readValue(argBytes, clazz);
            }
        }
        return type.cast(rpcRequest);
    }

    /**
     * 由于 Object 的原始对象会被擦除,导致反序列化时会被作为 LinkedHashMap 无法转换成原始对象,因此这里做了特殊处理
     *
     * @param rpcResponse rpc 响应
     * @param type        类型
     * @return {@link T}
     * @throws IOException IO异常
     */
    private <T> T handleResponse(RpcResponse rpcResponse, Class<T> type) throws IOException {
        // 处理响应数据
        byte[] dataBytes = OBJECT_MAPPER.writeValueAsBytes(rpcResponse.getData());
        rpcResponse.setData(OBJECT_MAPPER.readValue(dataBytes, rpcResponse.getDataType()));
        return type.cast(rpcResponse);
    }
}

导入依赖:

 <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.1</version> <!-- 请根据你的需求选择合适的版本 -->
        </dependency>

Kryo

Kryo 本身是线程不安全的,所以需要使用 ThreadLocal 保证每个线程有一个单独的 Kryo 对象实例。

package com.siyi.earpc.serializer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

/**
 * @author Eric
 */
public class KryoSerializer implements Serializer {
    /**
     * kryo 线程不安全,使用 ThreadLocal 保证每个线程只有一个 Kryo
     */
    private static final ThreadLocal<Kryo> KRYO_THREAD_LOCAL = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        // 设置动态动态序列化和反序列化类,不提前注册所有类(可能有安全问题)
        kryo.setRegistrationRequired(false);
        return kryo;
    });

    @Override
    public <T> byte[] serialize(T obj) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Output output = new Output(byteArrayOutputStream);
        KRYO_THREAD_LOCAL.get().writeObject(output, obj);
        output.close();
        return byteArrayOutputStream.toByteArray();
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> classType) {
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        Input input = new Input(byteArrayInputStream);
        T result = KRYO_THREAD_LOCAL.get().readObject(input, classType);
        input.close();
        return result;
    }
}

Hessian

package com.siyi.earpc.serializer;

import com.caucho.hessian.io.HessianInput;
import com.caucho.hessian.io.HessianOutput;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;


/**
 * @author Eric
 */
public class HessianSerializer implements Serializer {
    @Override
    public <T> byte[] serialize(T object) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        HessianOutput ho = new HessianOutput(bos);
        ho.writeObject(object);
        return bos.toByteArray();
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> tClass) throws IOException {
        ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
        HessianInput hi = new HessianInput(bis);
        return (T) hi.readObject(tClass);
    }
}

4.2 动态使用序列化器

4.2.1定义序列化器Key
public interface SerializerKeys {

    String JDK = "jdk";
    String JSON = "json";
    String KRYO = "kryo";
    String HESSIAN = "hessian";

}
4.2.2定义序列化工厂

序列话对象可以服用,因此我们可以使用设计模式中的 工厂模式 + 单例模式 来简化创建和获取序列化器对象的操作。

package com.siyi.earpc.serializer;

import java.util.HashMap;
import java.util.Map;

/**
 * @author Eric
 */
public class SerializerFactory {

    /**
     * 序列化映射(用于实现单例)
     */
    private static final Map<String, Serializer> KEY_SERIALIZER_MAP = new HashMap<String, Serializer>() {{
        put(SerializerKeys.JDK, new JdkSerializer());
        put(SerializerKeys.JSON, new JsonSerializer());
        put(SerializerKeys.KRYO, new KryoSerializer());
        put(SerializerKeys.HESSIAN, new HessianSerializer());
    }};

    /**
     * 默认序列化器
     */
    private static final Serializer DEFAULT_SERIALIZER = KEY_SERIALIZER_MAP.get("jdk");

    /**
     * 获取实例
     *
     * @param key
     * @return
     */
    public static Serializer getInstance(String key) {
        return KEY_SERIALIZER_MAP.getOrDefault(key, DEFAULT_SERIALIZER);
    }

}

4.2.3应用

全局配置类 RpcConfig 中补充序列化器的配置,代码如下

public class RpcConfig {
    ...
    
    /**
     * 序列化器
     */
    private String serializer = SerializerKeys.JDK;
}

之后将:

  • ServiceProxy
  • HttpServerHandler

两个类中的序列化器改为动态使用即可

修改为:

 //指定序列化器
final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());

最终:

image-20240328192240107

4.3 自定义序列化器

4.3.1指定SPI配置目录和文件

这里我们知道:

系统内置的 SPI 机制会加载 resources 资源目录下的 META-INF/services 目录,那我们自定义的序列化器可以如法炮制,改为读取 META-INF/rpc 目录

  • 用户自定义 SPI:META-INF/rpc/custom。用户可以在该目录下新建配置,加载自定义的实现类。
  • 系统内置 SPI:META-INF/rpc/system。RPC 框架自带的实现类,比如之前开发好的 JdkSerializer

在此,所有实现类都可以通过SPI机制动态加载,不需要硬编码使用Map维护实现类了

配置文件:

jdk=com.siyi.earpc.serializer.JdkSerializer
hessian=com.siyi.earpc.serializer.HessianSerializer
json=com.siyi.earpc.serializer.JsonSerializer
kryo=com.siyi.earpc.serializer.KryoSerializer

4.3.2 编写 SpiLoader 加载器

实现关键:

  1. 用 Map 来存储已加载的配置信息K-V
  2. 扫描指定路径,读取每个配置文件,获取到K-V
  3. 定义获取实例方法,根据用户传入的接口和键名,从 Map 中找到对应的实现类,然后通过反射获取到实现类对象。可以维护一个对象实例缓存,创建过一次的对象从缓存中读取即可。

完整代码:

package com.siyi.earpc.spi;

import cn.hutool.core.io.resource.ResourceUtil;
import com.siyi.earpc.serializer.Serializer;
import lombok.extern.slf4j.Slf4j;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class SpiLoader {
    //存储以及加载的类
    private static Map<String, Map<String, Class<?>>> loaderMap = new ConcurrentHashMap<>();
    //对象实例缓存
    private static Map<String, Object> instanceCache = new ConcurrentHashMap<>();
    //系统SPI目录
    public static final String RPC_SYSTEM_SPI_DIR = "META-INF/rpc/system";
    //用户自定义SPI目录
    public static final String RPC_CUSTOM_SPI_DIR = "META-INF/rpc/custom";
    //扫描路径
    private static final String[] SCAN_DIRS = new String[]{RPC_SYSTEM_SPI_DIR, RPC_CUSTOM_SPI_DIR};
    //动态加载类列表
    private static final List<Class<?>> LOAD_CLASS_LIST = Arrays.asList(Serializer.class);

    //加载所有类型
    public static void loadAll() {
        log.info("加载所有SPI");
        for (Class<?> aClass : LOAD_CLASS_LIST) {

        }
    }

    //加载某个类型
    public static Map<String, Class<?>> load(Class<?> loadClass) {
        log.info("加载类型为 {} 的 SPI", loadClass.getName());
        // 扫描路径,用户自定义的 SPI 优先级高于系统 SPI
        Map<String, Class<?>> keyClassMap = new HashMap<>();
        for (String scanDir : SCAN_DIRS) {
            List<URL> resources = ResourceUtil.getResources(scanDir + loadClass.getName());
            for (URL resource : resources) {
                try {
                    InputStreamReader inputStreamReader = new InputStreamReader(resource.openStream());
                    BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
                    String line;
                    while ((line = bufferedReader.readLine()) != null) {
                        String[] strArray = line.split("=");
                        if (strArray.length > 1) {
                            String key = strArray[0];
                            String className = strArray[1];
                            keyClassMap.put(key, Class.forName(className));
                        }
                    }
                } catch (Exception e) {
                    log.error("spi resource load error", e);
                }
            }
        }
        loaderMap.put(loadClass.getName(), keyClassMap);
        return keyClassMap;
    }

}

配置文件是使用了 ResourceUtil.getResources,而不是通过文件路径获取。因为如果框架作为依赖被引入,是无法得到正确文件路径的,这段代码可以多看看,

4.3.3 重构序列化器工厂

完整代码:

package com.siyi.earpc.serializer;

import com.siyi.earpc.spi.SpiLoader;

/**
 * @author Eric
 */
public class SerializerFactory {
    static {
        SpiLoader.load(Serializer.class);
    }
    /**
     * 默认序列化器
     */
    private static final Serializer DEFAULT_SERIALIZER = new JdkSerializer();
    /**
     * 获取实例
     *
     * @param key
     * @return
     */
    public static Serializer getInstance(String key) {
        return SpiLoader.getInstance(Serializer.class, key);
    }
}

5.测试

这里注意:

如果使用RpcProviderExample测试,需要给resource中也配上配置,这里踩个坑

  1. 修改消费者和生产者示例项目中的配置文件,指定不同的序列化器,比如 hessian

  2. 依次启动生产者和消费者,验证能否正常完成 RPC 请求和响应。

之后测试custom

  1. 写一个类实现 Serializer 接口
  2. 在 custom 目录下编写 SPI 配置文件,加载自己写的实现类

image-20240330173740207

注册中心基本实现

首先回忆一下流程:

image-20240330233837277

1.设计方案:

这里先思考一下注册中心需要什么?

  1. 数据分布式存储:集中的注册信息数据存储、读取和共享
  2. 服务注册:服务提供者上报服务信息到注册中心
  3. 服务发现:服务消费者从注册中心拉取服务信息
  4. 心跳检测:定期检查服务提供者的存活状态
  5. 服务注销:手动剔除节点、或者自动剔除失效节点
  6. 更多优化点:比如注册中心本身的容错、服务消费者缓存等。

技术选型和需求分析:

  1. 需要一个能够集中存储和读取数据的中间件,还需要有数据过期、数据监听的能力,便于我们移除失效节点、更新节点列表等。
  2. 要考虑它的性能、高可用性、高可靠性、稳定性、数据一致性、社区的生态和活跃度**(以及可用性和可靠性)**
  3. 主流的注册中心实现中间件有 ZooKeeper、Redis 等,这里使用Etcd

2.Etcd 入门

GitHub:https://github.com/etcd-io/etcd

Etcd 是一个 Go 语言实现的、开源的、分布式 的键值存储系统,它主要用于分布式系统中的服务发现、配置管理和分布式锁等场景。

特点:

  • 简单:定义明确、面向用户的 API (gRPC)
  • 安全:自动 TLS,具有可选的客户端证书身份验证
  • 快速:基准 10,000 次写入/秒
  • 可靠:使用 Raft 正确分发

Etcd和云原生有着密切的关系,通常被作为云原生应用的基础设施,经典的容器管理平台 k8s 就使用了 Etcd 来存储集群配置信息、状态信息、节点信息等。

image-20240330234339099

2.1 Etcd的数据结构

  • Key(键):Etcd 中的基本数据单元,类似于文件系统中的文件名。每个键都唯一标识一个值,并且可以包含子键,形成类似于路径的层次结构。
  • Value(值):与键关联的数据,可以是任意类型的数据,通常是字符串形式。

有两个常用的特性:

  1. Lease(租约):用于对键值对进行 TTL 超时设置,即设置键值对的过期时间。当租约过期时,相关的键值对将被自动删除。
  2. Watch(监听):可以监视特定键的变化,当键的值发生变化时,会触发相应的通知。

2.2 Raft算法

一个 Raft 集群包括若干服务器,以典型的 5 服务器集群举例。在任意的时间,每个服务器一定会处于以下三个状态中的一个:

  • Leader:负责发起心跳,响应客户端,创建日志,同步日志。
  • Candidate:Leader 选举过程中的临时角色,由 Follower 转化而来,发起投票参与竞选。
  • Follower:接受 Leader 的心跳和日志同步数据,投票给 Candidate。

在正常的情况下,只有一个服务器是 Leader,剩下的服务器是 Follower。Follower 是被动的,它们不会发送任何请求,只是响应来自 Leader 和 Candidate 的请求。

这里来说一下日志复制:

当客户端发送写请求时,领导者首先将写操作写入自己的日志中,并将写操作的日志条目分发给其他节点,其他节点收到日志后也将其写入自己的日志中。一旦 大多数节点(即半数以上的节点)都将该日志条目成功写入到自己的日志中,该日志条目就被视为已提交,领导者会向客户端发送成功响应。

raft 保证以下两个性质:

  • 在两个日志里,有两个 entry 拥有相同的 index 和 term,那么它们一定有相同的 cmd
  • 在两个日志里,有两个 entry 拥有相同的 index 和 term,那么它们前面的 entry 也一定相同

通过“仅有 Leader 可以生成 entry”来保证第一个性质,第二个性质需要一致性检查来进行保证。

为了使得 Follower 的日志和自己的日志一致,Leader 需要找到 Follower 与它日志一致的地方,然后删除 Follower 在该位置之后的日志,接着把这之后的日志发送给 Follower。

总结就是:

如果领导者节点宕机或失去联系,Raft 算法会在其他节点中 选举出新的领导者,从而保证系统的可用性和一致性。新的领导者会继续接收客户端的写请求,并负责将写操作复制到其他节点上,从而保持数据的一致性。

如果不太理解理论可以先试试这个playground:

http://play.etcd.io/play

当我们停止一个节点的时候可以发现,新的Leader是等了一小段间隔才选出了新的Leader

image-20240331095123342

这段间隔其实不是固定时间的,这个可以看Raft算法的任期:

image-20240331095215814

这里的蓝色就是选举,也就是选择新的Leader,t3就是一轮选举没有出现新的Leader,再次进行一轮选举

下面再看一种情况:

image-20240331095424135

当我们这种时候,停止node1,可以发现并不会出现新的Leader,因为剩下两个节点投票都是1,谁都不服谁,

这种情况我们称为**脑裂**

image-20240331095608984

之后再次重启节点,就会出现新的Leader

接下来介绍一下存储和读取,可以发现支持前缀读取:

image-20240331100045108

3.3 Etcd的安装

  1. GitHub上下载压缩包: Releases · etcd-io/etcd (github.com)
  2. 解压在本地电脑,目录结构如图,这里的etcd是服务端,etcdctl是客户端,etcdutl:备份恢复工具

image-20240331101701265

  1. 在这个目录下,打开cmd运行:etcdctl --version,帮助,命令等
  2. 参照官网,存储第一个K-V:
etcdctl put greeting "Hello, etcd"
etcdctl get greeting
#输出:
greeting
Hello, etcd

运行脚本后,服务默认占用 2379 和 2380 端口,作用分别如下:

  • 2379:提供 HTTP API 服务,和 etcdctl 交互
  • 2380:集群中节点间通讯

3.4 Etcd的可视化工具

类似Redis Manager,这里也有两种可视化工具:

更推荐 etcdkeeper,安装成本更低,学习使用更方便。这里的安装很简单,去官网把压缩包安装后,按照官网操作即可

压缩完成后打开:etcdkeeper.exe

之后进入地址:http://127.0.0.1:8080/etcdkeeper

然后我们重新存储一个值,进入地址观看结果,成功!

3.5 Etcd 的Java客户端

etcd 主流的 Java 客户端是 jetcd:https://github.com/etcd-io/jetcd。

注意,Java 版本必须大于 11!

maven,这里需要加入版本,这里使用的是0.7.7

<dependency>
  <groupId>io.etcd</groupId>
  <artifactId>jetcd-core</artifactId>
  <version>${jetcd-version}</version>
</dependency>

用法(这里官网有Demo),所以这里直接演示Java中完整代码

package com.siyi.earpc.register;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.kv.GetResponse;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * @author Eric
 */
public class EtcdRegister {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Client client = Client.builder().endpoints("http://localhost:2379")
                .build();
        KV kvClient = client.getKVClient();
        ByteSequence key = ByteSequence.from("test_key".getBytes());
        ByteSequence value = ByteSequence.from("test_value".getBytes());

// put the key-value
        kvClient.put(key, value).get();

// get the CompletableFuture
        CompletableFuture<GetResponse> getFuture = kvClient.get(key);

// get the value from CompletableFuture
        GetResponse response = getFuture.get();

// delete the key
        kvClient.delete(key).get();
    }
}

在上述代码中,我们使用 KVClient 来操作 etcd 写入和读取数据。除了 KVClient 客户端外,Etcd 还提供了很多其他客户端,这里可以只做了解(大多数情况了解前三个就够了

  1. kvClient:用于对 etcd 中的键值对进行操作。通过 kvClient 可以进行设置值、获取值、删除值、列出目录等操作。
  2. leaseClient:用于管理 etcd 的租约机制。租约是 etcd 中的一种时间片,用于为键值对分配生存时间,并在租约到期时自动删除相关的键值对。通过 leaseClient 可以创建、获取、续约和撤销租约。
  3. watchClient:用于监视 etcd 中键的变化,并在键的值发生变化时接收通知。
  4. clusterClient:用于与 etcd 集群进行交互,包括添加、移除、列出成员、设置选举、获取集群的健康状态、获取成员列表信息等操作。
  5. authClient:用于管理 etcd 的身份验证和授权。通过 authClient 可以添加、删除、列出用户、角色等身份信息,以及授予或撤销用户或角色的权限。
  6. maintenanceClient:用于执行 etcd 的维护操作,如健康检查、数据库备份、成员维护、数据库快照、数据库压缩等。
  7. lockClient:用于实现分布式锁功能,通过 lockClient 可以在 etcd 上创建、获取、释放锁,能够轻松实现并发控制。
  8. electionClient:用于实现分布式选举功能,可以在 etcd 上创建选举、提交选票、监视选举结果等。

Debug运行,观察可以看到版本号,这是因为 etcd 中的每个键都有一个与之关联的版本号,用于跟踪键的修改历史。当一个键的值发生变化时,其版本号也会增加。

image-20240331110227827

通过使用 etcd 的 Watch API,可以监视键的变化,并在发生变化时接收通知。这种版本机制使得 etcd 在分布式系统中能够实现乐观并发控制、一致性和可靠性的数据访问。

3.注册中心实现

3.1 设计

了解用法后,接下来思考一下怎么设计注册中心

  1. key 如何设计?
  2. value 如何设计?
  3. key 什么时候过期?

这里有两种设计结构:

  • 层级结构

将服务理解为文件夹、将服务对应的多个节点理解为文件夹下的文件,那么可以通过服务名称,用前缀查询的方式查询到某个服务的所有节点。

image-20240331210301611

  • 列表结构。将所有的服务节点以列表的形式整体作为 value。

image-20240331210608381

对于 ZooKeeper 和 Etcd 这种支持层级查询的中间件,用第一种结构会更清晰;对于 Redis,由于本身就支持列表数据结构,可以选择第二种结构。

最后,要给key设置过期时间,这样当服务节点掉线后,可以及时移除

3.2 开发实现

现在我们以及知道了key是**/业务前缀/服务名/服务节点地址:**

那么value,就是:

3.2.1 注册信息

在 model 包下新建 ServiceMetaInfo 类,封装服务的注册信息,包括服务名称、服务版本号、服务地址、服务分组等。

这里需要加入两个方法,获取key,而且可以把版本号和分组都放到服务键名中,就可以在查询时根据这些参数获取对应版本和分组的服务了。

package com.siyi.earpc.model;

import com.siyi.earpc.constant.RpcConstant;

/**
 * @author Eric
 */
public class ServiceMetaInfo {
    /**
     * 服务名称
     */
    private String serviceName;
    /**
     * 服务版本号
     */
    private String serviceVersion = RpcConstant.DEFAULT_SERVICE_VERSION;
    /**
     * 服务地址
     */
    private String serviceAddress;
     /**
     * 服务域名
     */
    private String serviceHost;
    /**
     * 服务端口
     */
    private int servicePort;
    /**
     * 服务分组
     */
    private String serviceGroup = "default";

    /**
     * 获取服务键名
     *
     * @return 服务键名
     */
    public String getServiceKey() {
        // 后续可扩展服务分组
//  return String.format("%s:%s:%s", serviceName, serviceVersion, serviceGroup);
        return String.format("%s:%s", serviceName, serviceVersion);
    }
    /**
     * 获取服务注册节点键名
     *
     * @return  服务注册节点键名
     */
    public String getServiceNodeKey() {
        return String.format("%s/%s", getServiceKey(), serviceAddress);
    }
}

因为我们使用了版本号服务,所以给 RpcRequest 对象补充服务版本号字段,可以先作为预留字段,默认值为 "1.0",后续再自行实现。

首先常量加入:

public interface RpcConstant { 
/**
     * 默认服务版本
     */
    String DEFAULT_SERVICE_VERSION = "1.0";
}

然后在RpcRequest使用

 public class RpcRequest implements Serializable {
     //其他代码
//    服务版本号
    private String serviceVersion  = RpcConstant.DEFAULT_SERVICE_VERSION;
 }
3.2.2注册中心配置

在 config 包下编写注册中心配置类 RegistryConfig,让用户配置连接注册中心所需的信息,比如注册中心类别、注册中心地址、用户名、密码、连接超时时间等

package com.siyi.earpc.config;

import lombok.Data;

@Data
public class RegistryConfig {
    /**
     * 注册中心类别
     */
    private String registry = "Etcd";
    /**
     * 注册中心地址
     */
    private String address = "http://localhost:2380";
    /**
     * 用户名
     */
    private String username;
    /**
     * 密码
     */
    private String password;
    /**
     * 注册中心连接超时时间
     */
    private Long timeout = 10000L;
}

3.2.3注册中心接口

为了遵循可扩展设计,这里我们写一个接口,后续可以实现多种不同的注册中心,并且和序列化器一样,可以使用 SPI 机制动态加载。

接口我们实现的功能有:

初始化、注册服务、注销服务、服务发现(获取服务节点列表)、服务销毁等方法。

package com.siyi.earpc.register;

import com.siyi.earpc.config.RegistryConfig;
import com.siyi.earpc.model.ServiceMetaInfo;

import java.util.List;

/**
 * @author Eric
 */
public interface Registry {
    /**
     * 服务注册
     *
     * @param registryConfig 注册中心配置
     */
    void init(RegistryConfig registryConfig);

    /**
     * 服务注册
     *
     * @param serviceMetaInfo 服务元信息
     */
    void register(ServiceMetaInfo serviceMetaInfo) throws Exception;

    /**
     * 服务注销
     *
     * @param serviceMetaInfo 服务元信息
     */
    void unregister(ServiceMetaInfo serviceMetaInfo) throws Exception;

    /**
     * 服务发现
     *
     * @param serviceName 服务名称
     * @return 服务地址
     */
    List<ServiceMetaInfo> discovery(String serviceName) throws Exception;

    /**
     * 服务销毁
     */
    void destroy();
}

3.2.4注册中心的实现

在 registry 目录下新建 EtcdRegistry 类,实现注册中心接口,每一个方法完成如下:

  1. 初始化

这里其实就是Demo中的创建客户端,但是这里需要加入RegistryConfig中的超时时间

定义 Etcd 键存储的根路径为 /rpc/,为了区分不同的项目。

    private static final String ETCD_ROOT_PATH = "/rpc/";
    private Client client;

    private KV kvClient;

    /**
     * 服务注册
     *
     * @param registryConfig 注册中心配置
     */
    @Override
    public void init(RegistryConfig registryConfig) {
         client = Client.builder()
                .endpoints(registryConfig.getAddress())
                .connectTimeout(Duration.ofMillis(registryConfig.getTimeout()))
                .build();
        kvClient = client.getKVClient();
    }
  1. 服务注册

这里使用了Lease租约,Etcd的API中,put和get都是使用Option,例如:PutOptionGetOption,以及这里使用了Etcd中的ByteSequence,用于转换字节

public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
    Lease leaseClient = client.getLeaseClient();
    //创建一个30秒的租约
    long leaseId = leaseClient.grant(30).get().getID();
    String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();
    ByteSequence key = ByteSequence.from(registerKey, StandardCharsets.UTF_8);
    ByteSequence value = ByteSequence.from(JSONUtil.toJsonStr(serviceMetaInfo), StandardCharsets.UTF_8);
    //将K--V关联到租约上
    PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
    kvClient.put(key, value, putOption).get();
}
  1. 服务注销

这里代码很好理解就是删除

 @Override
    public void unregister(ServiceMetaInfo serviceMetaInfo) throws Exception {
        kvClient.delete(ByteSequence.from(ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey(),
                StandardCharsets.UTF_8)).get();

    }
  1. 服务发现

这里可能lambda表达式会有点难以理解,但是核心主旨其实就是将Etcd形式的K-V转换为我们需要的**ServiceMetaInfo**对象

public List<ServiceMetaInfo> discovery(String serviceName) throws Exception {
    String prefix = ETCD_ROOT_PATH + serviceName + "/";
    try {
        GetOption getOption = GetOption.builder().isPrefix(true).build();
        ByteSequence key = ByteSequence.from(prefix, StandardCharsets.UTF_8);
        List<KeyValue> kvs = kvClient.get(key, getOption).get().getKvs();
        //将json字符串转换为对象
        return kvs.stream().map(keyValue -> {
            String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
            return JSONUtil.toBean(value, ServiceMetaInfo.class);
        }).collect(Collectors.toList());
    } catch (Exception e) {
        throw new Exception("服务获取失败");
    }
}
  1. 服务销毁

这里有一个小问题我,为什么不能直接设置为null呢?原因是:

  1. 再次使用: 如果在destroy方法之后,还有可能会重新初始化或再次使用这些客户端对象,那么将它们设为null后就需要在使用前检查是否为null并重新创建。这可能会增加代码的复杂度。
  2. 并发访问: 如果你的应用是多线程的,并且存在其他线程可能在destroy方法执行的同时访问clientkvClient的情况,直接设置为null可能会导致NullPointerException。确保对象不被设置为null,而是通过其他方式(例如使用状态标记)来控制访问,可以避免这类并发问题。
 public void destroy() {
        System.out.println("当前节点下线");
        //关闭客户端,这里不能设置为null
        if (client != null) {
            client.close();
        }
        if (kvClient != null) {
            kvClient.close();
        }
    }
完整代码
package com.siyi.earpc.register;

import cn.hutool.json.JSONUtil;
import com.siyi.earpc.config.RegistryConfig;
import com.siyi.earpc.model.ServiceMetaInfo;
import io.etcd.jetcd.*;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;

/**
 * @author Eric
 */
public class EtcdRegister implements Registry {
    private static final String ETCD_ROOT_PATH = "/rpc/";
    private Client client;

    private KV kvClient;

    /**
     * 服务注册
     *
     * @param registryConfig 注册中心配置
     */
    @Override
    public void init(RegistryConfig registryConfig) {
         client = Client.builder()
                .endpoints(registryConfig.getAddress())
                .connectTimeout(Duration.ofMillis(registryConfig.getTimeout()))
                .build();
        kvClient = client.getKVClient();
    }

    /**
     * 服务注册
     *
     * @param serviceMetaInfo 服务元信息
     */
    @Override
    public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
        Lease leaseClient = client.getLeaseClient();
        //创建一个30秒的租约
        long leaseId = leaseClient.grant(30).get().getID();
        String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();
        ByteSequence key = ByteSequence.from(registerKey, StandardCharsets.UTF_8);
        ByteSequence value = ByteSequence.from(JSONUtil.toJsonStr(serviceMetaInfo), StandardCharsets.UTF_8);
        //将K--V关联到租约上
        PutOption putOption = PutOption.builder().withLeaseId(leaseId).build();
        kvClient.put(key, value, putOption).get();
    }

    /**
     * 服务注销
     *
     * @param serviceMetaInfo 服务元信息
     */
    @Override
    public void unregister(ServiceMetaInfo serviceMetaInfo) throws Exception {
        kvClient.delete(ByteSequence.from(ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey(),
                StandardCharsets.UTF_8)).get();

    }

    /**
     * 服务发现
     *
     * @param serviceName 服务名称
     * @return 服务地址
     */
    @Override
    public List<ServiceMetaInfo> discovery(String serviceName) throws Exception {
        String prefix = ETCD_ROOT_PATH + serviceName + "/";
        try {
            GetOption getOption = GetOption.builder().isPrefix(true).build();
            ByteSequence key = ByteSequence.from(prefix, StandardCharsets.UTF_8);
            List<KeyValue> kvs = kvClient.get(key, getOption).get().getKvs();
            //将json字符串转换为对象
            return kvs.stream().map(keyValue -> {
                String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
                return JSONUtil.toBean(value, ServiceMetaInfo.class);
            }).collect(Collectors.toList());
        } catch (Exception e) {
            throw new Exception("服务获取失败");
        }
    }

    /**
     * 服务销毁
     */
    @Override
    public void destroy() {
        System.out.println("当前节点下线");
        //关闭客户端,这里不能设置为null
        if (client != null) {
            client.close();
        }
        if (kvClient != null) {
            kvClient.close();
        }
    }
}

3.3 支持拓展注册中心

支持拓展也就是开发者能够填写配置来指定使用的注册中心,并且支持自定义注册中心,让框架更易用、更利于扩展。在之前的序列化器中我们以及实现了类似,这里仿照即可

3.3.1注册中心常量

在 registry 包下新建 RegistryKeys 类,列举所有支持的注册中心键名。

package com.siyi.earpc.register;

/**
 * @author Eric
 */
public interface RegistryKeys {

    String ETCD = "etcd";

    String ZOOKEEPER = "zookeeper";

}
3.3.2 工厂模式

在 registry 包下新建 RegistryFactory 类,代码可以完全参考之前的SerializerFactory略作修改

package com.siyi.earpc.register;

import com.siyi.earpc.serializer.Serializer;
import com.siyi.earpc.spi.SpiLoader;

/**
 * @author Eric
 */
public class RegistryFactory {
    private static final Registry DEFAULT_REGISTRY = new EtcdRegistry();
    static {
        SpiLoader.load(Registry.class);
    }
    /**
     * 获取实例
     *
     * @param key 注册中心key
     * @return 注册中心实例
     */
    public static Registry getInstance(String key) {
        return SpiLoader.getInstance(Registry.class, key);
    }

}

3.3.3 SPI文件

META-INFrpc/system 目录下编写注册中心接口的 SPI 配置文件,文件名称为: com.siyi.earpc.register.Registry

etcd=com.siyi.earpc.register.EtcdRegistry
3.3.4 注册中心的初始化

由于服务提供者和服务消费者都需要和注册中心建立连接,是一个 RPC 框架启动必不可少的环节,所以可以将初始化流程放在 RpcApplication 类中。

修改init方法:

public static void init(RpcConfig newRpcConfig) {
        rpcConfig = newRpcConfig;
        log.info("rpc init, config = {}", newRpcConfig.toString());
        //注册中心初始化
        RegistryConfig registryConfig = new RegistryConfig();
        Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
        registry.init(registryConfig);
        log.info("registry init, config = {}", registryConfig.toString());
    }

然后在RpcConfig中加入配置

/**
     * 注册中心配置
     */
    private RegistryConfig registryConfig = new RegistryConfig();

3.4 完成调用流程

接下来就是服务消费者的调用了,服务消费者需要先从注册中心获取节点信息,再得到调用地址并执行。

  1. 需要给 ServiceMetaInfo 类增加一个方法,便于获取可调用的地址,代码如下:
/**
 * 获取完整服务地址
 *
 * @return
 */
public String getServiceAddress() {
    if (!StrUtil.contains(serviceHost, "http")) {
        return String.format("http://%s:%s", serviceHost, servicePort);
    }
    return String.format("%s:%s", serviceHost, servicePort);
}
  1. 修改服务代理 ServiceProxy 类,更改调用逻辑。

这里首先通过反射方法获取到服务名,步骤在注释中写出:

   // 依靠注册中心解决硬编码问题
             String serviceName = method.getDeclaringClass().getName();
            //获取注册中心
            RpcConfig rpcConfig = RpcApplication.getRpcConfig();
            //获取注册中心的实例
            Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
            ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
            //将服务名和版本号赋值给serviceMetaInfo
            serviceMetaInfo.setServiceName(serviceName);
            serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
            //得到服务名后,这里获取服务元信息
            List<ServiceMetaInfo> discoveryList = registry.discovery(serviceMetaInfo.getServiceKey());
            if (CollUtil.isEmpty(discoveryList)) {
                throw new RuntimeException("暂无服务地址");
            }
            //之后获取服务地址
            //TODO 暂时只取第一个,后续可扩展负载均衡策略
            ServiceMetaInfo selectedServiceMetaInfo  = discoveryList.get(0);
            try (HttpResponse httpResponse = HttpRequest.post(selectedServiceMetaInfo.getServiceAddress())
                    .body(bytes)
                    .execute()) {
                result = httpResponse.bodyBytes();
            }

完整代码如下:

package com.siyi.earpc.proxy;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.siyi.earpc.RpcApplication;
import com.siyi.earpc.config.RegistryConfig;
import com.siyi.earpc.config.RpcConfig;
import com.siyi.earpc.constant.RpcConstant;
import com.siyi.earpc.model.RpcRequest;
import com.siyi.earpc.model.RpcResponse;
import com.siyi.earpc.model.ServiceMetaInfo;
import com.siyi.earpc.register.Registry;
import com.siyi.earpc.register.RegistryFactory;
import com.siyi.earpc.serializer.Serializer;
import com.siyi.earpc.serializer.SerializerFactory;

import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.List;

public class ServiceProxy implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //指定序列化器
        final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());
        //发送请求
        RpcRequest rpcRequest = RpcRequest.builder()
                  .name(method.getDeclaringClass().getName())
                .methodName(method.getName())
                .args(args)
                .parameterTypes(method.getParameterTypes())
                .build();

        try {
            byte[] bytes = serializer.serialize(rpcRequest);
            byte[] result;
            // 依靠注册中心解决硬编码问题
            String serviceName = method.getDeclaringClass().getName();
            //获取注册中心
            RpcConfig rpcConfig = RpcApplication.getRpcConfig();
            //获取注册中心的实例
            Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
            ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
            //将服务名和版本号赋值给serviceMetaInfo
            serviceMetaInfo.setServiceName(serviceName);
            serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
            //得到服务名后,这里获取服务元信息
            List<ServiceMetaInfo> discoveryList = registry.discovery(serviceMetaInfo.getServiceKey());
            if (CollUtil.isEmpty(discoveryList)) {
                throw new RuntimeException("暂无服务地址");
            }
            //之后获取服务地址
            //TODO 暂时只取第一个,后续可扩展负载均衡策略
            ServiceMetaInfo selectedServiceMetaInfo  = discoveryList.get(0);
            try (HttpResponse httpResponse = HttpRequest.post(selectedServiceMetaInfo.getServiceAddress())
                    .body(bytes)
                    .execute()) {
                result = httpResponse.bodyBytes();
            }

            RpcResponse response = serializer.deserialize(result, RpcResponse.class);
            return response.getData();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
}

测试

首先验证注册中心能否正常完成服务注册、注销、服务发现。

引入单元测试,编写单元测试类 RegistryTest,代码如下:

package com.siyi.earpc.register;

import com.siyi.earpc.config.RegistryConfig;
import com.siyi.earpc.model.ServiceMetaInfo;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

public class RegistryTest {
    final Registry registry = new EtcdRegistry();

    @Before
    public void init() {
        RegistryConfig registryConfig = new RegistryConfig();
        registryConfig.setAddress("http://localhost:2379");
        registry.init(registryConfig);
    }

    @Test
    public void register() throws Exception {
        ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName("myService");
        serviceMetaInfo.setServiceVersion("1.0");
        serviceMetaInfo.setServiceHost("localhost");
        serviceMetaInfo.setServicePort(1234);
        registry.register(serviceMetaInfo);
        serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName("myService");
        serviceMetaInfo.setServiceVersion("1.0");
        serviceMetaInfo.setServiceHost("localhost");
        serviceMetaInfo.setServicePort(1235);
        registry.register(serviceMetaInfo);
        serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName("myService");
        serviceMetaInfo.setServiceVersion("2.0");
        serviceMetaInfo.setServiceHost("localhost");
        serviceMetaInfo.setServicePort(1234);
        registry.register(serviceMetaInfo);
    }

    @Test
    public void unregister() throws Exception {
        ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName("myService");
        serviceMetaInfo.setServiceVersion("1.0");
        serviceMetaInfo.setServiceHost("localhost");
        serviceMetaInfo.setServicePort(1234);
        registry.unRegister(serviceMetaInfo);
    }

    @Test
    public void discovery() throws Exception {
        ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName("myService");
        serviceMetaInfo.setServiceVersion("1.0");
        String serviceKey = serviceMetaInfo.getServiceKey();
        List<ServiceMetaInfo> serviceMetaInfoList = registry.discovery(serviceKey);
        Assert.assertNotNull(serviceMetaInfoList);
    }
}

测试通过后,打开界面查看结果

image-20240401130046390

这里可以修改数据继续增加数据,能够发现是层级结构很清晰

完整流程测试

example-provider 模块下新增服务提供者示例类,需要初始化 RPC 框架并且将服务手动注册到注册中心上。

修改代码为注册中心:

package com.siyi.example.provider;

import com.siyi.earpc.RpcApplication;
import com.siyi.earpc.config.RpcConfig;
import com.siyi.earpc.model.ServiceMetaInfo;
import com.siyi.earpc.register.LocalRegistry;
import com.siyi.earpc.register.Registry;
import com.siyi.earpc.register.RegistryFactory;
import com.siyi.earpc.server.HttpServer;
import com.siyi.earpc.server.VertxHttpServer;
import com.siyi.example.common.service.UserService;

/**
 * @author Eric
 */
public class RpcProviderExample {

    public static void main(String[] args) {
        RpcApplication.init();
        // 注册服务
        String serviceName = UserService.class.getName();
        LocalRegistry.register(serviceName, UserServiceImpl.class);
        // 将服务注册到注册中心
        RpcConfig rpcConfig = RpcApplication.getRpcConfig();
        Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
        ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName(serviceName);
        serviceMetaInfo.setServiceAddress(rpcConfig.getServerHost()+":"+rpcConfig.getServerPort());
        String serverHost = rpcConfig.getServerHost();
        Integer serverPort = rpcConfig.getServerPort();
        System.out.println(serviceMetaInfo.getServiceAddress());
        try {
            registry.register(serviceMetaInfo);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        HttpServer httpServer = new VertxHttpServer();
        httpServer.start(RpcApplication.getRpcConfig().getServerPort());
    }
}

这里注意,因为我们使用的EtcdKeeper占用了8080,所以这里测试的时候将服务端口改为了8081,之后启动消费者,完成调用

windows查看占用: netstat -aon | findstr "8080"

image-20240402003015840

踩坑

这里因为我的**ServiceMetaInfogetServiceAddressData的方法重名导致了重写,因此注意不要将这里方法改为重名所以这里使用方法名:getServiceAddr**

注册中心拓展

注册中心现在基本以及完成,但是我们需要思考现在的不足:

  1. 数据一致性:服务提供者如果下线了,注册中心需要即时更新,剔除下线节点。否则消费者可能会调用到已经下线的节点。
  2. 性能优化:服务消费者每次都需要从注册中心获取服务,可以使用缓存进行优化。
  3. 高可用性:保证注册中心本身不会宕机。
  4. 可扩展性:实现更多其他种类的注册中心。

1.心跳检测和续期机制

1.1 心跳检测的介绍

那么什么是心跳检测呢?

心跳检测(俗称 heartBeat)是一种用于监测系统是否正常工作的机制。它通过定期发送 心跳信号(请求)来检测目标系统的状态。

也就是说,接收方在一定时间内没有收到心跳信号或者未能正常响应请求,就会认为目标系统故障或不可用,从而触发相应的处理或告警机制。

这种机制在分布式、微服务系统中,比如集群管理、服务健康检查等。用处很常见

我们只需要写一个接口,当定期调用这个接口正常的时候,说明这个接口是正常的,实现:

@RestController
class HealthCheckController {

    // 健康检查接口
    @GetMapping("/actuator/health")
    public String healthCheck() {
        // 在这里可以添加其他健康检查逻辑,例如检查数据库连接、第三方服务等

        // 返回一个简单的健康状态
        return "OK";
    }
}

1.2 方案

很明显,实现心跳检测一般需要 2 个关键:

  • 定时
  • 网络请求

这里实现思路也很简单,利用Etcd的过期机制:

简单来说就是:

让服务提供者注册后,不断重置时间(但是注意这里的续约一定要小于组约时间,因为如果大于等于的话就已经删除了),当从注册中心访问的时候,访问不到这个服务,说明这个节点就G了

步骤:

  1. 服务提供者向 Etcd 注册自己的服务信息,并在注册时设置 TTL(生存时间)。
  2. Etcd 在接收到服务提供者的注册信息后,会自动维护服务信息的 TTL,并在 TTL 过期时删除该服务信息。
  3. 服务提供者定期请求 Etcd 续签自己的注册信息,重写 TTL。

这里设置,允许一次容错的机会

那么现在有个问题:

每个服务提供者都需要找到自己注册的节点、续期自己的节点,但问题是,

怎么找到当前服务提供者项目自己的节点呢?

可以充分利用本地的特性,在服务提供者本地维护一个 已注册节点集合,注册时添加节点 key 到集合中,只需要续期集合内的 key 即可。

1.3 实现

首先:

给注册中心 Registry 接口补充心跳检测方法,代码如下

public interface Registry {

    ...
    
    /**
     * 心跳检测(服务端)
     */
    void heartBeat();
}

然后在Register实现中加入本地注册集合缓存

/**
 * 本机注册的节点 key 集合(用于维护续期)
 */
private final Set<String> localRegisterNodeKeySet = new HashSet<>();

然后在注册和注销的时候,需要分别将节点加入到缓存中

 @Override
    public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
        ···
            
        //将注册的节点key加入到本地注册的节点key集合中
        localRegisterNodeKeySet.add(registerKey);
    }
  @Override
    public void unRegister(ServiceMetaInfo serviceMetaInfo) throws Exception {
        String registerKey = ETCD_ROOT_PATH + serviceMetaInfo.getServiceNodeKey();
        kvClient.delete(ByteSequence.from(registerKey,
                StandardCharsets.UTF_8)).get();
        localRegisterNodeKeySet.remove(registerKey);
    }

实现heartBeat:

  /**
     * 心跳检测(服务端)
     */
    @Override
    public void heartBeat() {
        CronUtil.schedule("*/10 * * * * *", new Task() {

            /**
             * 执行作业
             * <p>
             * 作业的具体实现需考虑异常情况,默认情况下任务异常在监听中统一监听处理,如果不加入监听,异常会被忽略<br>
             * 因此最好自行捕获异常后处理
             */
            @Override
            public void execute() {
                for (String key : localRegisterNodeKeySet){
                    try {
                        List<KeyValue> keyValues = kvClient.get(ByteSequence.from(key, StandardCharsets.UTF_8))
                                .get()
                                .getKvs();
                        //节点已经过期,需要重新注册
                        if (keyValues.isEmpty()) {
                            continue;
                        }
                        //续约(没有过期)
                        KeyValue keyValue = keyValues.get(0);
                        String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
                        ServiceMetaInfo serviceMetaInfo = JSONUtil.toBean(value, ServiceMetaInfo.class);
                        register(serviceMetaInfo);
                    } catch (Exception e) {
                        throw new RuntimeException(key+ "续约失败",e);
                    }
                }
            }
        });
        //支持秒级别定时任务
        CronUtil.setMatchSecond(true);
        CronUtil.start();
    }

之后,还需要在初始化注册中心的时候,开启心跳检测

 @Override
    public void init(RegistryConfig registryConfig) {
         client = Client.builder()
                .endpoints(registryConfig.getAddress())
                .connectTimeout(Duration.ofMillis(registryConfig.getTimeout()))
                .build();
        kvClient = client.getKVClient();
        //心跳检测
        heartBeat();
    }

这种方法实现的好处就是,即时 Etcd 注册中心的数据出现了丢失,通过心跳检测机制也会重新注册节点信息。

测试

完善之前的 RegistryTest 单元测试代码:

@Test
    public void heartBeat() throws Exception {
        // init 方法中已经执行心跳检测了
        register();
        // 阻塞 1 分钟
        Thread.sleep(60 * 1000L);
    }

之后使用EtcdKeeper查看TTL会一直刷新:

image-20240402110829773

2.服务节点下线机制

当服务提供者节点宕机时,应该从注册中心移除掉已注册的节点,否则会影响消费端调用。所以我们需要设计一套服务节点下线机制。

2.1 设计

服务节点下线又分为:

  1. 主动下线:服务提供者项目正常退出时,主动从注册中心移除注册信息。
  2. 被动下线:服务提供者项目异常推出时,利用 Etcd 的 key 过期机制自动移除。

这里被动下线已经可以利用 Etcd 的机制实现了,我们主要开发主动下线。

2.2 实现

JVM 的 ShutdownHook 是 Java 虚拟机提供的一种机制,允许开发者在 JVM 即将关闭之前执行一些清理工作或其他必要的操作,例如关闭数据库连接、释放资源、保存临时数据等。

  1. 完善 Etcd 注册中心的 destroy 方法,补充下线节点的逻辑。
  public void destroy() {
        System.out.println("当前节点下线");
        // 下线节点
        // 遍历本节点所有的 key
        for (String key : localRegisterNodeKeySet) {
            try {
                kvClient.delete(ByteSequence.from(key, StandardCharsets.UTF_8)).get();
            } catch (Exception e) {
                throw new RuntimeException(key + "节点下线失败");
            }
        }
        //关闭客户端,这里不能设置为null
        if (client != null) {
            client.close();
        }
        if (kvClient != null) {
            kvClient.close();
        }
    }
  1. RpcApplication 的 init 方法中,注册 Shutdown Hook,当程序正常退出时会执行注册中心的 destroy 方法。
 public static void init(RpcConfig newRpcConfig) {
        rpcConfig = newRpcConfig;
        log.info("rpc init, config = {}", newRpcConfig.toString());
        //注册中心初始化
        RegistryConfig registryConfig = new RegistryConfig();
        Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
        registry.init(registryConfig);
        log.info("registry init, config = {}", registryConfig);
     
        //注册shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("shutdown hook has been executed");
            registry.destroy();
        }));
    }

测试

启动服务提供者,然后观察服务是否成功被注册,如果成功则:正常停止服务提供者,然后观察服务信息是否被删除

3.消费端服务缓存

正常情况下,服务节点信息列表的更新频率是不高的,所以在服务消费者从注册中心获取到服务节点信息列表后,完全可以 缓存在本地,下次就不用再请求注册中心获取了,能够提高性能。

3.1 增加本地缓存

这里需要想:我们什么时候将注册中心的信息加入到缓存呢?

答案是:使用服务发现的时候,优先从缓存中获取,如果没有就从Etcd中获取,并加入到缓存

registry 包下新增缓存类 RegistryServiceCache,代码如下:

package com.siyi.earpc.register;

import com.siyi.earpc.model.ServiceMetaInfo;

import java.util.List;

/**
 * @author Eric
 */
public class RegistryServiceCache {
    /**
     * 服务缓存
     */
    List<ServiceMetaInfo> serviceCache;

    /**
     * 写缓存
     */
    void writeCache(List<ServiceMetaInfo> serviceCache) {
        this.serviceCache = serviceCache;
    }

    /**
     * 读缓存
     */
    List<ServiceMetaInfo> readCache() {
        return this.serviceCache;
    }

    /**
     * 清空缓存
     */
    void clearCache() {
        this.serviceCache = null;
    }
}

3.2 使用本地缓存

修改 EtcdRegisty 的代码,使用本地缓存对象:

/**
 * 注册中心服务缓存
 */
private final RegistryServiceCache registryServiceCache = new RegistryServiceCache();

然后修改注册中心中获取服务的方式:

  public List<ServiceMetaInfo> discovery(String serviceName) throws Exception {
        //优先从缓存中获取
        List<ServiceMetaInfo> serviceMetaInfos = registryServiceCache.readCache();
        if (serviceMetaInfos != null) {
            return serviceMetaInfos;
        }
        //从注册中心获取
        String prefix = ETCD_ROOT_PATH + serviceName + "/";
        try {
            GetOption getOption = GetOption.builder().isPrefix(true).build();
            ByteSequence key = ByteSequence.from(prefix, StandardCharsets.UTF_8);
            List<KeyValue> kvs = kvClient.get(key, getOption).get().getKvs();
            //将json字符串转换为对象
            List<ServiceMetaInfo> serviceMetaInfoList = kvs.stream().map(keyValue -> {
                String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
                return JSONUtil.toBean(value, ServiceMetaInfo.class);
            }).collect(Collectors.toList());

            //将服务地址写入缓存
            registryServiceCache.writeCache(serviceMetaInfoList);
            return serviceMetaInfoList;
        } catch (Exception e) {
            throw new Exception("服务获取失败");
        }
    }

写到这里可能会想一个问题:直接从缓存中读取,万一有节点突然下线怎么办?那就需要用到监听机制了用于更新缓存

3.3 服务缓存更新 - 监听机制

3.3.1 设计

当服务注册信息发生变更(比如节点下线)时,需要即时更新消费端缓存

问题是,怎么知道服务注册信息的变更时间呢?

这就需要我们使用 Etcd 的 watch 监听机制,当监听的某个 key 发生修改或删除时,就会触发事件来通知监听者。

流程

image-20240402121148811

这里首先思考一个很重要的问题:

  1. 我们什么时候创建监听器呢?
  2. 监听应该在服务端还是消费端?

由于我们的目标是更新缓存,缓存是在服务消费端维护和使用的,所以也应该是服务消费端去 watch。

那么比较合适的位置就是服务发现方法(serviceDiscovery)。可以对本次获取到的所有服务节点 key 进行监听。

还需要防止重复监听同一个 key,可以通过定义一个已监听 key 的集合来实现。

3.3.2 开发
  1. Registry 注册中心接口补充监听 key 的方法,代码如下:
public interface Registry {

    /**
     * 监听(消费端)
     *
     * @param serviceNodeKey
     */
    void watch(String serviceNodeKey);
}
  1. EtcdRegistry 类中,新增监听 key 的集合。可以使用 ConcurrentHashSet 防止并发冲突,代码如下:
/**
 * 正在监听的 key 集合
 */
private final Set<String> watchingKeySet = new ConcurrentHashSet<>();
  1. EtcdRegistry 类中实现监听 key 的方法。
 /**
     * 监听(消费端)
     *
     * @param serviceNodeKey  服务节点key
     */
    @Override
    public void watch(String serviceNodeKey) {
        Watch watchClient = client.getWatchClient();
        //监听的key
        boolean newWatch = watchingKeySet.add(serviceNodeKey);
        watchClient.watch(ByteSequence.from(serviceNodeKey, StandardCharsets.UTF_8), watchResponse -> {
            for (WatchEvent event : watchResponse.getEvents()) {
                switch (event.getEventType()) {
                    case DELETE:
                        //删除节点
                        registryServiceCache.clearCache();
                        System.out.println("删除节点:" + event.getKeyValue().getKey().toString(StandardCharsets.UTF_8));
                        break;
                    case PUT:
                        //TODO 添加节点
                    default:
                        break;
                }
            }
        });
    }
  1. 对key进行监听,修改服务端服务发现的方法
  public List<ServiceMetaInfo> discovery(String serviceName) throws Exception {
        ···
        //从注册中心获取
        String prefix = ETCD_ROOT_PATH + serviceName + "/";
        try {
            GetOption getOption = GetOption.builder().isPrefix(true).build();
            ByteSequence keyBytes = ByteSequence.from(prefix, StandardCharsets.UTF_8);
            List<KeyValue> kvs = kvClient.get(keyBytes, getOption).get().getKvs();
            //将json字符串转换为对象
            List<ServiceMetaInfo> serviceMetaInfoList = kvs.stream().map(keyValue -> {
                String key = keyValue.getKey().toString(StandardCharsets.UTF_8);
                //监听服务节点
                watch(key);
                
                String value = keyValue.getValue().toString(StandardCharsets.UTF_8);
                return JSONUtil.toBean(value, ServiceMetaInfo.class);
            }).collect(Collectors.toList());

            //将服务地址写入缓存
           ···
    }

测试

使用如下步骤,通过 debug 进行测试:

  1. 先启动服务提供者
  2. 修改服务消费者项目,连续调用服务 3 次,通过 debug 可以发现,第一次查注册中心、第二次查询缓存。
    //调用
        User newUser = userService.getUser(user);
        User newUser1= userService.getUser(user);
        User newUser2 = userService.getUser(user);
        User newUser3= userService.getUser(user);
        User newUser4 = userService.getUser(user);

第二次调用:

image-20240402125311808

  1. 在第三次要调用服务时,下线服务提供者,可以在注册中心看到节点的注册 key 已被删除。

这里断点打到删除节点的代码,发现当下线生产者时,成功删除了节点

  1. 继续向下执行,发现第三次调用服务时,又重新从注册中心查询,说明缓存已经被更新。

这里我们结合了上一步的服务去除机制,当停止服务提供者的时候,也就是从注册中心去除了节点,这里也就从注册中心无法查到了

4.基于 ZooKeeper 的注册中心实现

  1. 安装 ZooKeeper
  2. 引入客户端依赖
  3. 实现接口
  4. SPI 补充 ZooKeeper 注册中心

4.1 下载安装

下载bin.tar.gz格式:

Index of /zookeeper (apache.org)

一般我们会使用 Apache Curator 来操作 ZooKeeper,可以参考官方文档:

https://curator.apache.org/docs/getting-started

引入依赖:

<!-- zookeeper -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-discovery</artifactId>
    <version>5.6.0</version>
</dependency>

完整代码如下:

package com.siyi.earpc.register;

import cn.hutool.core.collection.ConcurrentHashSet;
import com.siyi.earpc.config.RegistryConfig;
import com.siyi.earpc.model.ServiceMetaInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * @author Eric
 */
@Slf4j
public class ZooKeeperRegistry implements Registry {
    private CuratorFramework client;

    private ServiceDiscovery<ServiceMetaInfo> serviceDiscovery;

    /**
     * 本机注册的节点 key 集合(用于维护续期)
     */
    private final Set<String> localRegisterNodeKeySet = new HashSet<>();

    /**
     * 注册中心服务缓存
     */
    private final RegistryServiceCache registryServiceCache = new RegistryServiceCache();

    /**
     * 正在监听的 key 集合
     */
    private final Set<String> watchingKeySet = new ConcurrentHashSet<>();

    /**
     * 根节点
     */
    private static final String ZK_ROOT_PATH = "/rpc/zk";


    /**
     * 服务注册
     *
     * @param registryConfig 注册中心配置
     */
    @Override
    public void init(RegistryConfig registryConfig) {
        //构建
        client = CuratorFrameworkFactory
                .builder()
                .connectString(registryConfig.getAddress())
                .retryPolicy(new ExponentialBackoffRetry(Math.toIntExact(registryConfig.getTimeout()), 3))
                .build();

        //构建serviceDiscovery实例
        serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceMetaInfo.class)
                .client(client)
                .basePath(ZK_ROOT_PATH)
                .serializer(new JsonInstanceSerializer<>(ServiceMetaInfo.class))
                .build();
        //启动
        try {
            // 启动 client 和 serviceDiscovery
            client.start();
            serviceDiscovery.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 服务注册
     *
     * @param serviceMetaInfo 服务元信息
     */
    @Override
    public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
        serviceDiscovery.registerService(buildServiceInstance(serviceMetaInfo));
        // 添加节点信息到本地缓存
        String registerKey = ZK_ROOT_PATH + "/" + serviceMetaInfo.getServiceNodeKey();
        localRegisterNodeKeySet.add(registerKey);
    }

    /**
     * 服务注销
     *
     * @param serviceMetaInfo 服务元信息
     */
    @Override
    public void unRegister(ServiceMetaInfo serviceMetaInfo) throws Exception {
        try {
            serviceDiscovery.unregisterService(buildServiceInstance(serviceMetaInfo));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        // 从本地缓存移除
        String registerKey = ZK_ROOT_PATH + "/" + serviceMetaInfo.getServiceNodeKey();
        localRegisterNodeKeySet.remove(registerKey);
    }

    /**
     * 服务发现
     *
     * @param serviceName 服务名称
     * @return 服务地址
     */
    @Override
    public List<ServiceMetaInfo> discovery(String serviceName) throws Exception {
        // 优先从缓存获取服务
        List<ServiceMetaInfo> cachedServiceMetaInfoList = registryServiceCache.readCache();
        if (cachedServiceMetaInfoList != null) {
            return cachedServiceMetaInfoList;
        }

        try {
            // 查询服务信息
            Collection<ServiceInstance<ServiceMetaInfo>> serviceInstanceList = serviceDiscovery.queryForInstances(serviceName);

            // 解析服务信息
            List<ServiceMetaInfo> serviceMetaInfoList = serviceInstanceList.stream()
                    .map(ServiceInstance::getPayload)
                    .collect(Collectors.toList());

            // 写入服务缓存
            registryServiceCache.writeCache(serviceMetaInfoList);
            return serviceMetaInfoList;
        } catch (Exception e) {
            throw new RuntimeException("获取服务列表失败", e);
        }
    }

    /**
     * 服务销毁
     */
    @Override
    public void destroy() {
        log.info("当前节点下线");
        // 下线节点(这一步可以不做,因为都是临时节点,服务下线,自然就被删掉了)
        for (String key : localRegisterNodeKeySet) {
            try {
                client.delete().guaranteed().forPath(key);
            } catch (Exception e) {
                throw new RuntimeException(key + "节点下线失败");
            }
        }

        // 释放资源
        if (client != null) {
            client.close();
        }
    }

    /**
     * 心跳检测(服务端)
     */
    @Override
    public void heartBeat() {
        // 不需要心跳机制,建立了临时节点,如果服务器故障,则临时节点直接丢失
    }

    /**
     * 监听(消费端)
     *
     * @param serviceNodeKey
     */
    @Override
    public void watch(String serviceNodeKey) {
        String watchKey = ZK_ROOT_PATH + "/" + serviceNodeKey;
        boolean newWatch = watchingKeySet.add(watchKey);
        if (newWatch) {
            CuratorCache curatorCache = CuratorCache.build(client, watchKey);
            curatorCache.start();
            curatorCache.listenable().addListener(
                    CuratorCacheListener
                            .builder()
                            .forDeletes(childData -> registryServiceCache.clearCache())
                            .forChanges(((oldNode, node) -> registryServiceCache.clearCache()))
                            .build()
            );
        }
    }

    private ServiceInstance<ServiceMetaInfo> buildServiceInstance(ServiceMetaInfo serviceMetaInfo) {
        String serviceAddress = serviceMetaInfo.getServiceHost() + ":" + serviceMetaInfo.getServicePort();
        try {
            return ServiceInstance
                    .<ServiceMetaInfo>builder()
                    .id(serviceAddress)
                    .name(serviceMetaInfo.getServiceKey())
                    .address(serviceAddress)
                    .payload(serviceMetaInfo)
                    .build();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

之后加入SPI支持

com.siyi.earpc.register.ZooKeeperRegistry

最后更新配置:

rpc.registryConfig.registry=zookeeper
rpc.registryConfig.address=localhost:2181

自定义协议

1.需求分析

目前的 RPC 框架,我们使用 Vert.x 的 HttpServer 作为服务提供者的服务器,代码实现比较简单,其底层网络传输使用的是 HTTP 协议。

但是这样也有很多的缺点:

一般情况下,RPC 框架会比较注重性能,而 HTTP 协议中的头部信息、请求响应格式较 “重”,会影响网络传输性能。有很多请求或是响应标头,例如:

image-20240429200221349

2.设计方案

自定义 RPC 协议可以分为 2 大核心部分:

  • 自定义网络传输
  • 自定义消息结构

2.1网络传输

网络传输设计的目标是:选择一个能够高性能通信的网络协议和传输方式。

首先,HTTP 协议的头信息是比较大的,会影响传输性能。并且HTTP 本身属于无状态协议,这意味着每个 HTTP 请求都是独立的,每次请求 / 响应都要重新建立和关闭连接,也会影响性能。

考虑到这点,在 HTTP/1.1 中引入了持久连接(Keep-Alive),允许在单个 TCP 连接上发送多个 HTTP 请求和响应,避免了每次请求都要重新建立和关闭连接的开销。

其他的不论,单独说网络的TCP/IP模型(应用层,传输层,网络层,物理层)中,使用传输层肯定是比应用层快的

因此,使用 TCP 协议完成网络传输,有更多的自主设计空间。

2.2消息结构设计

消息结构设计的目标是:用 最少的 空间传递 需要的 信息。

2.2.1如何使用最少的空间

我们在自定义消息结构时,想要节省空间,就要尽可能使用更轻量的类型,比如 byte 字节类型,只占用 1 个字节、8 个 bit 位,相比其他数据类型,占用的空间更小

注:Java 中实现 bit 位运算拼接相对比较麻烦,所以权衡开发成本,我们设计消息结构时,尽量给每个数据凑到整个字节。

2.2.2消息内需要哪些信息

分析 HTTP 请求结构,我们能够得到 RPC 消息所需的信息:

  • 魔数:作用是安全校验,防止服务器处理了非框架发来的乱七八糟的消息(类似 HTTPS 的安全证书)
  • 版本号:保证请求和响应的一致性(类似 HTTP 协议有 1.0/2.0 等版本)
  • 序列化方式:来告诉服务端和客户端如何解析数据(类似 HTTP 的 Content-Type 内容类型)
  • 类型:标识是请求还是响应?或者是心跳检测等其他用途。(类似 HTTP 有请求头和响应头)
  • 状态:如果是响应,记录响应的结果(类似 HTTP 的 200 状态代码)
  • 请求 id:唯一标识某个请求
  • 请求体:发送内容数据

考虑到半包和粘包的问题,每次传输的数据可能是不完整的,所以我们需要在消息头中新增一个字段 请求体数据长度,保证能够完整地获取 body 内容信息。

因此,我们有以下结构:

image-20240429201752415

实际上,这些数据应该是紧凑的,请求头信息总长 17 个字节。也就是说,上述消息结构,本质上就是拼接在一起的一个字节数组。我们后续实现时,需要有 消息编码器消息解码器,编码器先 new 一个空的 Buffer 缓冲区**,然后按照顺序向缓冲区依次写入这些数据;解码器在读取时也按照顺序依次读取,就能还原出编码前的数据。**

Dubbo协议设计:

image-20240429202204938

通过这种约定的方式,我们就不用记录头信息了。比如 magic 魔数,不用存储 “magic” 这个字符串,而是读取第一个字节(前 8 bit)就能获取到。

3.开发实现

3.1 消息结构

  1. 新建协议消息类 ProtocolMessage,将消息头单独封装为一个内部类,消息体可以使用泛型类型,完整代码如下:
package com.siyi.earpc.protocol;


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProtocolMessage<T> {
    /**
     * 消息头
     */
    private Header header;
    /**
     * 消息体
     */
    private T body;
    @Data
    public static class Header {
        /**
         * 魔数
         */
        private byte magic;
        /**
         * 协议版本
         */
        private byte version;
        /**
         * 序列化器
         */
        private byte serializer;
        /**
         * 状态
         */
        private byte status;
        /**
         * 消息类型
         */
        private byte type;
        /**
         * 消息 ID
         */
        private long requestId;
        /**
         * 数据长度
         */
        private int bodyLength;

    }


}

  1. 新建协议常量类 ProtocolConstant,记录了和自定义协议有关的关键信息,比如消息头长度、魔数、版本号
package com.siyi.earpc.protocol;

/**
 * @author Eric
 */
public interface ProtocolConstant {
    /**
     * 消息头长度
     */
    int MESSAGE_HEADER_LENGTH = 17;
    /**
     * 魔数
     */
    byte MAGIC = 0x1;

    /**
     * 协议版本号
     */
    byte PROTOCOL_VERSION = 0x1;
}

  1. 新建消息字段的枚举类,比如:协议状态枚举,暂时只定义成功、请求失败、响应失败三种枚举值:
package com.siyi.earpc.protocol;

import lombok.Getter;

/**
 * @author Eric
 */

@Getter
public enum ProtocolMessageStatusEnum {
    OK("ok", 20),
    BAD_REQUEST("badRequest", 40),
    BAD_RESPONSE("badResponse", 50);

    private final String text;
    private final int value;

     ProtocolMessageStatusEnum(String text, int value) {
        this.text = text;
        this.value = value;
    }
    // 通过value获取对应的枚举
    public static ProtocolMessageStatusEnum fromValue(int value) {
         for(ProtocolMessageStatusEnum statusEnum : ProtocolMessageStatusEnum.values()) {
             if(statusEnum.value == value) {
                 return statusEnum;
             }
         }
         return null;
    }

}

协议消息类型枚举,包括请求、响应、心跳、其他。代码如下:

package com.siyi.earpc.protocol;

/**
 * @author Eric
 * 协议消息类型枚举
 */

public enum ProtocolMessageTypeEnum {
    REQUEST(0),
    RESPONSE(1),
    HEART_BEAT(2),
    OTHERS(3);
    private final byte key;

    ProtocolMessageTypeEnum(int key) {
        this.key = (byte) key;
    }
    // 通过key获取对应的枚举
    public static ProtocolMessageTypeEnum fromKey(byte key) {
        for(ProtocolMessageTypeEnum typeEnum : ProtocolMessageTypeEnum.values()) {
            if(typeEnum.key == key) {
                return typeEnum;
            }
        }
        return null;
    }
}

协议消息的序列化器枚举,跟我们 RPC 框架已支持的序列化器对应。代码如下:

package com.siyi.earpc.protocol;

import cn.hutool.core.util.ObjectUtil;
import lombok.Getter;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 * 协议消息序列化枚举
 * @author Eric
 */
@Getter
public enum ProtocolMessageSerializerEnum {
    JDK(0, "jdk"),
    JSON(1, "json"),
    KRYO(2, "kryo"),
    HESSIAN(3, "hessian");

    private final int key;

    private final String value;

    ProtocolMessageSerializerEnum(int key, String value) {
        this.key = key;
        this.value = value;
    }
    /**
     * 获取值列表
     *
     * @return
     */
    public static List<String> getValues() {
        return Arrays.stream(values()).map(item -> item.value).collect(Collectors.toList());
    }

    /**
     * 根据 key 获取枚举
     *
     * @param key
     * @return
     */
    public static ProtocolMessageSerializerEnum getEnumByKey(int key) {
        for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()) {
            if (anEnum.key == key) {
                return anEnum;
            }
        }
        return null;
    }


    /**
     * 根据 value 获取枚举
     *
     * @param value
     * @return
     */
    public static ProtocolMessageSerializerEnum getEnumByValue(String value) {
        if (ObjectUtil.isEmpty(value)) {
            return null;
        }
        for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()) {
            if (anEnum.value.equals(value)) {
                return anEnum;
            }
        }
        return null;
    }
}

3.2 网络传输

首先新建 server.tcp 包,将所有 TCP 服务相关的代码放到该包中。

  1. TCP 服务器实现

新建 VertxTcpServer 类,跟之前写的 VertxHttpServer 类似,先创建 Vert.x 的服务器实例,然后定义处理请求的方法,比如回复 “Hello, client!”,最后启动服务器。

package com.siyi.earpc.server;

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServer;

/**
 * @author Eric
 * HTTP 服务器
 */
public class VertxHttpServer implements HttpServer {
    private byte[] handleRequest(byte[] requestData) {
        // 在这里编写处理请求的逻辑,根据 requestData 构造响应数据并返回
        // 这里只是一个示例,实际逻辑需要根据具体的业务需求来实现
        return "Hello, client!".getBytes();
    }

    /**
     * 启动服务器
     *
     * @param port
     */
    @Override
    public void start(int port) {
        // 创建 Vert.x 实例
        Vertx vertx = Vertx.vertx();

        // 创建 TCP 服务器
        NetServer server = vertx.createNetServer();

        // 处理请求
        server.connectHandler(socket -> {
            // 处理连接
            socket.handler(buffer -> {
                // 处理请求数据
                byte[] requestData = buffer.getBytes();
                // 处理请求并返回响应数据,这里进行自定义的字节数组处理逻辑,比如解析请求、调用服务、构造响应等
                byte[] responseData = handleRequest(requestData);
                System.out.println(Buffer.buffer(responseData));
            });


            // 启动 HTTP 服务器并监听指定端口
            server.listen(port, result -> {
                if (result.succeeded()) {
                    System.out.println("Server is now listening on port " + port);
                } else {
                    System.err.println("Failed to start server: " + result.cause());
                }
            });
        });
    }
}

上述代码中的 socket.write 方法,就是在向连接到服务器的客户端发送数据。注意发送的数据格式为 Buffer,这是 Vert.x 为我们提供的字节数组缓冲区实现

  1. TCP 客户端实现

新建 VertxTcpClient 类,先创建 Vert.x 的客户端实例,然后定义处理请求的方法,比如回复 “Hello, server!”,并建立连接。

  1. 可以先进行简单的测试,先启动服务器,再启动客户端,能够在控制台看到它们互相打招呼的输出。

image-20240429212247929

3.3 编(解)码器

Vert.x 的 TCP 服务器收发的消息是 Buffer 类型,不能直接写入一个对象。因此,我们需要编码器和解码器,将 Java 的消息对象和 Buffer 进行相互转换。

image-20240429212436021

  1. 首先实现消息编码器

在 protocol 包下新建 ProtocolMessageEncoder,核心流程是依次向 Buffer 缓冲区写入消息对象里的字段。

package com.siyi.earpc.protocol;

import com.siyi.earpc.serializer.Serializer;
import com.siyi.earpc.serializer.SerializerFactory;
import io.vertx.core.buffer.Buffer;

import java.io.IOException;
/**
 * 协议消息编码器
 * @author Eric
 */
public class ProtocolMessageEncoder {
    public static Buffer encode(ProtocolMessage<?> protocolMessage) throws IOException {
        if (protocolMessage == null || protocolMessage.getHeader() == null) {
            return Buffer.buffer();
        }
        ProtocolMessage.Header header = protocolMessage.getHeader();
        Buffer buffer = Buffer.buffer();
        buffer.appendByte(header.getMagic());
        buffer.appendByte(header.getVersion());
        buffer.appendByte(header.getSerializer());
        buffer.appendByte(header.getStatus());
        buffer.appendByte(header.getType());
        buffer.appendLong(header.getRequestId());
        //获取序列化器的枚举
        ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
        if (serializerEnum == null) {
            return Buffer.buffer();
        }
        //获取序列化器实例
        Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
        //序列化消息体
        byte[] bodyBytes = serializer.serialize(protocolMessage.getBody());
        // 写入 body 长度和数据
        buffer.appendInt(bodyBytes.length);
        buffer.appendBytes(bodyBytes);
        return buffer;

    }
}

  1. 实现消息解码器

在 protocol 包下新建 ProtocolMessageDecoder,核心流程是依次从 Buffer 缓冲区的指定位置读取字段,构造出完整的消息对象。

  1. 编写单元测试类,先编码再解码,以测试编码器和解码器的正确性。
package com.siyi.earpc.protocol;


import cn.hutool.core.util.IdUtil;
import com.siyi.earpc.constant.RpcConstant;
import com.siyi.earpc.model.RpcRequest;
import io.vertx.core.buffer.Buffer;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;

public class ProtocolMessageTest {
    @Test
    public void testEncodeAndDecode() throws IOException {
        // 构造消息
        ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>();
        ProtocolMessage.Header header = new ProtocolMessage.Header();
        header.setMagic(ProtocolConstant.MAGIC);
        header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
        header.setSerializer((byte) ProtocolMessageSerializerEnum.JDK.getKey());
        header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());
        header.setStatus((byte) ProtocolMessageStatusEnum.OK.getValue());
        header.setRequestId(IdUtil.getSnowflakeNextId());
        header.setBodyLength(0);
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setName("myService");
        rpcRequest.setMethodName("myMethod");
        rpcRequest.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
        rpcRequest.setParameterTypes(new Class[]{String.class});
        rpcRequest.setArgs(new Object[]{"aaa", "bbb"});
        protocolMessage.setHeader(header);
        protocolMessage.setBody(rpcRequest);

        Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
        ProtocolMessage<?> message = ProtocolMessageDecoder.decode(encodeBuffer);
        Assert.assertNotNull(message);
    }
}

3.4 请求处理器(服务提供者)

可以使用 netty 的 pipeline 组合多个 handler(比如编码 => 解码 => 请求 / 响应处理)

请求处理器的类似之前的 HttpServerHandler,我们需要开发一个 TcpServerHandler,用于处理请求。和 HttpServerHandler 的区别只是在获取请求、写入响应的方式上,需要调用上面开发好的编码器和解码器。

通过实现 Vert.x 提供的 Handler<NetSocket> 接口,可以定义 TCP 请求处理器。

实现基本和HttpServerHandler相似

package com.siyi.earpc.server.tcp;

import com.siyi.earpc.RpcApplication;
import com.siyi.earpc.model.RpcRequest;
import com.siyi.earpc.model.RpcResponse;
import com.siyi.earpc.protocol.ProtocolMessage;
import com.siyi.earpc.protocol.ProtocolMessageDecoder;
import com.siyi.earpc.protocol.ProtocolMessageEncoder;
import com.siyi.earpc.protocol.ProtocolMessageStatusEnum;
import com.siyi.earpc.register.LocalRegistry;
import com.siyi.earpc.serializer.Serializer;
import com.siyi.earpc.serializer.SerializerFactory;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import java.io.IOException;
import java.lang.reflect.Method;

/**
 * @author Eric
 * TCP服务端处理器
 */
public class TcpServerHandler implements Handler<NetSocket> {
    /**
     * 处理请求
     *
     * @param netSocket socket
     */
    @Override
    public void handle(NetSocket netSocket) {
        // 1. 转换为对象,从对象中获得参数
        //指定序列化器
        final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());
        //记录日志

        netSocket.handler(buffer -> {
            ProtocolMessage<RpcRequest> protocolMessage;
            try {
                protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(buffer);
            } catch (IOException e) {
                throw new RuntimeException("协议消息解码错误");
            }
            RpcRequest rpcRequest = protocolMessage.getBody();
            //处理请求
            //构建响应对象
            RpcResponse rpcResponse = new RpcResponse();

            //从注册器获取实现类,反射调用
            Class<?> implClass = LocalRegistry.get(rpcRequest.getName());
            try {
                Method methods = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
                Object result = methods.invoke(implClass.newInstance(), rpcRequest.getArgs());
                //返回结果
                rpcResponse.setData(result);
                rpcResponse.setDataType(methods.getReturnType());
                rpcResponse.setMessage("ok");
            } catch (Exception e) {
                e.printStackTrace();
                rpcResponse.setException(e);
                rpcResponse.setMessage(e.getMessage());

            }
            //返回响应
            ProtocolMessage.Header header = new ProtocolMessage.Header();
            header.setType((byte) ProtocolMessageStatusEnum.OK.getValue());
            ProtocolMessage<RpcResponse> rpcResponseProtocolMessage = new ProtocolMessage<>(header, rpcResponse);
            try {
                Buffer encode = ProtocolMessageEncoder.encode(rpcResponseProtocolMessage);
                netSocket.write(encode);
            } catch (IOException e) {
                throw new RuntimeException("协议消息编码错误");
            }

        });
    }
}

3.5 请求发送(服务消费者)

调整服务消费者发送请求的代码,改 HTTP 请求为 TCP 请求。

public class ServiceProxy implements InvocationHandler {
    //...原来的代码
// 发送 TCP 请求
            Vertx vertx = Vertx.vertx();
            NetClient netClient = vertx.createNetClient();
            CompletableFuture<RpcResponse> responseCompletableFuture = new CompletableFuture<>();
            netClient.connect(selectedServiceMetaInfo.getServicePort(), selectedServiceMetaInfo.getServiceHost(), res -> {
                if (res.succeeded()) {
                    System.out.println("Connected to server");
                    io.vertx.core.net.NetSocket socket = res.result();
                    //发送数据
                    //构造协议消息
                    ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>();
                    ProtocolMessage.Header header = new ProtocolMessage.Header();
                    header.setMagic(ProtocolConstant.MAGIC);
                    header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
                    header.setSerializer((byte) ProtocolMessageSerializerEnum.getEnumByValue(RpcApplication.getRpcConfig().getSerializer()).getKey());
                    header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());
                    header.setRequestId(IdUtil.getSnowflakeNextId());
                    protocolMessage.setHeader(header);
                    protocolMessage.setBody(rpcRequest);
                    try {
                        Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
                        socket.write(encodeBuffer);
                    } catch (IOException e) {
                        throw new RuntimeException("协议消息编码错误");
                    }
                    //接受响应
                    socket.handler(buffer -> {
                        try {
                            ProtocolMessage<RpcResponse> rpcResponseProtocolMessage = ProtocolMessageDecoder.decode(buffer);
                            responseCompletableFuture.complete(rpcResponseProtocolMessage.getBody());
                        } catch (IOException e) {
                            throw new RuntimeException("协议消息解码错误");
                        }
                    });
                } else {
                    System.out.println("Failed to connect to server: " + res.cause().getMessage());

                }
            });
            RpcResponse rpcResponse = responseCompletableFuture.get();
            netClient.close();
            return rpcResponse.getData();
        } catch (Exception e) {
            e.printStackTrace();
        }


        return null;
    }

由于 Vert.x 提供的请求处理器是异步、反应式的,我们为了更方便地获取结果,可以使用 CompletableFuture 转异步为同步,参考代码如下:

CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();
netClient.connect(xxx,
    result -> {
        // 完成了响应
        responseFuture.complete(rpcResponseProtocolMessage.getBody());
    });
);
// 阻塞,直到响应完成,才会继续向下执行
RpcResponse rpcResponse = responseFuture.get();

4.测试

修改服务提供者 ProviderExample 代码,改为启动 TCP 服务器。完整代码如下:

package com.siyi.example.provider;

import com.siyi.earpc.RpcApplication;
import com.siyi.earpc.config.RpcConfig;
import com.siyi.earpc.model.ServiceMetaInfo;
import com.siyi.earpc.register.LocalRegistry;
import com.siyi.earpc.register.Registry;
import com.siyi.earpc.register.RegistryFactory;
import com.siyi.earpc.server.HttpServer;
import com.siyi.earpc.server.VertxHttpServer;
import com.siyi.earpc.server.tcp.VertxTcpClient;
import com.siyi.earpc.server.tcp.VertxTcpServer;
import com.siyi.example.common.service.UserService;

/**
 * @author Eric
 */
public class RpcProviderExample {

    public static void main(String[] args) {
        RpcApplication.init();
        // 注册服务
        String serviceName = UserService.class.getName();
        LocalRegistry.register(serviceName, UserServiceImpl.class);
        // 将服务注册到注册中心
        RpcConfig rpcConfig = RpcApplication.getRpcConfig();
        Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
        ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName(serviceName);
        serviceMetaInfo.setServiceAddress(rpcConfig.getServerHost()+":"+rpcConfig.getServerPort());
        String serverHost = rpcConfig.getServerHost();
        Integer serverPort = rpcConfig.getServerPort();
        System.out.println(serviceMetaInfo.getServiceAddress());
        try {
            registry.register(serviceMetaInfo);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        //启动TCP服务
        VertxTcpServer vertxTcpServer = new VertxTcpServer();
        vertxTcpServer.start(RpcApplication.getRpcConfig().getServerPort());

    }
}

5.粘包半包问题解决

什么是粘包和半包?

理想情况下,假如我们客户端 连续 2 次 要发送的消息是:

// 第一次
Hello, server!Hello, server!Hello, server!Hello, server!
// 第二次
Hello, server!Hello, server!Hello, server!Hello, server!

但服务端收到的消息情况可能是:

  1. 每次收到的数据更少了,这种情况叫做 半包
// 第一次
Hello, server!Hello, server!
// 第二次
Hello, server!Hello, server!Hello, server!
  1. 每次收到的数据更多了,这种情况叫做 粘包
// 第三次
Hello, server!Hello, server!Hello, server!Hello, server!Hello, server!

5.1 半包粘包问题演示

  1. 修改 TCP 客户端代码,连续发送 1000 次消息
package com.siyi.earpc.server.tcp;

import io.vertx.core.Vertx;

/**
 * @author Eric
 * TCP客户端
 */
public class VertxTcpClient {
    public void start() {
        Vertx vertx = Vertx.vertx();
        vertx.createNetClient().connect(8888, "localhost", res -> {
            if (res.succeeded()) {
                System.out.println("Connected to TCP server");
                io.vertx.core.net.NetSocket socket = res.result();
                for (int i = 0; i < 1000; i++) {
                    // 发送数据
                    socket.write("Hello, server!Hello, server!Hello, server!Hello, server!");
                }
                // 接收响应
                socket.handler(buffer -> {
                    System.out.println("Received response from server: " + buffer.toString());
                });
            } else {
                System.out.println("Failed to connect: " + res.cause());
            }
        });
    }

    public static void main(String[] args) {
        new VertxTcpClient().start();
    }
}
  1. 修改 TCP 服务端代码,打印出每次收到的消息
package com.siyi.earpc.server.tcp;

import com.siyi.earpc.server.HttpServer;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServer;

/**
 * @author Eric
 * TCP服务器
 */
public class VertxTcpServer implements HttpServer {
    private byte[] handleRequest(byte[] requestData) {
        // 在这里编写处理请求的逻辑,根据 requestData 构造响应数据并返回
        // 这里只是一个示例,实际逻辑需要根据具体的业务需求来实现
        return "Hello, client!".getBytes();
    }

    /**
     * 启动服务器
     *
     * @param port
     */
    @Override
    public void start(int port) {
        // 创建 Vert.x 实例
        Vertx vertx = Vertx.vertx();

        // 创建 TCP 服务器
        NetServer server = vertx.createNetServer();

        // 处理请求
        server.connectHandler(socket -> {
            // 处理连接
            socket.handler(buffer -> {
                String testMessage = "Hello, server!Hello, server!Hello, server!Hello, server!";
                int messageLength = testMessage.getBytes().length;
                if (buffer.getBytes().length < messageLength) {
                    System.out.println("半包, length = " + buffer.getBytes().length);
                    return;
                }
                if (buffer.getBytes().length > messageLength) {
                    System.out.println("粘包, length = " + buffer.getBytes().length);
                    return;
                }
                String str = new String(buffer.getBytes(0, messageLength));
                System.out.println(str);
                if (testMessage.equals(str)) {
                    System.out.println("good");
                }


//                // 处理请求数据
//                byte[] requestData = buffer.getBytes();
//                // 处理请求并返回响应数据,这里进行自定义的字节数组处理逻辑,比如解析请求、调用服务、构造响应等
//                byte[] responseData = handleRequest(requestData);
//                socket.write(Buffer.buffer(responseData));
            });
        });


        // 启动 TCP 服务器并监听指定端口
        server.listen(port, result -> {
            if (result.succeeded()) {
                System.out.println("Server is now listening on port " + port);
            } else {
                System.err.println("Failed to start server: " + result.cause());
            }
        });
    }
    public static void main(String[] args) {
        new VertxTcpServer().start(8888);
    }
}




  1. 检查控制台结果

image-20240429235919362

5.2 问题解决

如何解决粘包?

  1. 解决粘包的核心思路也是类似的:每次只读取指定长度的数据,超过长度的留着下一次接收到消息时再读取。
// 解决粘包问题,只读指定长度的数据
byte[] bodyBytes = buffer.getBytes(17, 17 + header.getBodyLength());

Vert.x 解决半包和粘包

在 Vert.x 框架中,可以使用内置的 RecordParser 完美解决半包粘包,它的作用是:保证下次读取到 特定长度 的字符。

package com.siyi.earpc.server.tcp;

import com.siyi.earpc.server.HttpServer;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServer;
import io.vertx.core.parsetools.RecordParser;

/**
 * @author Eric
 * TCP服务器
 */
public class VertxTcpServer implements HttpServer {
    private byte[] handleRequest(byte[] requestData) {
        // 在这里编写处理请求的逻辑,根据 requestData 构造响应数据并返回
        // 这里只是一个示例,实际逻辑需要根据具体的业务需求来实现
        return "Hello, client!".getBytes();
    }

    /**
     * 启动服务器
     *
     * @param port
     */
    @Override
    public void start(int port) {
        // 创建 Vert.x 实例
        Vertx vertx = Vertx.vertx();

        // 创建 TCP 服务器
        NetServer server = vertx.createNetServer();

        // 处理请求
        server.connectHandler(socket -> {
            // 处理连接
//            socket.handler(buffer -> {
                String testMessage = "Hello, server!Hello, server!Hello, server!Hello, server!";
                int messageLength = testMessage.getBytes().length;

                // 构造parser
                RecordParser parser = RecordParser.newFixed(messageLength);
                parser.setOutput(new Handler<Buffer>() {
                    @Override
                    public void handle(Buffer buffer) {
                        String str = new String(buffer.getBytes());
                        System.out.println(str);
                        if (testMessage.equals(str)) {
                            System.out.println("good");
                        }
                    }
                });
                socket.handler(parser);


//                // 处理请求数据
//                byte[] requestData = buffer.getBytes();
//                // 处理请求并返回响应数据,这里进行自定义的字节数组处理逻辑,比如解析请求、调用服务、构造响应等
//                byte[] responseData = handleRequest(requestData);
//                socket.write(Buffer.buffer(responseData));
            });
//        });


        // 启动 TCP 服务器并监听指定端口
        server.listen(port, result -> {
            if (result.succeeded()) {
                System.out.println("Server is now listening on port " + port);
            } else {
                System.err.println("Failed to start server: " + result.cause());
            }
        });
    }

    public static void main(String[] args) {
        new VertxTcpServer().start(8888);
    }
}




上述代码的核心是 RecordParser.newFixed(messageLength),为 Parser 指定每次读取固定值长度的内容。

输出结果

image-20240430103559334

  1. 实际运用中,消息体的长度是不固定的,所以要通过调整 RecordParser 的固定长度(变长)来解决。

那我们的思路可以是,将读取完整的消息拆分为 2 次:

  • 先完整读取请求头信息,由于请求头信息长度是固定的,可以使用 RecordParser 保证每次都完整读取。
  • 再根据请求头长度信息更改 RecordParser 的固定长度,保证完整获取到请求体。

修改TCP服务端代码

package com.siyi.earpc.server.tcp;

import com.siyi.earpc.server.HttpServer;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServer;
import io.vertx.core.parsetools.RecordParser;

/**
 * @author Eric
 * TCP服务器
 */
public class VertxTcpServer implements HttpServer {
    private byte[] handleRequest(byte[] requestData) {
        // 在这里编写处理请求的逻辑,根据 requestData 构造响应数据并返回
        // 这里只是一个示例,实际逻辑需要根据具体的业务需求来实现
        return "Hello, client!".getBytes();
    }

    /**
     * 启动服务器
     *
     * @param port
     */
    @Override
    public void start(int port) {
        // 创建 Vert.x 实例
        Vertx vertx = Vertx.vertx();

        // 创建 TCP 服务器
        NetServer server = vertx.createNetServer();

        // 处理请求
        server.connectHandler(socket -> {
            // 处理连接
            // 构造parser
            RecordParser parser = RecordParser.newFixed(8);
            // 设置输出处理器
            parser.setOutput(new Handler<Buffer>() {
                int size = -1;
                //完整读取
                // 一次完整的读取(头 + 体)
                Buffer resultBuffer = Buffer.buffer();

                @Override
                public void handle(Buffer buffer) {
                    if (-1 == size) {
                        // 读取消息体长度
                        size = buffer.getInt(4);
                        parser.fixedSizeMode(size);
                        // 写入头信息到结果
                        resultBuffer.appendBuffer(buffer);
                    } else {
                        // 写入体信息到结果
                        resultBuffer.appendBuffer(buffer);
                        System.out.println(resultBuffer.toString());
                        // 重置一轮
                        parser.fixedSizeMode(8);
                        size = -1;
                        resultBuffer = Buffer.buffer();
                    }

                }

            });
            socket.handler(parser);


//                // 处理请求数据
//                byte[] requestData = buffer.getBytes();
//                // 处理请求并返回响应数据,这里进行自定义的字节数组处理逻辑,比如解析请求、调用服务、构造响应等
//                byte[] responseData = handleRequest(requestData);
//                socket.write(Buffer.buffer(responseData));
        });
//        });


        // 启动 TCP 服务器并监听指定端口
        server.listen(port, result -> {
            if (result.succeeded()) {
                System.out.println("Server is now listening on port " + port);
            } else {
                System.err.println("Failed to start server: " + result.cause());
            }
        });
    }

    public static void main(String[] args) {
        new VertxTcpServer().start(8888);
    }
}




修改客户端代码:

package com.siyi.earpc.server.tcp;

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;

/**
 * @author Eric
 * TCP客户端
 */
public class VertxTcpClient {
    public void start() {
        Vertx vertx = Vertx.vertx();
        vertx.createNetClient().connect(8888, "localhost", res -> {
            if (res.succeeded()) {
                System.out.println("Connected to TCP server");
                io.vertx.core.net.NetSocket socket = res.result();
                for (int i = 0; i < 1000; i++) {
                    // 发送数据
                    Buffer buffer = Buffer.buffer();
                    String str = "Hello, server!Hello, server!Hello, server!Hello, server!";
                    buffer.appendInt(0);
                    buffer.appendInt(str.getBytes().length);
                    buffer.appendBytes(str.getBytes());
                    socket.write(buffer);
                }
                // 接收响应
                socket.handler(buffer -> {
                    System.out.println("Received response from server: " + buffer.toString());
                });
            } else {
                System.out.println("Failed to connect: " + res.cause());
            }
        });
    }

    public static void main(String[] args) {
        new VertxTcpClient().start();
    }
}

5.3 封装半包粘包处理器

我们会发现,解决半包粘包问题还是有一定的代码量的,而且由于 ServiceProxy(消费者)和请求 Handler(提供者)都需要接受 Buffer,所以都需要半包粘包问题处理。

那我们就应该要想到:需要对代码进行封装复用了。

这里我们可以使用设计模式中的 装饰者模式,使用 RecordParser 对原有的 Buffer 处理器的能力进行增强。

装饰者模式可以简单理解为给对象穿装备,增强对象的能力。

server.tcp 包下新建 TcpBufferHandlerWrapper 类,实现并增强 Handler<Buffer> 接口。

package com.siyi.earpc.server.tcp;

import com.siyi.earpc.protocol.ProtocolConstant;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.parsetools.RecordParser;

/**
 * 装饰者模式(使用 recordParser 对原有的 buffer 处理能力进行增强)
 */
public class TcpBufferHandlerWrapper implements Handler<Buffer> {
    private final RecordParser recordParser;

    /**
     * @param bufferHandler buffer处理器
     *                      通过 bufferHandler 处理完整的 buffer
     */
    public TcpBufferHandlerWrapper(Handler<Buffer> bufferHandler) {
        this.recordParser = initRecordParser(bufferHandler);
    }

    @Override
    public void handle(Buffer event) {
        recordParser.handle(event);
    }

    private RecordParser initRecordParser(Handler<Buffer> bufferHandler) {
        //构造parser
        RecordParser parser = RecordParser.newFixed(ProtocolConstant.MESSAGE_HEADER_LENGTH);
        //设置输出处理器

        parser.setOutput(new Handler<Buffer>() {
            int size = -1;
            //完整读取
            //一次完整的读取(头 + 体)
            Buffer resultBuffer = Buffer.buffer();

            @Override
            public void handle(Buffer buffer) {
                if (-1 == size) {
                    // 读取消息体长度
                    size = resultBuffer.getInt(13);
                    parser.fixedSizeMode(size);
                    // 写入头信息到结果
                    resultBuffer.appendBuffer(buffer);
                } else {
                    // 写入体信息到结果
                    resultBuffer.appendBuffer(buffer);
                    // 已拼接为完整 Buffer,执行处理
                    bufferHandler.handle(resultBuffer);
                    // 重置一轮
                    parser.fixedSizeMode(ProtocolConstant.MESSAGE_HEADER_LENGTH);
                    size = -1;
                    resultBuffer = Buffer.buffer();
                }
            }
        });


        return parser;
    }
}

其实就是把 RecordParser 的代码粘了过来,当调用处理器的 handle 方法时,改为调用 recordParser.handle

5.4 优化客户端调用

  1. 修改 TCP 请求处理器。
package com.siyi.earpc.server.tcp;

import com.siyi.earpc.RpcApplication;
import com.siyi.earpc.model.RpcRequest;
import com.siyi.earpc.model.RpcResponse;
import com.siyi.earpc.protocol.ProtocolMessage;
import com.siyi.earpc.protocol.ProtocolMessageDecoder;
import com.siyi.earpc.protocol.ProtocolMessageEncoder;
import com.siyi.earpc.protocol.ProtocolMessageStatusEnum;
import com.siyi.earpc.register.LocalRegistry;
import com.siyi.earpc.serializer.Serializer;
import com.siyi.earpc.serializer.SerializerFactory;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;

import java.io.IOException;
import java.lang.reflect.Method;

/**
 * @author Eric
 * TCP服务端处理器
 */
public class TcpServerHandler implements Handler<NetSocket> {
    /**
     * 处理请求
     *
     * @param netSocket socket
     */
    @Override
    public void handle(NetSocket netSocket) {
        // 1. 转换为对象,从对象中获得参数
        //指定序列化器
        final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());
        //记录日志
        TcpBufferHandlerWrapper bufferHandlerWrapper = new TcpBufferHandlerWrapper(buffer -> {
            // 处理请求代码

            netSocket.handler(b -> {
                ProtocolMessage<RpcRequest> protocolMessage;
                try {
                    protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(b);
                } catch (IOException e) {
                    throw new RuntimeException("协议消息解码错误");
                }
                RpcRequest rpcRequest = protocolMessage.getBody();
                //处理请求
                //构建响应对象
                RpcResponse rpcResponse = new RpcResponse();

                //从注册器获取实现类,反射调用
                Class<?> implClass = LocalRegistry.get(rpcRequest.getName());
                try {
                    Method methods = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
                    Object result = methods.invoke(implClass.newInstance(), rpcRequest.getArgs());
                    //返回结果
                    rpcResponse.setData(result);
                    rpcResponse.setDataType(methods.getReturnType());
                    rpcResponse.setMessage("ok");
                } catch (Exception e) {
                    e.printStackTrace();
                    rpcResponse.setException(e);
                    rpcResponse.setMessage(e.getMessage());

                }
                //返回响应
                ProtocolMessage.Header header = new ProtocolMessage.Header();
                header.setType((byte) ProtocolMessageStatusEnum.OK.getValue());
                ProtocolMessage<RpcResponse> rpcResponseProtocolMessage = new ProtocolMessage<>(header, rpcResponse);
                try {
                    Buffer encode = ProtocolMessageEncoder.encode(rpcResponseProtocolMessage);
                    netSocket.write(encode);
                } catch (IOException e) {
                    throw new RuntimeException("协议消息编码错误");
                }

            });

        });
        netSocket.handler(bufferHandlerWrapper);
    }
}

  1. 修改客户端处理响应的代码。

把所有的请求响应逻辑提取出来,封装为单独的 VertxTcpClient 类,放在 server.tcp 包下。

package com.siyi.earpc.server.tcp;

import cn.hutool.core.util.IdUtil;
import com.siyi.earpc.RpcApplication;
import com.siyi.earpc.model.RpcRequest;
import com.siyi.earpc.model.RpcResponse;
import com.siyi.earpc.model.ServiceMetaInfo;
import com.siyi.earpc.protocol.*;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * @author Eric
 * TCP客户端,加入请求处理逻辑
 */
public class VertxTcpClient {
    public static RpcResponse doRequest(RpcRequest rpcRequest, ServiceMetaInfo serviceMetaInfo) throws ExecutionException, InterruptedException {
        Vertx netClient = Vertx.vertx();
        CompletableFuture<RpcResponse> responseCompletableFuture = new CompletableFuture<>();
        netClient.createNetClient().connect(serviceMetaInfo.getServicePort(), serviceMetaInfo.getServiceHost(), res -> {
            if (!res.succeeded()) {
                System.err.println("Failed to connect to TCP server");
                return;
            }
            NetSocket socket = res.result();
            // 发送请求
            ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>();
            ProtocolMessage.Header header = new ProtocolMessage.Header();
            header.setMagic(ProtocolConstant.MAGIC);
            header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
            header.setSerializer((byte) ProtocolMessageSerializerEnum.getEnumByValue(RpcApplication.getRpcConfig().getSerializer()).getKey());
            header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());
            header.setRequestId(IdUtil.getSnowflakeNextId());
            protocolMessage.setHeader(header);
            protocolMessage.setBody(rpcRequest);
            //编码请求
            try {
                Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
                socket.write(encodeBuffer);
            } catch (IOException e) {
                throw new RuntimeException("协议消息编码错误");
            }
            //接受响应
            TcpBufferHandlerWrapper bufferHandlerWrapper = new TcpBufferHandlerWrapper(buffer -> {
                try {
                    ProtocolMessage<RpcResponse> rpcResponseProtocolMessage = ProtocolMessageDecoder.decode(buffer);
                    responseCompletableFuture.complete(rpcResponseProtocolMessage.getBody());
                } catch (IOException e) {
                    throw new RuntimeException("协议消息解码错误");
                }

            });
            socket.handler(bufferHandlerWrapper);

        });
        RpcResponse rpcResponse = responseCompletableFuture.get();
        netClient.close();
        return rpcResponse;
    }

}

  1. 修改代理类代码
package com.siyi.earpc.proxy;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.siyi.earpc.RpcApplication;
import com.siyi.earpc.config.RegistryConfig;
import com.siyi.earpc.config.RpcConfig;
import com.siyi.earpc.constant.RpcConstant;
import com.siyi.earpc.model.RpcRequest;
import com.siyi.earpc.model.RpcResponse;
import com.siyi.earpc.model.ServiceMetaInfo;
import com.siyi.earpc.protocol.*;
import com.siyi.earpc.register.Registry;
import com.siyi.earpc.register.RegistryFactory;
import com.siyi.earpc.serializer.Serializer;
import com.siyi.earpc.serializer.SerializerFactory;
import com.siyi.earpc.server.tcp.VertxTcpClient;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;

import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ServiceProxy implements InvocationHandler {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //指定序列化器
        final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());
        //发送请求
        RpcRequest rpcRequest = RpcRequest.builder().name(method.getDeclaringClass().getName()).methodName(method.getName()).args(args).parameterTypes(method.getParameterTypes()).build();

        try {
            byte[] bytes = serializer.serialize(rpcRequest);
            byte[] result;
            // 依靠注册中心解决硬编码问题
            String serviceName = method.getDeclaringClass().getName();
            //获取注册中心
            RpcConfig rpcConfig = RpcApplication.getRpcConfig();
            //获取注册中心的实例
            Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
            ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
            //将服务名和版本号赋值给serviceMetaInfo
            serviceMetaInfo.setServiceName(serviceName);
            serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
            //得到服务名后,这里获取服务元信息
            List<ServiceMetaInfo> discoveryList = registry.discovery(serviceMetaInfo.getServiceKey());
            if (CollUtil.isEmpty(discoveryList)) {
                throw new RuntimeException("暂无服务地址");
            }
            //之后获取服务地址
            //TODO 暂时只取第一个,后续可扩展负载均衡策略
            ServiceMetaInfo selectedServiceMetaInfo = discoveryList.get(0);
            
            // 发送 TCP 请求
            RpcResponse rpcResponse = VertxTcpClient.doRequest(rpcRequest, selectedServiceMetaInfo);
            return rpcResponse.getData();
        } catch (Exception e) {
            throw new RuntimeException("调用失败");
        }
    }
}

负载均衡

1.需求分析

同一个服务可能会有多个服务提供者,但是目前我们消费者始终读取了第一个服务提供者节点发起调用,不仅会增大单个节点的压力,而且没有利用好其他节点的资源。

因此引入负载均衡,可以从服务提供者节点中,选择一个服务提供者发起请求,而不是每次都请求同一个服务提供者

2.负载均衡

什么是负载均衡?

负载均衡是一种用来分配网络或计算负载到多个资源上的技术。它的目的是确保每个资源都能够有效地处理负载、增加系统的并发量、避免某些资源过载而导致性能下降或服务不可用的情况

RPC框架中,负载均衡的作用是从一组可用的服务提供者中选择一个进行调用。

的负载均衡实现技术有 Nginx(七层负载均衡)、LVS(四层负载均衡)等

2.1 常见负载均衡算法

  1. 轮询(Round Robin):按照循环的顺序将请求分配给每个服务器,适用于各服务器性能相近的情况。

假如有 5 台服务器节点,请求调用顺序如下:

1,2,3,4,5,1,2,3,4,5

  1. 随机(Random):随机选择一个服务器来处理请求,适用于服务器性能相近且负载均匀的情况。

假如有 5 台服务器节点,请求调用顺序如下:

3,2,4,1,2,5,2,1,3,4

  1. 加权轮询(Weighted Round Robin):根据服务器的性能或权重分配请求,性能更好的服务器会获得更多的请求,适用于服务器性能不均的情况。

假如有 1 台千兆带宽的服务器节点和 4 台百兆带宽的服务器节点,请求调用顺序可能如下:

1,1,1,2, 1,1,1,3, 1,1,1,4, 1,1,1,5

  1. 最小连接数(Least Connections):选择当前连接数最少的服务器来处理请求,适用于长连接场景。

  2. IP Hash:根据客户端 IP 地址的哈希值选择服务器处理请求,确保同一客户端的请求始终被分配到同一台服务器上,适用于需要保持会话一致性的场景。

  3. 也可以根据请求中的其他参数进行 Hash,比如根据请求接口的地址路由到不同的服务器节点

2.2 一致性hash

首先回忆一下hash算法,简单计算是hash%N,N是数量,但是当N改变的时候,其他原有的hash都会失效,所以这是很大的缺陷,因此有了一致性hash算法

一致性哈希(Consistent Hashing)是一种经典的哈希算法,用于将请求分配到多个节点或服务器上,所以非常适用于负载均衡。

核心思想是将整个哈希值空间划分成一个环状结构,每个节点或服务器在环上占据一个位置,每个请求根据其哈希值映射到环上的一个点,然后顺时针寻找第一个大于或等于该哈希值的节点,将请求路由到该节点上。

image-20240430202040159

上图中,请求 A 会交给服务器 C 来处理。

一致性哈希还解决了 节点下线倾斜问题

  1. 节点下线:当某个节点下线时,其负载会被平均分摊到其他节点上,而不会影响到整个系统的稳定性,因为只有部分请求会受到影响。

如下图,服务器 C 下线后,请求 A 会交给服务器 A 来处理(顺时针寻找第一个大于或等于该哈希值的节点),而服务器 B 接收到的请求保持不变

如果是轮询取模算法,只要节点数变了,很有可能大多数服务器处理的请求都要发生变化,对系统的影响巨大。

image-20240430202259381

  1. 倾斜问题:通过虚拟节点的引入,将每个物理节点映射到多个虚拟节点上,使得节点在哈希环上的 分布更加均匀,减少了节点间的负载差异。

这种情况就称为 hash 环的倾斜。如下图所示:

image-20240430211849216

举个例子,节点很少的情况下,环的情况可能如下图:

image-20240430202509276

这样就会导致绝大多数的请求都会发给服务器 C,而服务器 A 的 “领地” 非常少,几乎不会有请求。

为了解决这种数据倾斜问题,一致性哈希算法引入了虚拟节点机制,即对每一个服务节点计算多个哈希,每个计算结果位置都放置一个此服务节点,称为虚拟节点,一个实际物理节点可以对应多个虚拟节点,虚拟节点越多,hash环上的节点就越多,缓存被均匀分布的概率就越大,hash环倾斜所带来的影响就越小,同时数据定位算法不变,只是多了一步虚拟节点到实际节点的映射

引入虚拟节点后,环的情况变为:

image-20240430202548156

这样一来,每个服务器接受到的请求会更容易平均。

3.开发实现、

3.1 多种负载均衡器的实现

这里实现轮询、随机、一致性 Hash 三种负载均衡算法。

在 RPC 项目中新建 loadbalancer 包,将所有负载均衡器相关的代码放到该包下。

首先w吗先看实现负载均衡的地方:

  //TODO 暂时只取第一个,后续可扩展负载均衡策略
            ServiceMetaInfo selectedServiceMetaInfo = discoveryList.get(0);

因为暂时没有实现,所以这里暂时取了第一个,从这里分析得到,负载均衡的返回值和参数

3.1.1 负载均衡接口

因此先编写负载均衡器通用接口。提供一个选择服务方法,接受请求参数和可用服务列表,可以根据这些信息进行选择。

package com.siyi.earpc.loadbalancer;

import com.siyi.earpc.model.ServiceMetaInfo;

import java.util.List;
import java.util.Map;

/**
 * @author Eric
 *  负载均衡器
 */
public interface LoadBalancer  {
    /**
     * 选择一个服务提供者
     * @param requestParams 请求参数
     * @param serviceMetaInfosList 服务提供者列表
     * @return 服务提供者
     */
    ServiceMetaInfo select(Map<String,Object>requestParams, List<ServiceMetaInfo> serviceMetaInfosList );
}

3.1.2 轮询负载均衡器

需要注意的只有下标容易线程安全,所以使用JUC的原子类

package com.siyi.earpc.loadbalancer;

import com.siyi.earpc.model.ServiceMetaInfo;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author Eric
 *  轮询负载均衡器
 */
public class RoundRobinLoadBalancer implements LoadBalancer {
    /**
     * 当前轮询的位置
     */
    private AtomicInteger currentIndex = new AtomicInteger(0);

    /**
     * 选择一个服务提供者
     *
     * @param requestParams        请求参数
     * @param serviceMetaInfosList 服务提供者列表
     * @return 服务提供者
     */
    @Override
    public ServiceMetaInfo select(Map<String, Object> requestParams, List<ServiceMetaInfo> serviceMetaInfosList) {
        if (serviceMetaInfosList.isEmpty()) {
            return null;
        }
        // 轮询
        int index = currentIndex.getAndIncrement() % serviceMetaInfosList.size();
        return serviceMetaInfosList.get(index);
    }
}

3.1.3 随机负载均衡器
package com.siyi.earpc.loadbalancer;

import com.siyi.earpc.model.ServiceMetaInfo;

import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * @author Eric
 * 随机负载均衡器
 */
public class RandomLoadBalancer implements LoadBalancer{
    // 随机负载均衡器索引
    private Random random = new Random();

    /**
     * 选择一个服务提供者
     *
     * @param requestParams        请求参数
     * @param serviceMetaInfosList 服务提供者列表
     * @return 服务提供者
     */
    @Override
    public ServiceMetaInfo select(Map<String, Object> requestParams, List<ServiceMetaInfo> serviceMetaInfosList) {
        int size = serviceMetaInfosList.size();
        if (size == 0) {
            return null;
        }
        if (size == 1) {
            return serviceMetaInfosList.get(0);
        }
        // 随机
        return serviceMetaInfosList.get(random.nextInt(size));
    }
}

3.1.4 一致性 Hash 负载均衡器

可以使用 TreeMap 实现一致性 Hash 环,该数据结构提供了 ceilingEntry 和 firstEntry 两个方法,便于获取符合算法要求的节点。

  • ceilingEntry(K key) 用于查找大于或等于给定键的最小键。
  • firstEntry() 用于获取 TreeMap 中的最小键。
package com.siyi.earpc.loadbalancer;

import com.siyi.earpc.model.ServiceMetaInfo;

import java.util.List;
import java.util.Map;
import java.util.TreeMap;
/**
 * @author siyi
 * 一致性hash负载均衡器
 */
public class ConsistentHashLoadBalancer implements LoadBalancer {
    private static final int VIRTUAL_NODE_NUM = 100;
    private final TreeMap<Integer, ServiceMetaInfo> virtualInvokers = new TreeMap<>();

    /**
     * 选择一个服务提供者
     *
     * @param requestParams        请求参数
     * @param serviceMetaInfosList 服务提供者列表
     * @return 服务提供者
     */
    @Override
    public ServiceMetaInfo select(Map<String, Object> requestParams, List<ServiceMetaInfo> serviceMetaInfosList) {
        if (serviceMetaInfosList.isEmpty()) {
            return null;
        }
        for (ServiceMetaInfo serviceMetaInfo : serviceMetaInfosList) {
            for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
                int hash = getHash(serviceMetaInfo.getServiceAddress() + "$" + i);
                virtualInvokers.put(hash, serviceMetaInfo);
            }
        }
        //获取请求参数的hash值
        int hash = getHash(requestParams);
        //获取大于该hash值的第一个节点
        Map.Entry<Integer, ServiceMetaInfo> entry = virtualInvokers.ceilingEntry(hash);
        if (entry == null) {
            entry = virtualInvokers.firstEntry();
        }
        return entry.getValue();

    }

/**
 * hash算法
*/
    private int getHash(Object key) {
        return key.hashCode();
    }
}

注意:

  • 根据 requestParams 对象计算 Hash 值,只是简单地调用了对象的 hashCode 方法,也可以根据需求实现自己的 Hash 算法。
  • 每次调用负载均衡器时,都会重新构造 Hash 环,这是为了能够即时处理节点的变化。

3.2 支持配置和拓展负载均衡器

3.2.1负载均均衡常量

在 loadbalancer 包下新建 LoadBalancerKeys 类,列举所有支持的负载均衡器键名。

package com.siyi.earpc.loadbalancer;

public interface LoadBalancerKeys {

    /**
     * 轮询
     */
    String ROUND_ROBIN = "roundRobin";
    /**
     * 随机
     */
    String RANDOM = "random";
    /**
     * 一致性哈希
     */
    String CONSISTENT_HASH = "consistentHash";

}

3.2.2 工厂模式支持根据 key 从 SPI 获取负载均衡器对象实例。

在 loadbalancer 包下新建 LoadBalancerFactory 类,代码和之前的工厂很像,代码如下:

package com.siyi.earpc.loadbalancer;

import com.siyi.earpc.serializer.Serializer;
import com.siyi.earpc.spi.SpiLoader;

public class LoadBalancerFactory {
    static {
        SpiLoader.load(LoadBalancer.class);
    }
    /**
     * 默认负载均衡器(轮询)
     */
    private static final LoadBalancer DEFAULT_LOAD_BALANCER = new RoundRobinLoadBalancer();
    /**
     * 获取实例
     *
     * @param key
     * @return
     */
    public static LoadBalancer getInstance(String key) {
        return SpiLoader.getInstance(LoadBalancer.class, key);
    }
}
3.2.3 SPI配置文件
roundRobin=com.siyi.earpc.loadbalancer.RoundRobinLoadBalancer
random=com.siyi.earpc.loadbalancer.RandomLoadBalancer
consistentHash=com.siyi.earpc.loadbalancer.ConsistentHashLoadBalancer
3.2.4 全局配置

为 RpcConfig 全局配置新增负载均衡器的配置,代码如下:

@Data
public class RpcConfig {
    /**
     * 负载均衡器
     */
    private String loadBalancer = LoadBalancerKeys.ROUND_ROBIN;
}
3.3 应用

修改 ServiceProxy 的代码,将 “固定调用第一个服务节点” 改为 “调用负载均衡器获取一个服务节点”。

//...
if (CollUtil.isEmpty(discoveryList)) {
                throw new RuntimeException("暂无服务地址");
            }
            //通过负载均衡获取服务地址
            LoadBalancer loadBalancer = LoadBalancerFactory.getInstance(rpcConfig.getLoadBalancer());
            // 将调用方法名(请求路径)作为负载均衡参数
            Map<String, Object> requestParams = new HashMap<>();
            requestParams.put("methodName", rpcRequest.getMethodName());
            ServiceMetaInfo selectedServiceMetaInfo = loadBalancer.select(requestParams, discoveryList);
            // 发送 TCP 请求
//...

这里注意:这里给负载均衡器传入了一个 requestParams HashMap,并且将请求方法名作为参数放到了 Map 中。如果使用的是一致性 Hash 算法,那么会根据 requestParams 计算 Hash 值,调用相同方法的请求 Hash 值肯定相同,所以总会请求到同一个服务器节点上(因为Java集合的HashCode计算和集合中内容有关)

4.测试

首先编写单元测试类 LoadBalancerTest,代码如下:

package com.siyi.example.consumer;

import com.siyi.earpc.proxy.ServiceProxyFactory;
import com.siyi.example.common.model.User;
import com.siyi.example.common.service.UserService;

public class EasyConsumerExample {
    public static void main(String[] args) {
        //静态代理
//        UserService userService = new UserServiceProxy();
        // 获取USerService的实现类对象
        UserService userService = ServiceProxyFactory.getProxy(UserService.class);
        User user = new User();
        user.setName("张三");
        //调用
        User newUser = userService.getUser(user);
        User newUser1= userService.getUser(user);
        User newUser2 = userService.getUser(user);
        User newUser3= userService.getUser(user);
        User newUser4 = userService.getUser(user);
        if (newUser != null) {
            System.out.println(newUser.getName());
        } else {
            System.out.println("user == null");
        }
//        long number = userService.getNumber();
//        System.out.println(number);
    }

}

这里使用一致性hash会三次都一样,因为requestParam都一样,使用轮询则会改变

测试负载均衡调用:

首先在不同的端口启动 2 个服务提供者,然后启动服务消费者项目,通过 Debug 或者控制台输出来观察每次请求的节点地址。

重试机制

1.需求分析

目前,如果使用 RPC 框架的服务消费者调用接口失败,就会直接报错。

有时可能是服务提供者返回了错误,但有时可能只是网络不稳定或服务提供者重启等临时性问题。这种情况下,我们可能更希望服务消费者拥有自动重试的能力,提高系统的可用性。

2.设计方案

关于重试机制:重试机制的核心是 重试策略,一般来说,包含以下几个考虑点:

  • 什么时候、什么条件下重试?
  • 重试时间(确定下一次的重试时间)
  • 什么时候、什么条件下停止重试?
  • 重试后要做什么?

2.1 重试条件

什么时候、什么条件下重试?

如果我们希望提高系统的可用性,当由于网络等异常情况发生时,触发重试

2.2 重试时间

重试等待策略就比较丰富了,可能会用到一些算法,主流的重试时间算法有:

  • 固定重试间隔(Fixed Retry Interval):在每次重试之间使用固定的时间间隔。

比如近 5 次重试的时间点如下:

1s 2s 3s 4s 5s

  • 指数退避重试(Exponential Backoff Retry):在每次失败后,重试的时间间隔会以指数级增加,以避免请求过于密集。

比如近 5 次重试的时间点如下:

1s 3s(多等 2s) 7s(多等 4s) 15s(多等 8s) 31s(多等 16s)

  • 随机延迟重试(Random Delay Retry):在每次重试之间使用随机的时间间隔,以避免请求的同时发生。

  • 可变延迟重试(Variable Delay Retry):这种策略更 “高级” 了,根据先前重试的成功或失败情况,动态调整下一次重试的延迟时间。比如,根据前一次的响应时间调整下一次重试的等待时间。

以上的策略是可以组合使用的,一定要根据具体情况和需求灵活调整。比如可以先使用指数退避重试策略,如果连续多次重试失败,则切换到固定重试间隔策略。

2.3 停止重试

一般来说,重试次数是有上限的,否则随着报错的增多,系统同时发生的重试也会越来越多,造成雪崩。

主流的停止重试策略有:

  • 最大尝试次数:一般重试当达到最大次数时不再重试。(次数)
  • 超时停止:重试达到最大时间的时候,停止重试。(时间)

2.4 重试工作

最后一点是重试后要做什么事情?一般来说就是重复执行原本要做的操作,比如发送请求失败了,那就再发一次请求。

当重试次数超过上限时,往往还要进行其他的操作,比如:

  • 通知告警:让开发者人工介入
  • 降级容错:改为调用其他接口、或者执行其他操作

2.5 重试方案设计

回归到我们的 RPC 框架,消费者发起调用的代码如下:

try {
    // rpc 请求
    RpcResponse rpcResponse = VertxTcpClient.doRequest(rpcRequest, selectedServiceMetaInfo);
    return rpcResponse.getData();
} catch (Exception e) {
    throw new RuntimeException("调用失败");
}

我们完全可以将 VertxTcpClient.doRequest 封装为一个可重试的任务,如果请求失败(重试条件),系统就会自动按照重试策略再次发起请求,不用开发者关心。

Java 中这里可以使用 Guava-Retrying 库轻松实现多种不同的重试算法

3.开发实现

这里实现两种: 不重试、固定重试间隔。

在 RPC 项目中新建 fault.retry 包,将所有重试相关的代码放到该包下。

  1. 先编写重试策略通用接口。提供一个重试方法,接受一个具体的任务参数,可以使用 Callable 类代表一个任务。
package com.siyi.earpc.fault.retry;

import com.siyi.earpc.model.RpcResponse;

import java.util.concurrent.Callable;

/**
 * @author Eric
 */
public interface RetryStrategy {
    /**
     * 重试
     *
     * @param callable 重试的方法
     * @return
     * @throws Exception
     */
    RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception;
}

  1. 引入 Guava-Retrying 重试库,代码如下:
<!-- https://github.com/rholder/guava-retrying -->
<dependency>
    <groupId>com.github.rholder</groupId>
    <artifactId>guava-retrying</artifactId>
    <version>2.0.0</version>
</dependency>
  1. 不重试策略实现。

就是直接执行一次任务,代码如下:

package com.siyi.earpc.fault.retry;

import com.siyi.earpc.model.RpcResponse;

import java.util.concurrent.Callable;

/**
 * @author Eric
 * 无重试策略
 */
public class NoRetryStrategy implements RetryStrategy{
    /**
     * 重试
     *
     * @param callable 重试的方法
     * @return
     * @throws Exception 重试的方法
     */
    @Override
    public RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception {
        return callable.call();
    }
}
  1. 固定重试间隔策略实现。

使用 Guava-Retrying 提供的 RetryerBuilder 能够很方便地指定重试条件、重试等待策略、重试停止策略、重试工作等。

package com.siyi.earpc.fault.retry;

import com.github.rholder.retry.*;
import com.siyi.earpc.model.RpcResponse;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
 * @author Eric
 * 固定间隔重试策略
 */
@Slf4j
public class FixedIntervalRetryStrategy implements RetryStrategy {
    /**
     * 重试
     *
     * @param callable 重试的方法
     * @return
     * @throws Exception
     */
    @Override
    public RpcResponse doRetry(Callable<RpcResponse> callable) throws Exception {
        // 定义重试器
        Retryer<RpcResponse> retryer = RetryerBuilder.<RpcResponse>newBuilder()
                // 发生异常时重试
                .retryIfExceptionOfType(Exception.class)
                // 每次重试等待3秒
                .withWaitStrategy(WaitStrategies.fixedWait(3L, TimeUnit.SECONDS))
                // 允许执行4次(首次执行 + 最多重试3次)
                .withStopStrategy(StopStrategies.stopAfterAttempt(3))
                .withRetryListener(new RetryListener() {
                    @Override
                    public <V> void onRetry(Attempt<V> attempt) {
                        log.info("重试第{}次", attempt.getAttemptNumber());
                    }
                })
                .build();
        return retryer.call(callable);
    }
}

重试策略如下:

  • 重试条件:使用 retryIfExceptionOfType 方法指定当出现 Exception 异常时重试。
  • 重试等待策略:使用 withWaitStrategy 方法指定策略,选择 fixedWait 固定时间间隔策略。
  • 重试停止策略:使用 withStopStrategy 方法指定策略,选择 stopAfterAttempt 超过最大重试次数停止。
  • 重试工作:使用 withRetryListener 监听重试,每次重试时,除了再次执行任务外,还能够打印当前的重试次数。

3.1 单元测试

package com.siyi.earpc.fault.retry;

import com.siyi.earpc.model.RpcResponse;
import org.junit.Test;

import static org.junit.Assert.*;

public class RetryStrategyTest {
    RetryStrategy retryStrategy = new NoRetryStrategy();
    @Test
    public void doRetry() throws Exception {
        try {
            RpcResponse rpcResponse = retryStrategy.doRetry(() -> {
                System.out.println("测试重试");
                throw new RuntimeException("模拟重试失败");
            });
            System.out.println(rpcResponse);
        } catch (Exception e) {
            System.out.println("重试多次失败");
            e.printStackTrace();
        }
    }
}

3.2 支持配置拓展重试

3.2.1 重试策略常量

fault.retry 包下新建 RetryStrategyKeys 类,列举所有支持的重试策略键名。

package com.siyi.earpc.fault.retry;

/**
 * @author Eric
 * 重试策略的key
 */
public interface RetryStrategyKeys {

    /**
     * 不重试
     */
    String NO = "no";

    /**
     * 固定时间间隔
     */
    String FIXED_INTERVAL = "fixedInterval";

}
3.2.2 工厂模式
package com.siyi.earpc.fault.retry;

import com.siyi.earpc.spi.SpiLoader;

/**
 * @author Eric
 * 重试策略工厂
 */
public class RetryStrategyFactory {

    static {
        SpiLoader.load(RetryStrategy.class);
    }

    /**
     * 默认重试器
     */
    private static final RetryStrategy DEFAULT_RETRY_STRATEGY = new NoRetryStrategy();

    /**
     * 获取实例
     *
     * @param key key
     * @return 重试策略
     */
    public static RetryStrategy getInstance(String key) {
        return SpiLoader.getInstance(RetryStrategy.class, key);
    }

}
3.2.3 配置文件

META-INFrpc/system 目录下编写重试策略接口的 SPI 配置文件,文件名:com.siyi.earpc.fault.retry.RetryStrategy

no=com.siyi.earpc.fault.retry.NoRetryStrategy
fixedInterval=com.siyi.earpc.fault.retry.FixedIntervalRetryStrategy
3.2.4 RpcConfig配置

为 RpcConfig 全局配置新增重试策略的配置,代码如下:

@Data
public class RpcConfig {
    /**
     * 重试策略
     */
    private String retryStrategy = RetryStrategyKeys.NO;
}

4.应用

修改 ServiceProxy 的代码,从工厂中获取重试器,并且将请求代码封装为一个 Callable 接口,作为重试器的参数,调用重试器即可。

修改的代码如下:

RetryStrategy retryStrategy = RetryStrategyFactory.getInstance(rpcConfig.getRetryStrategy());
RpcResponse rpcResponse = retryStrategy.doRetry(() ->
        VertxTcpClient.doRequest(rpcRequest, selectedServiceMetaInfo)
);

这里使用 Lambda 表达式将 VertxTcpClient.doRequest 封装为了一个匿名函数,简化了代码。

//...
// 发送 TCP 请求,使用重试策略
            RetryStrategy retryStrategy = RetryStrategyFactory.getInstance(rpcConfig.getRetryStrategy());
            RpcResponse rpcResponse = retryStrategy.doRetry(() ->
                    VertxTcpClient.doRequest(rpcRequest, selectedServiceMetaInfo)
            );
            return rpcResponse.getData();
//...

容错机制

1.需求分析

现在添加了重试机制,如果重试超过了一定次数仍然失败,我们又该怎么处理呢?

或者说当调用出现失败时,我们一定要重试么?有没有其他的策略呢?

2.设计方案

2.1容错机制

容错是指系统在出现异常情况时,可以通过一定的策略保证系统仍然稳定运行,从而提高系统的可靠性和健壮性。

在分布式系统中,容错机制尤为重要,因为分布式系统中的各个组件都可能存在网络故障、节点故障等各种异常情况。要顾全大局,尽可能消除偶发 / 单点故障对系统带来的整体影响。

例如降级策略:将分布式系统类比为一家公司,如果公司某个优秀员工请假了,需要 “触发容错”,让另一个普通员工顶上

2.2 容错策略

容错策略有很多种,常用的容错策略主要是以下几个:

  • Fail-Over 故障转移:一次调用失败后,切换一个其他节点再次进行调用,也算是一种重试。
  • Fail-Back 失败自动恢复:系统的某个功能出现调用失败或错误时,通过其他的方法,恢复该功能的正常。可以理解为降级,比如重试、调用其他服务等。
  • Fail-Safe 静默处理:系统出现部分非重要功能的异常时,直接忽略掉,不做任何处理,就像错误没有发生过一样。
  • Fail-Fast 快速失败:系统出现调用错误时,立刻报错,交给外层调用方处理

容错的实现方式:

  1. 重试:重试本质上也是一种容错的降级策略,系统错误后再试一次。
  2. 限流:当系统压力过大、已经出现部分错误时,通过限制执行操作(接受请求)的频率或数量,对系统进行保护。
  3. 降级:系统出现错误后,改为执行其他更稳定可用的操作。也可以叫做 “兜底” 或 “有损服务”,这种方式的本质是:即使牺牲一定的服务质量,也要保证系统的部分功能可用,保证基本的功能需求得到满足。
  4. 熔断:系统出现故障或异常时,暂时中断对该服务的请求,而是执行其他操作,以避免连锁故障。
  5. 超时控制:如果请求或操作长时间没处理完成,就进行中断,防止阻塞和资源占用。

根据对系统可靠性的需求,我们通常会结合多种策略或方法实现容错机制。

2.3 容错方案设计

这里有两种方案:

  • 先容错再重试。

当系统发生异常时,首先会触发容错机制,比如记录日志、进行告警等,然后可以选择是否进行重试。

这种方案其实是把重试当做容错机制的一种可选方案。

  • 先重试再容错。在发生错误后,首先尝试重试操作,如果重试多次仍然失败,则触发容错机制,比如记录日志、进行告警等。

我们可以结合这两种方案:

系统错误时,先通过重试操作解决一些临时性的异常,比如网络波动、服务端临时不可用等;如果重试多次后仍然失败,说明可能存在更严重的问题,这时可以触发其他的容错策略,比如调用降级服务、熔断、限流、快速失败等,来减少异常的影响,保障系统的稳定性和可靠性。

举个具体的例子:

  1. 系统调用服务 A 出现网络错误,使用容错策略 - 重试。
  2. 重试 3 次失败后,使用其他容错策略 - 降级。
  3. 系统改为调用不依赖网络的服务 B,完成操作。

3.开发实现

接下来实现 2 种最基本的容错策略:Fail-Fast 快速失败、Fail-Safe 静默处理。

在 RPC 项目中新建 fault.tolerant 包,将所有容错相关的代码放到该包下。

3.1 多种容错策略实现

  1. 先编写容错策略通用接口。提供一个容错方法,使用 Map 类型的参数接受上下文信息(可用于灵活地传递容错处理需要用到的数据),并且接受一个具体的异常类参数。
package com.siyi.earpc.fault.tolerant;

import com.siyi.earpc.model.RpcResponse;

import java.util.Map;

/**
 * @author Eric
 * 容错策略
 */
public interface TolerantStrategy {

    /**
     * 容错
     *
     * @param context 上下文,用于传递数据
     * @param e       异常
     * @return RpcResponse
     */
    RpcResponse doTolerant(Map<String, Object> context, Exception e);
}

3.2 快速失败容错策略

package com.siyi.earpc.fault.tolerant;

import com.siyi.earpc.model.RpcResponse;

import java.util.Map;

/**
 * @author Eric
 * 快速失败容错策略
 */
public class FailFastTolerantStrategy implements TolerantStrategy{
    /**
     * 容错
     *
     * @param context 上下文,用于传递数据
     * @param e       异常
     * @return RpcResponse
     */
    @Override
    public RpcResponse doTolerant(Map<String, Object> context, Exception e) {
        throw new RuntimeException("服务报错", e);
    }
}

3.3静默处理容错实现

遇到异常后,记录一条日志,然后正常返回一个响应对象,就好像没有出现过报错。

package com.siyi.earpc.fault.tolerant;

import com.siyi.earpc.model.RpcResponse;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;

/**
 * @author Eric
 * 静默容错策略
 */
@Slf4j
public class FailSafeTolerantStrategy implements TolerantStrategy {
    @Override
    public RpcResponse doTolerant(Map<String, Object> context, Exception e) {
        log.info("静默处理异常", e);
        return new RpcResponse();
    }
}

3.4 其他容错策略

故障恢复策略:

package com.siyi.earpc.fault.tolerant;

import com.siyi.earpc.model.RpcResponse;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;

/**
 * @author Eric
 * 降级到其他服务 - 容错策略
 */
@Slf4j
public class FailBackTolerantStrategy implements TolerantStrategy {
    @Override
    public RpcResponse doTolerant(Map<String, Object> context, Exception e) {
        // todo 可自行扩展,获取降级的服务并调用
        return null;
    }
}

故障转移策略:

package com.siyi.earpc.fault.tolerant;

import com.siyi.earpc.model.RpcResponse;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;

/**
 * @author Eric
 * 故障转移容错策略
 */
@Slf4j
public class FailOverTolerantStrategy implements TolerantStrategy {
    @Override
    public RpcResponse doTolerant(Map<String, Object> context, Exception e) {
        // todo 可自行扩展,获取其他服务节点并调用
        return null;
    }
}

3.5支持配置

3.5.1 容错策略常量

fault.tolerant 包下新建 TolerantStrategyKeys 类,列举所有支持的容错策略键名。

package com.siyi.earpc.fault.tolerant;

/**
 * @author Eric
 * 容错策略的key
 */
public interface TolerantStrategyKeys {

    /**
     * 故障恢复
     */
    String FAIL_BACK = "failBack";

    /**
     * 快速失败
     */
    String FAIL_FAST = "failFast";

    /**
     * 故障转移
     */
    String FAIL_OVER = "failOver";

    /**
     * 静默处理
     */
    String FAIL_SAFE = "failSafe";

}
3.5.2 工厂模式

fault.tolerant 包下新建 TolerantStrategyFactory 类,代码如下:

package com.siyi.earpc.fault.tolerant;

import com.siyi.earpc.spi.SpiLoader;

/**
 * @author Eric
 * 容错策略工厂
 */
public class TolerantStrategyFactory {
    static {
        SpiLoader.load(TolerantStrategy.class);
    }

    /**
     * 默认容错策略
     */
    private static final TolerantStrategy DEFAULT_RETRY_STRATEGY = new FailFastTolerantStrategy();

    /**
     * 获取实例
     *
     * @param key 容错策略的key
     * @return 容错策略
     */
    public static TolerantStrategy getInstance(String key) {
        return SpiLoader.getInstance(TolerantStrategy.class, key);
    }

}
3.5.3 配置文件

META-INFrpc/system 目录下编写容错策略接口的 SPI 配置文件,文件名称为:com.siyi.earpc.fault.tolerant.TolerantStrategy

failBack=com.siyi.earpc.fault.tolerant.FailBackTolerantStrategy
failFast=com.siyi.earpc.fault.tolerant.FailFastTolerantStrategy
failOver=com.siyi.earpc.fault.tolerant.FailOverTolerantStrategy
failSafe=com.siyi.earpc.fault.tolerant.FailSafeTolerantStrategy
3.5.4 添加RpcConfig配置
@Data
public class RpcConfig {

    /**
     * 容错策略
     */
    private String tolerantStrategy = TolerantStrategyKeys.FAIL_FAST;
}

4.应用

只需要修改 ServiceProxy 的部分代码,在重试多次抛出异常时,从工厂中获取容错策略并执行即可。

//... 
// 发送 TCP 请求,使用重试策略
            RpcResponse rpcResponse;
            try {
                RetryStrategy retryStrategy = RetryStrategyFactory.getInstance(rpcConfig.getRetryStrategy());
                rpcResponse = retryStrategy.doRetry(() ->
                        VertxTcpClient.doRequest(rpcRequest, selectedServiceMetaInfo)
                );
            } catch (Exception e) {
                // 容错处理
                TolerantStrategy tolerantStrategy = TolerantStrategyFactory.getInstance(rpcConfig.getTolerantStrategy());
                rpcResponse = tolerantStrategy.doTolerant(null, e);
            }

            return rpcResponse.getData();
//...

5.测试

首先启动服务提供者,然后使用 Debug 模式启动服务消费者,当服务消费者发起调用时,立刻停止服务提供者,就会看到调用失败后重试的情况。等待多次重试后,就可以看到容错策略的执行。

启动机制和注解驱动

1.需求分析

使用RPC框架的时候我们一般会关注这些地方:

  • 框架的知名度和用户数:尽量选主流的、用户多的,经过了充分的市场验证。
  • 生态和社区活跃度:尽量选社区活跃的、能和其他技术兼容的。
  • 简单易用易上手:最好能开箱即用,不用花很多时间去上手。这点可能是我们在做个人小型项目时最关注的,可以把精力聚焦到业务开发上。

目前的服务提供者:

package com.siyi.example.provider;

import com.siyi.earpc.RpcApplication;
import com.siyi.earpc.config.RpcConfig;
import com.siyi.earpc.model.ServiceMetaInfo;
import com.siyi.earpc.register.LocalRegistry;
import com.siyi.earpc.register.Registry;
import com.siyi.earpc.register.RegistryFactory;
import com.siyi.earpc.server.HttpServer;
import com.siyi.earpc.server.VertxHttpServer;
import com.siyi.earpc.server.tcp.VertxTcpClient;
import com.siyi.earpc.server.tcp.VertxTcpServer;
import com.siyi.example.common.service.UserService;

/**
 * @author Eric
 */
public class RpcProviderExample {

    public static void main(String[] args) {
        RpcApplication.init();
        // 注册服务
        String serviceName = UserService.class.getName();
        LocalRegistry.register(serviceName, UserServiceImpl.class);
        // 将服务注册到注册中心
        RpcConfig rpcConfig = RpcApplication.getRpcConfig();
        Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
        ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName(serviceName);
        serviceMetaInfo.setServiceAddress(rpcConfig.getServerHost()+":"+rpcConfig.getServerPort());
        String serverHost = rpcConfig.getServerHost();
        Integer serverPort = rpcConfig.getServerPort();
        System.out.println(serviceMetaInfo.getServiceAddress());
        try {
            registry.register(serviceMetaInfo);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        //启动TCP服务
        VertxTcpServer vertxTcpServer = new VertxTcpServer();
        vertxTcpServer.start(RpcApplication.getRpcConfig().getServerPort());

    }
}

可以发现这个框架现在的使用并不方便

2.设计方案

怎么能让开发者用更少的代码启动框架?

2.1 启动机制分析

其实很简单,把所有启动代码封装成一个 专门的启动类 或方法,然后由服务提供者 / 服务消费者调用即可。

这里注意:服务提供者和服务消费者需要初始化的模块是不同的,比如服务消费者不需要启动 Web 服务器。

所以我们需要针对服务提供者和消费者分别编写一个启动类,如果是二者都需要初始化的模块,可以放到全局应用类 RpcApplication 中,复用代码的同时保证启动类的可维护、可扩展性

Dubbo类似设计:https://cn.dubbo.apache.org/zh-cn/overview/mannual/java-sdk/quick-start/api/

2.2 注解驱动设计

首先我们回忆Dubbo中的框架使用:

注解驱动:开发者只需要在服务提供者实现类打上一个 DubboService 注解,就能快速注册服务;同样的,只要在服务消费者字段打上一个 DubboReference 注解,就能快速使用服务。

如官网图片:

image-20240501124517611

实现注解驱动并不复杂,有 2 种常用的方式:

  1. 主动扫描:让开发者指定要扫描的路径,然后遍历所有的类文件,针对有注解的类文件,执行自定义的操作。
  2. 监听 Bean 加载:在 Spring 项目中,可以通过实现 BeanPostProcessor 接口,在 Bean 初始化后执行自定义的操作。

3.开发实现

3.1 服务提供者启动类

我们在 rpc 项目中新建包名 bootstrap,所有和框架启动初始化相关的代码都放到该包下。

新建 ProviderBootstrap 类,先直接复制之前服务提供者示例项目中的初始化代码,然后略微改造,支持用户传入自己要注册的服务。

因为注册服务时,我们需要填入多个字段,比如服务名称、服务实现类,所以我们可以将这些字段进行封装:

原有的代码:

String serviceName = UserService.class.getName();
LocalRegistry.register(serviceName, UserServiceImpl.class);

model 包下新建 ServiceRegisterInfo 类,代码如下:

package com.siyi.earpc.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author Eric
 * 服务注册信息
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ServiceRegisterInfo<T> {
    // 服务名称
    private String serviceName;
    // 服务实现类
    private Class<?extends T>implClass;
}

服务提供者的初始化方法只需要接受封装的注册信息列表作为参数即可,修改参数将原来的服务提供者RpcProviderExample代码复制修改即可

这里使用?而不使用T最主要的一个原因是我们传入的类ServiceRegisterInfo可能并不是同一种,所以只能使用

public static void init(List<ServiceRegisterInfo<?>> serviceRegisterInfoList) {
    RpcApplication.init();
    // 注册服务
    for (ServiceRegisterInfo<?> serviceRegisterInfo : serviceRegisterInfoList) {
        String serviceName = serviceRegisterInfo.getServiceName();
        //注册服务到本地
        LocalRegistry.register(serviceName, serviceRegisterInfo.getImplClass());
        // 将服务注册到注册中心
        RpcConfig rpcConfig = RpcApplication.getRpcConfig();
        //获取注册中心的实例
        Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
        ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
        serviceMetaInfo.setServiceName(serviceName);
        serviceMetaInfo.setServiceHost(rpcConfig.getServerHost());
        serviceMetaInfo.setServicePort(rpcConfig.getServerPort());

        System.out.println(serviceMetaInfo.getServiceAddress());
        try {
            registry.register(serviceMetaInfo);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    //启动TCP服务
    VertxTcpServer vertxTcpServer = new VertxTcpServer();
    vertxTcpServer.start(RpcApplication.getRpcConfig().getServerPort());
}

最后修改原有的服务提供者代码简单调用:

package com.siyi.example.provider;

import com.siyi.earpc.RpcApplication;
import com.siyi.earpc.bootstrap.ProviderBootstrap;
import com.siyi.earpc.config.RpcConfig;
import com.siyi.earpc.model.ServiceMetaInfo;
import com.siyi.earpc.model.ServiceRegisterInfo;
import com.siyi.earpc.register.LocalRegistry;
import com.siyi.earpc.register.Registry;
import com.siyi.earpc.register.RegistryFactory;
import com.siyi.earpc.server.HttpServer;
import com.siyi.earpc.server.VertxHttpServer;
import com.siyi.earpc.server.tcp.VertxTcpClient;
import com.siyi.earpc.server.tcp.VertxTcpServer;
import com.siyi.example.common.service.UserService;

import java.util.ArrayList;
import java.util.List;

/**
 * @author Eric
 */
public class RpcProviderExample {

    public static void main(String[] args) {
        List<ServiceRegisterInfo<?>> registerInfoList = new ArrayList<>();
        ServiceRegisterInfo<UserService> serviceRegisterInfo = new ServiceRegisterInfo<>(UserService.class.getName(), UserServiceImpl.class);
        registerInfoList.add(serviceRegisterInfo);
        // 服务提供者初始化
        ProviderBootstrap.init(registerInfoList);

    }

}

3.2 服务消费者启动类

package com.siyi.example.consumer;

import com.siyi.earpc.bootstrap.ConsumerBootstrap;
import com.siyi.earpc.config.RpcConfig;
import com.siyi.earpc.constant.RpcConstant;
import com.siyi.earpc.proxy.ServiceProxyFactory;
import com.siyi.earpc.utils.ConfigUtils;
import com.siyi.example.common.model.User;
import com.siyi.example.common.service.UserService;

/**
 * @author Eric
 */
public class RpcConsumerExample {
    public static void main(String[] args) {
        // 服务消费者初始化
        ConsumerBootstrap.init();
        // 服务消费者调用
        UserService userService = ServiceProxyFactory.getProxy(UserService.class);
        User user = new User();
        user.setName("张三");
        // 调用
        User newUser = userService.getUser(user);
        if (newUser != null) {
            System.out.println(newUser.getName()+"sada");
        } else {
            System.out.println("user == null");
        }
    }
}

3.3 Spring Boot Starter 注解驱动

创建一个新的项目模块,专门用于实现 Spring Boot Starter 注解驱动的 RPC 框架。

image-20240501143616053

image-20240501143011796

引入依赖

<dependency>
            <groupId>com.siyi</groupId>
            <artifactId>ea-rpc-core</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

3.4 定义注解

可以参考知名框架 Dubbo 的注解。比如:

  1. @EnableDubbo:在 Spring Boot 主应用类上使用,用于启用 Dubbo 功能。
  2. @DubboComponentScan:在 Spring Boot 主应用类上使用,用于指定 Dubbo 组件扫描的包路径。
  3. @DubboReference:在消费者中使用,用于声明 Dubbo 服务引用。
  4. @DubboService:在提供者中使用,用于声明 Dubbo 服务。
  5. @DubboMethod:在提供者和消费者中使用,用于配置 Dubbo 方法的参数、超时时间等。
  6. @DubboTransported:在 Dubbo 提供者和消费者中使用,用于指定传输协议和参数,例如传输协议的类型、端口等。

遵循最小可用化原则,我们只需要定义 3 个注解。

ea-rpc-spring-boot-starter 项目下新建 annotation 包,将所有注解代码放到该包下。

  1. @EnableRpc:用于全局标识项目需要引入 RPC 框架、执行初始化方法。
package com.siyi.earpcspringbootstarter.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

/**
 * @author Eric
 *启用RPC注解
 */
@Target(ElementType.TYPE)//作用在类上
@Retention(java.lang.annotation.RetentionPolicy.RUNTIME)//运行时生效
public @interface EnableRpc {
    /**
     * 需要启动server
     * @return 默认需要
     */
    boolean needServer() default true;
}

也可以将 EnableRpc 注解拆分为两个注解(比如 EnableRpcProvider、EnableRpcConsumer),分别用于标识服务提供者和消费者,但可能存在模块重复初始化的可能性。

  1. @RpcService:服务提供者注解,在需要注册和提供的服务类上使用。
package com.siyi.earpcspringbootstarter.annotation;

import com.siyi.earpc.constant.RpcConstant;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

/**
 * @author Eric
 */
@Target(java.lang.annotation.ElementType.TYPE)
@Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
public @interface RpcService {
    /**
     * 服务接口
     * @return 默认void
     */
    Class<?> interfaceClass()default void.class;
    /**
     * 服务版本
     * @return 默认版本号
     */
    String version() default RpcConstant.DEFAULT_SERVICE_VERSION;
}

  1. @RpcReference:服务消费者注解,在需要注入服务代理对象的属性上使用,类似 Spring 中的 @Resource 注解

RpcReference 注解中,需要指定调用服务相关的属性,比如服务接口类(可能存在多个接口)、版本号、负载均衡器、重试策略、是否 Mock 模拟调用等。

package com.siyi.earpcspringbootstarter.annotation;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;
/**
 * @author Eric
 * RPC消费者注解
 */
@Target(java.lang.annotation.ElementType.FIELD)// 作用在字段上
@Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
public @interface RpcReference {
    /**
     * 服务接口类
     */
    Class<?> interfaceClass() default void.class;

    /**
     * 版本
     */
    String serviceVersion() default RpcConstant.DEFAULT_SERVICE_VERSION;

    /**
     * 负载均衡器
     */
    String loadBalancer() default LoadBalancerKeys.ROUND_ROBIN;

    /**
     * 重试策略
     */
    String retryStrategy() default RetryStrategyKeys.NO;

    /**
     * 容错策略
     */
    String tolerantStrategy() default TolerantStrategyKeys.FAIL_FAST;

    /**
     * 模拟调用
     */
    boolean mock() default false;

}

3.5 注解驱动

在 starter 项目中新建 bootstrap 包,并且分别针对上面定义的 3 个注解新建启动类。

  1. Rpc 框架全局启动类 RpcInitBootstrap

我们的需求是,在 Spring 框架初始化时,获取 @EnableRpc 注解的属性,并初始化 RPC 框架。

可以实现 Spring 的 ImportBeanDefinitionRegistrar 接口,并且在 registerBeanDefinitions 方法中,获取到项目的注解和注解属性。

ImportBeanDefinitionRegistrar接口只有一个核心方法需要实现,即registerBeanDefinitions方法。该方法接受两个参数:

  • AnnotationMetadata metadata:包含有关当前正在处理的注解类的元数据信息,如注解的属性值、类名等。
  • BeanDefinitionRegistry registry:允许将新的Bean定义注册到Spring容器中。
package com.siyi.earpcspringstarter;

import com.siyi.earpc.RpcApplication;
import com.siyi.earpc.config.RpcConfig;
import com.siyi.earpc.server.tcp.VertxTcpServer;
import com.siyi.earpcspringstarter.annotation.EnableRpc;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.type.AnnotationMetadata;

/**
 * @author Eric
 * RPC框架初始化
 */
@Slf4j
public class RpcInitBootstrap implements ImportBeanDefinitionRegistrar {
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        //获取EnableRpc注解的属性
        boolean needServer = (boolean) importingClassMetadata.getAnnotationAttributes(EnableRpc.class.getName()).get("needServer");
        //RPC框架初始化
        RpcApplication.init();
        //全局配置
        final RpcConfig rpcConfig = RpcApplication.getRpcConfig();
        //启动服务器
        if (needServer) {
            VertxTcpServer vertxTcpServer = new VertxTcpServer();
            vertxTcpServer.start(rpcConfig.getServerPort());
        } else {
            log.info("RPC服务端未启动");
        }
    }
}

从 Spring 元信息中获取到了 EnableRpc 注解的 needServer 属性,并通过它来判断是否要启动服务器。

  1. Rpc 服务提供者启动类 RpcProviderBootstrap

服务提供者启动类的作用是,获取到所有包含 @RpcService 注解的类,并且通过注解的属性和反射机制,获取到要注册的服务信息,并且完成服务注册。

怎么获取到所有包含 @RpcService 注解的类呢?

像前面设计方案中提到的,可以主动扫描包,也可以利用 Spring 的特性监听 Bean 的加载

此处我们选择后者,实现更简单,而且能直接获取到服务提供者类的 Bean 对象。

只需要让启动类实现 BeanPostProcessor 接口的 postProcessAfterInitialization 方法,就可以在某个服务提供者 Bean 初始化后,执行注册服务等操作了。

package com.siyi.earpcspringstarter.bootstrap;

import com.siyi.earpc.RpcApplication;
import com.siyi.earpc.config.RegistryConfig;
import com.siyi.earpc.config.RpcConfig;
import com.siyi.earpc.model.ServiceMetaInfo;
import com.siyi.earpc.register.LocalRegistry;
import com.siyi.earpc.register.Registry;
import com.siyi.earpc.register.RegistryFactory;
import com.siyi.earpcspringstarter.annotation.RpcService;
import org.springframework.beans.factory.config.BeanPostProcessor;

/**
 * @author Eric
 * RPC服务提供者
 */
public class RpcProviderBootstrap implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        //获取bean的类
        Class<?> beanClass = bean.getClass();
        //获取bean的注解
        RpcService rpcService = beanClass.getAnnotation(RpcService.class);
        if (rpcService != null) {
            //需要注册服务
            //1. 获取服务基本信息
            Class<?> interfaceClass = rpcService.interfaceClass();
            //默认值处理,如果是void.class,则获取接口
            if (interfaceClass == void.class) {
                //获取接口
                interfaceClass = beanClass.getInterfaces()[0];
            }
            String serviceName = interfaceClass.getName();
            String version = rpcService.version();
            //2. 注册服务
            LocalRegistry.register(serviceName, beanClass);
            //全局配置
            final RpcConfig rpcConfig = RpcApplication.getRpcConfig();
            //3. 服务注册
            RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
            Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
            ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
            serviceMetaInfo.setServiceName(serviceName);
            serviceMetaInfo.setServiceVersion(version);
            serviceMetaInfo.setServiceHost(rpcConfig.getServerHost());
            serviceMetaInfo.setServicePort(rpcConfig.getServerPort());
            try {
                registry.register(serviceMetaInfo);
            } catch (Exception e) {
                throw new RuntimeException("服务注册失败", e);
            }
        }
        return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
    }
}
  1. Rpc 服务消费者启动类 RpcConsumerBootstrap

和服务提供者启动类的实现方式类似,在 Bean 初始化后,通过反射获取到 Bean 的所有属性,如果属性包含 @RpcReference 注解,那么就为该属性动态生成代理对象并赋值。

package com.siyi.earpcspringstarter.bootstrap;

import com.siyi.earpc.proxy.ServiceProxyFactory;
import com.siyi.earpcspringstarter.annotation.RpcReference;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;

import java.lang.reflect.Field;

/**
 * @author Eric
 * RPC服务消费者
 */
public class RpcConsumerBootstrap implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Class<?> beanClass = bean.getClass();
        //遍历对象所有字段
        Field[] fields = beanClass.getFields();
        for (Field field : fields) {
            //获取字段上的注解
            RpcReference rpcReference = field.getAnnotation(RpcReference.class);
            if (rpcReference != null) {
                //获取接口类
                Class<?> interfaceClass = rpcReference.interfaceClass();
                //默认值处理,如果是void.class,则获取接口
                if (interfaceClass == void.class) {
                    //获取接口
                    interfaceClass = field.getType();
                }
                field.setAccessible(true);
                Object proxy = ServiceProxyFactory.getProxy(interfaceClass);
                try {
                    field.set(bean, proxy);
                    field.setAccessible(false);
                } catch (IllegalAccessException e) {
                    throw new RuntimeException("RPC服务消费者注入代理对象失败", e);
                }

            }
        }
        return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
    }
}
  1. 注册已编写的启动类。在 Spring 中加载我们已经编写好的启动类。

仅在用户使用 @EnableRpc 注解时,才启动 RPC 框架。所以,可以通过给 EnableRpc 增加 @Import 注解,来注册我们自定义的启动类,实现灵活的可选加载。

修改代码:

package com.siyi.earpcspringstarter.annotation;

import com.siyi.earpcspringstarter.bootstrap.RpcConsumerBootstrap;
import com.siyi.earpcspringstarter.bootstrap.RpcInitBootstrap;
import com.siyi.earpcspringstarter.bootstrap.RpcProviderBootstrap;
import org.springframework.context.annotation.Import;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

/**
 * @author Eric
 *启用RPC注解
 */
@Target(ElementType.TYPE)//作用在类上
@Retention(java.lang.annotation.RetentionPolicy.RUNTIME)//运行时生效
@Import({RpcConsumerBootstrap.class, RpcProviderBootstrap.class, RpcInitBootstrap.class})
public @interface EnableRpc {
    /**
     * 需要启动server
     * @return 默认需要
     */
    boolean needServer() default true;
}

4.测试

新建 2 个使用 Spring Boot 2 框架的项目。

  • 示例 Spring Boot 消费者:example-springboot-consumer
  • 示例 Spring Boot 提供者:example-springboot-provider

引入依赖

 <dependency>
            <groupId>com.siyi</groupId>
            <artifactId>ea-rpc-spring-starter</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.siyi</groupId>
            <artifactId>example-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
  1. 示例服务提供者项目的入口类加上 @EnableRpc 注解,代码如下:
package com.siyi.examplespringbootprovider;

import com.siyi.earpcspringstarter.annotation.EnableRpc;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableRpc
public class ExampleSpringbootProviderApplication {

    public static void main(String[] args) {
        SpringApplication.run(ExampleSpringbootProviderApplication.class, args);
    }

}

服务提供者提供一个简单的服务,代码如下

package com.siyi.examplespringbootprovider;

import com.siyi.earpcspringstarter.annotation.RpcService;
import com.siyi.example.common.model.User;
import com.siyi.example.common.service.UserService;
import org.springframework.stereotype.Service;

@Service
@RpcService
public class UserServiceImpl implements UserService {

    public User getUser(User user) {
        System.out.println("用户名:" + user.getName());
        return user;
    }
}
  1. 示例服务消费者的入口类加上 @EnableRpc(needServer = false) 注解,标识启动 RPC 框架,但不启动服务器。
package com.siyi.examplespringbootconsumer;

import com.siyi.earpcspringstarter.annotation.EnableRpc;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author Eric
 * needServer默认为true,表示需要启动server
 */
@SpringBootApplication
@EnableRpc(needServer = false)
public class ExampleSpringbootConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ExampleSpringbootConsumerApplication.class, args);
    }

}

消费者编写一个 Spring 的 Bean,引入 UserService 属性并打上 @RpcReference 注解,表示需要使用远程服务提供者的服务。

package com.siyi.examplespringbootconsumer;

import com.siyi.earpcspringstarter.annotation.RpcReference;
import com.siyi.example.common.model.User;
import com.siyi.example.common.service.UserService;
import org.springframework.stereotype.Service;

/**
 * @author Eric
 */
@Service
public class ExampleServiceImpl {

    @RpcReference
    private UserService userService;

    public void test() {
        User user = new User();
        user.setName("siyi");
        User resultUser = userService.getUser(user);
        System.out.println(resultUser.getName());
    }

}

服务消费者编写单元测试,验证能否调用远程服务:

package com.siyi.examplespringbootconsumer;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

import static org.junit.jupiter.api.Assertions.*;
@SpringBootTest
class ExampleServiceImplTest {

    @Resource
    private ExampleServiceImpl exampleService;

    @Test
    void test1() {
        exampleService.test();
    }
}

最终项目结构:

image-20240501215339951