首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >【RAG】知识库搭建-文档预处理-数据清洗:基于异步的AI文本批处理系统实践

【RAG】知识库搭建-文档预处理-数据清洗:基于异步的AI文本批处理系统实践

作者头像
訾博ZiBo
发布于 2025-03-26 05:32:10
发布于 2025-03-26 05:32:10
1.2K014
代码可运行
举报
运行总次数:14
代码可运行

知识库搭建-文档预处理-数据清洗:基于异步的AI文本批处理系统实践

项目背景

在构建企业级知识库和RAG(检索增强生成)系统时,文档预处理和数据清洗是至关重要的环节。原始文档往往存在格式不统一、内容冗余、质量参差不齐等问题,这些问题会直接影响到向量数据库的检索质量和后续AI模型的表现。为了解决这个问题,我开发了一个基于Python的异步文本批处理系统。通过这个系统的处理,我们可以显著提升知识库的质量,为后续的向量检索和AI应用打下坚实的基础。

技术架构

项目采用了以下核心技术栈:

  • Python 3.x
  • asyncio:用于异步并发处理
  • aiohttp:处理异步HTTP请求
  • OpenAI兼容API:用于文本处理
  • tqdm:提供进度条显示
  • Pathlib:处理文件路径

核心功能

  1. 多格式支持:支持txt、md、doc、docx等多种文本格式
  2. 异步处理:使用Python的asyncio实现高效的并发处理
  3. 并发控制:通过信号量限制最大并发数,避免API限制
  4. 自动化管理:自动创建输入输出目录,统一文件命名
  5. 错误处理:完善的异常处理机制
  6. 进度显示:实时显示处理进度

完整代码

1、batch_processor.py
代码语言:javascript
代码运行次数:11
运行
AI代码解释
复制
import os
import asyncio
from pathlib import Path
from typing import List
import openai
from tqdm import tqdm
from datetime import datetime
from config import API_KEY, API_BASE, MODEL_NAME, PROMPT_TEMPLATE, INPUT_DIR, OUTPUT_DIR

class BatchProcessor:
    def __init__(self):
        """初始化批处理器"""
        # 配置OpenAI客户端
        self.client = openai.OpenAI(
            api_key=API_KEY,
            base_url=API_BASE
        )
        
        # 创建输入输出目录
        self._create_directories()
        
        # 设置并发限制
        self.semaphore = asyncio.Semaphore(5)  # 限制最大并发数为5
        
    def _create_directories(self):
        """创建必要的目录"""
        os.makedirs(INPUT_DIR, exist_ok=True)
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        
    def get_input_files(self) -> List[Path]:
        """获取输入文件列表,支持多种文本文件格式"""
        input_path = Path(INPUT_DIR)
        # 支持多种文本文件格式
        text_extensions = ['.txt', '.md', '.doc', '.docx', '.rtf', '.html', '.htm']
        input_files = []
        for ext in text_extensions:
            input_files.extend(list(input_path.glob(f'*{ext}')))
        return input_files
    
    async def process_file(self, input_file: Path) -> str:
        """异步处理单个文件"""
        async with self.semaphore:  # 使用信号量控制并发
            try:
                # 读取文件内容
                with open(input_file, 'r', encoding='utf-8') as f:
                    content = f.read()
                
                # 构建提示词
                prompt = PROMPT_TEMPLATE.format(content=content)
                
                # 调用API
                response = await asyncio.to_thread(
                    self.client.chat.completions.create,
                    model=MODEL_NAME,
                    messages=[
                        {"role": "system", "content": "你是一个专业的文本优化助手。"},
                        {"role": "user", "content": prompt}
                    ],
                    temperature=0.7,
                    max_tokens=4096  # 修改为模型支持的最大token数
                )
                
                # 获取生成的内容
                generated_content = response.choices[0].message.content
                
                # 添加延迟以避免API限制
                await asyncio.sleep(1)
                
                return generated_content
                
            except Exception as e:
                print(f"处理文件 {input_file} 时出错: {str(e)}")
                return None
    
    async def save_output(self, input_file: Path, content: str):
        """异步保存输出文件,统一使用.md格式"""
        if content is None:
            return
            
        # 生成带时间戳的输出文件名,统一使用.md后缀
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = Path(OUTPUT_DIR) / f"{input_file.stem}_processed_{timestamp}.md"
        
        # 使用异步文件操作
        await asyncio.to_thread(
            lambda: open(output_file, 'w', encoding='utf-8').write(content)
        )
    
    async def process_all_files(self):
        """异步处理所有文件"""
        input_files = self.get_input_files()
        
        if not input_files:
            print(f"请在 {INPUT_DIR} 目录中放入要处理的文本文件")
            return
            
        print(f"找到 {len(input_files)} 个文件待处理")
        
        # 创建所有任务
        tasks = []
        for input_file in input_files:
            # 创建处理文件的任务
            task = asyncio.create_task(self.process_file(input_file))
            tasks.append((input_file, task))
        
        # 使用tqdm显示进度条
        with tqdm(total=len(tasks), desc="处理文件") as pbar:
            # 等待所有任务完成
            for input_file, task in tasks:
                content = await task
                await self.save_output(input_file, content)
                pbar.update(1)

def main():
    """主函数"""
    processor = BatchProcessor()
    # 使用asyncio.run()来运行异步主函数
    asyncio.run(processor.process_all_files())

if __name__ == "__main__":
    main() 
2、config.py
代码语言:javascript
代码运行次数:3
运行
AI代码解释
复制
import os
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

# API配置
API_KEY = os.getenv(
    "API_KEY", "sk-xxx"
)  # 从环境变量获取API密钥
API_BASE = os.getenv("API_BASE", "https://xxx/v1")  # API基础URL
MODEL_NAME = os.getenv("MODEL_NAME", "gpt-3.5-turbo")  # 使用的模型名称

# 提示词模板集合
PROMPT_TEMPLATES = {
    "optimize": """
你是一个专业的文档数据清洗专家,负责处理和优化用于RAG知识库构建的文档。请按照以下指南对提供的文档进行全面清洗和标准化处理:

### 数据清洗任务:

1. 去除无关内容:
   - 删除所有广告内容
   - 移除页眉页脚信息(如页码、章节标题等重复出现的元素)
   - 清除水印文本
   - 去除版权声明、免责声明等非核心内容
   - 删除装饰性特殊字符和符号

2. 标准化格式:
   - 将所有文本转换为UTF-8编码
   - 统一标点符号(如将全角标点转为半角,或根据文档主要语言选择合适的标点规范)
   - 规范化空格使用(删除多余空格,保持段落间隔一致)
   - 对于中文文档,确保使用标准中文标点
   - 对于英文部分,统一大小写规范(如专有名词、缩写等)

3. 处理缺失值和噪声:
   - 修正OCR错误(如"0""O""1""l"的混淆)
   - 识别并修复断行导致的词语分割
   - 合并被错误分割的段落
   - 修正明显的拼写和语法错误
   - 标记无法修复的损坏内容

4. 结构优化:
   - 重新组织文档的层次结构(标题、小标题、段落)
   - 确保列表格式一致(编号、项目符号等)
   - 保持表格数据的完整性和可读性
   - 确保图表引用的连贯性

5. 语义保全:
   - 确保清洗过程不改变原文档的核心含义
   - 保留专业术语和领域特定词汇
   - 维持上下文关系和逻辑连贯性

### 输出要求:

1. 严格要求:仅输出清洗后的文档内容,不要包含任何解释、说明或其他额外内容
2. 使用markdown格式输出清洗后的完整文档
3. 不要添加任何前缀、后缀或额外的评论
4. 输出结果强化:
   - 确保文档具有清晰的层次结构,使用适当的标题级别(#、##、###等)
   - 主要章节使用一级标题(#),子部分使用二级标题(##),更细分的内容使用三级标题(###)
   - 相关内容应组织在同一部分下,保持逻辑连贯性
   - 为没有明确标题的重要段落添加适当的小标题
   - 确保标题层级之间的逻辑关系清晰,避免层级跳跃

请处理以下文档内容:

{content}
""",
}

# 当前激活的提示词模板
ACTIVE_TEMPLATE = os.getenv("ACTIVE_TEMPLATE", "optimize")  # 默认使用优化模板

# 获取当前激活的提示词模板
PROMPT_TEMPLATE = PROMPT_TEMPLATES.get(ACTIVE_TEMPLATE, PROMPT_TEMPLATES["optimize"])

# 文件路径配置
INPUT_DIR = "docs"  # 输入文件目录
OUTPUT_DIR = "docs-output"  # 输出文件目录
3、requirements.txt
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
openai>=1.0.0
python-dotenv>=0.19.0
tqdm>=4.65.0 
aiohttp>=3.8.0

技术实现细节

1. 异步并发设计
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
class BatchProcessor:
    def __init__(self):
        # 限制最大并发数为5,避免API过载
        self.semaphore = asyncio.Semaphore(5)
        
    async def process_file(self, input_file: Path) -> str:
        async with self.semaphore:  # 使用信号量控制并发
            # 处理逻辑
            pass
2. 智能文件处理
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
def get_input_files(self) -> List[Path]:
    """获取输入文件列表,支持多种文本文件格式"""
    text_extensions = ['.txt', '.md', '.doc', '.docx', '.rtf', '.html', '.htm']
    input_files = []
    for ext in text_extensions:
        input_files.extend(list(input_path.glob(f'*{ext}')))
    return input_files
3. 输出文件管理
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
async def save_output(self, input_file: Path, content: str):
    # 生成带时间戳的输出文件名
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = Path(OUTPUT_DIR) / f"{input_file.stem}_processed_{timestamp}.md"

项目优势

  1. 高效性:采用异步并发处理,显著提升处理效率
  2. 可扩展性:模块化设计,易于添加新功能
  3. 稳定性:完善的错误处理机制,确保系统稳定运行
  4. 易用性:简单的配置和使用方式,上手门槛低

使用方法

  1. 安装依赖:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
pip install -r requirements.txt
  1. 配置系统:
  • 创建配置文件,设置必要的环境变量
  • 可自定义处理模板和目录路径
  1. 运行系统:
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
python batch_processor.py
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2025-03-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
[大模型]基于 InternLM 和 LangChain 搭建知识库助手
同时,我们需要使用到开源词向量模型 Sentence Transformer,可以将其模型参数以类似于下载 InternLM 模型参数的方式下载到本地 /root/autodl-tmp/embedding_model。同时,在本节中,InternLM-Chat-7B-V1.1 的模型参数文件存储在本地 /root/autodl-tmp/model。
云未归来
2025/07/17
1090
[大模型]基于 InternLM 和 LangChain 搭建知识库助手
大模型实战|企业知识库问答系统构建全流程:从数据接入到RAG架构落地
尤其是信息孤岛林立、知识文档散落在各业务系统的企业中,员工在寻找答案的时间,可能比解决问题本身还久。这不禁让我思考:有没有一种方式,能让大家像用ChatGPT一样,在公司内部随时提问,立刻获得准确、专业的回答?
Echo_Wish
2025/07/19
3511
大模型实战|企业知识库问答系统构建全流程:从数据接入到RAG架构落地
如何用 Python 实现高效的文件处理
在日常开发中,文件处理是常见任务,比如读取日志、处理CSV数据或批量转换文件格式。Python 提供了强大的文件操作功能,但稍不注意就可能导致性能瓶颈或代码冗余。本文将分享几个实用技巧,帮助你用 Python 更高效地处理文件,涵盖大文件读取、批量处理和错误管理。
是山河呀
2025/07/15
1000
自媒体批量发布工具,抖音批量发布作品工具,上传视频用python模块~
成品下载地址:https://www.pan38.com/yun/share.php?code=JCnzE 提取密码:8819
用户11719788
2025/07/13
900
AIOps系列 | 大模型入门实战
!! 大家好,我是乔克,一个爱折腾的运维工程,一个睡觉都被自己丑醒的云原生爱好者。
没有故事的陈师傅
2025/07/16
2090
AIOps系列 | 大模型入门实战
👾打开 RAG 对接大模型的黑盒 —— 9 大隐藏问题
前一段时间,各个大模型在争斗:谁能携带更长、更大的上下文 Prompt,比如 Kimi 说 200 万字,阿里通义千问又说自己能达 1000 万字;大家都知道 Prompt 很重要,但是 RAG 和 长的上下文文本携带 是两个不同的技术方向。
掘金安东尼
2024/04/04
5110
👾打开 RAG 对接大模型的黑盒 —— 9  大隐藏问题
基于GPT搭建私有知识库聊天机器人(三)向量数据训练
在前面的文章中,我们介绍了实现原理和基本环境安装。本文将重点介绍数据训练的流程,以及如何加载、切割、训练数据,并使用向量数据库Milvus进行数据存储。
夕阳也是醉了
2023/10/16
9630
基于GPT搭建私有知识库聊天机器人(三)向量数据训练
基于RAG的企业级代码生成系统:从数据清洗到工程化实现
在现代软件开发中,利用大型语言模型(LLM)生成代码已成为提高开发效率的重要手段。然而,对于企业来说,如何让这些模型了解并遵循内部的代码规范、使用自定义组件和公共库,仍然是一个挑战。本文将详细介绍如何通过检索增强生成(RAG)技术,结合企业特定的知识库,构建一个适合企业内部使用的代码生成系统。
老码小张
2024/07/11
1.7K0
基于RAG的企业级代码生成系统:从数据清洗到工程化实现
基于InternLM和LangChain搭建自己的知识库
为了突破LLM的局限性,目前有两种范式可行:RAG(检索增强生成)和Finetune(模型微调)。
阿提说说
2024/03/01
1.2K0
基于InternLM和LangChain搭建自己的知识库
基础RAG实现,最佳入门选择(五)
> 增强生成(RAG)通过在生成响应之前检索相关的外部知识来提高语言模型的事实准确性。然而,标准组块经常丢失重要的上下文,使得检索不太有效。上下文块标头(CCH)通过在嵌入每个块之前为每个块添加高级上下文(如文档标题或部分标头)来增强RAG。这提高了检索质量并防止了断章取义的响应。
舒一笑不秃头
2025/06/20
640
基础RAG实现,最佳入门选择(五)
[大模型]基于 ChatGLM3 和 LangChain 搭建知识库助手
同时,我们需要使用到开源词向量模型 Sentence Transformer(HuggingFace 链接名为:sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2),可以将其模型参数以类似于下载 ChatGLM3 模型参数的方式下载到本地 /root/autodl-tmp/sentence-transformer。同时,在本节中,ChatGLM3-6B 的模型参数文件存储在本地 autodl-tmp/ZhipuAI/chatglm3-6b。
云未归来
2025/07/17
1650
[大模型]基于 ChatGLM3 和 LangChain 搭建知识库助手
如何只用一行Python代码制作一个GUI(图形界面)?
GUI(图形用户界面),顾名思义就是用图形的方式,来显示计算机操作的界面,更加方便且直观。
小F
2022/05/26
1.8K0
如何只用一行Python代码制作一个GUI(图形界面)?
基于GPT3.5实现本地知识库解决方案-利用向量数据库和GPT向量接口-实现智能回复并限制ChatGPT回答的范围
标题有点长,但是基本也说明出了这篇文章的主旨,那就是利用GPT AI智能回答自己设置好的问题
唯一Chat
2023/03/25
6.9K0
基于GPT3.5实现本地知识库解决方案-利用向量数据库和GPT向量接口-实现智能回复并限制ChatGPT回答的范围
AI大模型全栈工程师课程笔记 - RAG 检索增强生成
课程学习自 知乎知学堂 https://www.zhihu.com/education/learning
Michael阿明
2023/12/09
1.6K0
AI大模型全栈工程师课程笔记 - RAG 检索增强生成
2.7K Star 本地高精度OCR!由GPT-4o-mini驱动的开源OCR!
如果你正在寻找一款高精度、本地运行、支持复杂布局的 OCR 工具,那么 Zerox OCR 无疑是一个极佳的选择。
Python兴趣圈
2024/10/21
2K0
2.7K Star 本地高精度OCR!由GPT-4o-mini驱动的开源OCR!
[大模型]Qwen-7B-Chat 接入langchain搭建知识库助手
在autodl平台中租一个3090等24G显存的显卡机器,如下图所示镜像选择PyTorch–>2.0.0–>3.8(ubuntu20.04)–>11.8
云未归来
2025/07/17
1750
[大模型]Qwen-7B-Chat 接入langchain搭建知识库助手
Python使用PyPDF2库进行PDF文件操作的详细教程
在Python中,PyPDF2是一个强大的库,用于处理PDF文件。无论是合并多个PDF文件、拆分PDF文件、提取文本或者旋转页面,PyPDF2都提供了简单而灵活的解决方案。本教程将介绍PyPDF2库的基本概念和用法,帮助你更好地理解如何在Python中进行PDF文件的各种操作。
Michel_Rolle
2024/01/25
4.7K0
python小工具
http://blog.csdn.net/pipisorry/article/details/46754515
用户7886150
2020/12/30
5970
如何从文档创建 RAG 评估数据集
通过上传 PDF 文件并将其存储在矢量数据库中,我们可以通过矢量相似性搜索检索这些知识,然后将检索到的文本作为附加上下文插入到 LLM 提示中。
致Great
2025/01/03
1.1K0
如何从文档创建 RAG 评估数据集
基于 hugging face 预训练模型的实体识别智能标注方案:生成doccano要求json格式
强烈推荐:数据标注平台doccano----简介、安装、使用、踩坑记录_汀、的博客-CSDN博客_doccano
汀丶人工智能
2022/12/21
8770
基于 hugging face 预训练模型的实体识别智能标注方案:生成doccano要求json格式
推荐阅读
相关推荐
[大模型]基于 InternLM 和 LangChain 搭建知识库助手
更多 >
交个朋友
加入腾讯云官网粉丝站
蹲全网底价单品 享第一手活动信息
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档