Orchestrator Discover Source Code Analysis

While sorting out the two parameters HostnameResolveMethod and MySQLHostnameResolveMethod, I got somewhat confused, so I dug into the orchestrator source code and am recording what I found here.

orchestrator-client

Reading orchestrator-client shows that we can perform "service discovery" by running the following command:

orchestrator-client -c discover -i 172.16.120.10:3306

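Assume the hostname of 172.16.120.10 is centos-1.

orchestrator-client is a script that wraps API calls in a convenient command-line interface.

It can automatically determine the leader of the orchestrator cluster and, in that case, forwards all requests to the leader.

It closely mirrors the orchestrator command line interface.

orchestrator-client -help has a bug; a PR has been submitted.

The orchestrator-client help output also does not describe the -i parameter.

Looking at the orchestrator-client source: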

while getopts "c:i:d:s:a:D:U:o:r:u:R:t:l:H:P:q:b:e:n:S:h" OPTION
do
case $OPTION in
h) command="help" ;;
c) command="$OPTARG" ;;
i) instance="$OPTARG" ;;

As shown, the value of -i is assigned to the instance variable. The main function processes instance first:

function main {
check_requirements
detect_leader_api

instance_hostport=$(to_hostport $instance)
destination_hostport=$(to_hostport $destination)

run_command
}

to_hostport is a function. From its code, -i accepts hostname:port / ip:port / hostname / ip.

# to_hostport transforms:
# - fqdn:port => fqdn/port
# - fqdn => fqdn/default_port
function to_hostport {
instance_key="$1"

if [ -z "$instance_key" ] ; then # instance_key is empty
echo ""
return
fi

if [[ $instance_key == *":"* ]]; then # -i contains ':'
echo $instance_key | tr ':' '/' # replace the colon with '/'
else # otherwise only a hostname/ip was given, so use default_port, which is defined at the top of the script as 3306
echo "$instance_key/$default_port"
fi
}
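
For readers more comfortable with Go than bash, the same transformation can be sketched as follows. This is only an illustration of the behaviour described above, not code from orchestrator; the names toHostport and defaultPort are my own, and the default of 3306 is taken from the comment in the script.

```go
package main

import (
	"fmt"
	"strings"
)

// defaultPort mirrors the script's default_port (3306 per the comment above).
const defaultPort = 3306

// toHostport converts "host:port" to "host/port" and a bare "host" to
// "host/<defaultPort>". An empty input yields an empty string.
func toHostport(instanceKey string) string {
	if instanceKey == "" {
		return ""
	}
	if strings.Contains(instanceKey, ":") {
		// Same effect as `tr ':' '/'`: every colon becomes a slash.
		return strings.ReplaceAll(instanceKey, ":", "/")
	}
	return fmt.Sprintf("%s/%d", instanceKey, defaultPort)
}

func main() {
	fmt.Println(toHostport("172.16.120.10:3306"))
	fmt.Println(toHostport("centos-1"))
}
```

The slash-separated form is what later lines up with the discover/:host/:port route on the server side.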

run_command then calls the function that corresponds to the command-line arguments:

function run_command {
if [ -z "$command" ] ; then
fail "No command given. Use $myname -c <command> [...] or $myname --command <command> [...] to do something useful"
fi
command=$(echo $command | universal_sed -e 's/slave/replica/')
case $command in
"help") prompt_help ;; # Show available commands
...
"discover") discover ;; # Lookup an instance, investigate it

The discover function:

function discover {
assert_nonempty "instance" "$instance_hostport"
api "discover/$instance_hostport"
print_details | filter_key | print_key
}

Here api is itself a function, which I will not expand. In essence it ends up running a curl command:

curl ${curl_auth_params} -s "/api/discover/$instance_hostport" | jq '.' 

In other words, it simply calls an HTTP endpoint.
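
To make the shape of that request concrete, here is a small Go sketch that builds the same path. The base URL http://orchestrator.example:3000 and the helper name discoverURL are assumptions for illustration; only the /api/discover/<host>/<port> path comes from the script above.

```go
package main

import (
	"fmt"
	"net/url"
)

// discoverURL builds the discover endpoint for a host and port.
// baseURL is a placeholder for wherever the orchestrator API is served.
func discoverURL(baseURL, host string, port int) string {
	return fmt.Sprintf("%s/api/discover/%s/%d", baseURL, url.PathEscape(host), port)
}

func main() {
	// Passing this URL to http.Get would issue the same request as the curl call.
	fmt.Println(discoverURL("http://orchestrator.example:3000", "172.16.120.10", 3306))
}
```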

The orchestrator discover API

The APIs exposed by orchestrator are defined in go/http/api.go. The route for discover can be found there:

this.registerAPIRequest(m, "discover/:host/:port", this.Discover)

So the next thing to focus on is the Discover method.

Discover

In what follows, ↓↓↓ means drilling down into the corresponding function.

Let's go through it piece by piece.

// Discover issues a synchronous read on an instance
func (this *HttpAPI) Discover(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"})
return
}
instanceKey, err := this.getInstanceKey(params["host"], params["port"])
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
instance, err := inst.ReadTopologyInstance(&instanceKey)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}

if orcraft.IsRaftEnabled() {
orcraft.PublishCommand("discover", instanceKey)
} else {
logic.DiscoverInstance(instanceKey)
}

Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Instance discovered: %+v", instance.Key), Details: instance})
}

Start with the getInstanceKey call, the seventh line of the listing above.

->getInstanceKey

instanceKey, err := this.getInstanceKey(params["host"], params["port"])


↓↓↓ getInstanceKey
func (this *HttpAPI) getInstanceKey(host string, port string) (inst.InstanceKey, error) {
// host and port come from the -i argument of: orchestrator-client -c discover -i 172.16.120.10:3306
return this.getInstanceKeyInternal(host, port, true)
}
getInstanceKeyInternal
func (this *HttpAPI) getInstanceKeyInternal(host string, port string, resolve bool) (inst.InstanceKey, error) {
var instanceKey *inst.InstanceKey
var err error
if resolve { // getInstanceKey passes resolve=true,
// so this branch is taken
instanceKey, err = inst.NewResolveInstanceKeyStrings(host, port)
} else {
instanceKey, err = inst.NewRawInstanceKeyStrings(host, port)
}
if err != nil {
return emptyInstanceKey, err
}
instanceKey, err = inst.FigureInstanceKey(instanceKey, nil)
if err != nil {
return emptyInstanceKey, err
}
if instanceKey == nil {
return emptyInstanceKey, fmt.Errorf("Unexpected nil instanceKey in getInstanceKeyInternal(%+v, %+v, %+v)", host, port, resolve)
}
return *instanceKey, nil
}
->NewResolveInstanceKeyStrings
// NewResolveInstanceKeyStrings creates and resolves a new instance key based on string params
func NewResolveInstanceKeyStrings(hostname string, port string) (*InstanceKey, error) {
return newInstanceKeyStrings(hostname, port, true)
}

↓↓↓ newInstanceKeyStrings

// newInstanceKeyStrings
func newInstanceKeyStrings(hostname string, port string, resolve bool) (*InstanceKey, error) {
if portInt, err := strconv.Atoi(port); err != nil { // convert the port string to an int
return nil, fmt.Errorf("Invalid port: %s", port)
} else { // conversion succeeded, so this branch is taken
return newInstanceKey(hostname, portInt, resolve)
}
}

↓↓↓ newInstanceKey

func newInstanceKey(hostname string, port int, resolve bool) (instanceKey *InstanceKey, err error) {
if hostname == "" {
return instanceKey, fmt.Errorf("NewResolveInstanceKey: Empty hostname")
}

instanceKey = &InstanceKey{Hostname: hostname, Port: port} // construct an instanceKey
if resolve { // resolve is true, so this branch is taken
instanceKey, err = instanceKey.ResolveHostname()
}
return instanceKey, err
}

↓↓↓ instanceKey.ResolveHostname

func (this *InstanceKey) ResolveHostname() (*InstanceKey, error) {
if !this.IsValid() { // validity checks; all must hold: Hostname != "_", Hostname does not start with "//", len(Hostname) > 0, Port > 0
return this, nil
}

hostname, err := ResolveHostname(this.Hostname) // resolve the hostname; on success the result is assigned back to instanceKey.Hostname
if err == nil {
this.Hostname = hostname
}
return this, err
}
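
The parsing and validity checks above can be condensed into a small standalone sketch. The type key and the functions parseKey and isValid are hypothetical stand-ins written for this post, based only on the checks listed in the comment on IsValid; they are not the orchestrator implementation.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// key is a hypothetical stand-in for inst.InstanceKey.
type key struct {
	Hostname string
	Port     int
}

// parseKey converts the port string to an int, as newInstanceKeyStrings does.
func parseKey(hostname, port string) (key, error) {
	portInt, err := strconv.Atoi(port)
	if err != nil {
		return key{}, fmt.Errorf("invalid port: %s", port)
	}
	if hostname == "" {
		return key{}, fmt.Errorf("empty hostname")
	}
	return key{Hostname: hostname, Port: portInt}, nil
}

// isValid applies the four checks named in the walkthrough.
func (k key) isValid() bool {
	if k.Hostname == "_" || strings.HasPrefix(k.Hostname, "//") {
		return false
	}
	return len(k.Hostname) > 0 && k.Port > 0
}

func main() {
	k, err := parseKey("172.16.120.10", "3306")
	fmt.Println(k, err, k.isValid())
}
```

Note that in the original code an invalid key is returned unresolved with a nil error, so validation failures are silent at this layer.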

ResolveHostname

This function is fairly long; the key part is this:

resolvedHostname, err := resolveHostname(hostname)

↓↓↓ resolveHostname

func resolveHostname(hostname string) (string, error) {
switch strings.ToLower(config.Config.HostnameResolveMethod) {
case "none":
return hostname, nil
case "default":
return hostname, nil
case "cname":
return GetCNAME(hostname)
case "ip":
return getHostnameIP(hostname)
}
return hostname, nil
}

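This is where the hostname is resolved according to the HostnameResolveMethod parameter:

  • none, default: no processing; the hostname is returned as-is
  • cname: look up the CNAME
  • ip: look up the IP for the hostname

getHostnameIP ends up calling net.LookupIP:

LookupIP looks up host using the local resolver. It returns a slice of that host’s IPv4 and IPv6 addresses.

A quick look at net.LookupIP: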

// /etc/hosts
// 172.16.201.206 namenode-1
// 172.16.201.220 namenode-2

fmt.Println(net.LookupIP("namenode-1"))
fmt.Println(net.LookupIP("namenode-8"))
// ping 172.16.120.18
// PING 172.16.120.18 (172.16.120.18): 56 data bytes
// Request timeout for icmp_seq 0
fmt.Println(net.LookupIP("172.16.120.18")) // an IP that does not exist and does not answer ping
fmt.Println(net.LookupIP("192.168.124.130"))

output=>
[172.16.201.206] <nil>
[] lookup namenode-8: no such host
[172.16.120.18] <nil>
[192.168.124.130] <nil>

So if you pass getHostnameIP an IP, you get the same IP back. If you pass it a hostname, the result depends on whether name resolution is configured for it.
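
The behaviour just described can be reproduced in a self-contained sketch. The function resolve below is my own simplified stand-in for resolveHostname: it takes the method as an argument instead of reading config.Config, omits the cname case, and returns the first address from net.LookupIP, which is an assumption about how a single IP is picked.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// resolve mimics the switch on HostnameResolveMethod for "none", "default"
// and "ip". Any other method falls through and returns the input unchanged.
func resolve(method, hostname string) (string, error) {
	if strings.ToLower(method) == "ip" {
		ips, err := net.LookupIP(hostname)
		if err != nil {
			return hostname, err
		}
		if len(ips) == 0 {
			return hostname, fmt.Errorf("no addresses for %s", hostname)
		}
		return ips[0].String(), nil
	}
	return hostname, nil
}

func main() {
	// An IP literal is returned as-is without consulting DNS.
	fmt.Println(resolve("ip", "172.16.120.18"))
	// With the default method the hostname is never looked up.
	fmt.Println(resolve("default", "centos-1"))
}
```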

<-NewResolveInstanceKeyStrings

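So NewResolveInstanceKeyStrings simply resolves the incoming host according to HostnameResolveMethod. Taking the default value of HostnameResolveMethod, default, as an example, the host is returned unchanged, so instanceKey.Hostname is the host from -i host:port. That host can be either an IP or a hostname.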

func (this *HttpAPI) getInstanceKeyInternal(host string, port string, resolve bool) (inst.InstanceKey, error) {
var instanceKey *inst.InstanceKey
var err error
if resolve { // getInstanceKey passes resolve=true,
// so this branch is taken
instanceKey, err = inst.NewResolveInstanceKeyStrings(host, port)
} else {
instanceKey, err = inst.NewRawInstanceKeyStrings(host, port)
}
if err != nil {
return emptyInstanceKey, err
}

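// Continue from here
// FigureInstanceKey tries a fuzzy match against the backend DB on '%host%' and port. On a match it reads that instance's record from the backend DB (which also holds ServerID, ServerUUID, binlog information, etc.) and returns its instanceKey
// If instanceKey.Hostname is an IP, no fuzzy match is attempted and the key is returned as-is
// If the fuzzy match hits multiple rows, the key is also returned as-is; only a match on exactly one row causes the instanceKey to be taken from the database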
instanceKey, err = inst.FigureInstanceKey(instanceKey, nil)
if err != nil {
return emptyInstanceKey, err
}
if instanceKey == nil {
return emptyInstanceKey, fmt.Errorf("Unexpected nil instanceKey in getInstanceKeyInternal(%+v, %+v, %+v)", host, port, resolve)
}
return *instanceKey, nil
}
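
The fuzzy-matching rules in those comments are easier to see in isolation. The sketch below replaces the backend DB with an in-memory slice; key, figureKey and the known list are all hypothetical names, and substring matching stands in for the SQL '%host%' pattern.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// key is a hypothetical stand-in for inst.InstanceKey.
type key struct {
	Hostname string
	Port     int
}

// figureKey returns the single known key whose hostname contains k.Hostname
// and whose port matches. IPs and ambiguous or missing matches are returned as-is.
func figureKey(k key, known []key) key {
	if net.ParseIP(k.Hostname) != nil {
		return k // an IP is never fuzzy-matched
	}
	var matches []key
	for _, candidate := range known {
		if candidate.Port == k.Port && strings.Contains(candidate.Hostname, k.Hostname) {
			matches = append(matches, candidate)
		}
	}
	if len(matches) == 1 {
		return matches[0]
	}
	return k
}

func main() {
	known := []key{{"centos-1.example.com", 3306}, {"centos-2.example.com", 3306}}
	fmt.Println(figureKey(key{"centos-1", 3306}, known))      // exactly one match
	fmt.Println(figureKey(key{"centos", 3306}, known))        // two matches, unchanged
	fmt.Println(figureKey(key{"172.16.120.10", 3306}, known)) // IP, unchanged
}
```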

<-getInstanceKey

Continuing through Discover:

// Discover issues a synchronous read on an instance
func (this *HttpAPI) Discover(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"})
return
}
instanceKey, err := this.getInstanceKey(params["host"], params["port"])
// Continue from here; in short, everything above just resolved the host
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
instance, err := inst.ReadTopologyInstance(&instanceKey)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}

if orcraft.IsRaftEnabled() {
orcraft.PublishCommand("discover", instanceKey)
} else {
logic.DiscoverInstance(instanceKey)
}

Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Instance discovered: %+v", instance.Key), Details: instance})
}

->ReadTopologyInstance

This is the key part: it starts "discovering" the topology.

instance, err := inst.ReadTopologyInstance(&instanceKey)

↓↓↓ inst.ReadTopologyInstance

// ReadTopologyInstance collects information on the state of a MySQL
// server and writes the result synchronously to the orchestrator
// backend.
func ReadTopologyInstance(instanceKey *InstanceKey) (*Instance, error) {
return ReadTopologyInstanceBufferable(instanceKey, false, nil) // bufferWrites=false here
}

↓↓↓ ReadTopologyInstanceBufferable

-> ReadTopologyInstanceBufferable

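This function reads one topology node: the node's own information, which replicas it has, and which master it replicates from. It then writes that information to the backend DB. When bufferWrites is true, the write is enqueued in a buffer instead of being written synchronously.

The function is very long, so only parts of it are quoted here.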

// ReadTopologyInstanceBufferable connects to a topology MySQL instance
// and collects information on the server and its replication state.
// It writes the information retrieved into orchestrator's backend.
// - writes are optionally buffered.
// - timing information can be collected for the stages performed.
func ReadTopologyInstanceBufferable(instanceKey *InstanceKey, bufferWrites bool, latency *stopwatch.NamedStopwatch) (inst *Instance, err error) {
// bufferWrites is passed in as false

// What time.AfterFunc does:
// AfterFunc waits for the duration to elapse and then calls f
// in its own goroutine. It returns a Timer that can
// be used to cancel the call using its Stop method.
// So UpdateInstanceLastAttemptedCheck is called after 1 second, unless the timer is stopped first. That function sets the last_attempted_check column of the database_instance row for the given instanceKey to the current time
lastAttemptedCheckTimer := time.AfterFunc(time.Second, func() {
go UpdateInstanceLastAttemptedCheck(instanceKey)
})


... (less important code omitted)
db, err := db.OpenDiscovery(instanceKey.Hostname, instanceKey.Port)
// connect to the given instance
if err != nil {
goto Cleanup // important: if connecting to the database fails, execution jumps to Cleanup
}


instance.Key = *instanceKey
... (less important code omitted)

var mysqlHostname, mysqlReportHost string
err = db.QueryRow("select @@global.hostname, ifnull(@@global.report_host, ''), @@global.server_id, @@global.version, @@global.version_comment, @@global.read_only, @@global.binlog_format, @@global.log_bin, @@global.log_slave_updates").Scan(
&mysqlHostname, &mysqlReportHost, &instance.ServerID, &instance.Version, &instance.VersionComment, &instance.ReadOnly, &instance.Binlog_format, &instance.LogBinEnabled, &instance.LogReplicationUpdatesEnabled)
if err != nil {
goto Cleanup
}
partialSuccess = true // We at least managed to read something from the server.
// The hostname was obtained above via select @@global.hostname
// resolvedHostname is now chosen according to MySQLHostnameResolveMethod
switch strings.ToLower(config.Config.MySQLHostnameResolveMethod) {
case "none":
resolvedHostname = instance.Key.Hostname
case "default", "hostname", "@@hostname": // the default value of MySQLHostnameResolveMethod is @@hostname; in that case resolvedHostname is the hostname returned by select @@global.hostname
resolvedHostname = mysqlHostname
case "report_host", "@@report_host":
if mysqlReportHost == "" {
err = fmt.Errorf("MySQLHostnameResolveMethod configured to use @@report_host but %+v has NULL/empty @@report_host", instanceKey)
goto Cleanup
}
resolvedHostname = mysqlReportHost
default:
resolvedHostname = instance.Key.Hostname
}

... (less important code omitted)
// In our example resolvedHostname is the hostname: centos-1
// and instance.Key.Hostname is 172.16.120.10
if resolvedHostname != instance.Key.Hostname {
latency.Start("backend")
UpdateResolvedHostname(instance.Key.Hostname, resolvedHostname)
// UpdateResolvedHostname records hostname and resolved_hostname in the backend db
// _, err := db.ExecOrchestrator(`
// insert into
// hostname_resolve (hostname, resolved_hostname, resolved_timestamp)
// values
// (?, ?, NOW())
// on duplicate key update
// resolved_hostname = VALUES(resolved_hostname),
// resolved_timestamp = VALUES(resolved_timestamp)
// `,
// hostname,
// resolvedHostname)
latency.Stop("backend")
instance.Key.Hostname = resolvedHostname // and instance.Key.Hostname is changed from 172.16.120.10 to centos-1
}
if instance.Key.Hostname == "" {
err = fmt.Errorf("ReadTopologyInstance: empty hostname (%+v). Bailing out", *instanceKey)
goto Cleanup
}
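go ResolveHostnameIPs(instance.Key.Hostname) // this tries to look up the IPs. In our example instance.Key.Hostname is already centos-1, so if the server running orchestrator has no name resolution for it (e.g. in /etc/hosts), the lookup fails
// If the lookup succeeds, the result is stored via: insert into hostname_ips (hostname, ipv4, ipv6, last_updated) ... on duplicate key update ...

... (less important code omitted)

// The following section fetches the master of 172.16.120.10:3306
// Note that the master is found via show slave status, which can return multiple rows under multi-source replication. The official orchestrator documentation states that multi-source replication is not supported, so instance.MasterKey is a single struct, not a slice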
err = sqlutils.QueryRowsMap(db, "show slave status", func(m sqlutils.RowMap) error {
...
masterHostname := m.GetString("Master_Host")
if isMaxScale110 {
// Buggy buggy maxscale 1.1.0. Reported Master_Host can be corrupted.
// Therefore we (currently) take @@hostname (which is masquarading as master host anyhow)
masterHostname = maxScaleMasterHostname
}
masterKey, err := NewResolveInstanceKey(masterHostname, m.GetInt("Master_Port"))
if err != nil {
logReadTopologyInstanceError(instanceKey, "NewResolveInstanceKey", err)
}
masterKey.Hostname, resolveErr = ResolveHostname(masterKey.Hostname)
if resolveErr != nil {
logReadTopologyInstanceError(instanceKey, fmt.Sprintf("ResolveHostname(%q)", masterKey.Hostname), resolveErr)
}
instance.MasterKey = *masterKey

... (less important code omitted)
instanceFound = true

// -------------------------------------------------------------------------
// Anything after this point does not affect the fact the instance is found.
// No `goto Cleanup` after this point.
// -------------------------------------------------------------------------

// Get replicas, either by SHOW SLAVE HOSTS or via PROCESSLIST
// MaxScale does not support PROCESSLIST, so SHOW SLAVE HOSTS is the only option

// The large section below fetches the replicas of 172.16.120.10:3306; any replicas found are added to instance.Replicas

// Get replicas, either by SHOW SLAVE HOSTS or via PROCESSLIST
// MaxScale does not support PROCESSLIST, so SHOW SLAVE HOSTS is the only option
if config.Config.DiscoverByShowSlaveHosts || isMaxScale {
err := sqlutils.QueryRowsMap(db, `show slave hosts`,
func(m sqlutils.RowMap) error {
// MaxScale 1.1 may trigger an error with this command, but
// also we may see issues if anything on the MySQL server locks up.
// Consequently it's important to validate the values received look
// good prior to calling ResolveHostname()
host := m.GetString("Host")
port := m.GetIntD("Port", 0)
if host == "" || port == 0 {
if isMaxScale && host == "" && port == 0 {
// MaxScale reports a bad response sometimes so ignore it.
// - seen in 1.1.0 and 1.4.3.4
return nil
}
// otherwise report the error to the caller
return fmt.Errorf("ReadTopologyInstance(%+v) 'show slave hosts' returned row with <host,port>: <%v,%v>", instanceKey, host, port)
}

replicaKey, err := NewResolveInstanceKey(host, port)
if err == nil && replicaKey.IsValid() {
if !FiltersMatchInstanceKey(replicaKey, config.Config.DiscoveryIgnoreReplicaHostnameFilters) {
instance.AddReplicaKey(replicaKey)
}
foundByShowSlaveHosts = true
}
return err
})

logReadTopologyInstanceError(instanceKey, "show slave hosts", err)
}
if !foundByShowSlaveHosts && !isMaxScale {
// Either not configured to read SHOW SLAVE HOSTS or nothing was there.
// Discover by information_schema.processlist
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := sqlutils.QueryRowsMap(db, `
select
substring_index(host, ':', 1) as slave_hostname
from
information_schema.processlist
where
command IN ('Binlog Dump', 'Binlog Dump GTID')
`,
func(m sqlutils.RowMap) error {
cname, resolveErr := ResolveHostname(m.GetString("slave_hostname"))
if resolveErr != nil {
logReadTopologyInstanceError(instanceKey, "ResolveHostname: processlist", resolveErr)
}
replicaKey := InstanceKey{Hostname: cname, Port: instance.Key.Port}
if !FiltersMatchInstanceKey(&replicaKey, config.Config.DiscoveryIgnoreReplicaHostnameFilters) {
instance.AddReplicaKey(&replicaKey)
}
return err
})

logReadTopologyInstanceError(instanceKey, "processlist", err)
}()
}
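... (less important code omitted)

Cleanup: // Note where Cleanup sits: after the point where instanceFound = true is set. So as soon as db, err := db.OpenDiscovery(instanceKey.Hostname, instanceKey.Port) fails, execution jumps here and instanceFound is still false
xxxx

... (less important code omitted)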

if instanceFound {
instance.LastDiscoveryLatency = time.Since(readingStartTime)
instance.IsLastCheckValid = true
instance.IsRecentlyChecked = true
instance.IsUpToDate = true
latency.Start("backend")
if bufferWrites { // bufferWrites传进来的是false
enqueueInstanceWrite(instance, instanceFound, err)
} else {
WriteInstance(instance, instanceFound, err) // this ultimately writes the collected instance information to the database_instance table and, importantly, sets last_checked=NOW(), last_seen=NOW(), last_attempted_check=NOW(), last_check_partial_success=1
}
lastAttemptedCheckTimer.Stop()
latency.Stop("backend")
return instance, nil
}

// Something is wrong, could be network-wise. Record that we
// tried to check the instance. last_attempted_check is also
// updated on success by writeInstance.
latency.Start("backend")
_ = UpdateInstanceLastChecked(&instance.Key, partialSuccess) // If connecting to the database fails, or a query such as show slave status fails, execution jumps to Cleanup with instanceFound still false,
// so UpdateInstanceLastChecked runs. It sets last_checked = NOW() and last_check_partial_success = 0/1 for the given instance.Key (if the jump to Cleanup happened before err = db.QueryRow("select @@global.hostname... succeeded, partialSuccess is false and last_check_partial_success=0; otherwise partialSuccess is true and last_check_partial_success=1)
// In other words, last_check_partial_success means that we at least connected to the database and were able to run the select @@global.hostname... query
latency.Stop("backend")
return nil, err
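
Since the MySQLHostnameResolveMethod switch is the part that caused my original confusion, here it is extracted into a pure function that can be run on its own. pickResolvedHostname is a name I made up; the cases mirror the switch quoted above, but the method is passed as an argument rather than read from config.Config, and the error text is shortened.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// pickResolvedHostname chooses between the hostname orchestrator connected with
// (keyHostname), @@global.hostname (mysqlHostname) and @@global.report_host.
func pickResolvedHostname(method, keyHostname, mysqlHostname, reportHost string) (string, error) {
	switch strings.ToLower(method) {
	case "none":
		return keyHostname, nil
	case "default", "hostname", "@@hostname":
		return mysqlHostname, nil
	case "report_host", "@@report_host":
		if reportHost == "" {
			return "", errors.New("@@report_host is NULL/empty")
		}
		return reportHost, nil
	default:
		return keyHostname, nil
	}
}

func main() {
	// The example from this post: connected by IP, MySQL reports centos-1.
	fmt.Println(pickResolvedHostname("@@hostname", "172.16.120.10", "centos-1", ""))
	fmt.Println(pickResolvedHostname("none", "172.16.120.10", "centos-1", ""))
}
```

With the default method, the key stored in the backend therefore ends up being centos-1 even though discovery was requested by IP, which is why orchestrator must be able to resolve that hostname later.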

<- ReadTopologyInstance

Continuing through Discover:

// Discover issues a synchronous read on an instance
func (this *HttpAPI) Discover(params martini.Params, r render.Render, req *http.Request, user auth.User) {
if !isAuthorizedForAction(req, user) {
Respond(r, &APIResponse{Code: ERROR, Message: "Unauthorized"})
return
}
instanceKey, err := this.getInstanceKey(params["host"], params["port"])
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
instance, err := inst.ReadTopologyInstance(&instanceKey)

// Continue from here; ReadTopologyInstance above has read the instance and written it to the backend db
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}

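if orcraft.IsRaftEnabled() { // if orchestrator is deployed in raft mode
// I have not read this code; my guess is that it makes the other nodes run discover as well
orcraft.PublishCommand("discover", instanceKey)
} else {
// keep following the trail and "discovering"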
logic.DiscoverInstance(instanceKey)
}

Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("Instance discovered: %+v", instance.Key), Details: instance})
}

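Following the trail: DiscoverInstance

As seen above, only 172.16.120.10:3306 itself has been discovered so far. Its replicas and master were found too, but the replicas' ip and port were merely stored in instance.Replicas and the master in instance.MasterKey; none of those replicas or the master has been probed any further.

The rest of the "discovery" work is done in DiscoverInstance: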

// DiscoverInstance will attempt to discover (poll) an instance (unless
// it is already up to date) and will also ensure that its master and
// replicas (if any) are also checked.
func DiscoverInstance(instanceKey inst.InstanceKey) {

... (less important code omitted)
// Calculate the expiry period each time as InstancePollSeconds
// _may_ change during the run of the process (via SIGHUP) and
// it is not possible to change the cache's default expiry..
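// This if is important: it tries to add a key to the recentDiscoveryOperationKeys cache, the key being instanceKey.DisplayString()
// Before adding, the cache is checked for the key; if it is already there, an err is returned
// If the key is not found, it is added to the cache with an expiry of Config.InstancePollSeconds (5 seconds by default), so this is what limits each instance's discovery interval to 5 seconds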
if existsInCacheError := recentDiscoveryOperationKeys.Add(instanceKey.DisplayString(), true, instancePollSecondsDuration()); existsInCacheError != nil {
// Just recently attempted
return
}

... (less important code omitted)
// read the information for `172.16.120.10:3306` back from the backend db
instance, found, err := inst.ReadInstance(&instanceKey)

... (less important code omitted)

// First we've ever heard of this instance. Continue investigation:
instance, err = inst.ReadTopologyInstanceBufferable(&instanceKey, config.Config.BufferInstanceWrites, latency)

... (less important code omitted)

// Investigate replicas and members of the same replication group:
// the replicas are taken out here
for _, replicaKey := range append(instance.ReplicationGroupMembers.GetInstanceKeys(), instance.Replicas.GetInstanceKeys()...) {
replicaKey := replicaKey // not needed? no concurrency here?

// Avoid noticing some hosts we would otherwise discover
if inst.FiltersMatchInstanceKey(&replicaKey, config.Config.DiscoveryIgnoreReplicaHostnameFilters) {
continue
}

if replicaKey.IsValid() {
// and pushed onto discoveryQueue, the discovery queue
discoveryQueue.Push(replicaKey) // Push also records when this key was pushed
}
}
// Investigate master:
// the master is taken out as well
if instance.MasterKey.IsValid() {
if !inst.FiltersMatchInstanceKey(&instance.MasterKey, config.Config.DiscoveryIgnoreMasterHostnameFilters) {
// and the master is pushed onto discoveryQueue too
discoveryQueue.Push(instance.MasterKey)
}
}
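
The throttling done by recentDiscoveryOperationKeys.Add can be illustrated with a tiny expiring set. This is a single-goroutine sketch with an injected clock, written for this post; recentKeys, add and run are hypothetical names, and the real cache is a separate library type with its own locking and expiry handling.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// recentKeys remembers until when each key counts as "recently attempted".
type recentKeys struct {
	expiry map[string]time.Time
}

// add records key until now+ttl. It fails if key is present and not yet
// expired, which is the signal DiscoverInstance uses to return early.
func (r *recentKeys) add(key string, now time.Time, ttl time.Duration) error {
	if until, ok := r.expiry[key]; ok && now.Before(until) {
		return errors.New("recently attempted: " + key)
	}
	r.expiry[key] = now.Add(ttl)
	return nil
}

// run replays three discovery attempts at t=0s, 2s and 6s with a 5s poll
// interval and reports which of them would proceed.
func run() []bool {
	r := &recentKeys{expiry: map[string]time.Time{}}
	t0 := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)
	poll := 5 * time.Second
	var proceeded []bool
	for _, offset := range []time.Duration{0, 2 * time.Second, 6 * time.Second} {
		proceeded = append(proceeded, r.add("centos-1:3306", t0.Add(offset), poll) == nil)
	}
	return proceeded
}

func main() {
	fmt.Println(run())
}
```

The second attempt is skipped because it falls inside the 5-second window opened by the first, while the third goes ahead because that window has expired.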

discoveryQueue

Searching the whole codebase for discoveryQueue, something must be consuming this queue.

Sure enough, the search turns up instanceKey := discoveryQueue.Consume():

// handleDiscoveryRequests iterates the discoveryQueue channel and calls upon
// instance discovery per entry.
func handleDiscoveryRequests() {
discoveryQueue = discovery.CreateOrReturnQueue("DEFAULT")

// create a pool of discovery workers
for i := uint(0); i < config.Config.DiscoveryMaxConcurrency; i++ {
go func() {
for {
instanceKey := discoveryQueue.Consume()
// Possibly this used to be the elected node, but has
// been demoted, while still the queue is full.
if !IsLeaderOrActive() {
log.Debugf("Node apparently demoted. Skipping discovery of %+v. "+
"Remaining queue size: %+v", instanceKey, discoveryQueue.QueueLen())
discoveryQueue.Release(instanceKey)
continue
}
// look here: discovery continues through DiscoverInstance
DiscoverInstance(instanceKey)
discoveryQueue.Release(instanceKey)
}
}()
}
}
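
The producer/consumer pattern in handleDiscoveryRequests can be sketched with a plain channel and a fixed number of workers. Everything here (discoverAll, countDiscovered, string keys) is a simplified assumption for illustration: the real queue has its own Push/Consume/Release methods and its workers loop forever, whereas this version closes the channel so the program can exit.

```go
package main

import (
	"fmt"
	"sync"
)

// discoverAll feeds keys to a pool of workers, each of which calls discover
// for every key it consumes, and waits until all keys are processed.
func discoverAll(keys []string, workers int, discover func(string)) {
	queue := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for key := range queue {
				discover(key)
			}
		}()
	}
	for _, key := range keys {
		queue <- key
	}
	close(queue)
	wg.Wait()
}

// countDiscovered runs discoverAll with a callback that only counts calls.
func countDiscovered(keys []string, workers int) int {
	var mu sync.Mutex
	count := 0
	discoverAll(keys, workers, func(string) {
		mu.Lock()
		count++
		mu.Unlock()
	})
	return count
}

func main() {
	keys := []string{"centos-1:3306", "centos-2:3306", "centos-3:3306"}
	fmt.Println(countDiscovered(keys, 2))
}
```

In orchestrator the callback is DiscoverInstance itself, which pushes the newly found master and replicas back onto the queue, so the whole topology is walked from a single starting instance.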

Copyright © 2013 - 2022 Fan() All Rights Reserved.