diff --git a/fossology/groups.py b/fossology/groups.py index 2d8c23b..c6ba8df 100644 --- a/fossology/groups.py +++ b/fossology/groups.py @@ -4,8 +4,7 @@ import logging from typing import List -import fossology -from fossology.exceptions import FossologyApiError, FossologyUnsupported +from fossology.exceptions import FossologyApiError from fossology.obj import Group, MemberPerm, UserGroupMember logger = logging.getLogger(__name__) @@ -15,20 +14,22 @@ class Groups: """Class dedicated to all "groups" related endpoints""" - def list_groups(self) -> List: + def list_groups(self, deletable: bool = False) -> List[Group]: """Get the list of groups (accessible groups for user, all groups for admin) + If parameter deletable is True, the method will return only deletable groups. + API Endpoint: GET /groups + :param deletable: wether to limit the scope only to deletable groups + :type deletable: bool (default: False) :return: a list of groups :rtype: list() :raises FossologyApiError: if the REST call failed """ - if fossology.versiontuple(self.version) < fossology.versiontuple("1.2.1"): - description = f"Endpoint /groups is not supported by your Fossology API version {self.version}" - raise FossologyUnsupported(description) - - response = self.session.get(f"{self.api}/groups") + endpoint = f"{self.api}/groups" + endpoint += "/deletable" if deletable else "" + response = self.session.get(endpoint) if response.status_code == 200: groups_list = [] response_list = response.json() @@ -37,10 +38,13 @@ def list_groups(self) -> List: groups_list.append(single_group) return groups_list else: - description = f"Unable to get a list of groups for {self.user.name}" + deletable = "deletable " if deletable else "" + description = ( + f"Unable to get a list of {deletable}groups for {self.user.name}" + ) raise FossologyApiError(description, response) - def list_group_members(self, group_id: int) -> List: + def list_group_members(self, group_id: int) -> List[UserGroupMember]: """Get the list of members for a given group (accessible groups for user, all groups for admin) API Endpoint: GET /groups/{id}/members @@ -49,10 +53,6 @@ def list_group_members(self, group_id: int) -> List: :rtype: list() :raises FossologyApiError: if the REST call failed """ - if fossology.versiontuple(self.version) < fossology.versiontuple("1.5.0"): - description = f"Endpoint /groups/id/members is not supported by your Fossology API version {self.version}" - raise FossologyUnsupported(description) - response = self.session.get(f"{self.api}/groups/{group_id}/members") if response.status_code == 200: members_list = [] @@ -74,20 +74,32 @@ def create_group(self, name): :type name: str :raises FossologyApiError: if the REST call failed """ - if fossology.versiontuple(self.version) < fossology.versiontuple("1.2.1"): - description = f"Endpoint /groups is not supported by your Fossology API version {self.version}" - raise FossologyUnsupported(description) - headers = {"name": f"{name}"} response = self.session.post(f"{self.api}/groups", headers=headers) if response.status_code == 200: logger.info(f"Group '{name}' has been added") - return else: description = f"Group {name} already exists, failed to create group or no group name provided" raise FossologyApiError(description, response) + def delete_group(self, group_id: int): + """Create a group + + API Endpoint: DELETE /groups/{group_id} + + :param group_id: the id of the group + :type group_id: int + :raises FossologyApiError: if the REST call failed + """ + response = self.session.delete(f"{self.api}/groups/{group_id}") + + if response.status_code == 202: + logger.info(f"Group {group_id} will be deleted") + else: + description = f"Group {group_id} could not be deleted" + raise FossologyApiError(description, response) + def add_group_member( self, group_id: int, user_id: int, perm: MemberPerm = MemberPerm.USER ): @@ -103,10 +115,6 @@ def add_group_member( :type perm: MemberPerm :raises FossologyApiError: if the REST call failed """ - if fossology.versiontuple(self.version) < fossology.versiontuple("1.5.0"): - description = f"Endpoint /groups/id/user/id is not supported by your Fossology API version {self.version}" - raise FossologyUnsupported(description) - data = dict() data["perm"] = perm.value response = self.session.post( @@ -121,4 +129,29 @@ def add_group_member( f"An error occurred while adding user {user_id} to group {group_id}" ) raise FossologyApiError(description, response) - return + + def delete_group_member(self, group_id: int, user_id: int): + """Delete a user from a group + + API Endpoint: DELETE /groups/{group_id}/user/{user_id} + + :param group_id: the id of the group + :param user_id: the id of the user + :type group_id: int + :type user_id: int + :raises FossologyApiError: if the REST call failed + """ + response = self.session.delete(f"{self.api}/groups/{group_id}/user/{user_id}") + if response.status_code == 200: + logger.info(f"User {user_id} will be removed from group {group_id}.") + elif response.status_code == 400: + description = f"Validation error while removing member {user_id} from group {group_id}." + raise FossologyApiError(description, response) + elif response.status_code == 404: + description = f"Member {user_id} or group {group_id} not found." + raise FossologyApiError(description, response) + else: + description = ( + f"An error occurred while deleting user {user_id} from group {group_id}" + ) + raise FossologyApiError(description, response) diff --git a/fossology/users.py b/fossology/users.py index 962fe5b..826f200 100644 --- a/fossology/users.py +++ b/fossology/users.py @@ -104,7 +104,7 @@ def create_user(self, user_spec: dict): response = self.session.post(f"{self.api}/users", json=user_spec) if response.status_code == 201: logger.info( - f"User {user_spec['name']} was created, user list_users() to get more information." + f"User {user_spec['name']} was created, call list_users() to get more information." ) elif response.status_code == 409: logger.info(f"User {user_spec['name']} already exists.") diff --git a/tests/conftest.py b/tests/conftest.py index 5ec8085..53e3b1b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,23 +5,14 @@ import os import secrets import time -from typing import Dict +from typing import Dict, Generator import pytest from click.testing import CliRunner import fossology from fossology.exceptions import AuthenticationError, FossologyApiError -from fossology.obj import ( - AccessLevel, - Agents, - Folder, - JobStatus, - TokenScope, - Upload, - User, - versiontuple, -) +from fossology.obj import AccessLevel, Agents, JobStatus, TokenScope, Upload logger = logging.getLogger("fossology") console = logging.StreamHandler() @@ -157,7 +148,7 @@ def test_file_path() -> str: @pytest.fixture(scope="session") -def upload_folder(foss: fossology.Fossology) -> Folder: +def upload_folder(foss: fossology.Fossology) -> Generator: name = "UploadFolderTest" desc = "Created via the Fossology Python API" folder = foss.create_folder(foss.rootFolder, name, description=desc) @@ -166,7 +157,7 @@ def upload_folder(foss: fossology.Fossology) -> Folder: @pytest.fixture(scope="session") -def move_folder(foss: fossology.Fossology) -> Folder: +def move_folder(foss: fossology.Fossology) -> Generator: folder = foss.create_folder( foss.rootFolder, "MoveUploadTest", "Test move upload function" ) @@ -178,7 +169,7 @@ def move_folder(foss: fossology.Fossology) -> Folder: def upload( foss: fossology.Fossology, test_file_path: str, -) -> Upload: +) -> Generator: upload = foss.upload_file( foss.rootFolder, file=test_file_path, @@ -195,7 +186,7 @@ def upload( @pytest.fixture(scope="session") def upload_with_jobs( foss: fossology.Fossology, test_file_path: str, foss_schedule_agents: dict -) -> Upload: +) -> Generator: upload = foss.upload_file( foss.rootFolder, file=test_file_path, @@ -211,14 +202,13 @@ def upload_with_jobs( time.sleep(5) -@pytest.fixture() -def created_foss_user(foss: fossology.Fossology, foss_user: dict) -> User: - if versiontuple(foss.version) < versiontuple("1.5.1"): - pytest.skip(f"user creation is not supported by API version {foss.version}") +@pytest.fixture(scope="session") +def created_foss_user(foss: fossology.Fossology, foss_user: dict) -> Generator: foss.create_user(foss_user) for user in foss.list_users(): if user.name == foss_user["name"]: - return user + yield user + foss.delete_user(user) # foss_cli specific @@ -233,7 +223,7 @@ def click_test_file() -> str: @pytest.fixture(scope="session") -def click_test_dict(foss_server) -> str: +def click_test_dict(foss_server) -> dict: d = dict() d["IS_REQUEST_FOR_HELP"] = False d["IS_REQUEST_FOR_CONFIG"] = False diff --git a/tests/test_groups.py b/tests/test_groups.py index 2a56411..80c59de 100644 --- a/tests/test_groups.py +++ b/tests/test_groups.py @@ -2,73 +2,157 @@ # SPDX-License-Identifier: MIT import secrets +from unittest.mock import Mock, call import pytest import responses import fossology -from fossology.exceptions import FossologyApiError, FossologyUnsupported -from fossology.obj import Group, MemberPerm, User +from fossology.exceptions import FossologyApiError +from fossology.obj import MemberPerm, User +# Helper functions +def get_group(foss: fossology.Fossology, name: str) -> int: + for group in foss.list_groups(): + if group.name == name: + return group + + +def verify_user_group_membership( + foss: fossology.Fossology, group_id: int, user_id: int +) -> bool: + for member in foss.list_group_members(group_id): + if member.user.id == user_id: + assert member.group_perm == MemberPerm.ADVISOR.value + return True + return False + + +# Test functions @responses.activate def test_list_groups_error(foss_server: str, foss: fossology.Fossology): - if fossology.versiontuple(foss.version) < fossology.versiontuple("1.2.1"): - with pytest.raises(FossologyUnsupported) as excinfo: - foss.list_groups() - assert ( - "Endpoint /groups is not supported by your Fossology API version" - in str(excinfo.value) - ) - else: - responses.add(responses.GET, f"{foss_server}/api/v1/groups", status=500) - with pytest.raises(FossologyApiError) as excinfo: - foss.list_groups() - assert f"Unable to get a list of groups for {foss.user.name}" in str( - excinfo.value - ) + responses.add(responses.GET, f"{foss_server}/api/v1/groups", status=500) + with pytest.raises(FossologyApiError) as excinfo: + foss.list_groups() + assert f"Unable to get a list of groups for {foss.user.name}" in str(excinfo.value) + + +@responses.activate +def test_delete_group_member_error(foss_server: str, foss: fossology.Fossology): + responses.add( + responses.DELETE, f"{foss_server}/api/v1/groups/42/user/84", status=400 + ) + with pytest.raises(FossologyApiError) as excinfo: + foss.delete_group_member(42, 84) + assert "Validation error while removing member 84 from group 42." in str( + excinfo.value + ) + + +@responses.activate +def test_delete_group_error(foss_server: str, foss: fossology.Fossology): + group_id = secrets.randbelow(10) + responses.add( + responses.DELETE, f"{foss_server}/api/v1/groups/{group_id}", status=500 + ) + with pytest.raises(FossologyApiError) as excinfo: + foss.delete_group(group_id) + assert f"Group {group_id} could not be deleted" in str(excinfo.value) def test_create_group(foss: fossology.Fossology): - if fossology.versiontuple(foss.version) < fossology.versiontuple("1.2.1"): - with pytest.raises(FossologyUnsupported) as excinfo: - foss.create_group("FossGroupTest") - assert ( - "Endpoint /groups is not supported by your Fossology API version" - in str(excinfo.value) - ) - else: - name = secrets.token_urlsafe(8) + name = secrets.token_urlsafe(8) + foss.create_group(name) + group = get_group(foss, name) + assert group + + # Recreate group to test API response 400 + with pytest.raises(FossologyApiError) as excinfo: foss.create_group(name) - groups = foss.list_groups() - assert groups - assert type(groups[0]) == Group + assert ( + f"Group {name} already exists, failed to create group or no group name provided" + in str(excinfo.value) + ) + # Cleanup + foss.delete_group(group.id) - # Recreate group to test API response 400 - with pytest.raises(FossologyApiError) as excinfo: - foss.create_group(name) - assert ( - f"Group {name} already exists, failed to create group or no group name provided" - in str(excinfo.value) - ) + +def test_list_deletable_groups(foss_server: str, foss: fossology.Fossology): + name = secrets.token_urlsafe(8) + foss.create_group(name) + group = get_group(foss, name) + assert group + deletable_groups = foss.list_groups(deletable=True) + assert deletable_groups + for group in deletable_groups: + foss.delete_group(group.id) def test_list_group_members(foss: fossology.Fossology, created_foss_user: User): - if fossology.versiontuple(foss.version) < fossology.versiontuple("1.5.0"): - with pytest.raises(FossologyUnsupported) as excinfo: - foss.list_group_members(2) - assert ( - "Endpoint /groups/id/members is not supported by your Fossology API version" - in str(excinfo.value) - ) - else: - name = secrets.token_urlsafe(8) - foss.create_group(name) - for group in foss.list_groups(): - if group.name == name: - group_id = group.id - foss.add_group_member(group_id, created_foss_user.id, MemberPerm.ADVISOR) - for member in foss.list_group_members(group.id): - if member.user.id == created_foss_user.id: - assert member.group_perm == MemberPerm.ADVISOR.value - foss.delete_user(created_foss_user) + name = secrets.token_urlsafe(8) + foss.create_group(name) + group = get_group(foss, name) + foss.add_group_member(group.id, created_foss_user.id, MemberPerm.ADVISOR) + assert verify_user_group_membership(foss, group.id, created_foss_user.id) + # Cleanup + foss.delete_group(group.id) + + +def test_add_group_member_if_user_does_not_exists_raises_fossology_api_error( + foss: fossology.Fossology, +): + name = secrets.token_urlsafe(8) + foss.create_group(name) + group = get_group(foss, name) + with pytest.raises(FossologyApiError) as excinfo: + foss.add_group_member(group.id, 42, MemberPerm.ADVISOR) + assert f"An error occurred while adding user 42 to group {group.id}" in str( + excinfo.value + ) + # Cleanup + foss.delete_group(group.id) + + +def test_add_group_member_if_member_already_exists_returns_400( + foss: fossology.Fossology, created_foss_user: User, monkeypatch: pytest.MonkeyPatch +): + mocked_logger = Mock() + monkeypatch.setattr("fossology.groups.logger", mocked_logger) + name = secrets.token_urlsafe(8) + foss.create_group(name) + group = get_group(foss, name) + foss.add_group_member(group.id, created_foss_user.id, MemberPerm.ADVISOR) + assert verify_user_group_membership(foss, group.id, created_foss_user.id) + assert ( + call(f"User {created_foss_user.id} has been added to group {group.id}.") + in mocked_logger.info.mock_calls + ) + foss.add_group_member(group.id, created_foss_user.id, MemberPerm.ADVISOR) + assert ( + call(f"User {created_foss_user.id} is already a member of group {group.id}.") + in mocked_logger.info.mock_calls + ) + # Cleanup + foss.delete_group(group.id) + + +def test_delete_group_member(foss: fossology.Fossology, created_foss_user: User): + name = secrets.token_urlsafe(8) + foss.create_group(name) + group = get_group(foss, name) + foss.add_group_member(group.id, created_foss_user.id, MemberPerm.ADVISOR) + assert verify_user_group_membership(foss, group.id, created_foss_user.id) + foss.delete_group_member(group.id, created_foss_user.id) + assert not verify_user_group_membership(foss, group.id, created_foss_user.id) + + # Cleanup + foss.delete_group(group.id) + + +def test_delete_group_member_if_group_does_not_exists_raises_fossology_api_error( + foss: fossology.Fossology, created_foss_user: User +): + with pytest.raises(FossologyApiError) as excinfo: + foss.delete_group_member(42, created_foss_user.id) + assert f"Member {created_foss_user.id} or group 42 not found." in str(excinfo.value)