// PushContext tracks the status of a push - metrics and errors.
// Metrics are reset after a push - at the beginning all
// values are zero, and when push completes the status is reset.
// The struct is exposed in a debug endpoint - fields public to allow
// easy serialization as json.
type PushContext struct {
	// NOTE(review): presumably guards ProxyStatus - confirm against the
	// accessors, which are not visible in this excerpt.
	proxyStatusMutex sync.RWMutex
	// ProxyStatus is keyed by the error code, and holds a map keyed
	// by the ID.
	ProxyStatus map[string]map[string]ProxyPushStatus

	// Synthesized from env.Mesh
	exportToDefaults exportToDefaults

	// ServiceIndex is the index of services by various fields.
	ServiceIndex serviceIndex

	// ServiceAccounts contains a map of hostname and port to service accounts.
	// Excluded from the JSON output.
	ServiceAccounts map[host.Name]map[int][]string `json:"-"`

	// virtualServiceIndex is the index of virtual services by various fields.
	virtualServiceIndex virtualServiceIndex

	// destinationRuleIndex is the index of destination rules by various fields.
	destinationRuleIndex destinationRuleIndex

	// gatewayIndex is the index of gateways.
	gatewayIndex gatewayIndex

	// clusterLocalHosts extracted from the MeshConfig
	clusterLocalHosts host.Names

	// sidecars for each namespace
	sidecarsByNamespace map[string][]*SidecarScope

	// envoy filters for each namespace including global config namespace
	envoyFiltersByNamespace map[string][]*EnvoyFilterWrapper

	// AuthnPolicies contains Authn policies by namespace.
	AuthnPolicies *AuthenticationPolicies `json:"-"`

	// AuthzPolicies stores the existing authorization policies in the cluster. Could be nil if there
	// are no authorization policies in the cluster.
	AuthzPolicies *AuthorizationPolicies `json:"-"`

	// The following data is either a global index or used in the inbound path.
	// Namespace specific views do not apply here.

	// Mesh configuration for the mesh. Set from env.Mesh() by InitContext.
	Mesh *meshconfig.MeshConfig `json:"-"`

	// Discovery interface for listing services and instances.
	// Embedded; set from env.ServiceDiscovery by InitContext.
	ServiceDiscovery `json:"-"`

	// Config interface for listing routing rules.
	// Embedded; set from env.IstioConfigStore by InitContext.
	IstioConfigStore `json:"-"`

	// PushVersion describes the push version this push context was computed for
	PushVersion string

	// LedgerVersion is the version of the configuration ledger.
	// Set from env.Version() by InitContext.
	LedgerVersion string

	// cache gateways addresses for each network
	// this is mainly used for kubernetes multi-cluster scenario
	// NOTE(review): networksMu presumably guards networkGateways - confirm.
	networksMu      sync.RWMutex
	networkGateways map[string][]*Gateway

	// initDone is set to true by InitContext once initialization has
	// completed; later InitContext calls return immediately when it is set.
	initDone atomic.Bool
	// initializeMutex is held for the duration of InitContext so that the
	// same PushContext is never initialized concurrently.
	initializeMutex sync.Mutex
}
func(s*DiscoveryServer)Stream(streamDiscoveryStream)error{.....// InitContext returns immediately if the context was already initialized.// 当有请求的时候,开始进行初始化,初始化完成后会设置标志位,后续的请求将不会再进行初始化了。iferr=s.globalPushContext().InitContext(s.Env,nil,nil);err!=nil{// Error accessing the data - log and close, maybe a different pilot replica// has more luckadsLog.Warnf("Error reading config %v",err)returnstatus.Error(codes.Unavailable,"error reading config")}con:=newConnection(peerAddr,stream)con.Identities=ids
// Push is called to push changes on config updates using ADS. This is set in DiscoveryService.Push,// to avoid direct dependencies.func(s*DiscoveryServer)Push(req*model.PushRequest){if!req.Full{// 不是全量Push就直接用global push context即可,因为相关资源不需要重新生成,直接对发生变化的资源做更新即可。req.Push=s.globalPushContext()s.AdsPushAll(versionInfo(),req)return}// 如果是全量Push,那么需要重新更新PushContext,这里先保存之前的PushContext方便排查请问题和debug// Reset the status during the push.oldPushContext:=s.globalPushContext()ifoldPushContext!=nil{oldPushContext.OnConfigChange()}// PushContext is reset after a config change. Previous status is// saved.t0:=time.Now()versionLocal:=time.Now().Format(time.RFC3339)+"/"+strconv.FormatUint(versionNum.Inc(),10)// 开始创建新的PushContext,并使用global PushContext进行更新。push,err:=s.initPushContext(req,oldPushContext,versionLocal)iferr!=nil{return}initContextTime:=time.Since(t0)adsLog.Debugf("InitContext %v for push took %s",versionLocal,initContextTime)versionMutex.Lock()version=versionLocalversionMutex.Unlock()req.Push=pushs.AdsPushAll(versionLocal,req)}
// initPushContext creates a global push context and stores it on the environment. Note: while this// method is technically thread safe (there are no data races), it should not be called in parallel;// if it is, then we may start two push context creations (say A, and B), but then write them in// reverse order, leaving us with a final version of A, which may be incomplete.func(s*DiscoveryServer)initPushContext(req*model.PushRequest,oldPushContext*model.PushContext,versionstring)(*model.PushContext,error){// 创建一个新的PushContext,然后用老的PushContext来初始化它push:=model.NewPushContext()push.PushVersion=versioniferr:=push.InitContext(s.Env,oldPushContext,req);err!=nil{adsLog.Errorf("XDS: Failed to update services: %v",err)// We can't push if we can't read the data - stick with previous version.pushContextErrors.Increment()returnnil,err}iferr:=s.UpdateServiceShards(push);err!=nil{returnnil,err}s.updateMutex.Lock()// 最后更新global push contexts.Env.PushContext=pushs.updateMutex.Unlock()returnpush,nil}
// InitContext will initialize the data structures used for code generation.// This should be called before starting the push, from the thread creating// the push context.func(ps*PushContext)InitContext(env*Environment,oldPushContext*PushContext,pushReq*PushRequest)error{// Acquire a lock to ensure we don't concurrently initialize the same PushContext.// If this does happen, one thread will block then exit early from initDone=trueps.initializeMutex.Lock()deferps.initializeMutex.Unlock()ifps.initDone.Load(){returnnil}ps.Mesh=env.Mesh()ps.ServiceDiscovery=env.ServiceDiscoveryps.IstioConfigStore=env.IstioConfigStoreps.LedgerVersion=env.Version()// Must be initialized first// as initServiceRegistry/VirtualServices/Destrules// use the default export mapps.initDefaultExportMaps()// create new or incremental update// 空的Push请求、不存在old push context、或者说old push context还没初始化,又或者push请求不包含任何配置请求// 这些情况都会导致全量的PushContext更新,否则就使用old push context做增量更新ifpushReq==nil||oldPushContext==nil||!oldPushContext.initDone.Load()||len(pushReq.ConfigsUpdated)==0{iferr:=ps.createNewContext(env);err!=nil{returnerr}}else{// 增量更新iferr:=ps.updateContext(env,oldPushContext,pushReq);err!=nil{returnerr}}// TODO: only do this when meshnetworks or gateway service changedps.initMeshNetworks(env.Networks())ps.initClusterLocalHosts(env)ps.initDone.Store(true)returnnil}
func(ps*PushContext)createNewContext(env*Environment)error{iferr:=ps.initServiceRegistry(env);err!=nil{returnerr}iferr:=ps.initVirtualServices(env);err!=nil{returnerr}iferr:=ps.initDestinationRules(env);err!=nil{returnerr}iferr:=ps.initAuthnPolicies(env);err!=nil{returnerr}iferr:=ps.initAuthorizationPolicies(env);err!=nil{authzLog.Errorf("failed to initialize authorization policies: %v",err)returnerr}iferr:=ps.initEnvoyFilters(env);err!=nil{returnerr}iferr:=ps.initGateways(env);err!=nil{returnerr}// Must be initialized in the endiferr:=ps.initSidecarScopes(env);err!=nil{returnerr}returnnil}