博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
最简单的RPC框架实现
阅读量:6451 次
发布时间:2019-06-23

本文共 6283 字,大约阅读时间需要 20 分钟。

通过java原生的序列化,Socket通信,动态代理和反射机制,实现一个简单的RPC框架,由三部分组成:

1、服务提供者,运行再服务端,负责提供服务接口定义和服务实现类

2、服务发布者,运行再RPC服务端,负责将本地服务发布成远程服务,供其他消费者调用

3、本地服务代理,运行再RPC客户端,通过代理调用远程服务提供者,然后将结果进行封装返回给本地消费者

服务端接口定义和实现,如下:

代码清单 1-1 接口定义

public interface EchoService {    String echo(String ping);}

代码清单1-2

public class EchoServiceImpl implements EchoService {    public String echo(String ping) {        return ping != null ? ping + " -- I am ok." : "I am ok";    }}

代码清单1-3

import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.Method;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.net.Socket;import java.util.concurrent.Executor;import java.util.concurrent.Executors;/** *  */public class RpcExporter {    static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());    public static void exporter(String hostName , int port) throws  Exception{        ServerSocket serverSocket = new ServerSocket();        serverSocket.bind(new InetSocketAddress(hostName,port));        try {            while (true){                executor.execute(new ExporterTask(serverSocket.accept()));            }        }finally {            serverSocket.close();        }    }    private static class ExporterTask implements Runnable{        Socket client = null;        public ExporterTask(Socket client) {            this.client = client;        }        public void run() {            ObjectInputStream inputStream = null;            ObjectOutputStream outputStream = null;            try {                inputStream = new ObjectInputStream(client.getInputStream());                String interfaceName = inputStream.readUTF();                Class
service = Class.forName(interfaceName); String methodName = inputStream.readUTF(); Class
[] parameterTypes = (Class
[])inputStream.readObject(); Object [] arguments = (Object [])inputStream.readObject(); Method method = service.getMethod(methodName,parameterTypes); Object result = method.invoke(service.newInstance(),arguments); outputStream = new ObjectOutputStream(client.getOutputStream()); outputStream.writeObject(result); }catch (Exception e){ e.printStackTrace(); }finally { if(outputStream != null){ try { outputStream.close(); }catch (Exception e){ e.printStackTrace(); } } if(inputStream != null){ try { inputStream.close(); }catch (Exception e){ e.printStackTrace(); } } if(client != null){ try { client.close(); }catch (Exception e){ e.printStackTrace(); } } } } }}

服务发布者的主要职责如下:

1、作为服务端,监控客户端的TCP连接,接收到新的客户端连接之后,将其封装成Task,由线程池执行

2、将客户端发送的码流反序列化成对象,反射调用实现者,获取执行结果

3、将执行结果对象序列化,通过socket发送给客户端

4、远程服务调用完成之后,释放Socket等连接字段,防止句柄泄露

RPC客户端本地服务代理源码如下:

代码清单1-4

package com.habit;import com.sun.org.apache.xml.internal.serializer.OutputPropertiesFactory;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.net.InetSocketAddress;import java.net.Socket;/** *  */public class RpcImporter {    public S importer(final Class
serviceClass, final InetSocketAddress address) { return (S) Proxy.newProxyInstance(serviceClass.getClassLoader(), new Class
[]{serviceClass.getInterfaces()[0]}, new InvocationHandler() { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket = null; ObjectOutputStream outputStream = null; ObjectInputStream inputStream = null; try { socket = new Socket(); socket.connect(address); outputStream = new ObjectOutputStream(socket.getOutputStream()); outputStream.writeUTF(serviceClass.getName()); outputStream.writeUTF(method.getName()); outputStream.writeObject(method.getParameterTypes()); outputStream.writeObject(args); inputStream = new ObjectInputStream(socket.getInputStream()); return inputStream.readObject(); } finally { if (socket != null) { socket.close(); } if (outputStream != null) { outputStream.close(); } if (inputStream != null) { inputStream.close(); } } } }); }}

本地服务代理的主要功能:

1、将本地的接口调用转成JDK的动态代理,再动态代理中实现接口的远程调用

2、创建Socket客户端,根据指定地址链接远程服务提供者

3、将远程服务调用所需的接口类、方法名、参数列表等编码后发送给服务提供者

4、同步阻塞等待服务端返回应答,获取应答之后返回

测试代码:

代码清单1-5

package com.habit;import java.net.InetSocketAddress;/** *  */public class RpcTest {    public static void main(String[] args) {        new Thread(new Runnable() {            public void run() {                try {                    RpcExporter.exporter("localhost",8088);                }catch (Exception e){                    e.printStackTrace();                }            }        }).start();        RpcImporter
importer = new RpcImporter
(); EchoService echo = importer.importer(EchoServiceImpl.class,new InetSocketAddress("localhost",8088)); System.out.println(echo.echo("Are you ok?")); }}

创建一个异步发布服务端的线程并启动,用于接口rpc客户端的请求,根据请求参数调用服务实现类,返回结果给客户端

随后,创建客户端服务代理类,构建rpc请求参数,发起rpc请求,将调用结果输出到控制台,执行结果如下:

Are you ok? -- I am ok.

 

转载于:https://www.cnblogs.com/qunan/p/9127095.html

你可能感兴趣的文章
Java 日历
查看>>
center 安装lamp 环境
查看>>
Linux htop 工具使用
查看>>
android中使用sqlite、复制assets下的数据库到SD卡、支持大于1M的文件
查看>>
Android应用程序组件Content Provider的启动过程源代码分析(1)
查看>>
Android应用程序组件Content Provider在应用程序之间共享数据的原理分析(6)
查看>>
我的友情链接
查看>>
条件和条件语句
查看>>
Android中EditText的inputType属性值
查看>>
iOS 开发百问(1)
查看>>
动态代理的应用---AOP
查看>>
js中的prototype和__proto__
查看>>
深入浅出高性能服务发现、配置框架Nacos系列 2: Nacos项目结构介绍
查看>>
Linux定时任务
查看>>
项目手册3
查看>>
银海学院校园剧首映 师生团队编导
查看>>
CentOS一键配置rsync服务器脚本
查看>>
我的第一往篇博文
查看>>
SpringBoot启动分析
查看>>
Why Pascal is Not My Favorite Programming Language
查看>>