I. Introduction

gRPC supports bidirectional streaming calls, which make real-time message push possible. This is one of gRPC's major features, and gRPC also gives you strong control over a bidirectional stream.

II. What is a gRPC stream?

gRPC has four service types: simple RPC (Unary RPC), server-side streaming RPC (Server streaming RPC), client-side streaming RPC (Client streaming RPC), and bidirectional streaming RPC (Bi-directional streaming RPC). Their main characteristics are as follows:

  • Simple RPC: an ordinary RPC call; one request object is passed in and one result object is returned
  • Server-side streaming RPC: one request object is passed in, and the server can return multiple result objects
  • Client-side streaming RPC: the client passes in multiple request objects, and the server returns one result object
  • Bidirectional streaming RPC: combines client-side and server-side streaming; multiple request objects can be passed in and multiple result objects can be returned
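As a rough illustration of how the four types differ in a .proto file, here is a minimal sketch. The package, service, method, and message names (demo, Demo, Unary, Req, Resp, and so on) are made up for this example and are not part of the LuCat service used later; only the placement of the stream keyword matters.

```protobuf
syntax = "proto3";

package demo; // hypothetical package, for illustration only

service Demo {
    // Simple (unary) RPC: one request, one response
    rpc Unary(Req) returns (Resp);
    // Server-side streaming RPC: one request, a stream of responses
    rpc ServerStream(Req) returns (stream Resp);
    // Client-side streaming RPC: a stream of requests, one response
    rpc ClientStream(stream Req) returns (Resp);
    // Bidirectional streaming RPC: both sides stream
    rpc BidiStream(stream Req) returns (stream Resp);
}

message Req {
    int32 id = 1;
}

message Resp {
    string message = 1;
}
```

Putting stream before the request type, the response type, or both is the only thing that changes between the four declarations.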

III. Why gRPC supports streaming

gRPC communication is built on HTTP/2, and its streaming calls map onto HTTP/2 streams. HTTP/2 introduces the concept of a stream in order to implement multiplexing. A stream is an independent, bidirectional sequence of frames exchanged between the client and the server within an HTTP/2 connection. Logically it can be regarded as a relatively complete unit of interaction, that is, one complete exchange of a resource request and its response. Once that unit has been processed within the stream, the stream's life cycle ends.

The features of a stream are as follows:

  • An HTTP/2 connection can hold multiple open streams at the same time, and either endpoint can exchange frames on them
  • A stream can be created and used by the client or the server alone, or shared by both
  • A stream can be closed by either end
  • Data within a stream is sent and received in order
  • A stream is identified by an integer in the range 1 to 2^31-1, assigned by the endpoint that creates the stream
  • Streams are logically parallel and exist independently of one another
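To make the identifier rule a little more concrete, here is a small sketch of how an endpoint could hand out stream identifiers. It relies on one detail from the HTTP/2 specification that is not mentioned above: streams created by the client use odd identifiers and streams created by the server use even ones, with 0 reserved for the connection itself. StreamIdAllocator is a hypothetical class written for this illustration; it is not part of gRPC or .NET, and real HTTP/2 stacks handle this internally.

```csharp
using System;

// Hypothetical illustration only: StreamIdAllocator is not a gRPC or .NET API.
var client = new StreamIdAllocator(isClient: true);
var server = new StreamIdAllocator(isClient: false);

Console.WriteLine(client.Next()); // 1
Console.WriteLine(client.Next()); // 3
Console.WriteLine(server.Next()); // 2

sealed class StreamIdAllocator
{
    // Largest identifier that fits in the 31-bit stream id field: 2^31 - 1.
    public const int MaxStreamId = int.MaxValue;

    private long _next;

    // Clients start at 1 (odd ids), servers at 2 (even ids); 0 is reserved for the connection.
    public StreamIdAllocator(bool isClient) => _next = isClient ? 1 : 2;

    public int Next()
    {
        if (_next > MaxStreamId)
            throw new InvalidOperationException("Stream identifiers exhausted; a new connection is needed.");

        var id = (int)_next;
        _next += 2; // keep the parity of the creating endpoint
        return id;
    }
}
```

Because each side only ever uses its own parity, the client and the server can open streams independently without agreeing on the next identifier first.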

IV. Using a bidirectional streaming call in gRPC

Let's go straight to bidirectional streaming. Of the four service types listed in the second section, once we have mastered simple RPC and bidirectional streaming RPC, server-side streaming RPC and client-side streaming RPC pose no difficulty.

The goal here is to bathe multiple cats in a single call.

1 First, define two RPCs in LuCat.proto: Count, which returns the number of cats, and BathTheCat, a bidirectional streaming RPC used to bathe cats

syntax = "proto3";

option csharp_namespace = "AspNetCoregRpcService";

import "google/protobuf/empty.proto";
package LuCat; // package name

// define the service
service LuCat {
    // bidirectional streaming RPC: bathe the cats
    rpc BathTheCat(stream BathTheCatReq) returns (stream BathTheCatResp);
    // simple RPC: count the cats
    rpc Count(google.protobuf.Empty) returns (CountCatResult);
}

message SuckingCatResult {
    string message = 1;
}
message BathTheCatReq {
    int32 id = 1;
}
message BathTheCatResp {
    string message = 1;
}
message CountCatResult {
    int32 Count = 1;
}

2 Add the service implementation

A recommendation in passing: ReSharper is very convenient here.


public class LuCatService : LuCat.LuCatBase
{
    private readonly ILogger<LuCatService> _logger;
    private static readonly List<string> Cats = new List<string>() { "one", "two", "three", "four", "five", "six" };
    private static readonly Random Rand = new Random(DateTime.Now.Millisecond);

    public LuCatService(ILogger<LuCatService> logger)
    {
        _logger = logger;
    }

    public override async Task BathTheCat(IAsyncStreamReader<BathTheCatReq> requestStream, IServerStreamWriter<BathTheCatResp> responseStream, ServerCallContext context)
    {
        var bathQueue = new Queue<int>();
        // Read every request first; the loop ends when the client completes its request stream.
        while (await requestStream.MoveNext())
        {
            // cat to queue
            bathQueue.Enqueue(requestStream.Current.Id);
            _logger.LogInformation($"Cat {requestStream.Current.Id} Enqueue.");
        }

        // Then bathe the queued cats one by one and stream each result back.
        while (bathQueue.TryDequeue(out var catId))
        {
            await responseStream.WriteAsync(new BathTheCatResp() { Message = $"success one {Cats[catId]} bathed!" });
            await Task.Delay(500); // delay so the streaming is visible in the demo
        }
    }

    public override Task<CountCatResult> Count(Empty request, ServerCallContext context)
    {
        return Task.FromResult(new CountCatResult()
        {
            Count = Cats.Count
        });
    }
}

The BathTheCat method receives the cat Ids streamed by the client and adds them to a queue. Once the client has finished sending, it starts bathing the cats and returns each result to the client.

3 Client implementation

The client sends 10 random cat Ids to the server and then receives 10 bath results.

var channel = GrpcChannel.ForAddress("https://localhost:5001");
var catClient = new LuCat.LuCatClient(channel);
// cats count
var catCount = await catClient.CountAsync(new Empty());
Console.WriteLine($"{catCount.Count} cats.");
var rand = new Random(DateTime.Now.Millisecond);

var bathCat = catClient.BathTheCat();
// read the response stream in the background
var bathCatRespTask = Task.Run(async () =>
{
    await foreach (var resp in bathCat.ResponseStream.ReadAllAsync())
    {
        Console.WriteLine(resp.Message);
    }
});
// send 10 random cat Ids
for (int i = 0; i < 10; i++)
{
    await bathCat.RequestStream.WriteAsync(new BathTheCatReq() { Id = rand.Next(0, catCount.Count) });
}
// tell the server that no more requests will follow
await bathCat.RequestStream.CompleteAsync();
Console.WriteLine("client sent ten cats' id");
await bathCatRespTask;


4 Running


As you can see, the bidirectional streaming call succeeds: the client sends 10 cat bath request objects, and the server returns 10 cat bath result objects, pushed in real time. This is gRPC's bidirectional streaming call.

From here you can adapt the example yourself into a client-side or server-side streaming call. If the client sends a list of cat Ids in one request and the server returns each cat's bath result in turn, that is a server-side streaming call. If the client sends the cat Ids one by one and the server then returns all the bath results at once (treating bathing all the cats as a single business operation and returning its result), that is a client-side streaming call. I will not demonstrate them here.

V. Flow control

gRPC streaming calls support actively cancelling a stream, and other controls such as stream timeouts can be built on top of that.

In a streaming call you can pass a CancellationToken parameter, which is what controls cancellation of the stream.


Modify the code from the fourth section as follows:

1 Client

// cancel the call 2.5 s after the token source is created
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2.5));
var bathCat = catClient.BathTheCat(cancellationToken: cts.Token);
var bathCatRespTask = Task.Run(async () =>
{
    try
    {
        await foreach (var resp in bathCat.ResponseStream.ReadAllAsync())
            Console.WriteLine(resp.Message);
    }
    catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
    {
        Console.WriteLine("Stream cancelled.");
    }
});

2 Server

// for each queued cat; stop early if the client has cancelled the call
while (!context.CancellationToken.IsCancellationRequested && bathQueue.TryDequeue(out var catId))
{
    _logger.LogInformation($"Cat {catId} Dequeue.");
    await responseStream.WriteAsync(new BathTheCatResp() { Message = $"success one {Cats[catId]} bathed!" });

    await Task.Delay(500); // delay so the streaming is visible in the demo
}

3 Running


The stream is set to be cancelled 2.5 s after the bidirectional streaming call starts. The client output shows that it did not receive the bath results of all 10 cats before the stream was cancelled. This is gRPC's flow control.
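If you want to see the timing effect without starting the gRPC server, the following is a minimal local sketch. It assumes a stand-in method, Demo.BathResults, that imitates the server by producing one result every 500 ms; that method and its message text are invented for this illustration and are not part of the LuCat service. Only CancellationTokenSource, CancelAfter, and await foreach are real .NET features here, and a local iterator surfaces cancellation as OperationCanceledException rather than the RpcException a gRPC call throws.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(2.5)); // same 2.5 s limit as in the article

var received = 0;
try
{
    await foreach (var message in Demo.BathResults(10, cts.Token))
    {
        received++;
        Console.WriteLine(message);
    }
}
catch (OperationCanceledException)
{
    // Task.Delay observed the cancelled token before all 10 results were produced.
    Console.WriteLine($"Stream cancelled after {received} of 10 results.");
}

static class Demo
{
    // Hypothetical stand-in for the server's response stream: one result every 500 ms.
    public static async IAsyncEnumerable<string> BathResults(
        int count, [EnumeratorCancellation] CancellationToken token = default)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Delay(500, token); // throws once the token is cancelled
            yield return $"cat {i} bathed";
        }
    }
}
```

With one result every 500 ms and a 2.5 s limit, only the first few results arrive before the loop is interrupted, which mirrors what the client output above shows.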


Streaming calls make real-time push possible: the server can send messages to the client, or the client to the server, in real time. However, this differs from traditional long-lived connections with active push and broadcast messages. Whether the call is server-side streaming, client-side streaming, or bidirectional streaming, it must be initiated by the client, and the streaming communication is established through the client's request.

Original link: https://www.cnblogs.com/stulzq/p/11590088.html