在类似 QQ 的"文件 + 聊天"服务器中, 既要传输文件, 又要同时传递消息; 如果文件太大, 直接发送会阻塞消息。可以将文件切分为小块, 用协程逐块传递:
def cut(self, byte:bytes, seg=segment) -> list:
    """Split ``byte`` into consecutive chunks of at most ``seg`` bytes each."""
    return [byte[start:start + seg] for start in range(0, len(byte), seg)]
十分简洁。
默认每 2 KB 切分一次, 修改 segment 即可调整切分大小。
每 0.2 秒发送一块, 修改 delay 即可调整发送间隔。
import ast
import asyncio
import os
import sys
from threading import Thread
# Transfer tuning knobs.
base = 1024
segment = base * 2  # maximum number of bytes per chunk
delay = 0.2  # seconds slept after each chunk is handed to the connection

# Message type tags: first element of every ``(type, arguments)`` tuple.
new_file = 0  # announces a transfer; arguments are (index, name, total)
update_file = 1  # carries one chunk; arguments are (index, progress, data)
request_file = 2  # asks for a received file; argument is its index
normal_text = 3  # plain payload, returned unchanged by message_handle.handle

# asyncio.get_event_loop() is deprecated when no loop is running and raises
# RuntimeError on newer Python releases, so the loop is created explicitly.
# It is also registered as the current loop so that any code still calling
# asyncio.get_event_loop() in this thread keeps receiving the same object.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# The thread is only created here; it is started at the bottom of the module.
main_thread = Thread(target=loop.run_forever)
def save_bytes(file, byte: bytes):
    """Write ``byte`` to the path ``file`` in binary mode.

    An existing file at that path is overwritten.
    """
    with open(file, mode="wb") as sink:
        sink.write(byte)
def get_eval(str, defined=None):
    """Parse a Python literal from text, falling back to ``defined``.

    Args:
        str: Source text of a literal (tuple, list, dict, str, bytes, number,
            bool or None). The parameter name shadows the builtin and is kept
            only for backward compatibility with existing callers.
        defined: Value returned when parsing fails or when the parsed value
            is not an instance of ``type(defined)``.

    Returns:
        The parsed value if it has the same type as ``defined``, otherwise
        ``defined`` itself.
    """
    # SECURITY: this function used to call eval(), which executes arbitrary
    # expressions. In this module the text comes from
    # message_handle.handle(), so eval() let the sender of a message run
    # code. ast.literal_eval() only accepts literals, which covers every
    # message this module itself produces with repr().
    try:
        res = ast.literal_eval(str)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        # Exactly the errors literal_eval documents for malformed input.
        return defined
    if isinstance(res, type(defined)):
        return res
    return defined
class SEND():
    """Outgoing transfer that streams a payload to ``conn`` chunk by chunk.

    Creating an instance immediately schedules :meth:`update` on the
    module-level event loop. ``finish`` turns True once :meth:`update` has
    run to completion.
    """

    def __init__(self, index, name, fbytes: bytes, conn: callable, encode='utf8'):
        """Store the transfer metadata, cut the payload and schedule sending.

        Args:
            index: Identifier placed in every message of this transfer.
            name: File name announced in the header message.
            fbytes: Complete payload to send.
            conn: Callable invoked with each encoded message.
            encode: Codec used to encode the repr() text of each message.
        """
        self.index = index
        self.name = name
        self.conn = conn
        self.encode = encode
        self.finish = False
        self.segs = self.cut(fbytes)
        self.total = len(self.segs)
        asyncio.run_coroutine_threadsafe(self.update(), loop)

    async def update(self):
        """Send the header, then each chunk, sleeping ``delay`` after every chunk."""
        self._emit(self.header())
        for position, chunk in enumerate(self.segs):
            self._emit(self.format(position, chunk))
            # Yield to the event loop so other coroutines can run in between.
            await asyncio.sleep(delay)
        self.finish = True

    def _emit(self, message):
        """Encode ``message`` with the configured codec and hand it to ``conn``."""
        self.conn(message.encode(self.encode))

    def cut(self, byte: bytes, seg=segment) -> list:
        """Return ``byte`` split into consecutive slices of at most ``seg`` bytes."""
        offsets = range(0, len(byte), seg)
        return [byte[lo:lo + seg] for lo in offsets]

    def format(self, process, data) -> str:
        """Return the repr() text of an ``update_file`` message for one chunk."""
        payload = (self.index, process, data)
        return repr((update_file, payload))

    def header(self) -> str:
        """Return the repr() text of the ``new_file`` message for this transfer."""
        announcement = (self.index, self.name, self.total)
        return repr((new_file, announcement))
class RECV():
    """Incoming transfer that collects in-order chunks and saves them once complete."""

    def __init__(self, index: int, name: str, total: int,):
        """Create an empty transfer.

        Args:
            index: Identifier announced for this transfer.
            name: File name announced for this transfer.
            total: Number of chunks that make up the file.
        """
        self.index, self.name, self.total = index, name, total
        self.progress = -1  # number of the last accepted chunk; -1 means none yet
        self.file = []  # accepted chunks, in order
        self.finish = False
        # Initialised here so savepath() cannot hit a missing attribute when
        # the transfer is finished but save() has not been called yet.
        self.__save = None

    def update(self, p, data):
        """Accept chunk number ``p`` if it is the next expected one.

        Returns:
            True when this call completed the transfer, otherwise None.
            Chunks arriving after completion are ignored, so a late or
            duplicated packet can neither corrupt the data nor trigger a
            second save.
        """
        if self.finish:
            return None
        if isinstance(p, int) and p - 1 == self.progress:
            self.progress = p
            self.file.append(data)
        if len(self.file) == self.total:
            self.finish = True
            return True
        return None

    def mix(self):
        """Return all buffered chunks joined into one bytes object."""
        return b''.join(self.file)

    def save(self, path, ):
        """Write the completed file into the directory ``path``.

        Returns:
            False if the transfer is not finished, True otherwise. Once the
            file has been written, further calls return True without writing
            again, because the chunk buffer has already been released.
        """
        if not self.finish:
            return False
        if self.__save is not None:
            return True
        content = self.mix()
        # The name was announced by the peer; keep only its final component so
        # a name such as "../x" cannot place the file outside ``path``.
        target = os.path.join(path, os.path.basename(self.name))
        save_bytes(target, content)
        self.__save = target
        print(f"Save {self.name} at {path}, size {len(content)} b.")
        # Release the buffered chunks but keep the attribute, so later
        # update()/mix() calls do not raise AttributeError.
        self.file = []
        return True

    def savepath(self) -> str:
        """Return the path written by save(), or False if nothing was saved yet."""
        if self.finish and self.__save is not None:
            return self.__save
        return False
class send_files():
    """Registry of outgoing transfers."""

    def __init__(self, encode='utf8'):
        """Create an empty registry.

        Args:
            encode: Codec used to encode str payloads and passed on to each
                SEND for encoding its messages.
        """
        self.sends = []
        self.encode = encode

    def new_sends(self, name, file, conn):
        """Start sending ``file`` under the given ``name`` through ``conn``.

        Args:
            name: File name announced to the receiver.
            file: Payload as bytes, or as str (encoded with ``self.encode``).
            conn: Callable that receives each encoded message.
        """
        if isinstance(file, str):
            file = file.encode(self.encode)
        # The new transfer's index is the position it will occupy in
        # self.sends. The configured codec is forwarded so SEND does not
        # silently fall back to its own default.
        self.sends.append(SEND(len(self.sends), name, file, conn, self.encode))

    def localfile(self, file, conn):
        """Read the file at path ``file`` and start sending it through ``conn``.

        Returns:
            True if the path is an existing file and a transfer was started,
            False otherwise.
        """
        if not os.path.isfile(file):
            return False
        with open(file, "rb") as f:
            payload = f.read()
        self.new_sends(os.path.basename(file), payload, conn)
        return True
class recv_files():
    """Registry of incoming transfers, all saved into one directory."""

    def __init__(self, decode='utf8', path=None):
        """Create an empty registry.

        Args:
            decode: Codec name; stored on the instance as ``self.decode``.
            path: Directory where finished files are saved. Defaults to
                ``sys.path[0]``.
        """
        self.recvs = []
        self.decode = decode
        if path is None:
            path = sys.path[0]
        self.path = path

    def new_files(self, index, name, total):
        """Register a newly announced transfer.

        NOTE(review): the transfer is appended, so later lookups use its
        position in ``self.recvs`` rather than the announced ``index``.
        Confirm that the two always agree.
        """
        self.recvs.append(RECV(index, name, total))

    def _lookup(self, index):
        """Return the transfer at ``index``, or None if the index is invalid.

        Only non-negative ints inside the list bounds are accepted. Negative
        values would otherwise wrap around to a different transfer, and
        non-int values would raise TypeError.
        """
        if isinstance(index, bool) or not isinstance(index, int):
            return None
        if 0 <= index < len(self.recvs):
            return self.recvs[index]
        return None

    def apply(self, index, progess, data):
        """Feed one chunk to transfer ``index`` and save it when complete.

        Returns:
            The result of save() when this chunk completed the transfer,
            otherwise None (including when ``index`` is unknown).
        """
        target = self._lookup(index)
        if target is not None and target.update(progess, data):
            return self.save(index)
        return None

    def save(self, index):
        """Save transfer ``index`` into ``self.path``.

        Returns:
            Whatever the transfer's save() returns, or None when ``index``
            is unknown.
        """
        target = self._lookup(index)
        if target is None:
            return None
        return target.save(self.path)
class message_handle():
    """Decodes incoming messages and dispatches them by message type."""

    codec = "utf8"

    def __init__(self, path,):
        """Create the sender and receiver registries.

        Args:
            path: Directory handed to the receiver for saving files.
        """
        self.Sender = send_files(self.codec, )
        self.Receiver = recv_files(self.codec, path)

    @staticmethod
    def _is_triple(arguments):
        """Return True if ``arguments`` is a tuple or list of exactly three items."""
        return isinstance(arguments, (tuple, list)) and len(arguments) == 3

    def handle(self, data, client):
        """Process one raw message.

        Args:
            data: Encoded message bytes; expected to decode to the text of a
                ``(type, arguments)`` tuple.
            client: Object whose ``_send`` attribute is used as the
                connection when a requested file is sent back.

        Returns:
            The payload of a ``normal_text`` message, otherwise None.
            Messages that cannot be decoded or have an unexpected shape are
            ignored instead of raising.
        """
        try:
            text = data.decode(self.codec)
        except UnicodeDecodeError:
            return None
        _res = get_eval(text, (None, ))
        if len(_res) != 2:
            return None
        kind, arguments = _res
        if kind == new_file:
            # Only unpack when the payload really has the three expected fields.
            if self._is_triple(arguments):
                self.Receiver.new_files(*arguments)
        elif kind == update_file:
            if self._is_triple(arguments):
                self.Receiver.apply(*arguments)
        elif kind == request_file:
            entry = self.get_index(arguments)
            if entry is None:
                return None
            path = entry.savepath()
            if path is False:
                return None
            # localfile only starts a transfer if the path is an existing file.
            self.Sender.localfile(path, client._send)
        elif kind == normal_text:
            return arguments
        return None

    def send(self, sendpath, conn):
        """Send the local file at ``sendpath`` through ``conn``.

        Returns:
            Whatever the sender registry's localfile() returns.
        """
        return self.Sender.localfile(sendpath, conn)

    def get_index(self, index):
        """Return the received transfer at ``index``, or None if there is none.

        Only non-negative ints inside the list bounds are accepted. Negative
        values would otherwise wrap around to another transfer, or raise
        IndexError on an empty list.
        """
        if isinstance(index, bool) or not isinstance(index, int):
            return None
        received = self.Receiver.recvs
        if 0 <= index < len(received):
            return received[index]
        return None
# Received files are stored in a "resource" directory next to this module.
_module_dir = os.path.dirname(os.path.realpath(__file__))
save_path = os.path.join(_module_dir, "resource")
os.makedirs(save_path, exist_ok=True)

# Shared handler instance; the event-loop thread is started last.
_handle = message_handle(save_path)
main_thread.start()