
最近在整理自己的脚本库,发现这些年积累下来的Python脚本真是不少。想起刚入行那会儿,每天都在重复做一些机械性的工作,后来慢慢学会了用Python来解决这些问题,工作效率确实提升了不少。
今天就分享5个我平时用得最多的Python运维脚本,都是在Debian 13环境下测试过的,基本覆盖了日常运维的常见场景。这些脚本不算复杂,但真的很实用,希望能帮到正在做运维的朋友们。
第一个是系统资源监控脚本,我几乎每天都在用,可以实时查看CPU、内存、磁盘和网络IO的使用情况。它比那些复杂的监控系统轻量多了,而且可以根据自己的需求随时调整。
#!/usr/bin/env python3
import psutil
import time
import datetime

def get_system_info():
    # CPU使用率
    cpu_percent = psutil.cpu_percent(interval=1)

    # 内存信息
    memory = psutil.virtual_memory()
    memory_percent = memory.percent
    memory_used = round(memory.used / 1024 / 1024 / 1024, 2)
    memory_total = round(memory.total / 1024 / 1024 / 1024, 2)

    # 磁盘信息
    disk = psutil.disk_usage('/')
    disk_percent = round((disk.used / disk.total) * 100, 2)
    disk_used = round(disk.used / 1024 / 1024 / 1024, 2)
    disk_total = round(disk.total / 1024 / 1024 / 1024, 2)

    # 网络IO
    net_io = psutil.net_io_counters()

    return {
        'timestamp': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'cpu_percent': cpu_percent,
        'memory_percent': memory_percent,
        'memory_used': memory_used,
        'memory_total': memory_total,
        'disk_percent': disk_percent,
        'disk_used': disk_used,
        'disk_total': disk_total,
        'bytes_sent': round(net_io.bytes_sent / 1024 / 1024, 2),
        'bytes_recv': round(net_io.bytes_recv / 1024 / 1024, 2)
    }

def monitor_system():
    while True:
        info = get_system_info()
        print(f"[{info['timestamp']}]")
        print(f"CPU使用率: {info['cpu_percent']}%")
        print(f"内存使用: {info['memory_used']}GB/{info['memory_total']}GB ({info['memory_percent']}%)")
        print(f"磁盘使用: {info['disk_used']}GB/{info['disk_total']}GB ({info['disk_percent']}%)")
        print(f"网络IO: 发送 {info['bytes_sent']}MB, 接收 {info['bytes_recv']}MB")
        print("-" * 50)
        # 如果资源使用率过高,可以在这里添加告警逻辑
        if info['cpu_percent'] > 80 or info['memory_percent'] > 85:
            print("⚠️ 警告:系统资源使用率过高!")
        time.sleep(10)

if __name__ == "__main__":
    monitor_system()

这个脚本的好处是可以持续监控,而且输出格式比较清晰。我一般会在服务器上后台运行,然后把输出重定向到日志文件里。
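如果不想依赖shell重定向,也可以让脚本自己写日志,下面是一个最小示意(假设上面的脚本保存为 sys_monitor.py,日志路径 /var/log/sys_monitor.log 也是假设的,按实际环境调整即可):

#!/usr/bin/env python3
# 最小示意:复用上面的采集函数,把监控结果写进日志文件而不是打印到终端
# 假设:上面的脚本保存为 sys_monitor.py,且对 /var/log 有写权限
import logging
import time
from sys_monitor import get_system_info

logging.basicConfig(
    filename='/var/log/sys_monitor.log',
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s'
)
logger = logging.getLogger('sys_monitor')

while True:
    info = get_system_info()
    logger.info(f"CPU {info['cpu_percent']}% | 内存 {info['memory_percent']}% | 磁盘 {info['disk_percent']}%")
    # 沿用原脚本的告警阈值,只是改成写一条WARNING日志
    if info['cpu_percent'] > 80 or info['memory_percent'] > 85:
        logger.warning("系统资源使用率过高")
    time.sleep(10)

这样配合 nohup 或 systemd 跑在后台,就不用再管输出重定向了。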
第二个是SSH批量连接检测脚本。管理多台服务器的时候,经常需要确认哪些机器能正常SSH登录,手动一台台去试太麻烦了,这个脚本可以并发地批量检测。
#!/usr/bin/env python3
import paramiko
import threading
import queue
import time

class SSHChecker:
    def __init__(self, hosts_file, username, password=None, key_file=None):
        self.hosts = self.load_hosts(hosts_file)
        self.username = username
        self.password = password
        self.key_file = key_file
        self.results = queue.Queue()

    def load_hosts(self, hosts_file):
        try:
            with open(hosts_file, 'r') as f:
                return [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            print(f"主机列表文件 {hosts_file} 不存在")
            return []

    def check_ssh_connection(self, host, port=22):
        try:
            ssh = paramiko.SSHClient()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            if self.key_file:
                ssh.connect(host, port=port, username=self.username,
                            key_filename=self.key_file, timeout=10)
            else:
                ssh.connect(host, port=port, username=self.username,
                            password=self.password, timeout=10)
            # 执行一个简单命令测试
            stdin, stdout, stderr = ssh.exec_command('uptime')
            uptime_info = stdout.read().decode().strip()
            ssh.close()
            self.results.put((host, True, uptime_info))
        except Exception as e:
            self.results.put((host, False, str(e)))

    def batch_check(self, max_threads=10):
        threads = []
        for host in self.hosts:
            # 达到并发上限时,剔除已结束的线程,等有空位再继续
            while len(threads) >= max_threads:
                threads = [t for t in threads if t.is_alive()]
                time.sleep(0.1)
            thread = threading.Thread(target=self.check_ssh_connection, args=(host,))
            thread.start()
            threads.append(thread)
        # 等待所有线程完成
        for thread in threads:
            thread.join()
        # 输出结果
        success_count = 0
        failed_hosts = []
        while not self.results.empty():
            host, status, info = self.results.get()
            if status:
                print(f"✅ {host}: 连接成功 - {info}")
                success_count += 1
            else:
                print(f"❌ {host}: 连接失败 - {info}")
                failed_hosts.append(host)
        print(f"\n总计: {len(self.hosts)} 台服务器")
        print(f"成功: {success_count} 台")
        print(f"失败: {len(failed_hosts)} 台")
        if failed_hosts:
            print(f"失败的主机: {', '.join(failed_hosts)}")

# 使用示例
if __name__ == "__main__":
    # 创建主机列表文件 hosts.txt
    hosts_content = """192.168.1.10
192.168.1.11
192.168.1.12
10.0.0.100"""
    with open('hosts.txt', 'w') as f:
        f.write(hosts_content)
    checker = SSHChecker('hosts.txt', 'root', password='your_password')
    # 或者使用密钥文件
    # checker = SSHChecker('hosts.txt', 'root', key_file='/root/.ssh/id_rsa')
    checker.batch_check()

这个脚本支持密码和密钥两种认证方式,而且是多线程的,检测速度比较快。我经常用它来检查新部署的服务器是否配置正确。
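顺带一提,手动维护线程列表稍显繁琐,如果用标准库的 concurrent.futures,并发控制会更省心。下面是一个等价思路的简化示意(假设 SSHChecker 已按上文定义好):

from concurrent.futures import ThreadPoolExecutor

# 简化示意:用线程池代替手动管理线程,max_workers 就是并发上限
checker = SSHChecker('hosts.txt', 'root', password='your_password')
with ThreadPoolExecutor(max_workers=10) as pool:
    # check_ssh_connection 会把每台主机的结果放进 checker.results 队列
    list(pool.map(checker.check_ssh_connection, checker.hosts))
# with 块退出时会等待所有任务完成,之后照原来的方式从 checker.results 汇总即可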
第三个是日志管理脚本。日志文件管理是运维工作中很重要的一部分,这个脚本可以分析日志文件的大小和时间分布,压缩旧日志,并清理过期的日志。
#!/usr/bin/env python3
import os
import glob
import gzip
import shutil
from datetime import datetime

class LogManager:
    def __init__(self, log_dirs):
        self.log_dirs = log_dirs if isinstance(log_dirs, list) else [log_dirs]

    def get_log_files_info(self):
        log_info = []
        for log_dir in self.log_dirs:
            if not os.path.exists(log_dir):
                print(f"目录不存在: {log_dir}")
                continue
            # 查找所有日志文件
            patterns = ['*.log', '*.log.*', '*.out']
            for pattern in patterns:
                files = glob.glob(os.path.join(log_dir, pattern))
                for file_path in files:
                    try:
                        stat = os.stat(file_path)
                        size_mb = round(stat.st_size / 1024 / 1024, 2)
                        mtime = datetime.fromtimestamp(stat.st_mtime)
                        log_info.append({
                            'path': file_path,
                            'size_mb': size_mb,
                            'modified_time': mtime,
                            'age_days': (datetime.now() - mtime).days
                        })
                    except OSError:
                        continue
        return sorted(log_info, key=lambda x: x['size_mb'], reverse=True)

    def analyze_logs(self):
        log_files = self.get_log_files_info()
        if not log_files:
            print("没有找到日志文件")
            return
        total_size = sum(f['size_mb'] for f in log_files)
        print(f"日志文件分析报告 ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})")
        print("=" * 80)
        print(f"总文件数: {len(log_files)}")
        print(f"总大小: {total_size:.2f} MB")
        print()
        print("前10个最大的日志文件:")
        print("-" * 80)
        for i, log_file in enumerate(log_files[:10], 1):
            print(f"{i:2d}. {log_file['path']}")
            print(f"    大小: {log_file['size_mb']} MB, "
                  f"修改时间: {log_file['modified_time'].strftime('%Y-%m-%d %H:%M')}, "
                  f"距今: {log_file['age_days']} 天")
        # 统计不同时间段的文件
        recent_files = [f for f in log_files if f['age_days'] <= 7]
        old_files = [f for f in log_files if f['age_days'] > 30]
        print(f"\n最近7天的文件: {len(recent_files)} 个, "
              f"大小: {sum(f['size_mb'] for f in recent_files):.2f} MB")
        print(f"超过30天的文件: {len(old_files)} 个, "
              f"大小: {sum(f['size_mb'] for f in old_files):.2f} MB")

    def compress_old_logs(self, days_threshold=7):
        log_files = self.get_log_files_info()
        compressed_count = 0
        saved_space = 0
        for log_file in log_files:
            if (log_file['age_days'] > days_threshold and
                    not log_file['path'].endswith('.gz') and
                    log_file['size_mb'] > 1):  # 只压缩大于1MB的文件
                try:
                    original_size = log_file['size_mb']
                    with open(log_file['path'], 'rb') as f_in:
                        with gzip.open(log_file['path'] + '.gz', 'wb') as f_out:
                            shutil.copyfileobj(f_in, f_out)
                    # 确认压缩文件创建成功后再删除原文件
                    if os.path.exists(log_file['path'] + '.gz'):
                        os.remove(log_file['path'])
                        compressed_size = os.path.getsize(log_file['path'] + '.gz') / 1024 / 1024
                        print(f"压缩: {log_file['path']} "
                              f"({original_size:.2f}MB -> {compressed_size:.2f}MB)")
                        compressed_count += 1
                        saved_space += (original_size - compressed_size)
                except Exception as e:
                    print(f"压缩失败 {log_file['path']}: {e}")
        print(f"\n压缩完成: {compressed_count} 个文件, 节省空间: {saved_space:.2f} MB")

    def clean_old_logs(self, days_threshold=30, dry_run=True):
        log_files = self.get_log_files_info()
        old_files = [f for f in log_files if f['age_days'] > days_threshold]
        if not old_files:
            print(f"没有找到超过 {days_threshold} 天的日志文件")
            return
        total_size = sum(f['size_mb'] for f in old_files)
        print(f"找到 {len(old_files)} 个超过 {days_threshold} 天的日志文件")
        print(f"总大小: {total_size:.2f} MB")
        if dry_run:
            print("\n预览模式 - 以下文件将被删除:")
            for log_file in old_files:
                print(f"  {log_file['path']} ({log_file['size_mb']} MB, {log_file['age_days']} 天)")
            print(f"\n要实际删除这些文件,请设置 dry_run=False")
        else:
            deleted_count = 0
            freed_space = 0
            for log_file in old_files:
                try:
                    os.remove(log_file['path'])
                    print(f"删除: {log_file['path']}")
                    deleted_count += 1
                    freed_space += log_file['size_mb']
                except Exception as e:
                    print(f"删除失败 {log_file['path']}: {e}")
            # 只统计实际删除成功的文件所释放的空间
            print(f"\n删除完成: {deleted_count} 个文件, 释放空间: {freed_space:.2f} MB")

# 使用示例
if __name__ == "__main__":
    # 指定要管理的日志目录
    log_dirs = ['/var/log', '/opt/app/logs', '/home/user/logs']
    manager = LogManager(log_dirs)
    # 分析日志文件
    manager.analyze_logs()
    print("\n" + "=" * 50)
    # 压缩7天前的日志
    manager.compress_old_logs(days_threshold=7)
    print("\n" + "=" * 50)
    # 清理30天前的日志(预览模式)
    manager.clean_old_logs(days_threshold=30, dry_run=True)

这个脚本功能比较全面,可以分析、压缩、清理日志文件。我一般会定期运行,特别是在磁盘空间不足的时候。
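定期运行可以交给cron。下面是一个假设性的包装示意(假设上面的脚本保存为 log_manager.py,目录列表也按实际环境调整),给它加一个简单的命令行入口,方便在crontab里按不同动作调用:

#!/usr/bin/env python3
# 假设性示意:给 LogManager 加一个命令行入口,方便 cron 定时调用
# 模块名 log_manager 和目录列表都是假设,按实际环境调整
import argparse
from log_manager import LogManager

parser = argparse.ArgumentParser(description='日志维护入口')
parser.add_argument('action', choices=['analyze', 'compress', 'clean'])
parser.add_argument('--days', type=int, default=30, help='天数阈值')
parser.add_argument('--delete', action='store_true', help='真正删除(默认只预览)')
args = parser.parse_args()

manager = LogManager(['/var/log', '/opt/app/logs'])
if args.action == 'analyze':
    manager.analyze_logs()
elif args.action == 'compress':
    manager.compress_old_logs(days_threshold=args.days)
else:
    manager.clean_old_logs(days_threshold=args.days, dry_run=not args.delete)

之后在crontab里加一行类似 0 2 * * * /usr/bin/python3 /opt/scripts/log_maint.py compress --days 7,就可以每天凌晨自动压缩了(路径同样是假设的)。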
第四个是进程监控脚本。有些服务偶尔会挂掉,这个脚本可以监控指定的进程,一旦发现进程不存在就自动执行重启命令。
#!/usr/bin/env python3
import psutil
import subprocess
import time
import logging
from datetime import datetime

class ProcessMonitor:
    def __init__(self, config_file=None):
        self.processes = {}
        self.setup_logging()
        if config_file:
            # load_config 未在本文给出,如需从配置文件加载监控项可自行实现
            self.load_config(config_file)

    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/process_monitor.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def add_process(self, name, command, check_interval=30):
        """添加要监控的进程"""
        self.processes[name] = {
            'command': command,
            'check_interval': check_interval,
            'last_check': 0,
            'restart_count': 0,
            'last_restart': None
        }
        self.logger.info(f"添加监控进程: {name}")

    def is_process_running(self, process_name):
        """检查进程是否在运行"""
        for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
            try:
                # 检查进程名或命令行中是否包含关键字
                cmdline = proc.info['cmdline'] or []  # cmdline可能为None(比如内核线程)
                if (process_name.lower() in proc.info['name'].lower() or
                        any(process_name.lower() in cmd.lower()
                            for cmd in cmdline if cmd)):
                    return True, proc.info['pid']
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        return False, None

    def start_process(self, command):
        """启动进程"""
        try:
            # 使用shell执行启动命令
            process = subprocess.Popen(
                command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            return True, process.pid
        except Exception as e:
            return False, str(e)

    def restart_process(self, name, process_info):
        """重启进程"""
        self.logger.warning(f"进程 {name} 未运行,尝试重启...")
        success, result = self.start_process(process_info['command'])
        if success:
            process_info['restart_count'] += 1
            process_info['last_restart'] = datetime.now()
            # 注意:这里的PID是启动命令本身的PID,不一定是服务进程的PID
            self.logger.info(f"进程 {name} 重启成功,PID: {result}")
            return True
        else:
            self.logger.error(f"进程 {name} 重启失败: {result}")
            return False

    def check_process(self, name, process_info):
        """检查单个进程"""
        current_time = time.time()
        # 检查是否到了检查时间
        if current_time - process_info['last_check'] < process_info['check_interval']:
            return
        process_info['last_check'] = current_time
        is_running, pid = self.is_process_running(name)
        if is_running:
            self.logger.debug(f"进程 {name} 正在运行,PID: {pid}")
        else:
            # 进程未运行,尝试重启
            self.restart_process(name, process_info)

    def get_status_report(self):
        """获取监控状态报告"""
        report = []
        report.append(f"进程监控状态报告 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append("=" * 60)
        for name, info in self.processes.items():
            is_running, pid = self.is_process_running(name)
            status = "运行中" if is_running else "未运行"
            report.append(f"进程名: {name}")
            report.append(f"  状态: {status}")
            if is_running:
                report.append(f"  PID: {pid}")
            report.append(f"  重启次数: {info['restart_count']}")
            if info['last_restart']:
                report.append(f"  最后重启: {info['last_restart'].strftime('%Y-%m-%d %H:%M:%S')}")
            report.append(f"  检查间隔: {info['check_interval']}秒")
            report.append("")
        return "\n".join(report)

    def monitor_loop(self):
        """主监控循环"""
        self.logger.info("进程监控器启动")
        try:
            while True:
                for name, process_info in self.processes.items():
                    self.check_process(name, process_info)
                time.sleep(5)  # 主循环间隔
        except KeyboardInterrupt:
            self.logger.info("收到停止信号,退出监控")
        except Exception as e:
            self.logger.error(f"监控循环出错: {e}")

# 使用示例
if __name__ == "__main__":
    monitor = ProcessMonitor()
    # 添加要监控的进程
    monitor.add_process("nginx", "systemctl start nginx", check_interval=60)
    monitor.add_process("mysql", "systemctl start mysql", check_interval=30)
    monitor.add_process("redis", "systemctl start redis-server", check_interval=45)
    # 也可以监控自定义应用
    monitor.add_process("myapp", "/opt/myapp/start.sh", check_interval=30)
    # 打印初始状态
    print(monitor.get_status_report())
    # 开始监控
    monitor.monitor_loop()

这个脚本我在生产环境用过,效果还不错。不过要注意权限问题,有些系统服务需要root权限才能重启。
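针对权限问题,一个省事的做法是在脚本入口先检查有没有root权限,避免监控跑起来之后重启一直失败。简单示意如下:

import os
import sys

# 简单示意:监控 systemctl 管理的服务时,先确认当前是root,否则直接退出
if os.geteuid() != 0:
    sys.exit("需要root权限才能重启系统服务,请用sudo运行本脚本")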
第五个是网络连通性检测脚本。网络问题是运维中经常遇到的,这个脚本可以批量检测多个主机的连通性,包括ping和端口两类检测。
#!/usr/bin/env python3
import subprocess
import socket
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

class NetworkChecker:
    def __init__(self):
        self.results = {}

    def ping_host(self, host, count=3, timeout=5):
        """Ping检测"""
        try:
            # 用参数列表代替shell字符串,避免主机名拼接带来的注入风险
            cmd = ['ping', '-c', str(count), '-W', str(timeout), host]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                # 解析ping结果
                loss_info = None
                time_info = None
                for line in result.stdout.split('\n'):
                    if 'packet loss' in line:
                        loss_info = line.strip()
                    elif 'min/avg/max' in line:
                        time_info = line.strip()
                return {
                    'status': 'success',
                    'packet_loss': loss_info or 'unknown',
                    'timing': time_info or 'unknown'
                }
            else:
                return {
                    'status': 'failed',
                    'error': result.stderr.strip() or 'ping failed'
                }
        except Exception as e:
            return {
                'status': 'error',
                'error': str(e)
            }

    def check_port(self, host, port, timeout=10):
        """端口连通性检测"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            start_time = time.time()
            result = sock.connect_ex((host, port))
            end_time = time.time()
            sock.close()
            if result == 0:
                return {
                    'status': 'open',
                    'response_time': round((end_time - start_time) * 1000, 2)
                }
            else:
                return {
                    'status': 'closed',
                    'error': f'Connection failed (code: {result})'
                }
        except socket.gaierror as e:
            return {
                'status': 'error',
                'error': f'DNS resolution failed: {e}'
            }
        except Exception as e:
            return {
                'status': 'error',
                'error': str(e)
            }

    def comprehensive_check(self, host, ports=None):
        """综合检测单个主机"""
        if ports is None:
            ports = [22, 80, 443, 3306, 6379]  # 常用端口
        result = {
            'host': host,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'ping': self.ping_host(host),
            'ports': {}
        }
        # 只有ping通了才检测端口
        if result['ping']['status'] == 'success':
            for port in ports:
                result['ports'][port] = self.check_port(host, port)
        return result

    def batch_check(self, hosts, ports=None, max_workers=20):
        """批量检测多个主机"""
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # 提交所有任务
            future_to_host = {
                executor.submit(self.comprehensive_check, host, ports): host
                for host in hosts
            }
            # 收集结果
            for future in as_completed(future_to_host):
                host = future_to_host[future]
                try:
                    result = future.result()
                    results.append(result)
                    print(f"✅ 完成检测: {host}")
                except Exception as e:
                    print(f"❌ 检测失败: {host} - {e}")
                    results.append({
                        'host': host,
                        'error': str(e),
                        'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
                    })
        return results

    def generate_report(self, results):
        """生成检测报告"""
        print("\n" + "=" * 80)
        print("网络连通性检测报告")
        print("=" * 80)
        total_hosts = len(results)
        ping_success = 0
        for result in results:
            print(f"\n主机: {result['host']}")
            print(f"检测时间: {result['timestamp']}")
            if 'error' in result:
                print(f"❌ 检测出错: {result['error']}")
                continue
            # Ping结果
            ping_result = result['ping']
            if ping_result['status'] == 'success':
                print(f"✅ Ping: 成功")
                if 'packet_loss' in ping_result:
                    print(f"   丢包率: {ping_result['packet_loss']}")
                if 'timing' in ping_result:
                    print(f"   延迟: {ping_result['timing']}")
                ping_success += 1
            else:
                print(f"❌ Ping: 失败 - {ping_result.get('error', 'unknown error')}")
                continue
            # 端口检测结果
            if 'ports' in result and result['ports']:
                print("  端口检测:")
                for port, port_result in result['ports'].items():
                    if port_result['status'] == 'open':
                        response_time = port_result.get('response_time', 'unknown')
                        print(f"    {port}: ✅ 开放 ({response_time}ms)")
                    elif port_result['status'] == 'closed':
                        print(f"    {port}: ❌ 关闭")
                    else:
                        print(f"    {port}: ⚠️ 错误 - {port_result.get('error', 'unknown')}")
        print(f"\n总结:")
        print(f"总主机数: {total_hosts}")
        print(f"Ping成功: {ping_success}")
        print(f"Ping失败: {total_hosts - ping_success}")
        if total_hosts:
            print(f"成功率: {(ping_success / total_hosts * 100):.1f}%")

# 使用示例
if __name__ == "__main__":
    checker = NetworkChecker()
    # 定义要检测的主机列表
    hosts = [
        '8.8.8.8',
        '114.114.114.114',
        'www.baidu.com',
        'www.google.com',
        '192.168.1.1',
        '10.0.0.1'
    ]
    # 定义要检测的端口
    ports = [22, 80, 443, 53, 8080]
    print("开始网络连通性检测...")
    results = checker.batch_check(hosts, ports)
    # 生成报告
    checker.generate_report(results)

这个脚本在排查网络问题的时候特别有用,可以快速定位是哪台机器或者哪个端口有问题。
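如果想把每次检测的结果留档,方便前后对比,可以在打印报告之外把结果序列化成JSON,示意如下(文件名格式是随手定的,可自行调整):

import json
import time

# 简单示意:把 batch_check 的结果存成带时间戳的JSON文件,便于留档对比
# 这里假设 checker、hosts、ports 沿用上面使用示例里的定义
results = checker.batch_check(hosts, ports)
report_file = f"netcheck_{time.strftime('%Y%m%d_%H%M%S')}.json"
with open(report_file, 'w', encoding='utf-8') as f:
    # 端口号这类int键会被json自动转成字符串
    json.dump(results, f, ensure_ascii=False, indent=2)
print(f"检测结果已保存到 {report_file}")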