前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python套接字多线程文件传输(大文件切割) - 协程async动态添加任务

python套接字多线程文件传输(大文件切割) - 协程async动态添加任务

作者头像
zmh-program
发布2023-02-06 09:58:29
3950
发布2023-02-06 09:58:29
举报
文章被收录于专栏:信息技术博客

类似于在像QQ一样的文件+聊天服务器中, 既要传输文件,同时要把消息传过去, 而文件太大, 直接阻塞 可以将文件切分为小部分, 协程传递

文章目录

字节(可以二进制读取文件)切分

代码语言:javascript
复制
def cut(self, byte:bytes, seg=segment) -> list:
   return [byte[x:x+seg] for x in range(0, len(byte), seg)]

十分简洁

实现

自定义的是2kb一切分, 改一下segment即可 0.2秒已更新, 更改 delay

代码语言:javascript
复制
import asyncio
import os
import sys
from threading import Thread

base = 1024
segment = base*2
delay = 0.2

new_file = 0
update_file = 1
request_file = 2
normal_text = 3

loop = asyncio.get_event_loop()
main_thread = Thread(target=loop.run_forever)

def save_bytes(file, byte:bytes):
    with open(file, "wb") as f:
        f.write(byte)
def get_eval(str, defined=None):
    try:
        res = eval(str)
        if isinstance(res, type(defined)):
            return res
        raise TypeError
    except:
        return defined
class SEND():
    def __init__(self, index, name, fbytes:bytes, conn:callable, encode='utf8'):
        self.segs = self.cut(fbytes)
        self.encode = encode
        self.total = len(self.segs)
        self.index = index
        self.conn = conn
        self.finish = False
        self.name = name
        asyncio.run_coroutine_threadsafe(self.update(), loop)
    async def update(self):
        self.conn(self.header().encode(self.encode))
        for n, data in enumerate(self.segs):
            self.conn(self.format(n, data).encode(self.encode))
            await asyncio.sleep(delay)
        self.finish = True
    def cut(self, byte:bytes, seg=segment) -> list:
        return [byte[x:x+seg] for x in range(0, len(byte), seg)]
    def format(self, process, data) -> str:
        return repr( (update_file, (self.index, process, data)) )
    def header(self) -> str:
        return repr((new_file, (self.index,
                              self.name,
                              self.total)
                     ))

class RECV():
    def __init__(self, index: int, name: str, total: int,):
        self.index,self.name,self.total = index, name, total
        self.progress = -1
        self.file = []
        self.finish = False
    def update(self, p, data):
        if isinstance(p, int) and p - 1 == self.progress:
            self.progress = p
            self.file.append(data)
            if len(self.file) == self.total:
                self.finish = True
                return True

    def mix(self):
        return b''.join(self.file)
    def save(self, path, ):
        if self.finish:
            b = self.mix()
            save_bytes(os.path.join(path, self.name), b)
            self.__save = os.path.join(path, self.name)
            print(f"Save {self.name} at {path}, size {len(b)} b.")
            del self.file  #释放空间
            return True
        return False
    def savepath(self) -> str:
        if self.finish:
            return self.__save
        return False

class send_files():
    def __init__(self, encode='utf8'):
        self.sends = []
        self.encode = encode
    def new_sends(self, name, file, conn):
        if type(file) == str:
                file = file.encode(self.encode)

        self.sends.append(SEND(len(self.sends),name,file,conn)) # index: len(self.sends)-1+1  => len(self.sends)
    def localfile(self, file, conn):
        if os.path.isfile(file):
            _, name = os.path.split(file)
            with open(file, "rb") as f:
                file = f.read()
            self.new_sends(name, file, conn)

class recv_files():
    def __init__(self, decode='utf8', path=None):
        self.recvs = []
        self.decode = decode
        if path is None:
            path = sys.path[0]
        self.path = path
    def new_files(self, index, name, total):
        self.recvs.append(RECV(index, name, total))
    def apply(self, index, progess, data):
        if len(self.recvs) - 1 >= index:
            if self.recvs[index].update(progess, data):
                return self.save(index)

    def save(self, index):
        if len(self.recvs) -1 >= index:
            return self.recvs[index].save(self.path)

class message_handle():
    codec = "utf8"

    def __init__(self, path,):
        self.Sender = send_files(self.codec, )
        self.Receiver = recv_files(self.codec, path)
    def handle(self, data, client):
        _res = get_eval(data.decode(self.codec), (None, ) )
        if len(_res) == 2:
            type, arguments = _res
            if type == new_file:
                self.Receiver.new_files(*arguments)
            elif type == update_file:
                self.Receiver.apply(*arguments)
            elif type == request_file:
                path = self.Receiver.recvs[arguments].savepath()
                if path is False:
                    return
                self.Sender.localfile(path, client._send) # 如若无, 报错False
            elif type == normal_text:
                return arguments
    def send(self, sendpath, conn):
        return self.Sender.localfile(sendpath, conn)
    def get_index(self, index):
        if index + 1 <= len(self.Receiver.recvs):
            return self.Receiver.recvs[index]

save_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "resource")
if not os.path.isdir(save_path):
    os.makedirs(save_path)
_handle = message_handle(save_path, )
main_thread.start()
想必大家都知道TCP服务器 v1.9更啥了吧!
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-05-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 字节(可以二进制读取文件)切分
  • 实现
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档