Skip to content

Commit

Permalink
Address review feedback
Browse files Browse the repository at this point in the history
This:
  * adds coverage for the get_oauth2_token() method
  * changes AuthUrls to a TypedDict
  * changes the url used for personal token access in gateway
  • Loading branch information
gravesm committed Oct 15, 2024
1 parent bd842fb commit e33d9ea
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 6 deletions.
20 changes: 14 additions & 6 deletions awxkit/awxkit/api/pages/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import collections
import logging
import typing

from requests.auth import HTTPBasicAuth

Expand All @@ -12,6 +13,11 @@
log = logging.getLogger(__name__)


class AuthUrls(typing.TypedDict):
access_token: str
personal_token: str


class Base(Page):
def silent_delete(self):
"""Delete the object. If it's already deleted, ignore the error"""
Expand Down Expand Up @@ -147,21 +153,21 @@ def _request_token(self, auth_urls, username, password, client_id, description,
HTTPBasicAuth(client_id, client_secret)(req)
req.headers['Content-Type'] = 'application/x-www-form-urlencoded'
resp = self.connection.post(
auth_urls.access_token,
auth_urls["access_token"],
data={"grant_type": "password", "username": username, "password": password, "scope": scope},
headers=req.headers,
)
elif client_id:
req.headers['Content-Type'] = 'application/x-www-form-urlencoded'
resp = self.connection.post(
auth_urls.access_token,
auth_urls["access_token"],
data={"grant_type": "password", "username": username, "password": password, "client_id": client_id, "scope": scope},
headers=req.headers,
)
else:
HTTPBasicAuth(username, password)(req)
resp = self.connection.post(
f"{auth_urls.personal_token}{username}/personal_tokens/",
auth_urls['personal_token'],
json={"description": description, "application": None, "scope": scope},
headers=req.headers,
)
Expand All @@ -178,13 +184,15 @@ def get_oauth2_token(self, username='', password='', client_id=None, description
default_cred = config.credentials.default
username = username or default_cred.username
password = password or default_cred.password

Check warning on line 186 in awxkit/awxkit/api/pages/base.py

View check run for this annotation

Codecov / codecov/patch

awxkit/awxkit/api/pages/base.py#L184-L186

Added lines #L184 - L186 were not covered by tests
AuthUrls = collections.namedtuple("AuthUrls", ["access_token", "personal_token"])
# Try gateway first, fallback to controller
urls: AuthUrls = {"access_token": "/o/token/", "personal_token": f"{config.gateway_base_path}v1/tokens/"}
try:
urls = AuthUrls(access_token="/o/token/", personal_token=f"{config.gateway_base_path}v1/users/")
return self._request_token(urls, username, password, client_id, description, client_secret, scope)
except exc.NotFound:
urls = AuthUrls(access_token=f"{config.api_base_path}o/token/", personal_token=f"{config.api_base_path}v2/users/")
urls: AuthUrls = {

Check warning on line 192 in awxkit/awxkit/api/pages/base.py

View check run for this annotation

Codecov / codecov/patch

awxkit/awxkit/api/pages/base.py#L188-L192

Added lines #L188 - L192 were not covered by tests
"access_token": f"{config.api_base_path}o/token/",
"personal_token": f"{config.api_base_path}v2/users/{username}/personal_tokens/",
}
return self._request_token(urls, username, password, client_id, description, client_secret, scope)

Check warning on line 196 in awxkit/awxkit/api/pages/base.py

View check run for this annotation

Codecov / codecov/patch

awxkit/awxkit/api/pages/base.py#L196

Added line #L196 was not covered by tests

def load_session(self, username='', password=''):
Expand Down
59 changes: 59 additions & 0 deletions awxkit/test/api/pages/test_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from unittest.mock import Mock, PropertyMock

from http.client import NOT_FOUND
import pytest

from awxkit.api.pages import Base
from awxkit.config import config


@pytest.fixture(autouse=True)
def setup_config():
config.credentials = {"default": {"username": "foo", "password": "bar"}}
config.base_url = ""


@pytest.fixture
def response():
r = Mock()
r.status_code = NOT_FOUND
r.json.return_value = {
"token": "my_personal_token",
"access_token": "my_token",
}
return r


@pytest.mark.parametrize(
"kwargs,url,token",
[
({"client_id": "foo", "client_secret": "bar"}, "/o/token/", "my_token"),
({"client_id": "foo"}, "/o/token/", "my_token"),
({}, "/api/gateway/v1/tokens/", "my_personal_token"),
],
)
def test_get_oauth2_token_from_gateway(mocker, response, kwargs, url, token):
post = mocker.patch("requests.Session.post", return_value=response)
base = Base()
ret = base.get_oauth2_token(**kwargs)
assert post.call_count == 1
assert post.call_args.args[0] == url
assert ret == token


@pytest.mark.parametrize(
"kwargs,url,token",
[
({"client_id": "foo", "client_secret": "bar"}, "/api/o/token/", "my_token"),
({"client_id": "foo"}, "/api/o/token/", "my_token"),
({}, "/api/v2/users/foo/personal_tokens/", "my_personal_token"),
],
)
def test_get_oauth2_token_from_controller(mocker, response, kwargs, url, token):
type(response).ok = PropertyMock(side_effect=[False, True])
post = mocker.patch("requests.Session.post", return_value=response)
base = Base()
ret = base.get_oauth2_token(**kwargs)
assert post.call_count == 2
assert post.call_args.args[0] == url
assert ret == token

0 comments on commit e33d9ea

Please sign in to comment.