Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

don't require client-secret for outlook token requests #65

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 56 additions & 46 deletions scripts/sasl-xoauth2-tool.in
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import sys
import urllib.parse
import urllib.request

from typing import Optional,Dict,Union,IO,Any
from typing import Optional, Dict, Union, IO, Any

try:
import argcomplete #type: ignore
import argcomplete # type: ignore
except ImportError:
argcomplete = None

Expand All @@ -34,23 +34,23 @@ GOOGLE_OAUTH2_TOKEN_URL = 'https://accounts.google.com/o/oauth2/token'
GOOGLE_OAUTH2_RESULT_PATH = '/oauth2_result'


def url_safe_escape(instring:str) -> str:
def url_safe_escape(instring: str) -> str:
return urllib.parse.quote(instring, safe='~-._')


def gmail_redirect_uri(local_port:int) -> str:
def gmail_redirect_uri(local_port: int) -> str:
return 'http:https://127.0.0.1:%d%s' % (local_port, GOOGLE_OAUTH2_RESULT_PATH)


def gmail_get_auth_url(client_id:str, scope:str, local_port:int) -> str:
def gmail_get_auth_url(client_id: str, scope: str, local_port: int) -> str:
client_id = url_safe_escape(client_id)
scope = url_safe_escape(scope)
redirect_uri = url_safe_escape(gmail_redirect_uri(local_port))
return '{}?client_id={}&scope={}&response_type={}&redirect_uri={}'.format(
GOOGLE_OAUTH2_AUTH_URL, client_id, scope, 'code', redirect_uri)


def gmail_get_token_from_code(client_id:str, client_secret:str, authorization_code:str, local_port:int) -> Any:
def gmail_get_token_from_code(client_id: str, client_secret: str, authorization_code: str, local_port: int) -> Any:
params = {}
params['client_id'] = client_id
params['client_secret'] = client_secret
Expand All @@ -62,9 +62,9 @@ def gmail_get_token_from_code(client_id:str, client_secret:str, authorization_co
return json.loads(response)


def gmail_get_RequestHandler(client_id:str, client_secret:str, output_file:IO[str]) -> type:
def gmail_get_RequestHandler(client_id: str, client_secret: str, output_file: IO[str]) -> type:
class GMailRequestHandler(http.server.BaseHTTPRequestHandler):
def log_request(self, code:Union[int,str]='-', size:Union[int,str]='-') -> None:
def log_request(self, code: Union[int, str] = '-', size: Union[int, str] = '-') -> None:
# Silence request logging.
return

Expand Down Expand Up @@ -103,11 +103,11 @@ def gmail_get_RequestHandler(client_id:str, client_secret:str, output_file:IO[st
if len(qs['code']) != 1:
return None
return qs['code'][0]

return GMailRequestHandler


def get_token_gmail(client_id:str, client_secret:str, scope:str, output_file:IO[str]) -> None:
def get_token_gmail(client_id: str, client_secret: str, scope: str, output_file: IO[str]) -> None:
request_handler_class = gmail_get_RequestHandler(
client_id,
client_secret,
Expand All @@ -128,9 +128,9 @@ OUTLOOK_REDIRECT_URI = "https://login.microsoftonline.com/common/oauth2/nativecl
OUTLOOK_SCOPE = "openid offline_access https://outlook.office.com/SMTP.Send"


def outlook_get_authorization_code(client_id:str, tenant:str) -> str:
def outlook_get_authorization_code(client_id: str, tenant: str) -> str:
url = f"https://login.microsoftonline.com/{tenant}/oauth2/v2.0/authorize"
query:Dict[str,str] = {}
query: Dict[str, str] = {}
query['client_id'] = client_id
query['response_type'] = 'code'
query['redirect_uri'] = OUTLOOK_REDIRECT_URI
Expand All @@ -141,7 +141,7 @@ def outlook_get_authorization_code(client_id:str, tenant:str) -> str:
f"{url}?{urllib.parse.urlencode(query)}\n",
file=sys.stderr)

resulting_url_input:str = input("Resulting URL: ")
resulting_url_input: str = input("Resulting URL: ")
if OUTLOOK_REDIRECT_URI not in resulting_url_input:
raise Exception(f"Resulting URL does not contain expected prefix: {OUTLOOK_REDIRECT_URI}")
resulting_url = urllib.parse.urlparse(resulting_url_input)
Expand All @@ -151,11 +151,12 @@ def outlook_get_authorization_code(client_id:str, tenant:str) -> str:
return code["code"][0]


def outlook_get_initial_tokens(client_id:str, client_secret:str, tenant:str, code:str) -> Dict[str,Union[str,int]]:
def outlook_get_initial_tokens(client_id: str, client_secret: str, tenant: str, code: str) -> Dict[str, Union[str, int]]:
url = f"https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token"
token_request:Dict[str,str] = {}
token_request: Dict[str, str] = {}
token_request['client_id'] = client_id
token_request['client_secret'] = client_secret
if client_secret:
token_request['client_secret'] = client_secret
token_request['scope'] = OUTLOOK_SCOPE
token_request['code'] = code
token_request['redirect_uri'] = OUTLOOK_REDIRECT_URI
Expand All @@ -165,7 +166,7 @@ def outlook_get_initial_tokens(client_id:str, client_secret:str, tenant:str, cod
urllib.request.Request(
url,
data=urllib.parse.urlencode(token_request).encode('ascii'),
headers={ "Content-Type": "application/x-www-form-urlencoded" }))
headers={"Content-Type": "application/x-www-form-urlencoded"}))
if resp.code != 200:
raise Exception(f"Request failed: {resp.code}")
try:
Expand All @@ -175,11 +176,11 @@ def outlook_get_initial_tokens(client_id:str, client_secret:str, tenant:str, cod
'refresh_token': content["refresh_token"],
'expiry': 0,
}
except:
except Exception:
raise Exception(f"Tokens not found in response: {content}")


def get_token_outlook(client_id:str, client_secret:str, tenant:str, output_file:IO[str]) -> None:
def get_token_outlook(client_id: str, client_secret: str, tenant: str, output_file: IO[str]) -> None:
code = outlook_get_authorization_code(client_id, tenant)
tokens = outlook_get_initial_tokens(client_id, client_secret, tenant, code)
json.dump(tokens, output_file, indent=4)
Expand All @@ -192,14 +193,12 @@ subparse = parser.add_subparsers()


def argparse_get_parser() -> argparse.ArgumentParser:
return parser
return parser


def subcommand_get_token(args:argparse.Namespace) -> None:
if not args.client_secret:
def subcommand_get_token(args: argparse.Namespace) -> None:
if args.client_secret == ':':
args.client_secret = input('Please enter OAuth2 client secret: ')
if not args.client_secret:
parser.error("fetching initial token requires 'client-secret' argument.")
if args.service == 'outlook':
if not args.tenant:
parser.error("'outlook' service requires 'tenant' argument.")
Expand All @@ -210,6 +209,8 @@ def subcommand_get_token(args:argparse.Namespace) -> None:
args.output_file,
)
elif args.service == 'gmail':
if not args.client_secret:
parser.error("'gmail' service requires 'client-secret' argument.")
if not args.scope:
parser.error("'gmail' service requires 'scope' argument.")
get_token_gmail(
Expand All @@ -220,7 +221,10 @@ def subcommand_get_token(args:argparse.Namespace) -> None:
)


sp_get_token = subparse.add_parser('get-token', description='Fetches initial access and refresh tokens from an OAuth 2 provider')
sp_get_token = subparse.add_parser(
'get-token',
description='Fetches initial access and refresh tokens from an OAuth 2 provider',
)
sp_get_token.set_defaults(func=subcommand_get_token)
sp_get_token.add_argument(
'service', choices=['outlook', 'gmail'],
Expand All @@ -236,7 +240,7 @@ sp_get_token.add_argument(
)
sp_get_token.add_argument(
'--client-secret',
help="required for both services, will prompt the user if blank",
help="required for gmail, sometimes required for outlook; will prompt if value is ':'",
)
sp_get_token.add_argument(
'--scope',
Expand All @@ -248,31 +252,37 @@ sp_get_token.add_argument(
)


def subcommand_test_config(args:argparse.Namespace) -> None:
subprocess_args = [TEST_TOOL_PATH]
if args.config_file:
subprocess_args.extend(['--config', args.config_file])
result = subprocess.run(subprocess_args, shell=False)
sys.exit(result.returncode)
def subcommand_test_config(args: argparse.Namespace) -> None:
subprocess_args = [TEST_TOOL_PATH]
if args.config_file:
subprocess_args.extend(['--config', args.config_file])
result = subprocess.run(subprocess_args, shell=False)
sys.exit(result.returncode)


sp_test_config = subparse.add_parser('test-config', description='Tests a sasl-xoauth2 config file for syntax errors')
sp_test_config = subparse.add_parser(
'test-config',
description='Tests a sasl-xoauth2 config file for syntax errors',
)
sp_test_config.set_defaults(func=subcommand_test_config)
sp_test_config.add_argument(
'--config-file',
help="config file path (defaults to '%s')" % DEFAULT_CONFIG_FILE,
)


def subcommand_test_token_refresh(args:argparse.Namespace) -> None:
subprocess_args = [TEST_TOOL_PATH, '--token', args.token_file]
if args.config_file:
subprocess_args.extend(['--config', args.config_file])
result = subprocess.run(subprocess_args, shell=False)
sys.exit(result.returncode)
def subcommand_test_token_refresh(args: argparse.Namespace) -> None:
subprocess_args = [TEST_TOOL_PATH, '--token', args.token_file]
if args.config_file:
subprocess_args.extend(['--config', args.config_file])
result = subprocess.run(subprocess_args, shell=False)
sys.exit(result.returncode)


sp_test_token_refresh = subparse.add_parser('test-token-refresh', description='Tests that a token can be refreshed (i.e., that the OAuth 2 flow is working correctly)')
sp_test_token_refresh = subparse.add_parser(
'test-token-refresh',
description='Tests that a token can be refreshed (i.e., that the OAuth 2 flow is working correctly)',
)
sp_test_token_refresh.set_defaults(func=subcommand_test_token_refresh)
sp_test_token_refresh.add_argument(
'--config-file',
Expand All @@ -295,14 +305,14 @@ def main() -> None:
'Maybe you want to "apt install python3-argcomplete"')
sys.exit(1)
try:
args = parser.parse_intermixed_args()
except:
args = parser.parse_args()
args = parser.parse_intermixed_args()
except Exception:
args = parser.parse_args()
if hasattr(args, 'func'):
args.func(args)
args.func(args)
else:
parser.print_help()
sys.exit(1)
parser.print_help()
sys.exit(1)


if __name__ == '__main__':
Expand Down