Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Feature/aes signature #50

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions aftership/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@
__version__ = '1.3.0'

api_key = None

api_secret = None
8 changes: 7 additions & 1 deletion aftership/const.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
API_KEY_FILED_NAME = 'aftership-api-key'
AFTERSHIP_API_KEY = 'aftership-api-key'
AS_API_KEY = 'as-api-key'
YoungWing marked this conversation as resolved.
Show resolved Hide resolved
AS_SIGNATURE_HMAC_SHA256 = 'as-signature-hmac-sha256'

CONTENT_TYPE = "application/json"

API_VERSION = "v4"
API_ENDPOINT = "https://api.aftership.com/v4/"

SIGNATURE_AES_HMAC_SHA256 = "AES-HMAC-SHA256"
5 changes: 5 additions & 0 deletions aftership/courier.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .request import make_request
from .response import process_response
import json

__all__ = ['list_couriers', 'list_all_couriers', 'detect_courier']

Expand All @@ -24,3 +25,7 @@ def detect_courier(tracking, **kwargs):
"""
response = make_request('POST', 'couriers/detect', json=dict(tracking=tracking), **kwargs)
return process_response(response)

def post_orders(order, **kwargs):
response = make_request('POST', 'commerce/v1/orders', json=dict(order=json.loads(order)), **kwargs)
return process_response(response)
16 changes: 16 additions & 0 deletions aftership/hmac/hmac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# coding=utf-8

import os
import hashlib
import base64
import hmac

class Hmac():
def __init__(self, api_secret):
self.api_secret = api_secret

def hmac_signature(self, sign_string: str) -> str:
if self.api_secret is None:
self.api_secret = os.getenv('AFTERSHIP_API_SECRET')
signature_str = hmac.new(bytes(self.api_secret.encode()), msg=bytes(sign_string.encode()), digestmod=hashlib.sha256).digest()
return base64.b64encode(signature_str).decode()
68 changes: 63 additions & 5 deletions aftership/request.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import requests

from urllib.parse import urljoin
from urllib.parse import urlparse
from aftership.hmac.hmac import Hmac

import requests
from aftership.signstring.signstring import SignString

from .const import API_KEY_FILED_NAME, API_ENDPOINT
from .util import get_api_key
from .const import AFTERSHIP_API_KEY, API_ENDPOINT, API_VERSION, AS_SIGNATURE_HMAC_SHA256, AS_API_KEY, CONTENT_TYPE, SIGNATURE_AES_HMAC_SHA256
from .util import get_api_key, get_api_secret


def build_request_url(path):
Expand All @@ -12,9 +16,63 @@ def build_request_url(path):

def make_request(method, path, **kwargs):
url = build_request_url(path)
res = urlparse(url)
params = kwargs.get('params', None)
path = res.path

params_str = ""
if params:
params_str = '&'.join([ str(key)+'='+str(value) for key,value in params.items()])

if not path.startswith("/"):
path = '/' + path

if len(params_str)>0:
path = '{}?{}'.format(path, params_str)

signature_type = kwargs.pop('signature_type', None)
if signature_type is None:
return request_with_token(method, url, **kwargs)

body = kwargs.get('json', None)
content_type = None
if (method == "POST" or method == "PUT" or method == "PATCH") and body is not None:
content_type = CONTENT_TYPE

# if using SignString, you must use AS_API_KEY header
if signature_type == SIGNATURE_AES_HMAC_SHA256:
return request_with_aes_hmac256_signature(method, url, path, content_type, **kwargs)

return None


def request_with_token(method, url, **kwargs):
headers = kwargs.pop('headers', dict())
if headers.get(API_KEY_FILED_NAME) is None:
headers[API_KEY_FILED_NAME] = get_api_key()
if headers.get(AFTERSHIP_API_KEY) is None and headers.get(AS_API_KEY) is None:
headers[AFTERSHIP_API_KEY] = get_api_key()

kwargs['headers'] = headers
return requests.request(method, url, **kwargs)

def request_with_aes_hmac256_signature(method, url, path, content_type, **kwargs):
headers = kwargs.pop('headers', dict())
if headers.get(AS_API_KEY, None) is None:
headers[AS_API_KEY] = get_api_key()

body = kwargs.get('json', None)
date, sign_string = gen_sign_string(method, path, body, headers, content_type)

hmac = Hmac(get_api_secret())
hmac_signature = hmac.hmac_signature(sign_string)
headers[AS_SIGNATURE_HMAC_SHA256] = hmac_signature
headers['Date'] = date

if content_type is not None:
headers["Content-Type"] = content_type

kwargs['headers'] = headers
return requests.request(method, url, **kwargs)

def gen_sign_string(method, path, body, headers, content_type):
s = SignString(headers[AS_API_KEY])
return s.gen_sign_string(method, path, body, headers, content_type)
2 changes: 1 addition & 1 deletion aftership/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def process_response(response):
json_content = response.json()
except json.JSONDecodeError:
raise InternalError

if response.status_code // 100 == 2:
return json_content['data']

Expand Down
69 changes: 69 additions & 0 deletions aftership/signstring/signstring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# coding=utf-8

import os
import hashlib
import urllib
import time
import json
from urllib import parse
from typing import Union,Text,Dict

BODY = Union[Text,Dict]

class SignString():
def __init__(self, api_secret:str):
self.api_secret = api_secret

def _gen_sign_string(self, method: str, body:BODY, content_type: str, date: str, canonicalized_as_headers: str,
canonicalized_resource: str) -> str:
result = ''
result += method + '\n'
if body:
if isinstance(body, dict):
body = json.dumps(body)
body = hashlib.md5(body.encode()).hexdigest().upper()
else:
body = ''
content_type = ''

result += body + '\n'
result += content_type + '\n'
result += date + '\n'
result += canonicalized_as_headers + '\n'
result += canonicalized_resource

return result

def _get_canonicalized_as_headers(self, headers: dict) -> str:
new_header = {}
for k, v in headers.items():
new_key = k.lower()
new_value = v.strip()
new_header.update({new_key: new_value})

new_header = dict(sorted(new_header.items()))

result = '\n'.join([k + ':' + v for k, v in new_header.items()])
return result

def _get_canonicalized_resource(self, raw_url: str) -> str:
url_parse_result = parse.urlsplit(raw_url)
path = url_parse_result.path
query = urllib.parse.urlencode(sorted(dict(parse.parse_qsl(url_parse_result.query)).items()))
if query:
path = path + '?' + query
return path

def gen_sign_string(self, method: str, uri: str, body: str, as_header: dict, content_type: str) -> tuple:
if self.api_secret is None:
self.api_secret = os.getenv('AFTERSHIP_API_SECRET')

canonicalized_as_headers = self._get_canonicalized_as_headers(as_header)
canonicalized_resource = self._get_canonicalized_resource(uri)

date = time.strftime('%a, %d %b %Y %H:%M:%S GMT', time.gmtime())
sign_string = self._gen_sign_string(method=method, body=body, content_type=content_type, date=date,
canonicalized_as_headers=canonicalized_as_headers,
canonicalized_resource=canonicalized_resource)

return date, sign_string
6 changes: 6 additions & 0 deletions aftership/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,9 @@ def get_api_key():
if aftership.api_key is not None:
return aftership.api_key
return os.getenv('AFTERSHIP_API_KEY')

def get_api_secret():
"""Get AfterShip API secret"""
if aftership.api_secret is not None:
return aftership.api_secret
return os.getenv('AFTERSHIP_API_SECRET')
21 changes: 21 additions & 0 deletions examples/courier_example_aes_signature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import aftership

aftership.api_key = 'asak_61f86xxx'
aftership.api_secret = 'assk_e8b4bxxx'


def get_enabled_courier_names():
result = aftership.courier.list_couriers(signature_type="AES")
courier_list = [courier['name'] for courier in result['couriers']]
return courier_list


def get_supported_courier_names():
result = aftership.courier.list_all_couriers(signature_type="AES")
courier_list = [courier['name'] for courier in result['couriers']]
return courier_list


if __name__ == '__main__':
enabled_couriers = get_enabled_courier_names(signature_type="AES")
print(enabled_couriers)
8 changes: 8 additions & 0 deletions examples/notification_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@ def add_notification(tracking_id, emails=None, smses=None, webhook=None):
raise ValueError('You must specify one of emails, smses or webhook')

aftership.notification.add_notification(tracking_id=tracking_id, notification=update_params)

## using HMAC-SHA256 signature
aftership.notification.add_notification(tracking_id=tracking_id, notification=update_params, signature_type=const.SIGNATURE_AES_HMAC_SHA256)




def list_notifications(tracking_id):
notification = aftership.notification.list_notifications(tracking_id=tracking_id)

## using HMAC-SHA256 signature
notification = aftership.notification.list_notifications(tracking_id=tracking_id, signature_type=const.SIGNATURE_AES_HMAC_SHA256)
return notification


Expand Down
5 changes: 5 additions & 0 deletions examples/tracking_example.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import aftership
from aftership import const


# aftership.api_key = 'PUT_YOUR_AFTERSHIP_KEY_HERE'
Expand All @@ -14,6 +15,10 @@ def create_tracking(slug, tracking_number):

def update_tracking(tracking_id, **values):
aftership.tracking.update_tracking(tracking_id=tracking_id, tracking=values)

## using HMAC-SHA256 signature
aftership.tracking.update_tracking(tracking_id=tracking_id, tracking=values, signature_type=const.SIGNATURE_AES_HMAC_SHA256)

return True


Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ packages = [
[tool.poetry.dependencies]
python = "^3.6"
requests = "^2.0.0'"
pycryptodome = "^3.15.0"

[tool.poetry.dev-dependencies]
pytest = "^6.0.1"
Expand Down
25 changes: 24 additions & 1 deletion tests/test_couriers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import pytest

from aftership import courier
from aftership import const

body='{"source_id":"1710452100BO","name":"1710452100BO","number":"1710452100BO","currency":"EUR","status":"closed","financial_status":"paid","fulfillment_status":"fulfilled","order_total":"399","shipping_total":"0","tax_total":"63.71","discount_total":"0","subtotal":"335.29","items":[{"source_id":"230278-01","sku":"230278-01","quantity":1,"unit_weight":{"unit":"kg","value":8.06},"unit_price":"399","discount":"0","tax":"63.71","return_quantity":1,"fulfillable_quantity":1,"source_product_id":"230278-01","source_variant_id":"230278-01","title":"Cinetic Big Ball Multi Floor 2","hs_code":null,"origin_country_region":"DEU","image_urls":["https://dyson-h.assetsadobe2.com/is/image/content/dam/dyson/images/products/primary/230278-01.png"]}],"source_created_at":"2022-08-08T09:34:03.000Z","source_updated_at":"2022-08-08T09:34:03.000Z","customer":{"source_id":"1710452100BO","first_name":"MICH DE EU RETURNS","last_name":"MICH DE EU RETURNS","emails":["[email protected]"],"phones":["3745345234"]},"shipping_address":{"first_name":"Boon","address_line_1":"Albrechtstrasse","city":"Oberhausen","state":"Nordrhein-Westfalen","country_region":"DEU","postal_code":"46145","phone":"3745345234"}}'


class CourierTestCase(TestCase):
Expand All @@ -12,14 +15,34 @@ def test_get_couriers(self):
self.assertIn('total', resp)
self.assertIn('couriers', resp)

@pytest.mark.vcr()
def test_aes_get_couriers(self):
resp = courier.list_couriers(signature_type=const.SIGNATURE_AES_HMAC_SHA256)
self.assertIn('total', resp)
self.assertIn('couriers', resp)


@pytest.mark.vcr()
def test_get_all_couriers(self):
resp = courier.list_all_couriers()
self.assertIn('total', resp)
self.assertIn('couriers', resp)

@pytest.mark.vcr()
def test_aes_get_all_couriers(self):
resp = courier.list_all_couriers(signature_type=const.SIGNATURE_AES_HMAC_SHA256)
self.assertIn('total', resp)
self.assertIn('couriers', resp)

@pytest.mark.vcr()
def test_detect_courier(self):
resp = courier.detect_courier(tracking={'tracking_number': '1234567890'})
resp = courier.detect_courier(tracking={"tracking_number": "1234567890"})
self.assertIn('total', resp)
self.assertIn('couriers', resp)

@pytest.mark.vcr()
def test_aes_detect_courier(self):
resp = courier.detect_courier(signature_type=const.SIGNATURE_AES_HMAC_SHA256,
tracking={"tracking_number": "1234567890"})
self.assertIn('total', resp)
self.assertIn('couriers', resp)
12 changes: 9 additions & 3 deletions tests/test_estimated_delivery_date.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
import pytest

from aftership import estimated_delivery_date
from aftership import const



class EstimatedDeliveryDateTestCase(TestCase):
@pytest.mark.vcr()
def test_batch_predict_estimated_delivery_date(self):
resp = estimated_delivery_date.batch_predict_estimated_delivery_date(
estimated_delivery_dates=[
estimated_delivery_dates=[
{
"slug": "fedex",
"service_type_name": "FEDEX HOME DELIVERY",
Expand All @@ -36,5 +37,10 @@ def test_batch_predict_estimated_delivery_date(self):
"pickup_time": ""
}
}
])
]
resp = estimated_delivery_date.batch_predict_estimated_delivery_date(estimated_delivery_dates=estimated_delivery_dates)
print(resp)
resp = estimated_delivery_date.batch_predict_estimated_delivery_date(estimated_delivery_dates=estimated_delivery_dates,
signature_type=const.SIGNATURE_AES_HMAC_SHA256)
print(resp)

Loading