前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >利用Python调用云Api在阿里云申请免费证书并同步到腾讯云

利用Python调用云Api在阿里云申请免费证书并同步到腾讯云

原创
作者头像
小宇-xiaoyu
发布2022-07-05 13:41:31
4.8K0
发布2022-07-05 13:41:31
举报
文章被收录于专栏:玩转Lighthouse.

腾讯云证书免费额度用完了怎么办? 调用云api 从阿里云申请证书并托管到腾讯云

0.准备工作

使用本代码请先进行子用户创建

前往创建子用户(腾讯云):https://console.cloud.tencent.com/cam

授权DNSPod 全读写访问权限、SSL证书(SSL)全读写访问权限

前往创建子用户(阿里云):https://ram.console.aliyun.com/users

授权管理云盾证书服务的权限(AliyunYundunCertFullAccess)

请注意 为了保障您的账户以及云上资产的安全 请谨慎保管SecretId 与 SecretKey 并定期更新 删除无用权限

1.SDK下载

请确保Python版本为3.6+

查看Python版本

代码语言:javascript
复制
python3 -V

安装腾讯云Python SDK

代码语言:javascript
复制
pip install -i https://mirrors.tencent.com/pypi/simple/ --upgrade tencentcloud-sdk-python

安装阿里云SDK

代码语言:javascript
复制
pip install -i https://mirrors.aliyun.com/pypi/simple/ --upgrade alibabacloud_tea_openapi alibabacloud_cas20180713==1.0.1 alibabacloud_cas20200407==1.0.5

2.代码部分

别忘了去创建阿里云的免费证书!

代码语言:javascript
复制
# -*- coding: utf-8 -*-

import json
import tencentcloud.ssl.v20191205.ssl_client
import tencentcloud.ssl.v20191205.models
from time import sleep
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.dnspod.v20210323 import dnspod_client, models
from alibabacloud_cas20180713.client import Client as cas20180713Client
from alibabacloud_cas20180713 import models as cas_20180713_models
from alibabacloud_cas20200407.client import Client as cas20200407Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_cas20200407 import models as cas_20200407_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


# aliyun
access_key_id = ''
access_key_secret = ''
# tencentcloud
SecretId = ''
SecretKey = ''


class Ver2020:
    def __init__(self):
        pass

    @staticmethod
    def create_client(
    ) -> cas20200407Client:

        config = open_api_models.Config(
            # 您的 AccessKey ID,
            access_key_id=access_key_id,
            # 您的 AccessKey Secret,
            access_key_secret=access_key_secret
        )
        # 访问的域名
        config.endpoint = f'cas.aliyuncs.com'
        return cas20200407Client(config)

    @staticmethod
    def main():
        client = Ver2020.create_client()
        list_user_certificate_order_request = cas_20200407_models.ListUserCertificateOrderRequest()
        runtime = util_models.RuntimeOptions()
        cert_id = ''
        error_1 = 0
        try:
            # 复制代码运行请自行打印 API 的返回值
            response = client.list_user_certificate_order_with_options(list_user_certificate_order_request, runtime)
            try:
                for i in range(len(response.body.certificate_order_list)+1):
                    if response.body.certificate_order_list[i].domain == '':
                        cert_id = response.body.certificate_order_list[i].instance_id
                        print('获取证书id成功:'+cert_id)
                        break
            except IndexError:
                error_1 = 1
                print('没有空闲的证书,请尝试去阿里云控制台申请免费额度或切换账号')
        except Exception as error:
            # 如有需要,请打印 error
            UtilClient.assert_as_string(error.message)
        return cert_id, error_1


class Ver2018:
    def __init__(self):
        pass

    @staticmethod
    def create_client(
            access_key_id: str,
            access_key_secret: str,
    ) -> cas20180713Client:

        config = open_api_models.Config(
            # 您的 AccessKey ID,
            access_key_id=access_key_id,
            # 您的 AccessKey Secret,
            access_key_secret=access_key_secret
        )
        # 访问的域名
        config.endpoint = f'cas.aliyuncs.com'
        return cas20180713Client(config)

    @staticmethod
    def main(cert_id, domian):
        client = Ver2018.create_client('accessKeyId', 'accessKeySecret')
        create_dvorder_audit_request = cas_20180713_models.CreateDVOrderAuditRequest(
            lang='zh',
            instance_id='{0}'.format(cert_id),
            domain='{0}'.format(domian),
            domain_verify_type='DNS',
            username='zhangsan',
            email='i@{0}'.format(domian),
            mobile='13888888888',
            province='zhejiang',
            city='hangzhou'
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            response = client.create_dvorder_audit_with_options(create_dvorder_audit_request, runtime)
            print('已提交申请 等待获取Dns记录值')
        except Exception as error:
            # 如有需要,请打印 error
            UtilClient.assert_as_string(error.message)

    @staticmethod
    def get_order_result(order_id, result):
        check_name = ''
        check_value = ''
        private_key = ''
        certificate = ''
        client = Ver2018.create_client('accessKeyId', 'accessKeySecret')
        describe_dvorder_result_request = cas_20180713_models.DescribeDVOrderResultRequest(
            instance_id=order_id
        )
        runtime = util_models.RuntimeOptions()
        try:
            # 复制代码运行请自行打印 API 的返回值
            response = client.describe_dvorder_result_with_options(describe_dvorder_result_request, runtime)
            if response.body.order_status == 'checking':
                check_name = response.body.check_name
                check_value = response.body.check_value
                print(result)
            elif response.body.order_status == 'issued':
                private_key = response.body.private_key
                certificate = response.body.certificate
                print('证书公私钥获取成功,等待上传至腾讯云证书托管')

        except Exception as error:
            # 如有需要,请打印 error
            UtilClient.assert_as_string(error.message)

        return certificate, private_key, check_value, check_name


class QCloud:
    def __init__(self):
        pass

    @staticmethod
    def add_dns_record(main_domian, sub_domian, dns_value):
        try:
            cred = credential.Credential("{0}".format(SecretId), "{0}".format(SecretKey))
            httpProfile = HttpProfile()
            httpProfile.endpoint = "dnspod.tencentcloudapi.com"

            clientProfile = ClientProfile()
            clientProfile.httpProfile = httpProfile
            client = dnspod_client.DnspodClient(cred, "", clientProfile)

            req = models.CreateRecordRequest()
            params = {
                "Domain": main_domian,
                "SubDomain": sub_domian,
                "RecordType": 'TXT',
                "RecordLine": "默认",
                "Value": dns_value
            }
            req.from_json_string(json.dumps(params))

            resp = client.CreateRecord(req)
            print('解析添加成功! '+resp.to_json_string())

        except TencentCloudSDKException as err:
            print(err)

    @staticmethod
    def upload_cert(public_key, private_key, cert_name):
        try:
            cred = credential.Credential("{0}".format(SecretId), "{0}".format(SecretKey))
            httpProfile = HttpProfile()
            httpProfile.endpoint = "ssl.tencentcloudapi.com"

            clientProfile = ClientProfile()
            clientProfile.httpProfile = httpProfile
            client = tencentcloud.ssl.v20191205.ssl_client.SslClient(cred, "", clientProfile)

            req = tencentcloud.ssl.v20191205.models.UploadCertificateRequest()
            params = {
                "CertificatePublicKey": public_key,
                "CertificatePrivateKey": private_key,
                "Alias": cert_name
            }
            req.from_json_string(json.dumps(params))

            resp = client.UploadCertificate(req)
            print('上传成功!  '+resp.to_json_string())
        except TencentCloudSDKException as err:
            print(err)


if __name__ == '__main__':
    # tmd 名字太难起了 你们凑合看吧 
    # 获取证书id,如果没有空闲证书a[1]返回1,如果有空闲证书则返回0(出现其他错误也返回1)
    a = Ver2020.main()
    # 如果有空闲证书则执行
    if a[1] == 0:
        # 要申请证书的域名
        b = input('domian=')
        # 提交申请
        Ver2018.main(a[0], b)
        # 5秒后获取Dns验证参数
        sleep(5)
        # 这就是个提示语
        i = '等待添加解析'
        # return certificate(公钥), private_key(私钥), check_value(记录值), check_name(主机记录)
        c = Ver2018.get_order_result(a, i)

        # 获取主域名
        d = b.split('.')[-2]+'.' + b.split('.')[-1]
        # 避免重复添加
        f = []
        while True:
            # 这就是个提示语
            if len(f) == 1:
                j = '等待解析生效(预计1~2分钟)....'
            else:
                j = '......'

            c = Ver2018.get_order_result(a, j)
            # 判断是否返回公钥
            if c[0] == '':
                # 是否为首次添加dns解析
                if len(f) == 0:
                    # 主机记录
                    e = c[3].replace('.'+d, '')
                    # print(e)
                    # 提交解析
                    QCloud.add_dns_record(d, e, c[2])
                    # 避免重复添加解析
                    f.append('1')
                    # 重复循环
                    continue
                else:
                    # 若仍未获取到公钥 休息10s 重复此操作 直到获取到公钥(约1min左右)
                    sleep(40)
                    continue
            else:
                # 公钥
                g = c[0]
                # 私钥
                h = c[1]

                # 上传到腾讯云进行证书托管 参数为 公钥 私钥 域名
                QCloud.upload_cert(g, h, b)
                # 结束循环
                break

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 0.准备工作
  • 1.SDK下载
  • 2.代码部分
相关产品与服务
云 API
云 API 是腾讯云开放生态的基石。通过云 API,只需少量的代码即可快速操作云产品;在熟练的情况下,使用云 API 完成一些频繁调用的功能可以极大提高效率;除此之外,通过 API 可以组合功能,实现更高级的功能,易于自动化, 易于远程调用, 兼容性强,对系统要求低。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档