This repository has been archived by the owner on Jul 24, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 17
/
signstring.py
71 lines (57 loc) · 2.53 KB
/
signstring.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# coding=utf-8
import hashlib
import json
import os
import time
import urllib
from email.utils import formatdate
from typing import Dict, Text, Union
from urllib import parse
# from aftership.const import API_KEY_FILED_NAME, API_ENDPOINT, AS_SIGNATURE_HMAC_SHA256, AS_SIGNATURE_RSA_SHA256, AS_API_KEY
# Request bodies accepted for signing: text, or a dict that is JSON-encoded
# before hashing.
BODY = Union[Text, Dict]


class SignString():
    """Build the canonical "string to sign" for a signed API request.

    The string is six newline-separated fields: HTTP method, upper-case MD5
    hex digest of the body, content type, date, canonicalized headers and
    canonicalized resource (path plus sorted query string).

    NOTE(review): the layout presumably follows the AfterShip request
    signature scheme (see the ``AFTERSHIP_API_SECRET`` fallback below) --
    confirm against the API documentation before changing any field.
    """

    def __init__(self, api_secret: Union[str, None]):
        """Store the API secret.

        Args:
            api_secret: Secret to keep on the instance. When it is ``None``,
                ``gen_sign_string`` falls back to the ``AFTERSHIP_API_SECRET``
                environment variable.
        """
        self.api_secret = api_secret

    def _gen_sign_string(self, method: str, body: BODY, content_type: str, date: str,
                         canonicalized_as_headers: str, canonicalized_resource: str) -> str:
        """Assemble the newline-separated string to sign.

        Args:
            method: HTTP method, emitted verbatim as the first field.
            body: Request body. A dict is serialized with ``json.dumps``;
                text is UTF-8 encoded; ``bytes``/``bytearray`` are hashed
                as-is. A falsy body (``''``, ``{}``, ``None``) yields an empty
                digest field.
            content_type: Content type; forced to ``''`` when the body is falsy.
            date: Date string, emitted verbatim.
            canonicalized_as_headers: Output of ``_get_canonicalized_as_headers``.
            canonicalized_resource: Output of ``_get_canonicalized_resource``.

        Returns:
            The six fields joined with ``'\\n'`` (no trailing newline).
        """
        if body:
            if isinstance(body, dict):
                body = json.dumps(body)
            # Text must be encoded before hashing; raw bytes are already in
            # the form MD5 expects.
            if not isinstance(body, (bytes, bytearray)):
                body = body.encode()
            body_digest = hashlib.md5(body).hexdigest().upper()
        else:
            # Without a body both the digest and the content type are blank.
            body_digest = ''
            content_type = ''
        return '\n'.join([
            method,
            body_digest,
            content_type,
            date,
            canonicalized_as_headers,
            canonicalized_resource,
        ])

    def _get_canonicalized_as_headers(self, headers: dict) -> str:
        """Return the headers as sorted ``name:value`` lines.

        Names are stripped and lower-cased, values are stripped, and the pairs
        are sorted by name. If two names collide after normalization, the one
        that appears later in ``headers`` wins.

        Args:
            headers: Mapping of header name to string value.

        Returns:
            The ``name:value`` pairs joined with ``'\\n'``; ``''`` for an
            empty mapping.
        """
        normalized = {}
        for name, value in headers.items():
            normalized[name.strip().lower()] = value.strip()
        return '\n'.join(name + ':' + value for name, value in sorted(normalized.items()))

    def _get_canonicalized_resource(self, raw_url: str) -> str:
        """Return the URL path followed by its query string sorted by key.

        Args:
            raw_url: Full or relative URL of the request.

        Returns:
            ``path`` alone when no query parameters remain, otherwise
            ``path + '?' + query`` with the parameters re-encoded in sorted
            order.
        """
        split_url = parse.urlsplit(raw_url)
        # NOTE(review): parse_qsl drops blank-valued parameters and dict()
        # keeps only the last value of a repeated key, so such parameters are
        # not part of the signed resource -- confirm this matches the server.
        params = dict(parse.parse_qsl(split_url.query))
        query = parse.urlencode(sorted(params.items()))
        if not query:
            return split_url.path
        return split_url.path + '?' + query

    def gen_sign_string(self, method: str, uri: str, body: BODY, as_header: dict, content_type: str) -> tuple:
        """Build the string to sign for a request made right now.

        As a side effect, when ``self.api_secret`` is ``None`` it is populated
        from the ``AFTERSHIP_API_SECRET`` environment variable.

        Args:
            method: HTTP method.
            uri: Request URL; only its path and query are used.
            body: Request body (see ``_gen_sign_string``).
            as_header: Headers to include in the signature.
            content_type: Content type of the request body.

        Returns:
            A ``(date, sign_string)`` tuple, where ``date`` is the current
            time formatted like ``'Tue, 29 Jun 2021 07:55:55 GMT'``.
        """
        if self.api_secret is None:
            self.api_secret = os.getenv('AFTERSHIP_API_SECRET')
        canonicalized_as_headers = self._get_canonicalized_as_headers(as_header)
        canonicalized_resource = self._get_canonicalized_resource(uri)
        # formatdate() always emits English day/month names, unlike
        # time.strftime('%a ... %b'), whose output follows the process locale.
        date = formatdate(usegmt=True)
        sign_string = self._gen_sign_string(
            method=method,
            body=body,
            content_type=content_type,
            date=date,
            canonicalized_as_headers=canonicalized_as_headers,
            canonicalized_resource=canonicalized_resource,
        )
        return date, sign_string