gRPC 四种模式

介绍

简单 RPC(Unary RPC)、服务端流式 RPC (Server streaming RPC)、客户端流式 RPC (Client streaming RPC)、双向流式 RPC(Bi-directional streaming RPC)。它们主要有以下特点:.

服务类型 特点
简单 (一元)RPC 一般的rpc调用,传入一个请求对象,返回一个返回对象
服务端流式   RPC 传入一个请求对象,服务端可以返回多个结果对象
客户端流式   RPC 客户端传入多个请求对象,服务端返回一个结果对象
双向流式   RPC 结合客户端流式RPC和服务端流式RPC,可以传入多个请求对象,返回多个结果对象

基本配置

组件包介绍

-- 服务端
Grpc.AspNetCore 一个用于在ASP.NET Core承载gRPC服务的框架,将 gRPC和ASP.NET Core 功能集成在一起,如:日志、依赖注入、身份认证和授权。

-- 客户端包
Google.Protobuf 包含适用于 C# 的 Protobuf 消息。
Grpc.Tools 包含适用于 Protobuf 文件的 C# 工具支持,将*.proto 文件生成 C# 资产。 运行时不需要工具包,因此依赖项标记为 PrivateAssets="All"。
Grpc.Net.Client,其中包含 .NET Core 客户端。
Grpc.Net.ClientFactory 与gRPC客户端集成的HttpClientFactory,允许对gRPC客户端进行集中配置,并使用DI注入到应用程序中

一个服务可以是服务端也可以是客户端

服务端基本配置

右键新建proto文件

syntax = "proto3"; //使用的协议

option csharp_namespace = "UserService.Proto"; //命名空间

service UserInfoService{ //服务名
    //登录 一元调用
	rpc Login(LoginRequest) returns(LoginResponse);
}

//登录请求类
message LoginRequest{
  string account=1;//帐号
  string password=2;//密码
}

//登录返回类
message LoginResponse{
  string name=1;//用户名
}

引用组件

<PackageReference Include="Grpc.AspNetCore" Version="2.38.0" />

ConfigureServices中配置

// 默认配置
services.AddGrpc();

或者

//自定义配置
services.AddGrpc(options =>
{
    options.EnableDetailedErrors = true;//启用想起错误
    options.MaxReceiveMessageSize = 2 * 1024 * 1024;//最大接受 2m
    options.MaxSendMessageSize = 5 * 1024 * 1024;//最大请求 5m
});

Configure中配置

app.UseEndpoints(endpoints =>
{
    endpoints.MapGrpcService<PayService>();// 配置指定实现
    endpoints.MapControllers();
});

客户端基本配置

引用组件

<ItemGroup>
  <PackageReference Include="Google.Protobuf" Version="3.19.1" />
  <PackageReference Include="Grpc.Net.ClientFactory" Version="2.40.0" />
  <PackageReference Include="Grpc.Tools" Version="2.42.0">
    <PrivateAssets>all</PrivateAssets>
    <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
  </PackageReference>
  <PackageReference Include="Grpc.Net.Client" Version="2.40.0" />
</ItemGroup>

ConfigureServices中配置

// 默认配置
services.AddGrpc();

//自定义配置
services.AddGrpc(options =>
{
    options.EnableDetailedErrors = true;//启用想起错误
    options.MaxReceiveMessageSize = 2 * 1024 * 1024;//最大接受 2m
    options.MaxSendMessageSize = 5 * 1024 * 1024;//最大请求 5m
});

// 要调用的grpc服务
services.AddGrpcClient<RPC.Protos.Two.Order.OrderClient>(o => o.Address = new Uri("https://localhost:6001"));

操作

以下的使用场景仅供参考,因为工作中我也只用过简单rpc模式。

简单RPC

实现需求:订单服务根据用户ID获取用户的详细信息数据。

右键添加proto文件并命名为UserInfoService.proto

syntax = "proto3"; //使用的协议

import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";

option csharp_namespace = "UserService.Proto"; //命名空间

service UserInfoService{ //服务名
    //获取用户信息 一元调用
 rpc GetUserInfo(GetUserInfoRequest) returns(GetUserInfoResponse);
}

//获取用户信息请求类
message GetUserInfoRequest{
  string user_id=1;//用户ID
}

//获取用户信息返回类
message GetUserInfoResponse{
  string name=1;//用户名
  string nick_name=2;//昵称
  google.protobuf.Int32Value sex=3;//可为空性别 1男 2女 空是未知
  google.protobuf.Timestamp birthday=4;//生日
  bool isenabled=5;//是否启用
}

服务端

安装nuget包

<ItemGroup>
  <PackageReference Include="Grpc.AspNetCore" Version="2.40.0" />
</ItemGroup>

修改csproj文件去引用proto文件并且标识为服务端GrpcServices="Server"

<ItemGroup>
  <Protobuf Include="Proto\UserInfoService.proto" GrpcServices="Server" />
</ItemGroup>

也可以将proto都存放在同一个固定位置让不同的程序引用,使用

<ItemGroup>
  <Protobuf Include="..\ShareProto\ServiceTwo\*.proto" GrpcServices="Server" Link="Protos\Server\%(RecursiveDir)%(Filename)%(Extension)" />
</ItemGroup>      

新建UserInfoGrpcService继承自我们的UserInfoService grpc服务,并且重写GetUserInfo方法

public class UserInfoGrpcService : UserInfoServiceBase
{
    /// <summary>
    ///根据用户id获取用户信息(简单rpc)
    /// </summary>
    /// <param name="request"></param>
    /// <param name="context"></param>
    /// <returns></returns>
    public override Task<GetUserInfoResponse> GetUserInfo(GetUserInfoRequest request, ServerCallContext context)
    {
        var userId = request.UserId;

        Console.WriteLine("接收到的用户Id是" + userId);

        //根据用户ID去查询数据库返回结果

        return Task.FromResult(new GetUserInfoResponse
        {
            Name = "admin",
            NickName = "好好学习天天向上",
            Sex = null,
            Birthday = Timestamp.FromDateTime(DateTime.UtcNow),
            Isenabled = true
        });
    }
}

在startop注册grpc服务

//启用grpc
services.AddGrpc(options =>
{
    options.EnableDetailedErrors = true;//启用内部错误输出详情
    options.MaxReceiveMessageSize = 2 * 1024 * 1024;//最大接受 2m
    options.MaxSendMessageSize = 5 * 1024 * 1024;//最大请求 5m
});

配置指定grpc并且暴露出grpc服务

app.UseEndpoints(endpoints =>
{
    endpoints.MapGrpcService<UserInfoGrpcService>();
    endpoints.MapControllers();
});

项目目录下可以看到proto文件生成的类

gRPC 四种模式
img

客户端

安装nuget包

<ItemGroup>
  <PackageReference Include="Google.Protobuf" Version="3.19.1" />
  <PackageReference Include="Grpc.Net.ClientFactory" Version="2.40.0" />
  <PackageReference Include="Grpc.Tools" Version="2.42.0">
    <PrivateAssets>all</PrivateAssets>
    <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
  </PackageReference>
  <PackageReference Include="Grpc.Net.Client" Version="2.40.0" />
</ItemGroup>

引用上面的grpc文件,修改csproj文件并标识为客户端GrpcServices="Client"

//本来暂且将proto文件拷贝到客户端服务,然后设置
<ItemGroup>
  <Protobuf Include="Proto\UserInfoService.proto" GrpcServices="Client" />
</ItemGroup>


//或者将proto文件放到一个共享目录,然后采用
<ItemGroup>
    <Protobuf Include="..\ShareProto\ServiceTwo\*.proto" GrpcServices="Client" Link="Protos\Client\%(RecursiveDir)%(Filename)%(Extension)" />
</ItemGroup>

ConfigureServices添加该grpc服务,服务端地址为localhost:5001

services.AddGrpcClient<UserInfoService.UserInfoServiceClient>(o =>
{
    o.Address = new Uri("https://localhost:5001");//协议必须是https
});

这个UserInfoServiceClient文件就是我们刚才引入的proto文件生成的代码

调用测试

[ApiController]
[Route("[controller]")]
public class HomeController : ControllerBase
{
    private readonly UserInfoService.UserInfoServiceClient _userInfoServiceClient;

    public HomeController(UserInfoService.UserInfoServiceClient userInfoServiceClient)
    {
        _userInfoServiceClient = userInfoServiceClient;
    }

    [HttpGet]
    public async Task<GetUserInfoResponse> Get(string userId)
    {
        var result = await _userInfoServiceClient.GetUserInfoAsync(new GetUserInfoRequest { UserId = userId });
        return result;
    }
}
gRPC 四种模式
img

下面的部分代码如上,因此省略。

服务端流式RPC

修改之前的UserInfoService.proto文件,增加一个方法参数是一个请求对象,服务器将多个结果流式传回调用方。

syntax = "proto3"; //使用的协议

import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";

option csharp_namespace = "UserService.Proto"; //命名空间

service UserInfoService{ //服务名
    
 //获取所有信息   服务器流式处理:从客户端发送请求消息开始 
 //ResponseStream.MoveNext() 读取从服务流式处理的消息。 ResponseStream.MoveNext() 返回 false 时,服务器流式处理调用已完成。
 rpc GetAll(GetAllRequest) returns(stream CustomerResponse);
}

message GetAllRequest { }

message CustomerResponse{
 CustomerInfo customer=1;//用户信息
}

message CustomerInfo{
  string account=1;//帐号
  string name=3;//姓名
  int32 sex=4;//性别
}

服务端

继承自UserInfoServiceBase并重写GetAll方法

public class UserInfoGrpcService : UserInfoServiceBase
{
    /// <summary>
    /// 获取所有用户信息(服务端流式rpc)
    /// </summary>
    /// <param name="request"></param>
    /// <param name="responseStream"></param>
    /// <param name="context"></param>
    /// <returns></returns>
    public override async Task GetAll(GetAllRequest request, IServerStreamWriter<CustomerResponse> responseStream, ServerCallContext context)
    {
        for (int i = 0; i < 5; i++)
        {
            if (!context.CancellationToken.IsCancellationRequested)
            {
                await responseStream.WriteAsync(new CustomerResponse
                {
                    Customer = new CustomerInfo
                    {
                        Account = "admin" + i,
                        Name = "张三" + i,
                        Sex = 1
                    }
                }).ConfigureAwait(false);
                await Task.Delay(System.TimeSpan.FromSeconds(1));
            }
        }
    }
}

客户端

修改之前的订单服务引用的proto文件,新建一个接口用于获取grpc服务

/// <summary>
/// 获取所有的用户信息(服务端流式处理)
/// </summary>
/// <returns></returns>
[HttpGet]
public async Task<ActionResult<IEnumerable<CustomerResponse>>> GetAllUserInfoAsync()
{
    var result = new List<CustomerResponse>();
    var cancel = new CancellationToken();
    var request = new GetAllRequest();
    var response = _userInfoServiceClient.GetAll(request);
    while (await response.ResponseStream.MoveNext(cancel))
    {
        result.Add(response.ResponseStream.Current);
    }

    return Ok(result);
}

返回结果

gRPC 四种模式
img

客户端流式RPC

客户端将连续的数据流发送到服务端,服务端返回一个响应信息,使用场景:根据id返回用户列表信息,分段上传图片

syntax = "proto3"; //使用的协议

import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";

option csharp_namespace = "UserService.Proto"; //命名空间

service UserInfoService{ //服务名

 //批量获取用户信息  客户端流式处理:多个请求对象,服务器返回一个结果
 rpc GetUserListByUserId(stream GetUserListRequest) returns (GetUserListResponse);
}

message CustomerInfo{
  string user_id=1;//用户id
  string account=2;//帐号
  string name=3;//姓名
  int32 sex=4;//性别
}

message GetUserListRequest
{
    string  user_id = 1;
}

message GetUserListResponse
{
 repeated CustomerInfo  user_list = 1;//用户列表
}

服务端

为了操作数据,这点新建了一个userInfo的类

public class UserInfo
{
    /// <summary>
    ///用户ID
    /// </summary>
    public string UserId { get; set; }

    /// <summary>
    /// 帐号
    /// </summary>
    public string Acount { get; set; }

    /// <summary>
    /// 姓名
    /// </summary>
    public string Name { get; set; }

    /// <summary>
    /// 性别
    /// </summary>
    public int Sex { get; set; }
}

继承自UserInfoServiceBase并重写GetUserListByUserId方法

public class UserInfoGrpcService : UserInfoServiceBase
{
    /// <summary>
    /// 根据用户id获取用户列表
    /// </summary>
    /// <param name="requestStream"></param>
    /// <param name="context"></param>
    /// <returns></returns>
    public override async Task<GetUserListResponse> GetUserListByUserId(
        IAsyncStreamReader<GetUserListRequest> requestStream, ServerCallContext context)
    {
        //模拟数据库查询
        var userList = new List<UserInfo>
        {
            new UserInfo { UserId = "11", Acount = "admin1", Name = "张三", Sex = 1 },
            new UserInfo { UserId = "22", Acount = "admin2", Name = "李四", Sex = 0 },
            new UserInfo { UserId = "33", Acount = "admin3", Name = "王五", Sex = 1 },
            new UserInfo { UserId = "44", Acount = "admin4", Name = "赵六", Sex = 0 },
        };

        //获取请求的用户ID列表
        var userIds = new List<string>();
        var cancel = context.CancellationToken;
        while (await requestStream.MoveNext(cancel))
        {
            userIds.Add(requestStream.Current.UserId);
        }

        var resultList = userList.Where(t => userIds.Contains(t.UserId));

        //拼接返回列表
        var result = new GetUserListResponse();
        foreach (var item in resultList)
        {
            result.UserList.Add(new CustomerInfo
            {
                UserId = item.UserId,
                Account = item.Acount,
                Name = item.Name,
                Sex = item.Sex
            });
        }

        return result;
    }
}

客户端

修改之前的订单服务引用的proto文件,新建一个接口GetUserListByUserIdAsync用于获取grpc服务

/// <summary>
/// 根据用户id获取用户信息(客户端流式处理) 
/// </summary>
/// <param name="userIds"></param>
/// <returns></returns>
[HttpPost]
public async Task<ActionResult<RepeatedField<CustomerInfo>>> GetUserListByUserIdAsync(
    IEnumerable<string> userIds)
{
    using var call = _userInfoServiceClient.GetUserListByUserId();
    var clientStream = call.RequestStream;
    //循环给服务端发送流
    foreach (var item in userIds)
    {
        await clientStream.WriteAsync(new GetUserListRequest { UserId = item });
    }

    // 发送完成之后,告诉服务端发送完成
    await clientStream.CompleteAsync();
    // 接收返回结果,并返回
    var res = await call.ResponseAsync;
    return Ok(res.UserList);
}

返回结果

gRPC 四种模式
img

双向流式RPC

请求和响应都通过流的方式进行交互

示例:通过流的方式添加用户并返回用户ID

修改之前的UserInfoService.proto文件,增加一个方法入参是连续流,服务器将多个结果流式传回调用方。

syntax = "proto3"; //使用的协议

import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";

option csharp_namespace = "UserService.Proto"; //命名空间

service UserInfoService{ //服务名

 //批量添加用户 并返回用户ID   双向流模式,多个请求,返回多个信息
 rpc AddUserList(stream AddUserListRequest) returns (stream AddUserListResponse);
}

message AddUserListRequest
{
  string account=1;//帐号
  string name=2;//姓名
  int32 sex=3;//性别
}

message AddUserListResponse
{
    string user_id=1;//用户id
}

服务端

继承自UserInfoServiceBase并重写AddUserList方法

public class UserInfoGrpcService : UserInfoServiceBase
{
    /// <summary>
    /// 批量添加用户信息(双向流式处理) 
    /// </summary>
    /// <param name="requestStream"></param>
    /// <param name="responseStream"></param>
    /// <param name="context"></param>
    /// <returns></returns>
    public override async Task AddUserList(IAsyncStreamReader<AddUserListRequest> requestStream,
        IServerStreamWriter<AddUserListResponse> responseStream, ServerCallContext context)
    {
        //获取请求的用户ID列表
        var userInfos = new List<UserInfo>();
        var cancel = context.CancellationToken;
        while (await requestStream.MoveNext(cancel))
        {
            userInfos.Add(new UserInfo
            {
                UserId = Guid.NewGuid().ToString(),
                Acount = requestStream.Current.Account,
                Name = requestStream.Current.Name,
                Sex = requestStream.Current.Sex
            });
        }

        //返回用户Id
        foreach (var item in userInfos)
        {
            await responseStream.WriteAsync(new AddUserListResponse
            {
                UserId = item.UserId
            }).ConfigureAwait(false);
        }
    }
}

客户端

修改之前的订单服务引用的proto文件,新建一个接口用于获取grpc服务

/// <summary>
/// 根据用户id获取用户信息(双向流) 
/// </summary>
/// <param name="userIds"></param>
/// <returns></returns>
[HttpGet]
public async Task<ActionResult<RepeatedField<CustomerInfo>>> AddUserList()
{
    //模拟用户添加的数据
    var userList = new List<UserInfo>
    {
        new UserInfo { UserId = "11", Acount = "admin1", Name = "张三", Sex = 1 },
        new UserInfo { UserId = "22", Acount = "admin2", Name = "李四", Sex = 0 },
        new UserInfo { UserId = "33", Acount = "admin3", Name = "王五", Sex = 1 },
        new UserInfo { UserId = "44", Acount = "admin4", Name = "赵六", Sex = 0 },
    };

    using var call = _userInfoServiceClient.AddUserList();
    var clientStream = call.RequestStream;
    //循环给服务端发送流
    foreach (var item in userList)
    {
        await clientStream.WriteAsync(new AddUserListRequest
        {
            Account = item.Acount,
            Name = item.Name,
            Sex = item.Sex
        });
    }

    // 发送完成之后,告诉服务端发送完成
    await clientStream.CompleteAsync();


    var userIds = new List<string>();
    var cancel = new CancellationToken();
    // 接收返回结果,并返回
    while (await call.ResponseStream.MoveNext(cancel))
    {
        userIds.Add(call.ResponseStream.Current.UserId);
    }

    return Ok(userIds);
}

返回结果

gRPC 四种模式