cos_op.py 24 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import os
  4. import time
  5. import json
  6. import hashlib
  7. import urllib
  8. from contextlib import closing
  9. import cos_auth
  10. from cos_err import CosErr
  11. from cos_request import UploadFileRequest
  12. from cos_request import UploadSliceFileRequest
  13. from cos_request import UpdateFileRequest
  14. from cos_request import DelFileRequest
  15. from cos_request import StatFileRequest
  16. from cos_request import CreateFolderRequest
  17. from cos_request import UpdateFolderRequest
  18. from cos_request import StatFolderRequest
  19. from cos_request import DelFolderRequest
  20. from cos_request import ListFolderRequest, DownloadFileRequest, MoveFileRequest
  21. from cos_common import Sha1Util
  22. from logging import getLogger
  23. from traceback import format_exc
  24. logger = getLogger(__name__)
  25. class BaseOp(object):
  26. """
  27. BaseOp基本操作类型
  28. """
  29. def __init__(self, cred, config, http_session):
  30. """ 初始化类
  31. :param cred: 用户的身份信息
  32. :param config: cos_config配置类
  33. :param http_session: http 会话
  34. """
  35. self._cred = cred
  36. self._config = config
  37. self._http_session = http_session
  38. self._expired_period = self._config.get_sign_expired()
  39. def set_cred(self, cred):
  40. """设置用户的身份信息
  41. :param cred:
  42. :return:
  43. """
  44. self._cred = cred
  45. def set_config(self, config):
  46. """ 设置config
  47. :param config:
  48. :return:
  49. """
  50. self._config = config
  51. self._expired_period = self._config.get_sign_expired()
  52. def _build_url(self, bucket, cos_path):
  53. """生成url
  54. :param bucket:
  55. :param cos_path:
  56. :return:
  57. """
  58. bucket = bucket.encode('utf8')
  59. end_point = self._config.get_endpoint().rstrip('/').encode('utf8')
  60. appid = self._cred.get_appid()
  61. cos_path = urllib.quote(cos_path.encode('utf8'), '~/')
  62. url = '%s/%s/%s%s' % (end_point, appid, bucket, cos_path)
  63. return url
  64. def build_download_url(self, bucket, cos_path, sign):
  65. # Only support http now
  66. appid = self._cred.get_appid()
  67. hostname = self._config.get_download_hostname()
  68. cos_path = urllib.quote(cos_path)
  69. url_tmpl = 'http://{bucket}-{appid}.{hostname}{cos_path}?sign={sign}'
  70. return url_tmpl.format(bucket=bucket, appid=appid, hostname=hostname, cos_path=cos_path, sign=sign)
  71. def send_request(self, method, bucket, cos_path, **kwargs):
  72. """ 发送http请求
  73. :param method:
  74. :param bucket:
  75. :param cos_path:
  76. :param args:
  77. :return:
  78. """
  79. url = self._build_url(bucket, cos_path)
  80. logger.debug("sending request, method: %s, bucket: %s, cos_path: %s" % (method, bucket, cos_path))
  81. try:
  82. if method == 'POST':
  83. http_resp = self._http_session.post(url, verify=False, **kwargs)
  84. else:
  85. http_resp = self._http_session.get(url, verify=False, **kwargs)
  86. status_code = http_resp.status_code
  87. if status_code == 200 or status_code == 400:
  88. return http_resp.json()
  89. else:
  90. logger.warning("request failed, response message: %s" % http_resp.text)
  91. err_detail = 'url:%s, status_code:%d' % (url, status_code)
  92. return CosErr.get_err_msg(CosErr.NETWORK_ERROR, err_detail)
  93. except Exception as e:
  94. logger.exception("request failed, return SERVER_ERROR")
  95. err_detail = 'url:%s, exception:%s traceback:%s' % (url, str(e), format_exc())
  96. return CosErr.get_err_msg(CosErr.SERVER_ERROR, err_detail)
  97. def _check_params(self, request):
  98. """检查用户输入参数, 检查通过返回None, 否则返回一个代表错误原因的dict
  99. :param request:
  100. :return:
  101. """
  102. if not self._cred.check_params_valid():
  103. return CosErr.get_err_msg(CosErr.PARAMS_ERROR, self._cred.get_err_tips())
  104. if not request.check_params_valid():
  105. return CosErr.get_err_msg(CosErr.PARAMS_ERROR, request.get_err_tips())
  106. return None
  107. def del_base(self, request):
  108. """删除文件或者目录, is_file_op为True表示是文件操作
  109. :param request:
  110. :return:
  111. """
  112. check_params_ret = self._check_params(request)
  113. if check_params_ret is not None:
  114. return check_params_ret
  115. auth = cos_auth.Auth(self._cred)
  116. bucket = request.get_bucket_name()
  117. cos_path = request.get_cos_path()
  118. sign = auth.sign_once(bucket, cos_path)
  119. http_header = dict()
  120. http_header['Authorization'] = sign
  121. http_header['Content-Type'] = 'application/json'
  122. http_header['User-Agent'] = self._config.get_user_agent()
  123. http_body = {'op': 'delete'}
  124. timeout = self._config.get_timeout()
  125. return self.send_request('POST', bucket, cos_path, headers=http_header, data=json.dumps(http_body), timeout=timeout)
  126. def stat_base(self, request):
  127. """获取文件和目录的属性
  128. :param request:
  129. :return:
  130. """
  131. check_params_ret = self._check_params(request)
  132. if check_params_ret is not None:
  133. return check_params_ret
  134. auth = cos_auth.Auth(self._cred)
  135. bucket = request.get_bucket_name()
  136. cos_path = request.get_cos_path()
  137. expired = int(time.time()) + self._expired_period
  138. sign = auth.sign_more(bucket, cos_path, expired)
  139. http_header = dict()
  140. http_header['Authorization'] = sign
  141. http_header['User-Agent'] = self._config.get_user_agent()
  142. http_body = dict()
  143. http_body['op'] = 'stat'
  144. timeout = self._config.get_timeout()
  145. return self.send_request('GET', bucket, cos_path, headers=http_header, params=http_body, timeout=timeout)
  146. class FileOp(BaseOp):
  147. """FileOp 文件相关操作"""
  148. def __init__(self, cred, config, http_session):
  149. """ 初始化类
  150. :param cred: 用户的身份信息
  151. :param config: cos_config配置类
  152. :param http_session: http 会话
  153. """
  154. BaseOp.__init__(self, cred, config, http_session)
  155. # 单文件上传的最大上限是20MB
  156. self.max_single_file = 20 * 1024 * 1024
  157. @staticmethod
  158. def _sha1_content(content):
  159. """获取content的sha1
  160. :param content:
  161. :return:
  162. """
  163. sha1_obj = hashlib.sha1()
  164. sha1_obj.update(content)
  165. return sha1_obj.hexdigest()
  166. def update_file(self, request):
  167. """更新文件
  168. :param request:
  169. :return:
  170. """
  171. assert isinstance(request, UpdateFileRequest)
  172. logger.debug("request: " + str(request.get_custom_headers()))
  173. check_params_ret = self._check_params(request)
  174. if check_params_ret is not None:
  175. return check_params_ret
  176. logger.debug("params verify successfully")
  177. auth = cos_auth.Auth(self._cred)
  178. bucket = request.get_bucket_name()
  179. cos_path = request.get_cos_path()
  180. sign = auth.sign_once(bucket, cos_path)
  181. http_header = dict()
  182. http_header['Authorization'] = sign
  183. http_header['Content-Type'] = 'application/json'
  184. http_header['User-Agent'] = self._config.get_user_agent()
  185. http_body = dict()
  186. http_body['op'] = 'update'
  187. if request.get_biz_attr() is not None:
  188. http_body['biz_attr'] = request.get_biz_attr()
  189. if request.get_authority() is not None:
  190. http_body['authority'] = request.get_authority()
  191. if request.get_custom_headers() is not None and len(request.get_custom_headers()) is not 0:
  192. http_body['custom_headers'] = request.get_custom_headers()
  193. logger.debug("Update Request Header: " + json.dumps(http_body))
  194. timeout = self._config.get_timeout()
  195. return self.send_request('POST', bucket, cos_path, headers=http_header, data=json.dumps(http_body), timeout=timeout)
  196. def del_file(self, request):
  197. """删除文件
  198. :param request:
  199. :return:
  200. """
  201. assert isinstance(request, DelFileRequest)
  202. return self.del_base(request)
  203. def stat_file(self, request):
  204. """获取文件的属性
  205. :param request:
  206. :return:
  207. """
  208. assert isinstance(request, StatFileRequest)
  209. return self.stat_base(request)
  210. def upload_file(self, request):
  211. """上传文件, 根据用户的文件大小,选择单文件上传和分片上传策略
  212. :param request:
  213. :return:
  214. """
  215. assert isinstance(request, UploadFileRequest)
  216. check_params_ret = self._check_params(request)
  217. if check_params_ret is not None:
  218. return check_params_ret
  219. local_path = request.get_local_path()
  220. file_size = os.path.getsize(local_path)
  221. suit_single_file_zie = 8 * 1024 * 1024
  222. if file_size < suit_single_file_zie:
  223. return self.upload_single_file(request)
  224. else:
  225. bucket = request.get_bucket_name()
  226. cos_path = request.get_cos_path()
  227. local_path = request.get_local_path()
  228. slice_size = 1024 * 1024
  229. biz_attr = request.get_biz_attr()
  230. upload_slice_request = UploadSliceFileRequest(bucket, cos_path, local_path, slice_size, biz_attr)
  231. upload_slice_request.set_insert_only(request.get_insert_only())
  232. return self.upload_slice_file(upload_slice_request)
  233. def upload_single_file(self, request):
  234. """ 单文件上传
  235. :param request:
  236. :return:
  237. """
  238. assert isinstance(request, UploadFileRequest)
  239. check_params_ret = self._check_params(request)
  240. if check_params_ret is not None:
  241. return check_params_ret
  242. local_path = request.get_local_path()
  243. file_size = os.path.getsize(local_path)
  244. # 判断文件是否超过单文件最大上限, 如果超过则返回错误
  245. # 并提示用户使用别的接口
  246. if file_size > self.max_single_file:
  247. return CosErr.get_err_msg(CosErr.NETWORK_ERROR, 'file is too big, please use upload_file interface')
  248. auth = cos_auth.Auth(self._cred)
  249. bucket = request.get_bucket_name()
  250. cos_path = request.get_cos_path()
  251. expired = int(time.time()) + self._expired_period
  252. sign = auth.sign_more(bucket, cos_path, expired)
  253. http_header = dict()
  254. http_header['Authorization'] = sign
  255. http_header['User-Agent'] = self._config.get_user_agent()
  256. with open(local_path, 'rb') as f:
  257. file_content = f.read()
  258. http_body = dict()
  259. http_body['op'] = 'upload'
  260. http_body['filecontent'] = file_content
  261. http_body['sha'] = FileOp._sha1_content(file_content)
  262. http_body['biz_attr'] = request.get_biz_attr()
  263. http_body['insertOnly'] = str(request.get_insert_only())
  264. timeout = self._config.get_timeout()
  265. ret = self.send_request('POST', bucket, cos_path, headers=http_header, files=http_body, timeout=timeout)
  266. if request.get_insert_only() != 0:
  267. return ret
  268. if ret[u'code'] == 0:
  269. return ret
  270. # try to delete object, and re-post request
  271. del_request = DelFileRequest(bucket_name=request.get_bucket_name(), cos_path=request.get_cos_path())
  272. ret = self.del_file(del_request)
  273. if ret[u'code'] == 0:
  274. return self.send_request('POST', bucket, cos_path, headers=http_header, files=http_body, timeout=timeout)
  275. else:
  276. return ret
  277. def _upload_slice_file(self, request):
  278. assert isinstance(request, UploadSliceFileRequest)
  279. check_params_ret = self._check_params(request)
  280. if check_params_ret is not None:
  281. return check_params_ret
  282. local_path = request.get_local_path()
  283. slice_size = request.get_slice_size()
  284. enable_sha1 = request.enable_sha1
  285. if enable_sha1 is True:
  286. sha1_by_slice_list = Sha1Util.get_sha1_by_slice(local_path, slice_size)
  287. request.sha1_list = sha1_by_slice_list
  288. request.sha1_content = sha1_by_slice_list[-1]["datasha"]
  289. else:
  290. request.sha1_list = None
  291. request.sha1_content = None
  292. control_ret = self._upload_slice_control(request)
  293. # 表示控制分片已经产生错误信息
  294. if control_ret[u'code'] != 0:
  295. return control_ret
  296. # 命中秒传
  297. if u'access_url' in control_ret[u'data']:
  298. return control_ret
  299. local_path = request.get_local_path()
  300. file_size = os.path.getsize(local_path)
  301. slice_size = control_ret[u'data'][u'slice_size']
  302. offset = 0
  303. session = control_ret[u'data'][u'session']
  304. # ?concurrency
  305. if request._max_con <= 1 or (
  306. u'serial_upload' in control_ret[u'data'] and control_ret[u'data'][u'serial_upload'] == 1):
  307. logger.info("upload file serially")
  308. slice_idx = 0
  309. with open(local_path, 'rb') as local_file:
  310. while offset < file_size:
  311. file_content = local_file.read(slice_size)
  312. data_ret = self._upload_slice_data(request, file_content, session, offset)
  313. if data_ret[u'code'] == 0:
  314. if u'access_url' in data_ret[u'data']:
  315. return data_ret
  316. else:
  317. return data_ret
  318. offset += slice_size
  319. slice_idx += 1
  320. else:
  321. logger.info('upload file concurrently')
  322. from threadpool import SimpleThreadPool
  323. pool = SimpleThreadPool(request._max_con)
  324. slice_idx = 0
  325. with open(local_path, 'rb') as local_file:
  326. while offset < file_size:
  327. file_content = local_file.read(slice_size)
  328. pool.add_task(self._upload_slice_data, request, file_content, session, offset)
  329. offset += slice_size
  330. slice_idx += 1
  331. pool.wait_completion()
  332. result = pool.get_result()
  333. if not result['success_all']:
  334. return {u'code': 1, u'message': str(result)}
  335. data_ret = self._upload_slice_finish(request, session, file_size)
  336. return data_ret
  337. def upload_slice_file(self, request):
  338. """分片文件上传(串行)
  339. :param request:
  340. :return:
  341. """
  342. ret = self._upload_slice_file(request)
  343. if ret[u'code'] == 0:
  344. return ret
  345. if request.get_insert_only() == 0:
  346. del_request = DelFileRequest(request.get_bucket_name(), request.get_cos_path())
  347. ret = self.del_file(del_request)
  348. if ret[u'code'] == 0:
  349. return self._upload_slice_file(request)
  350. else:
  351. return ret
  352. else:
  353. return ret
  354. def _upload_slice_finish(self, request, session, filesize):
  355. auth = cos_auth.Auth(self._cred)
  356. bucket = request.get_bucket_name()
  357. cos_path = request.get_cos_path()
  358. expired = int(time.time()) + self._expired_period
  359. sign = auth.sign_more(bucket, cos_path, expired)
  360. http_header = dict()
  361. http_header['Authorization'] = sign
  362. http_header['User-Agent'] = self._config.get_user_agent()
  363. http_body = dict()
  364. http_body['op'] = "upload_slice_finish"
  365. http_body['session'] = session
  366. http_body['filesize'] = str(filesize)
  367. if request.sha1_list is not None:
  368. http_body['sha'] = request.sha1_list[-1]["datasha"]
  369. timeout = self._config.get_timeout()
  370. return self.send_request('POST', bucket, cos_path, headers=http_header, files=http_body, timeout=timeout)
  371. def _upload_slice_control(self, request):
  372. """串行分片第一步, 上传控制分片
  373. :param request:
  374. :return:
  375. """
  376. auth = cos_auth.Auth(self._cred)
  377. bucket = request.get_bucket_name()
  378. cos_path = request.get_cos_path()
  379. expired = int(time.time()) + self._expired_period
  380. sign = auth.sign_more(bucket, cos_path, expired)
  381. http_header = dict()
  382. http_header['Authorization'] = sign
  383. http_header['User-Agent'] = self._config.get_user_agent()
  384. local_path = request.get_local_path()
  385. file_size = os.path.getsize(local_path)
  386. slice_size = request.get_slice_size()
  387. biz_atrr = request.get_biz_attr()
  388. http_body = dict()
  389. http_body['op'] = 'upload_slice_init'
  390. if request.enable_sha1:
  391. http_body['sha'] = request.sha1_list[-1]["datasha"]
  392. http_body['uploadparts'] = json.dumps(request.sha1_list)
  393. http_body['filesize'] = str(file_size)
  394. http_body['slice_size'] = str(slice_size)
  395. http_body['biz_attr'] = biz_atrr
  396. http_body['insertOnly'] = str(request.get_insert_only())
  397. timeout = self._config.get_timeout()
  398. return self.send_request('POST', bucket, cos_path, headers=http_header, files=http_body, timeout=timeout)
  399. def _upload_slice_data(self, request, file_content, session, offset, retry=3):
  400. """串行分片第二步, 上传数据分片
  401. :param request:
  402. :param file_content:
  403. :param session:
  404. :param offset:
  405. :return:
  406. """
  407. bucket = request.get_bucket_name()
  408. cos_path = request.get_cos_path()
  409. auth = cos_auth.Auth(self._cred)
  410. expired = int(time.time()) + self._expired_period
  411. sign = auth.sign_more(bucket, cos_path, expired)
  412. http_header = dict()
  413. http_header['Authorization'] = sign
  414. http_header['User-Agent'] = self._config.get_user_agent()
  415. http_body = dict()
  416. http_body['op'] = 'upload_slice_data'
  417. http_body['filecontent'] = file_content
  418. http_body['session'] = session
  419. http_body['offset'] = str(offset)
  420. if request.sha1_content is not None:
  421. http_body['sha'] = request.sha1_content
  422. timeout = self._config.get_timeout()
  423. for _ in range(retry):
  424. ret = self.send_request('POST', bucket, cos_path, headers=http_header, files=http_body, timeout=timeout)
  425. if ret['code'] == 0:
  426. return ret
  427. else:
  428. return ret
  429. def __download_url(self, uri, filename):
  430. session = self._http_session
  431. with closing(session.get(uri, stream=True, timeout=150)) as ret:
  432. if ret.status_code in [200, 206]:
  433. if 'Content-Length' in ret.headers:
  434. content_len = int(ret.headers['Content-Length'])
  435. else:
  436. raise IOError("download failed without Content-Length header")
  437. file_len = 0
  438. with open(filename, 'wb') as f:
  439. for chunk in ret.iter_content(chunk_size=1024):
  440. if chunk:
  441. file_len += len(chunk)
  442. f.write(chunk)
  443. f.flush()
  444. if file_len != content_len:
  445. raise IOError("download failed with incomplete file")
  446. else:
  447. raise IOError("download failed with status code:" + str(ret.status_code))
  448. def download_file(self, request):
  449. assert isinstance(request, DownloadFileRequest)
  450. auth = cos_auth.Auth(self._cred)
  451. sign = auth.sign_download(request.get_bucket_name(), request.get_cos_path(), self._config.get_sign_expired())
  452. url = self.build_download_url(request.get_bucket_name(), request.get_cos_path(), sign)
  453. logger.info("Uri is %s" % url)
  454. try:
  455. self.__download_url(url, request._local_filename)
  456. return {u'code': 0, u'message': "download successfully"}
  457. except Exception as e:
  458. return {u'code': 1, u'message': "download failed, exception: " + str(e)}
  459. def __move_file(self, request):
  460. auth = cos_auth.Auth(self._cred)
  461. bucket = request.get_bucket_name()
  462. cos_path = request.get_cos_path()
  463. sign = auth.sign_once(bucket, cos_path)
  464. http_header = dict()
  465. http_header['Authorization'] = sign
  466. http_header['User-Agent'] = self._config.get_user_agent()
  467. http_body = dict()
  468. http_body['op'] = 'move'
  469. http_body['dest_fileid'] = request.dest_path
  470. http_body['to_over_write'] = str(1 if request.overwrite else 0)
  471. timeout = self._config.get_timeout()
  472. return self.send_request('POST', bucket, cos_path, headers=http_header, params=http_body, timeout=timeout)
  473. def move_file(self, request):
  474. assert isinstance(request, MoveFileRequest)
  475. return self.__move_file(request)
  476. class FolderOp(BaseOp):
  477. """FolderOp 目录相关操作"""
  478. def __init__(self, cred, config, http_session):
  479. BaseOp.__init__(self, cred, config, http_session)
  480. def update_folder(self, request):
  481. """更新目录
  482. :param request:
  483. :return:
  484. """
  485. assert isinstance(request, UpdateFolderRequest)
  486. check_params_ret = self._check_params(request)
  487. if check_params_ret is not None:
  488. return check_params_ret
  489. auth = cos_auth.Auth(self._cred)
  490. bucket = request.get_bucket_name()
  491. cos_path = request.get_cos_path()
  492. sign = auth.sign_once(bucket, cos_path)
  493. http_header = dict()
  494. http_header['Authorization'] = sign
  495. http_header['Content-Type'] = 'application/json'
  496. http_header['User-Agent'] = self._config.get_user_agent()
  497. http_body = dict()
  498. http_body['op'] = 'update'
  499. http_body['biz_attr'] = request.get_biz_attr()
  500. timeout = self._config.get_timeout()
  501. return self.send_request('POST', bucket, cos_path, headers=http_header, data=json.dumps(http_body), timeout=timeout)
  502. def del_folder(self, request):
  503. """删除目录
  504. :param request:
  505. :return:
  506. """
  507. assert isinstance(request, DelFolderRequest)
  508. return self.del_base(request)
  509. def stat_folder(self, request):
  510. """获取目录属性
  511. :param request:
  512. :return:
  513. """
  514. assert isinstance(request, StatFolderRequest)
  515. return self.stat_base(request)
  516. def create_folder(self, request):
  517. """创建目录
  518. :param request:
  519. :return:
  520. """
  521. assert isinstance(request, CreateFolderRequest)
  522. check_params_ret = self._check_params(request)
  523. if check_params_ret is not None:
  524. return check_params_ret
  525. auth = cos_auth.Auth(self._cred)
  526. bucket = request.get_bucket_name()
  527. cos_path = request.get_cos_path()
  528. expired = int(time.time()) + self._expired_period
  529. sign = auth.sign_more(bucket, cos_path, expired)
  530. http_header = dict()
  531. http_header['Authorization'] = sign
  532. http_header['Content-Type'] = 'application/json'
  533. http_header['User-Agent'] = self._config.get_user_agent()
  534. http_body = dict()
  535. http_body['op'] = 'create'
  536. http_body['biz_attr'] = request.get_biz_attr()
  537. timeout = self._config.get_timeout()
  538. return self.send_request('POST', bucket, cos_path, headers=http_header, data=json.dumps(http_body), timeout=timeout)
  539. def list_folder(self, request):
  540. """list目录
  541. :param request:
  542. :return:
  543. """
  544. assert isinstance(request, ListFolderRequest)
  545. check_params_ret = self._check_params(request)
  546. if check_params_ret is not None:
  547. return check_params_ret
  548. http_body = dict()
  549. http_body['op'] = 'list'
  550. http_body['num'] = request.get_num()
  551. http_body['context'] = request.get_context()
  552. auth = cos_auth.Auth(self._cred)
  553. bucket = request.get_bucket_name()
  554. list_path = request.get_cos_path() + request.get_prefix()
  555. expired = int(time.time()) + self._expired_period
  556. sign = auth.sign_more(bucket, list_path, expired)
  557. http_header = dict()
  558. http_header['Authorization'] = sign
  559. http_header['User-Agent'] = self._config.get_user_agent()
  560. timeout = self._config.get_timeout()
  561. return self.send_request('GET', bucket, list_path, headers=http_header, params=http_body, timeout=timeout)