cos_op.py 24 KB


  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import os
  4. import time
  5. import json
  6. import hashlib
  7. import urllib
  8. from contextlib import closing
  9. import cos_auth
  10. from cos_err import CosErr
  11. from cos_request import UploadFileRequest
  12. from cos_request import UploadSliceFileRequest
  13. from cos_request import UpdateFileRequest
  14. from cos_request import DelFileRequest
  15. from cos_request import StatFileRequest
  16. from cos_request import CreateFolderRequest
  17. from cos_request import UpdateFolderRequest
  18. from cos_request import StatFolderRequest
  19. from cos_request import DelFolderRequest
  20. from cos_request import ListFolderRequest, DownloadFileRequest, MoveFileRequest
  21. from cos_common import Sha1Util
  22. from logging import getLogger
  23. from traceback import format_exc
  24. logger = getLogger(__name__)
  25. class BaseOp(object):
  26. """
  27. BaseOp基本操作类型
  28. """
  29. def __init__(self, cred, config, http_session):
  30. """ 初始化类
  31. :param cred: 用户的身份信息
  32. :param config: cos_config配置类
  33. :param http_session: http 会话
  34. """
  35. self._cred = cred
  36. self._config = config
  37. self._http_session = http_session
  38. self._expired_period = self._config.get_sign_expired()
  39. def set_cred(self, cred):
  40. """设置用户的身份信息
  41. :param cred:
  42. :return:
  43. """
  44. self._cred = cred
  45. def set_config(self, config):
  46. """ 设置config
  47. :param config:
  48. :return:
  49. """
  50. self._config = config
  51. self._expired_period = self._config.get_sign_expired()
  52. def _build_url(self, bucket, cos_path):
  53. """生成url
  54. :param bucket:
  55. :param cos_path:
  56. :return:
  57. """
  58. bucket = bucket.encode('utf8')
  59. end_point = self._config.get_endpoint().rstrip('/').encode('utf8')
  60. appid = self._cred.get_appid()
  61. cos_path = urllib.quote(cos_path.encode('utf8'), '~/')
  62. url = '%s/%s/%s%s' % (end_point, appid, bucket, cos_path)
  63. return url
  64. def build_download_url(self, bucket, cos_path, sign):
  65. # Only support http now
  66. appid = self._cred.get_appid()
  67. hostname = self._config.get_download_hostname()
  68. cos_path = urllib.quote(cos_path)
  69. url_tmpl = 'http://{bucket}-{appid}.{hostname}{cos_path}?sign={sign}'
  70. return url_tmpl.format(bucket=bucket, appid=appid, hostname=hostname, cos_path=cos_path, sign=sign)
  71. def send_request(self, method, bucket, cos_path, **kwargs):
  72. """ 发送http请求
  73. :param method:
  74. :param bucket:
  75. :param cos_path:
  76. :param args:
  77. :return:
  78. """
  79. url = self._build_url(bucket, cos_path)
  80. logger.debug("sending request, method: %s, bucket: %s, cos_path: %s" % (method, bucket, cos_path))
  81. try:
  82. if method == 'POST':
  83. http_resp = self._http_session.post(url, verify=False, **kwargs)
  84. else:
  85. http_resp = self._http_session.get(url, verify=False, **kwargs)
  86. status_code = http_resp.status_code
  87. if status_code < 500:
  88. return http_resp.json()
  89. else:
  90. logger.warning("request failed, response message: %s" % http_resp.text)
  91. err_detail = 'url:%s, status_code:%d' % (url, status_code)
  92. return CosErr.get_err_msg(CosErr.NETWORK_ERROR, err_detail)
  93. except Exception as e:
  94. logger.exception("request failed, return SERVER_ERROR")
  95. err_detail = 'url:%s, exception:%s traceback:%s' % (url, str(e), format_exc())
  96. return CosErr.get_err_msg(CosErr.SERVER_ERROR, err_detail)
  97. def _check_params(self, request):
  98. """检查用户输入参数, 检查通过返回None, 否则返回一个代表错误原因的dict
  99. :param request:
  100. :return:
  101. """
  102. if not self._cred.check_params_valid():
  103. return CosErr.get_err_msg(CosErr.PARAMS_ERROR, self._cred.get_err_tips())
  104. if not request.check_params_valid():
  105. return CosErr.get_err_msg(CosErr.PARAMS_ERROR, request.get_err_tips())
  106. return None
  107. def del_base(self, request):
  108. """删除文件或者目录, is_file_op为True表示是文件操作
  109. :param request:
  110. :return:
  111. """
  112. check_params_ret = self._check_params(request)
  113. if check_params_ret is not None:
  114. return check_params_ret
  115. auth = cos_auth.Auth(self._cred)
  116. bucket = request.get_bucket_name()
  117. cos_path = request.get_cos_path()
  118. sign = auth.sign_once(bucket, cos_path)
  119. http_header = dict()
  120. http_header['Authorization'] = sign
  121. http_header['Content-Type'] = 'application/json'
  122. http_header['User-Agent'] = self._config.get_user_agent()
  123. http_body = {'op': 'delete'}
  124. timeout = self._config.get_timeout()
  125. return self.send_request('POST', bucket, cos_path, headers=http_header, data=json.dumps(http_body), timeout=timeout)
  126. def stat_base(self, request):
  127. """获取文件和目录的属性
  128. :param request:
  129. :return:
  130. """
  131. check_params_ret = self._check_params(request)
  132. if check_params_ret is not None:
  133. return check_params_ret
  134. auth = cos_auth.Auth(self._cred)
  135. bucket = request.get_bucket_name()
  136. cos_path = request.get_cos_path()
  137. expired = int(time.time()) + self._expired_period
  138. sign = auth.sign_more(bucket, cos_path, expired)
  139. http_header = dict()
  140. http_header['Authorization'] = sign
  141. http_header['User-Agent'] = self._config.get_user_agent()
  142. http_body = dict()
  143. http_body['op'] = 'stat'
  144. timeout = self._config.get_timeout()
  145. return self.send_request('GET', bucket, cos_path, headers=http_header, params=http_body, timeout=timeout)
  146. class FileOp(BaseOp):
  147. """FileOp 文件相关操作"""
  148. def __init__(self, cred, config, http_session):
  149. """ 初始化类
  150. :param cred: 用户的身份信息
  151. :param config: cos_config配置类
  152. :param http_session: http 会话
  153. """
  154. BaseOp.__init__(self, cred, config, http_session)
  155. # 单文件上传的最大上限是20MB
  156. self.max_single_file = 20 * 1024 * 1024
  157. @staticmethod
  158. def _sha1_content(content):
  159. """获取content的sha1
  160. :param content:
  161. :return:
  162. """
  163. sha1_obj = hashlib.sha1()
  164. sha1_obj.update(content)
  165. return sha1_obj.hexdigest()
  166. def update_file(self, request):
  167. """更新文件
  168. :param request:
  169. :return:
  170. """
  171. assert isinstance(request, UpdateFileRequest)
  172. logger.debug("request: " + str(request.get_custom_headers()))
  173. check_params_ret = self._check_params(request)
  174. if check_params_ret is not None:
  175. return check_params_ret
  176. logger.debug("params verify successfully")
  177. auth = cos_auth.Auth(self._cred)
  178. bucket = request.get_bucket_name()
  179. cos_path = request.get_cos_path()
  180. sign = auth.sign_once(bucket, cos_path)
  181. http_header = dict()
  182. http_header['Authorization'] = sign
  183. http_header['Content-Type'] = 'application/json'
  184. http_header['User-Agent'] = self._config.get_user_agent()
  185. http_body = dict()
  186. http_body['op'] = 'update'
  187. if request.get_biz_attr() is not None:
  188. http_body['biz_attr'] = request.get_biz_attr()
  189. if request.get_authority() is not None:
  190. http_body['authority'] = request.get_authority()
  191. if request.get_custom_headers() is not None and len(request.get_custom_headers()) is not 0:
  192. http_body['custom_headers'] = request.get_custom_headers()
  193. logger.debug("Update Request Header: " + json.dumps(http_body))
  194. timeout = self._config.get_timeout()
  195. return self.send_request('POST', bucket, cos_path, headers=http_header, data=json.dumps(http_body), timeout=timeout)
  196. def del_file(self, request):
  197. """删除文件
  198. :param request:
  199. :return:
  200. """
  201. assert isinstance(request, DelFileRequest)
  202. return self.del_base(request)
  203. def stat_file(self, request):
  204. """获取文件的属性
  205. :param request:
  206. :return:
  207. """
  208. assert isinstance(request, StatFileRequest)
  209. return self.stat_base(request)
  210. def upload_file(self, request):
  211. """上传文件, 根据用户的文件大小,选择单文件上传和分片上传策略
  212. :param request:
  213. :return:
  214. """
  215. assert isinstance(request, UploadFileRequest)
  216. check_params_ret = self._check_params(request)
  217. if check_params_ret is not None:
  218. return check_params_ret
  219. local_path = request.get_local_path()
  220. file_size = os.path.getsize(local_path)
  221. suit_single_file_zie = 8 * 1024 * 1024
  222. if file_size < suit_single_file_zie:
  223. return self.upload_single_file(request)
  224. else:
  225. bucket = request.get_bucket_name()
  226. cos_path = request.get_cos_path()
  227. local_path = request.get_local_path()
  228. slice_size = 1024 * 1024
  229. biz_attr = request.get_biz_attr()
  230. upload_slice_request = UploadSliceFileRequest(bucket, cos_path, local_path, slice_size, biz_attr)
  231. upload_slice_request.set_insert_only(request.get_insert_only())
  232. return self.upload_slice_file(upload_slice_request)
  233. def upload_single_file(self, request):
  234. """ 单文件上传
  235. :param request:
  236. :return:
  237. """
  238. assert isinstance(request, UploadFileRequest)
  239. check_params_ret = self._check_params(request)
  240. if check_params_ret is not None:
  241. return check_params_ret
  242. local_path = request.get_local_path()
  243. file_size = os.path.getsize(local_path)
  244. # 判断文件是否超过单文件最大上限, 如果超过则返回错误
  245. # 并提示用户使用别的接口
  246. if file_size > self.max_single_file:
  247. return CosErr.get_err_msg(CosErr.NETWORK_ERROR, 'file is too big, please use upload_file interface')
  248. auth = cos_auth.Auth(self._cred)
  249. bucket = request.get_bucket_name()
  250. cos_path = request.get_cos_path()
  251. expired = int(time.time()) + self._expired_period
  252. sign = auth.sign_more(bucket, cos_path, expired)
  253. http_header = dict()
  254. http_header['Authorization'] = sign
  255. http_header['User-Agent'] = self._config.get_user_agent()
  256. with open(local_path, 'rb') as f:
  257. file_content = f.read()
  258. http_body = dict()
  259. http_body['op'] = 'upload'
  260. http_body['filecontent'] = file_content
  261. http_body['sha'] = FileOp._sha1_content(file_content)
  262. http_body['biz_attr'] = request.get_biz_attr()
  263. http_body['insertOnly'] = str(request.get_insert_only())
  264. timeout = self._config.get_timeout()
  265. ret = self.send_request('POST', bucket, cos_path, headers=http_header, files=http_body, timeout=timeout)
  266. if request.get_insert_only() != 0:
  267. return ret
  268. if ret[u'code'] == 0:
  269. return ret
  270. # try to delete object, and re-post request
  271. del_request = DelFileRequest(bucket_name=request.get_bucket_name(), cos_path=request.get_cos_path())
  272. ret = self.del_file(del_request)
  273. if ret[u'code'] == 0:
  274. return self.send_request('POST', bucket, cos_path, headers=http_header, files=http_body, timeout=timeout)
  275. else:
  276. return ret
  277. def _upload_slice_file(self, request):
  278. assert isinstance(request, UploadSliceFileRequest)
  279. check_params_ret = self._check_params(request)
  280. if check_params_ret is not None:
  281. return check_params_ret
  282. local_path = request.get_local_path()
  283. slice_size = request.get_slice_size()
  284. enable_sha1 = request.enable_sha1
  285. if enable_sha1 is True:
  286. sha1_by_slice_list = Sha1Util.get_sha1_by_slice(local_path, slice_size)
  287. request.sha1_list = sha1_by_slice_list
  288. request.sha1_content = sha1_by_slice_list[-1]["datasha"]
  289. else:
  290. request.sha1_list = None
  291. request.sha1_content = None
  292. control_ret = self._upload_slice_control(request)
  293. # 表示控制分片已经产生错误信息
  294. if control_ret[u'code'] != 0:
  295. return control_ret
  296. # 命中秒传
  297. if u'access_url' in control_ret[u'data']:
  298. return control_ret
  299. local_path = request.get_local_path()
  300. file_size = os.path.getsize(local_path)
  301. if u'slice_size' in control_ret[u'data']:
  302. slice_size = control_ret[u'data'][u'slice_size']
  303. offset = 0
  304. session = control_ret[u'data'][u'session']
  305. # ?concurrency
  306. if request._max_con <= 1 or (
  307. u'serial_upload' in control_ret[u'data'] and control_ret[u'data'][u'serial_upload'] == 1):
  308. logger.info("upload file serially")
  309. slice_idx = 0
  310. with open(local_path, 'rb') as local_file:
  311. while offset < file_size:
  312. file_content = local_file.read(slice_size)
  313. data_ret = self._upload_slice_data(request, file_content, session, offset)
  314. if data_ret[u'code'] == 0:
  315. if u'access_url' in data_ret[u'data']:
  316. return data_ret
  317. else:
  318. return data_ret
  319. offset += slice_size
  320. slice_idx += 1
  321. else:
  322. logger.info('upload file concurrently')
  323. from threadpool import SimpleThreadPool
  324. pool = SimpleThreadPool(request._max_con)
  325. slice_idx = 0
  326. with open(local_path, 'rb') as local_file:
  327. while offset < file_size:
  328. file_content = local_file.read(slice_size)
  329. pool.add_task(self._upload_slice_data, request, file_content, session, offset)
  330. offset += slice_size
  331. slice_idx += 1
  332. pool.wait_completion()
  333. result = pool.get_result()
  334. if not result['success_all']:
  335. return {u'code': 1, u'message': str(result)}
  336. data_ret = self._upload_slice_finish(request, session, file_size)
  337. return data_ret
  338. def upload_slice_file(self, request):
  339. """分片文件上传(串行)
  340. :param request:
  341. :return:
  342. """
  343. ret = self._upload_slice_file(request)
  344. if ret[u'code'] == 0:
  345. return ret
  346. if request.get_insert_only() == 0:
  347. del_request = DelFileRequest(request.get_bucket_name(), request.get_cos_path())
  348. ret = self.del_file(del_request)
  349. if ret[u'code'] == 0:
  350. return self._upload_slice_file(request)
  351. else:
  352. return ret
  353. else:
  354. return ret
  355. def _upload_slice_finish(self, request, session, filesize):
  356. auth = cos_auth.Auth(self._cred)
  357. bucket = request.get_bucket_name()
  358. cos_path = request.get_cos_path()
  359. expired = int(time.time()) + self._expired_period
  360. sign = auth.sign_more(bucket, cos_path, expired)
  361. http_header = dict()
  362. http_header['Authorization'] = sign
  363. http_header['User-Agent'] = self._config.get_user_agent()
  364. http_body = dict()
  365. http_body['op'] = "upload_slice_finish"
  366. http_body['session'] = session
  367. http_body['filesize'] = str(filesize)
  368. if request.sha1_list is not None:
  369. http_body['sha'] = request.sha1_list[-1]["datasha"]
  370. timeout = self._config.get_timeout()
  371. return self.send_request('POST', bucket, cos_path, headers=http_header, files=http_body, timeout=timeout)
  372. def _upload_slice_control(self, request):
  373. """串行分片第一步, 上传控制分片
  374. :param request:
  375. :return:
  376. """
  377. auth = cos_auth.Auth(self._cred)
  378. bucket = request.get_bucket_name()
  379. cos_path = request.get_cos_path()
  380. expired = int(time.time()) + self._expired_period
  381. sign = auth.sign_more(bucket, cos_path, expired)
  382. http_header = dict()
  383. http_header['Authorization'] = sign
  384. http_header['User-Agent'] = self._config.get_user_agent()
  385. local_path = request.get_local_path()
  386. file_size = os.path.getsize(local_path)
  387. slice_size = request.get_slice_size()
  388. biz_atrr = request.get_biz_attr()
  389. http_body = dict()
  390. http_body['op'] = 'upload_slice_init'
  391. if request.enable_sha1:
  392. http_body['sha'] = request.sha1_list[-1]["datasha"]
  393. http_body['uploadparts'] = json.dumps(request.sha1_list)
  394. http_body['filesize'] = str(file_size)
  395. http_body['slice_size'] = str(slice_size)
  396. http_body['biz_attr'] = biz_atrr
  397. http_body['insertOnly'] = str(request.get_insert_only())
  398. timeout = self._config.get_timeout()
  399. return self.send_request('POST', bucket, cos_path, headers=http_header, files=http_body, timeout=timeout)
  400. def _upload_slice_data(self, request, file_content, session, offset, retry=3):
  401. """串行分片第二步, 上传数据分片
  402. :param request:
  403. :param file_content:
  404. :param session:
  405. :param offset:
  406. :return:
  407. """
  408. bucket = request.get_bucket_name()
  409. cos_path = request.get_cos_path()
  410. auth = cos_auth.Auth(self._cred)
  411. expired = int(time.time()) + self._expired_period
  412. sign = auth.sign_more(bucket, cos_path, expired)
  413. http_header = dict()
  414. http_header['Authorization'] = sign
  415. http_header['User-Agent'] = self._config.get_user_agent()
  416. http_body = dict()
  417. http_body['op'] = 'upload_slice_data'
  418. http_body['filecontent'] = file_content
  419. http_body['session'] = session
  420. http_body['offset'] = str(offset)
  421. if request.sha1_content is not None:
  422. http_body['sha'] = request.sha1_content
  423. timeout = self._config.get_timeout()
  424. for _ in range(retry):
  425. ret = self.send_request('POST', bucket, cos_path, headers=http_header, files=http_body, timeout=timeout)
  426. if ret['code'] == 0:
  427. return ret
  428. else:
  429. return ret
  430. def __download_url(self, uri, filename, headers):
  431. session = self._http_session
  432. with closing(session.get(uri, stream=True, timeout=30, headers=headers)) as ret:
  433. if ret.status_code in [200, 206]:
  434. if 'Content-Length' in ret.headers:
  435. content_len = int(ret.headers['Content-Length'])
  436. else:
  437. raise IOError("download failed without Content-Length header")
  438. file_len = 0
  439. with open(filename, 'wb') as f:
  440. for chunk in ret.iter_content(chunk_size=1024):
  441. if chunk:
  442. file_len += len(chunk)
  443. f.write(chunk)
  444. f.flush()
  445. if file_len != content_len:
  446. raise IOError("download failed with incomplete file")
  447. else:
  448. raise IOError("download failed with status code:" + str(ret.status_code))
  449. def download_file(self, request):
  450. assert isinstance(request, DownloadFileRequest)
  451. auth = cos_auth.Auth(self._cred)
  452. sign = auth.sign_download(request.get_bucket_name(), request.get_cos_path(), self._config.get_sign_expired())
  453. url = self.build_download_url(request.get_bucket_name(), request.get_cos_path(), sign)
  454. logger.info("Uri is %s" % url)
  455. try:
  456. self.__download_url(url, request._local_filename, request._custom_headers)
  457. return {u'code': 0, u'message': "download successfully"}
  458. except Exception as e:
  459. return {u'code': 1, u'message': "download failed, exception: " + str(e)}
  460. def __move_file(self, request):
  461. auth = cos_auth.Auth(self._cred)
  462. bucket = request.get_bucket_name()
  463. cos_path = request.get_cos_path()
  464. sign = auth.sign_once(bucket, cos_path)
  465. http_header = dict()
  466. http_header['Authorization'] = sign
  467. http_header['User-Agent'] = self._config.get_user_agent()
  468. http_body = dict()
  469. http_body['op'] = 'move'
  470. http_body['dest_fileid'] = request.dest_path
  471. http_body['to_over_write'] = str(1 if request.overwrite else 0)
  472. timeout = self._config.get_timeout()
  473. return self.send_request('POST', bucket, cos_path, headers=http_header, params=http_body, timeout=timeout)
  474. def move_file(self, request):
  475. assert isinstance(request, MoveFileRequest)
  476. return self.__move_file(request)
  477. class FolderOp(BaseOp):
  478. """FolderOp 目录相关操作"""
  479. def __init__(self, cred, config, http_session):
  480. BaseOp.__init__(self, cred, config, http_session)
  481. def update_folder(self, request):
  482. """更新目录
  483. :param request:
  484. :return:
  485. """
  486. assert isinstance(request, UpdateFolderRequest)
  487. check_params_ret = self._check_params(request)
  488. if check_params_ret is not None:
  489. return check_params_ret
  490. auth = cos_auth.Auth(self._cred)
  491. bucket = request.get_bucket_name()
  492. cos_path = request.get_cos_path()
  493. sign = auth.sign_once(bucket, cos_path)
  494. http_header = dict()
  495. http_header['Authorization'] = sign
  496. http_header['Content-Type'] = 'application/json'
  497. http_header['User-Agent'] = self._config.get_user_agent()
  498. http_body = dict()
  499. http_body['op'] = 'update'
  500. http_body['biz_attr'] = request.get_biz_attr()
  501. timeout = self._config.get_timeout()
  502. return self.send_request('POST', bucket, cos_path, headers=http_header, data=json.dumps(http_body), timeout=timeout)
  503. def del_folder(self, request):
  504. """删除目录
  505. :param request:
  506. :return:
  507. """
  508. assert isinstance(request, DelFolderRequest)
  509. return self.del_base(request)
  510. def stat_folder(self, request):
  511. """获取目录属性
  512. :param request:
  513. :return:
  514. """
  515. assert isinstance(request, StatFolderRequest)
  516. return self.stat_base(request)
  517. def create_folder(self, request):
  518. """创建目录
  519. :param request:
  520. :return:
  521. """
  522. assert isinstance(request, CreateFolderRequest)
  523. check_params_ret = self._check_params(request)
  524. if check_params_ret is not None:
  525. return check_params_ret
  526. auth = cos_auth.Auth(self._cred)
  527. bucket = request.get_bucket_name()
  528. cos_path = request.get_cos_path()
  529. expired = int(time.time()) + self._expired_period
  530. sign = auth.sign_more(bucket, cos_path, expired)
  531. http_header = dict()
  532. http_header['Authorization'] = sign
  533. http_header['Content-Type'] = 'application/json'
  534. http_header['User-Agent'] = self._config.get_user_agent()
  535. http_body = dict()
  536. http_body['op'] = 'create'
  537. http_body['biz_attr'] = request.get_biz_attr()
  538. timeout = self._config.get_timeout()
  539. return self.send_request('POST', bucket, cos_path, headers=http_header, data=json.dumps(http_body), timeout=timeout)
  540. def list_folder(self, request):
  541. """list目录
  542. :param request:
  543. :return:
  544. """
  545. assert isinstance(request, ListFolderRequest)
  546. check_params_ret = self._check_params(request)
  547. if check_params_ret is not None:
  548. return check_params_ret
  549. http_body = dict()
  550. http_body['op'] = 'list'
  551. http_body['num'] = request.get_num()
  552. http_body['context'] = request.get_context()
  553. auth = cos_auth.Auth(self._cred)
  554. bucket = request.get_bucket_name()
  555. list_path = request.get_cos_path() + request.get_prefix()
  556. expired = int(time.time()) + self._expired_period
  557. sign = auth.sign_more(bucket, list_path, expired)
  558. http_header = dict()
  559. http_header['Authorization'] = sign
  560. http_header['User-Agent'] = self._config.get_user_agent()
  561. timeout = self._config.get_timeout()
  562. return self.send_request('GET', bucket, list_path, headers=http_header, params=http_body, timeout=timeout)