MySQL Error log监控

MySQL Error log监控

告警日志监控其实比较简单了, 怎么做都行. 目前我们这里是使用下面的方法
FileBeat采集日志 -> Kafka -> 自己写脚本消费出来 -> 企业微信机器人告警
image
效果如下
image

这里给出FileBeat配置, 很简单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
[root@node002142 filebeat-7.2.0-dba]# cat filebeat.yml
logging:
level: warning
json: true
filebeat.config.inputs:
enabled: true
path: configs/*.yml
reload.enabled: true
reload.period: 10s
http:
enabled: true
host: "0.0.0.0"
port: 5066
processors:
- drop_fields:
fields: ["beat.name", "beat.version", "input_type", "offset"]
- add_host_metadata:
netinfo.enabled: true
output.kafka:
hosts: ["192.168.x.xx:9092","192.168.x.xx:9092","192.168.x.xx:9092"]
topic: '%{[kafka_topic]}'
partition.round_robin:
reachable_only: true
required_acks: 1
max_message_bytes: 8388608
compression: gzip
bulk_max_size: 2048
worker: 6
keep_alive: 600
channel_buffer_size: 2560
version: 2.0.0  # Filebeat 7.2 才支持较新版本的 Kafka; 虽然我们的 Kafka 是 2.1.2, 但这里最高也只能写 2.0.0

[root@node002142 configs]# ll
total 8
-rw-r--r-- 1 root root 247 Jul 15 18:40 mysql_error_log.yml

[root@node002142 filebeat-7.2.0-dba]# cat configs/mysql_error_log.yml
- type: log
paths:
- /data/mysql_*/logs/*.err
fields:
type: mysql_error_log
format: plain
kafka_topic: log_mysql_error_log
fields_under_root: true
max_backoff: 3s

写入Kafka的消息如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
{
"@timestamp": "2019-07-21T01:43:22.091Z",
"@metadata": {
"beat": "filebeat",
"type": "_doc",
"version": "7.2.0",
"topic": "log_mysql_error_log"
},
"input": {
"type": "log"
},
"kafka_topic": "log_mysql_error_log",
"type": "mysql_error_log",
"format": "plain",
"host": {
"name": "node00xxx",
"id": "ea3afe477be14c22abf234dd3cb80f55",
"containerized": false,
"ip": ["10.1.x.xx", "fe80::xx:xx:xx", "192.168.x.xx", "fe80::xx:xx:xx"],
"mac": ["80:18:xx:xx:xx", "80:18:xx:xx:xx", "80:18:44:xx:xx:xx", "80:18:44:xx:xx:xx"],
"hostname": "node002111",
"architecture": "x86_64",
"os": {
"platform": "centos",
"version": "7 (Core)",
"family": "redhat",
"name": "CentOS Linux",
"kernel": "4.15.9-1.el7.elrepo.x86_64",
"codename": "Core"
}
},
"agent": {
"ephemeral_id": "c97d819c-c456-42d0-xxxx-xxxxxxxxx",
"hostname": "node00xxx",
"id": "359f6ddb-de27-42c9-9ce1-1624205d6af0",
"version": "7.2.0",
"type": "filebeat"
},
"log": {
"offset": 2585741,
"file": {
"path": "/data/mysql_3306/logs/node00xxxx.err"
}
},
"message": "2019-07-21T01:43:21.585528Z 4345026 [Note] Access denied for user 'user'@'192.168.x.xx' (using password: YES)",
"ecs": {
"version": "1.0.0"
}
}

Python脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# -*- coding: utf8 -*-
# __author__ = 'Fan()'
# Date: 2019-07-18

import time
import json
import pytz
import requests
import datetime
import logging

from utils.conn_db import Fandb
from utils.config import *
from confluent_kafka import Consumer, KafkaError, TopicPartition, OFFSET_END, OFFSET_BEGINNING, Producer


class MyRequest():
    """Thin static wrappers around ``requests`` that return decoded JSON.

    Both helpers return the parsed JSON body when the response status is
    exactly 200 (``requests.codes.ok``). For any other status they call
    ``raise_for_status()``, which raises for 4xx/5xx responses; for other
    non-200 statuses nothing is raised and the helper returns ``None``.
    """

    @staticmethod
    def _json_or_raise(resp):
        """Decode a 200 response as JSON, otherwise defer to raise_for_status()."""
        if resp.status_code != requests.codes.ok:
            resp.raise_for_status()
            return None
        return resp.json()

    @staticmethod
    def get(url, params=None, timeout=(2, 5)):
        """Issue a GET request; ``timeout`` is (connect, read) seconds."""
        return MyRequest._json_or_raise(
            requests.get(url=url, params=params, timeout=timeout)
        )

    @staticmethod
    def post(url, data=None, json=None, timeout=(5)):
        """Issue a POST request; ``timeout`` defaults to 5 seconds."""
        return MyRequest._json_or_raise(
            requests.post(url=url, data=data, json=json, timeout=timeout)
        )


def confLog(logfile):
    """Configure the root logger to append INFO-and-above records to ``logfile``.

    Each record is written as
    ``<timestamp> <source file>[line:<n>] <level> <message>``.
    """
    log_options = {
        'level': logging.INFO,
        'format': '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
        'datefmt': '%Y-%m-%d %H:%M:%S',
        'filename': logfile,
        'filemode': 'a',
    }
    logging.basicConfig(**log_options)


def _on_send_response(err, partations):
pt = partations[0]
if isinstance(err, KafkaError):
print('Topic {} 偏移量 {} 提交异常. {}'.format(pt.topic, pt.offset, err))
logging.error('Topic {} 偏移量 {} 提交异常. {}'.format(pt.topic, pt.offset, err))
# raise Exception(err)


def getConsumer(topic_name, bootstrap_servers, offset_end=True):
    """Generator yielding UTF-8 decoded message payloads from a Kafka topic.

    :param topic_name: topic to read; also used as the consumer ``group.id``.
    :param bootstrap_servers: comma-separated ``host:port`` broker list.
    :param offset_end: start from the end of the partition when True
        (only new messages), from the beginning when False.
    :return: generator of ``str`` payloads. The generator ends after a
        non-EOF Kafka error (which is printed and logged) or on
        ``KeyboardInterrupt``. The consumer is closed on every exit path.
    """
    config = {'bootstrap.servers': bootstrap_servers,
              "group.id": topic_name,
              'enable.auto.commit': True,
              "fetch.wait.max.ms": 3000,
              "max.poll.interval.ms": 60000,
              'session.timeout.ms': 60000,
              "on_commit": _on_send_response,
              "default.topic.config": {"auto.offset.reset": "latest"}}

    consumer = Consumer(config)
    offset = OFFSET_END if offset_end else OFFSET_BEGINNING
    # NOTE(review): only partition 0 is assigned. If the topic has more than
    # one partition, messages on the other partitions are never read --
    # confirm the topic's partition count.
    pt = TopicPartition(topic_name, 0, offset)
    consumer.assign([pt])
    # consumer.seek(pt)

    try:
        while True:
            # An empty batch simply produces no iterations; keep polling.
            for msg in consumer.consume(num_messages=10, timeout=0.1):
                err = msg.error()
                if err is None:
                    yield msg.value().decode("utf-8")
                elif err.code() != KafkaError._PARTITION_EOF:
                    # Reaching the end of the partition is not an error;
                    # anything else aborts the stream.
                    raise Exception(err)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(e)
        logging.error('Kafka consumer for topic %s stopped: %s', topic_name, e)
    finally:
        # Also runs when the caller closes or discards the generator, so the
        # consumer no longer leaks in that case.
        consumer.close()


def utc_to_local(utc_time_str, utc_format='%Y-%m-%dT%H:%M:%S.%fZ'):
    """Convert a UTC timestamp string to 'Asia/Chongqing' local time.

    :param utc_time_str: timestamp text, e.g. ``2019-07-21T01:43:22.091Z``.
    :param utc_format: ``strptime`` format describing ``utc_time_str``.
    :return: the same instant formatted as ``YYYY-mm-dd HH:MM:SS`` local time.
    :raises ValueError: if ``utc_time_str`` does not match ``utc_format``.
    """
    target_zone = pytz.timezone('Asia/Chongqing')
    naive_utc = datetime.datetime.strptime(utc_time_str, utc_format)
    # The parsed value carries no tzinfo; tag it as UTC before converting.
    aware_utc = naive_utc.replace(tzinfo=pytz.utc)
    return aware_utc.astimezone(target_zone).strftime("%Y-%m-%d %H:%M:%S")


def get_mysql_ip(ips, hostname):
    """Pick the host's service IPv4 address out of its reported address list.

    The last three characters of ``hostname`` are read as a number, and the
    first address of the form ``192.x.y.<that number>`` is returned, skipping
    third octets 3, 8 and 16 (per the original comment, those carry the
    host's VIPs).

    :param ips: iterable of address strings (non dotted-quad entries are ignored).
    :param hostname: host name whose last three characters are digits.
    :return: the matching address string, or ``None`` when nothing matches.
    :raises ValueError: if the last three characters of ``hostname`` are not
        an integer.
    """
    host_suffix = str(int(hostname[-3:]))
    skipped_third_octets = ('3', '8', '16')

    for candidate in ips:
        octets = candidate.split('.')
        if len(octets) != 4:
            continue
        first, _second, third, fourth = octets
        if first == '192' and fourth == host_suffix and third not in skipped_third_octets:
            return candidate
    return None


def get_mysql_port(error_log_file):
    """Extract the MySQL port from an error-log path such as
    ``/data/mysql_3306/logs/host.err``.

    :param error_log_file: path containing a ``mysql_<port>/`` component.
    :return: the port as a ``str`` taken from the path, or the integer
        ``3306`` when the component is just ``mysql_`` with no port.
    :raises IndexError: if the path does not contain ``mysql_`` at all.
    """
    marker = 'mysql_'
    after_marker = error_log_file.split(marker)[1]
    port_text = after_marker.split('/')[0]
    # An empty port component falls back to the default port.
    return port_text or 3306


def sendWechatBot(send_message):
    """Post an alert to the WeCom (Enterprise WeChat) robot webhook as markdown.

    :param send_message: dict supplying every placeholder of the template:
        product_name, project_name, ha_group_name, ip_app, port, role_name,
        error_timestamp, catch_timestamp, send_timestamp, error_message.
    :return: whatever ``MyRequest.post`` returns for the webhook call.
    :raises KeyError: if a placeholder is missing from ``send_message``.
    """
    webhook_url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的机器人key'
    markdown_template = '''<font color=\"warning\">捕获告警日志报错信息:</font>\n
>产品线:<font color=\"comment\">{product_name}</font>
>项目名称:<font color=\"comment\">{project_name}</font>
>高可用组:<font color=\"comment\">{ha_group_name}</font>
>IP:<font color=\"comment\">{ip_app}</font>
>PORT:<font color=\"comment\">{port}</font>
>节点角色:<font color=\"comment\">{role_name}</font>
>告警时间:<font color=\"comment\">{error_timestamp}</font>
>捕获时间:<font color=\"comment\">{catch_timestamp}</font>
>下发时间:<font color=\"comment\">{send_timestamp}</font>

告警内容:
<font color=\"comment\">{error_message}</font>\n
'''
    payload = {
        "msgtype": "markdown",
        "markdown": {"content": markdown_template.format(**send_message)},
    }
    return MyRequest.post(webhook_url, json=payload)


def get_mysql_info(mysql_ip, mysql_port):
    """Look up the CMDB record describing the MySQL instance at ip:port.

    :param mysql_ip: instance IP address.
    :param mysql_port: instance port.
    :return: the first row returned by the CMDB query.
    :raises IndexError: if the query returns no rows.
    """
    conn = Fandb(cmdb_host, cmdb_port, cmdb_user, cmdb_pass, cmdb_schema, dic=True)
    try:
        # Placeholder from the article: substitute the real lookup query,
        # which should filter on mysql_ip / mysql_port.
        sql = '一个根据ip端口查询数据库实例想关信息的SQL'
        res = conn.dql(sql)
    finally:
        # Release the connection even when the query raises.
        conn.close()

    if not res:
        raise IndexError(
            'no CMDB record found for MySQL instance {}:{}'.format(mysql_ip, mysql_port))
    return res[0]


if __name__ == '__main__':
    # Entry point: consume Filebeat messages from Kafka and forward every
    # MySQL error-log line containing "[ERROR]" to the WeCom robot.
    verbose = 1
    logfile = '/tools/mysql_error_log_watchdog.log'
    confLog(logfile)

    topic = 'log_mysql_error_log'
    bootstrap_servers = "192.168.x.xxx:9092,192.168.x.xxx:9092,192.168.x.xxx:9092"

    consumer = getConsumer(topic, bootstrap_servers)

    for message in consumer:
        error_log_message = None
        # Everything that can fail on a single message sits inside the try,
        # so one malformed or unexpected message cannot stop the watchdog.
        try:
            message_dict = json.loads(message)

            if verbose >= 3:
                print(message_dict)

            error_log_message = message_dict['message']
            if '[ERROR]' not in error_log_message:
                continue

            catch_timestamp = utc_to_local(message_dict['@timestamp'])
            error_log_file = message_dict['log']['file']['path']
            hostname = message_dict['host']['hostname']
            ips = message_dict['host']['ip']
            mysql_ip = get_mysql_ip(ips, hostname)
            mysql_port = get_mysql_port(error_log_file)

            # Log line layout: "<timestamp> <thread id> [LEVEL] text...".
            log_fields = error_log_message.split(' ')
            error_timestamp = utc_to_local(log_fields[0])
            error_message = ' '.join(log_fields[2:])

            send_message_dict = get_mysql_info(mysql_ip, mysql_port)
            send_message_dict['catch_timestamp'] = catch_timestamp
            send_message_dict['error_timestamp'] = error_timestamp
            send_message_dict['send_timestamp'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            send_message_dict['error_message'] = error_message
            logging.info(send_message_dict)
            sendWechatBot(send_message_dict)
            time.sleep(3)  # the WeCom robot is limited to one message every 3 seconds
        except Exception as e:
            logging.exception(e)
            # Record what was being processed: the log line if it was
            # extracted, otherwise the raw Kafka payload.
            logging.error(error_log_message if error_log_message is not None else message)

如果觉得上面的太麻烦,其实一个shell脚本也可以搞, 就是得在每个机器部署运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/bin/bash
# MySQL error log 监控脚本 (由 Oracle 警告日志监控脚本改写)
# 2015/4/14 King.

#######################################
# Print an alert message and mail it with mutt.
# Globals:   mailTo (read) - comma-separated recipients, passed to mutt -b
#            mailCc (read) - comma-separated recipients, passed to mutt -c
# Arguments: $1 - message body
# Outputs:   echoes the message body to stdout
# Returns:   exit status of mutt
#######################################
sendMail()
{
  local body=$1
  local subject
  local -a mutt_args

  subject="$(date +"%Y-%m-%d %H:%M:%S") 警告日志错误"
  mutt_args=(-s "$subject")

  # Pass each list as a single comma-separated argument. Splitting it into
  # several words would leave bare addresses on the command line, after which
  # mutt stops parsing options.
  if [[ -n "$mailTo" ]]; then
    mutt_args+=(-b "$mailTo")
  fi
  if [[ -n "$mailCc" ]]; then
    mutt_args+=(-c "$mailCc")
  fi

  printf '%s\n' "$body"
  printf '%s\n' "$body" | /usr/bin/mutt "${mutt_args[@]}"
}

#######################################
# Check whether an error line number has already been reported.
# Globals:   errLineNumFile (read) - history file, one line number per line
# Arguments: $1 - line number to look up
# Returns:   1 if $1 is already recorded, 0 if it is new. The inversion is
#            deliberate: the caller treats status 0 as "not scanned yet".
#######################################
checkHis()
{
  local wanted=$1

  # An empty key can never have been recorded.
  [[ -n "$wanted" ]] || return 0

  # -x: whole-line match (so 1 does not match 17); -F: literal, not a regex.
  if grep -qxF -- "$wanted" "$errLineNumFile"; then
    return 1
  fi
  return 0
}

# ---------------------------------------------------------------------------
# Main: poll the MySQL error log every 10 seconds, append newly seen [ERROR]
# lines to $logfile and mail them.
#
# NOTE: lines are de-duplicated by line number, recorded in $errLineNumFile.
# If the error log is rotated or truncated, new errors that land on an
# already-recorded line number are skipped until that file is cleared.
# ---------------------------------------------------------------------------

# Directory containing this script; logs and scan state live beneath it.
# (`pwd $0` ignores its argument and returned the caller's working directory.)
scriptDir=$(cd -- "$(dirname -- "$0")" && pwd) || exit 1
scriptName=$(basename -- "$0")
logDir=$scriptDir/logs
logfile=$logDir/alert_error.log
errLineNumFile=$logDir/.alert_errLineNum


# Path of the MySQL error log to watch
alertFilePath="/data/mysql_3306/logs/nodexx.err"

# Mail recipients, comma-separated
mailTo="xx@163.com"

# Mail CC recipients, comma-separated
mailCc="a@163.com,b@163.com"

if [ ! -f "$alertFilePath" ]; then
  echo "[Error]: $alertFilePath no such file or directory."
  exit 1
fi
mkdir -p -- "$logDir" || exit 1
touch -- "$errLineNumFile" || exit 1
echo "正在监控 $alertFilePath... "

while true; do
  newErrors=()

  # All "[ERROR]" lines, prefixed with "<line number>:".
  # -F makes the pattern literal: as a regex, "[ERROR]]" is a bracket
  # expression that also matches e.g. "[Note]".
  errNum=$(grep -n -i -F -- '[ERROR]' "$alertFilePath")

  if [ -n "$errNum" ]; then
    while IFS= read -r entry; do
      num=${entry%%:*}
      # checkHis returns 0 when this line number has not been reported yet.
      if checkHis "$num"; then
        printf '%s\n' "$num" >> "$errLineNumFile"
        newErrors+=("$entry")
      fi
    done <<< "$errNum"
  fi

  # Log and mail the newly found error lines, if any.
  if [ "${#newErrors[@]}" -gt 0 ]; then
    echo "-------------------------------- $(date +"%Y-%m-%d %H:%M:%S") ---------------------------------------" >> "$logfile"
    errMsg=$(printf '%s\n' "${newErrors[@]}")
    printf '%s\n' "$errMsg" >> "$logfile"
    sendMail "$errMsg"
  fi

  # Rescan the error log every 10 seconds
  sleep 10s
done

Powered by Hexo and Hexo-theme-hiker

Copyright © 2013 - 2022 Fan() All Rights Reserved.

访客数 : | 访问量 :