首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >微信群自动发消息机器人,自动回复群发消息脚本,微信群机器人软件【python】

微信群自动发消息机器人,自动回复群发消息脚本,微信群机器人软件【python】

原创
作者头像
用户11749621
发布2025-07-25 10:47:43
发布2025-07-25 10:47:43
5680
举报

下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:1892

代码说明:这个微信机器人实现了定时群发消息、关键词自动回复、管理员控制等功能。主程序使用wxpy库处理微信消息,通过定时器实现周期性任务。使用时需要先安装依赖库,扫码登录微信账号。

代码语言:txt
复制

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import itchat
import time
import re
from threading import Timer
from wxpy import *
import logging
from datetime import datetime

# 配置日志记录
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('WeChatBot')

class WeChatBot:
    def __init__(self):
        self.bot = None
        self.scheduled_msgs = []
        self.auto_reply_rules = []
        self.admin_users = []  # 管理员列表
        self.blacklist = []    # 黑名单
        
    def login(self):
        """登录微信"""
        logger.info("正在登录微信...")
        self.bot = Bot(
            cache_path=True,
            console_qr=True,
            qr_path='qr.png',
            login_callback=self._login_success,
            logout_callback=self._logout_callback
        )
        self.bot.enable_puid()
        logger.info("微信登录成功")
        
    def _login_success(self):
        logger.info("登录成功回调")
        self._register_handlers()
        
    def _logout_callback(self):
        logger.info("微信已登出")
        
    def _register_handlers(self):
        """注册消息处理器"""
        @self.bot.register(Group, TEXT)
        def group_msg_handler(msg):
            self._handle_group_message(msg)
            
        @self.bot.register(Friend, TEXT)
        def friend_msg_handler(msg):
            self._handle_friend_message(msg)
            
        @self.bot.register(msg_types=NOTE)
        def system_notice_handler(msg):
            self._handle_system_notice(msg)
            
    def _handle_group_message(self, msg):
        """处理群消息"""
        if msg.sender.puid in self.blacklist:
            return
            
        logger.info(f"收到群消息: {msg.text} 来自: {msg.sender.name}")
        
        # 自动回复逻辑
        for rule in self.auto_reply_rules:
            if re.search(rule['pattern'], msg.text, re.IGNORECASE):
                reply = rule['reply']
                if callable(reply):
                    reply = reply(msg)
                msg.reply(reply)
                break
                
    def _handle_friend_message(self, msg):
        """处理好友消息"""
        logger.info(f"收到好友消息: {msg.text} 来自: {msg.sender.name}")
        
    def _handle_system_notice(self, msg):
        """处理系统通知"""
        logger.info(f"系统通知: {msg.text}")
        
    def add_scheduled_msg(self, group_name, content, interval=3600):
        """添加定时消息"""
        job = {
            'group_name': group_name,
            'content': content,
            'interval': interval,
            'last_send': 0
        }
        self.scheduled_msgs.append(job)
        logger.info(f"添加定时消息: {group_name} - 每{interval}秒发送一次")
        
    def add_auto_reply_rule(self, pattern, reply):
        """添加自动回复规则"""
        self.auto_reply_rules.append({
            'pattern': pattern,
            'reply': reply
        })
        logger.info(f"添加自动回复规则: {pattern} -> {reply}")
        
    def add_admin(self, puid):
        """添加管理员"""
        self.admin_users.append(puid)
        logger.info(f"添加管理员: {puid}")
        
    def add_to_blacklist(self, puid):
        """添加到黑名单"""
        self.blacklist.append(puid)
        logger.info(f"添加到黑名单: {puid}")
        
    def start_scheduler(self):
        """启动定时任务"""
        logger.info("启动定时任务调度器")
        self._schedule_next()
        
    def _schedule_next(self):
        """调度下一个任务"""
        now = time.time()
        for job in self.scheduled_msgs:
            if now - job['last_send'] >= job['interval']:
                try:
                    group = self._find_group_by_name(job['group_name'])
                    if group:
                        group.send(job['content'])
                        job['last_send'] = now
                        logger.info(f"发送定时消息到 {job['group_name']}")
                except Exception as e:
                    logger.error(f"发送定时消息失败: {e}")
                    
        Timer(1, self._schedule_next).start()
        
    def _find_group_by_name(self, name):
        """通过群名查找群"""
        groups = self.bot.groups().search(name)
        return groups[0] if groups else None
        
    def run(self):
        """运行机器人"""
        try:
            self.login()
            self.start_scheduler()
            self.bot.join()
        except Exception as e:
            logger.error(f"机器人运行异常: {e}")
            raise
            
if __name__ == '__main__':
    bot = WeChatBot()
    
    # 示例配置
    bot.add_scheduled_msg("测试群", "这是自动发送的测试消息", 1800)
    bot.add_auto_reply_rule(r'你好|hello', '你好呀,我是机器人~')
    bot.add_auto_reply_rule(r'时间|几点', lambda msg: f"现在是: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    
    bot.run()
代码语言:txt
复制
    "admin_users": ["user1_puid", "user2_puid"],
    "blacklist": ["spammer_puid"],
    "scheduled_messages": [
        {
            "group_name": "测试群",
            "content": "每日提醒:记得喝水哦~",
            "interval": 3600
        }
    ],
    "auto_reply_rules": [
        {
            "pattern": "你好|hello",
            "reply": "你好,我是机器人助手"
        },
        {
            "pattern": "时间|几点",
            "reply": "function" 
        }
    ]
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
作者已关闭评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档