下载地址:https://www.pan38.com/dow/share.php?code=JCnzE 提取密码:1135
这个58同城自动发布工具包含三个主要模块:主程序、验证码识别和任务管理。主程序使用Selenium实现自动化操作,验证码识别模块使用OpenCV和Tesseract处理验证码,任务管理模块负责管理发布任务队列。使用时需要先配置config.ini文件和tasks.json任务列表。
import requests
from bs4 import BeautifulSoup
import time
import random
from PIL import Image
import pytesseract
import os
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import configparser
import json
import threading
import queue
class FiveEightAutoPoster:
def __init__(self):
self.config = configparser.ConfigParser()
self.config.read('config.ini')
self.session = requests.Session()
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
self.login_url = 'https://passport.58.com/login'
self.post_url = 'https://post.58.com/'
self.driver = None
self.task_queue = queue.Queue()
self.logged_in = False
def init_driver(self):
options = webdriver.ChromeOptions()
options.add_argument('--disable-blink-features=AutomationControlled')
options.add_argument('--disable-infobars')
options.add_argument('--disable-notifications')
self.driver = webdriver.Chrome(options=options)
def login(self):
self.init_driver()
self.driver.get(self.login_url)
try:
username = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.ID, 'username'))
)
password = self.driver.find_element(By.ID, 'password')
submit = self.driver.find_element(By.ID, 'submitBtn')
username.send_keys(self.config['ACCOUNT']['username'])
password.send_keys(self.config['ACCOUNT']['password'])
submit.click()
WebDriverWait(self.driver, 15).until(
EC.url_contains('my.58.com')
)
self.logged_in = True
print("登录成功")
cookies = self.driver.get_cookies()
for cookie in cookies:
self.session.cookies.set(cookie['name'], cookie['value'])
except TimeoutException:
print("登录超时")
return False
return True
def handle_captcha(self):
try:
captcha_img = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.ID, 'captcha-img'))
)
captcha_img.screenshot('captcha.png')
image = Image.open('captcha.png')
captcha_text = pytesseract.image_to_string(image).strip()
captcha_input = self.driver.find_element(By.ID, 'captcha')
captcha_input.send_keys(captcha_text)
return True
except Exception as e:
print(f"验证码处理失败: {str(e)}")
return False
def load_tasks(self):
with open('tasks.json', 'r', encoding='utf-8') as f:
tasks = json.load(f)
for task in tasks:
self.task_queue.put(task)
def post_item(self, item_data):
if not self.logged_in:
if not self.login():
return False
try:
self.driver.get(self.post_url)
category_select = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.XPATH, '//select[@name="category"]'))
)
# 选择分类逻辑...
title_input = self.driver.find_element(By.NAME, 'title')
title_input.send_keys(item_data['title'])
# 其他字段填写...
submit_btn = self.driver.find_element(By.ID, 'submit-btn')
submit_btn.click()
WebDriverWait(self.driver, 10).until(
EC.url_contains('success')
)
print(f"发布成功: {item_data['title']}")
return True
except Exception as e:
print(f"发布失败: {str(e)}")
return False
def start(self):
if not os.path.exists('config.ini'):
self.create_default_config()
print("请先配置config.ini文件")
return
self.load_tasks()
if not self.login():
return
while not self.task_queue.empty():
task = self.task_queue.get()
self.post_item(task)
time.sleep(random.randint(5, 15)) # 随机延时避免被封
self.driver.quit()
def create_default_config(self):
self.config['ACCOUNT'] = {
'username': 'your_username',
'password': 'your_password'
}
self.config['SETTINGS'] = {
'delay_min': '5',
'delay_max': '15',
'retry_times': '3'
}
with open('config.ini', 'w') as configfile:
self.config.write(configfile)
default_tasks = [
{
"title": "测试信息1",
"content": "这是第一条测试信息",
"category": "生活服务"
},
{
"title": "测试信息2",
"content": "这是第二条测试信息",
"category": "二手交易"
}
]
with open('tasks.json', 'w', encoding='utf-8') as f:
json.dump(default_tasks, f, ensure_ascii=False, indent=4)
if __name__ == '__main__':
poster = FiveEightAutoPoster()
poster.start()
cv2
import numpy as np
import pytesseract
from PIL import Image
class CaptchaRecognizer:
def __init__(self):
self.tesseract_config = r'--tessdata-dir "C:\Program Files\Tesseract-OCR\tessdata"'
def preprocess_image(self, image_path):
img = cv2.imread(image_path)
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
_, thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
kernel = np.ones((2, 2), np.uint8)
cleaned = cv2.morphologyEx(thresh, cv2.MORPH_CLOSE, kernel)
return cleaned
def recognize_captcha(self, image_path):
processed = self.preprocess_image(image_path)
temp_path = 'temp_processed.png'
cv2.imwrite(temp_path, processed)
try:
text = pytesseract.image_to_string(
Image.open(temp_path),
config=self.tesseract_config
).strip()
return text
except Exception as e:
print(f"验证码识别错误: {str(e)}")
return None
def batch_recognize(self, image_paths):
results = []
for path in image_paths:
result = self.recognize_captcha(path)
results.append((path, result))
return results
import json
import os
import time
from datetime import datetime
import threading
import queue
class TaskManager:
def __init__(self):
self.task_queue = queue.Queue()
self.completed_tasks = []
self.failed_tasks = []
self.lock = threading.Lock()
def load_tasks(self, file_path='tasks.json'):
if not os.path.exists(file_path):
self.create_sample_tasks(file_path)
with open(file_path, 'r', encoding='utf-8') as f:
tasks = json.load(f)
for task in tasks:
self.task_queue.put(task)
def create_sample_tasks(self, file_path):
sample_tasks = [
{
"id": 1,
"title": "测试任务1",
"content": "这是第一个测试任务的内容",
"category": "生活服务",
"images": [],
"price": "",
"contact": "",
"post_time": ""
},
{
"id": 2,
"title": "测试任务2",
"content": "这是第二个测试任务的内容",
"category": "二手交易",
"images": [],
"price": "100",
"contact": "13800138000",
"post_time": ""
}
]
self.save_tasks(sample_tasks, file_path)
def save_tasks(self, tasks, file_path):
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(tasks, f, ensure_ascii=False, indent=4)
def add_completed_task(self, task):
with self.lock:
task['complete_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self.completed_tasks.append(task)
self.save_status()
def add_failed_task(self, task, reason):
with self.lock:
task['fail_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
task['fail_reason'] = reason
self.failed_tasks.append(task)
self.save_status()
def save_status(self):
status = {
'completed': self.completed_tasks,
'failed': self.failed_tasks,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
with open('task_status.json', 'w', encoding='utf-8') as f:
json.dump(status, f, ensure_ascii=False, indent=4)
def get_next_task(self):
if not self.task_queue.empty():
return self.task_queue.get()
return None
def get_progress(self):
total = self.task_queue.qsize() + len(self.completed_tasks) + len(self.failed_tasks)
if total == 0:
return 0
return (len(self.completed_tasks) / total) * 100
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。