// 建立 應用實體狀態變更監聽器 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId(){ return"statusChangeListener"; }
@Override publicvoidnotify(StatusChangeEvent statusChangeEvent){ if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } };
// 註冊 應用實體狀態變更監聽器 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); }
// InstanceInfoReplicator.java publicbooleanonDemandUpdate(){ if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { // 限流相關,跳過 scheduler.submit(new Runnable() { @Override publicvoidrun(){ logger.debug("Executing on-demand update of local InstanceInfo"); // 取消任務 Future latestPeriodic = scheduledPeriodicRef.get(); if (latestPeriodic != null && !latestPeriodic.isDone()) { logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update"); latestPeriodic.cancel(false); } // 再次呼叫 InstanceInfoReplicator.this.run(); } }); returntrue; } else { logger.warn("Ignoring onDemand update due to rate limiter"); returnfalse; } }
// ApplicationInfoManager.java publicvoidrefreshDataCenterInfoIfRequired(){ // hostname String existingAddress = instanceInfo.getHostName(); String newAddress; if (config instanceof RefreshableInstanceConfig) { // Refresh data center info, and return up to date address newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true); } else { newAddress = config.getHostName(true); } // ip String newIp = config.getIpAddress(); if (newAddress != null && !newAddress.equals(existingAddress)) { logger.warn("The address changed from : {} => {}", existingAddress, newAddress); // :( in the legacy code here the builder is acting as a mutator. // This is hard to fix as this same instanceInfo instance is referenced elsewhere. // We will most likely re-write the client at sometime so not fixing for now. InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo); builder.setHostName(newAddress) // hostname .setIPAddr(newIp) // ip .setDataCenterInfo(config.getDataCenterInfo()); // dataCenterInfo instanceInfo.setIsDirty(); } }
1: publicvoidregister(InstanceInfo registrant, int leaseDuration, boolean isReplication){ 2: try { 3: // 獲取讀鎖 4: read.lock(); 5: Map> gMap = registry.get(registrant.getAppName()); 6: // 增加 註冊次數 到 監控 7: REGISTER.increment(isReplication); 8: // 獲得 應用實體資訊 對應的 租約 9: if (gMap == null) { 10: final ConcurrentHashMap> gNewMap = new ConcurrentHashMap>(); 11: gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); // 新增 應用 12: if (gMap == null) { // 新增 應用 成功 13: gMap = gNewMap; 14: } 15: } 16: Lease existingLease = gMap.get(registrant.getId()); 17: // Retain the last dirty timestamp without overwriting it, if there is already a lease 18: if (existingLease != null && (existingLease.getHolder() != null)) { // 已存在時,使用資料不一致的時間大的應用註冊資訊為有效的 19: Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // Server 註冊的 InstanceInfo 20: Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); // Client 請求的 InstanceInfo 21: logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); 22: 23: // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted 24: // InstanceInfo instead of the server local copy. 25: if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { 26: logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + 27: " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); 28: logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); 29: registrant = existingLease.getHolder(); 30: } 31: } else { 32: // The lease does not exist and hence it is a new registration 33: // 【自我保護機制】增加 `numberOfRenewsPerMinThreshold` 、`expectedNumberOfRenewsPerMin` 34: synchronized (lock) { 35: if (this.expectedNumberOfRenewsPerMin > 0) { 36: // Since the client wants to cancel it, reduce the threshold 37: // (1 38: // for 30 seconds, 2 for a minute) 39: this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2; 40: this.numberOfRenewsPerMinThreshold = 41: (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); 42: } 43: } 44: logger.debug("No previous lease information found; it is new registration"); 45: } 46: // 建立 租約 47: Lease lease = new Lease(registrant, leaseDuration); 48: if (existingLease != null) { // 若租約已存在,設定 租約的開始服務的時間戳 49: lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); 50: } 51: // 新增到 租約對映 52: gMap.put(registrant.getId(), lease); 53: // 新增到 最近註冊的除錯佇列 54: synchronized (recentRegisteredQueue) { 55: recentRegisteredQueue.add(new Pair( 56: System.currentTimeMillis(), 57: registrant.getAppName() + "(" + registrant.getId() + ")")); 58: } 59: // 新增到 應用實體改寫狀態對映(Eureka-Server 初始化使用) 60: // This is where the initial state transfer of overridden status happens 61: if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { 62: logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " 63: + "overrides", registrant.getOverriddenStatus(), registrant.getId()); 64: if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { 65: logger.info("Not found overridden id {} and hence adding it", registrant.getId()); 66: overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); 67: } 68: } 69: InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); 70: if (overriddenStatusFromMap != null) { 71: logger.info("Storing overridden status {} from map", overriddenStatusFromMap); 72: registrant.setOverriddenStatus(overriddenStatusFromMap); 73: } 74: 75: // 獲得應用實體最終狀態,並設定應用實體的狀態 76: // Set the status based on the overridden status rules 77: InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); 78: registrant.setStatusWithoutDirty(overriddenInstanceStatus); 79: 80: // 設定 租約的開始服務的時間戳(只有第一次有效) 81: // If the lease is registered with UP status, set lease service up timestamp 82: if (InstanceStatus.UP.equals(registrant.getStatus())) { 83: lease.serviceUp(); 84: } 85: // 設定 應用實體資訊的操作型別 為 新增 86: registrant.setActionType(ActionType.ADDED); 87: // 新增到 最近租約變更記錄佇列 88: recentlyChangedQueue.add(new RecentlyChangedItem(lease)); 89: // 設定 租約的最後更新時間戳 90: registrant.setLastUpdatedTimestamp(); 91: // 設定 響應快取 過期 92: invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); 93: logger.info("Registered instance {}/{} with status {} (replication={})", 94: registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); 95: } finally { 96: // 釋放鎖 97: read.unlock(); 98: } 99: }