
A Must-Have for Ops Veterans: 5 Python Scripts to Double Your Productivity

悠悠12138
Published 2025-11-20 16:08:51

I was recently tidying up my script library and realized just how many Python scripts I've accumulated over the years. When I first started out in this field, I spent every day repeating the same mechanical tasks; over time I learned to solve them with Python, and my productivity genuinely improved.

Today I'm sharing the five Python ops scripts I use the most. All of them have been tested on Debian 13, and together they cover most aspects of day-to-day ops work. None of them is complicated, but they are genuinely practical, and I hope they help others doing ops. Note that some of the scripts use the third-party psutil and paramiko packages, which you'll need to install first (for example, with pip).

1. System Resource Monitoring Script

I use this script almost every day. It monitors CPU, memory, and disk usage in real time. It's far lighter than a full-blown monitoring system, and you can adjust it to your own needs at any time.

#!/usr/bin/env python3
import psutil
import time
import datetime

def get_system_info():
    # CPU utilization (sampled over one second)
    cpu_percent = psutil.cpu_percent(interval=1)
    
    # Memory usage
    memory = psutil.virtual_memory()
    memory_percent = memory.percent
    memory_used = round(memory.used / 1024 / 1024 / 1024, 2)
    memory_total = round(memory.total / 1024 / 1024 / 1024, 2)
    
    # Disk usage for the root filesystem
    disk = psutil.disk_usage('/')
    disk_percent = round((disk.used / disk.total) * 100, 2)
    disk_used = round(disk.used / 1024 / 1024 / 1024, 2)
    disk_total = round(disk.total / 1024 / 1024 / 1024, 2)
    
    # Network I/O counters (cumulative since boot)
    net_io = psutil.net_io_counters()
    
    return {
        'timestamp': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'cpu_percent': cpu_percent,
        'memory_percent': memory_percent,
        'memory_used': memory_used,
        'memory_total': memory_total,
        'disk_percent': disk_percent,
        'disk_used': disk_used,
        'disk_total': disk_total,
        'bytes_sent': round(net_io.bytes_sent / 1024 / 1024, 2),
        'bytes_recv': round(net_io.bytes_recv / 1024 / 1024, 2)
    }

def monitor_system():
    while True:
        info = get_system_info()
        print(f"[{info['timestamp']}]")
        print(f"CPU使用率: {info['cpu_percent']}%")
        print(f"内存使用: {info['memory_used']}GB/{info['memory_total']}GB ({info['memory_percent']}%)")
        print(f"磁盘使用: {info['disk_used']}GB/{info['disk_total']}GB ({info['disk_percent']}%)")
        print(f"网络IO: 发送 {info['bytes_sent']}MB, 接收 {info['bytes_recv']}MB")
        print("-" * 50)
        
        # 如果资源使用率过高,可以在这里添加告警逻辑
        if info['cpu_percent'] > 80 or info['memory_percent'] > 85:
            print("⚠️  警告:系统资源使用率过高!")
        
        time.sleep(10)

if __name__ == "__main__":
    monitor_system()

The nice thing about this script is that it monitors continuously and the output is easy to read. I usually run it in the background on the server and redirect its output to a log file.
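
If you'd rather not rely on shell redirection, the script can also write to a log file itself via the standard logging module. Here's a minimal sketch, assuming get_system_info() from the script above is importable (the module name sysmon and the path /var/log/sysmon.log are just examples):

#!/usr/bin/env python3
import logging
import time

from sysmon import get_system_info  # hypothetical module name for the script above

# Append one summary line per sample to a log file
logging.basicConfig(
    filename='/var/log/sysmon.log',  # example path; writing here needs root
    level=logging.INFO,
    format='%(asctime)s %(message)s'
)

def monitor_to_log(interval=10):
    while True:
        info = get_system_info()
        logging.info("cpu=%s%% mem=%s%% disk=%s%%",
                     info['cpu_percent'], info['memory_percent'], info['disk_percent'])
        time.sleep(interval)

if __name__ == "__main__":
    monitor_to_log()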

2. Batch SSH Connection Checks for Servers

When you manage a fleet of servers, you frequently need to check which machines accept SSH connections. Testing them one by one by hand is tedious; this script checks them in batches.

#!/usr/bin/env python3
import paramiko
import threading
import queue
import time

class SSHChecker:
    def __init__(self, hosts_file, username, password=None, key_file=None):
        self.hosts = self.load_hosts(hosts_file)
        self.username = username
        self.password = password
        self.key_file = key_file
        self.results = queue.Queue()
        
    def load_hosts(self, hosts_file):
        try:
            with open(hosts_file, 'r') as f:
                return [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            print(f"主机列表文件 {hosts_file} 不存在")
            return []
    
    def check_ssh_connection(self, host, port=22):
        try:
            ssh = paramiko.SSHClient()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            
            if self.key_file:
                ssh.connect(host, port=port, username=self.username, 
                           key_filename=self.key_file, timeout=10)
            else:
                ssh.connect(host, port=port, username=self.username, 
                           password=self.password, timeout=10)
            
            # Run a simple command as a sanity check
            stdin, stdout, stderr = ssh.exec_command('uptime')
            uptime_info = stdout.read().decode().strip()
            
            ssh.close()
            self.results.put((host, True, uptime_info))
            
        except Exception as e:
            self.results.put((host, False, str(e)))
    
    def batch_check(self, max_threads=10):
        threads = []
        
        for host in self.hosts:
            # Throttle: wait until a running thread finishes before starting another
            while len(threads) >= max_threads:
                threads = [t for t in threads if t.is_alive()]
                time.sleep(0.1)
            
            thread = threading.Thread(target=self.check_ssh_connection, args=(host,))
            thread.start()
            threads.append(thread)
        
        # Wait for all remaining threads to finish
        for thread in threads:
            thread.join()
        
        # Print the results
        success_count = 0
        failed_hosts = []
        
        while not self.results.empty():
            host, status, info = self.results.get()
            if status:
                print(f"✅ {host}: connection OK - {info}")
                success_count += 1
            else:
                print(f"❌ {host}: connection failed - {info}")
                failed_hosts.append(host)
        
        print(f"\nTotal: {len(self.hosts)} servers")
        print(f"Succeeded: {success_count}")
        print(f"Failed: {len(failed_hosts)}")
        
        if failed_hosts:
            print(f"Failed hosts: {', '.join(failed_hosts)}")

# Usage example
if __name__ == "__main__":
    # Create the host list file hosts.txt
    hosts_content = """192.168.1.10
192.168.1.11
192.168.1.12
10.0.0.100"""
    
    with open('hosts.txt', 'w') as f:
        f.write(hosts_content)
    
    checker = SSHChecker('hosts.txt', 'root', password='your_password')
    # Or authenticate with a key file instead:
    # checker = SSHChecker('hosts.txt', 'root', key_file='/root/.ssh/id_rsa')
    checker.batch_check()

The script supports both password and key-based authentication, and since it's multithreaded the checks run quickly. I often use it to verify that newly provisioned servers are configured correctly.
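
Incidentally, the manual thread bookkeeping in batch_check can be replaced with the standard library's ThreadPoolExecutor, which handles throttling for you. Here's a minimal sketch of the same batch step reusing the SSHChecker class above (batch_check_pooled is my own helper name, not part of the original script):

from concurrent.futures import ThreadPoolExecutor

def batch_check_pooled(checker, max_threads=10):
    # The pool caps concurrency at max_threads; no manual thread list needed
    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        pool.map(checker.check_ssh_connection, checker.hosts)
    # check_ssh_connection still pushes into checker.results, same as before
    while not checker.results.empty():
        host, status, info = checker.results.get()
        print(f"{host}: {'OK' if status else 'FAILED'} - {info}")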

3. Log File Analysis and Cleanup

Managing log files is an important part of ops work. This script analyzes log file sizes and automatically cleans up expired logs.

#!/usr/bin/env python3
import os
import glob
import gzip
import shutil
from datetime import datetime, timedelta

class LogManager:
    def __init__(self, log_dirs):
        self.log_dirs = log_dirs if isinstance(log_dirs, list) else [log_dirs]
        
    def get_log_files_info(self):
        log_info = []
        
        for log_dir in self.log_dirs:
            if not os.path.exists(log_dir):
                print(f"目录不存在: {log_dir}")
                continue
                
            # Find all log files matching common patterns
            patterns = ['*.log', '*.log.*', '*.out']
            
            for pattern in patterns:
                files = glob.glob(os.path.join(log_dir, pattern))
                
                for file_path in files:
                    try:
                        stat = os.stat(file_path)
                        size_mb = round(stat.st_size / 1024 / 1024, 2)
                        mtime = datetime.fromtimestamp(stat.st_mtime)
                        
                        log_info.append({
                            'path': file_path,
                            'size_mb': size_mb,
                            'modified_time': mtime,
                            'age_days': (datetime.now() - mtime).days
                        })
                    except OSError:
                        continue
        
        return sorted(log_info, key=lambda x: x['size_mb'], reverse=True)
    
    def analyze_logs(self):
        log_files = self.get_log_files_info()
        
        if not log_files:
            print("没有找到日志文件")
            return
        
        total_size = sum(f['size_mb'] for f in log_files)
        
        print(f"日志文件分析报告 ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})")
        print("=" * 80)
        print(f"总文件数: {len(log_files)}")
        print(f"总大小: {total_size:.2f} MB")
        print()
        
        print("前10个最大的日志文件:")
        print("-" * 80)
        for i, log_file in enumerate(log_files[:10], 1):
            print(f"{i:2d}. {log_file['path']}")
            print(f"    大小: {log_file['size_mb']} MB, "
                  f"修改时间: {log_file['modified_time'].strftime('%Y-%m-%d %H:%M')}, "
                  f"天数: {log_file['age_days']} 天")
        
        # Break the files down by age
        recent_files = [f for f in log_files if f['age_days'] <= 7]
        old_files = [f for f in log_files if f['age_days'] > 30]
        
        print(f"\n最近7天的文件: {len(recent_files)} 个, "
              f"大小: {sum(f['size_mb'] for f in recent_files):.2f} MB")
        print(f"超过30天的文件: {len(old_files)} 个, "
              f"大小: {sum(f['size_mb'] for f in old_files):.2f} MB")
    
    def compress_old_logs(self, days_threshold=7):
        log_files = self.get_log_files_info()
        compressed_count = 0
        saved_space = 0
        
        for log_file in log_files:
            if (log_file['age_days'] > days_threshold and 
                not log_file['path'].endswith('.gz') and
                log_file['size_mb'] > 1):  # only compress files larger than 1 MB
                
                try:
                    original_size = log_file['size_mb']
                    
                    with open(log_file['path'], 'rb') as f_in:
                        with gzip.open(log_file['path'] + '.gz', 'wb') as f_out:
                            shutil.copyfileobj(f_in, f_out)
                    
                    # Only remove the original once the compressed copy exists
                    if os.path.exists(log_file['path'] + '.gz'):
                        os.remove(log_file['path'])
                        compressed_size = os.path.getsize(log_file['path'] + '.gz') / 1024 / 1024
                        
                        print(f"压缩: {log_file['path']} "
                              f"({original_size:.2f}MB -> {compressed_size:.2f}MB)")
                        
                        compressed_count += 1
                        saved_space += (original_size - compressed_size)
                        
                except Exception as e:
                    print(f"压缩失败 {log_file['path']}: {e}")
        
        print(f"\n压缩完成: {compressed_count} 个文件, 节省空间: {saved_space:.2f} MB")
    
    def clean_old_logs(self, days_threshold=30, dry_run=True):
        log_files = self.get_log_files_info()
        old_files = [f for f in log_files if f['age_days'] > days_threshold]
        
        if not old_files:
            print(f"没有找到超过 {days_threshold} 天的日志文件")
            return
        
        total_size = sum(f['size_mb'] for f in old_files)
        
        print(f"找到 {len(old_files)} 个超过 {days_threshold} 天的日志文件")
        print(f"总大小: {total_size:.2f} MB")
        
        if dry_run:
            print("\n预览模式 - 以下文件将被删除:")
            for log_file in old_files:
                print(f"  {log_file['path']} ({log_file['size_mb']} MB, {log_file['age_days']} 天)")
            print(f"\n要实际删除这些文件,请设置 dry_run=False")
        else:
            deleted_count = 0
            for log_file in old_files:
                try:
                    os.remove(log_file['path'])
                    print(f"删除: {log_file['path']}")
                    deleted_count += 1
                except Exception as e:
                    print(f"删除失败 {log_file['path']}: {e}")
            
            print(f"\n删除完成: {deleted_count} 个文件, 释放空间: {total_size:.2f} MB")

# Usage example
if __name__ == "__main__":
    # Log directories to manage
    log_dirs = ['/var/log', '/opt/app/logs', '/home/user/logs']
    
    manager = LogManager(log_dirs)
    
    # Analyze the log files
    manager.analyze_logs()
    
    print("\n" + "="*50)
    
    # Compress logs older than 7 days
    manager.compress_old_logs(days_threshold=7)
    
    print("\n" + "="*50)
    
    # Clean up logs older than 30 days (dry-run mode)
    manager.clean_old_logs(days_threshold=30, dry_run=True)

This script is fairly comprehensive: it can analyze, compress, and clean up log files. I run it on a regular schedule, especially when disk space is running low.
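
To run it from cron without editing the script each time, you can wrap it in a small command-line entry point. Here's a minimal sketch, assuming LogManager from the script above is importable (the module name logmanager and the flag names are my own, not part of the original script):

#!/usr/bin/env python3
import argparse

from logmanager import LogManager  # hypothetical module name for the script above

def main():
    parser = argparse.ArgumentParser(description='Periodic log maintenance')
    parser.add_argument('--dirs', nargs='+', default=['/var/log'])
    parser.add_argument('--clean-days', type=int, default=30)
    parser.add_argument('--delete', action='store_true',
                        help='actually delete old logs; default is dry-run')
    args = parser.parse_args()

    manager = LogManager(args.dirs)
    manager.analyze_logs()
    # Deletion stays in dry-run mode unless --delete is passed explicitly
    manager.clean_old_logs(days_threshold=args.clean_days, dry_run=not args.delete)

if __name__ == "__main__":
    main()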

4. Process Monitoring and Auto-Restart

Some services die from time to time. This script watches the processes you specify and automatically restarts any that have disappeared.

#!/usr/bin/env python3
import psutil
import subprocess
import time
import logging
from datetime import datetime

class ProcessMonitor:
    def __init__(self, config_file=None):
        self.processes = {}
        self.setup_logging()
        
        if config_file:
            self.load_config(config_file)
    
    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/process_monitor.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def add_process(self, name, command, check_interval=30):
        """添加要监控的进程"""
        self.processes[name] = {
            'command': command,
            'check_interval': check_interval,
            'last_check': 0,
            'restart_count': 0,
            'last_restart': None
        }
        self.logger.info(f"添加监控进程: {name}")
    
    def is_process_running(self, process_name):
        """检查进程是否在运行"""
        for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
            try:
                # Match the keyword against the process name or command line
                if (process_name.lower() in proc.info['name'].lower() or
                    any(process_name.lower() in cmd.lower() 
                        for cmd in proc.info['cmdline'] if cmd)):
                    return True, proc.info['pid']
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue
        return False, None
    
    def start_process(self, command):
        """启动进程"""
        try:
            # Run the command through the shell
            process = subprocess.Popen(
                command, 
                shell=True, 
                stdout=subprocess.PIPE, 
                stderr=subprocess.PIPE
            )
            return True, process.pid
        except Exception as e:
            return False, str(e)
    
    def restart_process(self, name, process_info):
        """重启进程"""
        self.logger.warning(f"进程 {name} 未运行,尝试重启...")
        
        success, result = self.start_process(process_info['command'])
        
        if success:
            process_info['restart_count'] += 1
            process_info['last_restart'] = datetime.now()
            self.logger.info(f"进程 {name} 重启成功,PID: {result}")
            return True
        else:
            self.logger.error(f"进程 {name} 重启失败: {result}")
            return False
    
    def check_process(self, name, process_info):
        """检查单个进程"""
        current_time = time.time()
        
        # Skip if the check interval has not elapsed yet
        if current_time - process_info['last_check'] < process_info['check_interval']:
            return
        
        process_info['last_check'] = current_time
        
        is_running, pid = self.is_process_running(name)
        
        if is_running:
            self.logger.debug(f"进程 {name} 正在运行,PID: {pid}")
        else:
            # 进程未运行,尝试重启
            self.restart_process(name, process_info)
    
    def get_status_report(self):
        """获取监控状态报告"""
        report = []
        report.append(f"进程监控状态报告 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append("=" * 60)
        
        for name, info in self.processes.items():
            is_running, pid = self.is_process_running(name)
            status = "running" if is_running else "not running"
            
            report.append(f"Process: {name}")
            report.append(f"  Status: {status}")
            if is_running:
                report.append(f"  PID: {pid}")
            report.append(f"  Restart count: {info['restart_count']}")
            if info['last_restart']:
                report.append(f"  Last restart: {info['last_restart'].strftime('%Y-%m-%d %H:%M:%S')}")
            report.append(f"  Check interval: {info['check_interval']}s")
            report.append("")
        
        return "\n".join(report)
    
    def monitor_loop(self):
        """主监控循环"""
        self.logger.info("进程监控器启动")
        
        try:
            while True:
                for name, process_info in self.processes.items():
                    self.check_process(name, process_info)
                
                time.sleep(5)  # main loop interval
                
        except KeyboardInterrupt:
            self.logger.info("收到停止信号,退出监控")
        except Exception as e:
            self.logger.error(f"监控循环出错: {e}")

# Usage example
if __name__ == "__main__":
    monitor = ProcessMonitor()
    
    # Register the processes to monitor
    monitor.add_process("nginx", "systemctl start nginx", check_interval=60)
    monitor.add_process("mysql", "systemctl start mysql", check_interval=30)
    monitor.add_process("redis", "systemctl start redis-server", check_interval=45)
    
    # Custom applications can be monitored too
    monitor.add_process("myapp", "/opt/myapp/start.sh", check_interval=30)
    
    # Print the initial status
    print(monitor.get_status_report())
    
    # Start monitoring
    monitor.monitor_loop()

I've used this script in production with decent results. Do watch out for permissions, though: some system services can only be restarted with root privileges.
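
A simple safeguard, if your monitored commands are systemctl calls like the examples above, is to refuse to start unless running as root. A minimal sketch (Unix-only, since it relies on os.geteuid; call it at the top of monitor_loop so the monitor doesn't silently fail to restart anything):

import os
import sys

def require_root():
    # systemctl start/restart generally requires root (or matching polkit rules)
    if os.geteuid() != 0:
        sys.exit("This monitor restarts system services; run it as root.")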

5. Batch Network Connectivity Checks

Network problems are a routine part of ops work. This script checks connectivity to a batch of hosts, covering both ping and port probes.

#!/usr/bin/env python3
import subprocess
import socket
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

class NetworkChecker:
    def __init__(self):
        self.results = {}
        
    def ping_host(self, host, count=3, timeout=5):
        """Ping检测"""
        try:
            cmd = f"ping -c {count} -W {timeout} {host}"
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
            
            if result.returncode == 0:
                # Parse the ping output for loss and timing statistics
                loss_info = time_info = None
                for line in result.stdout.split('\n'):
                    if 'packet loss' in line:
                        loss_info = line.strip()
                    elif 'min/avg/max' in line:
                        time_info = line.strip()
                
                return {
                    'status': 'success',
                    'packet_loss': loss_info or 'unknown',
                    'timing': time_info or 'unknown'
                }
            else:
                return {
                    'status': 'failed',
                    'error': result.stderr.strip() or 'ping failed'
                }
                
        except Exception as e:
            return {
                'status': 'error',
                'error': str(e)
            }
    
    def check_port(self, host, port, timeout=10):
        """端口连通性检测"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            
            start_time = time.time()
            result = sock.connect_ex((host, port))
            end_time = time.time()
            
            sock.close()
            
            if result == 0:
                return {
                    'status': 'open',
                    'response_time': round((end_time - start_time) * 1000, 2)
                }
            else:
                return {
                    'status': 'closed',
                    'error': f'Connection failed (code: {result})'
                }
                
        except socket.gaierror as e:
            return {
                'status': 'error',
                'error': f'DNS resolution failed: {e}'
            }
        except Exception as e:
            return {
                'status': 'error',
                'error': str(e)
            }
    
    def comprehensive_check(self, host, ports=None):
        """综合检测单个主机"""
        if ports is None:
            ports = [22, 80, 443, 3306, 6379]  # 常用端口
        
        result = {
            'host': host,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
            'ping': self.ping_host(host),
            'ports': {}
        }
        
        # Only check ports if the host answered ping
        if result['ping']['status'] == 'success':
            for port in ports:
                result['ports'][port] = self.check_port(host, port)
        
        return result
    
    def batch_check(self, hosts, ports=None, max_workers=20):
        """批量检测多个主机"""
        results = []
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all tasks
            future_to_host = {
                executor.submit(self.comprehensive_check, host, ports): host 
                for host in hosts
            }
            
            # Collect results as they complete
            for future in as_completed(future_to_host):
                host = future_to_host[future]
                try:
                    result = future.result()
                    results.append(result)
                    print(f"✅ 完成检测: {host}")
                except Exception as e:
                    print(f"❌ 检测失败: {host} - {e}")
                    results.append({
                        'host': host,
                        'error': str(e),
                        'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
                    })
        
        return results
    
    def generate_report(self, results):
        """生成检测报告"""
        print("\n" + "="*80)
        print("网络连通性检测报告")
        print("="*80)
        
        total_hosts = len(results)
        ping_success = 0
        
        for result in results:
            print(f"\n主机: {result['host']}")
            print(f"检测时间: {result['timestamp']}")
            
            if 'error' in result:
                print(f"❌ 检测出错: {result['error']}")
                continue
            
            # Ping result
            ping_result = result['ping']
            if ping_result['status'] == 'success':
                print(f"✅ Ping: OK")
                if 'packet_loss' in ping_result:
                    print(f"   Packet loss: {ping_result['packet_loss']}")
                if 'timing' in ping_result:
                    print(f"   Latency: {ping_result['timing']}")
                ping_success += 1
            else:
                print(f"❌ Ping: failed - {ping_result.get('error', 'unknown error')}")
                continue
            
            # Port check results
            if 'ports' in result and result['ports']:
                print("   Port checks:")
                for port, port_result in result['ports'].items():
                    if port_result['status'] == 'open':
                        response_time = port_result.get('response_time', 'unknown')
                        print(f"     {port}: ✅ open ({response_time}ms)")
                    elif port_result['status'] == 'closed':
                        print(f"     {port}: ❌ closed")
                    else:
                        print(f"     {port}: ⚠️  error - {port_result.get('error', 'unknown')}")
        
        print(f"\n总结:")
        print(f"总主机数: {total_hosts}")
        print(f"Ping成功: {ping_success}")
        print(f"Ping失败: {total_hosts - ping_success}")
        print(f"成功率: {(ping_success/total_hosts*100):.1f}%")

# Usage example
if __name__ == "__main__":
    checker = NetworkChecker()
    
    # Hosts to check
    hosts = [
        '8.8.8.8',
        '114.114.114.114',
        'www.baidu.com',
        'www.google.com',
        '192.168.1.1',
        '10.0.0.1'
    ]
    
    # Ports to check
    ports = [22, 80, 443, 53, 8080]
    
    print("开始网络连通性检测...")
    results = checker.batch_check(hosts, ports)
    
    # Generate the report
    checker.generate_report(results)

This script is especially useful when troubleshooting network issues; it quickly pinpoints which machine, or which port, is the problem.
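
When you want to compare runs over time, it also helps to persist the results instead of only printing them. Here's a minimal sketch that dumps the batch_check() output to JSON (the results.json path is just an example):

import json

def save_results(results, path='results.json'):
    # Everything batch_check() returns is plain dicts/strings/numbers,
    # so it serializes straight to JSON for later diffing between runs
    with open(path, 'w') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)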

Originally published 2025-11-02 on the WeChat public account 运维躬行录.
