利用Python脚本下载阿里云rds实例备份,上传至oss

价格对比

  • RDS备份空间费用信息
    计费量=数据备份量(OSS+OAS)+日志备份量(OSS)- 50%*实例购买的存储空间(单位为 GB,只入不舍),价格为 0.001 元/GB/小时。
  • OSS归档型单价
    归档型存储,Object最短存储期限为60天,早于60天删除、修改、覆盖Object,需要补足未满60天的剩余天数的存储费用,超过60天不需要补。价格为0.033/GB/月

环境准备

安装sdk

安装对应sdk,查看sdk列表
以下脚本依赖的sdk版本

1
2
3
4
aliyun-python-sdk-core (2.3.5)
aliyun-python-sdk-core-v3 (2.5.2)
aliyun-python-sdk-rds (2.1.1)
oss2 (2.4.0)

申请Access Key

新建RAM账号,创建ak

脚本

aliyun_api_core.py

认证模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# -*- coding: UTF-8 -*-
import oss2
import json
from aliyunsdkcore import client


class aliyun_api_core(object):
def __init__(self):
self.clt = client.AcsClient(
ak="",
secret="")

self.ossauth = oss2.Auth(
access_key_id="",
access_key_secret="")

# oss访问地址
self.endpoint = 'http://oss-cn-hangzhou-internal.aliyuncs.com'

# bucket信息
self.bucket = oss2.Bucket(self.ossauth, self.endpoint, "bucket_name")

def request_api(self, request, *values):
if values:
for value in values:
for k, v in value.items():
request.add_query_param(k, v)
request.set_accept_format('json')
result = self.clt.do_action_with_exception(request)
return json.dumps(json.loads(result.decode('utf-8')), indent=4, sort_keys=False, ensure_ascii=False)


if __name__ == '__main__':
t = aliyun_api_core()

aliyun_api_oss.py

oss上传相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# -*- coding: UTF-8 -*-
import oss2
from aliyun_api.aliyun_api_core import aliyun_api_core

core = aliyun_api_core()


class aliyun_api_oss(object):
def __init__(self):
self.ossauth = core.ossauth
self.endpoint = core.endpoint
self.bucket = core.bucket

def multithread_upload(self, backup_file_name, backup_file_path):
'''多线程上传,自动分片'''
key = backup_file_name
pathname = backup_file_path

# 判断文件是否存在,已存在的跳过
exist = self.bucket.object_exists(backup_file_name)
if exist:
pass
else:
oss2.resumable_upload(self.bucket, key, pathname, multipart_threshold=64 * 1024 * 1024, num_threads=4)


if __name__ == '__main__':
t = aliyun_api_oss()

aliyun_api_rds.py

rds备份文件查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# -*- coding: UTF-8 -*-
import json
from aliyun_api.aliyun_api_core import aliyun_api_core
from aliyunsdkrds.request.v20140815 import DescribeRegionsRequest, DescribeBackupsRequest, DescribeDBInstancesRequest, \
DescribeDBInstanceAttributeRequest, DescribeBackupPolicyRequest, DescribeBinlogFilesRequest

core = aliyun_api_core()

class aliyun_api_rds(object):
def __init__(self):
self.clt = core.clt
self.request_api = core.request_api

def DescribeRegions(self, **kwargs):
'''查询RDS地域和可用区信息'''
request = DescribeRegionsRequest.DescribeRegionsRequest()
values = {"action_name": "DescribeRegions"}
values = dict(values, **kwargs)
result = self.request_api(request, values)
return result

def DescribeDBInstances(self, RegionId="cn-hangzhou", **kwargs):
'''查看RDS实例列表'''
request = DescribeDBInstancesRequest.DescribeDBInstancesRequest()
values = {"action_name": "DescribeDBInstances", "RegionId": RegionId, "PageSize": 100}
values = dict(values, **kwargs)
result = self.request_api(request, values)
return result

def DescribeDBInstanceAttribute(self, DBInstanceId, **kwargs):
'''查看RDS实例详情'''
request = DescribeDBInstanceAttributeRequest.DescribeDBInstanceAttributeRequest()
values = {"action_name": "DescribeDBInstanceAttribute", "DBInstanceId": DBInstanceId}
values = dict(values, **kwargs)
result = self.request_api(request, values)
return result

def DescribeBackups(self, DBInstanceId, StartTime, EndTime, BackupStatus="Success", **kwargs):
'''查看RDS实例备份列表'''
request = DescribeBackupsRequest.DescribeBackupsRequest()
values = {"action_name": "DescribeBackups", "DBInstanceId": DBInstanceId,
"StartTime": StartTime,
"EndTime": EndTime,
"BackupStatus": BackupStatus}
values = dict(values, **kwargs)
result = self.request_api(request, values)
return result

def DescribeBinlogFiles(self, DBInstanceId, StartTime, EndTime, **kwargs):
'''查看RDS实例BINLOG备份列表'''
request = DescribeBinlogFilesRequest.DescribeBinlogFilesRequest()
values = {"action_name": "DescribeBinlogFiles", "DBInstanceId": DBInstanceId,
"StartTime": StartTime,
"EndTime": EndTime,
"PageSize": 100}
values = dict(values, **kwargs)
result = self.request_api(request, values)
return result



if __name__ == '__main__':
t = aliyun_api_rds()

backup.py

备份主模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# -*- coding:UTF-8 -*-
import os
import sys
import json
import datetime
import urllib.request
import logging
import logging.config

from aliyun_api.aliyun_api_rds import aliyun_api_rds
from aliyun_api.aliyun_api_oss import aliyun_api_oss

conf_filepath = os.path.join(os.path.dirname(__file__), 'log/logging.conf')
logging.config.fileConfig(conf_filepath)

api_rds = aliyun_api_rds()
api_oss = aliyun_api_oss()

# 本地路径
basedir = "/aliyun_backup/"
basedir_rds = os.path.join(basedir, 'RDS/')
basedir_rds_binlog = os.path.join(basedir, 'RDS/BINLOG/')

# 目标目录(在Oss中的目录)
ossdir_rds = "RDS/"
ossdir_rds_binlog = "RDS/BINLOG/"

# RDS区域id sys.argv[1]
RegionId = ""

today = datetime.date.today()

# 起止时间为前一天的0点到24点
StartTime = '%sT16:00Z' % (today - datetime.timedelta(days=2))
EndTime = '%sT15:59Z' % (today - datetime.timedelta(days=1))

BinLog_StartTime = '%sT16:00:00Z' % (today - datetime.timedelta(days=2))
BinLog_EndTime = '%sT15:59:59Z' % (today - datetime.timedelta(days=1))


def report_hook(count, block_size, total_size):
'''
下载进度显示回调函数
@count:已经下载的数据块
@block_size:数据块的大小
@total_size:远程文件的大小
'''
per = 100.0 * count * block_size / total_size
logging.info("%.2f%%" % (100.0 * count * block_size / total_size))


def time_format(str_time):
'''
阿里云2017-12-10T16:00:00Z时间格式上加上8小时时区
'''
Ymd = str_time.split('T')[0]
HMS = str_time.split('T')[1].split('Z')[0]
str_time = '%s %s' % (Ymd, HMS)
time = datetime.datetime.strptime(str_time, "%Y-%m-%d %H:%M:%S")
format_time = time + datetime.timedelta(hours=8)
return format_time


def download_backup_file(basedir, ossdir, DBInstanceId, backup_file_url, **kwargs):
'''
下载备份文件
'''
# 此处不替换貌似无法下载
if 'i-internal' in backup_file_url:
backup_file_url = backup_file_url.replace('i-internal', 'internal')
# 拼接文件名,否则BINLOG无法知道时间范围,不能增量还原
if basedir == basedir_rds_binlog:
LogBeginTime = time_format(kwargs['LogBeginTime']).strftime("%Y%m%d%H%M%S")
LogEndTime = time_format(kwargs['LogEndTime']).strftime("%Y%m%d%H%M%S")
backup_file_name = DBInstanceId + '_hins' + str(kwargs['HostInstanceID']) + '_' + LogBeginTime + '_' + LogEndTime + '_' + backup_file_url.rsplit('/', 1)[1].split('?')[0]
else:
backup_file_name = DBInstanceId + '_' + backup_file_url.rsplit('/', 1)[1].split('?')[0]
backup_file_dir = os.path.abspath(basedir + DBInstanceId)
logging.info(backup_file_url)
# 判断目录是否存在,不存在创建下载路径
if not os.path.exists(backup_file_dir):
os.makedirs(backup_file_dir)
backup_file_local_path = os.path.join(backup_file_dir, backup_file_name)
backup_file_remote_path = ossdir + DBInstanceId + '/' + backup_file_name
urllib.request.urlretrieve(backup_file_url, backup_file_local_path)
return backup_file_local_path, backup_file_remote_path


class backup_rds_cl(object):
def backup_rds(self, DBInstanceId, StartTime, EndTime):
'''
备份指定实例ID的备份数据到OSS
'''
# 获取RDS详情
DBInstance = json.loads(api_rds.DescribeDBInstanceAttribute(DBInstanceId))['Items']['DBInstanceAttribute'][0]

# 获取RDS实例的备份列表
DBInstanceBackupList = json.loads(api_rds.DescribeBackups(DBInstanceId, StartTime, EndTime))

# 遍历所有RDS备份
DBInstanceBackupCount = DBInstanceBackupList['TotalRecordCount'] # 时间范围内实例备份数
logging.info("(%s)实例id:%s,从%s到%s,共有%d个备份文件可以下载" % (
DBInstance['DBInstanceDescription'], DBInstance['DBInstanceId'], StartTime, EndTime,
DBInstanceBackupCount))
if DBInstanceBackupCount > 0:
i = 0
for DBInstanceBackup in DBInstanceBackupList['Items']['Backup']:
i = i + 1
logging.info("(%s)实例id:%s,正在下载第%d个备份文件,请稍候..." % (
DBInstance['DBInstanceDescription'], DBInstance['DBInstanceId'], i))
backup_file_url = DBInstanceBackup['BackupIntranetDownloadURL'] # 内网下载地址 BackupDownloadURL

# 下载文件到本地
ossdir = ossdir_rds
backup_file_local_path, backup_file_remote_path = \
download_backup_file(basedir_rds, ossdir, DBInstance['DBInstanceId'], backup_file_url)

logging.info("(%s)实例id:%s,第%d个备份文件下载完成,开始上传到OSS" % (
DBInstance['DBInstanceDescription'], DBInstance['DBInstanceId'], i))
# 上传到OSS
api_oss.multithread_upload(backup_file_remote_path, backup_file_local_path)
logging.info(
"(%s)实例id:%s,第%d个备份文件上传完成" %
(DBInstance['DBInstanceDescription'], DBInstance['DBInstanceId'], i))
# 删除上传成功的文件
os.remove(backup_file_local_path)
logging.info("(%s)实例id:%s,所有备份上传完成!!!" %
(DBInstance['DBInstanceDescription'], DBInstance['DBInstanceId']))

def backup_rds_binlog(self, DBInstanceId, StartTime, EndTime):
'''
备份指定实例ID的BINLOG备份数据到OSS
'''
# 获取RDS详情
DBInstance = json.loads(api_rds.DescribeDBInstanceAttribute(DBInstanceId))['Items']['DBInstanceAttribute'][0]

# 获取RDS实例的binlog备份列表
DBInstanceBinlogBackupList = json.loads(api_rds.DescribeBinlogFiles(DBInstanceId, StartTime, EndTime))

# 遍历所有RDS备份
DBInstanceBackupCount = DBInstanceBinlogBackupList['TotalRecordCount'] # 时间范围内实例备份数
logging.info("(%s)实例id:%s,从%s到%s,共有%d个BINLOG备份文件可以下载" % (
DBInstance['DBInstanceDescription'], DBInstance['DBInstanceId'], StartTime, EndTime,
DBInstanceBackupCount))
if DBInstanceBackupCount > 0:
i = 0
for DBInstanceBackup in DBInstanceBinlogBackupList['Items']['BinLogFile']:
i = i + 1
logging.info("(%s)实例id:%s,正在下载第%d个BINLOG备份文件,请稍候..." % (
DBInstance['DBInstanceDescription'], DBInstance['DBInstanceId'], i))
backup_file_url = DBInstanceBackup['IntranetDownloadLink'] # 内网下载地址 DownloadLink\IntranetDownloadLink

# 下载文件到本地
backup_file_local_path, backup_file_remote_path = \
download_backup_file(basedir_rds_binlog, ossdir_rds_binlog, DBInstance['DBInstanceId'],
backup_file_url, HostInstanceID=DBInstanceBackup['HostInstanceID'],
LogBeginTime=DBInstanceBackup['LogBeginTime'],
LogEndTime=DBInstanceBackup['LogEndTime'])

logging.info("(%s)实例id:%s,第%d个BINLOG备份文件下载完成,开始上传到OSS" % (
DBInstance['DBInstanceDescription'], DBInstance['DBInstanceId'], i))

# 上传到OSS
api_oss.multithread_upload(backup_file_remote_path, backup_file_local_path)
logging.info(
"(%s)实例id:%s,第%d个BINLOG备份文件上传完成" %
(DBInstance['DBInstanceDescription'], DBInstance['DBInstanceId'], i))
# 删除上传成功的文件
os.remove(backup_file_local_path)
logging.info("(%s)实例id:%s,所有BINLOG备份上传完成!!!" %
(DBInstance['DBInstanceDescription'], DBInstance['DBInstanceId']))

def backup_all_rds(self, RegionId, StartTime, EndTime):
'''
备份全部实例ID的备份数据到OSS
'''
# 获取RDS实例列表
DescribeDBInstances = json.loads(api_rds.DescribeDBInstances(RegionId))

# 遍历所有RDS实例
DBInstanceCount = DescribeDBInstances['TotalRecordCount'] # 区域内实例数,如果多区域可遍历
if DBInstanceCount > 0:
for DBInstance in DescribeDBInstances['Items']['DBInstance']:
self.backup_rds(DBInstance['DBInstanceId'], StartTime, EndTime)
logging.info("所有实例的备份全部上传完成!!!")

def backup_allbinlog_rds(self, RegionId, StartTime, EndTime):
'''
备份全部实例ID的BILONG备份数据到OSS
'''

# 获取RDS实例列表
DescribeDBInstances = json.loads(api_rds.DescribeDBInstances(RegionId, Engine='MySQL'))

# 遍历所有RDS实例
DBInstanceCount = DescribeDBInstances['TotalRecordCount'] # 区域内实例数,如果多区域可遍历
if DBInstanceCount > 0:
for DBInstance in DescribeDBInstances['Items']['DBInstance']:
self.backup_rds_binlog(DBInstance['DBInstanceId'], StartTime, EndTime)
logging.info("所有实例的BINLOG备份全部上传完成!!!")

if __name__ == '__main__':
# 备份全部rds实例
rds = backup_rds_cl()
logging.info("=================开始上传rds实例备份================")
rds.backup_all_rds(RegionId, StartTime, EndTime)
logging.info("=================开始上传rds实例BINLOG备份================")
rds.backup_allbinlog_rds(RegionId, BinLog_StartTime, BinLog_EndTime)

logging.conf

log配置文件,记录备份详细情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# logging.conf
[loggers]
keys=root

[logger_root]
level=INFO
handlers=consoleHandler,fileHandler

#################################################
[handlers]
keys=consoleHandler,fileHandler

[handler_consoleHandler]
class=StreamHandler
level=INFO
formatter=simpleFormatter
args=(sys.stdout,)

[handler_fileHandler]
class=FileHandler
level=INFO
formatter=simpleFormatter
args=('log/logger.log','a','utf-8')

#################################################
[formatters]
keys=simpleFormatter

[formatter_simpleFormatter]
format=%(asctime)s %(filename)s : %(levelname)s %(message)s
datefmt=%Y-%m-%d %A %H:%M:%S