🗣 Wd提问: 服务器监控源码定制
🤖 Ai回答: 我来为您提供一个完整的服务器监控系统源码方案。
1、基础监控类设计
python
import psutil
import time
import json
import logging
from datetime import datetime
from typing import Dict, Any
import requests
import sqlite3
import threading
class ServerMonitor:
def __init__(self, config_path=config.json):
self.config = self.load_config(config_path)
self.setup_logging()
self.db_connection = self.init_database()
def load_config(self, config_path):
加载配置文件
try:
with open(config_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
默认配置
default_config = {
monitor_interval: 60,
alert_thresholds: {
cpu_usage: 80,
memory_usage: 85,
disk_usage: 90
},
notification: {
webhook_url: ,
email_enabled: False
}
}
with open(config_path, 'w') as f:
json.dump(default_config, f, indent=2)
return default_config
def setup_logging(self):
设置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(message)s',
handlers=[
logging.FileHandler('server_monitor.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def init_database(self):
初始化数据库
conn = sqlite3.connect('monitor_data.db', check_same_thread=False)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS system_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME,
cpu_percent REAL,
memory_percent REAL,
disk_usage_percent REAL,
network_bytes_sent INTEGER,
network_bytes_recv INTEGER,
load_average_1m REAL,
load_average_5m REAL,
load_average_15m REAL
)
''')
conn.commit()
return conn
2、系统资源监控模块
python
class SystemMetricsCollector:
def __init__(self):
self.previous_net_io = psutil.net_io_counters()
self.previous_time = time.time()
def get_cpu_info(self):
获取CPU信息
return {
cpu_percent: psutil.cpu_percent(interval=1),
cpu_count: psutil.cpu_count(),
cpu_count_logical: psutil.cpu_count(logical=True),
cpu_freq: psutil.cpu_freq()._asdict() if psutil.cpu_freq() else None
}
def get_memory_info(self):
获取内存信息
memory = psutil.virtual_memory()
swap = psutil.swap_memory()
return {
total: memory.total,
available: memory.available,
used: memory.used,
free: memory.free,
percent: memory.percent,
swap_total: swap.total,
swap_used: swap.used,
swap_free: swap.free,
swap_percent: swap.percent
}
def get_disk_info(self):
获取磁盘信息
partitions = []
for partition in psutil.disk_partitions():
try:
usage = psutil.disk_usage(partition.mountpoint)
partitions.append({
device: partition.device,
mountpoint: partition.mountpoint,
file_system: partition.fstype,
total: usage.total,
used: usage.used,
free: usage.free,
percent: usage.percent
})
except PermissionError:
continue
return {partitions: partitions}
def get_network_info(self):
获取网络信息
current_net_io = psutil.net_io_counters()
current_time = time.time()
计算网络速度
time_delta = current_time self.previous_time
bytes_sent_per_sec = (current_net_io.bytes_sent self.previous_net_io.bytes_sent) / time_delta
bytes_recv_per_sec = (current_net_io.bytes_recv self.previous_net_io.bytes_recv) / time_delta
self.previous_net_io = current_net_io
self.previous_time = current_time
return {
bytes_sent: current_net_io.bytes_sent,
bytes_recv: current_net_io.bytes_recv,
packets_sent: current_net_io.packets_sent,
packets_recv: current_net_io.packets_recv,
bytes_sent_per_sec: bytes_sent_per_sec,
bytes_recv_per_sec: bytes_recv_per_sec
}
def get_load_average(self):
获取系统负载
try:
load_avg = psutil.getloadavg()
return {
1min: load_avg[0],
5min: load_avg[1],
15min: load_avg[2]
}
except AttributeError:
Windows不支持getloadavg
return {1min: 0, 5min: 0, 15min: 0}
def collect_all_metrics(self):
收集所有指标
return {
timestamp: datetime.now().isoformat(),
cpu: self.get_cpu_info(),
memory: self.get_memory_info(),
disk: self.get_disk_info(),
network: self.get_network_info(),
load_average: self.get_load_average()
}
3、数据存储和告警模块
python
class DataStorage:
def __init__(self, db_connection):
self.db = db_connection
def save_metrics(self, metrics):
保存监控数据到数据库
cursor = self.db.cursor()
cursor.execute('''
INSERT INTO system_metrics
(timestamp, cpu_percent, memory_percent, disk_usage_percent,
network_bytes_sent, network_bytes_recv,
load_average_1m, load_average_5m, load_average_15m)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
metrics['timestamp'],
metrics['cpu']['cpu_percent'],
metrics['memory']['percent'],
max([p['percent'] for p in metrics['disk']['partitions']] + [0]),
metrics['network']['bytes_sent'],
metrics['network']['bytes_recv'],
metrics['load_average']['1min'],
metrics['load_average']['5min'],
metrics['load_average']['15min']
))
self.db.commit()
def get_recent_metrics(self, hours=24):
获取最近的监控数据
cursor = self.db.cursor()
since = datetime.now().timestamp() (hours * 3600)
cursor.execute('''
SELECT * FROM system_metrics
WHERE timestamp datetime(?, 'unixepoch')
ORDER BY timestamp DESC
''', (since,))
return cursor.fetchall()
class AlertManager:
def __init__(self, config):
self.config = config
self.alert_history = {}
def check_thresholds(self, metrics):
检查阈值并触发告警
alerts = []
CPU使用率检查
cpu_percent = metrics['cpu']['cpu_percent']
if cpu_percent self.config['alert_thresholds']['cpu_usage']:
alerts.append({
type: CPU_USAGE_HIGH,
value: cpu_percent,
threshold: self.config['alert_thresholds']['cpu_usage'],
message: fCPU使用率过高: {cpu_percent}%
})
内存使用率检查
memory_percent = metrics['memory']['percent']
if memory_percent self.config['alert_thresholds']['memory_usage']:
alerts.append({
type: MEMORY_USAGE_HIGH,
value: memory_percent,
threshold: self.config['alert_thresholds']['memory_usage'],
message: f内存使用率过高: {memory_percent}%
})
磁盘使用率检查
disk_usage = max([p['percent'] for p in metrics['disk']['partitions']] + [0])
if disk_usage self.config['alert_thresholds']['disk_usage']:
alerts.append({
type: DISK_USAGE_HIGH,
value: disk_usage,
threshold: self.config['alert_thresholds']['disk_usage'],
message: f磁盘使用率过高: {disk_usage}%
})
if alerts:
self.send_alert(alerts, metrics)
def send_alert(self, alerts, metrics):
发送告警通知
alert_message = {
timestamp: metrics['timestamp'],
alerts: alerts,
system_info: {
hostname: psutil.Process().username(),
platform: psutil.LINUX if hasattr(psutil, 'LINUX') else 'unknown'
}
}
发送到Webhook
if self.config['notification']['webhook_url']:
try:
requests.post(
self.config['notification']['webhook_url'],
json=alert_message,
timeout=10
)
except Exception as e:
print(f发送告警失败: {e})
print(f触发告警: {json.dumps(alert_message, indent=2)})
4、Web界面展示
python
from flask import Flask, render_template, jsonify, request
import plotly.graph_objs as go
import plotly.utils
import json
app = Flask(__name__)
class WebInterface:
def __init__(self, monitor_instance):
self.monitor = monitor_instance
self.setup_routes
📣 商家广告:





0
IP地址: 36.168.223.102
搜索次数: 2
提问时间: 2025-11-05 20:53:39
🛒 域名购买:
4666.com.cn
te.biz
gy0.cn
z-j.net
6-6.com.cn
e-j.com.cn
9.bj.cn
838.net.cn
soukan.com.cn
chuisou.com.cn
❓️ 热门提问:
云主机价格单
电子邮件地址怎么填写和注册
可以挂机游戏的云平台
印尼服务器
免费国内服务器试用
免root虚拟空间
云空间虚拟主机
r7主机
vps搬瓦工
y域名解析ip
豌豆Ai站群搜索引擎系统
🌐 域名评估:
je.gs
ae.cq.cn
vivo.sc.cn
oo.gz.cn
oo.ah.cn
su8.hk.cn
h.tj.cn
p.hn.cn
ypettoy.com
mi.com
⛏ 最新挖掘:
凯里酸汤鱼
贵阳烤脑花
都匀毛尖茶
花溪牛肉粉
贵阳烤生蚝
贵阳烤豆腐
贵阳纸包鱼
贵阳特色菜
🖌 热门作画:
🤝 关于我们:
豌豆Ai
域名
建站
站群
留痕
推广
评估
源码
开发Ai
工具
日记
价格
加盟
广告
流量
留言
联系
🗨 加入群聊:
🔗 友情链接:
搜站点
贵州风淋室
ai提问
🧰 站长工具:
Ai工具 whois查询 搜索
📢 温馨提示:本站所有问答由Ai自动创作,内容仅供参考,若有误差请用“联系”里面信息通知我们人工修改或删除。
👉 技术支持:本站由豌豆Ai提供技术支持,使用的最新版:《豌豆Ai站群搜索引擎系统 V.25.10.25》搭建本站。