MySQL Error log监控
告警日志监控其实比较简单了, 怎么做都行. 目前我们这里是使用下面的方法
FileBeat采集日志 -> Kafka -> 自己写脚本消费出来 -> 企业微信机器人告警
效果如下
这里给出FileBeat配置, 很简单
[root@node002142 filebeat-7.2.0-dba]# cat filebeat.yml
logging:
  level: warning
  json: true
filebeat.config.inputs:
  enabled: true
  path: configs/*.yml
  reload.enabled: true
  reload.period: 10s
http:
  enabled: true
  host: "0.0.0.0"
  port: 5066
processors:
  - drop_fields:
      fields: ["beat.name", "beat.version", "input_type", "offset"]
  - add_host_metadata:
      netinfo.enabled: true
output.kafka:
  hosts: ["192.168.x.xx:9092","192.168.x.xx:9092","192.168.x.xx:9092"]
  topic: '%{[kafka_topic]}'
  partition.round_robin:
    reachable_only: true
  required_acks: 1
  max_message_bytes: 8388608
  compression: gzip
  bulk_max_size: 2048
  worker: 6
  keep_alive: 600
  channel_buffer_size: 2560
  version: 2.0.0  # Filebeat 7.2 的 kafka output 最高只支持到 2.0.0 这个版本号, 虽然我们的 Kafka 是 2.1.2, 这里也得写 2.0.0
[root@node002142 configs]# ll
total 8
-rw-r--r-- 1 root root 247 Jul 15 18:40 mysql_error_log.yml
[root@node002142 filebeat-7.2.0-dba]# cat configs/mysql_error_log.yml
- type: log
  paths:
    - /data/mysql_*/logs/*.err
  fields:
    type: mysql_error_log
    format: plain
    kafka_topic: log_mysql_error_log
  fields_under_root: true
  max_backoff: 3s
写入Kafka的消息如下:
{
"@timestamp": "2019-07-21T01:43:22.091Z",
"@metadata": {
"beat": "filebeat",
"type": "_doc",
"version": "7.2.0",
"topic": "log_mysql_error_log"
},
"input": {
"type": "log"
},
"kafka_topic": "log_mysql_error_log",
"type": "mysql_error_log",
"format": "plain",
"host": {
"name": "node00xxx",
"id": "ea3afe477be14c22abf234dd3cb80f55",
"containerized": false,
"ip": ["10.1.x.xx", "fe80::xx:xx:xx", "192.168.x.xx", "fe80::xx:xx:xx"],
"mac": ["80:18:xx:xx:xx", "80:18:xx:xx:xx", "80:18:44:xx:xx:xx", "80:18:44:xx:xx:xx"],
"hostname": "node002111",
"architecture": "x86_64",
"os": {
"platform": "centos",
"version": "7 (Core)",
"family": "redhat",
"name": "CentOS Linux",
"kernel": "4.15.9-1.el7.elrepo.x86_64",
"codename": "Core"
}
},
"agent": {
"ephemeral_id": "c97d819c-c456-42d0-xxxx-xxxxxxxxx",
"hostname": "node00xxx",
"id": "359f6ddb-de27-42c9-9ce1-1624205d6af0",
"version": "7.2.0",
"type": "filebeat"
},
"log": {
"offset": 2585741,
"file": {
"path": "/data/mysql_3306/logs/node00xxxx.err"
}
},
"message": "2019-07-21T01:43:21.585528Z 4345026 [Note] Access denied for user 'user'@'192.168.x.xx' (using password: YES)",
"ecs": {
"version": "1.0.0"
}
}
Python脚本
# -*- coding: utf8 -*-
# __author__ = 'Fan()'
# Date: 2019-07-18
import time
import json
import pytz
import requests
import datetime
import logging
from utils.conn_db import Fandb
from utils.config import *
from confluent_kafka import Consumer, KafkaError, TopicPartition, OFFSET_END, OFFSET_BEGINNING, Producer
class MyRequest:
    """Static helpers that issue HTTP requests via ``requests`` and decode JSON."""

    @staticmethod
    def _json_or_raise(response):
        """Return the decoded JSON body when the status is 200.

        For any other status ``response.raise_for_status()`` is invoked; if
        that call does not raise, ``None`` is returned.
        """
        if response.status_code != requests.codes.ok:
            response.raise_for_status()
            return None
        return response.json()

    @staticmethod
    def get(url, params=None, timeout=(2, 5)):
        """Send a GET request and return the decoded JSON body.

        ``params`` and ``timeout`` are passed through to ``requests.get``.
        """
        reply = requests.get(url=url, params=params, timeout=timeout)
        return MyRequest._json_or_raise(reply)

    @staticmethod
    def post(url, data=None, json=None, timeout=5):
        """Send a POST request and return the decoded JSON body.

        ``data``, ``json`` and ``timeout`` are passed through to
        ``requests.post``.
        """
        reply = requests.post(url=url, data=data, json=json, timeout=timeout)
        return MyRequest._json_or_raise(reply)
def confLog(logfile):
    """Set up root logging: INFO level, appended to *logfile*.

    Each record is written as
    ``<time> <source file>[line:<n>] <level> <message>``.
    """
    log_options = {
        'level': logging.INFO,
        'format': '%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
        'datefmt': '%Y-%m-%d %H:%M:%S',
        'filename': logfile,
        'filemode': 'a',
    }
    logging.basicConfig(**log_options)
def _on_send_response(err, partations):
pt = partations[0]
if isinstance(err, KafkaError):
print('Topic {} 偏移量 {} 提交异常. {}'.format(pt.topic, pt.offset, err))
logging.error('Topic {} 偏移量 {} 提交异常. {}'.format(pt.topic, pt.offset, err))
# raise Exception(err)
def getConsumer(topic_name, bootstrap_servers, offset_end=True, partitions=(0,)):
    """Generate UTF-8 decoded message values read from *topic_name*.

    A consumer is created with the topic name as its group id, is assigned
    the requested partitions explicitly, and is then polled forever in small
    batches.

    Args:
        topic_name: Kafka topic to read; also used as ``group.id``.
        bootstrap_servers: comma-separated ``host:port`` broker list.
        offset_end: when true start at the end of each partition, otherwise
            start from the beginning.
        partitions: partition numbers to assign. Defaults to ``(0,)``, which
            matches the previous hard-coded behaviour. NOTE(review): the
            Filebeat producer shown with this script uses round-robin
            partitioning, so a multi-partition topic needs every partition
            listed here or messages will be missed.

    Yields:
        str: the decoded value of every message that carries no error.

    The generator ends quietly on ``KeyboardInterrupt`` and on any other
    exception (which is printed and logged). The consumer is closed in every
    case, including when the caller closes or drops the generator.
    """
    # NOTE(review): "default.topic.config" is kept as-is for compatibility;
    # check whether the installed client still accepts it.
    config = {'bootstrap.servers': bootstrap_servers,
              "group.id": topic_name,
              'enable.auto.commit': True,
              "fetch.wait.max.ms": 3000,
              "max.poll.interval.ms": 60000,
              'session.timeout.ms': 60000,
              "on_commit": _on_send_response,
              "default.topic.config": {"auto.offset.reset": "latest"}}
    consumer = Consumer(config)
    start_offset = OFFSET_END if offset_end else OFFSET_BEGINNING
    consumer.assign([TopicPartition(topic_name, number, start_offset)
                     for number in partitions])
    try:
        while True:
            # An empty batch simply means nothing arrived within the timeout;
            # the loop polls again without printing anything.
            batch = consumer.consume(num_messages=10, timeout=0.1)
            for msg in batch or ():
                err = msg.error()
                if err is None:
                    yield msg.value().decode("utf-8")
                elif err.code() != KafkaError._PARTITION_EOF:
                    # Reaching the end of a partition is not a failure;
                    # everything else aborts the stream.
                    raise Exception(err)
    except KeyboardInterrupt:
        # Manual stop: end the stream without reporting an error.
        pass
    except Exception as e:
        print(e)
        logging.exception(e)
    finally:
        # Also runs on GeneratorExit, so the consumer is never left open.
        consumer.close()
def utc_to_local(utc_time_str, utc_format='%Y-%m-%dT%H:%M:%S.%fZ'):
    """Render a UTC timestamp string as ``Asia/Chongqing`` local time.

    Args:
        utc_time_str: timestamp text to parse, interpreted as UTC.
        utc_format: ``strptime`` pattern describing *utc_time_str*.

    Returns:
        The same instant formatted as ``YYYY-MM-DD HH:MM:SS`` in the
        ``Asia/Chongqing`` zone.
    """
    target_zone = pytz.timezone('Asia/Chongqing')
    parsed = datetime.datetime.strptime(utc_time_str, utc_format)
    # The parsed value is tagged as UTC before being shifted to the target zone.
    return (parsed.replace(tzinfo=pytz.utc)
            .astimezone(target_zone)
            .strftime("%Y-%m-%d %H:%M:%S"))
def get_mysql_ip(ips, hostname):
    """Select the MySQL instance address from a host's reported IP list.

    The last three characters of *hostname* are read as a number and must
    equal the final octet of the chosen address. Only dotted-quad addresses
    whose first octet is ``192`` are considered, and addresses whose third
    octet is ``3``, ``8`` or ``16`` are skipped (the host's VIPs).

    Args:
        ips: iterable of address strings (entries that are not four
            dot-separated parts, such as IPv6, are ignored); may be empty
            or ``None``.
        hostname: host name expected to end in a three-digit number,
            e.g. ``node002111``.

    Returns:
        The first matching address, or ``None`` when nothing matches or the
        hostname has no numeric suffix.
    """
    try:
        # int() drops leading zeros so "007" compares equal to octet "7".
        wanted_octet = str(int(hostname[-3:]))
    except (TypeError, ValueError):
        # No usable numeric suffix: report "not found" instead of crashing.
        return None

    vip_third_octets = ('3', '8', '16')
    for address in ips or ():
        octets = address.split('.')
        if len(octets) != 4:
            continue
        if (octets[0] == '192'
                and octets[3] == wanted_octet
                and octets[2] not in vip_third_octets):
            return address
    return None
def get_mysql_port(error_log_file):
    """Extract the instance port from an error-log path.

    The port is the text between the first ``mysql_`` marker and the next
    ``/`` (or the next ``mysql_`` marker, whichever comes first), e.g.
    ``/data/mysql_3306/logs/x.err`` gives ``'3306'``.

    Args:
        error_log_file: path of the MySQL error log.

    Returns:
        The port as a string when the path carries one. When the marker is
        missing or is followed by nothing, the default port is returned as
        the integer ``3306`` (the type difference is kept for backward
        compatibility).
    """
    default_port = 3306
    marker = 'mysql_'
    _, found, remainder = error_log_file.partition(marker)
    if not found:
        # Path does not follow the mysql_<port> layout.
        return default_port
    port = remainder.split(marker)[0].split('/')[0]
    return port if port != '' else default_port
def sendWechatBot(send_message, web_hook_address=None):
    """Post a markdown alert to a WeChat Work group robot.

    Args:
        send_message: mapping that supplies every placeholder used by the
            template: ``product_name``, ``project_name``, ``ha_group_name``,
            ``ip_app``, ``port``, ``role_name``, ``error_timestamp``,
            ``catch_timestamp``, ``send_timestamp`` and ``error_message``.
        web_hook_address: robot webhook URL. When omitted, the built-in
            address is used (its key is a placeholder to be filled in).

    Returns:
        Whatever ``MyRequest.post`` returns for the call.

    Raises:
        KeyError: if *send_message* lacks a placeholder key.
    """
    if web_hook_address is None:
        web_hook_address = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你的机器人key'
    content = '''<font color=\"warning\">捕获告警日志报错信息:</font>\n
>产品线:<font color=\"comment\">{product_name}</font>
>项目名称:<font color=\"comment\">{project_name}</font>
>高可用组:<font color=\"comment\">{ha_group_name}</font>
>IP:<font color=\"comment\">{ip_app}</font>
>PORT:<font color=\"comment\">{port}</font>
>节点角色:<font color=\"comment\">{role_name}</font>
>告警时间:<font color=\"comment\">{error_timestamp}</font>
>捕获时间:<font color=\"comment\">{catch_timestamp}</font>
>下发时间:<font color=\"comment\">{send_timestamp}</font>
告警内容:
<font color=\"comment\">{error_message}</font>\n
'''.format(**send_message)
    body = {
        "msgtype": "markdown",
        "markdown": {
            "content": content
        }
    }
    data = MyRequest.post(web_hook_address, json=body)
    # NOTE(review): the webhook is expected to answer with an "errcode" field
    # where 0 means success; a non-zero code is logged so that a rejected
    # alert does not go unnoticed. Confirm against the robot API docs.
    if isinstance(data, dict) and data.get('errcode') not in (None, 0):
        logging.warning('WeChat robot rejected the message: %s', data)
    return data
def get_mysql_info(mysql_ip, mysql_port):
    """Look up the instance record for an ip/port pair in the CMDB.

    Args:
        mysql_ip: instance address.
        mysql_port: instance port.

    Returns:
        The first row returned by the query.

    Raises:
        IndexError: when the query returns no rows.

    NOTE(review): the SQL text below is a placeholder and does not yet use
    *mysql_ip* / *mysql_port*. When the real statement is filled in, bind
    them as query parameters rather than formatting them into the string.
    """
    conn = Fandb(cmdb_host, cmdb_port, cmdb_user, cmdb_pass, cmdb_schema, dic=True)
    try:
        sql = '一个根据ip端口查询数据库实例想关信息的SQL'
        res = conn.dql(sql)
    finally:
        # Release the connection even when the query fails.
        conn.close()
    if not res:
        raise IndexError('no CMDB record found for {}:{}'.format(mysql_ip, mysql_port))
    return res[0]
if __name__ == '__main__':
    # Entry point: consume Filebeat events from Kafka and forward every MySQL
    # error-log line that contains "[ERROR]" to the WeChat robot.
    verbose = 1
    logfile = '/tools/mysql_error_log_watchdog.log'
    confLog(logfile)
    topic = 'log_mysql_error_log'
    bootstrap_servers = "192.168.x.xxx:9092,192.168.x.xxx:9092,192.168.x.xxx:9092"
    consumer = getConsumer(topic, bootstrap_servers)
    for message in consumer:
        error_log_message = None
        # All per-message work sits inside the try so that one malformed or
        # unexpected event is logged and skipped instead of ending the loop.
        try:
            message_dict = json.loads(message)
            if verbose >= 3:
                print(message_dict)
            error_log_message = message_dict['message']
            if '[ERROR]' not in error_log_message:
                continue
            catch_timestamp = utc_to_local(message_dict['@timestamp'])
            error_log_file = message_dict['log']['file']['path']
            hostname = message_dict['host']['hostname']
            ips = message_dict['host']['ip']
            mysql_ip = get_mysql_ip(ips, hostname)
            mysql_port = get_mysql_port(error_log_file)
            # The first space-separated field of the log line is its timestamp;
            # the alert text drops the first two fields.
            error_timestamp = utc_to_local(error_log_message.split(' ')[0])
            error_message = ' '.join(error_log_message.split(' ')[2:])
            send_message_dict = get_mysql_info(mysql_ip, mysql_port)
            send_message_dict['catch_timestamp'] = catch_timestamp
            send_message_dict['error_timestamp'] = error_timestamp
            send_message_dict['send_timestamp'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            send_message_dict['error_message'] = error_message
            logging.info(send_message_dict)
            sendWechatBot(send_message_dict)
            time.sleep(3)  # the WeChat Work robot is limited to one message every 3 seconds
        except Exception as e:
            logging.exception(e)
            # Record the offending input once, without a second traceback.
            logging.error(error_log_message if error_log_message is not None else message)
如果觉得上面的太麻烦,其实一个shell脚本也可以搞, 就是得在每个机器部署运行
1 |
|