谈论到实现RPC,不可避免的就要讨论一个问题,啥是gRpc,gRpc是由谷歌实现的一个PRC,为啥带了个g呢。
gPRC中的g也有global的意思,意思是全球化比较fashion,是一个高性能、开源和通用的 RPC 框架,基于ProtoBuf(Protocol Buffers) 序列化协议开发,且支持众多开发语言。面向服务端和移动端,基于 HTTP/2 设计,带来诸如双向流、流控、头部压缩、单 TCP 连接上的多复用请求等特。这些特性使得其在移动设备上表现更好,更省电和节省空间占用。
这就是grpc 存在的优势了。.
我们自己实现的话,可以通过http实现通讯,也可以通过socket 实现,websocket实现。通讯协议,也可以自定义,也可以用json,也可以用谷歌的ProtoBuf来进行实现。
这都不是问题,但是,要向完整实现gRpc那种就难很多了。
但是,作为参考也是很不错的选择。
下面我就基于之前写过的Netty来作为通讯方式,以json为通讯数据格式,来实现一个完整的RPC案 例。
如果不会Netty的可以看之前的文章也可以自己查查资料,当然自己也通过其他方式实现,其实就是个通讯的通道。
实际项目
大概如下所示:

实现了一个netty 服务,作为核心通讯服务。
实现了一个RPCServer 服务,作为RPC核心服务。
然后,服务端和客户端,应对的就是实际使用中的客户端和服务端。
其中,服务端与客户端,是通过 接口的方式,实现统一通讯协议格式的。
核心代码
代码有点多,我捡重要的写
InterfaceProxy.cs
public class InterfaceProxy
{
private static readonly string DllName;
private static ModuleBuilder ModuleBuilder = null;
private static AssemblyBuilder AssemblyBuilder = null;
private static readonly ConcurrentDictionary<Type, Type> Maps = new ConcurrentDictionary<Type, Type>();
static InterfaceProxy()
{
var assemblyName = new AssemblyName("Rpc_InterfaceProxy");
DllName = assemblyName.Name + ".dll";
AssemblyBuilder = AssemblyBuilder.DefineDynamicAssembly(assemblyName, AssemblyBuilderAccess.RunAndCollect);
ModuleBuilder = AssemblyBuilder.DefineDynamicModule(DllName);
}
public static T Resolve<T>(IRpcClient rpcClient = null) where T : class
{
var hanlder = new DefaultInvocationHandler<T>(rpcClient);
var interfaceType = typeof(T);
if (interfaceType?.IsInterface != true)
{
throw new ArgumentException("interfaceType");
}
Maps.TryGetValue(interfaceType, out Type newType);
if (newType == null)
{
newType = CreateType(interfaceType);
Maps.TryAdd(interfaceType, newType);
}
return (T)Activator.CreateInstance(newType, hanlder);
}
private static Type CreateType(Type interfaceType)
{
var tb = ModuleBuilder.DefineType(string.Format("{0}.{1}", typeof(InterfaceProxy).FullName, interfaceType.Name));
tb.AddInterfaceImplementation(interfaceType);
var fb = tb.DefineField("_handler", typeof(InvocationHandler), FieldAttributes.Private);
CreateConstructor(tb, fb);
CreateMethods(interfaceType, tb, fb);
CreateProperties(interfaceType, tb, fb);
return tb.CreateType();
}
private static void CreateConstructor(TypeBuilder tb, FieldBuilder fb)
{
var ctor = tb.DefineConstructor(MethodAttributes.Public, CallingConventions.Standard, new Type[] { typeof(InvocationHandler) });
var il = ctor.GetILGenerator();
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Call, typeof(object).GetConstructor(Type.EmptyTypes));
il.Emit(OpCodes.Nop);
il.Emit(OpCodes.Nop);
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldarg_1);
il.Emit(OpCodes.Stfld, fb);
il.Emit(OpCodes.Ret);
}
private static void CreateMethods(Type interfaceType, TypeBuilder tb, FieldBuilder fb)
{
foreach (MethodInfo met in interfaceType.GetMethods())
{
if (met.Name.Contains("get_") || met.Name.Contains("set_"))
{
continue;
}
CreateMethod(met, tb, fb);
}
}
private static MethodBuilder CreateMethod(MethodInfo met, TypeBuilder tb, FieldBuilder fb)
{
var args = met.GetParameters();
var mb = tb.DefineMethod(met.Name, MethodAttributes.Public | MethodAttributes.NewSlot | MethodAttributes.Virtual | MethodAttributes.Final | MethodAttributes.HideBySig,
met.CallingConvention, met.ReturnType, args.Select(t => t.ParameterType).ToArray());
var il = mb.GetILGenerator();
il.DeclareLocal(typeof(object[]));
if (met.ReturnType != typeof(void))
{
il.DeclareLocal(met.ReturnType);
}
il.Emit(OpCodes.Nop);
il.Emit(OpCodes.Ldc_I4, args.Length);
il.Emit(OpCodes.Newarr, typeof(object));
for (int i = 0; i < args.Length; i++)
{
il.Emit(OpCodes.Dup);
il.AddLdcI4(i);
il.EmitLoadArg(i + 1);
var type = args[i].ParameterType;
if (type.IsValueType)
{
il.Emit(OpCodes.Box, type);
}
il.Emit(OpCodes.Stelem_Ref);
}
il.Emit(OpCodes.Stloc_0);
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldfld, fb);
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldc_I4, met.MetadataToken);
il.Emit(OpCodes.Ldstr, met.DeclaringType?.FullName + "+" + met.Name);
il.Emit(OpCodes.Ldloc_0);
il.Emit(OpCodes.Callvirt, typeof(InvocationHandler).GetMethod(nameof(InvocationHandler.InvokeMember), BindingFlags.Instance | BindingFlags.Public));
if (met.ReturnType == typeof(void))
{
il.Emit(OpCodes.Pop);
}
else
{
il.Emit(met.ReturnType.IsValueType ? OpCodes.Unbox_Any : OpCodes.Castclass, met.ReturnType);
il.Emit(OpCodes.Stloc_1);
il.Emit(OpCodes.Ldloc_1);
}
il.Emit(OpCodes.Ret);
return mb;
}
private static void CreateProperties(Type interfaceType, TypeBuilder tb, FieldBuilder fb)
{
foreach (var prop in interfaceType.GetProperties())
{
var pb = tb.DefineProperty(prop.Name, PropertyAttributes.SpecialName, prop.PropertyType, Type.EmptyTypes);
var met = prop.GetGetMethod();
if (met != null)
{
var mb = CreateMethod(met, tb, fb);
pb.SetGetMethod(mb);
}
met = prop.GetSetMethod();
if (met != null)
{
var mb = CreateMethod(met, tb, fb);
pb.SetSetMethod(mb);
}
}
}
}
DefaultInterfaceProxy.cs
此方法是实现核心通讯的方法。
public class DefaultInvocationHandler<T> : InvocationHandler
{
readonly IRpcClient RpcClient;
public DefaultInvocationHandler(IRpcClient rpcClient)
{
RpcClient = rpcClient;
}
public object InvokeMember(object sender, int methodId, string name, params object[] args)
{
var met = (MethodInfo)typeof(T).Module.ResolveMethod(methodId);
var request = new RpcRequest();
string[] names = name.Split('+');
request.NameSpace = names[0];
request.Method = names[1];
if (args != null)
{
foreach (var arg in args)
{
request.Parameters.Add(JsonConvert.SerializeObject(arg));
}
request.ParameterTypes.AddRange(met.GetParameters().Select(p => p.ParameterType.FullName));
}
var response = JsonConvert.DeserializeObject<RpcResponse>(RpcClient.Send(JsonConvert.SerializeObject(request)));// RpcConatiner.Invoke(request);
if (response != null && response.Code == 0)
{
if (met.ReturnType == typeof(void))
{
return null;
}
return JsonConvert.DeserializeObject(response.Response, met.ReturnType);
}
else
{
throw new Exception(response.Message);
}
}
}
RpcConatiner.cs
public static class RpcConatiner
{
private static ConcurrentDictionary<string, object> ServiceContainer = new ConcurrentDictionary<string, object>();
private static ConcurrentDictionary<string, Type> ServiceTypeContainer = new ConcurrentDictionary<string, Type>();
static RpcConatiner()
{
Initialize();
}
private static void Initialize()
{
try
{
foreach (var assembly in AppDomain.CurrentDomain.GetAssemblies())
{
foreach (var type in assembly.GetTypes())
{
RpcServiceAttribute attribute = type.GetCustomAttribute<RpcServiceAttribute>();
if (attribute != null)
{
var obj = Activator.CreateInstance(type);
foreach (var iInterface in type.GetInterfaces())
{
ServiceContainer.AddOrUpdate(iInterface.FullName, obj, (k, v) => obj);
}
}
}
}
}
catch
{
throw;
}
}
public static void RegisterService<IService, Service>() where Service : class, IService where IService : class
{
ServiceTypeContainer.AddOrUpdate(typeof(IService).FullName, typeof(Service), (k, v) => typeof(Service));
}
public static RpcResponse Invoke(RpcRequest request)
{
var rpcResponse = new RpcResponse() { Code = 1, Message = "未知错误", Request = request };
rpcResponse.SetState($"{request.NameSpace} 命名空间未存在!");
if (ServiceContainer.TryGetValue(request.NameSpace, out object obj))
{
rpcResponse.SetState($"{request.Method} 方法未存在!");
try
{
var Types = request.GetTypes();
var method = obj.GetType().GetMethod(request.Method, Types);
if (method != null)
{
rpcResponse.SetState($"执行结果异常!");
rpcResponse.Response = JsonConvert.SerializeObject(method.Invoke(obj, request.GetParameters(Types)));
rpcResponse.SetState(0, "成功");
}
}
catch (Exception ex)
{
rpcResponse.SetState(-1, ex.Message);
}
}
return rpcResponse;
}
private static Type[] GetTypes(this RpcRequest request)
{
var types = new List<Type>();
foreach (var paramType in request.ParameterTypes)
{
Type type = GetType(paramType);
if (type == null)
{
return Type.EmptyTypes;
}
types.Add(type);
}
if (types.Any())
{
return types.ToArray();
}
return Type.EmptyTypes;
}
private static object[] GetParameters(this RpcRequest request, Type[] Types)
{
var list = new List<object>();
if (Types == null || !Types.Any())
{
var types = request.GetTypes();
if (types == null || !types.Any())
{
return null;
}
}
for (int i = 0; i < Types.Length; i++)
{
list.Add(JsonConvert.DeserializeObject(request.Parameters[i], Types[i]));
}
return list.ToArray();
}
private static Type GetType(string typeFullName)
{
if (ServiceTypeContainer.TryGetValue(typeFullName, out Type Type))
{
return Type;
}
foreach (Assembly ass in AppDomain.CurrentDomain.GetAssemblies())
{
Type type = ass.GetType(typeFullName);
if (type != null)
{
ServiceTypeContainer.TryAdd(type.FullName, type);
return type;
}
}
return null;
}
}
服务端代码
static void Main(string[] args)
{
Console.Title = "RpcServerDemo by 蓝创精英团队";
var server = RpcServerFactory.GetServer(999);
server.RegisterService<IDemo, Demo>();
server.Start();
Console.WriteLine("服务启动");
Console.ReadLine();
}
客户端代码
static void Main(string[] args)
{
Console.Title = "RpcClientDemo by 蓝创精英团队";
var client = RpcClientFactory.GetClient("127.0.0.1", 999);
client.Start();
Console.WriteLine("客户端开始连接!");
var demo = client.Resolve<IDemo>();
demo.Say();
Console.WriteLine(demo.Say("123"));
Console.WriteLine(demo.Say("demo", 6, new List<string>() { "6" }, Kind.b));
Console.WriteLine("不错,完成了任务!");
while (Console.ReadLine().StartsWith("Exit", StringComparison.InvariantCultureIgnoreCase))
{
break;
}
Console.ReadLine();
}
实际效果
先启动服务端,然后,在启动客户端。
效果如下:

可以看到,服务端获取的数据格式。
还有客户端获取到服务器的返回。
结束
至此,一个完整的RPC案例就搞完了。
你的支持就是我最大的动力!
代码地址
https://github.com/kesshei/RPCDemo2.git
https://gitee.com/kesshei/RPCDemo2.git