告別加班!Python腳本實(shí)現(xiàn)運(yùn)維工作自動(dòng)化的5個(gè)實(shí)用案例
前言:還在為重復(fù)性運(yùn)維工作而煩惱?每天被各種告警、監(jiān)控、部署搞得焦頭爛額?作為一名有10年經(jīng)驗(yàn)的運(yùn)維老司機(jī),今天分享5個(gè)超實(shí)用的Python自動(dòng)化腳本,讓你的運(yùn)維工作效率提升300%!這些都是我在生產(chǎn)環(huán)境中實(shí)際使用的案例,代碼簡(jiǎn)潔高效,拿來即用!
案例1:批量服務(wù)器健康檢查腳本
痛點(diǎn):每天早上需要檢查幾十臺(tái)服務(wù)器的CPU、內(nèi)存、磁盤使用情況,手動(dòng)登錄太費(fèi)時(shí)。
解決方案:一鍵批量檢查,異常自動(dòng)告警!
#!/usr/bin/env python3
"""Case 1: batch server health check with threshold-based email alerting."""
import json
import smtplib
import socket
from datetime import datetime
from email.mime.text import MIMEText

import psutil


class ServerHealthChecker:
    """Collects CPU / memory / disk metrics and records alerts when thresholds are exceeded."""

    def __init__(self, thresholds=None):
        """thresholds: optional dict overriding the default percent limits."""
        self.thresholds = thresholds or {
            'cpu_percent': 80,
            'memory_percent': 85,
            'disk_percent': 90
        }
        self.alerts = []

    def check_system_health(self):
        """Collect a system health snapshot.

        Returns:
            (health_data, alerts): the raw metric snapshot dict and the
            accumulated list of alert strings.
        """
        health_data = {
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            # BUG FIX: the original stored psutil.boot_time() (an epoch float)
            # under 'hostname'; use the actual host name instead.
            'hostname': socket.gethostname(),
            'cpu_percent': psutil.cpu_percent(interval=1),
            'memory': psutil.virtual_memory(),
            'disk': psutil.disk_usage('/'),
            'processes': len(psutil.pids())
        }

        # CPU check
        if health_data['cpu_percent'] > self.thresholds['cpu_percent']:
            self.alerts.append(f" CPU使用率過高:{health_data['cpu_percent']:.1f}%")

        # Memory check
        memory_percent = health_data['memory'].percent
        if memory_percent > self.thresholds['memory_percent']:
            self.alerts.append(f" 內(nèi)存使用率過高:{memory_percent:.1f}%")

        # Disk check (percent derived from used/total of the root filesystem)
        disk_percent = (health_data['disk'].used / health_data['disk'].total) * 100
        if disk_percent > self.thresholds['disk_percent']:
            self.alerts.append(f" 磁盤使用率過高:{disk_percent:.1f}%")

        return health_data, self.alerts

    def send_alert_email(self, alerts, to_email):
        """Build (and currently only print) an alert email; no-op when there are no alerts.

        SMTP delivery is intentionally left unconfigured — plug in the site's
        SMTP server before using this in production.
        """
        if not alerts:
            return

        # One alert per line for a readable mail body.
        msg = MIMEText('\n'.join(alerts), 'plain', 'utf-8')
        msg['Subject'] = f' 服務(wù)器健康檢查告警 -{datetime.now().strftime("%Y-%m-%d %H:%M")}'
        msg['From'] = 'monitor@company.com'
        msg['To'] = to_email

        # 這里需要配置SMTP服務(wù)器
        print(f"告警郵件內(nèi)容: {chr(10).join(alerts)}")


# usage example
if __name__ == "__main__":
    checker = ServerHealthChecker()
    health_data, alerts = checker.check_system_health()

    print(f" 系統(tǒng)健康檢查完成 -{health_data['timestamp']}")
    print(f" CPU:{health_data['cpu_percent']:.1f}%")
    print(f" 內(nèi)存:{health_data['memory'].percent:.1f}%")
    print(f" 磁盤:{(health_data['disk'].used / health_data['disk'].total * 100):.1f}%")

    if alerts:
        checker.send_alert_email(alerts, 'admin@company.com')
效果:原本需要30分鐘的檢查工作,現(xiàn)在1分鐘搞定!
案例2:自動(dòng)化日志分析與異常檢測(cè)
痛點(diǎn):每天幾GB的日志文件,人工查找異常像大海撈針。
解決方案:智能分析日志,自動(dòng)提取關(guān)鍵異常信息!
#!/usr/bin/env python3
"""Case 2: automated log analysis and error extraction."""
import gzip
import os
import re
from collections import Counter, defaultdict
from datetime import datetime, timedelta


class LogAnalyzer:
    """Scans a (possibly gzip-compressed) log file for error patterns within a time window."""

    def __init__(self, log_path):
        self.log_path = log_path
        # BUG FIX: the original patterns lost their backslashes
        # (e.g. r'5d{2}s' and the timestamp pattern used a literal 'd{4}'),
        # so they could never match. Restored \d / \s escapes.
        self.error_patterns = [
            r'ERROR|FATAL|CRITICAL',
            r'Exception|Error|Failed',
            r'timeout|refused|denied',
            r'5\d{2}\s',  # HTTP 5xx errors
        ]
        self.results = defaultdict(list)

    def parse_log_line(self, line):
        """Parse one log line into timestamp / level / message fields.

        Expected format: ``2024-01-15 10:45:00 [ERROR] message``.
        Returns a dict with keys 'timestamp', 'level', 'message', or None
        when the line does not match.
        """
        pattern = r'(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\s+\[(\w+)\]\s+(.*)'
        matched = re.match(pattern, line)  # renamed: 'match' shadows the builtin
        if matched:
            return {
                'timestamp': matched.group(1),
                'level': matched.group(2),
                'message': matched.group(3)
            }
        return None

    def analyze_errors(self, hours_back=24):
        """Analyze errors logged within the last ``hours_back`` hours.

        Returns a dict with 'error_summary' (level -> count),
        'error_details' (the 10 most recent matches) and 'total_errors',
        or None when the file cannot be read.
        """
        cutoff_time = datetime.now() - timedelta(hours=hours_back)
        error_counter = Counter()
        error_details = []

        # Transparently support gzip-compressed logs.
        open_func = gzip.open if self.log_path.endswith('.gz') else open

        try:
            with open_func(self.log_path, 'rt', encoding='utf-8') as f:
                for line_num, line in enumerate(f, 1):
                    parsed = self.parse_log_line(line.strip())
                    if not parsed:
                        continue

                    # Skip entries outside the analysis window (or with
                    # unparseable timestamps).
                    try:
                        log_time = datetime.strptime(parsed['timestamp'], '%Y-%m-%d %H:%M:%S')
                        if log_time < cutoff_time:
                            continue
                    except ValueError:
                        continue

                    # Count each line at most once: stop at the first
                    # matching error pattern.
                    for pattern in self.error_patterns:
                        if re.search(pattern, line, re.IGNORECASE):
                            error_counter[parsed['level']] += 1
                            message = parsed['message']
                            if len(message) > 100:
                                message = message[:100] + '...'
                            error_details.append({
                                'line': line_num,
                                'timestamp': parsed['timestamp'],
                                'level': parsed['level'],
                                'message': message
                            })
                            break
        except Exception as e:
            print(f" 分析日志文件失敗:{e}")
            return None

        return {
            'error_summary': dict(error_counter),
            'error_details': error_details[-10:],  # only the 10 most recent
            'total_errors': sum(error_counter.values())
        }

    def generate_report(self, analysis_result):
        """Render an ``analyze_errors`` result as a human-readable report string."""
        if not analysis_result:
            return " 日志分析失敗"

        report = [
            f" 日志分析報(bào)告 -{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            f" 日志文件:{os.path.basename(self.log_path)}",
            f" 總錯(cuò)誤數(shù):{analysis_result['total_errors']}",
            "",
            " 錯(cuò)誤級(jí)別統(tǒng)計(jì):"
        ]

        for level, count in analysis_result['error_summary'].items():
            report.append(f" {level}:{count}次")

        report.append(" 最近錯(cuò)誤詳情:")
        for error in analysis_result['error_details']:
            report.append(f" [{error['timestamp']}]{error['level']}:{error['message']}")

        # One report line per list entry.
        return '\n'.join(report)


# usage example
if __name__ == "__main__":
    # Replace with the actual log path.
    log_file = "/var/log/application.log"

    if os.path.exists(log_file):
        analyzer = LogAnalyzer(log_file)
        result = analyzer.analyze_errors(hours_back=24)
        report = analyzer.generate_report(result)
        print(report)
    else:
        print(f" 日志文件不存在:{log_file}")
效果:自動(dòng)識(shí)別異常模式,快速定位問題,節(jié)省80%的日志分析時(shí)間!
案例3:自動(dòng)化部署腳本
痛點(diǎn):每次發(fā)版都要重復(fù)執(zhí)行一堆命令,容易出錯(cuò),效率低。
解決方案:一鍵自動(dòng)化部署,支持回滾,安全可靠!
#!/usr/bin/env python3
"""Case 3: automated deployment with backup, health check and rollback."""
import json
import os
import shutil
import subprocess
import time
from datetime import datetime


class AutoDeployer:
    """Drives a git-based deploy: backup, code swap, service restart, health check, rollback."""

    def __init__(self, config_file="deploy_config.json"):
        self.config = self.load_config(config_file)
        self.backup_dir = self.config.get('backup_dir', '/backup')
        self.deploy_log = []

    def load_config(self, config_file):
        """Load deployment config, merging user overrides onto built-in defaults."""
        default_config = {
            "app_name": "myapp",
            "deploy_path": "/opt/myapp",
            "git_repo": "git@github.com:company/myapp.git",
            "branch": "main",
            "backup_dir": "/backup",
            "services": ["myapp"],
            "health_check_url": "http://localhost:8080/health",
            "rollback_keep": 3
        }

        if os.path.exists(config_file):
            with open(config_file, 'r') as f:
                user_config = json.load(f)
                default_config.update(user_config)

        return default_config

    def log(self, message, level="INFO"):
        """Append a timestamped entry to the in-memory deploy log (also echoed to stdout)."""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        log_entry = f"[{timestamp}]{level}:{message}"
        print(log_entry)
        self.deploy_log.append(log_entry)

    def run_command(self, command, check=True):
        """Run a shell command, logging stdout; raises CalledProcessError when check=True.

        NOTE(security): shell=True is kept for compatibility — commands here
        are built from trusted local config, never from user input.
        """
        self.log(f"執(zhí)行命令:{command}")
        try:
            result = subprocess.run(
                command,
                shell=True,
                capture_output=True,
                text=True,
                check=check
            )
            if result.stdout:
                self.log(f"輸出:{result.stdout.strip()}")
            return result
        except subprocess.CalledProcessError as e:
            self.log(f"命令執(zhí)行失敗:{e}", "ERROR")
            self.log(f"錯(cuò)誤輸出:{e.stderr}", "ERROR")
            raise

    def create_backup(self):
        """Snapshot the current deploy directory; returns the backup path or None."""
        if not os.path.exists(self.config['deploy_path']):
            self.log("部署目錄不存在,跳過備份")
            return None

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_name = f"{self.config['app_name']}_{timestamp}"
        backup_path = os.path.join(self.backup_dir, backup_name)

        os.makedirs(self.backup_dir, exist_ok=True)
        shutil.copytree(self.config['deploy_path'], backup_path)
        self.log(f"創(chuàng)建備份:{backup_path}")
        return backup_path

    def deploy_new_version(self):
        """Run the full deploy pipeline; returns True on success, False otherwise.

        On a failed health check, automatically rolls back to the pre-deploy
        backup. The temporary git clone is always cleaned up.
        """
        temp_dir = f"/tmp/{self.config['app_name']}_deploy_{int(time.time())}"

        try:
            # Clone the release branch.
            # BUG FIX: the original f-strings lacked spaces between arguments
            # (e.g. "git clone -b{branch}{repo}{dir}", "systemctl stop{service}"),
            # producing invalid shell commands. Spaces restored throughout.
            self.run_command(
                f"git clone -b {self.config['branch']} {self.config['git_repo']} {temp_dir}"
            )

            # Stop services before swapping the code.
            for service in self.config['services']:
                self.run_command(f"systemctl stop {service}", check=False)

            # Back up the currently deployed version.
            backup_path = self.create_backup()

            # Install the new version in place.
            if os.path.exists(self.config['deploy_path']):
                shutil.rmtree(self.config['deploy_path'])
            shutil.copytree(temp_dir, self.config['deploy_path'])

            # Fix ownership for the runtime user.
            self.run_command(f"chown -R appuser:appuser {self.config['deploy_path']}")

            # Start and enable services.
            for service in self.config['services']:
                self.run_command(f"systemctl start {service}")
                self.run_command(f"systemctl enable {service}")

            # Verify the application actually came up.
            if self.health_check():
                self.log(" 部署成功!")
                self.cleanup_old_backups()
                return True
            else:
                self.log(" 健康檢查失敗,開始回滾", "ERROR")
                if backup_path:
                    self.rollback(backup_path)
                return False

        except Exception as e:
            self.log(f"部署失敗:{str(e)}", "ERROR")
            return False
        finally:
            # Always clean up the temporary clone.
            if os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)

    def health_check(self, max_retries=5):
        """Poll the health-check URL up to max_retries times, 10 s apart."""
        import requests  # local import: only needed when actually deploying

        for i in range(max_retries):
            try:
                self.log(f"健康檢查 ({i+1}/{max_retries})")
                response = requests.get(
                    self.config['health_check_url'],
                    timeout=10
                )
                if response.status_code == 200:
                    self.log(" 健康檢查通過")
                    return True
            except Exception as e:
                self.log(f"健康檢查失敗:{e}")

            if i < max_retries - 1:
                time.sleep(10)

        return False

    def rollback(self, backup_path):
        """Restore the given backup over the deploy path and restart services."""
        try:
            for service in self.config['services']:
                self.run_command(f"systemctl stop {service}", check=False)

            if os.path.exists(self.config['deploy_path']):
                shutil.rmtree(self.config['deploy_path'])
            shutil.copytree(backup_path, self.config['deploy_path'])

            for service in self.config['services']:
                self.run_command(f"systemctl start {service}")

            self.log(" 回滾完成")
        except Exception as e:
            self.log(f"回滾失敗:{str(e)}", "ERROR")

    def cleanup_old_backups(self):
        """Keep only the newest ``rollback_keep`` backups; delete the rest."""
        if not os.path.exists(self.backup_dir):
            return

        backups = [d for d in os.listdir(self.backup_dir)
                   if d.startswith(self.config['app_name'])]
        backups.sort(reverse=True)  # timestamped names → newest first

        for backup in backups[self.config['rollback_keep']:]:
            backup_path = os.path.join(self.backup_dir, backup)
            shutil.rmtree(backup_path)
            self.log(f"清理舊備份:{backup}")


# usage example
if __name__ == "__main__":
    deployer = AutoDeployer()

    print(" 開始自動(dòng)化部署...")
    success = deployer.deploy_new_version()

    if success:
        print(" 部署成功完成!")
    else:
        print(" 部署失敗,請(qǐng)檢查日志")

    # Persist the deploy log, one entry per line.
    with open(f"deploy_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log", 'w') as f:
        f.write('\n'.join(deployer.deploy_log))
效果:部署時(shí)間從30分鐘縮短到5分鐘,出錯(cuò)率降低90%!
案例4:資源使用情況監(jiān)控與報(bào)告
痛點(diǎn):需要定期統(tǒng)計(jì)各服務(wù)器資源使用情況,制作報(bào)表給領(lǐng)導(dǎo)看。
解決方案:自動(dòng)收集數(shù)據(jù),生成精美圖表報(bào)告!
#!/usr/bin/env python3
"""Case 4: resource usage monitoring with SQLite storage and chart reports."""
import json
import os
import sqlite3
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
import psutil

# Configure CJK-capable fonts so chart labels don't render as boxes.
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False


class ResourceMonitor:
    """Samples system metrics into SQLite and renders a 2x2 chart report."""

    def __init__(self, db_path="resource_monitor.db"):
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Create the resource_data table if it does not exist."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS resource_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                cpu_percent REAL NOT NULL,
                memory_percent REAL NOT NULL,
                disk_percent REAL NOT NULL,
                network_sent INTEGER NOT NULL,
                network_recv INTEGER NOT NULL,
                process_count INTEGER NOT NULL
            )
        ''')
        conn.commit()
        conn.close()

    def collect_metrics(self):
        """Sample current CPU / memory / disk / network / process metrics."""
        # CPU utilisation (1-second sampling window)
        cpu_percent = psutil.cpu_percent(interval=1)

        # Memory usage
        memory = psutil.virtual_memory()
        memory_percent = memory.percent

        # Disk usage of the root filesystem
        disk = psutil.disk_usage('/')
        disk_percent = (disk.used / disk.total) * 100

        # Cumulative network byte counters
        network = psutil.net_io_counters()
        network_sent = network.bytes_sent
        network_recv = network.bytes_recv

        # Process count
        process_count = len(psutil.pids())

        return {
            'timestamp': datetime.now().isoformat(),
            'cpu_percent': cpu_percent,
            'memory_percent': memory_percent,
            'disk_percent': disk_percent,
            'network_sent': network_sent,
            'network_recv': network_recv,
            'process_count': process_count
        }

    def save_metrics(self, metrics):
        """Persist one metrics sample into the database."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO resource_data
            (timestamp, cpu_percent, memory_percent, disk_percent,
             network_sent, network_recv, process_count)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (
            metrics['timestamp'],
            metrics['cpu_percent'],
            metrics['memory_percent'],
            metrics['disk_percent'],
            metrics['network_sent'],
            metrics['network_recv'],
            metrics['process_count']
        ))
        conn.commit()
        conn.close()

    def get_metrics_by_period(self, hours=24):
        """Fetch rows from the last ``hours`` hours, ordered by timestamp.

        ISO-8601 strings compare correctly as text, so a string >= works here.
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        start_time = (datetime.now() - timedelta(hours=hours)).isoformat()
        cursor.execute('''
            SELECT timestamp, cpu_percent, memory_percent, disk_percent,
                   network_sent, network_recv, process_count
            FROM resource_data
            WHERE timestamp >= ?
            ORDER BY timestamp
        ''', (start_time,))

        data = cursor.fetchall()
        conn.close()
        return data

    def generate_report(self, hours=24):
        """Render the last ``hours`` hours of data as a PNG; returns its path (or None)."""
        data = self.get_metrics_by_period(hours)

        if not data:
            print(" 沒有找到監(jiān)控?cái)?shù)據(jù)")
            return

        # Unpack row tuples into per-metric series.
        timestamps = [datetime.fromisoformat(row[0]) for row in data]
        cpu_data = [row[1] for row in data]
        memory_data = [row[2] for row in data]
        disk_data = [row[3] for row in data]
        process_data = [row[6] for row in data]

        # 2x2 chart grid
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle(f'系統(tǒng)資源監(jiān)控報(bào)告 - 最近{hours}小時(shí)', fontsize=16)

        # CPU utilisation
        ax1.plot(timestamps, cpu_data, 'b-', linewidth=2)
        ax1.set_title('CPU使用率 (%)')
        ax1.set_ylabel('使用率 (%)')
        ax1.grid(True, alpha=0.3)
        ax1.axhline(y=80, color='r', linestyle='--', alpha=0.7, label='警戒線(80%)')
        ax1.legend()

        # Memory utilisation
        ax2.plot(timestamps, memory_data, 'g-', linewidth=2)
        ax2.set_title('內(nèi)存使用率 (%)')
        ax2.set_ylabel('使用率 (%)')
        ax2.grid(True, alpha=0.3)
        ax2.axhline(y=85, color='r', linestyle='--', alpha=0.7, label='警戒線(85%)')
        ax2.legend()

        # Disk utilisation — pass named colors via color=, not the fmt string
        ax3.plot(timestamps, disk_data, color='orange', linewidth=2)
        ax3.set_title('磁盤使用率 (%)')
        ax3.set_ylabel('使用率 (%)')
        ax3.set_xlabel('時(shí)間')
        ax3.grid(True, alpha=0.3)
        ax3.axhline(y=90, color='r', linestyle='--', alpha=0.7, label='警戒線(90%)')
        ax3.legend()

        # Process count
        ax4.plot(timestamps, process_data, color='purple', linewidth=2)
        ax4.set_title('系統(tǒng)進(jìn)程數(shù)量')
        ax4.set_ylabel('進(jìn)程數(shù)')
        ax4.set_xlabel('時(shí)間')
        ax4.grid(True, alpha=0.3)

        # Rotate x tick labels so timestamps stay readable.
        for ax in [ax1, ax2, ax3, ax4]:
            ax.tick_params(axis='x', rotation=45)

        plt.tight_layout()

        # Save and release the figure.
        report_path = f"resource_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
        plt.savefig(report_path, dpi=300, bbox_inches='tight')
        # BUG FIX: the original never closed the figure, leaking memory when
        # reports are generated repeatedly (e.g. from a scheduler).
        plt.close(fig)
        print(f" 報(bào)告已生成:{report_path}")

        # Print summary statistics alongside the chart.
        self.print_statistics(cpu_data, memory_data, disk_data, process_data)
        return report_path

    def print_statistics(self, cpu_data, memory_data, disk_data, process_data):
        """Print mean/max summary lines for each metric series (assumed non-empty)."""
        print(" 統(tǒng)計(jì)摘要:")
        print(f" CPU: 平均{sum(cpu_data)/len(cpu_data):.1f}%, 最大{max(cpu_data):.1f}%")
        print(f" 內(nèi)存: 平均{sum(memory_data)/len(memory_data):.1f}%, 最大{max(memory_data):.1f}%")
        print(f" 磁盤: 平均{sum(disk_data)/len(disk_data):.1f}%, 最大{max(disk_data):.1f}%")
        print(f" 進(jìn)程: 平均{sum(process_data)//len(process_data)}個(gè), 最大{max(process_data)}個(gè)")

    def cleanup_old_data(self, days=7):
        """Delete samples older than ``days`` days and report the count."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cutoff_time = (datetime.now() - timedelta(days=days)).isoformat()
        cursor.execute('DELETE FROM resource_data WHERE timestamp < ?', (cutoff_time,))
        deleted_rows = cursor.rowcount

        conn.commit()
        conn.close()

        print(f" 清理了 {deleted_rows} 條舊數(shù)據(jù)")


# usage example
if __name__ == "__main__":
    monitor = ResourceMonitor()

    # Collect one sample now.
    print(" 正在收集系統(tǒng)指標(biāo)...")
    metrics = monitor.collect_metrics()
    monitor.save_metrics(metrics)
    print(" 指標(biāo)收集完成")

    # Generate a report if enough data exists.
    print(" 正在生成資源監(jiān)控報(bào)告...")
    try:
        report_path = monitor.generate_report(hours=24)
        if report_path:
            print(f" 報(bào)告生成完成: {report_path}")
    except Exception as e:
        print(f" 報(bào)告生成失敗: {e}")

    # Prune old samples.
    monitor.cleanup_old_data(days=7)
效果:自動(dòng)生成專業(yè)圖表報(bào)告,領(lǐng)導(dǎo)看了都說好!數(shù)據(jù)分析效率提升500%!
案例5:智能告警系統(tǒng)
痛點(diǎn):系統(tǒng)異常時(shí)不能及時(shí)發(fā)現(xiàn),經(jīng)常是用戶投訴后才知道出問題。
解決方案:多維度監(jiān)控,多渠道告警,確保第一時(shí)間響應(yīng)!
#!/usr/bin/env python3 importrequests importsmtplib importtime importjson importpsutil fromdatetimeimportdatetime fromemail.mime.textimportMIMEText fromemail.mime.multipartimportMIMEMultipart importlogging # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('alert_system.log'), logging.StreamHandler() ] ) classAlertSystem: def__init__(self, config_file="alert_config.json"): self.config =self.load_config(config_file) self.alert_history = {} # 防止重復(fù)告警 self.logger = logging.getLogger(__name__) defload_config(self, config_file): """加載告警配置""" default_config = { "monitors": { "system": { "cpu_threshold":85, "memory_threshold":90, "disk_threshold":95 }, "services": [ {"name":"nginx","port":80}, {"name":"mysql","port":3306}, {"name":"redis","port":6379} ], "urls": [ {"name":"主站","url":"https://www.example.com","timeout":10}, {"name":"API","url":"https://api.example.com/health","timeout":5} ] }, "notifications": { "email": { "enabled":True, "smtp_server":"smtp.company.com", "smtp_port":587, "username":"alert@company.com", "password":"your_password", "recipients": ["admin@company.com","ops@company.com"] }, "webhook": { "enabled":True, "url":"https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK", "channel":"#alerts" } }, "alert_cooldown":300# 5分鐘內(nèi)不重復(fù)同類告警 } ifos.path.exists(config_file): withopen(config_file,'r')asf: user_config = json.load(f) self._merge_config(default_config, user_config) returndefault_config def_merge_config(self, default, user): """遞歸合并配置""" forkey, valueinuser.items(): ifkeyindefaultandisinstance(default[key],dict)andisinstance(value,dict): self._merge_config(default[key], value) else: default[key] = value defcheck_system_resources(self): """檢查系統(tǒng)資源""" alerts = [] thresholds =self.config["monitors"]["system"] # CPU檢查 cpu_percent = psutil.cpu_percent(interval=1) ifcpu_percent > thresholds["cpu_threshold"]: alerts.append({ "type":"system", 
"level":"critical"ifcpu_percent >95else"warning", "message":f"CPU使用率過高:{cpu_percent:.1f}% (閾值:{thresholds['cpu_threshold']}%)", "metric":"cpu", "value": cpu_percent }) # 內(nèi)存檢查 memory = psutil.virtual_memory() ifmemory.percent > thresholds["memory_threshold"]: alerts.append({ "type":"system", "level":"critical"ifmemory.percent >95else"warning", "message":f"內(nèi)存使用率過高:{memory.percent:.1f}% (閾值:{thresholds['memory_threshold']}%)", "metric":"memory", "value": memory.percent }) # 磁盤檢查 disk = psutil.disk_usage('/') disk_percent = (disk.used / disk.total) *100 ifdisk_percent > thresholds["disk_threshold"]: alerts.append({ "type":"system", "level":"critical", "message":f"磁盤使用率過高:{disk_percent:.1f}% (閾值:{thresholds['disk_threshold']}%)", "metric":"disk", "value": disk_percent }) returnalerts defcheck_services(self): """檢查服務(wù)端口""" alerts = [] forserviceinself.config["monitors"]["services"]: try: importsocket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) result = sock.connect_ex(('localhost', service["port"])) sock.close() ifresult !=0: alerts.append({ "type":"service", "level":"critical", "message":f"服務(wù){(diào)service['name']}端口{service['port']}無法連接", "metric":"service_port", "service": service["name"], "port": service["port"] }) exceptExceptionase: alerts.append({ "type":"service", "level":"critical", "message":f"檢查服務(wù){(diào)service['name']}時(shí)發(fā)生錯(cuò)誤:{str(e)}", "metric":"service_check_error", "service": service["name"] }) returnalerts defcheck_urls(self): """檢查URL可用性""" alerts = [] forurl_configinself.config["monitors"]["urls"]: try: response = requests.get( url_config["url"], timeout=url_config["timeout"] ) ifresponse.status_code !=200: alerts.append({ "type":"url", "level":"critical", "message":f"URL{url_config['name']}返回狀態(tài)碼:{response.status_code}", "metric":"http_status", "url": url_config["url"], "status_code": response.status_code }) elifresponse.elapsed.total_seconds() > url_config["timeout"] *0.8: 
alerts.append({ "type":"url", "level":"warning", "message":f"URL{url_config['name']}響應(yīng)較慢:{response.elapsed.total_seconds():.2f}秒", "metric":"response_time", "url": url_config["url"], "response_time": response.elapsed.total_seconds() }) exceptrequests.exceptions.Timeout: alerts.append({ "type":"url", "level":"critical", "message":f"URL{url_config['name']}請(qǐng)求超時(shí) (>{url_config['timeout']}秒)", "metric":"timeout", "url": url_config["url"] }) exceptExceptionase: alerts.append({ "type":"url", "level":"critical", "message":f"URL{url_config['name']}檢查失敗:{str(e)}", "metric":"connection_error", "url": url_config["url"] }) returnalerts defshould_send_alert(self, alert): """檢查是否應(yīng)該發(fā)送告警(防止重復(fù)告警)""" alert_key =f"{alert['type']}_{alert['metric']}" current_time = time.time() ifalert_keyinself.alert_history: last_alert_time =self.alert_history[alert_key] ifcurrent_time - last_alert_time""" ifcritical_alerts: html +="系統(tǒng)監(jiān)控告警
檢測(cè)時(shí)間:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
嚴(yán)重告警
" foralertincritical_alerts: html +=f'{alert["message"]}' ifwarning_alerts: html +="警告告警
" foralertinwarning_alerts: html +=f'{alert["message"]}' html +="""