Orchestrator Failover Source Code Analysis, Part II


Continuing from Orchestrator Failover Source Code Analysis, Part I.

The DeadMaster Recovery Flow

First, getCheckAndRecoverFunction is called to obtain the checkAndRecoverFunction:

func getCheckAndRecoverFunction(analysisCode inst.AnalysisCode, analyzedInstanceKey *inst.InstanceKey) (
	checkAndRecoverFunction func(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error),
	isActionableRecovery bool,
) {
	switch analysisCode {
	// master
	case inst.DeadMaster, inst.DeadMasterAndSomeReplicas: // analysisCode is DeadMaster or DeadMasterAndSomeReplicas
		if isInEmergencyOperationGracefulPeriod(analyzedInstanceKey) { // first check whether we are within the EmergencyOperationGracefulPeriod
			return checkAndRecoverGenericProblem, false // if so, this effectively does nothing and waits for the next recoverTick
		} else {
			return checkAndRecoverDeadMaster, true
		}

The first check here is isInEmergencyOperationGracefulPeriod:
func isInEmergencyOperationGracefulPeriod(instanceKey *inst.InstanceKey) bool {
	_, found := emergencyOperationGracefulPeriodMap.Get(instanceKey.StringCode()) // emergencyOperationGracefulPeriodMap is a cache whose entries expire
	return found
}

In effect it just looks the instance up in the emergencyOperationGracefulPeriodMap cache. So the question becomes: who puts the instance into this cache, and when?
It happens while the master is in the UnreachableMaster state, when executeCheckAndRecoverFunction calls runEmergentOperations:

// executeCheckAndRecoverFunction will choose the correct check & recovery function based on analysis.
// It executes the function synchronuously
func executeCheckAndRecoverFunction(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
	atomic.AddInt64(&countPendingRecoveries, 1)
	defer atomic.AddInt64(&countPendingRecoveries, -1)

	checkAndRecoverFunction, isActionableRecovery := getCheckAndRecoverFunction(analysisEntry.Analysis, &analysisEntry.AnalyzedInstanceKey) // while still in UnreachableMaster, this returns checkAndRecoverGenericProblem
	analysisEntry.IsActionableRecovery = isActionableRecovery
	runEmergentOperations(&analysisEntry)

As we can see, in the UnreachableMaster case, emergentlyReadTopologyInstance is run:

func runEmergentOperations(analysisEntry *inst.ReplicationAnalysis) {
	switch analysisEntry.Analysis {
	// ...some code omitted
	case inst.UnreachableMaster: // the actual goal is to force a re-read of this instance and all of its replicas
		go emergentlyReadTopologyInstance(&analysisEntry.AnalyzedInstanceKey, analysisEntry.Analysis)
		go emergentlyReadTopologyInstanceReplicas(&analysisEntry.AnalyzedInstanceKey, analysisEntry.Analysis)

Inside emergentlyReadTopologyInstance, the first step is an attempt to Add into emergencyReadTopologyInstanceMap:
// Force a re-read of a topology instance; this is done because we need to substantiate a suspicion
// that we may have a failover scenario. we want to speed up reading the complete picture.
func emergentlyReadTopologyInstance(instanceKey *inst.InstanceKey, analysisCode inst.AnalysisCode) (instance *inst.Instance, err error) {
	if existsInCacheError := emergencyReadTopologyInstanceMap.Add(instanceKey.StringCode(), true, cache.DefaultExpiration); existsInCacheError != nil {
		// Just recently attempted
		return nil, nil
	}
	instance, err = inst.ReadTopologyInstance(instanceKey) // calls ReadTopologyInstanceBufferable
	inst.AuditOperation("emergently-read-topology-instance", instanceKey, string(analysisCode))
	return instance, err
}

ReadTopologyInstance in turn calls ReadTopologyInstanceBufferable.

This cache is created when the logic package is initialized:

emergencyOperationGracefulPeriodMap = cache.New(time.Second*5, time.Millisecond*500) // 5-second expiration

If isInEmergencyOperationGracefulPeriod returns true, we are still within the window in which the emergent operations run, so we must wait.
If it returns false, recovery can begin: checkAndRecoverDeadMaster is returned, and isActionableRecovery is true.

executeCheckAndRecoverFunction

// It executes the function synchronuously
func executeCheckAndRecoverFunction(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
	atomic.AddInt64(&countPendingRecoveries, 1)
	defer atomic.AddInt64(&countPendingRecoveries, -1)

	checkAndRecoverFunction, isActionableRecovery := getCheckAndRecoverFunction(analysisEntry.Analysis, &analysisEntry.AnalyzedInstanceKey)
	analysisEntry.IsActionableRecovery = isActionableRecovery
	runEmergentOperations(&analysisEntry)

	// ...some code omitted
	// Initiate detection:
	// INSERT IGNORE a row into the backend topology_failure_detection table.
	// Here skipProcesses = false, so the OnFailureDetectionProcesses hooks are invoked.
	// This is where it becomes clear that skipProcesses means: whether to skip hook execution.
	registrationSuccess, _, err := checkAndExecuteFailureDetectionProcesses(analysisEntry, skipProcesses)
	if registrationSuccess {
		if orcraft.IsRaftEnabled() {
			_, err := orcraft.PublishCommand("register-failure-detection", analysisEntry)
			log.Errore(err)
		}
	}
	// ...some code omitted
	// We don't mind whether detection really executed the processes or not
	// (it may have been silenced due to previous detection). We only care there's no error.

	// We're about to embark on recovery shortly...

	// Check for recovery being disabled globally,
	// e.g. via the disable-global-recoveries API
	if recoveryDisabledGlobally, err := IsRecoveryDisabled(); err != nil {
		// Unexpected. Shouldn't get this
		log.Errorf("Unable to determine if recovery is disabled globally: %v", err)
	} else if recoveryDisabledGlobally {
		// checkAndRecover passes false here
		if !forceInstanceRecovery { // so with recovery disabled globally, recoverTick will not recover, because forceInstanceRecovery is also false; we exit here
			log.Infof("CheckAndRecover: Analysis: %+v, InstanceKey: %+v, candidateInstanceKey: %+v, "+
				"skipProcesses: %v: NOT Recovering host (disabled globally)",
				analysisEntry.Analysis, analysisEntry.AnalyzedInstanceKey, candidateInstanceKey, skipProcesses)

			return false, nil, err
		}
		log.Infof("CheckAndRecover: Analysis: %+v, InstanceKey: %+v, candidateInstanceKey: %+v, "+
			"skipProcesses: %v: recoveries disabled globally but forcing this recovery",
			analysisEntry.Analysis, analysisEntry.AnalyzedInstanceKey, candidateInstanceKey, skipProcesses)
	}

	// Now run checkAndRecoverFunction; in our case this is checkAndRecoverDeadMaster.
	// We'll set aside what checkAndRecoverDeadMaster does and keep reading first.
	recoveryAttempted, topologyRecovery, err = checkAndRecoverFunction(analysisEntry, candidateInstanceKey, forceInstanceRecovery, skipProcesses) // actual arguments: analysisEntry, nil, false, false
	if !recoveryAttempted {
		return recoveryAttempted, topologyRecovery, err
	}
	if topologyRecovery == nil {
		return recoveryAttempted, topologyRecovery, err
	}
	if b, err := json.Marshal(topologyRecovery); err == nil {
		log.Infof("Topology recovery: %+v", string(b))
	} else {
		log.Infof("Topology recovery: %+v", *topologyRecovery)
	}
	// skipProcesses=false
	if !skipProcesses {
		// recovery was not successful: run PostUnsuccessfulFailoverProcesses
		if topologyRecovery.SuccessorKey == nil {
			// Execute general unsuccessful post failover processes
			executeProcesses(config.Config.PostUnsuccessfulFailoverProcesses, "PostUnsuccessfulFailoverProcesses", topologyRecovery, false)
		} else {
			// otherwise run PostFailoverProcesses
			// Execute general post failover processes
			inst.EndDowntime(topologyRecovery.SuccessorKey)
			executeProcesses(config.Config.PostFailoverProcesses, "PostFailoverProcesses", topologyRecovery, false)
		}
	}

	// ...some code omitted
	// Does reaching this point mean the recovery succeeded? recoveryAttempted is certainly true here.
	// The concrete recovery operations all live inside checkAndRecoverFunction.
	return recoveryAttempted, topologyRecovery, err
}

For the scenario in this article, once the master has been deemed to be in the DeadMaster state:
executeCheckAndRecoverFunction first calls getCheckAndRecoverFunction to obtain:

  • checkAndRecoverFunction: checkAndRecoverDeadMaster
  • isActionableRecovery: true

It then checks whether global failure recovery has been disabled (e.g. via the disable-global-recoveries API); if so, it returns immediately.

Next, it executes checkAndRecoverFunction, i.e. checkAndRecoverDeadMaster.

Setting aside what checkAndRecoverDeadMaster does internally: the code that follows simply invokes the appropriate hooks depending on whether checkAndRecoverFunction recovered successfully, and finally returns the recovery result. At that point this recovery round is finished; recoverTick does not care whether the recovery actually succeeded. The concrete recovery work lives in checkAndRecoverFunction (in our case, checkAndRecoverDeadMaster).

For the hooks, see:
OnFailureDetectionProcesses
PostUnsuccessfulFailoverProcesses
PostFailoverProcesses
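These hook lists are configured as shell command lines. As a rough sketch of how such a process list could be executed (this is an illustrative re-implementation, not orchestrator's executeProcesses; the placeholder token {failureCluster} is invented here):

```go
package main

import (
	"fmt"
	"os/exec"
	"strings"
)

// executeHooks runs each configured hook command through the shell after
// substituting placeholder tokens (sketch of a process-hook runner).
func executeHooks(name string, commands []string, placeholders map[string]string) []error {
	var errs []error
	for _, cmd := range commands {
		for token, value := range placeholders {
			cmd = strings.ReplaceAll(cmd, token, value)
		}
		// Hook entries are full command lines, so run them via the shell.
		if err := exec.Command("sh", "-c", cmd).Run(); err != nil {
			errs = append(errs, fmt.Errorf("%s: %q: %w", name, cmd, err))
		}
	}
	return errs
}

func main() {
	errs := executeHooks("PostFailoverProcesses",
		[]string{"echo failover of {failureCluster} done"},
		map[string]string{"{failureCluster}": "mycluster"})
	fmt.Println(len(errs)) // → 0
}
```

Failures are collected rather than aborting the whole list, which matches the "we only care there's no error" spirit of the surrounding code.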

checkAndRecoverDeadMaster

// checkAndRecoverDeadMaster checks a given analysis, decides whether to take action, and possibly takes action
// Returns true when action was taken.
func checkAndRecoverDeadMaster(analysisEntry inst.ReplicationAnalysis, candidateInstanceKey *inst.InstanceKey, forceInstanceRecovery bool, skipProcesses bool) (recoveryAttempted bool, topologyRecovery *TopologyRecovery, err error) {
	// forceInstanceRecovery=false
	// HasAutomatedMasterRecovery is set when GetReplicationAnalysis runs a.ClusterDetails.ReadRecoveryInfo()
	// ReadRecoveryInfo executes this.HasAutomatedMasterRecovery = this.filtersMatchCluster(config.Config.RecoverMasterClusterFilters)
	// We configured RecoverMasterClusterFilters as .*, so this is true
	// false || true = true
	// So for recoverTick, the point of this check is to consult RecoverMasterClusterFilters: is this cluster configured for automated failover at all?
	if !(forceInstanceRecovery || analysisEntry.ClusterDetails.HasAutomatedMasterRecovery) {
		return false, nil, nil
	}
	// actual arguments: &analysisEntry, true, true
	// Attempt to register a recovery entry; failure means a recovery is already in place.

	// Check whether this instance was just recently promoted, or the cluster just went through a failover, and is still within the active period. If so, refuse the recovery registration, to avoid flapping.
	// In other words, failing over again shortly after a failover is not allowed, similar to MHA's 8-hour restriction.
	// The window is controlled by RecoveryPeriodBlockMinutes, default 1 hour. A goroutine launched at recoveryTick also clears the in_active_period flag (updates it to 0).
	// If everything is fine, a row is inserted into the topology_recovery table and a newly allocated topologyRecovery struct is returned.
	// Otherwise topologyRecovery is nil
	topologyRecovery, err = AttemptRecoveryRegistration(&analysisEntry, !forceInstanceRecovery, !forceInstanceRecovery)
	if topologyRecovery == nil { // topologyRecovery == nil means a recovery just finished or is still in progress; return immediately
		AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("found an active or recent recovery on %+v. Will not issue another RecoverDeadMaster.", analysisEntry.AnalyzedInstanceKey))
		return false, nil, err
	}

	// That's it! We must do recovery!
	AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("will handle DeadMaster event on %+v", analysisEntry.ClusterDetails.ClusterName))
	recoverDeadMasterCounter.Inc(1)

	// the recovery proper begins
	// actual arguments: topologyRecovery, nil, false
	recoveryAttempted, promotedReplica, lostReplicas, err := recoverDeadMaster(topologyRecovery, candidateInstanceKey, skipProcesses)
	// ...some code omitted
	topologyRecovery.LostReplicas.AddInstances(lostReplicas) // lostReplicas: replicas that, for one reason or another, could not be repointed to the new master
	if !recoveryAttempted {
		return false, topologyRecovery, err
	}

	// By the time execution reaches here, a new master may already have been elected: promotedReplica.
	// This function adds some extra conditions for deciding whether promotedReplica can really become the new master,
	// mainly driven by these parameters:
	// - PreventCrossDataCenterMasterFailover
	// - PreventCrossRegionMasterFailover
	// - FailMasterPromotionOnLagMinutes
	// - FailMasterPromotionIfSQLThreadNotUpToDate
	// - DelayMasterPromotionIfSQLThreadNotUpToDate
	overrideMasterPromotion := func() (*inst.Instance, error) {
		if promotedReplica == nil {
			// No promotion; nothing to override.
			return promotedReplica, err
		}
		// Scenarios where we might cancel the promotion.
		// This call uses:
		// - PreventCrossDataCenterMasterFailover
		// - PreventCrossRegionMasterFailover
		// to decide whether promotedReplica may serve as the new master.
		// PreventCrossDataCenterMasterFailover:
		// default false. When true, orchestrator will only replace the failed master with a replica in the same DC. It does its best to find a replacement in the same DC, and if none is found, it aborts (fails) the failover.
		// PreventCrossRegionMasterFailover:
		// default false. When true, orchestrator will only replace the failed master with a replica in the same region. It does its best to find a replacement in the same region, and if none is found, it aborts (fails) the failover.
		if satisfied, reason := MasterFailoverGeographicConstraintSatisfied(&analysisEntry, promotedReplica); !satisfied {
			return nil, fmt.Errorf("RecoverDeadMaster: failed %+v promotion; %s", promotedReplica.Key, reason)
		}
		AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: promoted replica lag seconds: %+v", promotedReplica.ReplicationLagSeconds.Int64))
		if config.Config.FailMasterPromotionOnLagMinutes > 0 &&
			time.Duration(promotedReplica.ReplicationLagSeconds.Int64)*time.Second >= time.Duration(config.Config.FailMasterPromotionOnLagMinutes)*time.Minute {
			// candidate replica lags too much
			return nil, fmt.Errorf("RecoverDeadMaster: failed promotion. FailMasterPromotionOnLagMinutes is set to %d (minutes) and promoted replica %+v 's lag is %d (seconds)", config.Config.FailMasterPromotionOnLagMinutes, promotedReplica.Key, promotedReplica.ReplicationLagSeconds.Int64)
		}
		AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: promoted replica sql thread up-to-date: %+v", promotedReplica.SQLThreadUpToDate()))
		if config.Config.FailMasterPromotionIfSQLThreadNotUpToDate && !promotedReplica.SQLThreadUpToDate() {
			return nil, fmt.Errorf("RecoverDeadMaster: failed promotion. FailMasterPromotionIfSQLThreadNotUpToDate is set and promoted replica %+v 's sql thread is not up to date (relay logs still unapplied). Aborting promotion", promotedReplica.Key)
		}
		if config.Config.DelayMasterPromotionIfSQLThreadNotUpToDate && !promotedReplica.SQLThreadUpToDate() {
			AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("DelayMasterPromotionIfSQLThreadNotUpToDate: waiting for SQL thread on %+v", promotedReplica.Key))
			if _, err := inst.WaitForSQLThreadUpToDate(&promotedReplica.Key, 0, 0); err != nil {
				return nil, fmt.Errorf("DelayMasterPromotionIfSQLThreadNotUpToDate error: %+v", err)
			}
			AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("DelayMasterPromotionIfSQLThreadNotUpToDate: SQL thread caught up on %+v", promotedReplica.Key))
		}
		// All seems well. No override done.
		AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: found no reason to override promotion of %+v", promotedReplica.Key))
		return promotedReplica, err
	}
	// run overrideMasterPromotion
	if promotedReplica, err = overrideMasterPromotion(); err != nil {
		AuditTopologyRecovery(topologyRecovery, err.Error())
	}
	// And this is the end; whether successful or not, we're done.
	resolveRecovery(topologyRecovery, promotedReplica)
	// Now, see whether we are successful or not. From this point there's no going back.

	if promotedReplica != nil { // a new master was successfully elected
		// Success!
		recoverDeadMasterSuccessCounter.Inc(1)
		AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: successfully promoted %+v", promotedReplica.Key))
		AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: promoted server coordinates: %+v", promotedReplica.SelfBinlogCoordinates))

		// ApplyMySQLPromotionAfterMasterFailover: when true, orchestrator runs RESET SLAVE ALL and SET read_only=0 on the newly elected master. Default: true. When true, it overrides MasterFailoverDetachSlaveMasterHost.
		// So this if block runs RESET SLAVE ALL and SET read_only=0 on the new master
		if config.Config.ApplyMySQLPromotionAfterMasterFailover || analysisEntry.CommandHint == inst.GracefulMasterTakeoverCommandHint {
			// on GracefulMasterTakeoverCommandHint it makes utter sense to RESET SLAVE ALL and read_only=0, and there is no sense in not doing so.
			AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: will apply MySQL changes to promoted master"))
			{
				_, err := inst.ResetReplicationOperation(&promotedReplica.Key)
				if err != nil {
					// Ugly, but this is important. Let's give it another try
					_, err = inst.ResetReplicationOperation(&promotedReplica.Key)
				}
				AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: applying RESET SLAVE ALL on promoted master: success=%t", (err == nil)))
				if err != nil {
					AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: NOTE that %+v is promoted even though SHOW SLAVE STATUS may still show it has a master", promotedReplica.Key))
				}
			}
			{
				_, err := inst.SetReadOnly(&promotedReplica.Key, false)
				AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: applying read-only=0 on promoted master: success=%t", (err == nil)))
			}
			// Let's attempt, though we won't necessarily succeed, to set old master as read-only
			go func() {
				// also try to set the old master read-only; whether this succeeds does not matter
				_, err := inst.SetReadOnly(&analysisEntry.AnalyzedInstanceKey, true)
				AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: applying read-only=1 on demoted master: success=%t", (err == nil)))
			}()
		}

		// ...some code omitted
		// MasterFailoverDetachReplicaMasterHost: when true, orchestrator runs detach-replica-master-host on the newly promoted master (this ensures the new master will not try to replicate from the old master, even if the old master comes back alive). Default: false. This parameter is meaningless if ApplyMySQLPromotionAfterMasterFailover is true.
		if config.Config.MasterFailoverDetachReplicaMasterHost {
			postponedFunction := func() error {
				AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("- RecoverDeadMaster: detaching master host on promoted master"))
				// So: if ApplyMySQLPromotionAfterMasterFailover=true, RESET SLAVE ALL has already been done,
				// and DetachReplicaMasterHost loses its purpose
				inst.DetachReplicaMasterHost(&promotedReplica.Key)
				return nil
			}
			topologyRecovery.AddPostponedFunction(postponedFunction, fmt.Sprintf("RecoverDeadMaster, detaching promoted master host %+v", promotedReplica.Key))
		}
		// ...some code omitted

		if !skipProcesses {
			// Execute post master-failover processes
			// run the PostMasterFailoverProcesses hooks
			executeProcesses(config.Config.PostMasterFailoverProcesses, "PostMasterFailoverProcesses", topologyRecovery, false)
		}
	} else {
		// otherwise, the recovery failed
		recoverDeadMasterFailureCounter.Inc(1)
	}

	return true, topologyRecovery, err
}

checkAndRecoverDeadMaster first uses RecoverMasterClusterFilters to confirm, once more, that this instance is eligible for automated failure recovery.
Next, it calls AttemptRecoveryRegistration to check whether this instance was just recently promoted, or its cluster just went through a failover, and is still within the active period.

In other words, failing over again shortly after a failover is not allowed, similar to MHA's 8-hour restriction.

The active period is controlled by RecoveryPeriodBlockMinutes, default 1 hour. A goroutine launched at recoveryTick also clears the in_active_period flag (updates it to 0).
If everything is fine, a row is inserted into the topology_recovery table and a newly allocated topologyRecovery struct is returned. Otherwise topologyRecovery is nil.

If all of the above checks pass, the actual recovery begins with a call to recoverDeadMaster.

We'll set the recoverDeadMaster code aside for now and keep reading.

The most important thing recoverDeadMaster returns is promotedReplica, the newly elected master.
By this point a new master may already have been elected, i.e. promotedReplica. The overrideMasterPromotion function then runs.
It adds some extra conditions for deciding whether this promotedReplica can really become the new master,
mainly these parameters:

  • PreventCrossDataCenterMasterFailover
  • PreventCrossRegionMasterFailover
  • FailMasterPromotionOnLagMinutes
  • FailMasterPromotionIfSQLThreadNotUpToDate
  • DelayMasterPromotionIfSQLThreadNotUpToDate

If any one of these checks fails, the remaining recovery actions are aborted and the function returns.
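The lag and SQL-thread checks lend themselves to a pure-function sketch (a simplified re-implementation of just those two checks from overrideMasterPromotion; the config struct and function names here are invented):

```go
package main

import (
	"fmt"
	"time"
)

// promotionConfig holds the two lag-related knobs discussed above
// (an illustrative stand-in for orchestrator's config).
type promotionConfig struct {
	FailMasterPromotionOnLagMinutes           int
	FailMasterPromotionIfSQLThreadNotUpToDate bool
}

// vetPromotion mirrors the lag/SQL-thread vetting from overrideMasterPromotion:
// a nil return means the promotion stands; an error cancels it.
func vetPromotion(cfg promotionConfig, lagSeconds int64, sqlThreadUpToDate bool) error {
	if cfg.FailMasterPromotionOnLagMinutes > 0 &&
		time.Duration(lagSeconds)*time.Second >= time.Duration(cfg.FailMasterPromotionOnLagMinutes)*time.Minute {
		return fmt.Errorf("candidate lags %ds, over the %dm limit", lagSeconds, cfg.FailMasterPromotionOnLagMinutes)
	}
	if cfg.FailMasterPromotionIfSQLThreadNotUpToDate && !sqlThreadUpToDate {
		return fmt.Errorf("sql thread not up to date; relay logs still unapplied")
	}
	return nil
}

func main() {
	cfg := promotionConfig{
		FailMasterPromotionOnLagMinutes:           5,
		FailMasterPromotionIfSQLThreadNotUpToDate: true,
	}
	fmt.Println(vetPromotion(cfg, 60, true))  // 1 minute of lag, caught up: promotion stands
	fmt.Println(vetPromotion(cfg, 600, true)) // 10 minutes of lag: promotion cancelled
}
```

Note the unit conversion in the first check: the config is in minutes while replica lag is measured in seconds, exactly as in the quoted source.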

Finally, checkAndRecoverDeadMaster performs some cleanup on the new and old masters, such as running RESET SLAVE ALL and SET read_only=0 on the new master, setting the old master read-only, and invoking the PostMasterFailoverProcesses hooks.

At this point, the recovery round is complete.
For the concrete recovery logic, we still need to look at recoverDeadMaster.

recoverDeadMaster

// recoverDeadMaster recovers a dead master, complete logic inside
func recoverDeadMaster(topologyRecovery *TopologyRecovery, candidateInstanceKey *inst.InstanceKey, skipProcesses bool) (recoveryAttempted bool, promotedReplica *inst.Instance, lostReplicas [](*inst.Instance), err error) {

	// actual arguments passed in: topologyRecovery, nil, false

	topologyRecovery.Type = MasterRecovery
	analysisEntry := &topologyRecovery.AnalysisEntry
	failedInstanceKey := &analysisEntry.AnalyzedInstanceKey
	var cannotReplicateReplicas [](*inst.Instance)
	postponedAll := false

	inst.AuditOperation("recover-dead-master", failedInstanceKey, "problem found; will recover")
	if !skipProcesses { // run hooks
		if err := executeProcesses(config.Config.PreFailoverProcesses, "PreFailoverProcesses", topologyRecovery, true); err != nil {
			return false, nil, lostReplicas, topologyRecovery.AddError(err)
		}
	}
	// ...some code omitted
	// topologyRecovery.RecoveryType = MasterRecoveryGTID
	topologyRecovery.RecoveryType = GetMasterRecoveryType(analysisEntry)
	// ...some code omitted

	// This closure decides whether the candidate returned by GetCandidateReplica is "ideal".
	// If it is "ideal", some of the replica recovery operations can be postponed and run asynchronously;
	// otherwise they must run synchronously
	promotedReplicaIsIdeal := func(promoted *inst.Instance, hasBestPromotionRule bool) bool {
		if promoted == nil {
			return false
		}
		AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: promotedReplicaIsIdeal(%+v)", promoted.Key))

		// the candidateInstanceKey argument is nil, so this if is not taken
		if candidateInstanceKey != nil { //explicit request to promote a specific server
			return promoted.Key.Equals(candidateInstanceKey)
		}
		if promoted.DataCenter == topologyRecovery.AnalysisEntry.AnalyzedInstanceDataCenter &&
			promoted.PhysicalEnvironment == topologyRecovery.AnalysisEntry.AnalyzedInstancePhysicalEnvironment {
			if promoted.PromotionRule == inst.MustPromoteRule || promoted.PromotionRule == inst.PreferPromoteRule ||
				(hasBestPromotionRule && promoted.PromotionRule != inst.MustNotPromoteRule) {
				AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: found %+v to be ideal candidate; will optimize recovery", promoted.Key))
				postponedAll = true
				return true
			}
		}
		return false
	}
	switch topologyRecovery.RecoveryType {
	case MasterRecoveryGTID:
		{
			AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: regrouping replicas via GTID"))
			// promotedReplica may be nil
			lostReplicas, _, cannotReplicateReplicas, promotedReplica, err = inst.RegroupReplicasGTID(failedInstanceKey, true, false, nil, &topologyRecovery.PostponedFunctionsContainer, promotedReplicaIsIdeal)
		}
	// case MasterRecoveryPseudoGTID:
	// ...some code omitted
	// case MasterRecoveryBinlogServer:
	// ...some code omitted
	}
	topologyRecovery.AddError(err)
	lostReplicas = append(lostReplicas, cannotReplicateReplicas...)
	// ...some code omitted

	if promotedReplica != nil && len(lostReplicas) > 0 && config.Config.DetachLostReplicasAfterMasterFailover {
		postponedFunction := func() error {
			AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: lost %+v replicas during recovery process; detaching them", len(lostReplicas)))
			for _, replica := range lostReplicas {
				replica := replica
				inst.DetachReplicaMasterHost(&replica.Key)
			}
			return nil
		}
		// DetachReplicaMasterHost is run on the lostReplicas concurrently, as a postponed function
		topologyRecovery.AddPostponedFunction(postponedFunction, fmt.Sprintf("RecoverDeadMaster, detach %+v lost replicas", len(lostReplicas)))
	}

	// ...some code omitted

	AuditTopologyRecovery(topologyRecovery, fmt.Sprintf("RecoverDeadMaster: %d postponed functions", topologyRecovery.PostponedFunctionsContainer.Len()))

	if promotedReplica != nil && !postponedAll { // there is a candidate, but not the ideal one
		// The idea: the new master elected first may have been chosen because it has the most complete log,
		// but it is not the instance we configured as preferred; so reorganize the topology and make the preferred instance the new master
		promotedReplica, err = replacePromotedReplicaWithCandidate(topologyRecovery, &analysisEntry.AnalyzedInstanceKey, promotedReplica, candidateInstanceKey)
		topologyRecovery.AddError(err)
	}

	if promotedReplica == nil {
		message := "Failure: no replica promoted."
		AuditTopologyRecovery(topologyRecovery, message)
		inst.AuditOperation("recover-dead-master", failedInstanceKey, message)
	} else {
		message := fmt.Sprintf("promoted replica: %+v", promotedReplica.Key)
		AuditTopologyRecovery(topologyRecovery, message)
		inst.AuditOperation("recover-dead-master", failedInstanceKey, message)
	}
	return true, promotedReplica, lostReplicas, err
}

recoverDeadMaster mainly does the following: ^a0c808

  1. GetMasterRecoveryType: determine which method to recover with. GTID? Pseudo-GTID? Or a binlog server?
  2. Reorganize the topology; in our case via RegroupReplicasGTID.
    There is a catch here: the current new master may not be the instance we "expect". That is, it may have been chosen simply because it has the most complete log, rather than being the instance we configured as preferred.
    So the promotedReplicaIsIdeal closure performs this check and records the result (via postponedAll).
    If postponedAll=false, the topology must be reorganized so that the preferred replica becomes the new master.

    postponedAll=true means the candidate is the "ideal" one, and all replica recovery can run asynchronously in parallel.

  3. If there are lostReplicas, and DetachLostReplicasAfterMasterFailover is enabled, DetachReplicaMasterHost is executed on all lostReplicas in parallel.

    This effectively runs change master to master_host='// {host}'

  4. If the currently elected new master is not our preferred instance, reorganize the topology and make the preferred one the new master.
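The postponed-function mechanism used in steps 2 and 3 can be sketched as a small container that collects closures and later runs them concurrently (an illustrative stand-in for topologyRecovery.PostponedFunctionsContainer; the type below is invented):

```go
package main

import (
	"fmt"
	"sync"
)

// postponedContainer collects work to run after the critical promotion path,
// mirroring the role of PostponedFunctionsContainer (sketch, not orchestrator's code).
type postponedContainer struct {
	mu    sync.Mutex
	funcs []func() error
}

func (c *postponedContainer) Add(f func() error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.funcs = append(c.funcs, f)
}

// InvokeAll runs all postponed functions concurrently and waits for them.
func (c *postponedContainer) InvokeAll() {
	c.mu.Lock()
	funcs := c.funcs
	c.funcs = nil
	c.mu.Unlock()
	var wg sync.WaitGroup
	for _, f := range funcs {
		wg.Add(1)
		go func(f func() error) {
			defer wg.Done()
			f() // errors would be audited, not fatal
		}(f)
	}
	wg.Wait()
}

func main() {
	var c postponedContainer
	detached := make(chan string, 2)
	for _, replica := range []string{"replica1", "replica2"} {
		replica := replica
		c.Add(func() error { detached <- replica; return nil }) // e.g. DetachReplicaMasterHost per lost replica
	}
	c.InvokeAll()
	close(detached)
	fmt.Println(len(detached)) // → 2
}
```

This is why an "ideal" candidate speeds things up: everything queued here is off the critical path of electing the new master.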

recoverDeadMaster does quite a lot, and the logic inside RegroupReplicasGTID in particular is complex. This article stops here; for what happens next, see Orchestrator Failover Source Code Analysis, Part III.

Preliminary Flow Summary


Copyright © 2013 - 2022 Fan() All Rights Reserved.
