配置更新分析
-
MeshConfig、MeshNetworkConfig 配置发生变更时会触发 Full Push:
// initMeshHandlers initializes mesh and network handlers.
//
// It passes one callback to AddMeshHandler and one to AddNetworksHandler.
// Both callbacks end by calling XDSServer.ConfigUpdate with a PushRequest
// whose Full flag is true and whose reason is model.GlobalUpdate.
func (s *Server) initMeshHandlers() {
	log.Info("initializing mesh handlers")
	// When the mesh config or networks change, do a full push.
	s.environment.AddMeshHandler(func() {
		// Re-read the trust domain from the current mesh config and hand the
		// mesh config to the config generator before requesting the push.
		spiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain())
		s.XDSServer.ConfigGenerator.MeshConfigChanged(s.environment.Mesh())
		s.XDSServer.ConfigUpdate(&model.PushRequest{
			Full:   true,
			Reason: []model.TriggerReason{model.GlobalUpdate},
		})
	})
	// The networks callback only requests the full push; it does not touch
	// the trust domain or the config generator.
	s.environment.AddNetworksHandler(func() {
		s.XDSServer.ConfigUpdate(&model.PushRequest{
			Full:   true,
			Reason: []model.TriggerReason{model.GlobalUpdate},
		})
	})
}
-
服务更新、配置更新,最后都会调用 ConfigUpdate 接口,并传入 PushRequest:
// initRegistryEventHandlers sets up event handlers for config and service updates
//
// Both handlers shown here build a full PushRequest whose ConfigsUpdated set
// holds a single ConfigKey describing what changed, then pass it to
// XDSServer.ConfigUpdate. Parts of the config handler are elided in this
// excerpt (marked with dots).
func (s *Server) initRegistryEventHandlers() {
	log.Info("initializing registry event handlers")
	// Flush cached discovery responses whenever services configuration change.
	serviceHandler := func(svc *model.Service, _ model.Event) {
		// A service change is reported as a ServiceEntry key identified by the
		// service hostname and namespace.
		pushReq := &model.PushRequest{
			Full: true,
			ConfigsUpdated: map[model.ConfigKey]struct{}{{
				Kind:      gvk.ServiceEntry,
				Name:      string(svc.Hostname),
				Namespace: svc.Attributes.Namespace,
			}: {}},
			Reason: []model.TriggerReason{model.ServiceUpdate},
		}
		s.XDSServer.ConfigUpdate(pushReq)
	}
	s.ServiceController().AppendServiceHandler(serviceHandler)

	// Config handlers are only built when a config controller is present.
	if s.configController != nil {
		configHandler := func(old config.Config, curr config.Config, event model.Event) {
			......
			// A config change is reported with the kind, name and namespace of
			// the current (new) config object.
			pushReq := &model.PushRequest{
				Full: true,
				ConfigsUpdated: map[model.ConfigKey]struct{}{{
					Kind:      curr.GroupVersionKind,
					Name:      curr.Name,
					Namespace: curr.Namespace,
				}: {}},
				Reason: []model.TriggerReason{model.ConfigUpdate},
			}
			s.XDSServer.ConfigUpdate(pushReq)
			.....
		}
		.....
	}
}
无论是什么配置,最后的入口都是 ConfigUpdate:
// ConfigUpdate is the common entry point that every service or config change
// ends up calling, passing the PushRequest that describes the change.
func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest)
-
经过 debounce 之后,请求最终会被放入推送队列中:
// ConfigUpdate counts the inbound update and sends the request on pushChannel.
func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) {
	inboundConfigUpdates.Increment()
	s.InboundUpdates.Inc()
	s.pushChannel <- req
}

// handleUpdates passes pushChannel to debounce, with s.Push as the function
// debounce eventually calls.
func (s *DiscoveryServer) handleUpdates(stopCh <-chan struct{}) {
	debounce(s.pushChannel, stopCh, s.debounceOptions, s.Push, s.CommittedUpdates)
}

// Push is called to push changes on config updates using ADS. This is set in DiscoveryService.Push,
// to avoid direct dependencies.
// Push finishes by calling AdsPushAll with the PushContext attached to req.
func (s *DiscoveryServer) Push(req *model.PushRequest) {
	// Non-full push: reuse the global PushContext as-is.
	if !req.Full {
		req.Push = s.globalPushContext()
		s.AdsPushAll(versionInfo(), req)
		return
	}
	// Full push: build a new PushContext, starting from the global one.
	// Reset the status during the push.
	oldPushContext := s.globalPushContext()
	if oldPushContext != nil {
		oldPushContext.OnConfigChange()
	}
	// PushContext is reset after a config change. Previous status is
	// saved.
	t0 := time.Now()
	// The version string is an RFC 3339 timestamp, a slash, and the
	// incremented version counter.
	versionLocal := time.Now().Format(time.RFC3339) + "/" + strconv.FormatUint(versionNum.Inc(), 10)
	push, err := s.initPushContext(req, oldPushContext, versionLocal)
	if err != nil {
		// NOTE(review): the error is dropped here and the push is abandoned;
		// presumably initPushContext reports it itself — confirm.
		return
	}
	initContextTime := time.Since(t0)
	// Log how long the context initialization took.
	adsLog.Debugf("InitContext %v for push took %s", versionLocal, initContextTime)
	// Publish the new version string under versionMutex.
	versionMutex.Lock()
	version = versionLocal
	versionMutex.Unlock()
	req.Push = push
	s.AdsPushAll(versionLocal, req)
}

// AdsPushAll implements old style invalidation, generated when any rule or endpoint changes.
// Primary code path is from v1 discoveryService.clearCache(), which is added as a handler
// to the model ConfigStorageCache and Controller.
// AdsPushAll finishes by calling startPush.
func (s *DiscoveryServer) AdsPushAll(version string, req *model.PushRequest) {
	// If we don't know what updated, cannot safely cache. Clear the whole cache
	if len(req.ConfigsUpdated) == 0 {
		s.Cache.ClearAll()
	} else {
		// Otherwise, just clear the updated configs
		s.Cache.Clear(req.ConfigsUpdated)
	}
	if !req.Full {
		adsLog.Infof("XDS: Incremental Pushing:%s ConnectedEndpoints:%d Version:%s", version, s.adsClientCount(), req.Push.PushVersion)
	} else {
		// Full pushes additionally log and record the total number of services.
		totalService := len(req.Push.Services(nil))
		adsLog.Infof("XDS: Pushing:%s Services:%d ConnectedEndpoints:%d Version:%s", version, totalService, s.adsClientCount(), req.Push.PushVersion)
		monServices.Record(float64(totalService))
		// Make sure the ConfigsUpdated map exists
		if req.ConfigsUpdated == nil {
			req.ConfigsUpdated = make(map[model.ConfigKey]struct{})
		}
	}
	s.startPush(req)
}

// startPush puts the PushRequest onto the push queue once per client.
// Send a signal to all connections, with a push event.
func (s *DiscoveryServer) startPush(req *model.PushRequest) {
	// Push config changes, iterating over connected envoys. This cover ADS and EDS(0.7), both share
	// the same connection table
	// The pending-push message is only produced when debug logging is
	// enabled, although it is emitted at Info level.
	if adsLog.DebugEnabled() {
		currentlyPending := s.pushQueue.Pending()
		if currentlyPending != 0 {
			adsLog.Infof("Starting new push while %v were still pending", currentlyPending)
		}
	}
	// Stamp the request with the push start time before enqueueing it.
	req.Start = time.Now()
	for _, p := range s.AllClients() {
		s.pushQueue.Enqueue(p, req)
	}
}
// 最终 doSendPushes 会通过 concurrentPushLimit 来处理 PushRequest:
// 最多可以同时从 Queue 中取出 100 个请求,然后在协程中构建 PushEvent,再通过 pushChannel 发给对应 client 的连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
|
每次收到一个 Stream 就开启一个 for 循环,同时监听配置更新的 channel 和请求的 channel:一方面,收到请求时需要进行 Push;另一方面,收到配置更新时也需要进行 Push。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
|
// 最终调用pushConnection给一个client进行Push推送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
|
如何确定一个配置变更是否需要推送给某个Proxy?
- 首先,无论是哪种配置,在发生变更时都会带上 GVK(GroupVersionKind)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
|