  • Unary RPC 单个请求–单个响应
  • Server Streaming RPC 多个请求–单个响应
  • Client Streaming RPC 单个请求–多个响应
  • Bidriectional Streaming RPC 多个请求–多个响应

1.1 举例:编写



syntax = "proto3"; // 声明编译器用的是prototype3package person;option go_package="/projectRPCTest3/pb/person;person";message PersonReq{string name =1;int32 age = 2;
}message PersonRes{string name = 1;int32 age = 2;
}service SearchService {rpc Search(PersonReq) returns (PersonRes);rpc SearchIn(stream PersonReq) returns(PersonRes);rpc SearchOut(PersonReq) returns (stream PersonRes);rpc SearchIO(stream PersonReq) returns (stream PersonRes);


cd .\pb
cd .\person\
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative .\person.proto


  • Search将收到的请求进行返回
  • SearchIn 多个请求过来统一做回复
  • SearchOut 单个请求过来,多次响应
  • SearchIO 多个请求过来,多个响应
package mainimport ("context""fmt""time""net""projectRPCTest3/pb/person""google.golang.org/grpc"
)type personServe struct {person.UnimplementedSearchServiceServer
}func (*personServe) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {name := req.GetName()res := &person.PersonRes{Name: "我收到了" + name + "的信息"}return res, nil
func (*personServe) SearchIn(server person.SearchService_SearchInServer) error {for {req, err := server.Recv()fmt.Println(req)if err != nil {server.SendAndClose(&person.PersonRes{Name: "success"})break}}return nil
}func (*personServe) SearchOut(req *person.PersonReq, server grpc.ServerStreamingServer[person.PersonRes]) error {name := req.Namei := 0for {if i > 10 {return nil}time.Sleep(1 * time.Second)server.Send(&person.PersonRes{Name: "I got it" + name})i++}}
func (*personServe) SearchIO(server grpc.BidiStreamingServer[person.PersonReq, person.PersonRes]) error {str := make(chan string)go func() {for {req, err := server.Recv()if err != nil {str <- "Result"break}str <- req.Name}}()for {s := <-strif s == "Result" {break}server.Send(&person.PersonRes{Name: "I got it" + s})}return nil
}func main() {l, _ := net.Listen("tcp", ":8888")s := grpc.NewServer()person.RegisterSearchServiceServer(s, &personServe{})s.Serve(l)


wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {search, err := client.Search(context.Background(), &person.PersonReq{Name: "Alice"})if err != nil {fmt.Println(err)}wg.Done()fmt.Println(search.GetName())



c, _ := client.SearchIn(context.Background())
i := 0
for {if i > 10 {res, _ := c.CloseAndRecv()fmt.Println(res)break}time.Sleep(1 * time.Second)c.Send(&person.PersonReq{Name: "client send message...."})i++}



c, _ := client.SearchOut(context.Background(), &person.PersonReq{Name: "zhangsan"})
for {req, err := c.Recv()if err != nil {fmt.Println(err)break}fmt.Println(req)



c, _ := client.SearchIO(context.Background())
wg := sync.WaitGroup{}
go func() {for {err := c.Send(&person.PersonReq{Name: " hello"})time.Sleep(2 * time.Second)if err != nil {wg.Done()break}}
go func() {for {req, err := c.Recv()fmt.Println(req)if err != nil {fmt.Println(err)wg.Done()break}}


package mainimport ("context""fmt""google.golang.org/grpc""projectRPCTest3/pb/person""sync""time"
)func main() {l, _ := grpc.Dial("", grpc.WithInsecure())client := person.NewSearchServiceClient(l)//wg := sync.WaitGroup{}//wg.Add(10)//for i := 0; i < 10; i++ {//	search, err := client.Search(context.Background(), &person.PersonReq{Name: "Alice"})//	if err != nil {//		fmt.Println(err)//	}//	wg.Done()//	fmt.Println(search.GetName())//}//wg.Wait()//c, _ := client.SearchIn(context.Background())//i := 0//for {//	if i > 10 {//		res, _ := c.CloseAndRecv()//		fmt.Println(res)//		break////	}//	time.Sleep(1 * time.Second)//	c.Send(&person.PersonReq{Name: "client send message...."})//	i++////}//c, _ := client.SearchOut(context.Background(), &person.PersonReq{Name: "zhangsan"})//for {//	req, err := c.Recv()//	if err != nil {//		fmt.Println(err)//		break//	}//	fmt.Println(req)//}c, _ := client.SearchIO(context.Background())wg := sync.WaitGroup{}wg.Add(1)go func() {for {err := c.Send(&person.PersonReq{Name: " hello"})time.Sleep(2 * time.Second)if err != nil {wg.Done()break}}}()go func() {for {req, err := c.Recv()fmt.Println(req)if err != nil {fmt.Println(err)wg.Done()break}}}()wg.Wait()

1.2 任意字节传输


syntax = "proto3";option go_package = "projectBindataClientStreaming/service";service Repo {rpc CreateRepo(stream RepoCreateRequest) returns(RepoCreateReply) {}
}message RepoCreateRequest {oneof body {RepoContext context = 1;bytes data = 2;}
}message RepoContext {string creator_id = 1;string name =2;
}message Repository {string id = 1;string name = 2;string url = 3;
}message RepoCreateReply {Repository repo = 1;int32 size =2;


protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative .\repositories.proto


package mainimport ("fmt""google.golang.org/grpc""google.golang.org/grpc/codes""google.golang.org/grpc/status""io""log""net""os"svc "projectBindataClientStreaming/service"
)type repoService struct {svc.UnimplementedRepoServer
}func (s *repoService) CreateRepo(stream svc.Repo_CreateRepoServer,
) error {var repoContext *svc.RepoContextvar data []bytefor {r, err := stream.Recv()if err == io.EOF {break}if err != nil {return status.Error(codes.Unknown,err.Error(),)}switch t := r.Body.(type) {case *svc.RepoCreateRequest_Context:repoContext = r.GetContext()case *svc.RepoCreateRequest_Data:b := r.GetData()data = append(data, b...)case nil:return status.Error(codes.InvalidArgument,"Message doesn't contain context or data",)default:return status.Errorf(codes.FailedPrecondition,"Unexpected message type: %s",t,)}}repo := svc.Repository{Name: repoContext.Name,Url: fmt.Sprintf("https://git.example.com/%s/%s",repoContext.CreatorId,repoContext.Name,),}r := svc.RepoCreateReply{Repo: &repo,Size: int32(len(data)),}return stream.SendAndClose(&r)
}func registerServices(s *grpc.Server) {svc.RegisterRepoServer(s, &repoService{})
}func startServer(s *grpc.Server, l net.Listener) error {return s.Serve(l)
}func main() {listenAddr := ":50051"lis, err := net.Listen("tcp", listenAddr)if err != nil {log.Fatalf("failed to listen: %v", err)}s := grpc.NewServer()registerServices(s)log.Fatal(startServer(s, lis))


package mainimport ("context""google.golang.org/grpc""google.golang.org/grpc/test/bufconn""io""net"svc "projectBindataClientStreaming/service""strings""testing"
)func startTestGrpcServer() *bufconn.Listener {l := bufconn.Listen(1)s := grpc.NewServer()registerServices(s)go func() {startServer(s, l)}()return l
}func TestCreateRepo(t *testing.T) {l := startTestGrpcServer()bufconnDialer := func(ctx context.Context, address string) (net.Conn, error) {return l.Dial()}client, err := grpc.DialContext(context.Background(),"", grpc.WithContextDialer(bufconnDialer), grpc.WithInsecure())if err != nil {t.Fatalf("could not dial bufconn: %v", err)}repoClient := svc.NewRepoClient(client)stream, err := repoClient.CreateRepo(context.Background(),)if err != nil {t.Fatal("CreateRepo", err)}c := svc.RepoCreateRequest_Context{Context: &svc.RepoContext{CreatorId: "user-123",Name:      "test-repo",},}r := svc.RepoCreateRequest{Body: &c,}err = stream.Send(&r)if err != nil {t.Fatal("stream.Send", err)}data := "Arbitrary Data Bytes"repoData := strings.NewReader(data)for {b, err := repoData.ReadByte()if err == io.EOF {break}bData := svc.RepoCreateRequest_Data{Data: []byte{b},}r := svc.RepoCreateRequest{Body: &bData,}err = stream.Send(&r)if err != nil {t.Fatal("stream.Send", err)}l.Close()}resp, err := stream.CloseAndRecv()if err != nil {t.Fatal("CloseAndRecv", err)}expectedSize := int32(len(data))if resp.Size != expectedSize {t.Errorf("resp.Size != expectedSize, resp: %v", resp.Size)}expectedRepoUrl := "https://git.example.com/user-123/test-repo"if resp.Repo.Url != expectedRepoUrl {t.Errorf("resp.Repo.Url != expectedRepoUrl, resp: %v,got:%s", resp.Repo.Url,expectedRepoUrl)}



2.1 案例,restful访问请求


syntax = "proto3";package person;option go_package="projectRPCTest4/pb/person;person";import "google/api/annotations.proto";
message Mbody {string name = 1;
message PersonReq {string name = 1;int32 age = 2;Mbody body =3;
}message PersonRes {string name = 1;int32 age = 2;Mbody body = 3;
}service SearchService {rpc Search(PersonReq) returns (PersonRes){option(google.api.http)= {// post:"/api/person",// get:"/api/person",// get:"/api/person/{name}/{age}/{body.name}",get:"/api/person/{name=qm}/{age}/{body.name}",//只能固定//body:"body"};};



protoc -I ./pb --go_out ./pb --go_opt paths=source_relative --go-grpc_out ./pb --go-grpc_opt paths=source_relative --grpc-gateway_out ./pb --grpc-gateway_opt paths=source_relative  ./pb/person/person.proto


func (*personServe) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {name := req.GetName()fmt.Println(name)fmt.Println(req.Name)fmt.Println(req.Age)fmt.Println(req.String())req.GetBody()res := &person.PersonRes{Name: name + "的信息", Age: req.GetAge(), Body: req.GetBody()}return res, nil


package mainimport ("context""fmt""github.com/grpc-ecosystem/grpc-gateway/v2/runtime""net""net/http""projectRPCTest4/pb/person""sync""google.golang.org/grpc"
)type personServe struct {person.UnimplementedSearchServiceServer
}func (*personServe) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {name := req.GetName()fmt.Println(name)fmt.Println(req.Name)fmt.Println(req.Age)fmt.Println(req.String())req.GetBody()res := &person.PersonRes{Name: name + "的信息", Age: req.GetAge(), Body: req.GetBody()}return res, nil
}func main() {wg := sync.WaitGroup{}wg.Add(2)go registerGateway(&wg)go registerGRPC(&wg)//time.Sleep(100 * time.Second)wg.Wait()
}func registerGateway(wg *sync.WaitGroup) {conn, _ := grpc.DialContext(context.Background(), "localhost:8888", grpc.WithBlock(), grpc.WithInsecure())mux := runtime.NewServeMux()gwServer := &http.Server{Handler: mux,Addr:    ":8090",}err := person.RegisterSearchServiceHandler(context.Background(), mux, conn)if err != nil {wg.Done()panic(err)}gwServer.ListenAndServe()
}func registerGRPC(wg *sync.WaitGroup) {l, err := net.Listen("tcp", ":8888")if err != nil {wg.Done()panic(err)}s := grpc.NewServer()person.RegisterSearchServiceServer(s, &personServe{})s.Serve(l)



syntax = "proto3";option go_package = "projectRPCTest5-4/service/users";service Users {rpc GetUser (UserGetRequest) returns (UserGetReply) {}rpc GetHelp (stream UserHelpRequest) returns (stream UserHelpReply) {}
}message UserGetRequest {string email = 1;string id = 2;  
}message User {string id = 1;string first_name = 2;string last_name = 3;int32 age = 4;
}message UserGetReply {User user = 1;
}message UserHelpRequest {User user = 1;string request = 2;
}message UserHelpReply {string response = 1;

3.1 客户端拦截器

package mainimport ("bufio""context""fmt""google.golang.org/grpc""google.golang.org/grpc/metadata""io""log""os"svc "projectRPCTest5-4/service"
)func metadataUnaryInterceptor(ctx context.Context,method string,req, reply interface{},cc *grpc.ClientConn,invoker grpc.UnaryInvoker,opts ...grpc.CallOption,
) error {ctxWithMetadata := metadata.AppendToOutgoingContext(ctx,"Request-Id","request-123",)return invoker(ctxWithMetadata, method, req, reply, cc, opts...)
}func metadataStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer,opts ...grpc.CallOption) (grpc.ClientStream, error) {ctxWithMetadata := metadata.AppendToOutgoingContext(ctx,"Request-Id","request-123",)clientStream, err := streamer(ctxWithMetadata, desc, cc, method, opts...)return clientStream, err
}func setupGrpcConn(addr string) (*grpc.ClientConn, error) {return grpc.DialContext(context.Background(), addr,grpc.WithInsecure(), grpc.WithBlock(),grpc.WithUnaryInterceptor(metadataUnaryInterceptor),grpc.WithStreamInterceptor(metadataStreamInterceptor))
}func getUserServiceClient(conn *grpc.ClientConn) svc.UsersClient {return svc.NewUsersClient(conn)
}func getUser(client svc.UsersClient,u *svc.UserGetRequest) (*svc.UserGetReply, error) {return client.GetUser(context.Background(), u)
}func setupChat(r io.Reader, w io.Writer, c svc.UsersClient) error {stream, err := c.GetHelp(context.Background())if err != nil {return err}for {scanner := bufio.NewScanner(r)prompt := "Request:"fmt.Fprint(w, prompt)scanner.Scan()if err := scanner.Err(); err != nil {return err}msg := scanner.Text()if msg == "quit" {break}request := svc.UserHelpRequest{Request: msg,}err := stream.Send(&request)if err != nil {return err}resp, err := stream.Recv()if err != nil {return err}fmt.Println("Recv", resp.Response)}return stream.CloseSend()
}func main() {if len(os.Args) != 3 {log.Fatal("Specify a gRPC server and method to call")}serverAddrr := os.Args[1]methodName := os.Args[2]conn, err := setupGrpcConn(serverAddrr)if err != nil {log.Fatal(err)}defer conn.Close()c := getUserServiceClient(conn)switch methodName {case "GetUser":result, err := getUser(c,&svc.UserGetRequest{Email: "jane@doe.com"},)if err != nil {log.Fatal(err)}fmt.Fprintf(os.Stdout, "User: %s %s\n",result.User.FirstName,result.User.LastName,)case "GetHelp":err = setupChat(os.Stdin, os.Stdout, c)if err != nil {log.Fatal(err)}default:log.Fatal("Unrecognized method name")}

3.2 服务端拦截器


package main//
import (//	"bufio""context""errors""fmt""google.golang.org/grpc""google.golang.org/grpc/metadata""io""log""net""os"svc "projectRPCTest5-4/service""strings""time"
)type userService struct {svc.UnimplementedUsersServer
}func logMessage(ctx context.Context, method string, latency time.Duration, err error) {var requestId stringmd, ok := metadata.FromIncomingContext(ctx)if !ok {log.Println("md", md)} else {if len(md.Get("Request-Id")) != 0 {requestId = md.Get("Request-Id")[0]}}log.Printf("Method:%s,Error:%v,Latency:%v,Request-Id:%s", method, err, latency, requestId)}func loggingUnaryInterceptor(ctx context.Context, req interface{},info *grpc.UnaryServerInfo,handler grpc.UnaryHandler) (interface{},error) {start := time.Now()resp, err := handler(ctx, req)logMessage(ctx, info.FullMethod, time.Since(start), err)return resp, err
}func loggingStreamInterceptor(srv interface{}, stream grpc.ServerStream,info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {start := time.Now()err := handler(srv, stream)ctx := stream.Context()logMessage(ctx, info.FullMethod, time.Since(start), err)return err
}func (s *userService) GetUser(ctx context.Context, in *svc.UserGetRequest) (*svc.UserGetReply, error) {log.Printf("Received request for user with Email:%s Id:%s\n",in.Email, in.Id)components := strings.Split(in.Email, "@")if len(components) != 2 {return nil, errors.New("invalid email")}u := svc.User{Id:        in.Id,FirstName: components[0],LastName:  components[1],Age:       36,}return &svc.UserGetReply{User: &u}, nil
}func (s *userService) GetHelp(stream svc.Users_GetHelpServer) error {log.Println("Client connected")for {request, err := stream.Recv()if err == io.EOF {break}if err != nil {return err}fmt.Printf("Request:%+v\n", request.Request)response := svc.UserHelpReply{Response: request.Request,}err = stream.Send(&response)if err != nil {return err}}log.Println("Client disconnected")return nil
}func registerServices(s *grpc.Server) {svc.RegisterUsersServer(s, &userService{})
}func startServer(s *grpc.Server, l net.Listener) error {return s.Serve(l)
}func main() {listenAddr := os.Getenv("LISTEN_ADDR")if len(listenAddr) == 0 {listenAddr = ":50051"}lis, err := net.Listen("tcp", listenAddr)if err != nil {log.Fatal(err)}s := grpc.NewServer(grpc.UnaryInterceptor(loggingUnaryInterceptor),grpc.StreamInterceptor(loggingStreamInterceptor))registerServices(s)log.Fatal(startServer(s, lis))

3.4 连接拦截器


grpc ChainStreamInterceptor(metricStreamInterceptro,loggingStreamInterceptor,	


4.1 服务端健康检查

syntax = "proto3";option go_package = "projectRPCHealthCheck/service/users";service Users {rpc GetUser (UserGetRequest) returns (UserGetReply) {}rpc GetHelp (stream UserHelpRequest) returns (stream UserHelpReply) {}
}message UserGetRequest {string email = 1;string id = 2;  
}message User {string id = 1;string first_name = 2;string last_name = 3;int32 age = 4;
}message UserGetReply {User user = 1;
}message UserHelpRequest {User user = 1;string request = 2;
}message UserHelpReply {string response = 1;


package mainimport ("context""errors""fmt""io""log""net""os""strings""google.golang.org/grpc"healthz "google.golang.org/grpc/health"healthsvc "google.golang.org/grpc/health/grpc_health_v1"svc "projectRPCHealthCheck/service"
)type userService struct {svc.UnimplementedUsersServer
}func (s *userService) GetUser(ctx context.Context,in *svc.UserGetRequest,
) (*svc.UserGetReply, error) {log.Printf("Received request for user with Email: %s Id: %s\n",in.Email,in.Id,)components := strings.Split(in.Email, "@")if len(components) != 2 {return nil, errors.New("invalid email address")}u := svc.User{Id:        in.Id,FirstName: components[0],LastName:  components[1],Age:       36,}return &svc.UserGetReply{User: &u}, nil
}func (s *userService) GetHelp(stream svc.Users_GetHelpServer,
) error {for {request, err := stream.Recv()if err == io.EOF {break}if err != nil {return err}fmt.Printf("Request receieved: %s\n", request.Request)response := svc.UserHelpReply{Response: request.Request,}err = stream.Send(&response)if err != nil {return err}}return nil
}func registerServices(s *grpc.Server, h *healthz.Server) {svc.RegisterUsersServer(s, &userService{})healthsvc.RegisterHealthServer(s, h)
}func updateServiceHealth(h *healthz.Server,service string,status healthsvc.HealthCheckResponse_ServingStatus,
) {h.SetServingStatus(service,status,)
}func startServer(s *grpc.Server, l net.Listener) error {return s.Serve(l)
}func main() {listenAddr := os.Getenv("LISTEN_ADDR")if len(listenAddr) == 0 {listenAddr = ":50051"}lis, err := net.Listen("tcp", listenAddr)if err != nil {log.Fatal(err)}s := grpc.NewServer()h := healthz.NewServer()registerServices(s, h)updateServiceHealth(h,svc.Users_ServiceDesc.ServiceName,healthsvc.HealthCheckResponse_SERVING,)log.Fatal(startServer(s, lis))


package mainimport ("context""errors"healthz "google.golang.org/grpc/health""log""net"svc "projectRPCHealthCheck/service"users "projectRPCHealthCheck/service""testing""google.golang.org/grpc""google.golang.org/grpc/codes"healthsvc "google.golang.org/grpc/health/grpc_health_v1""google.golang.org/grpc/status""google.golang.org/grpc/test/bufconn"
)var h *healthz.Serverfunc startTestGrpcServer() *bufconn.Listener {h = healthz.NewServer()l := bufconn.Listen(10)s := grpc.NewServer()registerServices(s, h)updateServiceHealth(h,svc.Users_ServiceDesc.ServiceName,healthsvc.HealthCheckResponse_SERVING,)go func() {log.Fatal(startServer(s, l))}()return l
}func getHealthSvcClient(l *bufconn.Listener) (healthsvc.HealthClient, error) {bufconnDialer := func(ctx context.Context, addr string,) (net.Conn, error) {return l.Dial()}client, err := grpc.DialContext(context.Background(),"", grpc.WithInsecure(),grpc.WithContextDialer(bufconnDialer),)if err != nil {return nil, err}return healthsvc.NewHealthClient(client), nil
}func TestHealthService(t *testing.T) {l := startTestGrpcServer()healthClient, err := getHealthSvcClient(l)if err != nil {t.Fatal(err)}resp, err := healthClient.Check(context.Background(),&healthsvc.HealthCheckRequest{},)if err != nil {t.Fatal(err)}serviceHealthStatus := resp.Status.String()if serviceHealthStatus != "SERVING" {t.Fatalf("Expected health: SERVING, Got: %s",serviceHealthStatus,)}
}func TestHealthServiceUsers(t *testing.T) {l := startTestGrpcServer()healthClient, err := getHealthSvcClient(l)if err != nil {t.Fatal(err)}resp, err := healthClient.Check(context.Background(),&healthsvc.HealthCheckRequest{Service: "Users",},)if err != nil {t.Fatal(err)}serviceHealthStatus := resp.Status.String()if serviceHealthStatus != "SERVING" {t.Fatalf("Expected health: SERVING, Got: %s",serviceHealthStatus,)}
}func TestHealthServiceUnknown(t *testing.T) {l := startTestGrpcServer()healthClient, err := getHealthSvcClient(l)if err != nil {t.Fatal(err)}_, err = healthClient.Check(context.Background(),&healthsvc.HealthCheckRequest{Service: "Repo",},)if err == nil {t.Fatalf("Expected non-nil error, Got nil error")}expectedError := status.Errorf(codes.NotFound, "unknown service",)if !errors.Is(err, expectedError) {t.Fatalf("Expected error %v, Got; %v",err,expectedError,)}
}func TestHealthServiceWatch(t *testing.T) {l := startTestGrpcServer()healthClient, err := getHealthSvcClient(l)if err != nil {t.Fatal(err)}client, err := healthClient.Watch(context.Background(),&healthsvc.HealthCheckRequest{Service: "Users",},)if err != nil {t.Fatal(err)}resp, err := client.Recv()if err != nil {t.Fatalf("Error in Watch: %#v\n", err)}if resp.Status != healthsvc.HealthCheckResponse_SERVING {t.Errorf("Expected SERVING, Got: %#v", resp.Status.String())}updateServiceHealth(h,"Users",healthsvc.HealthCheckResponse_NOT_SERVING,)resp, err = client.Recv()if err != nil {t.Fatalf("Error in Watch: %#v\n", err)}if resp.Status != healthsvc.HealthCheckResponse_NOT_SERVING {t.Errorf("Expected NOT_SERVING, Got: %#v", resp.Status.String())}
}func TestUserService1(t *testing.T) {l := startTestGrpcServer()bufconnDialer := func(ctx context.Context, addr string,) (net.Conn, error) {return l.Dial()}client, err := grpc.DialContext(context.Background(),"", grpc.WithInsecure(),grpc.WithContextDialer(bufconnDialer),)if err != nil {t.Fatal(err)}usersClient := users.NewUsersClient(client)resp, err := usersClient.GetUser(context.Background(),&users.UserGetRequest{Email: "jane@doe.com",Id:    "foo-bar",},)if err != nil {t.Fatal(err)}if resp.User.FirstName != "jane" {t.Errorf("Expected FirstName to be: jane, Got: %s",resp.User.FirstName,)}

4.2 服务端处理运行时错误


func panicStreamInterceptor(srv interface{},stream grpc.ServerStream,info *grpc.StreamServerInfo,handler grpc.StreamHandler,
) (err error) {defer func() {if r := recover(); r != nil {log.Printf("Panic recovered: %v\n", r)err = status.Error(codes.Internal,"Unexpected error happened",)}}()serverStream := wrappedServerStream{ServerStream: stream,}err = handler(srv, serverStream)return


func panicUnaryInterceptor(ctx context.Context,req interface{},info *grpc.UnaryServerInfo,handler grpc.UnaryHandler,
) (resp interface{}, err error) {defer func() {if r := recover(); r != nil {log.Printf("Panic recovered: %v\n", r)err = status.Error(codes.Internal,"Unexpected error happened",)}}()resp, err = handler(ctx, req)return

4.3 服务端终止请求处理


func timeoutStreamInterceptor(srv interface{},stream grpc.ServerStream,info *grpc.StreamServerInfo,handler grpc.StreamHandler,
) error {serverStream := wrappedServerStream{RecvMsgTimeout: 500 * time.Millisecond,ServerStream:   stream,}err := handler(srv, serverStream)return err


func timeoutUnaryInterceptor(ctx context.Context,req interface{},info *grpc.UnaryServerInfo,handler grpc.UnaryHandler,
) (interface{}, error) {var resp interface{}var err errorctxWithTimeout, cancel := context.WithTimeout(ctx, 300*time.Millisecond)defer cancel()ch := make(chan error)go func() {resp, err = handler(ctxWithTimeout, req)ch <- err}()select {case <-ctxWithTimeout.Done():cancel()err = status.Error(codes.DeadlineExceeded,fmt.Sprintf("%s: Deadline exceeded", info.FullMethod),)return resp, errcase <-ch:}return resp, err

4.4 客户端提高连接配置


func setupGrpcConn(addr string) (*grpc.ClientConn, context.CancelFunc, error) {log.Printf("Connecting to server on %s\n", addr)ctx, cancel := context.WithTimeout(context.Background(),10*time.Second,)conn, err := grpc.DialContext(ctx,addr,grpc.WithInsecure(),grpc.WithBlock(),grpc.FailOnNonTempDialError(true),grpc.WithReturnConnectionError(),)return conn, cancel, err

4.5 为方法调用设置超时




