.Net Core RPC 从零开发到完整 案例版

谈论到实现RPC,不可避免的就要讨论一个问题,啥是gRpc,gRpc是由谷歌实现的一个PRC,为啥带了个g呢。

gPRC中的g也有global的意思,意思是全球化比较fashion,是一个高性能、开源和通用的 RPC 框架,基于ProtoBuf(Protocol Buffers) 序列化协议开发,且支持众多开发语言。面向服务端和移动端,基于 HTTP/2 设计,带来诸如双向流、流控、头部压缩、单 TCP 连接上的多复用请求等特。这些特性使得其在移动设备上表现更好,更省电和节省空间占用。

这就是grpc 存在的优势了。.

我们自己实现的话,可以通过http实现通讯,也可以通过socket 实现,websocket实现。通讯协议,也可以自定义,也可以用json,也可以用谷歌的ProtoBuf来进行实现。

这都不是问题,但是,要向完整实现gRpc那种就难很多了。

但是,作为参考也是很不错的选择。

下面我就基于之前写过的Netty来作为通讯方式,以json为通讯数据格式,来实现一个完整的RPC案 例。

如果不会Netty的可以看之前的文章也可以自己查查资料,当然自己也通过其他方式实现,其实就是个通讯的通道。

实际项目

大概如下所示:

.Net Core RPC 从零开发到完整 案例版

实现了一个netty 服务,作为核心通讯服务。

实现了一个RPCServer 服务,作为RPC核心服务。

然后,服务端和客户端,应对的就是实际使用中的客户端和服务端。

其中,服务端与客户端,是通过 接口的方式,实现统一通讯协议格式的。

核心代码

代码有点多,我捡重要的写

InterfaceProxy.cs

    public class InterfaceProxy
    {
        private static readonly string DllName;
        private static ModuleBuilder ModuleBuilder = null;
        private static AssemblyBuilder AssemblyBuilder = null;

        private static readonly ConcurrentDictionary<Type, Type> Maps = new ConcurrentDictionary<Type, Type>();
        static InterfaceProxy()
        {
            var assemblyName = new AssemblyName("Rpc_InterfaceProxy");
            DllName = assemblyName.Name + ".dll";
            AssemblyBuilder = AssemblyBuilder.DefineDynamicAssembly(assemblyName, AssemblyBuilderAccess.RunAndCollect);
            ModuleBuilder = AssemblyBuilder.DefineDynamicModule(DllName);
        }
        public static T Resolve<T>(IRpcClient rpcClient = null) where T : class
        {
            var hanlder = new DefaultInvocationHandler<T>(rpcClient);
            var interfaceType = typeof(T);
            if (interfaceType?.IsInterface != true)
            {
                throw new ArgumentException("interfaceType");
            }
            Maps.TryGetValue(interfaceType, out Type newType);
            if (newType == null)
            {
                newType = CreateType(interfaceType);
                Maps.TryAdd(interfaceType, newType);
            }
            return (T)Activator.CreateInstance(newType, hanlder);
        }
        private static Type CreateType(Type interfaceType)
        {
            var tb = ModuleBuilder.DefineType(string.Format("{0}.{1}", typeof(InterfaceProxy).FullName, interfaceType.Name));
            tb.AddInterfaceImplementation(interfaceType);

            var fb = tb.DefineField("_handler", typeof(InvocationHandler), FieldAttributes.Private);

            CreateConstructor(tb, fb);
            CreateMethods(interfaceType, tb, fb);
            CreateProperties(interfaceType, tb, fb);

            return tb.CreateType();
        }
        private static void CreateConstructor(TypeBuilder tb, FieldBuilder fb)
        {
            var ctor = tb.DefineConstructor(MethodAttributes.Public, CallingConventions.Standard, new Type[] { typeof(InvocationHandler) });
            var il = ctor.GetILGenerator();

            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Call, typeof(object).GetConstructor(Type.EmptyTypes));
            il.Emit(OpCodes.Nop);
            il.Emit(OpCodes.Nop);

            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Ldarg_1);
            il.Emit(OpCodes.Stfld, fb);
            il.Emit(OpCodes.Ret);
        }

        private static void CreateMethods(Type interfaceType, TypeBuilder tb, FieldBuilder fb)
        {
            foreach (MethodInfo met in interfaceType.GetMethods())
            {
                if (met.Name.Contains("get_") || met.Name.Contains("set_"))
                {
                    continue;
                }
                CreateMethod(met, tb, fb);
            }
        }
        private static MethodBuilder CreateMethod(MethodInfo met, TypeBuilder tb, FieldBuilder fb)
        {
            var args = met.GetParameters();
            var mb = tb.DefineMethod(met.Name, MethodAttributes.Public | MethodAttributes.NewSlot | MethodAttributes.Virtual | MethodAttributes.Final | MethodAttributes.HideBySig,
                met.CallingConvention, met.ReturnType, args.Select(t => t.ParameterType).ToArray());
            var il = mb.GetILGenerator();

            il.DeclareLocal(typeof(object[]));
            if (met.ReturnType != typeof(void))
            {
                il.DeclareLocal(met.ReturnType);
            }

            il.Emit(OpCodes.Nop);
            il.Emit(OpCodes.Ldc_I4, args.Length);
            il.Emit(OpCodes.Newarr, typeof(object));

            for (int i = 0; i < args.Length; i++)
            {
                il.Emit(OpCodes.Dup);
                il.AddLdcI4(i);
                il.EmitLoadArg(i + 1);
                var type = args[i].ParameterType;
                if (type.IsValueType)
                {
                    il.Emit(OpCodes.Box, type);
                }
                il.Emit(OpCodes.Stelem_Ref);
            }

            il.Emit(OpCodes.Stloc_0);
            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Ldfld, fb);

            il.Emit(OpCodes.Ldarg_0);
            il.Emit(OpCodes.Ldc_I4, met.MetadataToken);
            il.Emit(OpCodes.Ldstr, met.DeclaringType?.FullName + "+" + met.Name);
            il.Emit(OpCodes.Ldloc_0);
            il.Emit(OpCodes.Callvirt, typeof(InvocationHandler).GetMethod(nameof(InvocationHandler.InvokeMember), BindingFlags.Instance | BindingFlags.Public));

            if (met.ReturnType == typeof(void))
            {
                il.Emit(OpCodes.Pop);
            }
            else
            {
                il.Emit(met.ReturnType.IsValueType ? OpCodes.Unbox_Any : OpCodes.Castclass, met.ReturnType);
                il.Emit(OpCodes.Stloc_1);
                il.Emit(OpCodes.Ldloc_1);
            }
            il.Emit(OpCodes.Ret);

            return mb;
        }
        private static void CreateProperties(Type interfaceType, TypeBuilder tb, FieldBuilder fb)
        {
            foreach (var prop in interfaceType.GetProperties())
            {
                var pb = tb.DefineProperty(prop.Name, PropertyAttributes.SpecialName, prop.PropertyType, Type.EmptyTypes);
                var met = prop.GetGetMethod();
                if (met != null)
                {
                    var mb = CreateMethod(met, tb, fb);
                    pb.SetGetMethod(mb);
                }
                met = prop.GetSetMethod();
                if (met != null)
                {
                    var mb = CreateMethod(met, tb, fb);
                    pb.SetSetMethod(mb);
                }
            }
        }
    }

DefaultInterfaceProxy.cs

此方法是实现核心通讯的方法。

    public class DefaultInvocationHandler<T> : InvocationHandler
    {
        readonly IRpcClient RpcClient;
        public DefaultInvocationHandler(IRpcClient rpcClient)
        {
            RpcClient = rpcClient;
        }
        public object InvokeMember(object sender, int methodId, string name, params object[] args)
        {
            var met = (MethodInfo)typeof(T).Module.ResolveMethod(methodId);
            var request = new RpcRequest();
            string[] names = name.Split('+');
            request.NameSpace = names[0];
            request.Method = names[1];
            if (args != null)
            {
                foreach (var arg in args)
                {
                    request.Parameters.Add(JsonConvert.SerializeObject(arg));
                }
                request.ParameterTypes.AddRange(met.GetParameters().Select(p => p.ParameterType.FullName));
            }
            var response = JsonConvert.DeserializeObject<RpcResponse>(RpcClient.Send(JsonConvert.SerializeObject(request)));// RpcConatiner.Invoke(request);
            if (response != null && response.Code == 0)
            {
                if (met.ReturnType == typeof(void))
                {
                    return null;
                }
                return JsonConvert.DeserializeObject(response.Response, met.ReturnType);
            }
            else
            {
                throw new Exception(response.Message);
            }
        }
    }

RpcConatiner.cs

    public static class RpcConatiner
    {
        private static ConcurrentDictionary<string, object> ServiceContainer = new ConcurrentDictionary<string, object>();
        private static ConcurrentDictionary<string, Type> ServiceTypeContainer = new ConcurrentDictionary<string, Type>();
        static RpcConatiner()
        {
            Initialize();
        }

        private static void Initialize()
        {
            try
            {
                foreach (var assembly in  AppDomain.CurrentDomain.GetAssemblies())
                {
                    foreach (var type in assembly.GetTypes())
                    {
                        RpcServiceAttribute attribute = type.GetCustomAttribute<RpcServiceAttribute>();
                        if (attribute != null)
                        {
                            var obj = Activator.CreateInstance(type);
                            foreach (var iInterface in type.GetInterfaces())
                            {
                                ServiceContainer.AddOrUpdate(iInterface.FullName, obj, (k, v) => obj);
                            }
                        }
                    }
                }
            }
            catch
            {
                throw;
            }
        }
        public static void RegisterService<IService, Service>() where Service : class, IService where IService : class
        {
            ServiceTypeContainer.AddOrUpdate(typeof(IService).FullName, typeof(Service), (k, v) => typeof(Service));
        }
        public static RpcResponse Invoke(RpcRequest request)
        {
            var rpcResponse = new RpcResponse() { Code = 1, Message = "未知错误", Request = request };
            rpcResponse.SetState($"{request.NameSpace} 命名空间未存在!");
            if (ServiceContainer.TryGetValue(request.NameSpace, out object obj))
            {
                rpcResponse.SetState($"{request.Method} 方法未存在!");
                try
                {
                    var Types = request.GetTypes();
                    var method = obj.GetType().GetMethod(request.Method, Types);
                    if (method != null)
                    {
                        rpcResponse.SetState($"执行结果异常!");
                        rpcResponse.Response = JsonConvert.SerializeObject(method.Invoke(obj, request.GetParameters(Types)));
                        rpcResponse.SetState(0, "成功");
                    }
                }
                catch (Exception ex)
                {
                    rpcResponse.SetState(-1, ex.Message);
                }
            }
            return rpcResponse;
        }
        private static Type[] GetTypes(this RpcRequest request)
        {
            var types = new List<Type>();
            foreach (var paramType in request.ParameterTypes)
            {
                Type type = GetType(paramType);
                if (type == null)
                {
                    return Type.EmptyTypes;
                }
                types.Add(type);
            }
            if (types.Any())
            {
                return types.ToArray();
            }
            return Type.EmptyTypes;
        }
        private static object[] GetParameters(this RpcRequest request, Type[] Types)
        {
            var list = new List<object>();
            if (Types == null || !Types.Any())
            {
                var types = request.GetTypes();
                if (types == null || !types.Any())
                {
                    return null;
                }
            }

            for (int i = 0; i < Types.Length; i++)
            {
                list.Add(JsonConvert.DeserializeObject(request.Parameters[i], Types[i]));
            }
            return list.ToArray();
        }
        private static Type GetType(string typeFullName)
        {
            if (ServiceTypeContainer.TryGetValue(typeFullName, out Type Type))
            {
                return Type;
            }
            foreach (Assembly ass in AppDomain.CurrentDomain.GetAssemblies())
            {
                Type type = ass.GetType(typeFullName);
                if (type != null)
                {
                    ServiceTypeContainer.TryAdd(type.FullName, type);
                    return type;
                }
            }
            return null;
        }
    }

服务端代码

        static void Main(string[] args)
        {
            Console.Title = "RpcServerDemo by 蓝创精英团队";
            var server = RpcServerFactory.GetServer(999);
            server.RegisterService<IDemo, Demo>();
            server.Start();

            Console.WriteLine("服务启动");
            Console.ReadLine();
        }

客户端代码

        static void Main(string[] args)
        {
            Console.Title = "RpcClientDemo by 蓝创精英团队";
            var client = RpcClientFactory.GetClient("127.0.0.1", 999);
            client.Start();
            Console.WriteLine("客户端开始连接!");

            var demo = client.Resolve<IDemo>();
            demo.Say();
            Console.WriteLine(demo.Say("123"));
            Console.WriteLine(demo.Say("demo", 6, new List<string>() { "6" }, Kind.b));
            Console.WriteLine("不错,完成了任务!");

            while (Console.ReadLine().StartsWith("Exit", StringComparison.InvariantCultureIgnoreCase))
            {
                break;
            }
            Console.ReadLine();
        }

实际效果

先启动服务端,然后,在启动客户端。

效果如下:

.Net Core RPC 从零开发到完整 案例版

可以看到,服务端获取的数据格式。

还有客户端获取到服务器的返回。

结束

至此,一个完整的RPC案例就搞完了。

你的支持就是我最大的动力!

代码地址

https://github.com/kesshei/RPCDemo2.git

https://gitee.com/kesshei/RPCDemo2.git