「分佈式」 自定義RPC框架-基於JAVA實現

整體思路

RPC(Remote Procedure Call),即遠程過程調用。使用RPC,可以像使用本地的程序一樣使用遠程計算機上的程序。RPC使得開發分佈式程序更加容易。下面是一個基於java的簡單的RPC實例,有助於學習dubbo或grpc等框架的原理。

原理分析

RPC採用客戶機/服務器模式。請求程序就是客戶端,而服務提供程序就是服務端。也就是說需要兩個角色,服務端和客戶端。首先,客戶端調用進程發送一個調用信息(調用的接口,方法名,方法傳入參數等)給服務端,然後等待應答信息。在服務器端,當一個調用信息到達,服務器獲得調用信息並解析執行調用的接口和方法,然後發送調用的方法返回值,然後等待下一個調用信息,最後,客戶端接收到服務端發送回來的方法返回信息。

以下是代碼

服務端

首先需要業務類,然後需要一個註冊中心,註冊中心可以把被調用的業務類註冊到一個map集合中,然後根據客戶端發送過來的調用信息執行相應的業務類對象的方法,並返回方法的返回值

創建需要發佈的業務類接口和具體實現類

<code>public interface HelloService {
Object sayHello(String name);
}

public class HelloServiceImpl implements HelloService {
@Override
public Object sayHello(String name) {
return "hello" + name;
}
}
/<code>

然後是服務端的主體類,就是註冊中心。定義三個方法start()初始化方法,stop()停止服務方法,register()註冊中心

<code>public interface Server {
void start();
void stop();
void register(Class service,Class serviceImpl);
}/<code>

具體實現類,首先聲明一個map集合來來存放業務類,key是業務類的接口名,value是接口對應的具體實現類class對象

<code>public class ServerCenter implements Server {
private static HashMap<string> serviceRegister=new HashMap<>(); private static int PORT=0; //根據本地計算機性能生成對應容量的線程池 private static ExecutorService servicePool=
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); public ServerCenter() {

}

public ServerCenter(int port) {
this.PORT=port; }

@Override public void start() {
ServerSocket server=null; try {
server=new ServerSocket(); server.bind(new InetSocketAddress(PORT)); } catch (IOException e1) {
// TODO Auto-generated catch block e1.printStackTrace(); }

while(true) {
System.out.println("等待客戶端連接..."); Socket socket = null; try {
//服務器等待連接,每當有客戶端連接就開啟線程執行調用信息處理類 socket = server.accept(); servicePool.execute(new ServiceTask(socket)); } catch (IOException e) {
// TODO Auto-generated catch block e.printStackTrace(); }

}

}

@Override public void stop() {

servicePool.shutdown(); }

@Override public void register(Class service, Class serviceImpl) {

serviceRegister.put(service.getName(), serviceImpl); }

//具體調用信息處理類,解析客戶端發來的調用信息並執行對應的業務方法並相應方法的返回值 private class ServiceTask implements Runnable{

private Socket socket=null; public ServiceTask() {

}

public ServiceTask(Socket socket) {
this.socket = socket; }

@Override public void run() {
ObjectInputStream ois=null; ObjectOutputStream oos=null; try {
System.out.println("客戶端已連接"); ois=new ObjectInputStream(socket.getInputStream()); //獲取客戶端發來的接口名 String className=ois.readUTF(); //獲取客戶端發來的方法 String methodName=ois.readUTF(); //獲取客戶端發來的方法參數類型 Class[] methodTypes=(Class[]) ois.readObject(); //獲取客戶端發來的方法參數值 Object[] args =(Object[]) ois.readObject(); //從map中找到需要的接口並執行客戶端調用的方法 Class service=serviceRegister.get(className); Method method = service.getMethod(methodName,methodTypes); Object returns = method.invoke(service.newInstance(), args); oos=new ObjectOutputStream(socket.getOutputStream()); //返回方法執行的結果 oos.writeObject(returns); }catch (Exception e) {
e.printStackTrace(); }finally {
try {
//關閉資源 if(oos!=null)oos.close(); if(ois!=null)ois.close(); if(socket!=null)socket.close(); } catch (IOException e) {
// TODO Auto-generated catch block e.printStackTrace(); }
}

}
}
}/<string>/<code>

客戶端

客戶端使用動態代理來接受服務端的業務類返回值

<code>public class Client {
@SuppressWarnings("unchecked")
public static T getRemoteProxyObj(Class serviceInterface,InetSocketAddress addr) {

return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class>[]{serviceInterface}, new InvocationHandler() {

@Override public Object invoke(Object proxy, Method method, Object[] args) {

Socket socket = null; ObjectInputStream ois = null; ObjectOutputStream oos = null; Object result = null; try {
socket = new Socket(); socket.connect(addr); oos = new ObjectOutputStream(socket.getOutputStream()); //發送需要的接口名 oos.writeUTF(serviceInterface.getName()); //發送需要的方法名 oos.writeUTF(method.getName()); //方法參數類型 oos.writeObject(method.getParameterTypes()); //方法參數 oos.writeObject(args); ois = new ObjectInputStream(socket.getInputStream()); result = ois.readObject(); } catch (Exception e) {
e.printStackTrace(); } finally {
try {
if (oos != null) oos.close(); if (ois != null) ois.close(); if (socket != null) socket.close(); } catch (IOException e) {
// TODO Auto-generated catch block e.printStackTrace(); }
}

return result; }
}); }
}
/<code>

測試

服務端使用register()方法對HelloService類進行註冊並開啟服務等待客戶端連接

<code>public class ServerTest {
public static void main(String[] args) {

Server server=new ServerCenter(9999);
server.register(HelloService.class,HelloServiceImpl.class); server.start(); }

}/<code>

客戶端直接聲明需要調用的業務類的接口接受動態代理對象並執行需要的方法

<code>public class ClientTest {
public static void main(String[] args) throws ClassNotFoundException {

HelloService hs=Client.getRemoteProxyObj(Class.forName("com.jd.rpc.HelloService"), new InetSocketAddress("127.0.0.1", 9999)); System.out.println(hs.sayHello("world")); }
}/<code>

運行結果

helloworld


分享到:


相關文章: