本文目录
- 1. 书接上回
- 2. 引入etcd
- discovery
- struct{}{}
- resolver
- server
- 3. 将服务注册到etcd中
- 4. 梳理下etcd调用逻辑
1. 书接上回
本节是为项目引入etcd这个环节,然后我们来看看具体该怎么实现。
首先来谈谈为什么要引入服务发现?
动态服务注册与发现
:微服务系统通常由多个服务组成,这些服务可能分布在不同的机器上,并且可能会动态地启动或停止。etcd 提供了一个集中化的存储,服务实例可以在启动时向 etcd 注册自己的信息(如 IP 地址、端口、健康状态等),并在停止时注销。客户端可以通过 etcd 动态地发现可用的服务实例,从而实现高可用性和弹性扩展。
高可用性和容错性
:在分布式系统中,服务实例可能会因为各种原因(如机器故障、网络问题等)变得不可用。etcd 通过其高可用性设计(如 Raft 协议)确保服务注册信息的一致性和可靠性。即使部分节点故障,etcd 集群仍然可以正常工作,从而保证服务发现的高可用性。
配置管理
:除了服务发现,etcd 还可以用于配置管理。分布式系统中的配置信息(如数据库地址、API 密钥等)可以存储在 etcd 中,并且可以动态更新。客户端可以通过监听 etcd 中的配置变化,实时获取最新的配置信息,从而实现配置的动态更新而无需重启服务。
之前我们是直接写入gRPC的地址,那么现在需要引入etcd,就可以实现服务发现,我们只需要监听etcd就可以了,如果服务地址变了,那么就能够立即发现。会更加方便一些。
2. 引入etcd
在api下和user下、common下都需要安装etcd的依赖。
go get go.etcd.io/etcd/client/v3
接下来写服务发现的代码,目前我们先实现单机版的etcd,来看看具体怎么实现。
首先需要启动etcd,这里我已经提前下载好并且运行起来了,简单尝试一下。
所以用法就是,把服务注册进etcd,然后监听etcd,如果有变化,立马进行变动即可。
discovery
package discoveryimport ("context""encoding/json""errors""net/http""strconv""strings""time"clientv3 "go.etcd.io/etcd/client/v3""go.uber.org/zap"
)// Register for grpc server
type Register struct {EtcdAddrs []stringDialTimeout intcloseCh chan struct{}leasesID clientv3.LeaseIDkeepAliveCh <-chan *clientv3.LeaseKeepAliveResponsesrvInfo ServersrvTTL int64cli *clientv3.Clientlogger *zap.Logger
}// NewRegister create a register base on etcd
func NewRegister(etcdAddrs []string, logger *zap.Logger) *Register {return &Register{EtcdAddrs: etcdAddrs,DialTimeout: 3,logger: logger,}
}// Register a service
func (r *Register) Register(srvInfo Server, ttl int64) (chan<- struct{}, error) {var err errorif strings.Split(srvInfo.Addr, ":")[0] == "" {return nil, errors.New("invalid ip")}if r.cli, err = clientv3.New(clientv3.Config{Endpoints: r.EtcdAddrs,DialTimeout: time.Duration(r.DialTimeout) * time.Second,}); err != nil {return nil, err}r.srvInfo = srvInfor.srvTTL = ttlif err = r.register(); err != nil {return nil, err}r.closeCh = make(chan struct{})go r.keepAlive()return r.closeCh, nil
}// Stop stop register
func (r *Register) Stop() {r.closeCh <- struct{}{}
}// register 注册节点
func (r *Register) register() error {leaseCtx, cancel := context.WithTimeout(context.Background(), time.Duration(r.DialTimeout)*time.Second)defer cancel()leaseResp, err := r.cli.Grant(leaseCtx, r.srvTTL)if err != nil {return err}r.leasesID = leaseResp.IDif r.keepAliveCh, err = r.cli.KeepAlive(context.Background(), leaseResp.ID); err != nil {return err}data, err := json.Marshal(r.srvInfo)if err != nil {return err}_, err = r.cli.Put(context.Background(), BuildRegPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID))return err
}// unregister 删除节点
func (r *Register) unregister() error {_, err := r.cli.Delete(context.Background(), BuildRegPath(r.srvInfo))return err
}// keepAlive
func (r *Register) keepAlive() {ticker := time.NewTicker(time.Duration(r.srvTTL) * time.Second)for {select {case <-r.closeCh:if err := r.unregister(); err != nil {r.logger.Error("unregister failed", zap.Error(err))}if _, err := r.cli.Revoke(context.Background(), r.leasesID); err != nil {r.logger.Error("revoke failed", zap.Error(err))}returncase res := <-r.keepAliveCh:if res == nil {if err := r.register(); err != nil {r.logger.Error("register failed", zap.Error(err))}}case <-ticker.C:if r.keepAliveCh == nil {if err := r.register(); err != nil {r.logger.Error("register failed", zap.Error(err))}}}}
}// UpdateHandler return http handler
func (r *Register) UpdateHandler() http.HandlerFunc {return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {wi := req.URL.Query().Get("weight")weight, err := strconv.Atoi(wi)if err != nil {w.WriteHeader(http.StatusBadRequest)w.Write([]byte(err.Error()))return}var update = func() error {r.srvInfo.Weight = int64(weight)data, err := json.Marshal(r.srvInfo)if err != nil {return err}_, err = r.cli.Put(context.Background(), BuildRegPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID))return err}if err := update(); err != nil {w.WriteHeader(http.StatusInternalServerError)w.Write([]byte(err.Error()))return}w.Write([]byte("update server weight success"))})
}func (r *Register) GetServerInfo() (Server, error) {resp, err := r.cli.Get(context.Background(), BuildRegPath(r.srvInfo))if err != nil {return r.srvInfo, err}info := Server{}if resp.Count >= 1 {if err := json.Unmarshal(resp.Kvs[0].Value, &info); err != nil {return info, err}}return info, nil
}
接下来看看discovery中的各个部分含义和作用。
Register
是一个结构体,用于管理服务的注册和更新。
EtcdAddrs
:etcd 服务的地址列表、DialTimeout
:连接 etcd 的超时时间。closeCh
:一个关闭通道,用于停止注册流程。leasesID
:etcd 的租约 ID,用于保持服务的存活状态。keepAliveCh
:租约续期的响应通道。srvInfo
:服务的详细信息。srvTTL
:服务的存活时间(TTL)。cli
:etcd 客户端实例。logger
:日志记录器。
NewRegister
是一个构造函数,用于创建一个新的 Register
实例。接收 etcd 地址列表和日志记录器作为参数。默认设置连接超时时间为 3 秒。
Register
方法用于在 etcd 中注册服务,接收服务信息 srvInfo
和存活时间 ttl
作为参数。首先检查服务地址是否有效,然后创建 etcd 客户端。
调用 register 方法将服务信息注册到 etcd。启动一个后台协程 keepAlive
,用于保持服务的存活状态。返回一个关闭通道,用于停止注册流程。
Stop 方法用于停止注册流程。向关闭通道发送一个信号,通知后台协程停止运行。
struct{}{}
这里有两个{},来看看是为什么。
struct{} 是类型声明:定义了一个空结构体类型,不包含任何字段,占用 0 字节内存。
第二个 {} 是实例化:创建一个空结构体的实例,相当于 new(struct{})
。
这种写法常用于通道信号传递,因为只需要一个信号,不需要传递具体的数据,并且孔结构体不占用内存,效率高,常用于停止信号、同步信号等场景。
register
方法用于将服务信息注册到 etcd。首先创建一个租约,然后将服务信息序列化为 JSON 格式。使用 Put 方法将服务信息存储到 etcd,并将其与租约关联。
unregister
方法用于从 etcd 中删除服务信息,使用 Delete 方法
删除服务对应的键值对。
keepAlive
方法用于保持服务的存活状态。使用一个定时器和租约续期通道,定期检查租约状态。如果收到关闭信号,调用 unregister 方法删除服务信息,并撤销租约。如果租约续期失败或超时,重新调用 register 方法注册服务。
UpdateHandler
方法返回一个 HTTP 处理器,用于更新服务的权重。通过 HTTP 请求的查询参数获取新的权重值,然后更新服务信息并重新注册到 etcd。
GetServerInfo
方法用于从 etcd 中获取服务信息。使用 Get 方法查询服务对应的键值对,并反序列化为服务信息结构体。
在etcd中,租约(Lease) 是一种机制
,用于确保服务实例的注册信息在一定时间内有效。如果服务实例在租约到期前没有续期,那么注册信息会被自动删除。这种机制可以防止服务实例在故障或网络问题后仍然被客户端调用。
租约(Lease) 是一种机制,它允许你为某些键值对设置有效期。租约的作用是确保在一定时间内,某个特定的键值对不会被意外删除或者修改,同时也可以在租约到期后自动删除。
自动过期
:当一个键值对绑定了一个租约时,该键值对会在租约到期后自动删除。这个特性对于管理短期或临时数据(例如服务发现中的节点信息)非常有用。
防止僵尸数据
:如果某个服务崩溃或失效,未能在租约到期前刷新租约,那么租约绑定的键值对将会自动过期,防止僵尸数据长期占用资源。
服务发现
:通常在分布式系统中,服务注册时会绑定租约。如果服务失效或宕机,绑定该服务的键值对会在租约到期后自动删除,其他服务能够及时感知。
举个例子来说明下,假设有一个分布式系统中的服务需要定期将自己的健康状况注册到 etcd 中,作为服务发现的一部分。服务会在启动时向 etcd 注册自己的信息,并设置一个租约,例如设置租约为10秒。
- 如果服务正常运行,它会在10秒内刷新租约。
- 如果服务崩溃或停止,它就无法刷新租约。
- 10秒后,etcd 会发现该服务的租约已过期,并删除与该服务相关的键值对,其他节点就不再看到该服务的信息。
这种机制确保了如果服务不再可用,它的注册信息会被及时清除,从而避免系统中有过期的服务信息。
resolver
package discoveryimport ("context""go.etcd.io/etcd/api/v3/mvccpb"clientv3 "go.etcd.io/etcd/client/v3""go.uber.org/zap""google.golang.org/grpc/resolver""time"
)const (schema = "etcd"
)// Resolver for grpc client
type Resolver struct {schema stringEtcdAddrs []stringDialTimeout intcloseCh chan struct{}watchCh clientv3.WatchChancli *clientv3.ClientkeyPrifix stringsrvAddrsList []resolver.Addresscc resolver.ClientConnlogger *zap.Logger
}// NewResolver create a new resolver.Builder base on etcd
func NewResolver(etcdAddrs []string, logger *zap.Logger) *Resolver {return &Resolver{schema: schema,EtcdAddrs: etcdAddrs,DialTimeout: 3,logger: logger,}
}// Scheme returns the scheme supported by this resolver.
func (r *Resolver) Scheme() string {return r.schema
}// Build creates a new resolver.Resolver for the given target
func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {r.cc = ccr.keyPrifix = BuildPrefix(Server{Name: target.URL.Host, Version: target.URL.Path})if _, err := r.start(); err != nil {return nil, err}return r, nil
}// ResolveNow resolver.Resolver interface
func (r *Resolver) ResolveNow(o resolver.ResolveNowOptions) {}// Close resolver.Resolver interface
func (r *Resolver) Close() {r.closeCh <- struct{}{}
}// start
func (r *Resolver) start() (chan<- struct{}, error) {var err errorr.cli, err = clientv3.New(clientv3.Config{Endpoints: r.EtcdAddrs,DialTimeout: time.Duration(r.DialTimeout) * time.Second,})if err != nil {return nil, err}resolver.Register(r)r.closeCh = make(chan struct{})if err = r.sync(); err != nil {return nil, err}go r.watch()return r.closeCh, nil
}// watch update events
func (r *Resolver) watch() {ticker := time.NewTicker(time.Minute)r.watchCh = r.cli.Watch(context.Background(), r.keyPrifix, clientv3.WithPrefix())for {select {case <-r.closeCh:returncase res, ok := <-r.watchCh:if ok {r.update(res.Events)}case <-ticker.C:if err := r.sync(); err != nil {r.logger.Error("sync failed", zap.Error(err))}}}
}// update
func (r *Resolver) update(events []*clientv3.Event) {for _, ev := range events {var info Servervar err errorswitch ev.Type {case mvccpb.PUT:info, err = ParseValue(ev.Kv.Value)if err != nil {continue}addr := resolver.Address{Addr: info.Addr, Metadata: info.Weight}if !Exist(r.srvAddrsList, addr) {r.srvAddrsList = append(r.srvAddrsList, addr)r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})}case mvccpb.DELETE:info, err = SplitPath(string(ev.Kv.Key))if err != nil {continue}addr := resolver.Address{Addr: info.Addr}if s, ok := Remove(r.srvAddrsList, addr); ok {r.srvAddrsList = sr.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})}}}
}// sync 同步获取所有地址信息
func (r *Resolver) sync() error {ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)defer cancel()res, err := r.cli.Get(ctx, r.keyPrifix, clientv3.WithPrefix())if err != nil {return err}r.srvAddrsList = []resolver.Address{}for _, v := range res.Kvs {info, err := ParseValue(v.Value)if err != nil {continue}addr := resolver.Address{Addr: info.Addr, Metadata: info.Weight}r.srvAddrsList = append(r.srvAddrsList, addr)}r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})return nil
}
总的来说,实现一个基于 etcd 的 gRPC 客户端解析器(resolver)
,用于动态发现和更新服务地址。它允许 gRPC 客户端根据 etcd 中的注册信息动态调整连接的目标地址。
Resolver
是一个结构体,用于管理 gRPC 客户端解析器。
包含以下字段:
schema
:解析器支持的协议前缀。
EtcdAddrs
:etcd 服务的地址列表。
DialTimeout
:连接 etcd 的超时时间。
closeCh
:一个关闭通道,用于停止解析器。
watchCh
:etcd 的监听通道,用于接收 etcd 的变更事件。
cli
:etcd 客户端实例。
keyPrifix
:etcd 中存储服务信息的键前缀。
srvAddrsList
:当前已知的服务地址列表。
cc
:gRPC 客户端连接。
logger
:日志记录器。
NewResolver
是一个构造函数,用于创建一个新的 Resolver 实例
。接收 etcd 地址列表和日志记录器作为参数。默认设置连接超时时间为 3 秒。
Scheme 方法
返回解析器支持的协议前缀。在 gRPC 解析器接口中,Scheme 方法用于标识解析器支持的协议(如 etcd://
)。
Build 方法
用于创建一个新的 gRPC 解析器实例。接收目标地址、客户端连接和构建选项作为参数。根据目标地址构建 etcd 的键前缀,并启动解析器。返回解析器实例或错误。
ResolveNow 方法
是 gRPC 解析器接口的一部分,用于触发解析器的即时解析。在这个实现中,ResolveNow 方法为空,因为解析器通过监听 etcd 的变更事件动态更新服务地址。
start 方法
用于启动解析器,首先创建 etcd 客户端实例
,并注册解析器,启动一个后台协程用于监听 etcd 的变更事件,返回关闭通道或错误。
watch 方法
用于监听 etcd 的变更事件。使用 Watch 方法监听 etcd 中的键前缀变化。
如果收到关闭信号,停止监听。如果收到变更事件,调用 update 方法更新服务地址。
定期调用 sync 方法
同步服务地址。
update 方法
用于处理 etcd 的变更事件。遍历事件列表,根据事件类型(PUT 或 DELETE)更新服务地址列表。如果是 PUT 事件,解析服务信息并添加到地址列表。如果是 DELETE 事件,从地址列表中移除服务地址。更新 gRPC 客户端连接的状态。
sync 方法
用于同步获取 etcd 中的所有服务地址信息。使用 Get 方法查询 etcd 中的键前缀。解析查询结果,构建服务地址列表。更新 gRPC 客户端连接的状态。
也就是这实现了一个基于 etcd 的 gRPC 客户端解析器,用于动态发现和更新服务地址。它通过监听 etcd 的变更事件,实时更新 gRPC 客户端的连接目标地址。
server
package discoveryimport ("encoding/json""errors""fmt""strings""google.golang.org/grpc/resolver"
)type Server struct {Name string `json:"name"`Addr string `json:"addr"` //服务地址Version string `json:"version"` //服务版本Weight int64 `json:"weight"` //服务权重
}func BuildPrefix(info Server) string {if info.Version == "" {return fmt.Sprintf("/%s/", info.Name)}return fmt.Sprintf("/%s/%s/", info.Name, info.Version)
}func BuildRegPath(info Server) string {return fmt.Sprintf("%s%s", BuildPrefix(info), info.Addr)
}func ParseValue(value []byte) (Server, error) {info := Server{}if err := json.Unmarshal(value, &info); err != nil {return info, err}return info, nil
}func SplitPath(path string) (Server, error) {info := Server{}strs := strings.Split(path, "/")if len(strs) == 0 {return info, errors.New("invalid path")}info.Addr = strs[len(strs)-1]return info, nil
}// Exist helper function
func Exist(l []resolver.Address, addr resolver.Address) bool {for i := range l {if l[i].Addr == addr.Addr {return true}}return false
}// Remove helper function
func Remove(s []resolver.Address, addr resolver.Address) ([]resolver.Address, bool) {for i := range s {if s[i].Addr == addr.Addr {s[i] = s[len(s)-1]return s[:len(s)-1], true}}return nil, false
}func BuildResolverUrl(app string) string {return schema + ":///" + app
}
简单说说,首先代码定义了一个 Server 结构体,
用于表示服务的基本信息。它包含服务的名称、地址、版本和权重
。这些字段在服务发现和负载均衡
中非常关键,例如,权重可以用于控制服务的流量分配
。
BuildPrefix 函数
用于根据服务信息构建一个路径前缀。如果服务版本为空,路径前缀将只包含服务名称;否则,它将包含服务名称和版本。这个前缀用于在 etcd 或类似的存储系统中组织服务信息。
BuildRegPath 函数
进一步扩展了 BuildPrefix 的功能
,它通过在前缀后面添加服务地址,生成一个完整的注册路径。这个路径可以用于在 etcd 中存储服务实例的具体信息。
ParseValue 函数
的作用是从 etcd 中获取的字节数据中解析出服务信息。它使用 json.Unmarshal 将字节数据反序列化为 Server 结构体
。如果解析过程中出现错误,它会返回错误信息。
SplitPath 函数
则用于从路径中提取服务地址。它通过分割路径字符串来获取地址部分。如果路径格式不正确,它会返回一个错误。
Exist 函数
用于检查一个地址是否已经存在于地址列表中,这对于避免重复添加服务地址非常有用。Remove 函数
则用于从地址列表中移除一个特定的地址,这在服务下线或更新时非常有用。
3. 将服务注册到etcd中
在user下的router.go中把服务注册到etcd中去,代码如下:
func RegisterEtcdServer() {etcdRegister := discovery.NewResolver(config.C.EtcdConfig.Addrs, logs.LG)resolver.Register(etcdRegister)info := discovery.Server{Name: config.C.GC.Name,Addr: config.C.GC.Addr,Version: config.C.GC.Version,Weight: config.C.GC.Weight,}r := discovery.NewRegister(config.C.EtcdConfig.Addrs, logs.LG)_, err := r.Register(info, 2)if err != nil {log.Fatalln(err)}
}
创建 gRPC 客户端解析器 (etcd resolver):这部分的作用是 客户端 用来解析和查找服务的位置(如 IP 和端口)。在分布式系统中,客户端通常不知道服务的具体地址,因此它需要一个 解析器,它会向 etcd 注册中心询问目标服务的地址信息。这样可以确保客户端在不同的服务器间寻找服务时能够动态地获取服务位置。
这里的 discovery.NewResolver
创建了一个新的 etcd 解析器,它使用 etcd 存储服务的位置。该解析器注册到 gRPC 中 (resolver.Register)
,使得客户端能够通过它查找服务。这一步是 客户端 侧的操作,客户端通过解析器可以在 etct 中查询并获得服务的信息。
注册服务端:这部分是 服务端 将自身的信息注册到 etcd 中,供客户端发现。它会向 etcd 注册服务的信息(如名称、地址、版本等),使得客户端能够基于这些信息去访问和调用服务。
discovery.NewRegister
创建了一个新的 注册器,用于将服务信息注册到 etcd。
r.Register(info, 2)
将服务信息 info 注册到 etcd 中,并设置一个租约,表示该服务信息在 2 秒内有效。租约过期后,服务信息会从 etcd 中自动删除。
4. 梳理下etcd调用逻辑
可以这么理解,也就是gRPC内部的解析注册表是m,m = make(map[string]Builder)
,string是key,表示协议方案scheme
,比如etcd、dns
等,Builder是value
,是一个接口类型,用于构建解析器,我们的Resolver结构体就实现了这个接口
。
不同的服务发现机制etcd、consul等会注册自己的解析器,gRPC根据地址中的scheme找到对应的解析器,解析器负责将服务器名转回为实际地址。