欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 产业 > go GRPC学习笔记

go GRPC学习笔记

2025/3/13 19:19:09 来源:https://blog.csdn.net/m0_37149062/article/details/146204095  浏览:    关键词:go GRPC学习笔记

本博文源于笔者正在学习的gRPC,相关配套书籍素材来源是《Go编程进阶实战》,博文内容主要包含了RPC模式讲解、通过HTTP访问gRPC、拦截器,以及提高服务端与客户端容错能力的相关配置
在此之前需要下载protoc,这里不做下载过程

1、RPC模式

首先定义在rpc模式里共有四种

  • Unary RPC 单个请求–单个响应
  • Server Streaming RPC 单个请求–多个响应
  • Client Streaming RPC 多个请求–单个响应
  • Bidirectional Streaming RPC 多个请求–多个响应

1.1 举例:编写

文件夹目录
在这里插入图片描述

编写protoc文件

syntax = "proto3"; // Tell the compiler to use the proto3 syntax.

package person;

// NOTE(review): the Go code in this article imports "projectRPCTest3/pb/person"
// (no leading slash) — confirm the leading "/" here is intended.
option go_package = "/projectRPCTest3/pb/person;person";

// PersonReq is the request message used by every RPC of SearchService.
message PersonReq {
  string name = 1;
  int32 age = 2;
}

// PersonRes is the response message used by every RPC of SearchService.
message PersonRes {
  string name = 1;
  int32 age = 2;
}

// SearchService declares one RPC for each of the four gRPC call patterns.
service SearchService {
  // Unary: a single request and a single response.
  rpc Search(PersonReq) returns (PersonRes);
  // Client streaming: a stream of requests and a single response.
  rpc SearchIn(stream PersonReq) returns(PersonRes);
  // Server streaming: a single request and a stream of responses.
  rpc SearchOut(PersonReq) returns (stream PersonRes);
  // Bidirectional streaming: a stream of requests and a stream of responses.
  rpc SearchIO(stream PersonReq) returns (stream PersonRes);
}

编译出protoc的两个go文件,在person目录下进行编译
在这里插入图片描述

cd .\pb
cd .\person\
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative .\person.proto

main.go文件进行编写:

  • Search将收到的请求进行返回
  • SearchIn 多个请求过来统一做回复
  • SearchOut 单个请求过来,多次响应
  • SearchIO 多个请求过来,多个响应
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"net"
	"time"

	"google.golang.org/grpc"

	"projectRPCTest3/pb/person"
)

// personServe implements person.SearchServiceServer and demonstrates the four
// gRPC call patterns (unary, client streaming, server streaming and
// bidirectional streaming).
type personServe struct {
	person.UnimplementedSearchServiceServer
}

// Search handles the unary RPC: it echoes the caller's name back inside a
// fixed acknowledgement message.
func (*personServe) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {
	return &person.PersonRes{Name: "我收到了" + req.GetName() + "的信息"}, nil
}

// SearchIn handles the client-streaming RPC: it prints every request received
// and, once the client half-closes the stream, answers with a single
// "success" reply.
//
// Only io.EOF is treated as a normal end of stream; any other receive error
// is returned to the caller instead of being reported as success.
func (*personServe) SearchIn(server person.SearchService_SearchInServer) error {
	for {
		req, err := server.Recv()
		if err == io.EOF {
			return server.SendAndClose(&person.PersonRes{Name: "success"})
		}
		if err != nil {
			return err
		}
		fmt.Println(req)
	}
}

// SearchOut handles the server-streaming RPC: it sends eleven replies, one per
// second, and then ends the stream. A failed Send (for example because the
// client went away) aborts the loop and is returned.
func (*personServe) SearchOut(req *person.PersonReq, server grpc.ServerStreamingServer[person.PersonRes]) error {
	name := req.GetName()
	for i := 0; i <= 10; i++ {
		time.Sleep(1 * time.Second)
		if err := server.Send(&person.PersonRes{Name: "I got it" + name}); err != nil {
			return err
		}
	}
	return nil
}

// SearchIO handles the bidirectional-streaming RPC: every request is answered
// with one reply until the client half-closes the stream.
//
// Receiving and sending happen in the same goroutine, so no sentinel value is
// needed to signal the end of the stream and a client may send any name,
// including "Result".
func (*personServe) SearchIO(server grpc.BidiStreamingServer[person.PersonReq, person.PersonRes]) error {
	for {
		req, err := server.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		if err := server.Send(&person.PersonRes{Name: "I got it" + req.GetName()}); err != nil {
			return err
		}
	}
}

// main listens on TCP port 8888 and serves SearchService until the server
// stops or fails.
func main() {
	l, err := net.Listen("tcp", ":8888")
	if err != nil {
		log.Fatalf("listen on :8888: %v", err)
	}
	s := grpc.NewServer()
	person.RegisterSearchServiceServer(s, &personServe{})
	if err := s.Serve(l); err != nil {
		log.Fatalf("serve: %v", err)
	}
}

然后使用客户端client对server发送消息,进行测试,分为四个部分,最后给出完整的client.go
search

wg := sync.WaitGroup{}
wg.Add(10)
for i := 0; i < 10; i++ {search, err := client.Search(context.Background(), &person.PersonReq{Name: "Alice"})if err != nil {fmt.Println(err)}wg.Done()fmt.Println(search.GetName())
}
wg.Wait()

在这里插入图片描述

searchIn

c, err := client.SearchIn(context.Background())
if err != nil {
	// Without a stream there is nothing to send on.
	fmt.Println(err)
	return
}
// Send eleven requests, one per second.
for i := 0; i <= 10; i++ {
	time.Sleep(1 * time.Second)
	if err := c.Send(&person.PersonReq{Name: "client send message...."}); err != nil {
		// The stream is broken; CloseAndRecv below reports the final status.
		fmt.Println(err)
		break
	}
}
// Half-close the stream and wait for the server's single reply.
res, err := c.CloseAndRecv()
if err != nil {
	fmt.Println(err)
	return
}
fmt.Println(res)

在这里插入图片描述

searchOut

c, err := client.SearchOut(context.Background(), &person.PersonReq{Name: "zhangsan"})
if err != nil {
	// Without a stream there is nothing to receive from.
	fmt.Println(err)
	return
}
// Print every reply until the stream ends. Recv reports the normal end of
// the stream as io.EOF, which is printed like any other error here.
for {
	res, err := c.Recv()
	if err != nil {
		fmt.Println(err)
		break
	}
	fmt.Println(res)
}

在这里插入图片描述

searchIO

c, err := client.SearchIO(context.Background())
if err != nil {
	fmt.Println(err)
	return
}
// One WaitGroup slot per goroutine: each goroutine calls Done exactly once,
// so the counter can never go negative even when both of them fail.
wg := sync.WaitGroup{}
wg.Add(2)
// Sender: one request every two seconds until Send fails.
go func() {
	defer wg.Done()
	for {
		if err := c.Send(&person.PersonReq{Name: " hello"}); err != nil {
			return
		}
		time.Sleep(2 * time.Second)
	}
}()
// Receiver: print every reply until Recv fails or the stream ends.
go func() {
	defer wg.Done()
	for {
		res, err := c.Recv()
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Println(res)
	}
}()
wg.Wait()

在这里插入图片描述
完整的client.go

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"sync"
	"time"

	"google.golang.org/grpc"

	"projectRPCTest3/pb/person"
)

// main connects to the SearchService server on 127.0.0.1:8888 and runs one of
// the four demos. Enable exactly one of the run* calls at a time.
func main() {
	conn, err := grpc.Dial("127.0.0.1:8888", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("dial 127.0.0.1:8888: %v", err)
	}
	defer conn.Close()
	client := person.NewSearchServiceClient(conn)

	// runSearch(client)
	// runSearchIn(client)
	// runSearchOut(client)
	runSearchIO(client)
}

// runSearch calls the unary Search RPC ten times in sequence and prints each
// reply; a failed call prints the error and moves on to the next call.
func runSearch(client person.SearchServiceClient) {
	for i := 0; i < 10; i++ {
		search, err := client.Search(context.Background(), &person.PersonReq{Name: "Alice"})
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Println(search.GetName())
	}
}

// runSearchIn exercises the client-streaming RPC: it sends eleven requests,
// one per second, then half-closes the stream and prints the single reply.
func runSearchIn(client person.SearchServiceClient) {
	c, err := client.SearchIn(context.Background())
	if err != nil {
		fmt.Println(err)
		return
	}
	for i := 0; i <= 10; i++ {
		time.Sleep(1 * time.Second)
		if err := c.Send(&person.PersonReq{Name: "client send message...."}); err != nil {
			// The stream is broken; CloseAndRecv below reports the final status.
			fmt.Println(err)
			break
		}
	}
	res, err := c.CloseAndRecv()
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(res)
}

// runSearchOut exercises the server-streaming RPC: it sends one request and
// prints every reply until the server ends the stream.
func runSearchOut(client person.SearchServiceClient) {
	c, err := client.SearchOut(context.Background(), &person.PersonReq{Name: "zhangsan"})
	if err != nil {
		fmt.Println(err)
		return
	}
	for {
		res, err := c.Recv()
		if err == io.EOF {
			// Normal end of the stream.
			return
		}
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Println(res)
	}
}

// runSearchIO exercises the bidirectional-streaming RPC: one goroutine sends a
// request every two seconds while another prints every reply. It returns once
// both goroutines have stopped.
func runSearchIO(client person.SearchServiceClient) {
	c, err := client.SearchIO(context.Background())
	if err != nil {
		fmt.Println(err)
		return
	}

	// One WaitGroup slot per goroutine: each calls Done exactly once, so the
	// counter can never go negative even when both of them fail.
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for {
			if err := c.Send(&person.PersonReq{Name: " hello"}); err != nil {
				return
			}
			time.Sleep(2 * time.Second)
		}
	}()
	go func() {
		defer wg.Done()
		for {
			res, err := c.Recv()
			if err != nil {
				fmt.Println(err)
				return
			}
			fmt.Println(res)
		}
	}()
	wg.Wait()
}

1.2 任意字节传输

proto文件定义,文件名称为repositories.proto

syntax = "proto3";

option go_package = "projectBindataClientStreaming/service";

// Repo exposes a single client-streaming RPC for creating a repository.
service Repo {
  // CreateRepo receives a stream of RepoCreateRequest messages and answers
  // with one RepoCreateReply.
  rpc CreateRepo(stream RepoCreateRequest) returns(RepoCreateReply) {}
}

// RepoCreateRequest carries either the repository context or a chunk of raw
// bytes; the oneof allows only one of the two per message.
message RepoCreateRequest {
  oneof body {
    RepoContext context = 1;
    bytes data = 2;
  }
}

// RepoContext identifies the creator and the name of the repository.
message RepoContext {
  string creator_id = 1;
  string name =2;
}

// Repository describes a repository.
message Repository {
  string id = 1;
  string name = 2;
  string url = 3;
}

// RepoCreateReply returns the repository together with a size value.
message RepoCreateReply {
  Repository repo = 1;
  int32 size =2;
}

进行编译proto

protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative .\repositories.proto

server.go进行实现CreateRepo,然后注册服务,开启tcp,

package main

import (
	"fmt"
	"io"
	"log"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	svc "projectBindataClientStreaming/service"
)

// repoService implements svc.RepoServer.
type repoService struct {
	svc.UnimplementedRepoServer
}

// CreateRepo consumes a client stream made of one RepoContext message and any
// number of data chunks, then replies with the repository URL and the total
// number of data bytes received.
//
// Errors returned:
//   - codes.Unknown            when receiving from the stream fails;
//   - codes.InvalidArgument    when a message has no body, or when the stream
//     ends without a RepoContext having been sent;
//   - codes.FailedPrecondition when a message carries an unexpected body type.
func (s *repoService) CreateRepo(
	stream svc.Repo_CreateRepoServer,
) error {
	var repoContext *svc.RepoContext
	var data []byte
	for {
		r, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return status.Error(codes.Unknown, err.Error())
		}
		switch t := r.Body.(type) {
		case *svc.RepoCreateRequest_Context:
			repoContext = r.GetContext()
		case *svc.RepoCreateRequest_Data:
			data = append(data, r.GetData()...)
		case nil:
			return status.Error(
				codes.InvalidArgument,
				"Message doesn't contain context or data",
			)
		default:
			// %T prints the dynamic type; t is not a string.
			return status.Errorf(
				codes.FailedPrecondition,
				"Unexpected message type: %T",
				t,
			)
		}
	}

	// A client that only sent data (or nothing at all) would otherwise cause
	// a nil-pointer dereference below.
	if repoContext == nil {
		return status.Error(
			codes.InvalidArgument,
			"Repository context was not sent",
		)
	}

	repo := svc.Repository{
		Name: repoContext.Name,
		Url: fmt.Sprintf(
			"https://git.example.com/%s/%s",
			repoContext.CreatorId,
			repoContext.Name,
		),
	}
	r := svc.RepoCreateReply{
		Repo: &repo,
		Size: int32(len(data)),
	}
	return stream.SendAndClose(&r)
}

// registerServices registers the Repo service implementation on s.
func registerServices(s *grpc.Server) {
	svc.RegisterRepoServer(s, &repoService{})
}

// startServer serves gRPC on l and blocks until Serve returns.
func startServer(s *grpc.Server, l net.Listener) error {
	return s.Serve(l)
}

// main listens on TCP port 50051 and serves the Repo service.
func main() {
	listenAddr := ":50051"
	lis, err := net.Listen("tcp", listenAddr)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	registerServices(s)
	log.Fatal(startServer(s, lis))
}

测试server_test.go

package main

import (
	"context"
	"io"
	"net"
	"strings"
	"testing"

	"google.golang.org/grpc"
	"google.golang.org/grpc/test/bufconn"

	svc "projectBindataClientStreaming/service"
)

// startTestGrpcServer starts the Repo service on an in-memory bufconn listener
// and returns that listener so a test can dial it. The caller closes the
// listener when done.
func startTestGrpcServer() *bufconn.Listener {
	l := bufconn.Listen(1)
	s := grpc.NewServer()
	registerServices(s)
	go func() {
		// Serve returns an error once the listener is closed by the test;
		// that is the expected shutdown path, so the error is discarded.
		_ = startServer(s, l)
	}()
	return l
}

// TestCreateRepo streams a repository context followed by the payload one byte
// at a time, then checks the size and URL reported by the server.
func TestCreateRepo(t *testing.T) {
	l := startTestGrpcServer()
	// Close the listener once, at the end of the test, rather than inside the
	// send loop.
	defer l.Close()

	bufconnDialer := func(ctx context.Context, address string) (net.Conn, error) {
		return l.Dial()
	}
	client, err := grpc.DialContext(
		context.Background(),
		"",
		grpc.WithContextDialer(bufconnDialer),
		grpc.WithInsecure(),
	)
	if err != nil {
		t.Fatalf("could not dial bufconn: %v", err)
	}
	defer client.Close()

	repoClient := svc.NewRepoClient(client)
	stream, err := repoClient.CreateRepo(context.Background())
	if err != nil {
		t.Fatal("CreateRepo", err)
	}

	// The first message carries the repository context.
	c := svc.RepoCreateRequest_Context{
		Context: &svc.RepoContext{
			CreatorId: "user-123",
			Name:      "test-repo",
		},
	}
	r := svc.RepoCreateRequest{Body: &c}
	if err := stream.Send(&r); err != nil {
		t.Fatal("stream.Send", err)
	}

	// Every following message carries a single byte of the payload.
	data := "Arbitrary Data Bytes"
	repoData := strings.NewReader(data)
	for {
		b, err := repoData.ReadByte()
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatal("ReadByte", err)
		}
		bData := svc.RepoCreateRequest_Data{Data: []byte{b}}
		r := svc.RepoCreateRequest{Body: &bData}
		if err := stream.Send(&r); err != nil {
			t.Fatal("stream.Send", err)
		}
	}

	resp, err := stream.CloseAndRecv()
	if err != nil {
		t.Fatal("CloseAndRecv", err)
	}
	expectedSize := int32(len(data))
	if resp.Size != expectedSize {
		t.Errorf("resp.Size = %d, want %d", resp.Size, expectedSize)
	}
	expectedRepoUrl := "https://git.example.com/user-123/test-repo"
	if resp.Repo.Url != expectedRepoUrl {
		t.Errorf("resp.Repo.Url = %q, want %q", resp.Repo.Url, expectedRepoUrl)
	}
}

2、grpc通过http访问

上一小节通过tcp,这次通过http访问

2.1 案例,restful访问请求

proto文件编写,下面代码中的注释部分是用来测试,可复制过来一起测试。

syntax = "proto3";

package person;

option go_package="projectRPCTest4/pb/person;person";

import "google/api/annotations.proto";

// Mbody is a nested message used as the `body` field of the request and the
// response.
message Mbody {
  string name = 1;
}

// PersonReq is the request message of Search.
message PersonReq {
  string name = 1;
  int32 age = 2;
  Mbody body =3;
}

// PersonRes is the response message of Search.
message PersonRes {
  string name = 1;
  int32 age = 2;
  Mbody body = 3;
}

// SearchService exposes Search both as a gRPC method and, through the
// google.api.http annotation, as an HTTP endpoint.
service SearchService {
  rpc Search(PersonReq) returns (PersonRes){
    option(google.api.http)= {
      // Alternative bindings kept for experimentation:
      // post:"/api/person",
      // get:"/api/person",
      // get:"/api/person/{name}/{age}/{body.name}",
      get:"/api/person/{name=qm}/{age}/{body.name}", // the `name` segment is fixed: it only matches the literal "qm"
      //body:"body"
    };
  };
}

其中文件夹目录,与其他文件一起附上

在这里插入图片描述
annotations.proto

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package google.api;

import "google/api/http.proto";
import "google/protobuf/descriptor.proto";

option go_package = "google.golang.org/genproto/googleapis/api/annotations;annotations";
option java_multiple_files = true;
option java_outer_classname = "AnnotationsProto";
option java_package = "com.google.api";
option objc_class_prefix = "GAPI";

extend google.protobuf.MethodOptions {
  // See `HttpRule`.
  HttpRule http = 72295728;
}

http.proto

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package google.api;

option go_package = "google.golang.org/genproto/googleapis/api/annotations;annotations";
option java_multiple_files = true;
option java_outer_classname = "HttpProto";
option java_package = "com.google.api";
option objc_class_prefix = "GAPI";

// Defines the HTTP configuration for an API service. It contains a list of
// [HttpRule][google.api.HttpRule], each specifying the mapping of an RPC method
// to one or more HTTP REST API methods.
message Http {
  // A list of HTTP configuration rules that apply to individual API methods.
  //
  // **NOTE:** All service configuration rules follow "last one wins" order.
  repeated HttpRule rules = 1;

  // When set to true, URL path parameters will be fully URI-decoded except in
  // cases of single segment matches in reserved expansion, where "%2F" will be
  // left encoded.
  //
  // The default behavior is to not decode RFC 6570 reserved characters in multi
  // segment matches.
  bool fully_decode_reserved_expansion = 2;
}

// gRPC Transcoding
//
// gRPC Transcoding is a feature for mapping between a gRPC method and one or
// more HTTP REST endpoints. It allows developers to build a single API service
// that supports both gRPC APIs and REST APIs. Many systems, including [Google
// APIs](https://github.com/googleapis/googleapis),
// [Cloud Endpoints](https://cloud.google.com/endpoints), [gRPC
// Gateway](https://github.com/grpc-ecosystem/grpc-gateway),
// and [Envoy](https://github.com/envoyproxy/envoy) proxy support this feature
// and use it for large scale production services.
//
// `HttpRule` defines the schema of the gRPC/REST mapping. The mapping specifies
// how different portions of the gRPC request message are mapped to the URL
// path, URL query parameters, and HTTP request body. It also controls how the
// gRPC response message is mapped to the HTTP response body. `HttpRule` is
// typically specified as an `google.api.http` annotation on the gRPC method.
//
// Each mapping specifies a URL path template and an HTTP method. The path
// template may refer to one or more fields in the gRPC request message, as long
// as each field is a non-repeated field with a primitive (non-message) type.
// The path template controls how fields of the request message are mapped to
// the URL path.
//
// Example:
//
//     service Messaging {
//       rpc GetMessage(GetMessageRequest) returns (Message) {
//         option (google.api.http) = {
//             get: "/v1/{name=messages/*}"
//         };
//       }
//     }
//     message GetMessageRequest {
//       string name = 1; // Mapped to URL path.
//     }
//     message Message {
//       string text = 1; // The resource content.
//     }
//
// This enables an HTTP REST to gRPC mapping as below:
//
// - HTTP: `GET /v1/messages/123456`
// - gRPC: `GetMessage(name: "messages/123456")`
//
// Any fields in the request message which are not bound by the path template
// automatically become HTTP query parameters if there is no HTTP request body.
// For example:
//
//     service Messaging {
//       rpc GetMessage(GetMessageRequest) returns (Message) {
//         option (google.api.http) = {
//             get:"/v1/messages/{message_id}"
//         };
//       }
//     }
//     message GetMessageRequest {
//       message SubMessage {
//         string subfield = 1;
//       }
//       string message_id = 1; // Mapped to URL path.
//       int64 revision = 2;    // Mapped to URL query parameter `revision`.
//       SubMessage sub = 3;    // Mapped to URL query parameter `sub.subfield`.
//     }
//
// This enables a HTTP JSON to RPC mapping as below:
//
// - HTTP: `GET /v1/messages/123456?revision=2&sub.subfield=foo`
// - gRPC: `GetMessage(message_id: "123456" revision: 2 sub:
// SubMessage(subfield: "foo"))`
//
// Note that fields which are mapped to URL query parameters must have a
// primitive type or a repeated primitive type or a non-repeated message type.
// In the case of a repeated type, the parameter can be repeated in the URL
// as `...?param=A&param=B`. In the case of a message type, each field of the
// message is mapped to a separate parameter, such as
// `...?foo.a=A&foo.b=B&foo.c=C`.
//
// For HTTP methods that allow a request body, the `body` field
// specifies the mapping. Consider a REST update method on the
// message resource collection:
//
//     service Messaging {
//       rpc UpdateMessage(UpdateMessageRequest) returns (Message) {
//         option (google.api.http) = {
//           patch: "/v1/messages/{message_id}"
//           body: "message"
//         };
//       }
//     }
//     message UpdateMessageRequest {
//       string message_id = 1; // mapped to the URL
//       Message message = 2;   // mapped to the body
//     }
//
// The following HTTP JSON to RPC mapping is enabled, where the
// representation of the JSON in the request body is determined by
// protos JSON encoding:
//
// - HTTP: `PATCH /v1/messages/123456 { "text": "Hi!" }`
// - gRPC: `UpdateMessage(message_id: "123456" message { text: "Hi!" })`
//
// The special name `*` can be used in the body mapping to define that
// every field not bound by the path template should be mapped to the
// request body.  This enables the following alternative definition of
// the update method:
//
//     service Messaging {
//       rpc UpdateMessage(Message) returns (Message) {
//         option (google.api.http) = {
//           patch: "/v1/messages/{message_id}"
//           body: "*"
//         };
//       }
//     }
//     message Message {
//       string message_id = 1;
//       string text = 2;
//     }
//
//
// The following HTTP JSON to RPC mapping is enabled:
//
// - HTTP: `PATCH /v1/messages/123456 { "text": "Hi!" }`
// - gRPC: `UpdateMessage(message_id: "123456" text: "Hi!")`
//
// Note that when using `*` in the body mapping, it is not possible to
// have HTTP parameters, as all fields not bound by the path end in
// the body. This makes this option more rarely used in practice when
// defining REST APIs. The common usage of `*` is in custom methods
// which don't use the URL at all for transferring data.
//
// It is possible to define multiple HTTP methods for one RPC by using
// the `additional_bindings` option. Example:
//
//     service Messaging {
//       rpc GetMessage(GetMessageRequest) returns (Message) {
//         option (google.api.http) = {
//           get: "/v1/messages/{message_id}"
//           additional_bindings {
//             get: "/v1/users/{user_id}/messages/{message_id}"
//           }
//         };
//       }
//     }
//     message GetMessageRequest {
//       string message_id = 1;
//       string user_id = 2;
//     }
//
// This enables the following two alternative HTTP JSON to RPC mappings:
//
// - HTTP: `GET /v1/messages/123456`
// - gRPC: `GetMessage(message_id: "123456")`
//
// - HTTP: `GET /v1/users/me/messages/123456`
// - gRPC: `GetMessage(user_id: "me" message_id: "123456")`
//
// Rules for HTTP mapping
//
// 1. Leaf request fields (recursive expansion nested messages in the request
//    message) are classified into three categories:
//    - Fields referred by the path template. They are passed via the URL path.
//    - Fields referred by the [HttpRule.body][google.api.HttpRule.body]. They
//    are passed via the HTTP
//      request body.
//    - All other fields are passed via the URL query parameters, and the
//      parameter name is the field path in the request message. A repeated
//      field can be represented as multiple query parameters under the same
//      name.
//  2. If [HttpRule.body][google.api.HttpRule.body] is "*", there is no URL
//  query parameter, all fields
//     are passed via URL path and HTTP request body.
//  3. If [HttpRule.body][google.api.HttpRule.body] is omitted, there is no HTTP
//  request body, all
//     fields are passed via URL path and URL query parameters.
//
// Path template syntax
//
//     Template = "/" Segments [ Verb ] ;
//     Segments = Segment { "/" Segment } ;
//     Segment  = "*" | "**" | LITERAL | Variable ;
//     Variable = "{" FieldPath [ "=" Segments ] "}" ;
//     FieldPath = IDENT { "." IDENT } ;
//     Verb     = ":" LITERAL ;
//
// The syntax `*` matches a single URL path segment. The syntax `**` matches
// zero or more URL path segments, which must be the last part of the URL path
// except the `Verb`.
//
// The syntax `Variable` matches part of the URL path as specified by its
// template. A variable template must not contain other variables. If a variable
// matches a single path segment, its template may be omitted, e.g. `{var}`
// is equivalent to `{var=*}`.
//
// The syntax `LITERAL` matches literal text in the URL path. If the `LITERAL`
// contains any reserved character, such characters should be percent-encoded
// before the matching.
//
// If a variable contains exactly one path segment, such as `"{var}"` or
// `"{var=*}"`, when such a variable is expanded into a URL path on the client
// side, all characters except `[-_.~0-9a-zA-Z]` are percent-encoded. The
// server side does the reverse decoding. Such variables show up in the
// [Discovery
// Document](https://developers.google.com/discovery/v1/reference/apis) as
// `{var}`.
//
// If a variable contains multiple path segments, such as `"{var=foo/*}"`
// or `"{var=**}"`, when such a variable is expanded into a URL path on the
// client side, all characters except `[-_.~/0-9a-zA-Z]` are percent-encoded.
// The server side does the reverse decoding, except "%2F" and "%2f" are left
// unchanged. Such variables show up in the
// [Discovery
// Document](https://developers.google.com/discovery/v1/reference/apis) as
// `{+var}`.
//
// Using gRPC API Service Configuration
//
// gRPC API Service Configuration (service config) is a configuration language
// for configuring a gRPC service to become a user-facing product. The
// service config is simply the YAML representation of the `google.api.Service`
// proto message.
//
// As an alternative to annotating your proto file, you can configure gRPC
// transcoding in your service config YAML files. You do this by specifying a
// `HttpRule` that maps the gRPC method to a REST endpoint, achieving the same
// effect as the proto annotation. This can be particularly useful if you
// have a proto that is reused in multiple services. Note that any transcoding
// specified in the service config will override any matching transcoding
// configuration in the proto.
//
// The following example selects a gRPC method and applies an `HttpRule` to it:
//
//     http:
//       rules:
//         - selector: example.v1.Messaging.GetMessage
//           get: /v1/messages/{message_id}/{sub.subfield}
//
// Special notes
//
// When gRPC Transcoding is used to map a gRPC to JSON REST endpoints, the
// proto to JSON conversion must follow the [proto3
// specification](https://developers.google.com/protocol-buffers/docs/proto3#json).
//
// While the single segment variable follows the semantics of
// [RFC 6570](https://tools.ietf.org/html/rfc6570) Section 3.2.2 Simple String
// Expansion, the multi segment variable **does not** follow RFC 6570 Section
// 3.2.3 Reserved Expansion. The reason is that the Reserved Expansion
// does not expand special characters like `?` and `#`, which would lead
// to invalid URLs. As the result, gRPC Transcoding uses a custom encoding
// for multi segment variables.
//
// The path variables **must not** refer to any repeated or mapped field,
// because client libraries are not capable of handling such variable expansion.
//
// The path variables **must not** capture the leading "/" character. The reason
// is that the most common use case "{var}" does not capture the leading "/"
// character. For consistency, all path variables must share the same behavior.
//
// Repeated message fields must not be mapped to URL query parameters, because
// no client library can support such complicated mapping.
//
// If an API needs to use a JSON array for request or response body, it can map
// the request or response body to a repeated field. However, some gRPC
// Transcoding implementations may not support this feature.
message HttpRule {
  // Selects a method to which this rule applies.
  //
  // Refer to [selector][google.api.DocumentationRule.selector] for syntax
  // details.
  string selector = 1;

  // Determines the URL pattern is matched by this rules. This pattern can be
  // used with any of the {get|put|post|delete|patch} methods. A custom method
  // can be defined using the 'custom' field.
  oneof pattern {
    // Maps to HTTP GET. Used for listing and getting information about
    // resources.
    string get = 2;

    // Maps to HTTP PUT. Used for replacing a resource.
    string put = 3;

    // Maps to HTTP POST. Used for creating a resource or performing an action.
    string post = 4;

    // Maps to HTTP DELETE. Used for deleting a resource.
    string delete = 5;

    // Maps to HTTP PATCH. Used for updating a resource.
    string patch = 6;

    // The custom pattern is used for specifying an HTTP method that is not
    // included in the `pattern` field, such as HEAD, or "*" to leave the
    // HTTP method unspecified for this rule. The wild-card rule is useful
    // for services that provide content to Web (HTML) clients.
    CustomHttpPattern custom = 8;
  }

  // The name of the request field whose value is mapped to the HTTP request
  // body, or `*` for mapping all request fields not captured by the path
  // pattern to the HTTP body, or omitted for not having any HTTP request body.
  //
  // NOTE: the referred field must be present at the top-level of the request
  // message type.
  string body = 7;

  // Optional. The name of the response field whose value is mapped to the HTTP
  // response body. When omitted, the entire response message will be used
  // as the HTTP response body.
  //
  // NOTE: The referred field must be present at the top-level of the response
  // message type.
  string response_body = 12;

  // Additional HTTP bindings for the selector. Nested bindings must
  // not contain an `additional_bindings` field themselves (that is,
  // the nesting may only be one level deep).
  repeated HttpRule additional_bindings = 11;
}

// A custom pattern is used for defining custom HTTP verb.
message CustomHttpPattern {
  // The name of this custom HTTP verb.
  string kind = 1;

  // The path matched by this custom verb.
  string path = 2;
}

proto文件进行编译

protoc -I ./pb --go_out ./pb --go_opt paths=source_relative --go-grpc_out ./pb --go-grpc_opt paths=source_relative --grpc-gateway_out ./pb --grpc-gateway_opt paths=source_relative  ./pb/person/person.proto

实现person.proto中定义的Search方法,将接收到的请求打印出来即可,后面再做注册

func (*personServe) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {name := req.GetName()fmt.Println(name)fmt.Println(req.Name)fmt.Println(req.Age)fmt.Println(req.String())req.GetBody()res := &person.PersonRes{Name: name + "的信息", Age: req.GetAge(), Body: req.GetBody()}return res, nil
}

完整的service.go,运行main后,直接用postman进行访问,比较好

package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"net/http"
	"sync"

	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
	"google.golang.org/grpc"

	"projectRPCTest4/pb/person"
)

// personServe implements person.SearchServiceServer.
type personServe struct {
	person.UnimplementedSearchServiceServer
}

// Search prints the incoming request (name twice, age, then the full message)
// and echoes it back: the reply carries the name with the suffix "的信息"
// plus the unchanged age and body.
func (*personServe) Search(ctx context.Context, req *person.PersonReq) (*person.PersonRes, error) {
	name := req.GetName()
	fmt.Println(name)
	fmt.Println(req.GetName())
	fmt.Println(req.GetAge())
	fmt.Println(req.String())
	res := &person.PersonRes{
		Name: name + "的信息",
		Age:  req.GetAge(),
		Body: req.GetBody(),
	}
	return res, nil
}

// main runs the gRPC server (port 8888) and the HTTP gateway (port 8090)
// concurrently and waits for both of them to stop.
func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	go registerGateway(&wg)
	go registerGRPC(&wg)
	wg.Wait()
}

// registerGateway connects to the local gRPC server, registers the generated
// HTTP handlers for SearchService and serves them on port 8090.
//
// wg.Done is called when the function returns; any failure terminates the
// process with a log message, so a broken gateway can never leave main
// waiting forever.
func registerGateway(wg *sync.WaitGroup) {
	defer wg.Done()

	ctx := context.Background()
	// WithBlock makes the dial wait until the gRPC server is reachable.
	conn, err := grpc.DialContext(ctx, "localhost:8888", grpc.WithBlock(), grpc.WithInsecure())
	if err != nil {
		log.Fatalf("dial gRPC server: %v", err)
	}
	defer conn.Close()

	mux := runtime.NewServeMux()
	if err := person.RegisterSearchServiceHandler(ctx, mux, conn); err != nil {
		log.Fatalf("register gateway handler: %v", err)
	}

	gwServer := &http.Server{
		Handler: mux,
		Addr:    ":8090",
	}
	if err := gwServer.ListenAndServe(); err != nil {
		log.Fatalf("serve gateway: %v", err)
	}
}

// registerGRPC serves SearchService over gRPC on TCP port 8888.
//
// wg.Done is called when the function returns; any failure terminates the
// process with a log message.
func registerGRPC(wg *sync.WaitGroup) {
	defer wg.Done()

	l, err := net.Listen("tcp", ":8888")
	if err != nil {
		log.Fatalf("listen on :8888: %v", err)
	}
	s := grpc.NewServer()
	person.RegisterSearchServiceServer(s, &personServe{})
	if err := s.Serve(l); err != nil {
		log.Fatalf("serve gRPC: %v", err)
	}
}

3、拦截器

拦截器的语法编写

syntax = "proto3";

// NOTE(review): the Go listings in this article import
// "projectRPCTest5-4/service" — confirm that this go_package matches the
// import path actually used.
option go_package = "projectRPCTest5-4/service/users";

// Users exposes one unary RPC and one bidirectional-streaming RPC.
service Users {
  // GetUser is a unary RPC.
  rpc GetUser (UserGetRequest) returns (UserGetReply) {}
  // GetHelp is a bidirectional-streaming RPC.
  rpc GetHelp (stream UserHelpRequest) returns (stream UserHelpReply) {}
}

// UserGetRequest is the request message of GetUser.
message UserGetRequest {
  string email = 1;
  string id = 2;
}

// User describes a user.
message User {
  string id = 1;
  string first_name = 2;
  string last_name = 3;
  int32 age = 4;
}

// UserGetReply is the response message of GetUser.
message UserGetReply {
  User user = 1;
}

// UserHelpRequest is one message of the GetHelp request stream.
message UserHelpRequest {
  User user = 1;
  string request = 2;
}

// UserHelpReply is one message of the GetHelp response stream.
message UserHelpReply {
  string response = 1;
}

3.1 客户端拦截器

package main

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"

	svc "projectRPCTest5-4/service"
)

// metadataUnaryInterceptor is a client-side unary interceptor that appends a
// fixed "Request-Id" metadata entry to the outgoing context before invoking
// the RPC.
func metadataUnaryInterceptor(
	ctx context.Context,
	method string,
	req, reply interface{},
	cc *grpc.ClientConn,
	invoker grpc.UnaryInvoker,
	opts ...grpc.CallOption,
) error {
	ctxWithMetadata := metadata.AppendToOutgoingContext(
		ctx,
		"Request-Id",
		"request-123",
	)
	return invoker(ctxWithMetadata, method, req, reply, cc, opts...)
}

// metadataStreamInterceptor is the streaming counterpart of
// metadataUnaryInterceptor: it appends the same "Request-Id" metadata entry
// before the stream is created.
func metadataStreamInterceptor(
	ctx context.Context,
	desc *grpc.StreamDesc,
	cc *grpc.ClientConn,
	method string,
	streamer grpc.Streamer,
	opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
	ctxWithMetadata := metadata.AppendToOutgoingContext(
		ctx,
		"Request-Id",
		"request-123",
	)
	return streamer(ctxWithMetadata, desc, cc, method, opts...)
}

// setupGrpcConn dials addr without transport security, blocks until the
// connection is established, and installs both metadata interceptors.
func setupGrpcConn(addr string) (*grpc.ClientConn, error) {
	return grpc.DialContext(
		context.Background(),
		addr,
		grpc.WithInsecure(),
		grpc.WithBlock(),
		grpc.WithUnaryInterceptor(metadataUnaryInterceptor),
		grpc.WithStreamInterceptor(metadataStreamInterceptor),
	)
}

// getUserServiceClient returns a Users client bound to conn.
func getUserServiceClient(conn *grpc.ClientConn) svc.UsersClient {
	return svc.NewUsersClient(conn)
}

// getUser performs the unary GetUser RPC with a background context.
func getUser(
	client svc.UsersClient,
	u *svc.UserGetRequest,
) (*svc.UserGetReply, error) {
	return client.GetUser(context.Background(), u)
}

// setupChat runs an interactive session over the GetHelp stream: it prompts on
// w, reads one line at a time from r, sends it to the server and writes the
// server's reply to w. The session ends when the user types "quit" or when r
// reaches end of input; the send side of the stream is then closed.
//
// A single scanner is used for the whole session so that input already
// buffered from r is not lost between prompts.
func setupChat(r io.Reader, w io.Writer, c svc.UsersClient) error {
	stream, err := c.GetHelp(context.Background())
	if err != nil {
		return err
	}

	scanner := bufio.NewScanner(r)
	for {
		fmt.Fprint(w, "Request:")
		if !scanner.Scan() {
			if err := scanner.Err(); err != nil {
				return err
			}
			// End of input: stop instead of sending empty requests forever.
			break
		}
		msg := scanner.Text()
		if msg == "quit" {
			break
		}

		request := svc.UserHelpRequest{
			Request: msg,
		}
		if err := stream.Send(&request); err != nil {
			return err
		}
		resp, err := stream.Recv()
		if err != nil {
			return err
		}
		fmt.Fprintln(w, "Recv", resp.Response)
	}
	return stream.CloseSend()
}

// main expects two arguments, the server address and the method to call
// ("GetUser" or "GetHelp"), and runs the selected demo.
func main() {
	if len(os.Args) != 3 {
		log.Fatal("Specify a gRPC server and method to call")
	}
	serverAddr := os.Args[1]
	methodName := os.Args[2]

	conn, err := setupGrpcConn(serverAddr)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	c := getUserServiceClient(conn)
	switch methodName {
	case "GetUser":
		result, err := getUser(
			c,
			&svc.UserGetRequest{Email: "jane@doe.com"},
		)
		if err != nil {
			log.Fatal(err)
		}
		// The generated getters tolerate a reply without a User.
		fmt.Fprintf(
			os.Stdout, "User: %s %s\n",
			result.GetUser().GetFirstName(),
			result.GetUser().GetLastName(),
		)
	case "GetHelp":
		err = setupChat(os.Stdin, os.Stdout, c)
		if err != nil {
			log.Fatal(err)
		}
	default:
		log.Fatal("Unrecognized method name")
	}
}

3.2 服务端拦截器

拦截器这里是在每个请求加上日志请求

package main

//
import (
	//	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"strings"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"

	svc "projectRPCTest5-4/service"
)

// userService implements svc.UsersServer.
type userService struct {
	svc.UnimplementedUsersServer
}

// logMessage writes one log line for a finished RPC: method, error, latency
// and the first "Request-Id" value found in the incoming metadata (empty when
// there is none). When the context carries no metadata, the value returned by
// metadata.FromIncomingContext is logged first.
func logMessage(ctx context.Context, method string, latency time.Duration, err error) {
	var requestId string
	md, ok := metadata.FromIncomingContext(ctx)
	switch {
	case !ok:
		log.Println("md", md)
	default:
		if ids := md.Get("Request-Id"); len(ids) != 0 {
			requestId = ids[0]
		}
	}
	log.Printf("Method:%s,Error:%v,Latency:%v,Request-Id:%s", method, err, latency, requestId)
}

// loggingUnaryInterceptor is a server-side unary interceptor: it runs the
// handler, logs the call through logMessage, and returns the handler's result
// unchanged.
func loggingUnaryInterceptor(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (interface{}, error) {
	begin := time.Now()
	resp, err := handler(ctx, req)
	logMessage(ctx, info.FullMethod, time.Since(begin), err)
	return resp, err
}

// loggingStreamInterceptor is the streaming counterpart of
// loggingUnaryInterceptor: it logs once, after the stream handler returns.
func loggingStreamInterceptor(
	srv interface{},
	stream grpc.ServerStream,
	info *grpc.StreamServerInfo,
	handler grpc.StreamHandler,
) error {
	begin := time.Now()
	err := handler(srv, stream)
	logMessage(stream.Context(), info.FullMethod, time.Since(begin), err)
	return err
}

// GetUser builds a user from the request: the part of the email before "@"
// becomes the first name and the part after it the last name. An email that
// does not split into exactly two parts yields an "invalid email" error.
func (s *userService) GetUser(ctx context.Context, in *svc.UserGetRequest) (*svc.UserGetReply, error) {
	log.Printf("Received request for user with Email:%s Id:%s\n", in.Email, in.Id)

	parts := strings.Split(in.Email, "@")
	if len(parts) != 2 {
		return nil, errors.New("invalid email")
	}

	reply := &svc.UserGetReply{
		User: &svc.User{
			Id:        in.Id,
			FirstName: parts[0],
			LastName:  parts[1],
			Age:       36,
		},
	}
	return reply, nil
}

// GetHelp echoes every request received on the stream back to the client
// until the client closes its side of the stream.
func (s *userService) GetHelp(stream svc.Users_GetHelpServer) error {
	log.Println("Client connected")
	for {
		request, err := stream.Recv()
		switch {
		case err == io.EOF:
			log.Println("Client disconnected")
			return nil
		case err != nil:
			return err
		}

		fmt.Printf("Request:%+v\n", request.Request)
		reply := svc.UserHelpReply{Response: request.Request}
		if err := stream.Send(&reply); err != nil {
			return err
		}
	}
}

// registerServices registers the Users service implementation on s.
func registerServices(s *grpc.Server) {
	svc.RegisterUsersServer(s, &userService{})
}func startServer(s *grpc.Server, l net.Listener) error {return s.Serve(l)
}func main() {listenAddr := os.Getenv("LISTEN_ADDR")if len(listenAddr) == 0 {listenAddr = ":50051"}lis, err := net.Listen("tcp", listenAddr)if err != nil {log.Fatal(err)}s := grpc.NewServer(grpc.UnaryInterceptor(loggingUnaryInterceptor),grpc.StreamInterceptor(loggingStreamInterceptor))registerServices(s)log.Fatal(startServer(s, lis))
}

3.3 链式拦截器

就是把多个拦截器按顺序串联成一条链,注册到同一个 gRPC 服务端上,每个请求都会依次经过链上的各个拦截器。

// Server options that chain several interceptors; pass them to grpc.NewServer.
// NOTE(review): "Interceptro" in the metric interceptor names looks like a
// typo for "Interceptor" — confirm against the actual function definitions.
grpc.ChainUnaryInterceptor(
	metricUnaryInterceptro,
	loggingUnaryInterceptor,
)
grpc.ChainStreamInterceptor(
	metricStreamInterceptro,
	loggingStreamInterceptor,
)

4.服务端与客户端健壮性

4.1 服务端健康检查

syntax = "proto3";

option go_package = "projectRPCHealthCheck/service/users";

// Users exposes one unary RPC and one bidirectional-streaming RPC.
service Users {
  // GetUser takes a single request and returns a single reply.
  rpc GetUser (UserGetRequest) returns (UserGetReply);
  // GetHelp exchanges a stream of requests for a stream of replies.
  rpc GetHelp (stream UserHelpRequest) returns (stream UserHelpReply);
}

// UserGetRequest is the input of GetUser.
message UserGetRequest {
  string email = 1;
  string id = 2;
}

// User is the user record carried in replies and help requests.
message User {
  string id = 1;
  string first_name = 2;
  string last_name = 3;
  int32 age = 4;
}

// UserGetReply is the output of GetUser.
message UserGetReply {
  User user = 1;
}

// UserHelpRequest is one message sent by the client on the GetHelp stream.
message UserHelpRequest {
  User user = 1;
  string request = 2;
}

// UserHelpReply is one message sent by the server on the GetHelp stream.
message UserHelpReply {
  string response = 1;
}

服务端的go文件

package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"os"
	"strings"

	"google.golang.org/grpc"
	healthz "google.golang.org/grpc/health"
	healthsvc "google.golang.org/grpc/health/grpc_health_v1"

	svc "projectRPCHealthCheck/service"
)

// userService implements the Users gRPC service.
type userService struct {
	svc.UnimplementedUsersServer
}

// GetUser builds a User from the request: the part of the email before "@"
// becomes FirstName and the part after it becomes LastName. An email that does
// not split into exactly two parts on "@" yields an error.
func (s *userService) GetUser(
	ctx context.Context,
	in *svc.UserGetRequest,
) (*svc.UserGetReply, error) {
	log.Printf("Received request for user with Email: %s Id: %s\n", in.Email, in.Id)

	parts := strings.Split(in.Email, "@")
	if len(parts) != 2 {
		return nil, errors.New("invalid email address")
	}

	user := svc.User{
		Id:        in.Id,
		FirstName: parts[0],
		LastName:  parts[1],
		Age:       36,
	}
	return &svc.UserGetReply{User: &user}, nil
}

// GetHelp echoes every request received on the stream back to the client
// until the client closes its side (io.EOF). Any other Recv or Send error is
// returned.
func (s *userService) GetHelp(stream svc.Users_GetHelpServer) error {
	for {
		request, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		fmt.Printf("Request receieved: %s\n", request.Request)

		reply := svc.UserHelpReply{Response: request.Request}
		if err := stream.Send(&reply); err != nil {
			return err
		}
	}
}

// registerServices registers the Users service and the health service h on s.
func registerServices(s *grpc.Server, h *healthz.Server) {
	svc.RegisterUsersServer(s, &userService{})
	healthsvc.RegisterHealthServer(s, h)
}

// updateServiceHealth records status as the serving status of service on h.
func updateServiceHealth(
	h *healthz.Server,
	service string,
	status healthsvc.HealthCheckResponse_ServingStatus,
) {
	h.SetServingStatus(service, status)
}

// startServer serves gRPC requests on l and returns Serve's error.
func startServer(s *grpc.Server, l net.Listener) error {
	return s.Serve(l)
}

// main listens on LISTEN_ADDR (":50051" when unset), registers the Users and
// health services, marks the Users service as SERVING, and starts serving.
func main() {
	listenAddr := os.Getenv("LISTEN_ADDR")
	if listenAddr == "" {
		listenAddr = ":50051"
	}

	lis, err := net.Listen("tcp", listenAddr)
	if err != nil {
		log.Fatal(err)
	}

	s := grpc.NewServer()
	h := healthz.NewServer()
	registerServices(s, h)
	updateServiceHealth(
		h,
		svc.Users_ServiceDesc.ServiceName,
		healthsvc.HealthCheckResponse_SERVING,
	)
	log.Fatal(startServer(s, lis))
}

对这段代码进行测试

package main

import (
	"context"
	"errors"
	"log"
	"net"
	"testing"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	healthz "google.golang.org/grpc/health"
	healthsvc "google.golang.org/grpc/health/grpc_health_v1"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/test/bufconn"

	svc "projectRPCHealthCheck/service"
	users "projectRPCHealthCheck/service"
)

// h is the health server of the most recently started test gRPC server; tests
// use it to change the reported serving status.
var h *healthz.Server

// startTestGrpcServer starts a gRPC server on a bufconn listener with the
// Users and health services registered and the Users service marked SERVING.
// It stores the health server in the package-level h and returns the listener
// for clients to dial. The server is never stopped.
func startTestGrpcServer() *bufconn.Listener {
	h = healthz.NewServer()
	l := bufconn.Listen(10)
	s := grpc.NewServer()
	registerServices(s, h)
	updateServiceHealth(
		h,
		svc.Users_ServiceDesc.ServiceName,
		healthsvc.HealthCheckResponse_SERVING,
	)
	go func() {
		log.Fatal(startServer(s, l))
	}()
	return l
}

// getHealthSvcClient dials the bufconn listener l and returns a health-service
// client on that connection. The underlying connection is not exposed, so it
// is not closed by callers.
func getHealthSvcClient(l *bufconn.Listener) (healthsvc.HealthClient, error) {
	bufconnDialer := func(ctx context.Context, addr string) (net.Conn, error) {
		return l.Dial()
	}

	client, err := grpc.DialContext(
		context.Background(),
		"", grpc.WithInsecure(),
		grpc.WithContextDialer(bufconnDialer),
	)
	if err != nil {
		return nil, err
	}
	return healthsvc.NewHealthClient(client), nil
}

// TestHealthService checks that a health check with an empty service name
// reports SERVING.
func TestHealthService(t *testing.T) {
	l := startTestGrpcServer()
	healthClient, err := getHealthSvcClient(l)
	if err != nil {
		t.Fatal(err)
	}

	resp, err := healthClient.Check(
		context.Background(),
		&healthsvc.HealthCheckRequest{},
	)
	if err != nil {
		t.Fatal(err)
	}

	serviceHealthStatus := resp.Status.String()
	if serviceHealthStatus != "SERVING" {
		t.Fatalf("Expected health: SERVING, Got: %s", serviceHealthStatus)
	}
}

// TestHealthServiceUsers checks that the "Users" service reports SERVING.
func TestHealthServiceUsers(t *testing.T) {
	l := startTestGrpcServer()
	healthClient, err := getHealthSvcClient(l)
	if err != nil {
		t.Fatal(err)
	}

	resp, err := healthClient.Check(
		context.Background(),
		&healthsvc.HealthCheckRequest{Service: "Users"},
	)
	if err != nil {
		t.Fatal(err)
	}

	serviceHealthStatus := resp.Status.String()
	if serviceHealthStatus != "SERVING" {
		t.Fatalf("Expected health: SERVING, Got: %s", serviceHealthStatus)
	}
}

// TestHealthServiceUnknown checks that a health check for a service that was
// never registered ("Repo") fails with a NotFound "unknown service" status.
func TestHealthServiceUnknown(t *testing.T) {
	l := startTestGrpcServer()
	healthClient, err := getHealthSvcClient(l)
	if err != nil {
		t.Fatal(err)
	}

	_, err = healthClient.Check(
		context.Background(),
		&healthsvc.HealthCheckRequest{Service: "Repo"},
	)
	if err == nil {
		t.Fatalf("Expected non-nil error, Got nil error")
	}

	expectedError := status.Errorf(codes.NotFound, "unknown service")
	if !errors.Is(err, expectedError) {
		// Expected value first, actual value second, matching the message.
		t.Fatalf("Expected error %v, Got: %v", expectedError, err)
	}
}

// TestHealthServiceWatch opens a Watch stream for "Users", expects an initial
// SERVING update, flips the status to NOT_SERVING, and expects that change to
// arrive on the stream.
func TestHealthServiceWatch(t *testing.T) {
	l := startTestGrpcServer()
	healthClient, err := getHealthSvcClient(l)
	if err != nil {
		t.Fatal(err)
	}

	client, err := healthClient.Watch(
		context.Background(),
		&healthsvc.HealthCheckRequest{Service: "Users"},
	)
	if err != nil {
		t.Fatal(err)
	}

	resp, err := client.Recv()
	if err != nil {
		t.Fatalf("Error in Watch: %#v\n", err)
	}
	if resp.Status != healthsvc.HealthCheckResponse_SERVING {
		t.Errorf("Expected SERVING, Got: %#v", resp.Status.String())
	}

	updateServiceHealth(
		h,
		"Users",
		healthsvc.HealthCheckResponse_NOT_SERVING,
	)

	resp, err = client.Recv()
	if err != nil {
		t.Fatalf("Error in Watch: %#v\n", err)
	}
	if resp.Status != healthsvc.HealthCheckResponse_NOT_SERVING {
		t.Errorf("Expected NOT_SERVING, Got: %#v", resp.Status.String())
	}
}

// TestUserService1 calls GetUser over a bufconn connection and checks that
// FirstName is the part of the email before "@".
func TestUserService1(t *testing.T) {
	l := startTestGrpcServer()

	bufconnDialer := func(ctx context.Context, addr string) (net.Conn, error) {
		return l.Dial()
	}

	client, err := grpc.DialContext(
		context.Background(),
		"", grpc.WithInsecure(),
		grpc.WithContextDialer(bufconnDialer),
	)
	if err != nil {
		t.Fatal(err)
	}

	usersClient := users.NewUsersClient(client)
	resp, err := usersClient.GetUser(
		context.Background(),
		&users.UserGetRequest{
			Email: "jane@doe.com",
			Id:    "foo-bar",
		},
	)
	if err != nil {
		t.Fatal(err)
	}
	if resp.User.FirstName != "jane" {
		t.Errorf("Expected FirstName to be: jane, Got: %s", resp.User.FirstName)
	}
}

4.2 服务端处理运行时错误

如果是流式服务

func panicStreamInterceptor(srv interface{},stream grpc.ServerStream,info *grpc.StreamServerInfo,handler grpc.StreamHandler,
) (err error) {defer func() {if r := recover(); r != nil {log.Printf("Panic recovered: %v\n", r)err = status.Error(codes.Internal,"Unexpected error happened",)}}()serverStream := wrappedServerStream{ServerStream: stream,}err = handler(srv, serverStream)return
}

如果是一元(Unary)服务

func panicUnaryInterceptor(ctx context.Context,req interface{},info *grpc.UnaryServerInfo,handler grpc.UnaryHandler,
) (resp interface{}, err error) {defer func() {if r := recover(); r != nil {log.Printf("Panic recovered: %v\n", r)err = status.Error(codes.Internal,"Unexpected error happened",)}}()resp, err = handler(ctx, req)return
}

4.3 服务端终止请求处理

流式


func timeoutStreamInterceptor(srv interface{},stream grpc.ServerStream,info *grpc.StreamServerInfo,handler grpc.StreamHandler,
) error {serverStream := wrappedServerStream{RecvMsgTimeout: 500 * time.Millisecond,ServerStream:   stream,}err := handler(srv, serverStream)return err
}

一元(Unary)


func timeoutUnaryInterceptor(ctx context.Context,req interface{},info *grpc.UnaryServerInfo,handler grpc.UnaryHandler,
) (interface{}, error) {var resp interface{}var err errorctxWithTimeout, cancel := context.WithTimeout(ctx, 300*time.Millisecond)defer cancel()ch := make(chan error)go func() {resp, err = handler(ctxWithTimeout, req)ch <- err}()select {case <-ctxWithTimeout.Done():cancel()err = status.Error(codes.DeadlineExceeded,fmt.Sprintf("%s: Deadline exceeded", info.FullMethod),)return resp, errcase <-ch:}return resp, err
}

4.4 客户端提高连接配置

为建立连接增加超时机制:给 grpc.DialContext 传入带超时的 context,并配合 WithBlock 等连接选项,使连接失败时能及时返回错误。

func setupGrpcConn(addr string) (*grpc.ClientConn, context.CancelFunc, error) {log.Printf("Connecting to server on %s\n", addr)ctx, cancel := context.WithTimeout(context.Background(),10*time.Second,)conn, err := grpc.DialContext(ctx,addr,grpc.WithInsecure(),grpc.WithBlock(),grpc.FailOnNonTempDialError(true),grpc.WithReturnConnectionError(),)return conn, cancel, err
}

4.5 为方法调用设置超时

做法与上面类似:用 context.WithTimeout 创建带超时的 context,并在调用 RPC 方法时把它作为第一个参数传入即可。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词