Outline
Related concepts
Dynamic proxy implementation on the service call side
Analysis of the client's RPC remote call process
Encoder and decoder in network communication
Processing of RPC service provider on the server side
Implementing the timeout function on the service call side
Related concepts
(1) What is RPC
(2) Proxy pattern
(3) Static proxy
(4) Dynamic proxy
(5) Dynamic proxy summary
(1) What is RPC
The caller holds only an interface locally, while the implementation of its methods lives on a remote machine. Calling a method on this interface actually calls a dynamic proxy of the interface.
The dynamic proxy wraps the method call into a request object, serializes the request into binary data, and sends that data to the remote machine through Netty network communication.
The remote machine starts a Netty server that listens for connections and requests. It deserializes the binary data back into a request object, locates the local method to call based on that object, invokes the method through reflection, and returns the result.
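The flow described above can be sketched in a single process. This is only an illustration under stated assumptions: GreetingService, Request, and serve() are hypothetical names, JDK serialization stands in for the serialization framework, and a direct method call stands in for the Netty round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class RpcFlowSketch {
    // Hypothetical service interface and implementation, for illustration only.
    interface GreetingService {
        String hello(String name);
    }

    static class GreetingServiceImpl implements GreetingService {
        public String hello(String name) {
            return "hello, " + name;
        }
    }

    // Hypothetical request object: which method to call and with which arguments.
    static class Request implements Serializable {
        private static final long serialVersionUID = 1L;
        final String methodName;
        final Class<?>[] parameterTypes;
        final Object[] args;

        Request(String methodName, Class<?>[] parameterTypes, Object[] args) {
            this.methodName = methodName;
            this.parameterTypes = parameterTypes;
            this.args = args;
        }
    }

    static byte[] toBytes(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    // "Remote" side: deserialize the request, find the method, invoke it via reflection,
    // and serialize the result.
    static byte[] serve(byte[] requestBytes, Object target) throws Exception {
        Request request = (Request) fromBytes(requestBytes);
        Method method = target.getClass().getMethod(request.methodName, request.parameterTypes);
        return toBytes(method.invoke(target, request.args));
    }

    // "Local" side: the dynamic proxy turns each interface call into a serialized request.
    static GreetingService clientProxy(Object remoteTarget) {
        InvocationHandler handler = (proxy, method, args) -> {
            byte[] requestBytes = toBytes(new Request(method.getName(), method.getParameterTypes(), args));
            byte[] responseBytes = serve(requestBytes, remoteTarget); // stands in for the network round trip
            return fromBytes(responseBytes);
        };
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[]{GreetingService.class},
                handler);
    }

    public static void main(String[] args) {
        GreetingService service = clientProxy(new GreetingServiceImpl());
        System.out.println(service.hello("zhangsan"));
    }
}
```

The caller only ever sees the GreetingService interface; everything between the proxy and the reflective call could be replaced by real network transport without changing the calling code.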
(2) Proxy pattern
The proxy pattern is generally divided into two types: static proxy and dynamic proxy.
Static proxy: The proxy class is written in advance and is already compiled into bytecode before the program runs.
Dynamic proxy: The proxy class is generated at runtime; that is, its bytecode is generated dynamically and loaded by a ClassLoader while the program is running.
(3) Static proxy
To enhance a method of a class that implements an interface without modifying the original class, a static proxy has to re-implement that interface. If many classes need to be enhanced, a proxy class has to be written for each of them, which is tedious. In the following example, proxying the implementation class of an IReceiver interface as well would require defining an additional ProxyReceiver class that implements IReceiver, because each static proxy class must implement the interface of the class it proxies.
//Step 1: Define the interface
public interface ISender {
    public boolean send();
}
//Step 2: Define the real implementation class, i.e. the proxied class
public class SmsSender implements ISender {
    public boolean send() {
        System.out.println("Sending msg");
        return true;
    }
}
//Step 3: Define the proxy class, which wraps the implementation class
//The proxy class extends functionality without modifying the real class
//To also proxy implementations of the IReceiver interface, a separate ProxyReceiver class implementing IReceiver would have to be defined
public class ProxySender implements ISender {
    private ISender sender;
    public ProxySender(ISender sender) {
        this.sender = sender;
    }
    public boolean send() {
        System.out.println("Before processing");
        boolean result = sender.send();
        System.out.println("After processing");
        return result;
    }
}
//Step 4: Client call
@Test
public void testStaticProxy() {
    ISender sender = new ProxySender(new SmsSender());
    boolean result = sender.send();
    System.out.println("Output result:" + result);
}
(4) Dynamic proxy
Dynamic proxy has several advantages over static proxy. It does not require proxy classes to be defined in advance, and the execution logic of the proxy can even be specified at runtime, which greatly improves system flexibility. In the following JDK dynamic proxy example, a single JdkProxyHandler class that implements the InvocationHandler interface can proxy implementation classes of both the ISender and IReceiver interfaces, because JdkProxyHandler does not depend on the interface of any specific proxied class.
Currently, there are several ways to generate dynamic proxy classes:
JDK dynamic proxy: Built into the JDK, so no third-party jar is needed. It is simple to use but limited in functionality.
CGLib and Javassist: Both are high-level bytecode generation libraries, with more powerful features and generally better overall performance than JDK dynamic proxy.
ASM: A low-level bytecode generation tool that works almost at the level of raw bytecode instructions. It places the highest demands on developers and offers the best performance.
1. JDK dynamic proxy
First, define an InvocationHandler class. This class implements the invoke() method of the InvocationHandler interface in order to intercept calls to the methods of the proxied interface.
Then, when the client calls the Proxy.newProxyInstance() method and passes in the proxied interface and an InvocationHandler object that wraps the target object, a proxy class is generated dynamically and a proxy object implementing the proxied interface is returned.
From then on, interface methods can be called on the proxy object. Each call is forwarded to the invoke() method of the InvocationHandler object, which makes it possible to intercept and enhance calls to the target object's methods.
//Step 1: Define the interface
public interface ISender {
    public boolean send();
}
//Step 2: Define the proxied class that implements the above interface
public class SmsSender implements ISender {
    public boolean send() {
        System.out.println("Sending msg");
        return true;
    }
}
//Step 3: Define an InvocationHandler class
public class JdkProxyHandler implements InvocationHandler {
    private Object target;
    public JdkProxyHandler(Object target) {
        this.target = target;
    }
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        System.out.println("Before processing");
        //Forward the call to the real target object through reflection
        Object result = method.invoke(target, args);
        System.out.println("After processing");
        return result;
    }
}
//Step 4: Client call
@Test
public void testJdkProxy() {
    //Dynamically generate a proxy class and return a proxy object that implements the proxied interface
    //Input parameters are: class loader, interfaces to implement, InvocationHandler object wrapping the target object
    ISender sender = (ISender) Proxy.newProxyInstance(
        ISender.class.getClassLoader(),
        new Class[]{ISender.class},
        new JdkProxyHandler(new SmsSender())
    );
    //Call the interface method on the proxy object
    boolean result = sender.send();
    System.out.println("Proxy object:" + sender.getClass().getName());
    System.out.println("Output result:" + result);
}
JDK dynamic proxy generates proxy objects dynamically through the Proxy.newProxyInstance() method. It is implemented on top of the Java reflection mechanism, and the target object (the proxied object) must implement an interface for a proxy class to be generated for it.
public class Proxy implements java.io.Serializable {
    ...
    //This method has 3 parameters:
    //loader: the class loader used to define the proxy class. The proxy should be loaded by the same class loader as the target object, so the target object's class loader is usually passed in
    //interfaces: the list of interfaces the proxy class must implement. JDK dynamic proxy requires the proxy class and the target object to implement the same interfaces, so the target object's interfaces are passed in
    //h: the invocation handler, an InvocationHandler implementation whose invoke() callback contains the enhancement logic for the target object
    public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h) throws IllegalArgumentException {
        Objects.requireNonNull(h);
        final Class<?>[] intfs = interfaces.clone();
        final SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            checkProxyAccess(Reflection.getCallerClass(), loader, intfs);
        }
        //Look up or generate the proxy class
        Class<?> cl = getProxyClass0(loader, intfs);
        try {
            if (sm != null) {
                checkNewProxyPermission(Reflection.getCallerClass(), cl);
            }
            //Create a proxy object instance through reflection
            final Constructor<?> cons = cl.getConstructor(constructorParams);
            final InvocationHandler ih = h;
            if (!Modifier.isPublic(cl.getModifiers())) {
                AccessController.doPrivileged(new PrivilegedAction<Void>() {
                    public Void run() {
                        cons.setAccessible(true);
                        return null;
                    }
                });
            }
            return cons.newInstance(new Object[]{h});
        } catch (IllegalAccessException|InstantiationException e) {
            throw new InternalError(e.toString(), e);
        } catch (InvocationTargetException e) {
            Throwable t = e.getCause();
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new InternalError(t.toString(), t);
            }
        } catch (NoSuchMethodException e) {
            throw new InternalError(e.toString(), e);
        }
    }
    ...
}
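The interface requirement mentioned above can be observed directly. The following is a minimal sketch, assuming the ISender and SmsSender types from the earlier example (redeclared here as nested types so the block is self-contained); JdkProxyFacts, proxyOf, and rejectsConcreteClass are illustrative names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

class JdkProxyFacts {
    interface ISender {
        boolean send();
    }

    static class SmsSender implements ISender {
        public boolean send() {
            return true;
        }
    }

    // Builds a pass-through JDK proxy around the given target.
    static ISender proxyOf(ISender target) {
        InvocationHandler handler = (proxy, method, args) -> method.invoke(target, args);
        return (ISender) Proxy.newProxyInstance(
                ISender.class.getClassLoader(),
                new Class<?>[]{ISender.class},
                handler);
    }

    // Returns true if the JDK refuses a concrete class in the interfaces array.
    static boolean rejectsConcreteClass() {
        try {
            Proxy.newProxyInstance(
                    SmsSender.class.getClassLoader(),
                    new Class<?>[]{SmsSender.class}, // a class, not an interface
                    (proxy, method, args) -> null);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        ISender sender = proxyOf(new SmsSender());
        System.out.println(Proxy.isProxyClass(sender.getClass())); // generated at runtime
        System.out.println(sender instanceof SmsSender);           // proxy implements ISender only
        System.out.println(rejectsConcreteClass());
    }
}
```

The proxy implements ISender but is not a subclass of SmsSender, which is why the JDK approach cannot proxy a class that has no interface.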
2. CGLib dynamic proxy
Unlike JDK dynamic proxy, CGLib dynamic proxy does not require the target class to implement an interface. You only need to write an entry class that handles the proxy logic and implements the MethodInterceptor interface.
The characteristics of CGLib dynamic proxy are as follows:
CGLib removes the restriction that the proxied class must implement an interface.
CGLib is built on the ASM bytecode generation framework; generating proxy classes with bytecode technology is more efficient than using Java reflection.
CGLib cannot proxy methods declared as final, because it works by dynamically generating a subclass of the proxied class.
//The following dependency must be added before using CGLib dynamic proxy
<dependency>
    <groupId>cglib</groupId>
    <artifactId>cglib</artifactId>
    <version>3.3.0</version>
</dependency>
//Step 1: Define the proxied class
public class BdSender {
    public boolean send() {
        System.out.println("Sending msg");
        return true;
    }
}
//Step 2: Implement an entry class that handles the proxy logic and implements the MethodInterceptor interface
public class CglibProxyInterceptor implements MethodInterceptor {
    private Enhancer enhancer = new Enhancer();
    //Get the proxy object
    //@param clazz the proxied class
    public Object getProxy(Class clazz) {
        //The generated proxy class is a subclass of clazz
        enhancer.setSuperclass(clazz);
        enhancer.setCallback(this);
        return enhancer.create();
    }
    @Override
    public Object intercept(Object object, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
        System.out.println("Before processing");
        //invokeSuper() calls the original method of the parent class; calling invoke() on the proxy itself would recurse
        Object result = methodProxy.invokeSuper(object, args);
        System.out.println("After processing");
        return result;
    }
}
//Step 3: Client call
@Test
public void testCglibProxy() {
    BdSender sender = (BdSender) new CglibProxyInterceptor().getProxy(BdSender.class);
    boolean result = sender.send();
    System.out.println("Proxy object:" + sender.getClass().getName());
    System.out.println("Output result:" + result);
}
3. Javassist dynamic proxy
Javassist is an open source library for analyzing, editing, and creating Java bytecode. Compared with tools such as ASM, it lets developers change the structure of classes or generate classes dynamically without having to understand virtual machine instructions.
There are two ways to generate a dynamic proxy using Javassist:
Proxy factory creation: Implement an entry class that handles the proxy logic and implements the MethodHandler interface, similar to CGLib.
Dynamic code creation: Generate the bytecode from Java source strings. A dynamic proxy created this way is very flexible and can even generate business logic at runtime.
Method 1: Proxy factory creation
//The following dependency must be added before using Javassist
<dependency>
    <groupId>org.javassist</groupId>
    <artifactId>javassist</artifactId>
    <version>3.27.0-GA</version>
</dependency>
//Step 1: Define the proxied class
public class BdSender {
    public boolean send() {
        System.out.println("Sending msg");
        return true;
    }
}
//Step 2: Implement a class that handles the proxy logic and implements the MethodHandler interface
public class JavassistProxyHandler implements MethodHandler {
    private ProxyFactory proxyFactory = new ProxyFactory();
    //Get the proxy object
    //@param clazz the proxied class
    public Object getProxy(Class clazz) throws Exception {
        proxyFactory.setSuperclass(clazz);
        Class<?> factoryClass = proxyFactory.createClass();
        Object proxy = factoryClass.newInstance();
        ((ProxyObject) proxy).setHandler(this);
        return proxy;
    }
    @Override
    public Object invoke(Object object, Method method, Method method1, Object[] args) throws Throwable {
        System.out.println("Before processing");
        //method1 is the "proceed" method that invokes the original implementation in the parent class
        Object result = method1.invoke(object, args);
        System.out.println("After processing");
        return result;
    }
}
//Step 3: Client call
@Test
public void testJavassistProxy() throws Exception {
    BdSender sender = (BdSender) new JavassistProxyHandler().getProxy(BdSender.class);
    boolean result = sender.send();
    System.out.println("Proxy object:" + sender.getClass().getName());
    System.out.println("Output result:" + result);
}
Method 2: Dynamic code creation
//Step 1: Define the proxied class
public class BdSender {
    public boolean send() {
        System.out.println("Sending msg");
        return true;
    }
}
//Step 2: Generate the bytecode
public class JavassistDynamicCodeProxy {
    public static Object getProxy(Class clazz) throws Exception {
        ClassPool mPool = ClassPool.getDefault();
        CtClass c0 = mPool.get(clazz.getName());
        //Define the proxy class name
        CtClass mCtc = mPool.makeClass(clazz.getName() + "$$BytecodeProxy");
        //Make the proxied class the parent class
        mCtc.setSuperclass(c0);
        //Add a private field that holds the real object
        CtField field = new CtField(c0, "real", mCtc);
        field.setModifiers(AccessFlag.PRIVATE);
        mCtc.addField(field);
        //Add a constructor
        CtConstructor constructor = new CtConstructor(new CtClass[]{c0}, mCtc);
        constructor.setBody("{$0.real = $1;}"); // $0 represents this, $1 represents the first parameter of the constructor
        mCtc.addConstructor(constructor);
        //Add the proxied method
        CtMethod ctMethod = mCtc.getSuperclass().getDeclaredMethod("send");
        CtMethod newMethod = new CtMethod(ctMethod.getReturnType(), ctMethod.getName(), ctMethod.getParameterTypes(), mCtc);
        newMethod.setBody("{" +
            "System.out.println(\"Before processing\");" +
            "boolean result = $0.real.send();" +
            "System.out.println(\"After processing\");" +
            "return result;}");
        mCtc.addMethod(newMethod);
        //Load the generated class and instantiate it with a real object
        return mCtc.toClass().getConstructor(clazz).newInstance(clazz.newInstance());
    }
}
//Step 3: Client call
@Test
public void testJavassistBytecodeProxy() throws Exception {
    BdSender sender = (BdSender) JavassistDynamicCodeProxy.getProxy(BdSender.class);
    boolean result = sender.send();
    System.out.println("Proxy object:" + sender.getClass().getName());
    System.out.println("Output result:" + result);
}
(5) Dynamic proxy summary
JDK dynamic proxy requires implementing the InvocationHandler interface, CGLib dynamic proxy requires implementing the MethodInterceptor interface, and Javassist dynamic proxy requires either implementing the MethodHandler interface or generating the bytecode directly.
The word "dynamic" in dynamic proxy is relative to static proxy. The main advantage of dynamic proxy is not that it saves the code of writing static proxy classes, but that it can implement proxy behavior even when the class to be proxied is not known in advance.
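To make the last point concrete, here is a small sketch in which one handler wraps any interface supplied at runtime. The traced() factory, the CALLS list, and the IReceiver and MailReceiver types are hypothetical names introduced only for this illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class GenericProxySketch {
    interface ISender {
        boolean send();
    }

    interface IReceiver {
        String receive();
    }

    static class SmsSender implements ISender {
        public boolean send() {
            return true;
        }
    }

    static class MailReceiver implements IReceiver {
        public String receive() {
            return "msg";
        }
    }

    // Records every intercepted call as "Interface.method".
    static final List<String> CALLS = new ArrayList<>();

    // One factory for any interface: the handler never names a concrete interface type.
    static <T> T traced(Class<T> iface, T target) {
        InvocationHandler handler = (proxy, method, args) -> {
            CALLS.add(iface.getSimpleName() + "." + method.getName());
            return method.invoke(target, args);
        };
        Object proxy = Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
        return iface.cast(proxy);
    }

    public static void main(String[] args) {
        ISender sender = traced(ISender.class, new SmsSender());
        IReceiver receiver = traced(IReceiver.class, new MailReceiver());
        sender.send();
        receiver.receive();
        System.out.println(CALLS);
    }
}
```

With a static proxy, ProxySender and ProxyReceiver would each have to be written by hand; here the interface to proxy is just a parameter.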
Dynamic proxy implementation on the service call side
When an RPC caller invokes the target interface, it first creates a dynamic proxy, for example with JDK dynamic proxy. The dynamic proxy then sends the call request through Netty to the target machine for processing.
The key call chain is as follows:
RpcServiceProxy.createProxy() ->
ServiceProxyInvocationHandler.invoke() ->
NettyRpcClient.remoteCall(rpcRequest)
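public class NettyRpcClientTest {
    //Logger factory assumed to be SLF4J's LoggerFactory
    private static final Logger logger = LoggerFactory.getLogger(NettyRpcClientTest.class);
    public static void main(String[] args) {
        //Specify which interface the RPC call targets
        ReferenceConfig referenceConfig = new ReferenceConfig(TestService.class);
        //Create the dynamic proxy
        TestService testService = (TestService) RpcServiceProxy.createProxy(referenceConfig);
        //The following call goes to the invoke() method of the handler
        String result = testService.sayHello("zhangsan");
        logger.info("rpc call finished: " + result);
    }
}
public class RpcServiceProxy {
    //Create a proxy
    public static Object createProxy(ReferenceConfig referenceConfig) {
        return Proxy.newProxyInstance(
            RpcServiceProxy.class.getClassLoader(),
            new Class[]{referenceConfig.getServiceInterfaceClass()},
            new ServiceProxyInvocationHandler(referenceConfig)
        );
    }
    static class ServiceProxyInvocationHandler implements InvocationHandler {
        private ReferenceConfig referenceConfig;
        public ServiceProxyInvocationHandler(ReferenceConfig referenceConfig) {
            this.referenceConfig = referenceConfig;
        }
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            //Initiate the connection
            NettyRpcClient nettyRpcClient = new NettyRpcClient(referenceConfig);
            nettyRpcClient.connect();
            //Wrap the method call into a request object
            RpcRequest rpcRequest = new RpcRequest();
            rpcRequest.setRequestId(UUID.randomUUID().toString().replace("-", ""));
            rpcRequest.setClassName(referenceConfig.getServiceInterfaceClass().getName());
            rpcRequest.setMethodName(method.getName());
            rpcRequest.setParameterTypes(method.getParameterTypes());
            rpcRequest.setArgs(args);
            //Make the RPC call
            RpcResponse rpcResponse = nettyRpcClient.remoteCall(rpcRequest);
            return rpcResponse.getResult();
        }
    }
}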
public class NettyRpcClient {
    //Logger factory assumed to be SLF4J's LoggerFactory
    private static final Logger logger = LoggerFactory.getLogger(NettyRpcClient.class);
    private String serviceHost;
    private int servicePort;
    private ChannelFuture channelFuture;
    private NettyRpcClientHandler nettyRpcClientHandler;
    public NettyRpcClient(String serviceHost, int servicePort) {
        this.serviceHost = serviceHost;
        this.servicePort = servicePort;
        this.nettyRpcClientHandler = new NettyRpcClientHandler();
    }
    public void connect() {
        logger.info("connecting to Netty RPC server: " + serviceHost + ":" + servicePort);
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
            .group(eventLoopGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.SO_KEEPALIVE, true)//Send keep-alive probes when the connection has been idle for a long time
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline()
                        .addLast(new RpcEncoder(RpcRequest.class))
                        .addLast(new RpcDecoder(RpcResponse.class))
                        .addLast(new NettyRpcReadTimeoutHandler())
                        .addLast(nettyRpcClientHandler);
                }
            });
        try {
            if (serviceHost != null && !serviceHost.equals("")) {
                //After initiating the connection through connect(), block through sync() until it is established
                channelFuture = bootstrap.connect(serviceHost, servicePort).sync();
                logger.info("successfully connected.");
            }
        } catch(Exception e) {
            throw new RuntimeException(e);
        }
    }
    public RpcResponse remoteCall(RpcRequest rpcRequest) throws Throwable {
        //sync() blocks until the request has been written out
        channelFuture.channel().writeAndFlush(rpcRequest).sync();
        //Block until the handler has received the response
        RpcResponse rpcResponse = nettyRpcClientHandler.getRpcResponse();
        logger.info("receives response from netty rpc server.");
        if (rpcResponse.isSuccess()) {
            return rpcResponse;
        }
        throw rpcResponse.getException();
    }
}
Analysis of the client's RPC remote call process
Execution logic of the NettyRpcClient.remoteCall() method:
Note 1: After the Netty client initiates a connection to the server through the connect() method, it blocks synchronously through the sync() method until the connection is established.
Note 2: An RPC call writes the request data through the writeAndFlush() method of the Netty client's Channel and blocks through sync() until the write completes. The caller then waits in getRpcResponse() for the response from the Netty server, which is how the RPC call result is obtained.
Note 3: The request data written by writeAndFlush() is processed by the pipeline of the client Channel, for example encoded into a binary byte array, and then transmitted to the server Channel.
Note 4: After receiving the request data, the server Channel processes it through its own pipeline, for example decoding the binary bytes into a request object and calling the corresponding method through reflection. The server then encodes the result of the reflective call as response data and sends it back to the client. Finally, the client Channel decodes the received data into a response object, which is the RPC call result.
public class NettyRpcClient {
    ...
    //To implement the timeout function, record the time at which remoteCall() starts executing,
    //then check in channelRead() of NettyRpcClientHandler whether the call has timed out
    public RpcResponse remoteCall(RpcRequest rpcRequest) throws Throwable {
        channelFuture.channel().writeAndFlush(rpcRequest).sync();
        RpcResponse rpcResponse = nettyRpcClientHandler.getRpcResponse();
        logger.info("receives response from netty rpc server.");
        if (rpcResponse.isSuccess()) {
            return rpcResponse;
        }
        throw rpcResponse.getException();
    }
}
public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {
    //Logger factory assumed to be SLF4J's LoggerFactory
    private static final Logger logger = LoggerFactory.getLogger(NettyRpcClientHandler.class);
    //volatile so that the calling thread sees the value written by the Netty I/O thread
    private volatile RpcResponse rpcResponse;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        this.rpcResponse = (RpcResponse) msg;
        logger.info("Netty RPC client receives the response: " + rpcResponse);
    }
    public RpcResponse getRpcResponse() {
        //Poll every 5 ms until the response arrives
        while (rpcResponse == null) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                logger.error("wait for response interrupted", e);
            }
        }
        return rpcResponse;
    }
}
Encoder and decoder in network communication
(1) RPC's request response communication protocol
(2) Hessian serialization and deserialization
(3) RPC encoder
(4) RPC decoder
(5) How to solve the sticky packet and half packet problem
(1) RPC's request response communication protocol
//RPC request
public class RpcRequest {
private String requestId;
private String className;
private String methodName;
private String[] parameterClasses;//parameter type
private Object[] parameters;//parameter value
private String invokerApplicationName;//The caller's service name
private String invokerIp;//The caller's IP address
...
}
//RPC response
public class RpcResponse {
private String requestId;
private boolean isSuccess;
private Object result;
private Throwable exception;
...
}
(2) Hessian serialization and deserialization
It is necessary to serialize the request object and the response object into a binary byte array, and deserialize the obtained binary byte array into a request object and a response object. Here, the Hessian framework is used to realize serialization and deserialization.
public class HessianSerialization {
    //Serialization: serialize an object into a byte array
    public static byte[] serialize(Object object) throws IOException {
        //Create a byte array output stream
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        //Wrap the byte array output stream in a Hessian output stream
        HessianOutput hessianOutput = new HessianOutput(byteArrayOutputStream);
        //Write the object through the Hessian output stream
        hessianOutput.writeObject(object);
        //Convert the written data into a byte array
        byte[] bytes = byteArrayOutputStream.toByteArray();
        return bytes;
    }
    //Deserialization: restore an object from a byte array
    public static Object deserialize(byte[] bytes, Class clazz) throws IOException {
        //Wrap the byte array in a byte array input stream
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        //Wrap the byte array input stream in a Hessian input stream
        HessianInput hessianInput = new HessianInput(byteArrayInputStream);
        //Read an object of the expected class from the Hessian input stream
        Object object = hessianInput.readObject(clazz);
        return object;
    }
}
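The following is a Hessian serialization round-trip test for RpcRequest. Note: RpcRequest and RpcResponse must implement Serializable.
public class HessianSerializationTest {
    public static void main(String[] args) throws Exception {
        //First create an RpcRequest object
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setRequestId(UUID.randomUUID().toString().replace("-", ""));
        rpcRequest.setClassName("TestClass");
        rpcRequest.setMethodName("sayHello");
        rpcRequest.setParameterClasses(new String[]{"String"});
        rpcRequest.setParameters(new Object[]{"wjunt"});
        rpcRequest.setInvokerApplicationName("RpcClient");
        rpcRequest.setInvokerIp("127.0.0.1");
        //Serialize
        byte[] bytes = HessianSerialization.serialize(rpcRequest);
        System.out.println(bytes.length);
        //Deserialize
        RpcRequest deSerializedRpcRequest = (RpcRequest) HessianSerialization.deserialize(bytes, RpcRequest.class);
        System.out.println(deSerializedRpcRequest);
    }
}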
(3) RPC encoder
Encoding can be understood as performing serialization, and decoding can be understood as performing deserialization.
The encoder RpcEncoder extends Netty's MessageToByteEncoder class, and the decoder RpcDecoder extends Netty's ByteToMessageDecoder class.
The deserialization logic must mirror the way the data was packaged during serialization. For example, the encoded data below consists of the byte array length followed by the byte array itself, so the decoder has to be written against exactly this layout.
public class RpcEncoder extends MessageToByteEncoder {
    //The target class to be serialized
    private Class<?> targetClass;
    public RpcEncoder(Class<?> targetClass) {
        this.targetClass = targetClass;
    }
    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        //Only encode o if it is an instance of the class this encoder was created for
        if (targetClass.isInstance(o)) {
            byte[] bytes = HessianSerialization.serialize(o);
            //Write the serialized byte array into byteBuf
            //First write the data length as a 4-byte int
            byteBuf.writeInt(bytes.length);
            //Then write the complete byte array
            byteBuf.writeBytes(bytes);
        }
    }
}
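The "length + byte array" layout written by the encoder can be reproduced with the JDK alone. This is a minimal sketch, assuming java.nio.ByteBuffer as a stand-in for Netty's ByteBuf (both write ints in big-endian order by default); LengthPrefixFrame and its methods are illustrative names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class LengthPrefixFrame {
    // Frame layout: 4-byte big-endian length, followed by the payload bytes.
    static byte[] encode(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(4 + payload.length);
        buffer.putInt(payload.length);
        buffer.put(payload);
        return buffer.array();
    }

    // Reads the length field first, then exactly that many payload bytes.
    static byte[] decode(byte[] frame) {
        ByteBuffer buffer = ByteBuffer.wrap(frame);
        int length = buffer.getInt();
        byte[] payload = new byte[length];
        buffer.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] frame = encode("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(frame.length); // 4 length bytes + 5 payload bytes = 9
        System.out.println(new String(decode(frame), StandardCharsets.UTF_8));
    }
}
```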
(4) RPC decoder
The main steps of the decoder are as follows:
Step 1: Message length verification and read index mark
Step 2: Verification of negative message length and unpacking verification
Step 3: Handling unpacking problems and resetting the read index
Step 4: Deserialize the byte array into the specified class
public class RpcDecoder extends ByteToMessageDecoder {
    private static final int MESSAGE_LENGTH_BYTES = 4;
    private static final int MESSAGE_LENGTH_VALID_MINIMUM_VALUE = 0;
    private Class<?> targetClass;
    public RpcDecoder(Class<?> targetClass) {
        this.targetClass = targetClass;
    }
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        //1. Message length field check
        //The number of readable bytes in byteBuf must reach 4 (the size of the length field) before decoding can continue
        if (byteBuf.readableBytes() < MESSAGE_LENGTH_BYTES) {
            return;
        }
        //2. Mark the read index
        //Remember the current readerIndex so that it can be restored later and the same bytes read again
        byteBuf.markReaderIndex();
        //3. Read the message length
        //Read a 4-byte int, which is the length of the message body in bytes
        int messageLength = byteBuf.readInt();
        //4. Negative message length check
        //A negative length means the stream is corrupted, so close the connection and stop decoding
        if (messageLength < MESSAGE_LENGTH_VALID_MINIMUM_VALUE) {
            channelHandlerContext.close();
            return;
        }
        //5. Half packet check
        //After readInt() has consumed 4 bytes, readableBytes() is the number of body bytes received so far
        //If it is smaller than the message length, the body has not fully arrived yet; this is the classic half packet problem
        //In that case, reset the read index and do not process any data this time
        if (byteBuf.readableBytes() < messageLength) {
            byteBuf.resetReaderIndex();
            //Wait for more data before decoding again
            //The EventLoop keeps looping and listening for read events on the Channel;
            //while data is still being transmitted, the Channel keeps producing read events;
            //so a read event may be handled before the whole message has arrived, which is when a half packet is seen;
            //by resetting the read index and returning, the same bytes are decoded again on the next read event
            return;
        }
        //6. Deserialize the byte array into the specified class
        byte[] bytes = new byte[messageLength];
        byteBuf.readBytes(bytes);
        Object object = HessianSerialization.deserialize(bytes, targetClass);
        list.add(object);
    }
}
(5) How to solve the sticky packet and half packet problem
When encoding a data packet, a 4-byte int holding the payload length is written at the beginning of the packet. When decoding, the 4-byte int is always read first, and then exactly that many subsequent bytes are read. Only when all of them are available has a complete byte array been received. This solves the sticky packet and half packet problem, and the principle is similar to Netty's length-field-based decoder LengthFieldBasedFrameDecoder.
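The following sketch simulates both situations without Netty: chunks arrive with arbitrary boundaries, and an accumulator applies the same mark, read-length, reset logic as the decoder. FrameAccumulator, feed(), frame(), and concat() are hypothetical names, and java.nio.ByteBuffer is used in place of ByteBuf.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FrameAccumulator {
    // Bytes received but not yet decoded, kept in read mode.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    // Feeds one chunk exactly as it arrived; returns every complete message now available.
    List<String> feed(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip();
        List<String> messages = new ArrayList<>();
        while (merged.remaining() >= 4) {
            merged.mark();                      // remember where the length field starts
            int length = merged.getInt();
            if (length < 0) {
                throw new IllegalStateException("negative length: " + length);
            }
            if (merged.remaining() < length) {  // half packet: rewind and wait for more bytes
                merged.reset();
                break;
            }
            byte[] body = new byte[length];
            merged.get(body);
            messages.add(new String(body, StandardCharsets.UTF_8));
        }
        pending = merged;                       // undecoded bytes stay for the next chunk
        return messages;
    }

    // Builds one length-prefixed frame.
    static byte[] frame(String text) {
        byte[] body = text.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    static byte[] concat(byte[] a, byte[] b) {
        byte[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    public static void main(String[] args) {
        byte[] stream = concat(frame("ping"), frame("pong!"));
        FrameAccumulator accumulator = new FrameAccumulator();
        // The first chunk ends in the middle of the second frame's length field.
        System.out.println(accumulator.feed(Arrays.copyOfRange(stream, 0, 11)));
        System.out.println(accumulator.feed(Arrays.copyOfRange(stream, 11, stream.length)));
    }
}
```

Feeding the whole stream in one call is the sticky packet case (two messages come out of one read), while the split above is the half packet case.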
Processing of RPC service provider on the server side
(1) NettyRpcServer, RPC service provider
(2) Calling the target method of the request object via reflection
(1) NettyRpcServer, RPC service provider
public class ServiceConfig {
    private String serviceName;//Service name
    private Class serviceInterfaceClass;//Service interface type
    private Class serviceClass;//Service implementation type
    ...
}
public class NettyRpcServer {
    //Logger factory assumed to be SLF4J's LoggerFactory
    private static final Logger logger = LoggerFactory.getLogger(NettyRpcServer.class);
    private static final int DEFAULT_PORT = 8998;
    private List<ServiceConfig> serviceConfigs = new CopyOnWriteArrayList<ServiceConfig>();
    private int port;
    public NettyRpcServer(int port) {
        this.port = port;
    }
    public void start() {
        logger.info("Netty RPC Server Starting...");
        EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();
        EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap
                .group(bossEventLoopGroup, workerEventLoopGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                            .addLast(new RpcDecoder(RpcRequest.class))
                            .addLast(new RpcEncoder(RpcResponse.class))
                            .addLast(new NettyRpcServerHandler(serviceConfigs));
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true);
            //After this step, the server has started and is listening on the specified port
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            logger.info("Netty RPC Server started successfully, listened[" + port + "]");
            //Block here until the server channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            logger.error("Netty RPC Server failed to start, listened[" + port + "]", e);
        } finally {
            bossEventLoopGroup.shutdownGracefully();
            workerEventLoopGroup.shutdownGracefully();
        }
    }
    //Multiple services can be registered
    public void addServiceConfig(ServiceConfig serviceConfig) {
        this.serviceConfigs.add(serviceConfig);
    }
    public static void main(String[] args) {
        ServiceConfig serviceConfig = new ServiceConfig("TestService", TestService.class, TestServiceImpl.class);
        NettyRpcServer nettyRpcServer = new NettyRpcServer(DEFAULT_PORT);
        nettyRpcServer.addServiceConfig(serviceConfig);
        nettyRpcServer.start();
    }
}
(2) Calling the target method of the request object via reflection
//The fields of the RpcRequest class are adjusted as follows
public class RpcRequest implements Serializable {
private String requestId;
private String className;
private String methodName;
private Class[] parameterTypes;//parameter type
private Object[] args;//parameter value
private String invokerApplicationName;//The caller's service name
private String invokerIp;//The caller's IP address
...
}
public class NettyRpcServerHandler extends ChannelInboundHandlerAdapter {
    //Logger factory assumed to be SLF4J's LoggerFactory
    private static final Logger logger = LoggerFactory.getLogger(NettyRpcServerHandler.class);
    //Interface name -> service config
    private ConcurrentHashMap<String, ServiceConfig> serviceConfigMap = new ConcurrentHashMap<String, ServiceConfig>();
    public NettyRpcServerHandler(List<ServiceConfig> serviceConfigs) {
        for (ServiceConfig serviceConfig: serviceConfigs) {
            String serviceInterfaceClass = serviceConfig.getServiceInterfaceClass().getName();
            serviceConfigMap.put(serviceInterfaceClass, serviceConfig);
        }
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcRequest rpcRequest = (RpcRequest)msg;
        logger.info("Netty RPC Server receives the request: " + rpcRequest);
        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setRequestId(rpcRequest.getRequestId());
        try {
            //Steps:
            //Look up the implementation class by the interface name carried in RpcRequest
            //Construct an instance of that class through reflection
            //Obtain the method by the method name and parameter types carried in RpcRequest
            //Invoke the method through reflection with the request arguments and get the return value
            ServiceConfig serviceConfig = serviceConfigMap.get(rpcRequest.getClassName());
            Class clazz = serviceConfig.getServiceClass();
            Object instance = clazz.newInstance();
            Method method = clazz.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
            Object result = method.invoke(instance, rpcRequest.getArgs());
            //Put the RPC call result into the response
            rpcResponse.setResult(result);
            rpcResponse.setSuccess(true);
        } catch(Exception e) {
            logger.error("Netty RPC Server failed to respond to the request.", e);
            rpcResponse.setSuccess(false);
            rpcResponse.setException(e);
        }
        ctx.write(rpcResponse);
        ctx.flush();
        logger.info("send RPC response to client: " + rpcResponse);
    }
}
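The reflective lookup performed in channelRead() can be tried in isolation. Below is a minimal sketch, assuming a hypothetical TestService and TestServiceImpl pair and a plain map in place of serviceConfigMap; ReflectiveDispatch and dispatch() are illustrative names.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ReflectiveDispatch {
    // Hypothetical service interface and implementation, for illustration only.
    public interface TestService {
        String sayHello(String name);
    }

    public static class TestServiceImpl implements TestService {
        public String sayHello(String name) {
            return "hello, " + name;
        }
    }

    // Interface name -> implementation class, mirroring the role of serviceConfigMap.
    private static final Map<String, Class<?>> REGISTRY = new ConcurrentHashMap<>();

    static {
        REGISTRY.put(TestService.class.getName(), TestServiceImpl.class);
    }

    // Finds the implementation by interface name, then invokes the named method reflectively.
    static Object dispatch(String interfaceName, String methodName,
                           Class<?>[] parameterTypes, Object[] args) {
        Class<?> implClass = REGISTRY.get(interfaceName);
        if (implClass == null) {
            throw new IllegalArgumentException("no provider registered for " + interfaceName);
        }
        try {
            Object instance = implClass.getDeclaredConstructor().newInstance();
            Method method = implClass.getMethod(methodName, parameterTypes);
            return method.invoke(instance, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("reflective call failed", e);
        }
    }

    public static void main(String[] args) {
        Object result = dispatch(TestService.class.getName(), "sayHello",
                new Class<?>[]{String.class}, new Object[]{"zhangsan"});
        System.out.println(result);
    }
}
```

The parameter types matter because getMethod() uses them to pick the right overload, which is why the request carries Class[] parameterTypes alongside the argument values.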
Implementing the timeout function on the service call side
public class ReferenceConfig {
private static final long DEFAULT_TIMEOUT = 5000;
private static final String DEFAULT_SERVICE_HOST = "127.0.0.1";
private static final int DEFAULT_SERVICE_PORT = 8998;
private Class serviceInterfaceClass;
private String serviceHost;
private int servicePort;
private long timeout;
...
}
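public class NettyRpcClient {
    //Logger factory assumed to be SLF4J's LoggerFactory
    private static final Logger logger = LoggerFactory.getLogger(NettyRpcClient.class);
    private ReferenceConfig referenceConfig;
    private ChannelFuture channelFuture;
    private NettyRpcClientHandler nettyRpcClientHandler;
    public NettyRpcClient(ReferenceConfig referenceConfig) {
        this.referenceConfig = referenceConfig;
        this.nettyRpcClientHandler = new NettyRpcClientHandler(referenceConfig.getTimeout());
    }
    public void connect() {
        logger.info("connecting to Netty RPC server: " + referenceConfig.getServiceHost() + ":" + referenceConfig.getServicePort());
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
            .group(eventLoopGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.SO_KEEPALIVE, true)//Send keep-alive probes when the connection has been idle for a long time
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline()
                        .addLast(new RpcEncoder(RpcRequest.class))
                        .addLast(new RpcDecoder(RpcResponse.class))
                        .addLast(new NettyRpcReadTimeoutHandler(referenceConfig.getTimeout()))
                        .addLast(nettyRpcClientHandler);
                }
            });
        try {
            if (referenceConfig.getServiceHost() != null && !referenceConfig.getServiceHost().equals("")) {
                channelFuture = bootstrap.connect(referenceConfig.getServiceHost(), referenceConfig.getServicePort()).sync();
                logger.info("successfully connected.");
            }
        } catch(Exception e) {
            throw new RuntimeException(e);
        }
    }
    public RpcResponse remoteCall(RpcRequest rpcRequest) throws Throwable {
        //Record the time at which the request was initiated
        //NettyRpcRequestTimeHolder is an assumed name for a static requestId -> timestamp map; the class is not shown in this article
        NettyRpcRequestTimeHolder.put(rpcRequest.getRequestId(), new Date().getTime());
        channelFuture.channel().writeAndFlush(rpcRequest).sync();
        RpcResponse rpcResponse = nettyRpcClientHandler.getRpcResponse(rpcRequest.getRequestId());
        logger.info("receives response from netty rpc server.");
        if (rpcResponse.isSuccess()) {
            return rpcResponse;
        }
        throw rpcResponse.getException();
    }
}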
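public class NettyRpcReadTimeoutHandler extends ChannelInboundHandlerAdapter {
    //Logger factory assumed to be SLF4J's LoggerFactory
    private static final Logger logger = LoggerFactory.getLogger(NettyRpcReadTimeoutHandler.class);
    private long timeout;
    public NettyRpcReadTimeoutHandler(long timeout) {
        this.timeout = timeout;
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcResponse rpcResponse = (RpcResponse)msg;
        //NettyRpcRequestTimeHolder is an assumed name for a static requestId -> timestamp map; the class is not shown in this article
        long requestTime = NettyRpcRequestTimeHolder.get(rpcResponse.getRequestId());
        long now = new Date().getTime();
        if (now - requestTime >= timeout) {
            rpcResponse.setTimeout(true);
            logger.error("Netty RPC response is marked as timeout status: " + rpcResponse);
        }
        //Remove the recorded request time
        NettyRpcRequestTimeHolder.remove(rpcResponse.getRequestId());
        //Pass the response on to the next handler in the pipeline
        ctx.fireChannelRead(rpcResponse);
    }
}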
public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {
    //Logger factory assumed to be SLF4J's LoggerFactory
    private static final Logger logger = LoggerFactory.getLogger(NettyRpcClientHandler.class);
    private static final long GET_RPC_RESPONSE_SLEEP_INTERVAL = 5;
    //requestId -> response; named rpcResponses so that it is not shadowed by the local rpcResponse variables
    private ConcurrentHashMap<String, RpcResponse> rpcResponses = new ConcurrentHashMap<String, RpcResponse>();
    private long timeout;
    public NettyRpcClientHandler(long timeout) {
        this.timeout = timeout;
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcResponse rpcResponse = (RpcResponse) msg;
        if (rpcResponse.isTimeout()) {
            //A response that arrived too late is dropped
            logger.error("Netty RPC client receives the response timeout: " + rpcResponse);
        } else {
            rpcResponses.put(rpcResponse.getRequestId(), rpcResponse);
            logger.info("Netty RPC client receives the response: " + rpcResponse);
        }
    }
    public RpcResponse getRpcResponse(String requestId) throws NettyRpcReadTimeoutException {
        long waitStartTime = new Date().getTime();
        //Poll until the response arrives or the timeout elapses
        while (rpcResponses.get(requestId) == null) {
            try {
                long now = new Date().getTime();
                if (now - waitStartTime >= timeout) {
                    break;
                }
                Thread.sleep(GET_RPC_RESPONSE_SLEEP_INTERVAL);
            } catch (InterruptedException e) {
                logger.error("wait for response interrupted", e);
            }
        }
        RpcResponse rpcResponse = rpcResponses.get(requestId);
        if (rpcResponse == null) {
            logger.error("Get RPC response timeout.");
            throw new NettyRpcReadTimeoutException("Get RPC response timeout.");
        } else {
            rpcResponses.remove(requestId);
        }
        return rpcResponse;
    }
}
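As an alternative to the sleep-and-poll loop above, the wait with timeout can be sketched with one CompletableFuture per request id. This is a swapped-in technique rather than the article's implementation: PendingResponses, register(), complete(), awaitResponse(), and ReadTimeoutException are hypothetical names, and a String stands in for RpcResponse.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PendingResponses {
    // Hypothetical unchecked exception playing the role of NettyRpcReadTimeoutException.
    static class ReadTimeoutException extends RuntimeException {
        ReadTimeoutException(String message) {
            super(message);
        }
    }

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called by the sending thread before the request is written.
    CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Called by the I/O thread when a response arrives (the role of channelRead()).
    void complete(String requestId, String response) {
        CompletableFuture<String> future = pending.remove(requestId);
        if (future != null) {
            future.complete(response);
        }
        // A response whose request already timed out finds no entry and is dropped.
    }

    // Blocks the caller for at most timeoutMillis without polling.
    String awaitResponse(String requestId, CompletableFuture<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            pending.remove(requestId);
            throw new ReadTimeoutException("Get RPC response timeout: " + requestId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        PendingResponses responses = new PendingResponses();
        CompletableFuture<String> first = responses.register("r1");
        new Thread(() -> responses.complete("r1", "ok")).start(); // plays the I/O thread
        System.out.println(responses.awaitResponse("r1", first, 1000));

        CompletableFuture<String> second = responses.register("r2"); // never answered
        try {
            responses.awaitResponse("r2", second, 50);
        } catch (ReadTimeoutException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The waiting thread is woken as soon as the response is completed, and the request id keeps concurrent calls on the same connection from reading each other's responses.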