-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added django auth backend to support magic login #25
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,3 +30,6 @@ cover/* | |
# Per-project virtualenvs | ||
virtualenv_run*/ | ||
.virtualenv_run_test/ | ||
|
||
# Pycharm | ||
.idea/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
from django.contrib.auth import get_user_model | ||
from django.contrib.auth.backends import ModelBackend | ||
|
||
from magic_admin.django.config import MagicAuthBackendMode | ||
from magic_admin.django.exceptions import ( | ||
MissingAuthorizationHeader, | ||
MissingUserEmailInput, | ||
PublicAddressDoesNotExist, | ||
UnsupportedAuthMode, | ||
UserEmailMissmatch, | ||
) | ||
from magic_admin.error import ( | ||
DIDTokenError, | ||
APIConnectionError, | ||
RateLimitingError, | ||
BadRequestError, | ||
AuthenticationError, | ||
ForbiddenError, | ||
APIError, | ||
) | ||
|
||
from magic_admin.utils.http import get_identity_token_from_header | ||
from magic_admin.magic import Magic | ||
from magic_admin.utils.logging import ( | ||
log_debug, | ||
log_info, | ||
) | ||
|
||
|
||
user_model = get_user_model() | ||
|
||
|
||
class MagicAuthBackend(ModelBackend): | ||
|
||
@staticmethod | ||
def _load_user_from_email(email): | ||
log_debug('Loading user by email.', email=email) | ||
try: | ||
return user_model.objects.get(email=email) | ||
except user_model.DoesNotExist: | ||
return None | ||
|
||
@staticmethod | ||
def _validate_identity_token_and_load_user( | ||
identity_token, | ||
email, | ||
public_address, | ||
): | ||
try: | ||
Magic().Token.validate(identity_token) | ||
except ( | ||
DIDTokenError, | ||
APIConnectionError, | ||
RateLimitingError, | ||
BadRequestError, | ||
AuthenticationError, | ||
ForbiddenError, | ||
APIError, | ||
) as e: | ||
log_debug( | ||
'DID Token failed validation. No user is to be retrieved.', | ||
error_class=e.__class__.__name__, | ||
) | ||
return None | ||
|
||
try: | ||
user = user_model.get_by_public_address(public_address) | ||
except user_model.DoesNotExist: | ||
raise PublicAddressDoesNotExist() | ||
|
||
if user.email != email: | ||
raise UserEmailMissmatch() | ||
|
||
return user | ||
|
||
def user_can_authenticate(self, user): | ||
if user is None: | ||
return False | ||
|
||
return super().user_can_authenticate(user) | ||
|
||
def _update_user_with_public_address(self, user, public_address): | ||
if self.user_can_authenticate(user): | ||
user.update_user_with_public_address( | ||
user_id=None, | ||
public_address=public_address, | ||
user_obj=user, | ||
) | ||
|
||
def _handle_phantom_auth(self, request, email): | ||
identity_token = get_identity_token_from_header(request) | ||
if identity_token is None: | ||
raise MissingAuthorizationHeader() | ||
|
||
public_address = Magic().User.get_public_address(identity_token) | ||
|
||
try: | ||
user = self._validate_identity_token_and_load_user( | ||
identity_token, | ||
email, | ||
public_address, | ||
) | ||
except PublicAddressDoesNotExist: | ||
user = self._load_user_from_email(email) | ||
if user is None: | ||
log_debug( | ||
'User is not authenticated. No user found with the given email.', | ||
email=email, | ||
) | ||
Magic().User.logout_by_public_address(public_address) | ||
return | ||
|
||
self._update_user_with_public_address(user, public_address) | ||
except UserEmailMissmatch as e: | ||
log_debug( | ||
'User is not authenticated. User email does not match for the ' | ||
'public address.', | ||
email=email, | ||
public_address=public_address, | ||
error_class=e.__class__.__name__, | ||
) | ||
Magic().User.logout_by_public_address(public_address) | ||
return | ||
|
||
if self.user_can_authenticate(user): | ||
log_info('User authenticated with DID Token.') | ||
return user | ||
|
||
def authenticate( | ||
self, | ||
request, | ||
user_email=None, | ||
mode=MagicAuthBackendMode.MAGIC, | ||
*args, | ||
**kwargs, | ||
): | ||
if not user_email: | ||
raise MissingUserEmailInput() | ||
|
||
user_email = user_model.objects.normalize_email(user_email) | ||
|
||
if mode == MagicAuthBackendMode.MAGIC: | ||
return self._handle_phantom_auth(request, user_email) | ||
else: | ||
raise UnsupportedAuthMode() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
from django.contrib.auth import get_user_model | ||
from django.contrib.auth import login | ||
from django.contrib.auth import logout | ||
from django.contrib.auth.models import AnonymousUser | ||
from django.utils.deprecation import MiddlewareMixin | ||
|
||
from magic_admin.django.config import ( | ||
MAGIC_AUTH_BACKEND, | ||
MAGIC_IDENTITY_KEY, | ||
) | ||
from magic_admin.django.exceptions import UnableToLoadUserFromIdentityToken | ||
from magic_admin.error import ( | ||
DIDTokenError, | ||
APIConnectionError, | ||
RateLimitingError, | ||
BadRequestError, | ||
AuthenticationError, | ||
ForbiddenError, | ||
APIError, | ||
) | ||
from magic_admin.utils.http import get_identity_token_from_header | ||
from magic_admin.magic import Magic | ||
|
||
|
||
user_model = get_user_model() | ||
|
||
|
||
class MagicAuthMiddleware(MiddlewareMixin): | ||
|
||
@staticmethod | ||
def _persist_data_in_session(request, identity_token): | ||
request.session[MAGIC_IDENTITY_KEY] = identity_token | ||
request.session.modified = True | ||
|
||
@staticmethod | ||
def _load_identity_token_from_session(request): | ||
return request.session.get(MAGIC_IDENTITY_KEY, None) | ||
|
||
@staticmethod | ||
def _load_identity_token_from_header(request): | ||
return get_identity_token_from_header(request) | ||
|
||
@staticmethod | ||
def _is_request_related_to_magic(identity_token): | ||
return identity_token is not None | ||
|
||
@staticmethod | ||
def _try_loading_user_from_identity_token(identity_token): | ||
public_address = Magic().User.get_public_address(identity_token) | ||
|
||
try: | ||
return user_model.get_by_public_address(public_address) | ||
except user_model.DoesNotExist: | ||
return AnonymousUser() | ||
|
||
@staticmethod | ||
def can_user_be_logged_in(user): | ||
if user is None: | ||
return False | ||
|
||
is_active = getattr(user, 'is_active', None) | ||
return is_active or is_active is None | ||
|
||
def _attempt_handling_anonymous_user(self, request, identity_token): | ||
user = self._try_loading_user_from_identity_token(identity_token) | ||
|
||
if not user.is_anonymous and self.can_user_be_logged_in(user): | ||
# Log the user in to rehydrate the session. | ||
login(request, user, backend=MAGIC_AUTH_BACKEND) | ||
else: | ||
raise UnableToLoadUserFromIdentityToken() | ||
|
||
def _load_identity_token_from_sources(self, request): | ||
# The identity token from header take precedence over the one in session. | ||
return self._load_identity_token_from_header( | ||
request, | ||
) or self._load_identity_token_from_session(request) | ||
|
||
def process_request(self, request): | ||
assert hasattr(request, 'user'), ( | ||
'The Magic authentication middleware requires authentication ' | ||
'middleware to be installed. Edit your MIDDLEWARE setting to insert ' | ||
'`django.contrib.auth.middleware.AuthenticationMiddleware` before ' | ||
'`magic_admin.django.auth.middleware.MagicAuthMiddleware`.' | ||
) | ||
|
||
identity_token = self._load_identity_token_from_sources(request) | ||
|
||
if not self._is_request_related_to_magic(identity_token): | ||
return | ||
|
||
try: | ||
Magic().Token.validate(identity_token) | ||
except ( | ||
DIDTokenError, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure about these exceptions, I just imported all the existing errors and plugged them here. I'm not sure what's the intended behavior here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See #25 (comment) |
||
APIConnectionError, | ||
RateLimitingError, | ||
BadRequestError, | ||
AuthenticationError, | ||
ForbiddenError, | ||
APIError, | ||
): | ||
logout(request) | ||
return | ||
|
||
if request.user.is_anonymous: | ||
try: | ||
self._attempt_handling_anonymous_user(request, identity_token) | ||
except UnableToLoadUserFromIdentityToken: | ||
return | ||
|
||
if ( | ||
request.user.public_address and | ||
request.user.public_address != Magic().User.get_public_address( | ||
identity_token, | ||
) | ||
): | ||
logout(request) | ||
return | ||
|
||
if self.can_user_be_logged_in(request.user): | ||
self._persist_data_in_session(request, identity_token) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
from django.contrib.auth.models import AnonymousUser | ||
from django.db import models | ||
from django.utils.translation import gettext_lazy as _ | ||
|
||
|
||
class MagicUserMixin(models.Model): | ||
|
||
public_address = models.CharField( | ||
_('public address'), | ||
max_length=128, | ||
unique=True, | ||
blank=True, | ||
null=True, | ||
db_index=True, | ||
default=None, | ||
) | ||
|
||
class Meta: | ||
abstract = True | ||
|
||
@classmethod | ||
def get_by_public_address(cls, public_address): | ||
return cls.objects.get(public_address=public_address) | ||
|
||
def update_user_with_public_address( | ||
self, | ||
user_id, | ||
public_address, | ||
user_obj=None, | ||
): | ||
if user_obj is None: | ||
user_obj = self.objects.get(pk=user_id) | ||
|
||
if user_obj.public_address == public_address: | ||
return user_obj | ||
|
||
user_obj.public_address = public_address | ||
user_obj.save(update_fields=['public_address']) | ||
|
||
return user_obj | ||
|
||
|
||
class MagicAnonymousUser(AnonymousUser): | ||
pass |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
MAGIC_IDENTITY_KEY = '_magic_identity_token' | ||
MAGIC_AUTH_BACKEND = 'magic_admin.django.auth.backends.MagicAuthBackend' | ||
|
||
|
||
class MagicAuthBackendMode: | ||
|
||
DJANGO_DEFAULT_AUTH = 0 | ||
MAGIC = 1 | ||
|
||
ALLOWED_MODES = frozenset([DJANGO_DEFAULT_AUTH, MAGIC]) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
class MagicDjangoException(Exception): | ||
pass | ||
|
||
|
||
class UnsupportedAuthMode(MagicDjangoException): | ||
pass | ||
|
||
|
||
class PublicAddressDoesNotExist(MagicDjangoException): | ||
pass | ||
|
||
|
||
class UserEmailMissmatch(MagicDjangoException): | ||
pass | ||
|
||
|
||
class MissingUserEmailInput(MagicDjangoException): | ||
pass | ||
|
||
|
||
class MissingAuthorizationHeader(MagicDjangoException): | ||
pass | ||
|
||
|
||
class UnableToLoadUserFromIdentityToken(MagicDjangoException): | ||
pass |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
from django.contrib.auth import logout as django_logout | ||
|
||
from magic_admin.magic import Magic | ||
from magic_admin.utils.logging import log_debug | ||
|
||
|
||
def logout(request): | ||
user = request.user | ||
|
||
if not user.is_anonymous and user.public_address: | ||
Magic().User.logout_by_public_address(user.public_address) | ||
log_debug('Log out user from Magic') | ||
|
||
django_logout(request) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure about these exceptions, I just imported all the existing errors and plugged them here. I'm not sure what's the intended behavior here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DIDTokenError
is the only error that you need to try-catch here as thevalidate
method will only raise theDIDTokenError
.https://docs.magic.link/admin-sdk/python/api-reference#validate and https://docs.magic.link/admin-sdk/python/api-reference#errors have information on the admin sdk error handling. Let me know if this answers your questions.