博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
从零开始实现一个RPC框架(三)
阅读量:6581 次
发布时间:2019-06-24

本文共 4899 字,大约阅读时间需要 16 分钟。

前言

到目前为止我们的框架已经有了一部分服务治理的功能,这次我们在之前的基础上实现一些其他功能。篇幅所限这里只列举部分实现,完整代码参考:

zookeeper注册中心

实现我们之前的注册中心的接口即可,这里使用了docker的libkv而不是直接用zk客户端(从那学的),libkv封装了对于几种存储服务的操作,包括Consul、Etcd、Zookeeper和BoltDB,后续如果要支持其他类型的存储就得自己写客户端了。基于zk的注册中心的定义如下:

type ZookeeperRegistry struct {	AppKey         string //一个ZookeeperRegistry实例和一个appkey关联	ServicePath    string //数据存储的基本路径位置,比如/service/providers	UpdateInterval time.Duration //定时拉取数据的时间间隔	kv store.Store //封装过的zk客户端	providersMu sync.RWMutex	providers   []registry.Provider //本地缓存的列表	watchersMu sync.Mutex	watchers   []*Watcher //watcher列表}复制代码

初始化部分逻辑如下:

func NewZookeeperRegistry(AppKey string, ServicePath string, zkAddrs []string,	updateInterval time.Duration, cfg *store.Config) registry.Registry {	zk := new(ZookeeperRegistry)	zk.AppKey = AppKey	zk.ServicePath = ServicePath	zk.UpdateInterval = updateInterval	kv, err := libkv.NewStore(store.ZK, zkAddrs, cfg)	if err != nil {		log.Fatalf("cannot create zk registry: %v", err)	}	zk.kv = kv	basePath := zk.ServicePath	if basePath[0] == '/' { //路径不能以"/"开头		basePath = basePath[1:]		zk.ServicePath = basePath	}	//先创建基本路径	err = zk.kv.Put(basePath, []byte("base path"), &store.WriteOptions{IsDir: true})	if err != nil {		log.Fatalf("cannot create zk path %s: %v", zk.ServicePath, err)	}	//显式拉取第一次数据	zk.doGetServiceList()	go func() {		t := time.NewTicker(updateInterval)		for range t.C {			//定时拉取数据			zk.doGetServiceList()		}	}()	go func() {		//后台watch数据		zk.watch()	}()	return zk}复制代码

我们在初始化注册中心时执行两个后台任务:定时拉取和监听数据,相当于推拉结合的方式。同时监听获得的数据是全量数据,因为实现起来简单一些,后续如果服务列表越来越大时,可能需要加上基于版本号的机制或者只传输增量数据。这里额外指出几个要点:

  1. 后台定时拉取数据并缓存起来
  2. 查询时直接返回缓存
  3. 注册时在zk添加节点,注销时在zk删除节点
  4. 监听时并不监听每个服务提供者,而是监听其父级目录,有变更时再统一拉取服务提供者列表,这样可以减少watcher的数目,逻辑也更简单一些
  5. 因为第4点,所以注册和注销时需要更改父级目录的内容(lastUpdate)来触发监听

具体的注册注销逻辑这里不再列举,参考:

客户端心跳

如果我们使用zk作为注册中心,更简单的做法可能是直接将服务提供者作为临时节点添加到zk上,这样就可以利用临时节点的特性实现动态的服务发现。但是我们使用的libkv库并不支持临时节点的功能,而且除了zk其他存储服务比如etcd等可能也不支持临时节点的特性,所以我们注册到注册中心的都是持久节点。在这种情况下,可能某些由于特殊情况无法访问的服务提供者并没有及时地将自身从注册中心注销掉,所以客户端需要额外的能力来判断一个服务提供者是否可用,而不是完全依赖注册中心。

所以我们需要增加客户端心跳的支持,客户端可以定时向服务端发送心跳请求,服务端收到心跳请求时可以直接返回,只要通知客户端自身仍然可用就行。客户端可以根据设置的阈值,对心跳失败的服务提供者进行降级处理,直到心跳恢复或者服务提供者被注销掉。客户端发送心跳逻辑如下:

func (c *sgClient) heartbeat() {	if c.option.HeartbeatInterval <= 0 {		return	}	//根据指定的时间间隔发送心跳	t := time.NewTicker(c.option.HeartbeatInterval)	for range t.C {		if c.shutdown {			t.Stop()			return		}		//遍历每个RPCClient进行心跳检查		c.clients.Range(func(k, v interface{}) bool {			err := v.(RPCClient).Call(context.Background(), "", "", nil)			c.mu.Lock()			if err != nil {				//心跳失败进行计数				if fail, ok := c.clientsHeartbeatFail[k.(string)]; ok {					fail++					c.clientsHeartbeatFail[k.(string)] = fail				} else {					c.clientsHeartbeatFail[k.(string)] = 1				}			} else {				//心跳成功则进行恢复				c.clientsHeartbeatFail[k.(string)] = 0				c.serversMu.Lock()				for i, p := range c.servers {					if p.ProviderKey == k {						delete(c.servers[i].Meta, protocol.ProviderDegradeKey)					}				}				c.serversMu.Unlock()			}			c.mu.Unlock()			//心跳失败次数超过阈值则进行降级			if c.clientsHeartbeatFail[k.(string)] > c.option.HeartbeatDegradeThreshold {				c.serversMu.Lock()				for i, p := range c.servers {					if p.ProviderKey == k {						c.servers[i].Meta[protocol.ProviderDegradeKey] = true					}				}				c.serversMu.Unlock()			}			return true		})	}}复制代码

鉴权

鉴权的实现比较简单,客户端可以在元数据中携带鉴权相关的信息,而服务端可以通过指定的Wrapper进行鉴权。服务端Wrapper的代码如下:

type AuthFunc func(key string) booltype ServerAuthInterceptor struct {	authFunc AuthFunc}func NewAuthInterceptor(authFunc AuthFunc) Wrapper {	return &ServerAuthInterceptor{authFunc}}func (sai *ServerAuthInterceptor) WrapHandleRequest(s *SGServer, requestFunc HandleRequestFunc) HandleRequestFunc {	return func(ctx context.Context, request *protocol.Message, response *protocol.Message, tr transport.Transport) {		if auth, ok := ctx.Value(protocol.AuthKey).(string); ok {			//鉴权通过则执行业务逻辑			if sai.authFunc(auth) {				requestFunc(ctx, response, response, tr)				return			}		}		//鉴权失败则返回异常		s.writeErrorResponse(response, tr, "auth failed")	}}复制代码

熔断降级

暂时实现了简单的基于时间窗口的熔断器,实现如下:

type CircuitBreaker interface {	AllowRequest() bool	Success()	Fail(err error)}type DefaultCircuitBreaker struct {	lastFail  time.Time	fails     uint64	threshold uint64	window    time.Duration}func NewDefaultCircuitBreaker(threshold uint64, window time.Duration) *DefaultCircuitBreaker {	return &DefaultCircuitBreaker{		threshold: threshold,		window:    window,	}}func (cb *DefaultCircuitBreaker) AllowRequest() bool {	if time.Since(cb.lastFail) > cb.window {		cb.reset()		return true	}	failures := atomic.LoadUint64(&cb.fails)	return failures < cb.threshold}func (cb *DefaultCircuitBreaker) Success() {	cb.reset()}func (cb *DefaultCircuitBreaker) Fail() {	atomic.AddUint64(&cb.fails, 1)	cb.lastFail = time.Now()}func (cb *DefaultCircuitBreaker) reset() {	atomic.StoreUint64(&cb.fails, 0)	cb.lastFail = time.Now()}复制代码

结语

这次的内容就到此为止,有任何意见或者建议欢迎指正。

历史链接

转载于:https://juejin.im/post/5c978371e51d452e0c338552

你可能感兴趣的文章
POJ2229 Sumsets
查看>>
在LINQ-TO-SQL中实现“级联删除”的方法
查看>>
lemur run PLSA
查看>>
HTTP中的header头解析说明
查看>>
MVC3.0原理学习及总结
查看>>
删除windows中的库、家庭组、收藏夹
查看>>
war 宽度变窄
查看>>
set p4 environment in windows
查看>>
pl/sql development 查询的数据复制到excel
查看>>
自定义指令的参数
查看>>
python实现进度条
查看>>
Android 一个应用启动另一个应用的说明
查看>>
阿里云CentOS7服务器利用LVM分区挂载磁盘全记录
查看>>
Setting up the Web Admin Tool in LDAP 6.x to communicate via SSL
查看>>
SQL好习惯:编写支持可搜索的SQL
查看>>
Shadowbox
查看>>
【 程 序 员 】:伤不起的三十岁,你还有多远 ?
查看>>
openldap安装
查看>>
[leetcode]count and say
查看>>
润乾报表 - 缓存问题
查看>>