Notes on how to use boto3 in Python to connect to and operate an S3 object storage service.
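boto3 is not part of the standard library; assuming an ordinary pip-based environment, it can be installed with:

pip install boto3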
# -*- coding: utf-8 -*-
"""
@Time : 2021/9/23 17:19
@Author : summer
@File : s3_client.py
@Software: PyCharm
"""
import json
import os
from collections import defaultdict
import urllib3
import boto3
import datetime
import mimetypes
from config.common import DOWNLOAD_DIR
urllib3.disable_warnings()
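# Note: verify=False in the connections below disables TLS certificate verification, and
# urllib3.disable_warnings() above silences the resulting InsecureRequestWarning; this is
# only reasonable against a trusted, self-hosted S3-compatible endpoint.
# DOWNLOAD_DIR is this project's local download directory, imported from config.common.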
class s3_resource:
    def __init__(self, endpoint_url, aws_access_key_id, aws_secret_access_key, port=8080):
        self.endpoint_url = endpoint_url
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key
        self.port = port

    def __enter__(self):
        self.resource = self.create_s3_resource()
        return self

    def create_s3_resource(self):
        return boto3.resource(
            service_name='s3',
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            endpoint_url=self.endpoint_url + ':' + str(self.port),
            verify=False
        )

    def create_bucket(self, str_bucket_name):
        self.resource.create_bucket(Bucket=str_bucket_name)

    def get_all_bucket(self):
        arr_bucket = list()
        for bucket in self.resource.buckets.all():  # iterate over every bucket
            # convert the UTC creation time to local time (UTC+8)
            arr_bucket.append({"name": bucket.name,
                               "create_date": datetime.datetime.strftime(
                                   bucket.creation_date + datetime.timedelta(hours=8),
                                   "%Y-%m-%d %H:%M:%S")})
        return arr_bucket
    def get_bucket_info(self, str_bucket_name):
        arr_bucket = list()
        my_bucket = self.resource.Bucket(str_bucket_name)
        for file in my_bucket.objects.all():
            arr_bucket.append(file.key)
        return arr_bucket

    def del_bucket(self, str_bucket_name):
        # delete the specified bucket (its objects must be deleted first)
        for bucket in self.resource.buckets.all():
            if bucket.name == str_bucket_name:
                bucket.objects.all().delete()
                bucket.delete()
    def upload_file_s3(self, file_path, str_bucket_name):
        for bucket in self.resource.buckets.all():
            if bucket.name == str_bucket_name:
                file_basename = os.path.basename(file_path)
                # open in binary mode and close the handle once the upload finishes
                with open(os.path.normpath(file_path), 'rb') as data:
                    self.resource.Bucket(str_bucket_name).put_object(Key=file_basename, Body=data)

    def upload_nested_directory(self, file_path, str_bucket_name, fileext):
        # map the file extension to a Content-Type, falling back to application/octet-stream
        content_type = defaultdict(lambda: 'application/octet-stream', mimetypes.types_map)
        # file_basename = os.path.basename(file_path)
        with open(os.path.normpath(os.path.join(r'C:\autest_hci_resource\TMP\endpoint', file_path)), 'rb') as data:
            self.resource.Object(str_bucket_name, file_path).put(Body=data,
                                                                 ACL='public-read',
                                                                 ContentType=content_type[fileext])
        return None

    def download_file_s3(self, str_bucket_name):
        # download every object in the bucket into DOWNLOAD_DIR, keyed by object name
        for i in self.get_bucket_info(str_bucket_name):
            self.resource.Bucket(str_bucket_name).download_file(Key=i, Filename=os.path.join(DOWNLOAD_DIR, i))
    def get_bucket_lifecycle(self, str_bucket_name):
        bucket_lifecycle = self.resource.BucketLifecycle(str_bucket_name)
        print(bucket_lifecycle)

    # def set_bucket_tag(self, str_bucket_name):
    #     A = self.resource.BucketTagging(str_bucket_name)
    #     # A.tag_set[{'Key':'xsw','Value':'xsw'}]
    #     A.put(Tagging={
    #         'TagSet': [
    #             {
    #                 'Key': 'xsw',
    #                 'Value': 'xsw'
    #             },
    #         ]
    #     })

    def get_bucket_tag(self, str_bucket_name):
        bucket_tag = self.resource.BucketTagging(str_bucket_name)
        arr_tag = list()
        for dit_info in bucket_tag.tag_set:
            # for key, value in dit_info.items():
            arr_tag.append(dit_info)
        return arr_tag

    def enable_bucket_version(self, str_bucket_name):
        bkt_versioning = self.resource.BucketVersioning(str_bucket_name)
        bkt_versioning.enable()
        print(bkt_versioning.status)

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass
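
# A minimal usage sketch for s3_resource (the endpoint, credentials, bucket name and
# file path below are placeholders, not values from a real environment):
#
#     with s3_resource('http://192.168.0.10', 'my_access_key', 'my_secret_key', port=8080) as s3:
#         s3.create_bucket('demo-bucket')
#         print(s3.get_all_bucket())
#         s3.upload_file_s3(r'C:\tmp\demo.txt', 'demo-bucket')
#         s3.download_file_s3('demo-bucket')
#         s3.del_bucket('demo-bucket')
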
class s3_client:
    def __init__(self, endpoint_url, aws_access_key_id, aws_secret_access_key, port=8080):
        self.endpoint_url = endpoint_url
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key
        self.port = port

    def __enter__(self):
        self.client = self.create_s3_client()
        return self

    def create_s3_client(self):
        return boto3.client(
            service_name='s3',
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            endpoint_url=self.endpoint_url + ':' + str(self.port),
            verify=False
        )
    def put_now_object_acl(self, object_acl, str_bucket_name, str_object_name):
        self.client.put_object_acl(ACL=object_acl, Bucket=str_bucket_name, Key=str_object_name)

    def retrieve_object_acl(self, str_bucket_name, str_object_name):
        object_acl_info = self.client.get_object_acl(Bucket=str_bucket_name, Key=str_object_name)
        acl = object_acl_info.get("Grants", [])
        return acl

    def put_now_bucket_acl(self, bucket_acl, str_bucket_name):
        self.client.put_bucket_acl(ACL=bucket_acl, Bucket=str_bucket_name)

    def retrieve_bucket_acl(self, str_bucket_name):
        bucket_acl_info = self.client.get_bucket_acl(Bucket=str_bucket_name)
        acl = bucket_acl_info.get("Grants", [])
        return acl

    def retrieve_bucket_policy(self, str_bucket_name):
        bucket_policy_info = self.client.get_bucket_policy(Bucket=str_bucket_name)
        print(bucket_policy_info['Policy'])
        return bucket_policy_info
    def set_bucket_policy(self, str_bucket_name):
        bucket_policy = {
            'Version': '2012-10-17',
            'Statement': [{
                'Sid': 'AddPerm',
                'Effect': 'Allow',
                'Principal': '*',
                'Action': ['s3:GetObject'],
                'Resource': f'arn:aws:s3:::{str_bucket_name}/*'
            }]
        }
        # Convert the policy from JSON dict to string
        bucket_policy = json.dumps(bucket_policy)
        self.client.put_bucket_policy(Bucket=str_bucket_name, Policy=bucket_policy)

    def del_bucket_policy(self, str_bucket_name):
        self.client.delete_bucket_policy(Bucket=str_bucket_name)

    def put_s3_bucket_website(self, str_bucket_name):
        # Define the website configuration
        website_configuration = {
            'ErrorDocument': {'Key': 'error.html'},
            'IndexDocument': {'Suffix': 'index.html'},
            'RoutingRules': [
                {
                    'Condition': {
                        'KeyPrefixEquals': '/'
                    },
                    'Redirect': {
                        'ReplaceKeyWith': 'index.html'
                    }
                },
            ]
        }
        # Set the website configuration
        self.client.put_bucket_website(Bucket=str_bucket_name,
                                       WebsiteConfiguration=website_configuration)
    def set_object_tag_by_client(self, str_bucket_name, key, update=True, **new_tags):
        old_tags = {}
        if update:
            old = self.client.get_object_tagging(
                Bucket=str_bucket_name,
                Key=key,
            )
            old_tags = {i['Key']: i['Value'] for i in old['TagSet']}
        new_tags = {**old_tags, **new_tags}
        response = self.client.put_object_tagging(
            Bucket=str_bucket_name,
            Key=key,
            Tagging={
                'TagSet': [{'Key': str(k), 'Value': str(v)} for k, v in new_tags.items()]
            }
        )
        return response['ResponseMetadata']['HTTPStatusCode'] == 200

    def set_bucket_tags(self, str_bucket_name):
        tags = [{'Key': 'Owner', 'Value': 'xsw'}, {'Key': 'PrincipalId', 'Value': 'principal'}]
        self.client.put_bucket_tagging(Bucket=str_bucket_name, Tagging={'TagSet': tags})

    def get_s3_bucket_website(self, str_bucket_name):
        i = self.client.get_bucket_website(Bucket=str_bucket_name)
        return i

    def get_s3_bucket_version_status(self, str_bucket_name):
        i = self.client.get_bucket_versioning(Bucket=str_bucket_name)
        if 'Status' in i and i['Status'] == 'Enabled':
            return True
        else:
            return False

    def get_location(self, str_bucket_name):
        response = self.client.get_bucket_location(Bucket=str_bucket_name)
        return response['LocationConstraint']

    def create_bucket(self, str_bucket_name, LocationConstraint):
        self.client.create_bucket(
            Bucket=str_bucket_name,
            CreateBucketConfiguration={
                'LocationConstraint': LocationConstraint,
            },
        )

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass
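
The client-based wrapper is used the same way. A minimal sketch of driving s3_client as a context manager (the endpoint, keys, bucket name and LocationConstraint below are placeholders and depend on your own S3 service):

with s3_client('http://192.168.0.10', 'my_access_key', 'my_secret_key', port=8080) as s3:
    s3.create_bucket('demo-bucket', 'default')            # LocationConstraint value depends on the target service
    s3.set_bucket_policy('demo-bucket')                    # grant anonymous s3:GetObject on all objects
    print(s3.retrieve_bucket_policy('demo-bucket'))
    print(s3.get_s3_bucket_version_status('demo-bucket'))
    s3.del_bucket_policy('demo-bucket')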