前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >s3接口测试轮子

s3接口测试轮子

作者头像
summerking
发布于 2022-09-16 04:04:41
发布于 2022-09-16 04:04:41
71800
代码可运行
举报
文章被收录于专栏:summerking的专栏summerking的专栏
运行总次数:0
代码可运行

记录下如何使用python中的boto3,连接并操作S3对象服务

# python版本

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
# -*- coding: utf-8 -*-
"""
@Time    : 2021/9/23 17:19
@Author  : summer
@File    : s3_client.py
@Software: PyCharm
"""
import json
import os
from collections import defaultdict
import urllib3
import boto3
import datetime
import mimetypes
from config.common import DOWNLOAD_DIR

urllib3.disable_warnings()

class s3_resource:
    def __init__(self, endpoint_url, aws_access_key_id, aws_secret_access_key, port=8080):
        self.endpoint_url = endpoint_url
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key
        self.port = port

    def __enter__(self):
        self.resource = self.create_s3_resource()
        return self

    def create_s3_resource(self):
        return boto3.resource(
            service_name='s3',
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            endpoint_url=self.endpoint_url + ':' + str(self.port),
            verify=False
        )

    def create_bucket(self, str_bucket_name):
        self.resource.create_bucket(Bucket=str_bucket_name)

    def get_all_bucket(self):
        arr_bucket = list()
        for bucket in self.resource.buckets.all():  # 获取所有bucket
            # 将实际转为本地时间
            arr_bucket.append({"name": bucket.name,
                               "create_date": datetime.datetime.strftime(
                                   bucket.creation_date + datetime.timedelta(hours=8),
                                   "%Y-%m-%d %H:%M:%S")})
        return arr_bucket

    def get_bucket_info(self, str_bucket_name):
        arr_bucket = list()
        my_bucket = self.resource.Bucket(str_bucket_name)
        for file in my_bucket.objects.all():
            arr_bucket.append(file.key)
        return arr_bucket

    def del_bucket(self, str_bucket_name):
        # 删除指定的bucket
        for bucket in self.resource.buckets.all():
            if bucket.name == str_bucket_name:
                bucket.objects.all().delete()
                bucket.delete()

    def upload_file_s3(self, file_path, str_bucket_name):
        for bucket in self.resource.buckets.all():
            if bucket.name == str_bucket_name:
                data = open(os.path.normpath(file_path), 'rb')
                file_basename = os.path.basename(file_path)
                self.resource.Bucket(str_bucket_name).put_object(Key=file_basename, Body=data)

    def upload_nested_directory(self, file_path, str_bucket_name, fileext):
        data = open(os.path.normpath(os.path.join(r'C:\autest_hci_resource\TMP\endpoint', file_path)), 'rb')
        content_type = defaultdict(lambda: 'application/octet-stream', mimetypes.types_map)
        # file_basename = os.path.basename(file_path)
        self.resource.Object(str_bucket_name, file_path).put(Body=data,
                                                             ACL='public-read',
                                                             ContentType=content_type[fileext])
        return None

    def download_file_s3(self, str_bucket_name):
        for i in self.get_bucket_info(str_bucket_name):
            self.resource.Bucket(str_bucket_name).download_file(Key=i, Filename=os.path.join(DOWNLOAD_DIR, i))

    def get_bucket_lifecycle(self, str_bucket_name):
        bucket_lifecycle = self.resource.BucketLifecycle(str_bucket_name)
        print(bucket_lifecycle)

    # def set_bucket_tag(self, str_bucket_name):
    #     A = self.resource.BucketTagging(str_bucket_name)
    #     # A.tag_set[{'Key':'xsw','Value':'xsw'}]
    #     A.put(Tagging={
    #         'TagSet': [
    #             {
    #                 'Key': 'xsw',
    #                 'Value': 'xsw'
    #             },
    #         ]
    #     }
    #     )

    def get_bucket_tag(self, str_bucket_name):
        bucket_tag = self.resource.BucketTagging(str_bucket_name)
        arr_tag = list()
        for dit_info in bucket_tag.tag_set:
            # for key, value in dit_info.items():
            arr_tag.append(dit_info)
        return arr_tag

    def enable_bucket_version(self, str_bucket_name):
        bkt_versioning = self.resource.BucketVersioning(str_bucket_name)
        bkt_versioning.enable()
        print(bkt_versioning.status)

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass


class s3_client:
    def __init__(self, endpoint_url, aws_access_key_id, aws_secret_access_key, port=8080):
        self.endpoint_url = endpoint_url
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key
        self.port = port

    def __enter__(self):
        self.client = self.create_s3_client()
        return self

    def create_s3_client(self):
        return boto3.client(
            service_name='s3',
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            endpoint_url=self.endpoint_url + ':' + str(self.port),
            verify=False
        )

    def put_now_object_acl(self, object_acl, str_bucket_name, str_object_name):
        self.client.put_object_acl(ACL=object_acl, Bucket=str_bucket_name, Key=str_object_name)

    def retrieve_object_acl(self, str_bucket_name, str_object_name):
        # for bucket in self.resource.buckets.all():
        #     print(bucket.name)
        object_acl_info = self.client.get_object_acl(Bucket=str_bucket_name, Key=str_object_name)
        acl = object_acl_info.get("Grants", [])
        return acl

    def put_now_bucket_acl(self, bucket_acl, str_bucket_name):
        self.client.put_bucket_acl(ACL=bucket_acl, Bucket=str_bucket_name)
        # qwe = self.client.get_bucket_acl(Bucket=str_bucket_name)
        # print(qwe)

    def retrieve_bucket_acl(self, str_bucket_name):
        # for bucket in self.resource.buckets.all():
        #     print(bucket.name)
        bucket_acl_info = self.client.get_bucket_acl(Bucket=str_bucket_name, ExpectedBucketOwner='string')
        acl = bucket_acl_info.get("Grants", [])
        return acl

    def retrieve_bucket_policy(self, str_bucket_name):
        # for bucket in self.resource.buckets.all():
        #     print(bucket.name)
        bucket_policy_info = self.client.get_bucket_policy(Bucket=str_bucket_name)
        print(bucket_policy_info['Policy'])
        return bucket_policy_info

    def set_bucket_policy(self, str_bucket_name):
        bucket_policy = {
            'Version': '2012-10-17',
            'Statement': [{
                'Sid': 'AddPerm',
                'Effect': 'Allow',
                'Principal': '*',
                'Action': ['s3:GetObject'],
                'Resource': f'arn:aws:s3:::{str_bucket_name}/*'
            }]
        }

        # Convert the policy from JSON dict to string
        bucket_policy = json.dumps(bucket_policy)
        self.client.put_bucket_policy(Bucket=str_bucket_name, Policy=bucket_policy)

    def del_bucket_policy(self, str_bucket_name):
        self.client.delete_bucket_policy(Bucket=str_bucket_name)

    def put_s3_bucket_website(self, str_bucket_name):
        # Define the website configuration
        website_configuration = {
            'ErrorDocument': {'Key': 'error.html'},
            'IndexDocument': {'Suffix': 'index.html'},
            'RoutingRules': [
                {
                    'Condition': {
                        'KeyPrefixEquals': '/'
                    },
                    'Redirect': {
                        'ReplaceKeyWith': 'index.html'
                    }
                },
            ]
        }

        # Set the website configuration
        self.client.put_bucket_website(Bucket=str_bucket_name,
                                       WebsiteConfiguration=website_configuration)

    def set_object_tag_by_client(self, str_bucket_name, key, update=True, **new_tags):
        old_tags = {}

        if update:
            old = self.client.get_object_tagging(
                Bucket=str_bucket_name,
                Key=key,
            )

            old_tags = {i['Key']: i['Value'] for i in old['TagSet']}

        new_tags = {**old_tags, **new_tags}

        response = self.client.put_object_tagging(
            Bucket=str_bucket_name,
            Key=key,
            Tagging={
                'TagSet': [{'Key': str(k), 'Value': str(v)} for k, v in new_tags.items()]
            }
        )

        return response['ResponseMetadata']['HTTPStatusCode'] == 200

    def set_bucket_tags(self, str_bucket_name):
        tags = [{'Key': 'Owner', 'Value': 'xsw'}, {'Key': 'PrincipalId', 'Value': 'principal'}]
        self.client.put_bucket_tagging(Bucket=str_bucket_name, Tagging={'TagSet': tags})

    def get_s3_bucket_website(self, str_bucket_name):
        i = self.client.get_bucket_website(Bucket=str_bucket_name)
        return i

    def get_s3_bucket_version_status(self, str_bucket_name):
        i = self.client.get_bucket_versioning(Bucket=str_bucket_name)
        if 'Status' in i and i['Status'] == 'Enabled':
            return True
        else:
            return False

    def get_location(self, str_bucket_name):
        response = self.client.get_bucket_location(Bucket=str_bucket_name)
        return response['LocationConstraint']

    def create_bucket(self, str_bucket_name, LocationConstraint):
        self.client.create_bucket(
            Bucket=str_bucket_name,
            CreateBucketConfiguration={
                'LocationConstraint': LocationConstraint,
            },
        )

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-09-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Hugo博客搭建教程以及配置调优
我们首先需要安装Scoop,这是一个适用于Windows的包管理器,个人认为非常好用
慕阳MuYoung
2025/06/12
280
分布式存储——ceph 的 python 基础接口
python 使用 boto 库完成分布式存储读、写、判断接口 import boto import boto.s3.connection from boto.s3.key import Key import os class ImageFeatIO: __read_singleton = None __write_singleton = None __read_count = 0 __write_count =0 def __init__(self):
用户2434869
2018/09/12
1.4K0
安装s3cmd
一、测试S3访问     root@node4:~# apt-get install python-boto     root@node4:~# vim s2test.py     import boto     import boto.s3.connection     access_key = 'SSCRZQ0L7O6UM71OYV7H'     secret_key = '8VQ8Gr5CaxL5ZokorupYbf5xQ+AXYqA+KFa4OlZ+'     conn = boto.connect_s3(     aws_access_key_id = access_key,     aws_secret_access_key = secret_key,     #host = '{hostname}'     host = 'node4',     is_secure=False,     calling_format = boto.s3.connection.OrdinaryCallingFormat(),     )     bucket = conn.create_bucket('my-new-bucket-node4')     for bucket in conn.get_all_buckets():             print "{name}\t{created}".format(                    name = bucket.name,                    created = bucket.creation_date,     )     root@node4:~# python s2test.py     说明:access_key和secret_key需修改成被测试的用户的access_key和secret_key。 二、下载s3cmd安装包并安装     下载地址:https://sourceforge.net/projects/s3tools/files/s3cmd/     我们这里选择s3cmd-1.5.2.tar.gz版本。     root@node4:~# tar -zxvf s3cmd-1.5.2.tar.gz     root@node4:~# cd s3cmd-1.5.2     root@node4:~# apt-get install python-setuptools     root@node4:~# python setup.py install     root@node4:~# s3cmd --configure     注意:access_key和secret_key需分别配置成S3用户的access_key和secret_key     配置完成后会生成/root/.s3cfg文件,我们修改该文件中的host_base和host_bucket两项,用主机名替代原有网址。     root@node4:~# vim /root/.s3cfg     host_base = node4     host_bucket = %(bucket)s.node4     root@node4:~# ln -s /s3cmd-1.5.2/build/scripts-2.7/s3cmd  /usr/bin/s3cmd 三、安装dnsmasq     root@node4:~# apt-get install dnsmasq     root@node4:~# vim /etc/dnsmasq.conf     address = /node4/192.168.107.24(node4为主机名,192.168.107.24为该主机的IP地址)     listen-address = 127.0.0.1     root@node4:~# service dnsmasq restart
py3study
2020/01/06
9750
RGW奇淫技巧-玩转system特权
开启system特权 root@demohost:/home/user# radosgw-admin user modify --system=1 --uid=s3user { "user_id": "s3user", "display_name": "s3user", "email": "", "suspended": 0, "max_buckets": 1000, "auid": 0, "subusers": [], "keys": [
用户1260683
2018/01/31
6740
RGW奇淫技巧-玩转system特权
基于RGW的多媒体处理框架
基于RGW的多媒体处理框架 背景 业务需要在原有RGW的服务基础上加上对多媒体类资源的处理,比如图片resize、视频转码等。参考过多个厂家的设计,发现对方的多媒体类处理都是在URL里面加上query
用户1260683
2019/11/06
1.1K0
S3 简单使用
S3 全名是 Simple Storage Service,简便的存储服务。amazon (S3) 是一个公开的服务,Web 应用程序开发人员可以使用它存储数字资产,包括图片、视频、音乐和文档。S3 提供一个 RESTful API 以编程方式实现与该服务的交互。可以通过 Amazon S3 随时在 Web 上的任何位置存储和检索的任意大小的数据。
用户4945346
2023/01/11
3.2K0
python boto和boto3操作bucket
使用boto进行https的连接失败, validate_certs设置成True或False没有任何作用
用户1214487
2020/09/18
2.2K0
初试 Ceph 存储之块设备、文件系统、对象存储
哎_小羊
2018/01/02
6.7K0
初试 Ceph 存储之块设备、文件系统、对象存储
云端数据备份与恢复的最佳实践
在现代数字化生活中,数据的重要性不言而喻。无论是个人用户的照片和文档,还是企业的核心业务数据,意外丢失都会带来巨大的损失。作为一名技术从业者,我一直倡导使用云端技术来备份和恢复数据,因为它不仅可靠,而且在灾难发生时可以快速恢复数据。今天,我想分享一些关于云端数据备份与恢复的最佳实践,希望对你有所帮助。
Echo_Wish
2025/01/18
4470
云端数据备份与恢复的最佳实践
Ceph对象存储安装部署及验证
今天来玩下Ceph的对象存储,在开始之前呢,先扯会闲篇,我觉得生活中处处是非结构化数据,最简单的举例,下面两个行业,一个是直播,一个是摄影。
DevinGeng
2019/04/09
2.3K0
Ceph对象存储安装部署及验证
RGW Bucket Shard优化
bucket index是整个RGW里面一个非常关键的数据结构,用于存储bucket的索引数据,默认情况下单个bucket的index全部存储在一个shard文件(shard数量为0,主要以OMAP-keys方式存储在leveldb中),随着单个bucket内的Object数量增加,整个shard文件的体积也在不断增长,当shard文件体积过大就会引发各种问题。
Lucien168
2020/07/20
3.3K0
RGW Bucket Shard优化
查询bucket已用量脚本-python
目前仅支持ceph的s3方案,具体配置看说明 # -*- coding: utf-8 -*- import requests import json from email.utils import formatdate import hmac py3k = False from hashlib import sha1 as sha try: from urlparse import urlparse from base64 import encodestring except:
用户1260683
2018/01/31
2.4K1
如何使用亚马逊对象存储AWS S3 SDK访问腾讯云存储COS
COS 提供了 AWS S3 兼容的 API,因此当您的数据从 S3 迁移到 COS 之后,只需要进行简单的配置修改,即可让您的客户端应用轻松兼容 COS 服务。本文主要介绍不同开发平台的 S3 SDK 的适配步骤。在完成添加适配步骤后,您就可以使用 S3 SDK 的接口来访问 COS 上的文件了。
云存储
2020/05/26
4.5K0
10 张图详解 K8S 中部署 Ceph 与功能测试实战
前提是需要一个 k8s 环境,k8s 环境的部署可以参考这篇文章:32 张配图详解 K8S 1.24 高可用部署,保姆级详细版!
我的小碗汤
2023/03/20
3.5K0
10 张图详解 K8S 中部署 Ceph 与功能测试实战
Ceph分布式存储之对象存储
为了使用 REST 接口, 首先需要为 S3 接口初始化一个 Ceph 对象网关用户. 然后为 Swift 接口新建一个子用户.
yuanfan2012
2020/01/17
3.8K1
Ceph分布式存储之对象存储
Amazon S3 分布式存储的 python 接口实现
Amazon s3 是一种分布式的对象存储。用键值对的方式,来存储数据。其中,存入的所有数据都是一个对象(object),每一个对象都有一个键(key)存在。
用户2434869
2018/09/12
2.1K0
Tensorflow file_io的用法
S3 对象存储的使用越来越广泛,其中的好处就不多说了,这里用 Tensorflow 举个例子。
runzhliu
2020/08/06
1.1K0
Tensorflow file_io的用法
Ceph:关于Ceph 集群如何访问的一些笔记
对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是对大众理想的懦弱回归,是随波逐流,是对内心的恐惧 ——赫尔曼·黑塞《德米安》
山河已无恙
2023/08/21
6860
Ceph:关于Ceph 集群如何访问的一些笔记
AWS S3 学习小结
1.首先,这个是AWS的开发资源使用文档:AWS开发文档,AWS官网 – S3教程
全栈程序员站长
2022/11/03
1.9K0
RGW 服务端加密爬坑记
参考了官方文档,决定采用 Customer-Provided Keys(Amazon SSE-C)方式进行加密
用户1260683
2018/07/31
2.6K0
RGW 服务端加密爬坑记
相关推荐
Hugo博客搭建教程以及配置调优
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验