-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added django auth backend to support magic login #25
Open
voith
wants to merge
3
commits into
magiclabs:master
Choose a base branch
from
voith:feat/support-django
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,3 +30,6 @@ cover/* | |
# Per-project virtualenvs | ||
virtualenv_run*/ | ||
.virtualenv_run_test/ | ||
|
||
# Pycharm | ||
.idea/ |
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
from django.contrib.auth import get_user_model | ||
from django.contrib.auth.backends import ModelBackend | ||
|
||
from magic_admin.django.config import MagicAuthBackendMode | ||
from magic_admin.django.exceptions import ( | ||
MissingAuthorizationHeader, | ||
MissingUserEmailInput, | ||
PublicAddressDoesNotExist, | ||
UnsupportedAuthMode, | ||
UserEmailMissmatch, | ||
) | ||
from magic_admin.error import ( | ||
DIDTokenError, | ||
APIConnectionError, | ||
RateLimitingError, | ||
BadRequestError, | ||
AuthenticationError, | ||
ForbiddenError, | ||
APIError, | ||
) | ||
|
||
from magic_admin.utils.http import get_identity_token_from_header | ||
from magic_admin.magic import Magic | ||
from magic_admin.utils.logging import ( | ||
log_debug, | ||
log_info, | ||
) | ||
|
||
|
||
user_model = get_user_model() | ||
|
||
|
||
class MagicAuthBackend(ModelBackend): | ||
|
||
@staticmethod | ||
def _load_user_from_email(email): | ||
log_debug('Loading user by email.', email=email) | ||
try: | ||
return user_model.objects.get(email=email) | ||
except user_model.DoesNotExist: | ||
return None | ||
|
||
@staticmethod | ||
def _validate_identity_token_and_load_user( | ||
identity_token, | ||
email, | ||
public_address, | ||
): | ||
try: | ||
Magic().Token.validate(identity_token) | ||
except ( | ||
DIDTokenError, | ||
APIConnectionError, | ||
RateLimitingError, | ||
BadRequestError, | ||
AuthenticationError, | ||
ForbiddenError, | ||
APIError, | ||
) as e: | ||
log_debug( | ||
'DID Token failed validation. No user is to be retrieved.', | ||
error_class=e.__class__.__name__, | ||
) | ||
raise e | ||
return None | ||
|
||
try: | ||
user = user_model.get_by_public_address(public_address) | ||
except user_model.DoesNotExist: | ||
raise PublicAddressDoesNotExist() | ||
|
||
if user.email != email: | ||
raise UserEmailMissmatch() | ||
|
||
return user | ||
|
||
def user_can_authenticate(self, user): | ||
if user is None: | ||
return False | ||
|
||
return super().user_can_authenticate(user) | ||
|
||
def _update_user_with_public_address(self, user, public_address): | ||
if self.user_can_authenticate(user): | ||
user.update_user_with_public_address( | ||
user_id=None, | ||
public_address=public_address, | ||
user_obj=user, | ||
) | ||
|
||
def _handle_phantom_auth(self, request, email): | ||
identity_token = get_identity_token_from_header(request) | ||
if identity_token is None: | ||
raise MissingAuthorizationHeader() | ||
|
||
public_address = Magic().Token.get_public_address(identity_token) | ||
|
||
try: | ||
user = self._validate_identity_token_and_load_user( | ||
identity_token, | ||
email, | ||
public_address, | ||
) | ||
except PublicAddressDoesNotExist: | ||
user = self._load_user_from_email(email) | ||
if user is None: | ||
log_debug( | ||
'User is not authenticated. No user found with the given email.', | ||
email=email, | ||
) | ||
Magic().User.logout_by_public_address(public_address) | ||
return | ||
|
||
self._update_user_with_public_address(user, public_address) | ||
except UserEmailMissmatch as e: | ||
log_debug( | ||
'User is not authenticated. User email does not match for the ' | ||
'public address.', | ||
email=email, | ||
public_address=public_address, | ||
error_class=e.__class__.__name__, | ||
) | ||
Magic().User.logout_by_public_address(public_address) | ||
return | ||
|
||
if self.user_can_authenticate(user): | ||
log_info('User authenticated with DID Token.') | ||
return user | ||
|
||
def authenticate( | ||
self, | ||
request, | ||
user_email=None, | ||
mode=MagicAuthBackendMode.MAGIC, | ||
): | ||
if not user_email: | ||
raise MissingUserEmailInput() | ||
|
||
user_email = user_model.objects.normalize_email(user_email) | ||
|
||
if mode == MagicAuthBackendMode.MAGIC: | ||
return self._handle_phantom_auth(request, user_email) | ||
else: | ||
raise UnsupportedAuthMode() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
from django.contrib.auth import get_user_model | ||
from django.contrib.auth import login | ||
from django.contrib.auth import logout | ||
from django.contrib.auth.models import AnonymousUser | ||
from django.utils.deprecation import MiddlewareMixin | ||
|
||
from magic_admin.django.config import ( | ||
MAGIC_AUTH_BACKEND, | ||
MAGIC_IDENTITY_KEY, | ||
) | ||
from magic_admin.django.exceptions import UnableToLoadUserFromIdentityToken | ||
from magic_admin.error import ( | ||
DIDTokenError, | ||
APIConnectionError, | ||
RateLimitingError, | ||
BadRequestError, | ||
AuthenticationError, | ||
ForbiddenError, | ||
APIError, | ||
) | ||
from magic_admin.utils.http import get_identity_token_from_header | ||
from magic_admin.magic import Magic | ||
|
||
|
||
user_model = get_user_model() | ||
|
||
|
||
class MagicAuthMiddleware(MiddlewareMixin): | ||
|
||
@staticmethod | ||
def _persist_data_in_session(request, identity_token): | ||
request.session[MAGIC_IDENTITY_KEY] = identity_token | ||
request.session.modified = True | ||
|
||
@staticmethod | ||
def _load_identity_token_from_session(request): | ||
return request.session.get(MAGIC_IDENTITY_KEY, None) | ||
|
||
@staticmethod | ||
def _load_identity_token_from_header(request): | ||
return get_identity_token_from_header(request) | ||
|
||
@staticmethod | ||
def _is_request_related_to_magic(identity_token): | ||
return identity_token is not None | ||
|
||
@staticmethod | ||
def _try_loading_user_from_identity_token(identity_token): | ||
public_address = Magic().User.get_public_address(identity_token) | ||
|
||
try: | ||
return user_model.get_by_public_address(public_address) | ||
except user_model.DoesNotExist: | ||
return AnonymousUser() | ||
|
||
@staticmethod | ||
def can_user_be_logged_in(user): | ||
if user is None: | ||
return False | ||
|
||
is_active = getattr(user, 'is_active', None) | ||
return is_active or is_active is None | ||
|
||
def _attempt_handling_anonymous_user(self, request, identity_token): | ||
user = self._try_loading_user_from_identity_token(identity_token) | ||
|
||
if not user.is_anonymous and self.can_user_be_logged_in(user): | ||
# Log the user in to rehydrate the session. | ||
login(request, user, backend=MAGIC_AUTH_BACKEND) | ||
else: | ||
raise UnableToLoadUserFromIdentityToken() | ||
|
||
def _load_identity_token_from_sources(self, request): | ||
# The identity token from header take precedence over the one in session. | ||
return self._load_identity_token_from_header( | ||
request, | ||
) or self._load_identity_token_from_session(request) | ||
|
||
def process_request(self, request): | ||
assert hasattr(request, 'user'), ( | ||
'The Magic authentication middleware requires authentication ' | ||
'middleware to be installed. Edit your MIDDLEWARE setting to insert ' | ||
'`django.contrib.auth.middleware.AuthenticationMiddleware` before ' | ||
'`magic_admin.django.auth.middleware.MagicAuthMiddleware`.' | ||
) | ||
|
||
identity_token = self._load_identity_token_from_sources(request) | ||
|
||
if not self._is_request_related_to_magic(identity_token): | ||
return | ||
|
||
try: | ||
Magic().Token.validate(identity_token) | ||
except ( | ||
DIDTokenError, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure about these exceptions, I just imported all the existing errors and plugged them here. I'm not sure what's the intended behavior here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See #25 (comment) |
||
APIConnectionError, | ||
RateLimitingError, | ||
BadRequestError, | ||
AuthenticationError, | ||
ForbiddenError, | ||
APIError, | ||
) as e: | ||
logout(request) | ||
raise e | ||
return | ||
|
||
if request.user.is_anonymous: | ||
try: | ||
self._attempt_handling_anonymous_user(request, identity_token) | ||
except UnableToLoadUserFromIdentityToken: | ||
return | ||
|
||
if ( | ||
request.user.public_address and | ||
request.user.public_address != Magic().User.get_public_address( | ||
identity_token, | ||
) | ||
): | ||
logout(request) | ||
return | ||
|
||
if self.can_user_be_logged_in(request.user): | ||
self._persist_data_in_session(request, identity_token) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
from django.contrib.auth.models import AnonymousUser | ||
from django.db import models | ||
from django.utils.translation import gettext_lazy as _ | ||
|
||
|
||
class MagicUserMixin(models.Model): | ||
|
||
public_address = models.CharField( | ||
_('public address'), | ||
max_length=128, | ||
unique=True, | ||
blank=True, | ||
null=True, | ||
db_index=True, | ||
default=None, | ||
) | ||
|
||
class Meta: | ||
abstract = True | ||
|
||
@classmethod | ||
def get_by_public_address(cls, public_address): | ||
return cls.objects.get(public_address=public_address) | ||
|
||
def update_user_with_public_address( | ||
self, | ||
user_id, | ||
public_address, | ||
user_obj=None, | ||
): | ||
if user_obj is None: | ||
user_obj = self.objects.get(pk=user_id) | ||
|
||
if user_obj.public_address == public_address: | ||
return user_obj | ||
|
||
user_obj.public_address = public_address | ||
user_obj.save(update_fields=['public_address']) | ||
|
||
return user_obj | ||
|
||
|
||
class MagicAnonymousUser(AnonymousUser): | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
MAGIC_IDENTITY_KEY = '_magic_identity_token' | ||
MAGIC_AUTH_BACKEND = 'magic_admin.django.auth.backends.MagicAuthBackend' | ||
|
||
|
||
class MagicAuthBackendMode: | ||
|
||
DJANGO_DEFAULT_AUTH = 0 | ||
MAGIC = 1 | ||
|
||
ALLOWED_MODES = frozenset([DJANGO_DEFAULT_AUTH, MAGIC]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
class MagicDjangoException(Exception): | ||
pass | ||
|
||
|
||
class UnsupportedAuthMode(MagicDjangoException): | ||
pass | ||
|
||
|
||
class PublicAddressDoesNotExist(MagicDjangoException): | ||
pass | ||
|
||
|
||
class UserEmailMissmatch(MagicDjangoException): | ||
pass | ||
|
||
|
||
class MissingUserEmailInput(MagicDjangoException): | ||
pass | ||
|
||
|
||
class MissingAuthorizationHeader(MagicDjangoException): | ||
pass | ||
|
||
|
||
class UnableToLoadUserFromIdentityToken(MagicDjangoException): | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
from django.contrib.auth import logout as django_logout | ||
|
||
from magic_admin.magic import Magic | ||
from magic_admin.utils.logging import log_debug | ||
|
||
|
||
def logout(request): | ||
user = request.user | ||
|
||
if not user.is_anonymous and user.public_address: | ||
Magic().User.logout_by_public_address(user.public_address) | ||
log_debug('Log out user from Magic') | ||
|
||
django_logout(request) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure about these exceptions, I just imported all the existing errors and plugged them here. I'm not sure what's the intended behavior here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DIDTokenError
is the only error that you need to try-catch here as thevalidate
method will only raise theDIDTokenError
.https://docs.magic.link/admin-sdk/python/api-reference#validate and https://docs.magic.link/admin-sdk/python/api-reference#errors have information on the admin sdk error handling. Let me know if this answers your questions.