Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Less restrictive content-type header check for google authentication (ignores charset) #1382

Merged
23 changes: 23 additions & 0 deletions google/auth/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import base64
import calendar
import datetime
from email.message import Message
import sys
import urllib

Expand Down Expand Up @@ -63,6 +64,28 @@ def decorator(method):
return decorator


def parse_content_type(header_value):
"""Parse a 'content-type' header value to get just the plain media-type (without parameters).

This is done using the class Message from email.message as suggested in PEP 594
(because the cgi is now deprecated and will be removed in python 3.13,
see https://peps.python.org/pep-0594/#cgi).

Args:
header_value (str): The value of a 'content-type' header as a string.

Returns:
str: A string with just the lowercase media-type from the parsed 'content-type' header.
If the provided content-type is not parsable, returns 'text/plain',
the default value for textual files.
"""
m = Message()
m["content-type"] = header_value
return (
m.get_content_type()
) # Despite the name, actually returns just the media-type


def utcnow():
"""Returns the current UTC datetime.

Expand Down
5 changes: 4 additions & 1 deletion google/auth/compute_engine/_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ def get(

if response.status == http_client.OK:
content = _helpers.from_bytes(response.data)
if response.headers["content-type"] == "application/json":
if (
_helpers.parse_content_type(response.headers["content-type"])
== "application/json"
):
try:
return json.loads(content)
except ValueError as caught_exc:
Expand Down
18 changes: 18 additions & 0 deletions tests/compute_engine/test__metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,24 @@ def test_get_success_json():
assert result[key] == value


def test_get_success_json_content_type_charset():
key, value = "foo", "bar"

data = json.dumps({key: value})
request = make_request(
data, headers={"content-type": "application/json; charset=UTF-8"}
)

result = _metadata.get(request, PATH)

request.assert_called_once_with(
method="GET",
url=_metadata._METADATA_ROOT + PATH,
headers=_metadata._METADATA_HEADERS,
)
assert result[key] == value


def test_get_success_retry():
key, value = "foo", "bar"

Expand Down
26 changes: 26 additions & 0 deletions tests/test__helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,32 @@ def func2(): # pragma: NO COVER
_helpers.copy_docstring(SourceClass)(func2)


def test_parse_content_type_plain():
assert _helpers.parse_content_type("text/html") == "text/html"
assert _helpers.parse_content_type("application/xml") == "application/xml"
assert _helpers.parse_content_type("application/json") == "application/json"


def test_parse_content_type_with_parameters():
content_type_html = "text/html; charset=UTF-8"
content_type_xml = "application/xml; charset=UTF-16; version=1.0"
content_type_json = "application/json; charset=UTF-8; indent=2"
assert _helpers.parse_content_type(content_type_html) == "text/html"
assert _helpers.parse_content_type(content_type_xml) == "application/xml"
assert _helpers.parse_content_type(content_type_json) == "application/json"


def test_parse_content_type_missing_or_broken():
content_type_foo = None
content_type_bar = ""
content_type_baz = "1234"
content_type_qux = " ; charset=UTF-8"
assert _helpers.parse_content_type(content_type_foo) == "text/plain"
assert _helpers.parse_content_type(content_type_bar) == "text/plain"
assert _helpers.parse_content_type(content_type_baz) == "text/plain"
assert _helpers.parse_content_type(content_type_qux) == "text/plain"


def test_utcnow():
assert isinstance(_helpers.utcnow(), datetime.datetime)

Expand Down