目录

Netty基础6.Netty实现RPC服务三

目录

Netty基础—6.Netty实现RPC服务三

大纲

1.RPC的相关概念

2.RPC服务调用端动态代理实现

3.Netty客户端之RPC远程调用过程分析

4.RPC网络通信中的编码解码器

5.Netty服务端之RPC服务提供端的处理

6.RPC服务调用端实现超时功能

5.Netty服务端之RPC服务提供端的处理

(1)RPC服务提供端NettyServer

(2)基于反射调用请求对象的目标方法

(1)RPC服务提供端NettyRpcServer

public class ServiceConfig {
    private String serviceName;//调用方的服务名称
    private Class serviceInterfaceClass;//服务接口类型
    private Class serviceClass;
    ...
}

public class NettyRpcServer {
    private static final Logger logger = LogManager.getLogger(NettyRpcServer.class);
    private static final int DEFAULT_PORT = 8998;
    private List<ServiceConfig> serviceConfigs = new CopyOnWriteArrayList<ServiceConfig>();
    private int port;
    
    public NettyRpcServer(int port) {
        this.port = port;
    }
    
    public void start() {
        logger.info("Netty RPC Server Starting...");
        EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();
        EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap
            .group(bossEventLoopGroup, workerEventLoopGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline()
                    .addLast(new RpcDecoder(RpcRequest.class))
                    .addLast(new RpcEncoder(RpcResponse.class))
                    .addLast(new NettyRpcServerHandler(serviceConfigs));
                }
            })
            .option(ChannelOption.SO_BACKLOG, 128)
            .childOption(ChannelOption.SO_KEEPALIVE, true);

            //到这一步为止,server启动了而且监听指定的端口号了
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            logger.info("Netty RPC Server started successfully, listened[" + port + "]");
            //进入一个阻塞的状态,同步一直等待到你的server端要关闭掉
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            logger.error("Netty RPC Server failed to start, listened[" + port + "]");
        } finally {
            bossEventLoopGroup.shutdownGracefully();
            workerEventLoopGroup.shutdownGracefully();
        }
    }
    
    //可以代理多个服务
    public void addServiceConfig(ServiceConfig serviceConfig) {
        this.serviceConfigs.add(serviceConfig);
    }
    
    public static void main(String[] args) {
        ServiceConfig serviceConfig = new ServiceConfig( "TestService", TestService.class, TestServiceImpl.class);
        NettyRpcServer nettyRpcServer = new NettyRpcServer(DEFAULT_PORT);
        nettyRpcServer.addServiceConfig(serviceConfig);
        nettyRpcServer.start();
    }
}

(2)基于反射调用请求对象的目标方法

//RpcRequest类需要修改字段调整为如下所示
public class RpcRequest implements Serializable {
    private String requestId;
    private String className;
    private String methodName;
    private Class[] parameterTypes;//参数类型
    private Object[] args;//参数值
    private String invokerApplicationName;//调用方的服务名称
    private String invokerIp;//调用方的IP地址
    ...
}

public class NettyRpcServerHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LogManager.getLogger(NettyRpcServerHandler.class);
    private ConcurrentHashMap<String, ServiceConfig> serviceConfigMap = new ConcurrentHashMap<String, ServiceConfig>();
    
    public NettyRpcServerHandler(List<ServiceConfig> serviceConfigs) {
        for (ServiceConfig serviceConfig : serviceConfigs) {
            String serviceInterfaceClass = serviceConfig.getServiceInterfaceClass().getName();
            serviceConfigMap.put(serviceInterfaceClass, serviceConfig);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcRequest rpcRequest = (RpcRequest)msg;
        logger.info("Netty RPC Server receives the request: " + rpcRequest);
        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setRequestId(rpcRequest.getRequestId());
        try {
            //此时我们要实现什么呢?
            //我们需要根据RpcRequest指定的class,获取到这个class
            //然后通过反射构建这个class对象实例
            //接着通过反射获取到这个RpcRequest指定方法和入参类型的method
            //最后通过反射调用,传入方法,拿到返回值

            //根据接口名字拿到接口实现类的名字后再获取类
            ServiceConfig serviceConfig = serviceConfigMap.get(rpcRequest.getServiceInterfaceClass());
            Class clazz = serviceConfig.getServiceClass();
            Object instance = clazz.newInstance();
            Method method = clazz.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
            Object result = method.invoke(instance, rpcRequest.getArgs());

            //把rpc调用结果封装到响应里去
            rpcResponse.setResult(result);
            rpcResponse.setSuccess(RpcResponse.SUCCESS);
        } catch(Exception e) {
            logger.error("Netty RPC Server failed to response the request.", e);
            rpcResponse.setSuccess(RpcResponse.FAILURE);
            rpcResponse.setException(e);
        }
        ctx.write(rpcResponse);
        ctx.flush();
        logger.info("send RPC response to client: " + rpcResponse);
    }
}

6.RPC服务调用端实现超时功能

public class ReferenceConfig {
    private static final long DEFAULT_TIMEOUT = 5000;
    private static final String DEFAULT_SERVICE_HOST = "127.0.0.1";
    private static final int DEFAULT_SERVICE_PORT = 8998;

    private Class serviceInterfaceClass;
    private String serviceHost;
    private int servicePort;
    private long timeout;
    ...
}

public class NettyRpcClient {
    private static final Logger logger = LogManager.getLogger(NettyRpcClient.class);

    private ReferenceConfig referenceConfig;
    private ChannelFuture channelFuture;
    private NettyRpcClientHandler nettyRpcClientHandler;
    
    public NettyRpcClient(ReferenceConfig referenceConfig) {
        this.referenceConfig = referenceConfig;
        this.nettyRpcClientHandler = new NettyRpcClientHandler(referenceConfig.getTimeout());
    }

    public void connect() {
        logger.info("connecting to Netty RPC server: " + referenceConfig.getServiceHost() + ":" + referenceConfig.getServicePort());
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
        .group(eventLoopGroup)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.SO_KEEPALIVE, true)//长时间没有通信就发送一个检测包
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline()
                .addLast(new RpcEncoder(RpcRequest.class))
                .addLast(new RpcDecoder(RpcResponse.class))
                .addLast(new NettyRpcReadTimeoutHandler(referenceConfig.getTimeout()))
                .addLast(nettyRpcClientHandler);
            }
        });       

        try {
            if (referenceConfig.getServiceHost() != null && !referenceConfig.getServiceHost().equals("")) {
                channelFuture = bootstrap.connect(referenceConfig.getServiceHost(), referenceConfig.getServicePort()).sync();
                logger.info("successfully connected.");
            }
        } catch(Exception e) {
            throw new RuntimeException(e);
        }
    }

    public RpcResponse remoteCall(RpcRequest rpcRequest) throws Throwable {
        //标记一下请求发起的时间
        NettyRpcRequestTimeHolder.put(rpcRequest.getRequestId(), new Date().getTime());
        channelFuture.channel().writeAndFlush(rpcRequest).sync();
        RpcResponse rpcResponse = nettyRpcClientHandler.getRpcResponse(rpcRequest.getRequestId());
        logger.info("receives response from netty rpc server.");
        if (rpcResponse.isSuccess()) {
            return rpcResponse;
        }
        throw rpcResponse.getException();
    }
}

public class NettyRpcReadTimeoutHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LogManager.getLogger(NettyRpcReadTimeoutHandler.class);
    private long timeout;
    public NettyRpcReadTimeoutHandler(long timeout) {
        this.timeout = timeout;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcResponse rpcResponse = (RpcResponse)msg;
        long requestTime = NettyRpcRequestTimeHolder.get(rpcResponse.getRequestId());
        long now = new Date().getTime();
        if (now - requestTime >= timeout) {
            rpcResponse.setTimeout(true);
            logger.error("Netty RPC response is marked as timeout status: " + rpcResponse);
        }
      
        //移除发起请求时间的标记
        NettyRpcRequestTimeHolder.remove(rpcResponse.getRequestId());
        ctx.fireChannelRead(rpcResponse);
    }
}

public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LogManager.getLogger(NettyRpcClientHandler.class);
    private static final long GET_RPC_RESPONSE_SLEEP_INTERVAL = 5;
    private ConcurrentHashMap<String, RpcResponse> rpcResponses = new ConcurrentHashMap<String, RpcResponse>();
    private long timeout;

    public NettyRpcClientHandler(long timeout) {
        this.timeout = timeout;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcResponse rpcResponse = (RpcResponse) msg;
        if (rpcResponse.getTimeout()) {
            logger.error("Netty RPC client receives the response timeout: " + rpcResponse);
        } else {
            rpcResponses.put(rpcResponse.getRequestId(), rpcResponse);
            logger.info("Netty RPC client receives the response: " + rpcResponse);
        }
    }

    public RpcResponse getRpcResponse(String requestId) throws NettyRpcReadTimeoutException {
        long waitStartTime = new Date().getTime();
        while (rpcResponses.get(requestId) == null) {
            try {
                long now = new Date().getTime();
                if (now - waitStartTime >= timeout) {
                    break;
                }
                Thread.sleep(GET_RPC_RESPONSE_SLEEP_INTERVAL);
            } catch (InterruptedException e) {
                logger.error("wait for response interrupted", e);
            }
        }
        RpcResponse rpcResponse = rpcResponses.get(requestId);
        if (rpcResponse == null) {
            logger.error("Get RPC response timeout.");
            throw new NettyRpcReadTimeoutException("Get RPC response timeout.");
        } else {
            rpcResponses.remove(requestId);
        }
        return rpcResponse;
    }
}