上周刚忙完某安全证书的考证,本来这周就应该开始学一些应急响应的东西,碰巧碰到某大佬,问我有一个功能想实现,能不能写一个脚本凑合一下
大概功能如下
机器会定时在某个时间点,做一个机器的备份,文件目录的拓扑如上
然后在x号机器中,会有一个增量备份,所以需要在各个机器的备份文件夹中,找到该最新的增量备份,上传至阿里的OSS中
我一想,这不简单?于是就在13号的时候,上手写了个小脚本,有大佬觉得哪里不对的话欢迎指指点点
我代码逻辑是
用os.walk()获取所有指定xxx后缀的备份文件->获取该文件路径->判断有没有查询过这个文件夹->利用listdir获取该文件当前路径下的所有文件->查找该目录下,最新创建的文件,并判断起后缀是否为xxx->上传oss
首先是,引用库
import oss2
import sys
import os
oss2这个是阿里封装好的sdk,直接github下载源码,或者pip install oss2
安装也行
然后是,写一个oss的验证
...
auth = oss2.Auth('KeyId','KeySecret')
py_file_path=sys.path[0] #获取当前脚本所在的路径
bucket = oss2.Bucket(auth,'oss-cn-where.aliyuncs.com','bucket_name')
接着是,上传进度条和上传功能
...
def percentage(consumed_bytes, total_bytes):
if total_bytes:
rate = int(100 * (float(consumed_bytes) / float(total_bytes)))
print('\r{0}% '.format(rate), end='')
sys.stdout.flush()
def upload_file(uploadName,filename):
bucket.put_object_from_file(str(uploadName),str(filename),progress_callback=percentage)
接着是查找当前目录下最新创建的文件,且后缀指定为xxx
...
def find_new_file(dir):
'''查找目录下最新的文件'''
file_lists = os.listdir(dir)
new_file_lists=[]
for files_check1 in file_lists:
if files_check1.endswith('xxx'):
new_file_lists.append(files_check1)
else:
pass
new_file_lists.sort(key=lambda fn: os.path.getmtime(dir + "\\" + fn)
if not os.path.isdir(dir + "\\" + fn) else 0)
#print('最新的文件为: ' + file_lists[-1])
file = os.path.join(dir, new_file_lists[-1])
#print('完整路径:', file)
return file,new_file_lists[-1]
接着就可以写本体了
先创建一个函数,用来检查这个路径是否已经查询过了
...
file_check_root=[]
然后用os.walk()遍历所有目录,返回文件路径、文件夹名、文件名这三个变量
...
for root,dirs,files in os.walk(py_file_path):
接着,for循环所有文件名,判断该文件是否以xxx
结尾
...
for i in files:
#print('files: '+i)
#sys.stdout.flush()
if i.endswith('xxx'):
如果判断为指定后缀,那么执行查找,和上传
...
if i.endswith('xxx'):
if not root in file_check_root:
new_file_root=find_new_file(root)
#print(new_file_root[0])
#sys.stdout.flush()
file_check_root.append(root)
filename=os.path.join(root,new_file_root[1])
upload_file(new_file_root[1],filename)
else:
pass
补完全
...
else:
continue
代码上传到github了,虽然很垃圾,但是至少也能用嘛
https://github.com/Ernket/python-backup-alioss